In [1]:
import os, joblib
from pyspark.sql import SparkSession
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from xgboost import XGBClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score

In [2]:
# Input CSV locations on HDFS.
train_path = "hdfs://namenode:9000/data/train.csv"
test_path = "hdfs://namenode:9000/data/test.csv"

# Column names of the row identifier and of the target.
id_col = "SK_ID_CURR"
label_col = "TARGET"

# Directory that receives the pickled models; created up front so that
# saving a model later cannot fail on a missing folder.
model_dir = "/opt/model"
os.makedirs(model_dir, exist_ok=True)

# PostgreSQL connection used when writing the score tables.
# Credentials are taken from the environment when set; the literals are only
# fallbacks so the notebook keeps working in an unconfigured environment.
JDBC_URL = "jdbc:postgresql://postgres:5432/finrisk"
JDBC_PROPS = {
    "driver": "org.postgresql.Driver",
    "user": os.environ.get("FINRISK_DB_USER", "finuser"),
    "password": os.environ.get("FINRISK_DB_PASSWORD", "finpass"),
}

In [3]:
# Build (or reuse) the Spark session for this job, with the default Hadoop
# filesystem pointed at the HDFS namenode.
session_builder = SparkSession.builder.appName("train_models")
session_builder = session_builder.config(
    "spark.hadoop.fs.defaultFS", "hdfs://namenode:9000"
)
spark = session_builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/14 14:42:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Both files carry a header row; column types are inferred by Spark.
csv_options = {"header": True, "inferSchema": True}
train_sp = spark.read.csv(train_path, **csv_options)
test_sp = spark.read.csv(test_path, **csv_options)


                                                                                

In [5]:
# Convert both Spark frames to pandas DataFrames (train first, then test).
train, test = (spark_frame.toPandas() for spark_frame in (train_sp, test_sp))

25/11/14 14:42:32 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [6]:
# Inspect the column dtypes of the pandas training frame.
train.dtypes

SK_ID_CURR                                int32
TARGET                                    int32
NAME_CONTRACT_TYPE                       object
CODE_GENDER                              object
FLAG_OWN_CAR                             object
                                         ...   
cc_bal_AMT_PAYMENT_TOTAL_CURRENT_max    float64
cc_bal_AMT_PAYMENT_TOTAL_CURRENT_sum    float64
cc_bal_SK_DPD_mean                      float64
cc_bal_SK_DPD_max                       float64
cc_bal_SK_DPD_sum                       float64
Length: 172, dtype: object

In [7]:
# Target vector: the label column of the full training frame.
y = train[label_col]
# Bare expression so the notebook displays the series.
y

0         1
1         0
2         0
3         0
4         0
         ..
307506    0
307507    0
307508    0
307509    1
307510    0
Name: TARGET, Length: 307511, dtype: int32

In [8]:
# Training features: every column of `train` except the label.
X_train = train.drop(columns = label_col)

In [9]:
# Start the test features from a copy, keeping `test` itself as loaded.
X_test = test.copy()

In [10]:
# Keep the identifier values of both frames (via `.values`) so scores can be
# matched back to rows once the id column is gone from the feature frames.
train_id = train[id_col].values
test_id = test[id_col].values

In [11]:
# Remove the identifier column from the training features.
X_train = X_train.drop(columns = id_col)

In [12]:
# Remove the identifier column from the test features.
X_test = X_test.drop(columns = id_col)

In [13]:
# Names of the object-dtype columns; these are routed to the categorical
# branch of the preprocessing transformer.
cat_list = X_train.select_dtypes(include = ["object"]).columns.to_list()

In [14]:
# Every remaining (non-categorical) column, in its original column order.
categorical_names = set(cat_list)
num_list = [column for column in X_train.columns if column not in categorical_names]

In [15]:
# Numeric branch: fill missing values with the column median, then standardise.
numeric_steps = [
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler()),
]
numeric_tf = Pipeline(numeric_steps)

# Categorical branch: fill missing values with the most frequent category,
# then one-hot encode; categories unseen during fit are ignored at transform.
categorical_steps = [
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("ohe", OneHotEncoder(handle_unknown="ignore")),
]
categorical_tf = Pipeline(categorical_steps)

# Route each column list to its branch; any column in neither list is dropped.
column_branches = [
    ("num", numeric_tf, num_list),
    ("cat", categorical_tf, cat_list),
]
preprocess = ColumnTransformer(column_branches, remainder="drop")


In [16]:
# Hold out a stratified 20% of the labelled rows for validation.
#
# The feature frame is rebuilt from `train` on every execution so that this
# cell is idempotent: splitting the already-split `X_train` against the
# full-length `y` would fail on inconsistent lengths when the cell is re-run.
#
# NOTE: `X_test` is rebound here. From this point on it holds the validation
# split of the labelled data, not the features derived from `test`.
labelled_features = train.drop(columns=[label_col, id_col])
X_train, X_test, y_train, y_test = train_test_split(
    labelled_features, y, test_size=0.2, stratify=y, random_state=1
)

