In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml import Pipeline
import pandas as pd



In [2]:
# Create (or reuse) the Spark session behind every DataFrame and ML job below.
spark = (
    SparkSession.builder
    .appName("MySQL to Spark")
    .getOrCreate()
)

In [3]:
# Load the two monthly extracts and stack them into one DataFrame.
df1 = spark.read.parquet('24_03.parquet')
df2 = spark.read.parquet('24_04.parquet')

# `union` pairs columns by position, so if the two files ever store their
# columns in a different order, values would silently land in the wrong
# columns. `unionByName` aligns columns by name instead, and fails loudly if
# the column sets differ.
df = df1.unionByName(df2)

In [4]:
# Inspect the column names and types of the combined monthly DataFrame.
df.printSchema()

root
 |-- jid: string (nullable = true)
 |-- usr: string (nullable = true)
 |-- jnam: string (nullable = true)
 |-- cnumr: long (nullable = true)
 |-- cnumat: long (nullable = true)
 |-- cnumut: long (nullable = true)
 |-- nnumr: long (nullable = true)
 |-- adt: string (nullable = true)
 |-- qdt: string (nullable = true)
 |-- schedsdt: string (nullable = true)
 |-- deldt: string (nullable = true)
 |-- ec: long (nullable = true)
 |-- elpl: double (nullable = true)
 |-- sdt: string (nullable = true)
 |-- edt: string (nullable = true)
 |-- nnuma: long (nullable = true)
 |-- idle_time_ave: double (nullable = true)
 |-- nnumu: long (nullable = true)
 |-- perf1: double (nullable = true)
 |-- perf2: double (nullable = true)
 |-- perf3: double (nullable = true)
 |-- perf4: double (nullable = true)
 |-- perf5: double (nullable = true)
 |-- perf6: double (nullable = true)
 |-- mszl: double (nullable = true)
 |-- pri: long (nullable = true)
 |-- econ: double (nullable = true)
 |-- avgpcon: double

In [5]:
# Helper: parse the numeric suffix of anonymised identifier columns.
def extract_anon_int(col_name):
    """Build a Column holding the trailing integer of an anonymised label.

    A value ending in ``_<digits>`` (e.g. ``"user_123"``) yields those digits
    cast to int. Null values and values without such a suffix yield ``0``.

    Args:
        col_name: name of the string column to parse.

    Returns:
        A pyspark ``Column`` expression; nothing is evaluated here.
    """
    source = col(col_name)
    ends_with_number = source.isNotNull() & source.rlike("_\\d+$")
    trailing_number = regexp_extract(source, "_(\\d+)$", 1).cast("int")
    return when(ends_with_number, trailing_number).otherwise(0)

# Numeric IDs recovered from the anonymised job name, user and job environment.
# NOTE(review): these are identifiers, yet they are later used as plain numeric
# features (an arbitrary ordering) - confirm that is intended.
df = df.withColumn("jnam_anon", extract_anon_int("jnam"))
df = df.withColumn("usr_anon", extract_anon_int("usr"))
df = df.withColumn("jobenv_anon", extract_anon_int("jobenv_req"))

# Prediction targets.
# target_ec: 1 when "exit state" is "completed", otherwise 0.
df = df.withColumn(
    "target_ec",
    when(col("exit state") == "completed", 1).otherwise(0),
)
# target_pclass: 1 when pclass is "compute-bound", otherwise 0.
df = df.withColumn(
    "target_pclass",
    when(col("pclass") == "compute-bound", 1).otherwise(0),
)
# target_avgpcon: avgpcon divided by nnuma; null unless nnuma > 0 and avgpcon is set.
df = df.withColumn(
    "target_avgpcon",
    when(
        (col("nnuma") > 0) & (col("avgpcon").isNotNull()),
        (col("avgpcon") / col("nnuma")).cast("double"),
    ).otherwise(lit(None)),
)
# target_duration: duration / 60 (reported as minutes; assumes duration is in
# seconds - TODO confirm); null when duration is null.
df = df.withColumn(
    "target_duration",
    when(
        col("duration").isNotNull(),
        (col("duration") / 60.0).cast("double"),
    ).otherwise(lit(None)),
)


