# LendingClub Pipeline
Este notebook integra los agentes principales.

## fetch

In [None]:

"""Download raw LendingClub data from Kaggle with basic validation."""

from pathlib import Path
import os
import zipfile

from dotenv import load_dotenv
from kaggle import api
from kaggle.rest import ApiException


def main() -> None:
    """Fetch the raw LendingClub file from Kaggle into ``data/raw``.

    Configuration is read from the environment after ``load_dotenv()``:
    ``KAGGLE_DATASET``, ``KAGGLE_USERNAME``, ``KAGGLE_KEY`` and the optional
    ``KAGGLE_FILE``. When the target file is already on disk nothing is
    downloaded and the credentials are not checked.

    Raises:
        EnvironmentError: the dataset or one of the credentials is unset/empty.
        RuntimeError: the Kaggle client raised ``ApiException`` on download.
    """
    load_dotenv()

    env = os.environ
    required = [env.get(var) for var in ("KAGGLE_DATASET", "KAGGLE_USERNAME", "KAGGLE_KEY")]
    dataset = required[0]
    file_name = env.get("KAGGLE_FILE", "Loan_status_2007-2020Q3.gzip")

    raw_dir = Path("data/raw")
    raw_dir.mkdir(parents=True, exist_ok=True)
    out_path = raw_dir / file_name

    # A local copy wins: skip Kaggle entirely when the file is already there.
    if out_path.exists():
        print(f"Using cached dataset at {out_path}")
        return

    if not all(required):
        raise EnvironmentError(
            "Kaggle credentials or dataset not configured.\n"
            "Please set KAGGLE_USERNAME, KAGGLE_KEY and KAGGLE_DATASET in the .env file."
        )

    api.authenticate()
    try:
        api.dataset_download_file(dataset, file_name, path=str(raw_dir), force=True)
    except ApiException as exc:
        message = "Failed to download dataset from Kaggle. Check your credentials and dataset permissions."
        raise RuntimeError(message) from exc

    # The download may arrive as "<file_name>.zip"; unpack it and remove the archive.
    archive = raw_dir / (file_name + ".zip")
    if not archive.exists():
        return
    with zipfile.ZipFile(archive) as bundle:
        bundle.extractall(path=raw_dir)
    archive.unlink()



# Script entry point: run the fetch step only when this code is executed directly.
if __name__ == "__main__":
    main()


## prep

In [None]:

"""Clean raw LendingClub data and generate a processed dataset."""

from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from src.utils.spark import get_spark
from pathlib import Path
from dotenv import load_dotenv
from src.utils.metrics import dump_metrics
import unicodedata
import os



def winsorize(df, cols, lower=0.01, upper=0.99):
    """Clamp each column in *cols* to its approximate [lower, upper] quantile range.

    Args:
        df: Spark DataFrame to transform.
        cols: names of the numeric columns to clamp.
        lower: lower quantile probability (default 0.01).
        upper: upper quantile probability (default 0.99).

    Returns:
        The DataFrame with every clampable column replaced by its clamped
        version. Columns for which no quantiles can be computed are left as-is.
    """
    for c in cols:
        bounds = df.approxQuantile(c, [lower, upper], 0.05)
        if len(bounds) < 2:
            # approxQuantile yields an empty list when the column has no
            # usable values (empty frame / all nulls): nothing to clamp.
            continue
        low, high = bounds
        df = df.withColumn(
            c,
            F.when(F.col(c) < low, low)
            .when(F.col(c) > high, high)
            .otherwise(F.col(c)),
        )
    return df


def _normalize(text: str) -> str:
    """Return *text* lower-cased with combining marks (accents) removed.

    The text is NFKD-decomposed first so accented letters split into a base
    character plus combining marks, which are then dropped. ``None`` is
    returned unchanged.
    """
    if text is not None:
        decomposed = unicodedata.normalize("NFKD", text)
        base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
        return "".join(base_chars).lower()
    return text



