# 01 - Customer Profiles (Hung)
Baseline: PySpark Logistic Regression (khong dung 'duration' de tranh leakage).

In [1]:
import json
import os
from pathlib import Path

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.functions import vector_to_array
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, when

In [2]:
spark = SparkSession.builder.appName('bdml_baseline_hung').getOrCreate()

# Resolve the dataset to an absolute path so the notebook works from any working directory.
# Candidate roots, in priority order: $PROJECT_ROOT (when set and non-empty), then the
# current working directory, then each of its parent directories. The first root under
# which the relative path exists wins.
rel = Path('data/bank-additional/bank-additional-full.csv')
search_roots = []
if os.environ.get('PROJECT_ROOT'):
    search_roots.append(Path(os.environ['PROJECT_ROOT']))
cwd = Path.cwd()
search_roots.extend([cwd, *cwd.parents])

data_path = next(
    (str((root / rel).resolve()) for root in search_roots if (root / rel).exists()),
    None,
)
if data_path is None:
    raise FileNotFoundError(f'Khong tim thay {rel} tu {Path.cwd()} — vui long kiem tra cau truc thu muc')

# Load the semicolon-separated CSV and derive a numeric binary target from column 'y'.
raw_df = spark.read.csv(data_path, header=True, sep=';', inferSchema=True)
labeled_df = raw_df.withColumn(
    'label',
    when(col('y') == 'yes', lit(1.0)).otherwise(lit(0.0)),
)

# Features are limited to customer-profile attributes.
# 'duration' is intentionally not selected, to avoid target leakage.
cols_cat = ['job', 'marital', 'education', 'default', 'housing', 'loan']
cols_num = ['age']
selected = [*cols_num, *cols_cat, 'label']
df = labeled_df.select(*selected).dropna()

train, test = df.randomSplit([0.8, 0.2], seed=42)


def _balanced_weight(n_class, n_total):
    """Inverse-frequency class weight n_total / (2 * n_class); 1.0 when the class is absent."""
    return n_total / (2.0 * n_class) if n_class > 0 else 1.0


# Class weights are computed from the training split only and attached as a 'weight'
# column, which the logistic regression consumes through weightCol.
pos = train.filter(col('label') == 1.0).count()
neg = train.filter(col('label') == 0.0).count()
tot = pos + neg
w_pos = _balanced_weight(pos, tot)
w_neg = _balanced_weight(neg, tot)
train = train.withColumn(
    'weight',
    when(col('label') == 1.0, lit(w_pos)).otherwise(lit(w_neg)),
)

25/10/15 16:42:44 WARN Utils: Your hostname, hung resolves to a loopback address: 127.0.1.1; using 192.168.1.29 instead (on interface wlp0s20f3)
25/10/15 16:42:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/15 16:42:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Categorical handling: index every categorical column, with handleInvalid='keep' so that
# levels unseen during fit go to an extra bucket instead of raising. Then one-hot encode
# the indices and assemble them with the numeric columns into a single 'features' vector.
idx_cols = [f'{c}_idx' for c in cols_cat]
ohe_cols = [f'{c}_ohe' for c in cols_cat]
indexers = [
    StringIndexer(inputCol=src, outputCol=dst, handleInvalid='keep')
    for src, dst in zip(cols_cat, idx_cols)
]
enc = OneHotEncoder(inputCols=idx_cols, outputCols=ohe_cols)
assembler = VectorAssembler(inputCols=cols_num + ohe_cols, outputCol='features')

# Class-weighted logistic regression (weights come from the 'weight' column on train),
# with pure L2 regularisation since elasticNetParam=0.0.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='label',
    weightCol='weight',
    maxIter=50,
    regParam=0.01,
    elasticNetParam=0.0,
)
pipeline = Pipeline(stages=[*indexers, enc, assembler, lr])
model = pipeline.fit(train)
pred = model.transform(test)

                                                                                

In [4]:
# Threshold-free ranking metrics (area under ROC and PR curves) from rawPrediction, plus F1
# of the model's hard 'prediction' column.
# NOTE: metricName='f1' in MulticlassClassificationEvaluator is the support-weighted F1
# across both classes, not the F1 of the positive class.
e_roc, e_pr = (
    BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='rawPrediction', metricName=metric)
    for metric in ('areaUnderROC', 'areaUnderPR')
)
e_f1 = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='f1')
roc, pr, f1 = (evaluator.evaluate(pred) for evaluator in (e_roc, e_pr, e_f1))

# Confusion matrix as (label, prediction, count) rows, sorted for a stable display.
cm = (
    pred.groupBy('label', 'prediction')
    .count()
    .orderBy('label', 'prediction')
)
for caption, value in (('AUC-ROC:', roc), ('AUC-PR:', pr), ('F1:', f1)):
    print(caption, value)
cm.show()

AUC-ROC: 0.6584022402684366
AUC-PR: 0.19510756061821388
F1: 0.6665755868454523
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       0.0| 4266|
|  0.0|       1.0| 3060|
|  1.0|       0.0|  320|
|  1.0|       1.0|  532|
+-----+----------+-----+