In [6]:
# Feature columns fed to VectorAssembler, in this exact order (32 columns).
# NOTE(review): some of these (e.g. minpcon / maxpcon / econ) look closely
# related to the avgpcon / duration targets - confirm they are known at
# prediction time, otherwise the regression scores include target leakage.
numeric_cols = """
    cnumr cnumat cnumut nnumr elpl nnuma
    idle_time_ave nnumu perf1 perf2 perf3 perf4
    perf5 perf6 mszl pri econ minpcon
    maxpcon msza mmszu uctmut sctmut usctmut
    freq_req freq_alloc flops mbwidth opint
    jnam_anon usr_anon jobenv_anon
""".split()

In [None]:
# ============================================================================
# 2. DEFINE TASKS
# ============================================================================
print("\nStep 2: Defining tasks...")

# task id -> learning problem type, target column and human-readable label.
tasks = {
    task_key: {"type": problem, "target": target_name, "description": label}
    for task_key, problem, target_name, label in (
        ("ec", "classification", "target_ec", "Exit Code (completed vs not)"),
        ("pclass", "classification", "target_pclass", "PClass (compute-bound vs others)"),
        ("avgpcon", "regression", "target_avgpcon", "AvgPCon per node"),
        ("duration", "regression", "target_duration", "Duration (minutes)"),
    )
}

# Every task uses the same feature set; this aliases the list (no copy).
feature_cols = numeric_cols

print(f"Tasks: {len(tasks)}")
print(f"Features: {len(feature_cols)}")


📋 Step 2: Defining tasks...
   Tasks: 4
   Features: 32


In [None]:

# Zero-argument factories: each call returns a fresh, unfitted estimator, so
# every task trains its own model instances. Tree ensembles use seed=42.
classification_models = {
    "Logistic Regression": lambda: LogisticRegression(
        maxIter=100, regParam=0.01, elasticNetParam=0.5
    ),
    "Random Forest": lambda: RandomForestClassifier(
        numTrees=100, maxDepth=10, seed=42
    ),
    "Gradient Boosting": lambda: GBTClassifier(
        maxIter=100, maxDepth=5, seed=42
    ),
}

# Regression factories. The random forest and GBT use a lighter configuration
# (fewer / shallower trees, row subsampling) than the classification models.
regression_models = {
    "Linear Regression": lambda: LinearRegression(
        maxIter=100, regParam=0.01, elasticNetParam=0.5
    ),
    "Random Forest": lambda: RandomForestRegressor(
        numTrees=30,
        maxDepth=6,
        maxBins=32,
        minInstancesPerNode=20,
        subsamplingRate=0.8,
        seed=42,
    ),
    "GBT Regression": lambda: GBTRegressor(
        maxIter=50, maxDepth=4, stepSize=0.1, seed=42
    ),
}


In [None]:
def _print_task_banner(kind, task_name):
    """Print the separator banner that opens a task's training log."""
    rule = 'â”€' * 80
    print(f"\n{rule}")
    print(f"{kind}: {task_name.upper()}")
    print(f"{rule}")


def _split_task_data(df, target_col):
    """Prepare and cache the 80/20 train/test split for one task.

    Keeps the module-level ``feature_cols`` plus ``target_col``, drops rows whose
    target is null, renames the target to ``label`` (the default label column of
    Spark ML estimators and evaluators) and splits with seed 42.

    Returns:
        ``(train_df, test_df)``, both already marked for caching.
    """
    df_task = (
        df.select(feature_cols + [target_col])
          .filter(col(target_col).isNotNull())
          .withColumnRenamed(target_col, "label")
    )
    train_df, test_df = df_task.randomSplit([0.8, 0.2], seed=42)
    train_df.cache()
    test_df.cache()
    return train_df, test_df