def top_k(df, col, k=100):
    """Keep the *k* most frequent values of *col* and map every other value to "other".

    The frequency ranking is collected to the driver, so *k* should stay small.
    """
    frequencies = df.groupBy(col).count()
    leaders = frequencies.orderBy(F.desc("count")).limit(k).collect()
    keep = [row[0] for row in leaders]
    column = F.col(col)
    return df.withColumn(col, F.when(column.isin(keep), column).otherwise("other"))


# Fast-mode switch: True when the FAST_MODE environment variable equals "true"
# (case-insensitive). It is read once, when this line executes.
# NOTE(review): this runs before main() calls load_dotenv(), so a FAST_MODE that
# is defined only in .env is not visible here unless dotenv was already loaded
# earlier in the same process — confirm this is intended.
FAST = os.getenv("FAST_MODE", "false").lower() == "true"


def main() -> None:
    """Clean the raw LendingClub data and write it to ``data/processed``.

    Pipeline, in order: read the raw CSV(s); sample when ``FAST`` is set; drop
    a ``_c0`` column if present; cast the numeric columns; impute numeric
    nulls with approximate medians and categorical nulls with "missing";
    derive ``grade_status``, ``default_flag``, ``loan_to_income`` and
    ``credit_age``; winsorize selected numeric columns; normalize
    ``emp_title``; cap categorical cardinality with ``top_k``; write parquet
    (``M_fast.parquet`` or ``M_full.parquet``) and dump the row count.
    """
    load_dotenv()
    spark = get_spark("prep")
    src = Path(os.environ.get("RAW_DATA", "data/raw/Loan_status_2007-2020Q3.gzip"))
    proc_dir = Path("data/processed")
    proc_dir.mkdir(parents=True, exist_ok=True)
    out_file = proc_dir / ("M_fast.parquet" if FAST else "M_full.parquet")

    # Use the quarterly CSV parts when at least three sit next to the raw file;
    # otherwise fall back to the single gzip-compressed CSV.
    raw_dir = src.parent
    parts = sorted(raw_dir.glob("loan_data_2007_2020Q*.csv"))
    if len(parts) >= 3:
        df = (
            spark.read.option("header", "true").option("inferSchema", "true").csv([str(p) for p in parts])
        )
    else:
        df = (
            spark.read.option("header", "true").option("inferSchema", "true").option("compression", "gzip").csv(str(src))
        )
    if FAST:
        sample_frac = float(os.getenv("SAMPLE_FRACTION", "0.05"))
        df = df.sample(fraction=sample_frac, seed=42)
    if "_c0" in df.columns:
        df = df.drop("_c0")

    categorical_vars = [
        "term", "grade", "emp_length", "home_ownership",
        "verification_status", "purpose", "loan_status",
    ]

    numerical_vars = [
        "loan_amnt", "int_rate", "installment", "fico_range_low", "fico_range_high",
        "annual_inc", "dti", "open_acc", "total_acc", "revol_bal", "revol_util",
    ]

    # Percent columns may carry a literal "%"; strip it before casting.
    percent_cols = ["int_rate", "revol_util"]
    for c in percent_cols:
        df = df.withColumn(c, F.regexp_replace(c, "%", "").cast("double"))
    for c in set(numerical_vars) - set(percent_cols):
        df = df.withColumn(c, F.col(c).cast("double"))

    num_cols = [c for c in df.columns if c in numerical_vars]
    cat_cols = [c for c in df.columns if c in categorical_vars]

    # approxQuantile returns an empty list for a column without usable values;
    # such a column has no median to impute, so it is skipped instead of
    # failing on the [0] lookup.
    medians = {}
    for c in num_cols:
        quantile = df.approxQuantile(c, [0.5], 0.05)
        if quantile:
            medians[c] = quantile[0]
    if medians:
        df = df.fillna(medians)
    df = df.fillna("missing", subset=cat_cols)

    df = df.withColumn("grade_status", F.concat_ws("_", "grade", "loan_status"))
    df = df.withColumn(
        "default_flag",
        F.when(F.col("loan_status").isin("Charged Off", "Default"), 1).otherwise(0),
    )

    df = winsorize(df, ["annual_inc", "dti", "loan_amnt", "int_rate", "revol_util"])

    # +1 in the denominator avoids dividing by a zero income.
    df = df.withColumn("loan_to_income", F.col("loan_amnt") / (F.col("annual_inc") + F.lit(1)))
    # Both date columns are parsed as "MMM-yyyy" by prefixing a day of month.
    issue_year = F.year(F.to_date(F.concat(F.lit("01-"), F.col("issue_d")), "dd-MMM-yyyy"))
    earliest_year = F.year(F.to_date(F.concat(F.lit("01-"), F.col("earliest_cr_line")), "dd-MMM-yyyy"))
    df = df.withColumn("credit_age", issue_year - earliest_year)

    norm_udf = F.udf(_normalize, StringType())
    df = df.withColumn("emp_title", norm_udf(F.col("emp_title")))

    for c in cat_cols:
        df = top_k(df, c, 100)

    df.write.mode("overwrite").parquet(str(out_file))

    # Count what was actually written rather than calling df.count(), which
    # would re-run the whole read/clean lineage a second time.
    rows = spark.read.parquet(str(out_file)).count()
    dump_metrics("prep", {"rows": rows, "fast": FAST})