In [17]:
# Candidate classifiers, keyed by the name that is also used to look up the
# matching hyper-parameter grid. All are seeded for reproducibility.
models = {
    # class_weight="balanced" reweights the classes inversely to their frequency.
    "logistic": LogisticRegression(
        random_state=1,
        max_iter=3000,
        class_weight="balanced",
        n_jobs=-1,
    ),

    "random_forest": RandomForestClassifier(
        random_state=1,
        class_weight="balanced",
        n_jobs=-1,
    ),

    "xgboost": XGBClassifier(
        random_state=1,
        objective="binary:logistic",
        eval_metric="logloss",
        # The keyword is `n_jobs`; a misspelt `n_job` is not the parallelism
        # setting and leaves the thread count at its default.
        n_jobs=-1,
    ),
}

In [20]:
# Hyper-parameter grids per model name. Every key carries the "clf__" prefix
# so it addresses the pipeline step named "clf".
logistic_grid = dict(
    clf__C=[0.1, 1, 10],
    clf__solver=["liblinear", "lbfgs"],
)

random_forest_grid = dict(
    clf__n_estimators=[100, 200],
    clf__max_depth=[8, 12, 16, None],
    clf__min_samples_split=[2, 5, 10],
    clf__min_samples_leaf=[1, 2, 4],
)

xgboost_grid = dict(
    clf__n_estimators=[100, 200],
    clf__max_depth=[3, 5, 7],
    clf__learning_rate=[0.05, 0.1, 0.2],
    clf__subsample=[0.8, 1.0],
    clf__colsample_bytree=[0.7, 1],
)

param_grids = dict(
    logistic=logistic_grid,
    random_forest=random_forest_grid,
    xgboost=xgboost_grid,
)

In [2]:
# Tune each candidate with a 3-fold grid search (ROC AUC), evaluate the refit
# winner on the held-out split, and persist it to `model_dir`.
results = {}    # model name -> ROC AUC on the held-out split
pipelines = {}  # model name -> best fitted pipeline

for name, clf in models.items():

    # Preprocessing lives inside the pipeline, so it is fitted within each
    # cross-validation fold together with the classifier.
    pipe = Pipeline(steps=[
        ("prep", preprocess),
        ("clf", clf)
    ])

    grid = GridSearchCV(pipe,
                        param_grids[name],
                        cv=3,
                        scoring="roc_auc",
                        n_jobs=-1,
                        verbose=1)

    grid.fit(X_train, y_train)

    best_model = grid.best_estimator_
    pipelines[name] = best_model

    # Probability of the positive class (column 1) for the held-out rows.
    y_pred = best_model.predict_proba(X_test)[:, 1]
    auc = roc_auc_score(y_test, y_pred)
    results[name] = auc

    print(f"Best params: {grid.best_params_}")
    print(f"AUC({name}) = {auc:.4f}")

    # Use the `model_dir` defined in the configuration cell; no upper-case
    # MODEL_DIR exists in this notebook.
    model_path = os.path.join(model_dir, f"{name}_model_tuned.pkl")
    joblib.dump(best_model, model_path)
    print(f"Saved: {model_path}")

NameError: name 'models' is not defined

In [1]:
# Report the held-out AUC of every tuned model, then select the best one.
print("\n===== SUMMARY AUC =====")
for model_name, model_auc in results.items():
    print(f"{model_name}: {model_auc:.4f}")

# Highest AUC wins; on a tie the model that was inserted first is kept.
best_model_name = max(results, key=lambda candidate: results[candidate])
best_model = pipelines[best_model_name]
best_auc = results[best_model_name]

print(f"\n BEST MODEL = {best_model_name.upper()} (AUC={best_auc:.4f})")



===== SUMMARY AUC =====


NameError: name 'results' is not defined

In [None]:
# Score every labelled row and every row of the unlabelled test set.
#
# The feature frames are rebuilt from `train` / `test` here because, after the
# train/validation split, `X_train` and `X_test` only hold the 80% / 20%
# partitions of the labelled data. Their predictions would not line up with
# the full-length `train_id`, `y` and `test_id`, and the real test set would
# never be scored.
X_train_full = train.drop(columns=[id_col, label_col])
X_test_full = test.drop(columns=[id_col])

# Column 1 of predict_proba is the probability of the positive class.
# NOTE: most training rows were used to fit `best_model`, so the training
# scores are largely in-sample.
train_proba = best_model.predict_proba(X_train_full)[:, 1]
test_proba = best_model.predict_proba(X_test_full)[:, 1]

# Output columns are named with `id_col` / `label_col` from the configuration
# cell; no upper-case ID_COL / LABEL_COL exist in this notebook.
train_out = pd.DataFrame({
    id_col: train_id,
    label_col: y.values,
    "pd_1": train_proba
})

test_out = pd.DataFrame({
    id_col: test_id,
    "pd_1": test_proba
})

In [None]:
# Convert both score frames to Spark before anything is written.
sdf_train_out = spark.createDataFrame(train_out)
sdf_test_out = spark.createDataFrame(test_out)

# Replace the contents of each destination table with the fresh scores
# (training scores first, then test scores).
score_tables = (
    (sdf_train_out, "public.sklearn_train_scores"),
    (sdf_test_out, "public.sklearn_test_scores"),
)
for score_sdf, table_name in score_tables:
    writer = score_sdf.write.mode("overwrite")
    writer.jdbc(url=JDBC_URL, table=table_name, properties=JDBC_PROPS)

print("\n TRAINING DONE")

# Shut the Spark session down once the scores have been written.
spark.stop()