# 1: Install & Import Dependencies

In [1]:
!pip install -r requirements.txt
!python3 -m pip install pyspark xgboost hyperopt mlflow cloudpickle scikit-learn



# 2: Import Libraries

In [2]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.cluster import KMeans
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from mlflow.models.signature import infer_signature
from mlflow.utils.environment import _mlflow_conda_env
import cloudpickle
import sklearn
import time
from mlflow.tracking import MlflowClient
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK
from hyperopt.pyll import scope
import xgboost as xgb
import numpy as np
from pyspark.sql import SparkSession
import tempfile
import os

  import pkg_resources


In [3]:
# --- Load Dataset ---
data = pd.read_csv("data/german_credit_data.csv")
data = data.drop(columns=['Unnamed: 0'])

# 3: Data Preprocessing

In [4]:
cat_cols = ['Sex', 'Housing', 'Saving accounts', 'Checking account', 'Purpose']
for col in cat_cols:
    data[col] = data[col].fillna('Unknown')
    data[col] = LabelEncoder().fit_transform(data[col])

# Fill missing numerical values
data = data.fillna(0)



# 4: Train–Validation–Test Split 

In [5]:
# We must split the data *before* scaling or clustering to prevent data leakage.

# Separate features (X) from the data
X = data.copy()

# Split into 60% train, 20% validation, 20% test
X_train, X_rem = train_test_split(X, train_size=0.6, random_state=123)
X_val, X_test = train_test_split(X_rem, test_size=0.5, random_state=123)

# 5: Feature Scaling & Risk Creation 

In [6]:
# --- Scale Data ---
# 1. Fit the scaler ONLY on the training data
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)

In [7]:
# 2. Use the fitted scaler to transform val and test data
X_val_scaled = scaler.transform(X_val)
X_test_scaled = scaler.transform(X_test)

In [8]:
# --- Create Target Variable 'Risk' ---
# 1. Fit KMeans ONLY on the scaled training data
kmeans = KMeans(n_clusters=2, random_state=42)
y_train = kmeans.fit_predict(X_train_scaled)

In [9]:
# 2. Use the fitted clusterer to predict 'Risk' for val and test data
y_val = kmeans.predict(X_val_scaled)
y_test = kmeans.predict(X_test_scaled)

print(f"Train shapes: X={X_train_scaled.shape}, y={y_train.shape}")
print(f"Val shapes: X={X_val_scaled.shape}, y={y_val.shape}")
print(f"Test shapes: X={X_test_scaled.shape}, y={y_test.shape}")

Train shapes: X=(600, 9), y=(600,)
Val shapes: X=(200, 9), y=(200,)
Test shapes: X=(200, 9), y=(200,)


# 6: Base Random Forest Model (Baseline Tracking)

In [10]:
class SklearnModelWrapper(mlflow.pyfunc.PythonModel):
    def __init__(self, model):
        self.model = model
    def predict(self, context, model_input):
        return self.model.predict_proba(model_input)[:, 1]

with mlflow.start_run(run_name='untuned_random_forest'):
    n_estimators = 10
    model = RandomForestClassifier(n_estimators=n_estimators, random_state=123)
    # Use the corrected, non-leaked data
    model.fit(X_train_scaled, y_train)

    # Evaluate on the corrected, non-leaked test set
    preds = model.predict_proba(X_test_scaled)[:, 1]
    auc_score = roc_auc_score(y_test, preds)

    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_metric("auc", auc_score)

    wrappedModel = SklearnModelWrapper(model)
    signature = infer_signature(X_train_scaled, wrappedModel.predict(None, X_train_scaled))
    input_example = X_train_scaled[:2]

    conda_env = _mlflow_conda_env(
        additional_pip_deps=[
            f"cloudpickle=={cloudpickle.__version__}",
            f"scikit-learn=={sklearn.__version__}"
        ]
    )

    mlflow.pyfunc.log_model(
        artifact_path="random_forest_model",
        python_model=wrappedModel,
        conda_env=conda_env,
        signature=signature,
        input_example=input_example
    )

print(f"Random Forest Test AUC (no leakage): {auc_score:.4f}")


2025/11/03 19:08:52 INFO mlflow.pyfunc: Validating input example against model signature


Random Forest Test AUC (no leakage): 0.9885


# 7: Register Random Forest Model in MLflow