# Script entry point: run the prep step only when this code is executed directly.
if __name__ == "__main__":
    main()


## register

In [None]:
"""Register the best run in the MLflow Model Registry."""
import mlflow
from mlflow.tracking import MlflowClient
from dotenv import load_dotenv


def main() -> None:
    """Create a "credit-risk" model version from the run with the highest AUC.

    Searches MLflow runs ordered by ``metrics.auc`` descending and registers
    the top one. Does nothing when the search returns no runs.
    """
    load_dotenv()

    client = MlflowClient()
    best = mlflow.search_runs(order_by=["metrics.auc DESC"], max_results=1)
    if best.empty:
        return

    run_id = best.loc[0, "run_id"]
    # NOTE(review): the supervised training step logs its models under
    # "models/supervised/<name>", not "model" — confirm this source URI resolves.
    client.create_model_version(
        name="credit-risk",
        source=f"runs:/{run_id}/model",
        run_id=run_id,
    )


# Script entry point: run the registration step only when executed directly.
if __name__ == "__main__":
    main()


## split

In [None]:
"""Stratified train/test split by grade and loan_status."""
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from functools import reduce
from src.utils.spark import get_spark
from pathlib import Path
from src.utils.metrics import dump_metrics
import os


def stratified_split(df: DataFrame, strat_cols: list, test_frac: float, seed: int):
    """Split *df* into train/test so each stratum is split with the same proportions.

    A stratum is the "_"-joined combination of the values in *strat_cols*.
    Every stratum is split independently with ``randomSplit`` and the pieces
    are unioned back together.

    Args:
        df: DataFrame to split.
        strat_cols: column names whose value combinations define the strata.
        test_frac: weight of the test side; the train side gets ``1 - test_frac``.
        seed: seed passed to every ``randomSplit`` call.

    Returns:
        ``(train_df, test_df)`` without the helper ``stratum`` column. For an
        input without rows both frames are empty.
    """
    df = df.withColumn("stratum", F.concat_ws("_", *[F.col(c) for c in strat_cols]))
    strata = [r[0] for r in df.select("stratum").distinct().collect()]
    if not strata:
        # No rows at all: reduce() below would raise TypeError on an empty
        # list, so hand back two empty frames with the original schema.
        empty = df.drop("stratum")
        return empty, empty

    train_parts = []
    test_parts = []
    for s in strata:
        sub = df.filter(F.col("stratum") == s)
        train, test = sub.randomSplit([1 - test_frac, test_frac], seed)
        train_parts.append(train)
        test_parts.append(test)

    def union_all(dfs):
        """Union a non-empty list of DataFrames by column name."""
        return reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), dfs)

    train_df = union_all(train_parts).drop("stratum")
    test_df = union_all(test_parts).drop("stratum")
    return train_df, test_df