def _fit_and_evaluate(train_df, test_df, models_dict, metric_evaluators, indent):
    """Fit assembler -> scaler -> model for every factory and score it on ``test_df``.

    Args:
        train_df / test_df: splits produced by ``_split_task_data``.
        models_dict: ``{model_name: zero-arg factory returning an estimator}``.
        metric_evaluators: ordered ``(metric_name, evaluator)`` pairs; the order
            fixes both the printed line and the key order of each metrics dict.
        indent: prefix for the per-model progress line.

    Returns:
        ``(results, trained_models)``: ``{model_name: {metric_name: value}}`` and
        ``{model_name: fitted PipelineModel}``.
    """
    # handleInvalid="skip" drops rows with null/NaN features, both while fitting
    # and when scoring the test split. The Train/Test counts printed by the
    # callers can therefore exceed the rows actually used.
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw", handleInvalid="skip")
    scaler = StandardScaler(inputCol="features_raw", outputCol="features", withStd=True, withMean=True)

    results = {}
    trained_models = {}
    total = len(models_dict)

    for i, (model_name, model_fn) in enumerate(models_dict.items(), 1):
        print(f"\n{indent}[{i}/{total}] Training {model_name}...")

        pipeline = Pipeline(stages=[assembler, scaler, model_fn()])
        trained_model = pipeline.fit(train_df)
        predictions = trained_model.transform(test_df)

        metrics = {name: evaluator.evaluate(predictions) for name, evaluator in metric_evaluators}
        print(" | ".join(f"{name}: {value:.4f}" for name, value in metrics.items()))

        results[model_name] = metrics
        trained_models[model_name] = trained_model

    return results, trained_models


def train_classification_task(df, task_name, target_col, models_dict):
    """Train and evaluate every classifier in ``models_dict`` on one binary target.

    Args:
        df: source DataFrame containing ``feature_cols`` and ``target_col``.
        task_name: label used in the printed banner.
        target_col: name of the target column.
        models_dict: ``{model_name: zero-arg estimator factory}``.

    Returns:
        ``(results, trained_models)``: per-model ``{"AUC", "Accuracy", "F1"}``
        metrics on the 20% test split, and the fitted pipelines.
    """
    _print_task_banner("CLASSIFICATION", task_name)
    train_df, test_df = _split_task_data(df, target_col)
    try:
        print(f"   Train: {train_df.count()} | Test: {test_df.count()}")
        metric_evaluators = [
            ("AUC", BinaryClassificationEvaluator(metricName="areaUnderROC")),
            ("Accuracy", MulticlassClassificationEvaluator(metricName="accuracy")),
            ("F1", MulticlassClassificationEvaluator(metricName="f1")),
        ]
        return _fit_and_evaluate(train_df, test_df, models_dict, metric_evaluators, indent="")
    finally:
        # Release the cached splits even when a model fails to train.
        train_df.unpersist()
        test_df.unpersist()


def train_regression_task(df, task_name, target_col, models_dict):
    """Train and evaluate every regressor in ``models_dict`` on one numeric target.

    Args:
        df: source DataFrame containing ``feature_cols`` and ``target_col``.
        task_name: label used in the printed banner.
        target_col: name of the target column (rows where it is null are dropped).
        models_dict: ``{model_name: zero-arg estimator factory}``.

    Returns:
        ``(results, trained_models)``: per-model ``{"RMSE", "MAE", "RÂ²"}``
        metrics on the 20% test split, and the fitted pipelines.
    """
    _print_task_banner("REGRESSION", task_name)
    train_df, test_df = _split_task_data(df, target_col)
    try:
        print(f"   Train: {train_df.count()} | Test: {test_df.count()}")
        metric_evaluators = [
            ("RMSE", RegressionEvaluator(metricName="rmse")),
            ("MAE", RegressionEvaluator(metricName="mae")),
            ("RÂ²", RegressionEvaluator(metricName="r2")),
        ]
        return _fit_and_evaluate(train_df, test_df, models_dict, metric_evaluators, indent="   ")
    finally:
        # Release the cached splits even when a model fails to train.
        train_df.unpersist()
        test_df.unpersist()


In [None]:
import os
import time
import pickle
from datetime import datetime
from pyspark.sql.functions import col
from sklearn.metrics import classification_report
import pandas as pd
import builtins  

print("\n" + "=" * 80)
print("TRAINING ALL MODELS")
print("=" * 80)

# Create the results and checkpoints directories.
# The original hard-coded Windows locations remain the defaults; the environment
# variables let the notebook run on another machine without editing code.
results_dir = os.environ.get("BIGDATA_RESULTS_DIR", "D:/BigData/results")
checkpoints_dir = os.environ.get("BIGDATA_CHECKPOINTS_DIR", "D:/BigData/checkpoints")
os.makedirs(results_dir, exist_ok=True)
os.makedirs(checkpoints_dir, exist_ok=True)
# One timestamp for the whole session; it tags every report file and checkpoint folder.
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