In [11]:
run_id = mlflow.search_runs(filter_string='tags."mlflow.runName" = "untuned_random_forest"').iloc[0].run_id
model_name = "german_credit_tracking"
model_version = mlflow.register_model(f"runs:/{run_id}/random_forest_model", model_name)

print(f"Registered model {model_name} version {model_version.version}")
time.sleep(10)

Successfully registered model 'german_credit_tracking'.
Created version '1' of model 'german_credit_tracking'.


Registered model german_credit_tracking version 1


# 8: Move Model to Production

In [12]:
client = MlflowClient()
client.transition_model_version_stage(
    name=model_name,
    version=model_version.version,
    stage="Production",
)
print("Moved RF model to Production")

Moved RF model to Production


  client.transition_model_version_stage(


# 10: Load & Test Production Model

In [13]:
model_prod = mlflow.pyfunc.load_model(f"models:/{model_name}/production")
# Test with the corrected, non-leaked test set
test_auc = roc_auc_score(y_test, model_prod.predict(X_test_scaled))
print(f"Loaded Production RF Model Test AUC: {test_auc:.4f}")

Loaded Production RF Model Test AUC: 0.9885


# 10: XGBoost Hyperparameter Tuning with Hyperopt

In [14]:
search_space = {
    'max_depth': scope.int(hp.quniform('max_depth', 4, 100, 1)),
    'learning_rate': hp.loguniform('learning_rate', -3, 0),
    'reg_alpha': hp.loguniform('reg_alpha', -5, -1),
    'reg_lambda': hp.loguniform('reg_lambda', -6, -1),
    'min_child_weight': hp.loguniform('min_child_weight', -1, 3),
    'objective': 'binary:logistic',
    'seed': 123,
}

def train_model(params):
    mlflow.xgboost.autolog(disable=True) 
    
    with mlflow.start_run(run_name="xgboost_training", nested=True) as run:
        train = xgb.DMatrix(X_train_scaled, label=y_train)
        val = xgb.DMatrix(X_val_scaled, label=y_val)

        # Train model
        booster = xgb.train(
            params=params,
            dtrain=train,
            num_boost_round=1000,
            evals=[(val, "validation")],
            early_stopping_rounds=50,
            verbose_eval=False # Quieten the output
        )

        # Evaluate
        val_preds = booster.predict(val)
        auc = roc_auc_score(y_val, val_preds) # Use y_val
        mlflow.log_params(params)
        mlflow.log_metric("auc", auc)

        # Save and log model properly
        with tempfile.TemporaryDirectory() as tmp_dir:
            model_path = os.path.join(tmp_dir, "xgb_model.json")
            booster.save_model(model_path)

            # Log model artifact explicitly under path "model"
            mlflow.xgboost.log_model(
                xgb_model=booster,
                artifact_path="model", # This path is crucial
                input_example=X_train_scaled[:2],
                signature=infer_signature(X_train_scaled, booster.predict(xgb.DMatrix(X_train_scaled)))
            )
        
        return {"status": STATUS_OK, "loss": -auc}

# 11: Run Hyperparameter Optimization

In [15]:
from hyperopt import Trials
trials = Trials() 

print("Starting Hyperparameter Optimization for XGBoost...")
with mlflow.start_run(run_name='xgboost_parent_run'):
    best_params = fmin(
        fn=train_model,
        space=search_space,
        algo=tpe.suggest,
        max_evals=30, # Increased evals for better search
        trials=trials, # Use standard Trials
    )
print("Hyperparameter Optimization finished.")

Starting Hyperparameter Optimization for XGBoost...
  0%|          | 0/30 [00:00<?, ?trial/s, best loss=?]




  3%|▎         | 1/30 [00:02<01:00,  2.09s/trial, best loss: -0.9786206896551723]




  7%|▋         | 2/30 [00:03<00:52,  1.87s/trial, best loss: -0.9786206896551723]




 10%|█         | 3/30 [00:05<00:48,  1.81s/trial, best loss: -0.9899686520376176]




 13%|█▎        | 4/30 [00:07<00:46,  1.78s/trial, best loss: -0.9899686520376176]




 17%|█▋        | 5/30 [00:08<00:43,  1.75s/trial, best loss: -0.9899686520376176]




 20%|██        | 6/30 [00:10<00:41,  1.73s/trial, best loss: -0.9899686520376176]




 23%|██▎       | 7/30 [00:12<00:41,  1.78s/trial, best loss: -0.9900940438871473]




 27%|██▋       | 8/30 [00:14<00:38,  1.76s/trial, best loss: -0.9900940438871473]




 30%|███       | 9/30 [00:15<00:36,  1.74s/trial, best loss: -0.9902194357366771]




 33%|███▎      | 10/30 [00:17<00:34,  1.71s/trial, best loss: -0.9902194357366771]




 37%|███▋      | 11/30 [00:19<00:31,  1.68s/trial, best loss: -0.9902194357366771]




 40%|████      | 12/30 [00:20<00:30,  1.67s/trial, best loss: -0.9902194357366771]




 43%|████▎     | 13/30 [00:22<00:29,  1.73s/trial, best loss: -0.9902194357366771]




 47%|████▋     | 14/30 [00:24<00:28,  1.75s/trial, best loss: -0.9902194357366771]




 50%|█████     | 15/30 [00:26<00:25,  1.72s/trial, best loss: -0.9902194357366771]




 53%|█████▎    | 16/30 [00:27<00:23,  1.70s/trial, best loss: -0.9902194357366771]




 57%|█████▋    | 17/30 [00:29<00:21,  1.68s/trial, best loss: -0.9902194357366771]




 60%|██████    | 18/30 [00:31<00:20,  1.67s/trial, best loss: -0.9902194357366771]




 63%|██████▎   | 19/30 [00:32<00:18,  1.67s/trial, best loss: -0.9902194357366771]




 67%|██████▋   | 20/30 [00:34<00:16,  1.67s/trial, best loss: -0.9902194357366771]




 70%|███████   | 21/30 [00:36<00:15,  1.70s/trial, best loss: -0.9902194357366771]




 73%|███████▎  | 22/30 [00:37<00:13,  1.70s/trial, best loss: -0.9902194357366771]




 77%|███████▋  | 23/30 [00:39<00:12,  1.73s/trial, best loss: -0.9902194357366771]




 80%|████████  | 24/30 [00:41<00:10,  1.73s/trial, best loss: -0.9902194357366771]




 83%|████████▎ | 25/30 [00:43<00:08,  1.71s/trial, best loss: -0.9902194357366771]




 87%|████████▋ | 26/30 [00:44<00:06,  1.71s/trial, best loss: -0.9902194357366771]




 90%|█████████ | 27/30 [00:46<00:05,  1.73s/trial, best loss: -0.9902194357366771]




 93%|█████████▎| 28/30 [00:48<00:03,  1.73s/trial, best loss: -0.9902194357366771]




 97%|█████████▋| 29/30 [00:50<00:01,  1.71s/trial, best loss: -0.9902194357366771]




100%|██████████| 30/30 [00:51<00:00,  1.72s/trial, best loss: -0.9902194357366771]
Hyperparameter Optimization finished.


# 12: Find and Register Best XGBoost Model

In [16]:
search_filter = 'tags."mlflow.runName" = "xgboost_training"'
best_run = mlflow.search_runs(filter_string=search_filter, order_by=["metrics.auc DESC"]).iloc[0]

print(f"\n--- Best XGBoost Run ---")
print(f"Best Run Name: {best_run['tags.mlflow.runName']}")
print(f"AUC of Best Run: {best_run['metrics.auc']:.4f}")
print(f"Best Run ID: {best_run.run_id}")

# Register using the correct artifact path "model"
new_model_version = mlflow.register_model(
    f"runs:/{best_run.run_id}/model", 
    model_name
)
print(f"Registered new XGBoost model version: {new_model_version.version}")
time.sleep(10)

Registered model 'german_credit_tracking' already exists. Creating a new version of this model...



--- Best XGBoost Run ---
Best Run Name: xgboost_training
AUC of Best Run: 0.9902
Best Run ID: c2d954bfcf304fcd8e9d80ae7c44e916
Registered new XGBoost model version: 2


Created version '2' of model 'german_credit_tracking'.


The following code promotes the new version to production

In [17]:
client.transition_model_version_stage(
    name=model_name,
    version=model_version.version,
    stage='Archived'
)

client.transition_model_version_stage(
    name=model_name,
    version=new_model_version.version,
    stage='Production'
)

  client.transition_model_version_stage(
  client.transition_model_version_stage(


<ModelVersion: aliases=[], creation_timestamp=1762214994648, current_stage='Production', deployment_job_state=None, description=None, last_updated_timestamp=1762215194525, metrics=[<Metric: dataset_digest=None, dataset_name=None, key='auc', model_id='m-e9462f42c19244fe8429f6fe510a063d', run_id='c2d954bfcf304fcd8e9d80ae7c44e916', step=0, timestamp=1762214957189, value=0.9902194357366771>], model_id='m-e9462f42c19244fe8429f6fe510a063d', name='german_credit_tracking', params={'learning_rate': '0.10129203520715963',
 'max_depth': '5',
 'min_child_weight': '0.4012331709439813',
 'objective': 'binary:logistic',
 'reg_alpha': '0.049418872057621244',
 'reg_lambda': '0.003175556895589857',
 'seed': '123'}, run_id='c2d954bfcf304fcd8e9d80ae7c44e916', run_link=None, source='models:/m-e9462f42c19244fe8429f6fe510a063d', status='READY', status_message=None, tags={}, user_id=None, version=2>

Clients that call load_model now receive the new model.

In [20]:
mlflow.pyfunc.spark_udf

<function mlflow.pyfunc.spark_udf(spark, model_uri, result_type=None, env_manager=None, params: dict[str, typing.Any] | None = None, extra_env: dict[str, str] | None = None, prebuilt_env_uri: str | None = None, model_config: str | pathlib.Path | dict[str, typing.Any] | None = None)>

In [21]:
import mlflow.pyfunc

from pyspark.sql import SparkSession

# Create or retrieve a Spark session
spark = SparkSession.builder \
    .appName("MLflow Integration") \
    .config("spark.some.config.option", "config-value") \
    .getOrCreate()

apply_model_udf = mlflow.pyfunc.spark_udf(spark, f"models:/{model_name}/production")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/03 19:14:48 WARN Utils: Your hostname, Ganapriyas-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 10.0.0.88 instead (on interface en0)
25/11/03 19:14:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/03 19:14:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  from .autonotebook import tqdm as notebook_tqdm
Downloading artifacts: 100%|██████████| 8/8 [00:00<00:00, 4142.52it/s] 
Downloading artifacts: 100%|██████████| 8/8 [00:00<00:00, 4228.66it/s] 
2025/11/03 19:14:50 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'


In [23]:
new_model_version

<ModelVersion: aliases=[], creation_timestamp=1762214994648, current_stage='None', deployment_job_state=None, description=None, last_updated_timestamp=1762214994648, metrics=[<Metric: dataset_digest=None, dataset_name=None, key='auc', model_id='m-e9462f42c19244fe8429f6fe510a063d', run_id='c2d954bfcf304fcd8e9d80ae7c44e916', step=0, timestamp=1762214957189, value=0.9902194357366771>], model_id='m-e9462f42c19244fe8429f6fe510a063d', name='german_credit_tracking', params={'learning_rate': '0.10129203520715963',
 'max_depth': '5',
 'min_child_weight': '0.4012331709439813',
 'objective': 'binary:logistic',
 'reg_alpha': '0.049418872057621244',
 'reg_lambda': '0.003175556895589857',
 'seed': '123'}, run_id='c2d954bfcf304fcd8e9d80ae7c44e916', run_link=None, source='models:/m-e9462f42c19244fe8429f6fe510a063d', status='READY', status_message=None, tags={}, user_id=None, version=2>

In [32]:
!mlflow ui --port=5001

[MLflow] Security middleware enabled with default settings (localhost-only). To allow connections from other hosts, use --host 0.0.0.0 and configure --allowed-hosts and --cors-allowed-origins.
[32mINFO[0m:     Uvicorn running on [1mhttp://127.0.0.1:5001[0m (Press CTRL+C to quit)
[32mINFO[0m:     Started parent process [[36m[1m55445[0m]
[32mINFO[0m:     Started server process [[36m55450[0m]
[32mINFO[0m:     Started server process [[36m55447[0m]
[32mINFO[0m:     Started server process [[36m55448[0m]
[32mINFO[0m:     Started server process [[36m55449[0m]
[32mINFO[0m:     Waiting for application startup.
[32mINFO[0m:     Waiting for application startup.
[32mINFO[0m:     Waiting for application startup.
[32mINFO[0m:     Waiting for application startup.
[32mINFO[0m:     Application startup complete.
[32mINFO[0m:     Application startup complete.
[32mINFO[0m:     Application startup complete.
[32mINFO[0m:     Application startup complete.
[32mINFO[0m