# Fast-mode switch: True when the FAST_MODE environment variable equals "true"
# (case-insensitive). It is read once, when this line executes.
# NOTE(review): nothing in this step calls load_dotenv(), so a FAST_MODE that is
# defined only in .env is seen here only if dotenv was already loaded earlier
# in the same process — confirm this matches the prep step's mode.
FAST = os.getenv("FAST_MODE", "false").lower() == "true"


def main() -> None:
    """Split the processed dataset 80/20, stratified by grade and loan_status.

    Reads ``M_fast.parquet`` or ``M_full.parquet`` (depending on ``FAST``) from
    ``data/processed``, writes ``train.parquet`` and ``test.parquet`` next to
    it, and dumps the row counts of both splits.

    Raises:
        FileNotFoundError: the processed input file does not exist.
    """
    spark = get_spark("split")
    processed = Path("data/processed")
    data_path = processed / ("M_fast.parquet" if FAST else "M_full.parquet")
    if not data_path.exists():
        raise FileNotFoundError(f"{data_path} not found. Run prep.py first.")

    df = spark.read.parquet(str(data_path))
    train, test = stratified_split(df, ["grade", "loan_status"], test_frac=0.2, seed=42)

    processed.mkdir(parents=True, exist_ok=True)
    n_partitions = 4 if FAST else 8
    outputs = (
        (train, "data/processed/train.parquet"),
        (test, "data/processed/test.parquet"),
    )
    # Same writer settings for both splits: few partitions, capped file size.
    for frame, destination in outputs:
        writer = frame.coalesce(n_partitions).write
        writer.option("maxRecordsPerFile", 250000).mode("overwrite").parquet(destination)

    counts = {"train": train.count(), "test": test.count(), "fast": FAST}
    dump_metrics("split", counts)


# Script entry point: run the split step only when this code is executed directly.
if __name__ == "__main__":
    main()


## train_sup

In [None]:
"""Train supervised models on default_flag."""
import os

# Turn off MLflow's Spark dataset autologging, which is noisy for this job.
# The flag is set here, ahead of the mlflow import further down.
os.environ["MLFLOW_ENABLE_SPARK_DATASET_AUTOLOGGING"] = "false"

from pyspark.ml.classification import (
    RandomForestClassifier,
    GBTClassifier,
    MultilayerPerceptronClassifier,
)
from pyspark.ml.feature import (
    StringIndexer,
    OneHotEncoder,
    VectorAssembler,
)
from pyspark.ml import Pipeline
from pyspark.sql import functions as F
from src.utils.spark import get_spark
from src.utils.balancing import add_weight_column
from src.utils.metrics import METRICS_DIR
import mlflow
from pathlib import Path
from dotenv import load_dotenv
from py4j.protocol import Py4JJavaError
import logging
import sys
import re
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import (
    roc_auc_score,
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    average_precision_score,
    confusion_matrix,
    roc_curve,
    precision_recall_curve,
)