all_results = {}     # task_id -> {model_name: metrics dict}
all_models = {}      # task_id -> {model_name: fitted pipeline}
training_times = {}  # task_id -> duration and start/end timestamps

# Train every task with timing, then save its fitted pipelines.
for task_id, task_info in tasks.items():
    task_type = task_info["type"]
    target_col = task_info["target"]
    description = task_info["description"]

    print(f"\nStarting task: {task_id} ({description})")
    task_start_time = time.time()

    # Any task type other than "classification" is trained as regression.
    if task_type == "classification":
        results, models = train_classification_task(
            df, description, target_col, classification_models
        )
    else:
        results, models = train_regression_task(
            df, description, target_col, regression_models
        )

    task_end_time = time.time()
    task_duration = task_end_time - task_start_time

    all_results[task_id] = results
    all_models[task_id] = models
    training_times[task_id] = {
        'duration_seconds': task_duration,
        'duration_minutes': task_duration / 60,
        'start_time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(task_start_time)),
        'end_time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(task_end_time))
    }

    print(f"Task completed in {task_duration:.2f} seconds ({task_duration/60:.2f} minutes)")

    # Save checkpoints for this task: one directory per fitted PySpark pipeline.
    task_checkpoint_dir = os.path.join(checkpoints_dir, f"{task_id}_{timestamp}")
    os.makedirs(task_checkpoint_dir, exist_ok=True)

    for model_name, model_pipeline in models.items():
        if model_pipeline is None:
            continue
        model_path = os.path.join(task_checkpoint_dir, model_name.replace(' ', '_').lower())
        try:
            # overwrite() lets a re-run replace an existing save at the same path.
            model_pipeline.write().overwrite().save(model_path)
            print(f"   ðŸ’¾ Saved PySpark model: {model_name} -> {model_path}")
        except Exception as e:
            # Best effort: a failed save is reported but does not stop the session.
            print(f"Error saving {model_name}: {str(e)}")

    print(f"Checkpoints saved to: {task_checkpoint_dir}")

# ============================================================================
# 6. EXPORT COMPREHENSIVE REPORTS
# ============================================================================
print("\nExporting comprehensive results...")

# Plain-text summary of how long each task took.
timing_file = f"{results_dir}/training_times_{timestamp}.txt"

# builtins.sum: the pyspark star import may shadow the builtin `sum`.
total_time = builtins.sum(times['duration_seconds'] for times in training_times.values())

timing_lines = [
    "TRAINING TIME SUMMARY\n",
    "=" * 80 + "\n\n",
    f"Total Training Time: {total_time:.2f} seconds ({total_time/60:.2f} minutes)\n\n",
]
for task_id, times in training_times.items():
    timing_lines.extend([
        f"{tasks[task_id]['description']}:\n",
        f"  Start Time: {times['start_time']}\n",
        f"  End Time: {times['end_time']}\n",
        f"  Duration: {times['duration_seconds']:.2f} seconds ({times['duration_minutes']:.2f} minutes)\n\n",
    ])

with open(timing_file, 'w', encoding='utf-8') as f:
    f.writelines(timing_lines)

print(f"Timing report: {timing_file}")

# Export detailed reports for all tasks: metric tables plus per-model sklearn
# diagnostics computed on a rebuilt copy of each task's test split.
from sklearn.metrics import confusion_matrix, mean_absolute_error, mean_squared_error, r2_score
import numpy as np


def _recreate_test_split(target_col):
    """Rebuild and cache the 20% test split used when training ``target_col``.

    Repeats the training-time preparation: select ``feature_cols`` plus the
    target, drop null targets, rename the target to ``label``, and call
    ``randomSplit([0.8, 0.2], seed=42)``.

    NOTE(review): this reproduces the training-time test rows only if ``df``
    has the same partitioning as during training. Returning the test
    predictions from the training functions would avoid relying on that.
    """
    df_task = (
        df.select(feature_cols + [target_col])
          .filter(col(target_col).isNotNull())
          .withColumnRenamed(target_col, "label")
    )
    _, test_split = df_task.randomSplit([0.8, 0.2], seed=42)
    return test_split.cache()


def _predictions_as_pandas(model_pipeline, test_split):
    """Score the test split with a fitted pipeline; return label/prediction as pandas."""
    return model_pipeline.transform(test_split).select("label", "prediction").toPandas()


