In [17]:
import mlflow
from read_delta_files_utils import load_train, load_test, create_spark_with_delta
from model_training_utils import MLExperimentManager

In [18]:
spark = create_spark_with_delta()
train = load_train(spark)
test = load_test(spark)

In [19]:
from pyspark.sql.functions import col, when
from pyspark.sql.types import DoubleType, IntegerType, LongType, FloatType, BooleanType
import os

# Convert boolean target to integer if needed
if train.schema["is_terminated"].dataType == BooleanType():
    train = train.withColumn("is_terminated", 
                             when(col("is_terminated") == True, 1).otherwise(0).cast("integer"))
    test = test.withColumn("is_terminated", 
                           when(col("is_terminated") == True, 1).otherwise(0).cast("integer"))

# Filter to numeric columns only
numeric_cols = [field.name for field in train.schema.fields 
                if isinstance(field.dataType, (DoubleType, IntegerType, LongType, FloatType))]

train = train.dropna(how="any", subset=numeric_cols)
test = test.dropna(how="any", subset=numeric_cols)

target_col = "is_terminated"
feature_cols = [c for c in numeric_cols if c != target_col]

# Data loaders for manager
def load_train_converted(spark):
    return train.select(feature_cols + [target_col])

def load_test_converted(spark):
    return test.select(feature_cols + [target_col])

# Initialize manager
mlruns_path = os.path.abspath("./mlruns")
os.makedirs(mlruns_path, exist_ok=True)

manager = MLExperimentManager(
    spark=spark,
    target_col=target_col,
    feature_cols=feature_cols,
    experiment_name="nsf_models",
    tracking_uri=f"file:{mlruns_path}",
    problem_type="multiclass",
)

manager.load_data(load_train_converted, load_test_converted)

Exception: Invalid parent directory '/home/gerard/Documents/UPC/41_4t-Q1/BDA/Projecte-2/cancelled-nsf-grants-predictive-analytics-pipeline/scripts/mlruns/.trash'

In [None]:
rf_model, rf_metrics = manager.train_random_forest(
    run_name="rf_baseline",
    num_trees=100,
    max_depth=10,
)

rf_test_metrics = manager.evaluate_on_test(rf_model)
print(f"Random Forest - Val: {rf_metrics['valid_accuracy']:.4f}, Test: {rf_test_metrics['test_accuracy']:.4f}")

25/12/12 17:13:46 WARN DAGScheduler: Broadcasting large task binary with size 1298.5 KiB
25/12/12 17:13:46 WARN DAGScheduler: Broadcasting large task binary with size 1587.3 KiB
25/12/12 17:13:46 WARN DAGScheduler: Broadcasting large task binary with size 1839.7 KiB
25/12/12 17:13:47 WARN DAGScheduler: Broadcasting large task binary with size 1185.9 KiB


Random Forest - Val: 0.8336, Test: 0.8420


25/12/12 17:13:47 WARN DAGScheduler: Broadcasting large task binary with size 1178.1 KiB


In [None]:
gbt_model, gbt_metrics = manager.train_gbt(
    run_name="gbt_baseline",
    max_iter=100,
    max_depth=5,
)

gbt_test_metrics = manager.evaluate_on_test(gbt_model)
print(f"Gradient Boosted Trees - Val: {gbt_metrics['valid_accuracy']:.4f}, Test: {gbt_test_metrics['test_accuracy']:.4f}")

Gradient Boosted Trees - Val: 0.7128, Test: 0.7174


In [None]:
lr_model, lr_metrics = manager.train_logistic_regression(
    run_name="lr_baseline",
    reg_param=0.01,
    elastic_net_param=0.0,
)

lr_test_metrics = manager.evaluate_on_test(lr_model)
print(f"Logistic Regression - Val: {lr_metrics['valid_accuracy']:.4f}, Test: {lr_test_metrics['test_accuracy']:.4f}")

Logistic Regression - Val: 0.7648, Test: 0.7367


In [None]:
import pandas as pd

comparison_df = pd.DataFrame({
    'Model': ['Random Forest', 'Gradient Boosted Trees', 'Logistic Regression'],
    'Val Accuracy': [rf_metrics['valid_accuracy'], gbt_metrics['valid_accuracy'], lr_metrics['valid_accuracy']],
    'Test Accuracy': [rf_test_metrics['test_accuracy'], gbt_test_metrics['test_accuracy'], lr_test_metrics['test_accuracy']],
})

print("\n" + "="*80)
print("MODEL COMPARISON")
print("="*80)
print(comparison_df.to_string(index=False))
print("="*80)


MODEL COMPARISON
                 Model  Val Accuracy  Test Accuracy
         Random Forest        0.8336       0.842004
Gradient Boosted Trees        0.7128       0.717405
   Logistic Regression        0.7648       0.736673


In [None]:
mlflow_client = mlflow.tracking.MlflowClient()
experiment = mlflow_client.get_experiment_by_name("nsf_models")

if experiment:
    runs = mlflow_client.search_runs(experiment_ids=[experiment.experiment_id])
    
    print("\n" + "="*80)
    print("MLFLOW RUNS")
    print("="*80)
    
    for run in runs:
        accuracy = run.data.metrics['valid_accuracy']
        print(f"{run.info.run_name:30s} | Val Acc: {accuracy:.4f}")
    
    best_run = max(runs, key=lambda r: r.data.metrics['valid_accuracy'])
    mlflow_client.set_tag(best_run.info.run_id, "deployment", "production")
    
    print(f"\n✅ Best model: {best_run.info.run_name} ({best_run.data.metrics['valid_accuracy']:.4f})")
    print("="*80)


MLFLOW RUNS
lr_baseline                    | Val Acc: 0.7648
gbt_baseline                   | Val Acc: 0.7128
rf_baseline                    | Val Acc: 0.8336
lr_baseline                    | Val Acc: 0.7648
gbt_baseline                   | Val Acc: 0.7128
rf_baseline                    | Val Acc: 0.8336

✅ Best model: rf_baseline (0.8336)