def main() -> int:
    """Train the supervised classifiers on ``default_flag`` and log them to MLflow.

    Reads ``data/processed/train.parquet`` and ``test.parquet``, downsamples
    the training data when it exceeds a row budget derived from
    ``SPARK_DRIVER_MEMORY``, fits a shared preprocessing pipeline, then fits
    RandomForest and GBT (plus an MLP when ``FAST_MODE`` is not "true").
    For every model, metrics and plots are computed on the test split, written
    under ``METRICS_DIR/<run_id>`` and logged to the model's MLflow run.

    Returns:
        0 on success, 1 when a model could not be fitted.
    """
    load_dotenv()

    try:
        mlflow.spark.autolog()
    except Exception as e:
        print(f"MLflow autologging not available: {e}")

    FAST = os.getenv("FAST_MODE", "false").lower() == "true"
    seed = 42

    spark = get_spark("train_sup")
    spark.conf.set("spark.python.worker.broadcastTimeout", "600")
    try:
        train = spark.read.parquet("data/processed/train.parquet")
        test = spark.read.parquet("data/processed/test.parquet")
        target = "default_flag"

        train = train.cache()
        test = test.cache()

        # Row budget scales linearly with driver memory: 1M rows per 6 GB.
        # Only the first number in SPARK_DRIVER_MEMORY is used, read as GB.
        mem_str = os.environ.get("SPARK_DRIVER_MEMORY", "6")
        digits = re.findall(r"\d+(?:\.\d+)?", mem_str)
        driver_mem_gb = float(digits[0]) if digits else 6.0
        max_rows = int(1_000_000 * (driver_mem_gb / 6))
        total_rows = train.count()
        if total_rows > max_rows:
            fraction = max_rows / total_rows
            train = train.sample(fraction=fraction, seed=42)
            print(
                f"Downsampled train from {total_rows} to {max_rows} rows (driver_mem={driver_mem_gb}G)"
            )

        # Basic preprocessing: every non-string column except the target is
        # treated as numeric, every string column as categorical.
        num_cols = [c for c, t in train.dtypes if t != "string" and c != target]
        cat_cols = [c for c, t in train.dtypes if t == "string" and c != target]
        train = train.fillna(0.0, subset=num_cols)
        test = test.fillna(0.0, subset=num_cols)

        # Added after num_cols is computed, so "weight" never becomes a feature.
        train = add_weight_column(train, target)

        # Split into train/validation
        train_df, val_df = train.randomSplit([0.8, 0.2], seed=seed)

        # Preprocessing pipeline: StringIndexer -> OneHotEncoder -> VectorAssembler
        indexers = [
            StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
            for c in cat_cols
        ]
        encoder = OneHotEncoder(
            inputCols=[f"{c}_idx" for c in cat_cols],
            outputCols=[f"{c}_ohe" for c in cat_cols],
            dropLast=False,
        )
        assembler = VectorAssembler(
            inputCols=num_cols + [f"{c}_ohe" for c in cat_cols],
            outputCol="features",
        )
        prep_pipeline = Pipeline(stages=indexers + [encoder, assembler])
        prep_model = prep_pipeline.fit(train_df)
        train_pre = prep_model.transform(train_df)

        # The MLP needs the input layer width, i.e. the assembled vector size.
        n_features = train_pre.select("features").first()["features"].size

        models = {
            "RandomForest": RandomForestClassifier(
                labelCol=target,
                featuresCol="features",
                weightCol="weight",
                seed=seed,
            ),
            "GBT": GBTClassifier(
                labelCol=target,
                featuresCol="features",
                weightCol="weight",
                seed=seed,
            ),
        }
        if not FAST:
            layers = [n_features, 64, 32, 2]
            models["MLP"] = MultilayerPerceptronClassifier(
                labelCol=target,
                featuresCol="features",
                layers=layers,
                seed=seed,
            )

        results = []
        Path("models/supervised").mkdir(parents=True, exist_ok=True)
        for name, algo in models.items():
            with mlflow.start_run(run_name=name) as run:
                pipeline = Pipeline(stages=[prep_model, algo])
                # Fit on the full training split; on a JVM OutOfMemoryError
                # halve the sampled fraction and retry, giving up below 1%.
                retry_fraction = 1.0
                while True:
                    subset = (
                        train_df
                        if retry_fraction >= 1.0
                        else train_df.sample(fraction=retry_fraction, seed=seed)
                    )
                    subset = subset.cache()
                    try:
                        model = pipeline.fit(subset)
                        break
                    except (Py4JJavaError, MemoryError) as e:
                        # Release the cache of the attempt that just failed so
                        # it does not compete for memory with the smaller retry.
                        subset.unpersist()
                        if "java.lang.OutOfMemoryError" in str(e):
                            if retry_fraction <= 0.01:
                                logging.critical("Model training failed due to OOM")
                                return 1
                            retry_fraction *= 0.5
                            print(
                                f"OOM detected -> retrying fit with fraction={retry_fraction}"
                            )
                        else:
                            logging.critical(str(e))
                            return 1

                # The validation transform result is discarded and no action is
                # called on it; all metrics below come from the test split.
                model.transform(val_df)  # ensure pipeline reused on validation
                preds = model.transform(test)
                preds_pd = preds.select(target, "prediction", "probability").toPandas()
                y_true = preds_pd[target].astype(int)
                y_pred = preds_pd["prediction"].astype(int)
                # Column 1 of the probability vector is the positive class.
                probs = np.array(preds_pd["probability"].tolist())[:, 1]

                metrics = {
                    "auc": float(roc_auc_score(y_true, probs)),
                    "accuracy": float(accuracy_score(y_true, y_pred)),
                    "precision": float(precision_score(y_true, y_pred, zero_division=0)),
                    "recall": float(recall_score(y_true, y_pred, zero_division=0)),
                    "f1": float(f1_score(y_true, y_pred, zero_division=0)),
                    "pr_auc": float(average_precision_score(y_true, probs)),
                    "rows": subset.count(),
                    "fast": FAST,
                }

                run_dir = METRICS_DIR / run.info.run_id
                run_dir.mkdir(parents=True, exist_ok=True)

                with (run_dir / "metrics.json").open("w", encoding="utf-8") as f:
                    json.dump(metrics, f, indent=2)

                # Confusion matrix as CSV and as an annotated heat map.
                cm = confusion_matrix(y_true, y_pred)
                pd.DataFrame(cm).to_csv(run_dir / "cmatrix.csv", index=False)
                fig, ax = plt.subplots()
                ax.imshow(cm, cmap="Blues")
                ax.set_xlabel("Predicted")
                ax.set_ylabel("True")
                for i in range(cm.shape[0]):
                    for j in range(cm.shape[1]):
                        ax.text(j, i, int(cm[i, j]), ha="center", va="center")
                plt.tight_layout()
                fig.savefig(run_dir / "cmatrix.png")
                plt.close(fig)

                fpr, tpr, _ = roc_curve(y_true, probs)
                fig, ax = plt.subplots()
                ax.plot(fpr, tpr)
                ax.set_xlabel("FPR")
                ax.set_ylabel("TPR")
                plt.tight_layout()
                fig.savefig(run_dir / "roc.png")
                plt.close(fig)

                prec_c, rec_c, _ = precision_recall_curve(y_true, probs)
                fig, ax = plt.subplots()
                ax.plot(rec_c, prec_c)
                ax.set_xlabel("Recall")
                ax.set_ylabel("Precision")
                plt.tight_layout()
                fig.savefig(run_dir / "pr.png")
                plt.close(fig)

                # Only estimators that expose featureImportances get this artifact.
                if hasattr(model.stages[-1], "featureImportances"):
                    fi = model.stages[-1].featureImportances.toArray().tolist()
                    with (run_dir / "feature_importance.json").open("w", encoding="utf-8") as f:
                        json.dump(fi, f)
                    mlflow.log_artifact(str(run_dir / "feature_importance.json"))

                mlflow.log_metrics({k: metrics[k] for k in ["auc", "accuracy", "precision", "recall", "f1", "pr_auc"]})
                mlflow.log_artifact(str(run_dir / "metrics.json"))
                mlflow.log_artifact(str(run_dir / "cmatrix.csv"))
                mlflow.log_artifact(str(run_dir / "cmatrix.png"))
                mlflow.log_artifact(str(run_dir / "roc.png"))
                mlflow.log_artifact(str(run_dir / "pr.png"))

                mlflow.spark.log_model(
                    model,
                    f"models/supervised/{name}",
                    registered_model_name="credit-risk",
                )

                results.append({
                    "model": name,
                    **{k: metrics[k] for k in ["auc", "accuracy", "precision", "recall", "f1", "pr_auc"]},
                })

        summary_df = pd.DataFrame(results)
        print(summary_df)
        return 0
    finally:
        # Always release the Spark session, including on the early returns above.
        spark.stop()