def _write_classification_details(f, model_pipeline, test_split):
    """Write sklearn's classification report and a 2x2 confusion matrix to ``f``."""
    pred_pandas = _predictions_as_pandas(model_pipeline, test_split)
    if len(pred_pandas) == 0:
        f.write("No predictions available for detailed report\n")
        return

    # The classification targets are built as 0/1. Pinning the label set stops
    # classification_report from raising when only one class is present (two
    # target_names vs. one observed label) and keeps the confusion matrix 2x2.
    labels = [0, 1]
    report = classification_report(
        pred_pandas["label"],
        pred_pandas["prediction"],
        labels=labels,
        target_names=['Class 0', 'Class 1'],
        zero_division=0
    )
    f.write(report)

    cm = confusion_matrix(pred_pandas["label"], pred_pandas["prediction"], labels=labels)
    f.write(f"\nConfusion Matrix:\n")
    f.write(f"True\\Pred    0    1\n")
    f.write(f"    0     {cm[0,0]:4d} {cm[0,1]:4d}\n")
    f.write(f"    1     {cm[1,0]:4d} {cm[1,1]:4d}\n")


def _write_regression_details(f, model_pipeline, test_split):
    """Write error metrics, MAPE, prediction and residual statistics to ``f``."""
    pred_pandas = _predictions_as_pandas(model_pipeline, test_split)
    if len(pred_pandas) == 0:
        f.write("No predictions available for detailed report\n")
        return

    y_test = pred_pandas["label"].values
    y_pred = pred_pandas["prediction"].values

    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    # MAPE is undefined where the actual value is 0, so only non-zero targets count.
    mask = y_test != 0
    if mask.any():
        mape = np.mean(np.abs((y_test[mask] - y_pred[mask]) / y_test[mask])) * 100
    else:
        mape = float('inf')

    f.write(f"Root Mean Square Error (RMSE): {rmse:.4f}\n")
    f.write(f"Mean Absolute Error (MAE): {mae:.4f}\n")
    f.write(f"R-squared (RÂ²): {r2:.4f}\n")
    if mape != float('inf'):
        f.write(f"Mean Absolute Percentage Error (MAPE): {mape:.2f}%\n")

    # Prediction statistics
    f.write(f"\nPrediction Statistics:\n")
    f.write(f"  Actual - Min: {y_test.min():.4f}, Max: {y_test.max():.4f}, Mean: {y_test.mean():.4f}\n")
    f.write(f"  Predicted - Min: {y_pred.min():.4f}, Max: {y_pred.max():.4f}, Mean: {y_pred.mean():.4f}\n")

    # Residual analysis
    residuals = y_test - y_pred
    f.write(f"  Residuals - Min: {residuals.min():.4f}, Max: {residuals.max():.4f}, Mean: {residuals.mean():.4f}\n")
    f.write(f"  Residuals Std: {residuals.std():.4f}\n")

    # Additional metrics
    f.write(f"\nAdditional Metrics:\n")
    f.write(f"  Mean Squared Error (MSE): {mse:.4f}\n")
    # Explained variance divides by the target variance; guard a constant target.
    target_variance = np.var(y_test)
    if target_variance > 0:
        f.write(f"  Explained Variance: {1 - np.var(residuals) / target_variance:.4f}\n")
    else:
        f.write("  Explained Variance: undefined (constant target)\n")