In [5]:
# Resolve the repository root so artifacts are written to <root>/artifacts regardless of CWD.
# Precedence:
#   1. $PROJECT_ROOT, when set, non-empty, and pointing at an existing path;
#   2. the nearest directory, starting at the CWD and walking up, that contains
#      AGENTS.md or README.md;
#   3. the CWD itself.
env_root = os.environ.get('PROJECT_ROOT')
if env_root and Path(env_root).exists():
    proj = Path(env_root)
else:
    cwd = Path.cwd()
    root_markers = ('AGENTS.md', 'README.md')
    proj = next(
        (base for base in (cwd, *cwd.parents) if any((base / m).exists() for m in root_markers)),
        cwd,
    )
artifacts_dir = proj / 'artifacts'
artifacts_dir.mkdir(parents=True, exist_ok=True)

# Compute the positive-class prevalence on the test predictions, then sweep decision
# thresholds on P(label=1) and keep the one with the highest positive-class F1.
# NOTE(review): the threshold is selected on the test split itself, so the reported best
# F1 is optimistically biased; choose it on a separate validation split for an unbiased
# estimate.

# P(label=1) is element 1 of the probability vector. vector_to_array (Spark >= 3.0) avoids
# a per-row Python UDF round-trip.
pp = pred.withColumn('p1', vector_to_array(col('probability'))[1])
thresholds = [0.3, 0.4, 0.5, 0.6, 0.7]

# `pred` is not cached, so every Spark action re-evaluates its whole lineage. Gather all
# the counts (total, positives, and TP/FP/FN for each threshold) in ONE aggregation
# instead of 17 separate count() actions.
is_pos = col('label') == 1.0
is_neg = col('label') == 0.0
agg_exprs = [
    F.count(lit(1)).alias('total'),
    F.sum(is_pos.cast('long')).alias('n_pos'),
]
for i, t in enumerate(thresholds):
    hat = col('p1') >= lit(float(t))
    agg_exprs += [
        F.sum((is_pos & hat).cast('long')).alias(f'tp_{i}'),
        F.sum((is_neg & hat).cast('long')).alias(f'fp_{i}'),
        F.sum((is_pos & ~hat).cast('long')).alias(f'fn_{i}'),
    ]
# A global agg always yields exactly one row; sum() over zero rows is null, hence `or 0`.
counts = pp.agg(*agg_exprs).first()

total = counts['total']
# Named distinctly so the training-set positive count `pos` is not overwritten.
n_pos_test = counts['n_pos'] or 0
prevalence = (n_pos_test / total) if total else 0.0

best = {'threshold': None, 'precision': 0.0, 'recall': 0.0, 'f1': 0.0}
for i, t in enumerate(thresholds):
    tp, fp, fn = (counts[f'{k}_{i}'] or 0 for k in ('tp', 'fp', 'fn'))
    prec = (tp / (tp + fp)) if (tp + fp) > 0 else 0.0
    rec = (tp / (tp + fn)) if (tp + fn) > 0 else 0.0
    f1_t = (2 * prec * rec / (prec + rec)) if (prec + rec) > 0 else 0.0
    # Strict '>' keeps the first (lowest) threshold when F1 ties.
    if f1_t > best['f1']:
        best = {'threshold': float(t), 'precision': float(prec), 'recall': float(rec), 'f1': float(f1_t)}
print('Prevalence:', prevalence)
print('Best threshold by F1:', best)

# Persist the evaluation metrics as JSON, and the fitted pipeline model, under artifacts_dir.
#
# Fix: `f1` comes from MulticlassClassificationEvaluator(metricName='f1'), which is the
# support-weighted F1 across classes. It was previously written under 'f1_macro'.
# It is now stored as 'f1_weighted', and 'f1_macro' holds the true macro F1: the unweighted
# mean of the per-class F1 scores (fMeasureByLabel) over the labels present in `pred`.
e_f1_by_label = MulticlassClassificationEvaluator(
    labelCol='label', predictionCol='prediction', metricName='fMeasureByLabel'
)
present_labels = sorted(row['label'] for row in pred.select('label').distinct().collect())
f1_per_class = [
    e_f1_by_label.evaluate(pred, {e_f1_by_label.metricLabel: label_value})
    for label_value in present_labels
]
f1_macro = (sum(f1_per_class) / len(f1_per_class)) if f1_per_class else 0.0

metrics = {
    'auc_roc': float(roc),
    'auc_pr': float(pr),
    'f1_macro': float(f1_macro),
    'f1_weighted': float(f1),
    'class_prevalence': float(prevalence),
    'best_threshold_by_f1': best,
    'w_pos': float(w_pos),
    'w_neg': float(w_neg)
}
with open(artifacts_dir / 'hung_lr_metrics.json', 'w', encoding='utf-8') as fh:
    json.dump(metrics, fh, indent=2)
# overwrite() replaces any model saved by a previous run at the same path.
model.write().overwrite().save(str(artifacts_dir / 'hung_lr_model'))
print(f'Saved metrics and model to {artifacts_dir}/')

                                                                                

Prevalence: 0.10418195157740279
Best threshold by F1: {'threshold': 0.5, 'precision': 0.14810690423162584, 'recall': 0.6244131455399061, 'f1': 0.23942394239423942}
Saved metrics and model to /home/hungfnguyen/Documents/1-bigdata-ml/marketing-bank-prediction/artifacts/