# Script entry point: main()'s return value (0 or 1) becomes the process exit code.
if __name__ == "__main__":
    sys.exit(main())


## train_unsup

In [None]:
"""Train KMeans clustering model with robust preprocessing."""

import json
import os

# Turn off the noisy dataset autologging done by mlflow-spark.
# The flag is set here, ahead of the mlflow import further down.
os.environ["MLFLOW_ENABLE_SPARK_DATASET_AUTOLOGGING"] = "false"

from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import (
    OneHotEncoder,
    StandardScaler,
    StringIndexer,
    VectorAssembler,
)
from src.utils.spark import get_spark
from src.utils.metrics import METRICS_DIR
import mlflow
from dotenv import load_dotenv


def main() -> None:
    """Fit a KMeans (k=8) segmentation pipeline on the processed data and log it to MLflow.

    The input is the first of these that exists: ``data/processed/M.parquet``
    (legacy name), ``M_fast.parquet`` (fast mode only) or ``M_full.parquet``.
    String columns are indexed and one-hot encoded, all features are assembled
    and standard-scaled, then KMeans is fitted. The model is logged and
    registered as "credit-risk-segmentation"; k, inertia and row count are
    written to ``METRICS_DIR/kmeans_metrics.json`` and logged as metrics.

    Raises:
        FileNotFoundError: none of the candidate input datasets exists.
    """
    load_dotenv()

    fast = os.getenv("FAST_MODE", "false").lower() == "true"
    sample_frac = float(os.getenv("SAMPLE_FRACTION", "0.05"))

    # Autologging is optional; keep going without it, as the supervised step does.
    try:
        mlflow.spark.autolog()
    except Exception as e:
        print(f"MLflow autologging not available: {e}")

    spark = get_spark("train_unsup")
    try:
        spark.conf.set("spark.sql.debug.maxToStringFields", 50)
        spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

        # (path, already_sampled) in priority order. The prep step writes
        # M_fast/M_full, and M_fast is already sampled with SAMPLE_FRACTION.
        candidates = [("data/processed/M.parquet", False)]
        if fast:
            candidates.append(("data/processed/M_fast.parquet", True))
        candidates.append(("data/processed/M_full.parquet", False))
        found = next(((p, s) for p, s in candidates if os.path.exists(p)), None)
        if found is None:
            tried = ", ".join(p for p, _ in candidates)
            raise FileNotFoundError(
                f"No processed dataset found (tried: {tried}). Run prep.py first."
            )
        data_path, presampled = found

        df = spark.read.parquet(data_path)
        if fast and not presampled:
            # Avoid sampling a dataset that prep already sampled.
            df = df.sample(fraction=sample_frac, seed=42)

        num_cols = [c for c, t in df.dtypes if t != "string"]
        cat_cols = [c for c, t in df.dtypes if t == "string"]

        indexers = [
            StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
            for c in cat_cols
        ]
        encoder = OneHotEncoder(
            inputCols=[f"{c}_idx" for c in cat_cols],
            outputCols=[f"{c}_ohe" for c in cat_cols],
            dropLast=False,
        )
        assembler = VectorAssembler(
            inputCols=num_cols + [f"{c}_ohe" for c in cat_cols],
            outputCol="raw_feats",
        )
        scaler = StandardScaler(inputCol="raw_feats", outputCol="features")
        kmeans = KMeans(k=8, seed=42, featuresCol="features", predictionCol="cluster")

        pipeline = Pipeline(stages=indexers + [encoder, assembler, scaler, kmeans])

        with mlflow.start_run(run_name="kmeans"):
            model = pipeline.fit(df)
            mlflow.spark.log_model(
                model,
                "models/unsupervised/kmeans",
                registered_model_name="credit-risk-segmentation",
            )
            # The last pipeline stage is the fitted KMeans model.
            inertia = model.stages[-1].summary.trainingCost
            metrics = {"k": 8, "inertia": float(inertia), "rows": df.count(), "fast": fast}

            METRICS_DIR.mkdir(parents=True, exist_ok=True)
            with (METRICS_DIR / "kmeans_metrics.json").open("w", encoding="utf-8") as f:
                json.dump(metrics, f, indent=2)

            mlflow.log_metrics({"k": 8, "inertia": metrics["inertia"], "rows": metrics["rows"]})
    finally:
        # Release the Spark session even when reading or fitting fails.
        spark.stop()