for task_id, task_info in tasks.items():
    task_type = task_info["type"]
    target_col = task_info["target"]
    is_classification = task_type == "classification"

    report_file = f"{results_dir}/{task_id}_detailed_report_{timestamp}.txt"
    results = all_results[task_id]
    models = all_models[task_id]

    # Rebuild the test split once per task (not once per model) and always release it.
    test_df = _recreate_test_split(target_col)
    try:
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write(f"DETAILED REPORT: {task_info['description']}\n")
            f.write("=" * 80 + "\n\n")

            # Training time info
            times = training_times[task_id]
            f.write(f"Training Time: {times['duration_seconds']:.2f} seconds ({times['duration_minutes']:.2f} minutes)\n")
            f.write(f"Start: {times['start_time']} | End: {times['end_time']}\n\n")

            # Metric table, then pick the matching per-model detail writer.
            if is_classification:
                f.write("CLASSIFICATION PERFORMANCE\n")
                f.write("-" * 50 + "\n")
                f.write(f"{'Model':<20} {'AUC':<10} {'Accuracy':<12} {'F1':<10}\n")
                f.write("-" * 52 + "\n")
                for model_name, metrics in results.items():
                    f.write(f"{model_name:<20} {metrics['AUC']:<10.4f} {metrics['Accuracy']:<12.4f} {metrics['F1']:<10.4f}\n")
                f.write("\n\nDETAILED CLASSIFICATION REPORTS\n")
                write_details = _write_classification_details
                error_label = "Error generating detailed classification report"
            else:
                f.write("REGRESSION PERFORMANCE\n")
                f.write("-" * 50 + "\n")
                f.write(f"{'Model':<20} {'RMSE':<12} {'MAE':<12} {'RÂ²':<10}\n")
                f.write("-" * 54 + "\n")
                for model_name, metrics in results.items():
                    f.write(f"{model_name:<20} {metrics['RMSE']:<12.4f} {metrics['MAE']:<12.4f} {metrics['RÂ²']:<10.4f}\n")
                f.write("\n\nDETAILED REGRESSION ANALYSIS\n")
                write_details = _write_regression_details
                error_label = "Error calculating regression metrics"
            f.write("=" * 80 + "\n")

            for model_name, model_pipeline in models.items():
                if model_pipeline is None:
                    f.write(f"\n{model_name}: TRAINING FAILED\n")
                    continue

                f.write(f"\n{model_name}\n")
                f.write("-" * 80 + "\n")
                try:
                    write_details(f, model_pipeline, test_df)
                except Exception as e:
                    # A failure for one model is recorded in the report; the rest continue.
                    f.write(f"{error_label}: {str(e)}\n")
                f.write("\n")
    finally:
        test_df.unpersist()

    print(f"{task_id}: {report_file}")

# ============================================================================
# 7. EXPORT SUMMARY AND BEST MODELS
# ============================================================================

# Helpers that pick the best model of a task from its metrics dict.
def find_best_classification_model(results):
    """Return ``(model_name, metrics)`` for the highest-AUC model, or None.

    Ties keep the first model in iteration order. A NaN AUC never wins because
    comparisons with NaN are False. Returns None for an empty or None
    ``results``, or when no AUC exceeds -1.
    """
    if not results:
        return None
    best = None
    for name, scores in results.items():
        floor = -1 if best is None else best[1]['AUC']
        if scores['AUC'] > floor:
            best = (name, scores)
    return best

def find_best_regression_model(results):
    """Return ``(model_name, metrics)`` for the lowest-RMSE model, or None.

    Ties keep the first model in iteration order. A NaN RMSE never wins because
    comparisons with NaN are False. Returns None for an empty or None
    ``results``, or when no RMSE is below +inf.
    """
    if not results:
        return None
    best = None
    for name, scores in results.items():
        ceiling = float('inf') if best is None else best[1]['RMSE']
        if scores['RMSE'] < ceiling:
            best = (name, scores)
    return best

# Export the overall summary: session info plus the best model of each task.
summary_file = f"{results_dir}/overall_summary_{timestamp}.txt"
with open(summary_file, 'w', encoding='utf-8') as f:
    f.write("OVERALL TRAINING SUMMARY\n")
    f.write("=" * 80 + "\n\n")

    # builtins.sum: the pyspark star import may shadow the builtin `sum`.
    total_time_seconds = builtins.sum(
        times['duration_seconds'] for times in training_times.values()
    )

    f.writelines([
        f"Training Session: {timestamp}\n",
        f"Total Training Time: {total_time_seconds:.2f} seconds ({total_time_seconds/60:.2f} minutes)\n",
        f"Number of Tasks: {builtins.len(tasks)}\n",
        f"Results Directory: {results_dir}\n",
        f"Checkpoints Directory: {checkpoints_dir}\n\n",
        "BEST PERFORMING MODELS\n",
        "-" * 50 + "\n",
    ])

    for task_id, task_info in tasks.items():
        results = all_results[task_id]
        if not results:
            continue

        f.write(f"\n{task_info['description']}:\n")
        is_classification = task_info["type"] == "classification"

        # Classification: best by AUC. Regression: best by lowest RMSE.
        try:
            if is_classification:
                best_model = find_best_classification_model(results)
            else:
                best_model = find_best_regression_model(results)

            if not best_model:
                f.write(f"No valid models found\n")
            elif is_classification:
                model_name, metrics = best_model
                f.write(f"Best Model: {model_name}\n")
                f.write(f"AUC: {metrics['AUC']:.4f} | Accuracy: {metrics['Accuracy']:.4f} | F1: {metrics['F1']:.4f}\n")
            else:
                model_name, metrics = best_model
                f.write(f"Best Model: {model_name}\n")
                f.write(f"RMSE: {metrics['RMSE']:.4f} | MAE: {metrics['MAE']:.4f} | RÂ²: {metrics['RÂ²']:.4f}\n")
        except Exception as e:
            f.write(f"Error finding best model: {str(e)}\n")

        f.write(f"Training Time: {training_times[task_id]['duration_minutes']:.2f} minutes\n")

print(f"Overall summary: {summary_file}")

# ============================================================================
# 8. SAVE TRAINING METADATA
# ============================================================================

# Plain-Python session metadata, excluding PySpark objects such as the fitted
# models. Note: only unpickle this file from a trusted source.
metadata = dict(
    timestamp=timestamp,
    training_times=training_times,
    all_results=all_results,
    tasks=tasks,
    feature_cols=feature_cols,
    total_records=df.count(),  # triggers a Spark job over the full DataFrame
    results_dir=results_dir,
    checkpoints_dir=checkpoints_dir,
)

metadata_file = f"{results_dir}/training_metadata_{timestamp}.pkl"
try:
    with open(metadata_file, 'wb') as f:
        pickle.dump(metadata, f)
except Exception as e:
    print(f"Error saving metadata: {str(e)}")
else:
    print(f"Metadata saved: {metadata_file}")

print(f"\nAll results exported to: {results_dir}")
print(f"All model checkpoints saved to: {checkpoints_dir}")

# ============================================================================
# 9. FINAL SUMMARY DISPLAY
# ============================================================================

print("\n" + "=" * 80)
print("FINAL TRAINING SUMMARY")
print("=" * 80)

# builtins.sum: the pyspark star import may shadow the builtin `sum`.
total_training_time = builtins.sum(times['duration_seconds'] for times in training_times.values())
print(f"Total Training Time: {total_training_time:.2f} seconds ({total_training_time/60:.2f} minutes)")
print(f"Results saved to: {results_dir}")
print(f"Model checkpoints: {checkpoints_dir}")

for task_id, task_info in tasks.items():
    print(f"\n{task_info['description']}:")
    results = all_results[task_id]
    is_classification = task_info["type"] == "classification"

    if not results:
        print(f"No successful models")
    else:
        try:
            # Classification: best by AUC. Regression: best by lowest RMSE.
            if is_classification:
                best_model = find_best_classification_model(results)
            else:
                best_model = find_best_regression_model(results)

            if not best_model:
                print(f"No valid models")
            else:
                model_name, metrics = best_model
                if is_classification:
                    print(f"Best: {model_name} (AUC: {metrics['AUC']:.4f})")
                else:
                    print(f"Best: {model_name} (RMSE: {metrics['RMSE']:.4f})")
        except Exception as e:
            # Catch ordinary errors only (not KeyboardInterrupt/SystemExit) and show the cause.
            print(f"Error finding best model: {e}")

    print(f"Training time: {training_times[task_id]['duration_minutes']:.2f} minutes")

print(f"\nTraining session completed successfully!")

# ============================================================================
# 10. HOW TO LOAD SAVED MODELS
# ============================================================================

# `pipeline.fit(...)` returns a PipelineModel, and that is what was saved above.
# It must be reloaded with PipelineModel.load; Pipeline.load expects an unfitted
# Pipeline and rejects a saved PipelineModel.
print(f"\nTo load saved models later:")
print(f"   from pyspark.ml import PipelineModel")
print(f"   model = PipelineModel.load('path_to_model')")
print(f"   # Example:")
# Show one real checkpoint path: the first task that has trained models, using
# its first model and the same directory naming scheme as the save step.
for task_id in tasks.keys():
    task_models = all_models.get(task_id) or {}
    if not task_models:
        continue
    example_model_name = next(iter(task_models))
    example_dir = example_model_name.replace(' ', '_').lower()
    model_example = f"{checkpoints_dir}/{task_id}_{timestamp}/{example_dir}"
    print(f"   # model = PipelineModel.load('{model_example}')")
    break