# Script entry point: run the clustering step only when executed directly.
if __name__ == "__main__":
    main()


## get_spark

In [None]:
from pyspark.sql import SparkSession
import os
import re


def get_spark(app_name: str = "credit-risk") -> SparkSession:
    """Return a SparkSession named *app_name* configured from SPARK_* environment variables.

    Driver memory comes from ``SPARK_DRIVER_MEMORY`` (default "10g"); executor
    memory from ``SPARK_EXECUTOR_MEMORY`` (defaults to the driver value); the
    max result size from ``SPARK_DRIVER_MAXRESULTSIZE`` (default "3g").
    The first number found in the driver memory string is read as gigabytes,
    whatever unit suffix follows it; at 8 or below the shuffle partition count
    is set explicitly to 200.
    """
    for variable, fallback in (("PYSPARK_PIN_THREAD", "false"), ("SPARK_LOCAL_IP", "127.0.0.1")):
        os.environ.setdefault(variable, fallback)

    mem_str = os.environ.get("SPARK_DRIVER_MEMORY", "10g")
    first_number = re.search(r"\d+(?:\.\d+)?", mem_str)
    driver_mem_gb = float(first_number.group(0)) if first_number else 10.0

    settings = {
        "spark.driver.memory": mem_str,
        "spark.executor.memory": os.environ.get("SPARK_EXECUTOR_MEMORY", mem_str),
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.kryoserializer.buffer.max": "512m",
        "spark.driver.maxResultSize": os.environ.get("SPARK_DRIVER_MAXRESULTSIZE", "3g"),
    }
    if driver_mem_gb <= 8:
        settings["spark.sql.shuffle.partitions"] = "200"

    builder = SparkSession.builder.appName(app_name)
    for option, value in settings.items():
        builder = builder.config(option, value)
    return builder.getOrCreate()


In [None]:
import mlflow
import pandas as pd

# Show the best run (highest AUC) of each tree model side by side.
summary_cols = [
    "run_id", 'tags.mlflow.runName', 'metrics.auc', 'metrics.accuracy',
    'params.maxDepth', 'params.numTrees', 'params.maxIter', 'params.stepSize',
]
records = []
for name in ['RandomForest', 'GBT']:
    df = mlflow.search_runs(filter_string=f"tags.mlflow.runName = '{name}'", order_by=['metrics.auc DESC'], max_results=1)
    if not df.empty:
        # reindex() rather than df[[...]]: a param column that the matched run
        # never logged is absent from the frame and would raise KeyError;
        # reindex fills such columns with NaN and is identical otherwise.
        records.append(df.reindex(columns=summary_cols))
if records:
    display(pd.concat(records))
