In [0]:
# Create the athletes dataset directly in Databricks.
# Every column below is drawn independently at random and the medal labels are
# shuffled separately, so the table is purely synthetic demo data.
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Fixed seed so the generated table is the same on every run.
np.random.seed(42)
n = 1000

print("Creating athletes dataset in Databricks...")

# Create data.
# NOTE: the order of the np.random calls fixes which values each column gets
# for a given seed, so keep the keys in this order.
data = {
    'athlete_id': range(1, n + 1),
    'name': [f'Athlete_{i}' for i in range(1, n + 1)],
    'age': np.random.randint(18, 40, n),               # upper bound exclusive: 18..39
    'height': np.random.normal(175, 10, n),
    'weight': np.random.normal(75, 12, n),
    'country': np.random.choice(['USA', 'China', 'Russia', 'Germany', 'UK', 'France', 'Japan', 'Australia'], n),
    'sport': np.random.choice(['Swimming', 'Athletics', 'Gymnastics', 'Cycling', 'Rowing'], n),
    'years_experience': np.random.randint(1, 20, n),   # 1..19
    'training_hours_per_week': np.random.randint(10, 40, n),  # 10..39
}

df_pandas = pd.DataFrame(data)

# Target variable: 10% Gold, 15% Silver, 20% Bronze, and the rest 'None'.
# The counts are derived from n (integer arithmetic, remainder goes to 'None')
# so the label list always has exactly n entries; with n = 1000 this yields the
# 100 / 150 / 200 / 550 split.
n_gold = n * 10 // 100
n_silver = n * 15 // 100
n_bronze = n * 20 // 100
n_none = n - (n_gold + n_silver + n_bronze)
medals = ['Gold'] * n_gold + ['Silver'] * n_silver + ['Bronze'] * n_bronze + ['None'] * n_none
np.random.shuffle(medals)
df_pandas['medal'] = medals

# Convert to Spark DataFrame
df = spark.createDataFrame(df_pandas)

# Save as table (overwrite makes the cell safe to re-run)
df.write.mode("overwrite").saveAsTable("default.athletes")

print(f"Created table 'default.athletes' with {df.count()} rows")
display(df.limit(5))

Creating athletes dataset in Databricks...
Created table 'default.athletes' with 1000 rows


athlete_id,name,age,height,weight,country,sport,years_experience,training_hours_per_week,medal
1,Athlete_1,24,166.251932862553,69.87891459741175,China,Rowing,17,25,Bronze
2,Athlete_2,37,170.99627126832448,68.4979801479763,Japan,Swimming,13,31,
3,Athlete_3,32,180.09378116336106,82.59554101811791,Russia,Swimming,13,12,
4,Athlete_4,28,187.47327055231725,69.60508754129663,Australia,Swimming,7,33,
5,Athlete_5,25,184.6592200795959,87.21001474901018,Russia,Athletics,10,21,Bronze


In [0]:
# Install CodeCarbon (carbon tracking) together with the modelling and plotting
# libraries imported by the following cell.
# NOTE(review): versions are not pinned, so the installed releases can differ
# between runs; consider pinning them for reproducibility.
%pip install codecarbon scikit-learn matplotlib seaborn

Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


In [0]:
import pandas as pd
import numpy as np
from pyspark.sql import functions as F
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from codecarbon import EmissionsTracker

In [0]:
# Read the persisted athletes table back from the metastore
df = spark.table("default.athletes")

# Binary target: 1 when the medal label is anything other than "None", else 0
has_medal = F.col("medal") != "None"
df = df.withColumn("won_medal", F.when(has_medal, 1).otherwise(0))

# Encode the categoricals with a rank-based approach (serverless compatible)
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank

# dense_rank() over the sorted values is 1-based, so subtracting 1 gives
# 0-based integer codes. Neither window has a partitionBy clause, so each rank
# is computed over the whole table.
country_window = Window.orderBy("country")
sport_window = Window.orderBy("sport")

df = (
    df
    .withColumn("country_encoded", dense_rank().over(country_window) - 1)
    .withColumn("sport_encoded", dense_rank().over(sport_window) - 1)
)

print(f"Loaded rows: {df.count()}")
display(df.limit(5))



Loaded rows: 1000


athlete_id,name,age,height,weight,country,sport,years_experience,training_hours_per_week,medal,won_medal,country_encoded,sport_encoded
72,Athlete_72,31,173.35464341950305,77.89485765186417,Australia,Athletics,15,19,,0,0,0
32,Athlete_32,29,201.4357158339855,76.13992501627565,Australia,Athletics,3,31,,0,0,0
28,Athlete_28,33,183.35565398349905,68.8747673708764,Australia,Athletics,13,37,,0,0,0
88,Athlete_88,18,177.0370179813885,78.65031503464105,Australia,Athletics,7,25,,0,0,0
134,Athlete_134,24,181.7225728441377,76.80163339645661,Australia,Athletics,4,27,,0,0,0


In [0]:
# Version 1: Basic features - the raw numeric columns cast to float and the
# two category codes cast to int, keyed by athlete_id.
float_feature_names = ["age", "height", "weight", "years_experience", "training_hours_per_week"]
int_feature_names = ["country_encoded", "sport_encoded"]

features_v1 = df.select(
    "athlete_id",
    *[F.col(column_name).cast("float") for column_name in float_feature_names],
    *[F.col(column_name).cast("int") for column_name in int_feature_names]
)

# Persist as a managed table; overwrite keeps the cell re-runnable
features_v1.write.mode("overwrite").saveAsTable("default.athlete_features_v1")

print("Feature Table V1 created")
display(features_v1.limit(5))



Feature Table V1 created


athlete_id,age,height,weight,years_experience,training_hours_per_week,country_encoded,sport_encoded
72,31.0,173.35464,77.89486,15.0,19.0,0,0
32,29.0,201.43571,76.13992,3.0,31.0,0,0
28,33.0,183.35565,68.87477,13.0,37.0,0,0
88,18.0,177.03702,78.650314,7.0,25.0,0,0
134,24.0,181.72258,76.801636,4.0,27.0,0,0


In [0]:
# Version 2: Engineered features
# Each derived column is written as a named expression and projected in one
# select, in the same output order as the stored table schema.
age_as_float = F.col("age").cast("float")
height_scaled = F.col("height") / 100

# Age buckets: <=25 -> 0, (25, 30] -> 1, (30, 35] -> 2, everything else -> 3
age_bucket = (
    F.when(age_as_float <= 25, 0)
     .when((age_as_float > 25) & (age_as_float <= 30), 1)
     .when((age_as_float > 30) & (age_as_float <= 35), 2)
     .otherwise(3)
)

features_v2 = df.select(
    "athlete_id",
    age_as_float.alias("age"),
    (F.col("weight") / F.pow(height_scaled, 2)).alias("bmi"),
    (F.col("years_experience") / age_as_float).alias("experience_per_age"),
    # +1 in the denominator keeps the ratio finite for zero experience
    (F.col("training_hours_per_week") / (F.col("years_experience") + 1)).alias("training_intensity"),
    age_bucket.alias("age_group"),
    (F.col("height") / F.col("weight")).alias("height_weight_ratio"),
    "country_encoded",
    "sport_encoded"
)

# Persist as a managed table; overwrite keeps the cell re-runnable
features_v2.write.mode("overwrite").saveAsTable("default.athlete_features_v2")

print("Feature Table V2 created")
display(features_v2.limit(5))



Feature Table V2 created


athlete_id,age,bmi,experience_per_age,training_intensity,age_group,height_weight_ratio,country_encoded,sport_encoded
72,31.0,25.92016906914373,0.4838709677419355,1.1875,2,2.2254953490547185,0,0
32,29.0,18.76460783051093,0.1034482758620689,7.75,1,2.645599083410269,0,0
28,33.0,20.486677556500418,0.3939393939393939,2.642857142857143,2,2.6621600476146328,0,0
88,18.0,25.09413875946069,0.3888888888888889,3.125,0,2.250938447015929,0,0
134,24.0,23.256946972708256,0.1666666666666666,5.4,0,2.366129010643175,0,0


In [0]:
def train_model(features_df, target_df, feature_version, n_estimators, max_depth, exp_name):
    """Train a random forest on one feature table with MLflow and CodeCarbon tracking.

    Args:
        features_df: pandas DataFrame holding an ``athlete_id`` column plus the
            model features.
        target_df: pandas DataFrame holding ``athlete_id`` and the binary
            ``won_medal`` label.
        feature_version: Label logged as the ``feature_version`` MLflow param.
        n_estimators: Number of trees passed to ``RandomForestClassifier``.
        max_depth: Maximum tree depth passed to ``RandomForestClassifier``.
        exp_name: Used as the MLflow run name, the CodeCarbon project name and
            as a suffix in the logged figure file names.

    Returns:
        dict with the keys ``experiment_name``, ``feature_version``,
        ``n_estimators``, ``max_depth``, ``accuracy``, ``f1_score``,
        ``roc_auc`` and ``carbon_emissions_kg``. The emissions value is NaN
        when CodeCarbon could not produce a measurement.
    """

    print("=" * 70)
    print(f"TRAINING: {exp_name}")
    print("=" * 70)

    # Merge features and target (inner join on the athlete key)
    data = features_df.merge(target_df, on="athlete_id")
    X = data.drop(["athlete_id", "won_medal"], axis=1)
    y = data["won_medal"]

    # Train/test split, stratified so both parts keep the label balance
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    print(f"Train: {X_train.shape}, Test: {X_test.shape}")

    # Start CodeCarbon tracking. The tracker is stopped in the `finally` below
    # if anything fails before the regular stop, so it never keeps running
    # after an exception.
    tracker = EmissionsTracker(project_name=exp_name, log_level="error")
    tracker.start()
    tracker_running = True

    try:
        # MLflow run
        with mlflow.start_run(run_name=exp_name):
            mlflow.log_param("feature_version", feature_version)
            mlflow.log_param("n_estimators", n_estimators)
            mlflow.log_param("max_depth", max_depth)

            # Train model
            model = RandomForestClassifier(
                n_estimators=n_estimators,
                max_depth=max_depth,
                random_state=42,
                n_jobs=-1
            )
            model.fit(X_train, y_train)

            # Predictions
            y_pred = model.predict(X_test)
            proba = model.predict_proba(X_test)
            # predict_proba columns follow model.classes_; look the positive
            # class up by label instead of assuming its position. If the model
            # never saw class 1, its predicted probability is 0 everywhere.
            positive_idx = np.flatnonzero(model.classes_ == 1)
            if positive_idx.size:
                y_proba = proba[:, positive_idx[0]]
            else:
                y_proba = np.zeros(len(X_test))

            # Calculate metrics
            acc = accuracy_score(y_test, y_pred)
            prec = precision_score(y_test, y_pred, zero_division=0)
            rec = recall_score(y_test, y_pred, zero_division=0)
            f1 = f1_score(y_test, y_pred, zero_division=0)
            # ROC AUC is undefined with a single class in y_test; report chance level
            roc = roc_auc_score(y_test, y_proba) if len(np.unique(y_test)) > 1 else 0.5

            # Log metrics
            mlflow.log_metric("accuracy", acc)
            mlflow.log_metric("precision", prec)
            mlflow.log_metric("recall", rec)
            mlflow.log_metric("f1_score", f1)
            mlflow.log_metric("roc_auc", roc)

            print(f"Accuracy: {acc:.4f}, F1: {f1:.4f}, ROC AUC: {roc:.4f}")

            # Confusion Matrix
            cm = confusion_matrix(y_test, y_pred)
            cm_fig, cm_ax = plt.subplots(figsize=(6, 5))
            sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=cm_ax)
            cm_ax.set_title(f'Confusion Matrix - {exp_name}')
            cm_ax.set_ylabel('True Label')
            cm_ax.set_xlabel('Predicted Label')
            mlflow.log_figure(cm_fig, f"confusion_matrix_{exp_name}.png")
            plt.close(cm_fig)

            # Feature Importance (top 10)
            imp = pd.DataFrame({
                'feature': X.columns,
                'importance': model.feature_importances_
            }).sort_values('importance', ascending=False)

            imp_fig, imp_ax = plt.subplots(figsize=(8, 6))
            sns.barplot(data=imp.head(10), x='importance', y='feature', ax=imp_ax)
            imp_ax.set_title(f'Feature Importance - {exp_name}')
            mlflow.log_figure(imp_fig, f"feature_importance_{exp_name}.png")
            plt.close(imp_fig)

            # Log model
            mlflow.sklearn.log_model(model, "model")

            # Stop carbon tracking
            emissions = tracker.stop()
            tracker_running = False
            if emissions is None:
                # stop() yields None when no measurement could be produced;
                # skip the metric rather than crash on log/format.
                emissions = float("nan")
                print("Carbon Emissions: unavailable")
            else:
                mlflow.log_metric("carbon_emissions_kg", emissions)
                print(f"Carbon Emissions: {emissions:.6f} kg CO2")
            print("=" * 70)
    finally:
        if tracker_running:
            tracker.stop()

    return {
        "experiment_name": exp_name,
        "feature_version": feature_version,
        "n_estimators": n_estimators,
        "max_depth": max_depth,
        "accuracy": acc,
        "f1_score": f1,
        "roc_auc": roc,
        "carbon_emissions_kg": emissions
    }

print("Function train_model ready")

Function train_model ready


In [0]:
# Pull the target and both persisted feature tables into pandas for scikit-learn
target_df = df.select("athlete_id", "won_medal").toPandas()
features_v1_pd = spark.table("default.athlete_features_v1").toPandas()
features_v2_pd = spark.table("default.athlete_features_v2").toPandas()

# Set MLflow experiment - the path points into one user's workspace folder;
# change it to your own folder before running.
mlflow.set_experiment("/Users/aigul.azamat7@gmail.com/athlete_prediction")

# One row per run:
# (feature label, hyperparameter label, feature frame, version tag,
#  n_estimators, max_depth, run name)
experiment_plan = [
    ("V1", "HP1", features_v1_pd, "v1", 100, 10, "exp1_v1_hp1"),
    ("V1", "HP2", features_v1_pd, "v1", 200, 20, "exp2_v1_hp2"),
    ("V2", "HP1", features_v2_pd, "v2", 100, 10, "exp3_v2_hp1"),
    ("V2", "HP2", features_v2_pd, "v2", 200, 20, "exp4_v2_hp2"),
]

results = []

print("\nRUNNING 4 EXPERIMENTS\n")

for position, plan_row in enumerate(experiment_plan, start=1):
    feature_label, hp_label, feature_frame, version_tag, tree_count, depth, run_name = plan_row
    # Every banner except the first is preceded by a blank line
    leading_break = "" if position == 1 else "\n"
    print(f"{leading_break}[{position}/4] Experiment {position}: {feature_label} + {hp_label}...")
    results.append(
        train_model(feature_frame, target_df, version_tag, tree_count, depth, run_name)
    )

# Display results
results_df = pd.DataFrame(results)
display(results_df)


2026/02/09 21:48:15 INFO mlflow.tracking.fluent: Experiment with name '/Users/aigul.azamat7@gmail.com/athlete_prediction' does not exist. Creating a new experiment.



RUNNING 4 EXPERIMENTS

[1/4] Experiment 1: V1 + HP1...
TRAINING: exp1_v1_hp1
Train: (800, 7), Test: (200, 7)
Accuracy: 0.4750, F1: 0.2953, ROC AUC: 0.4976




Carbon Emissions: 0.000009 kg CO2

[2/4] Experiment 2: V1 + HP2...
TRAINING: exp2_v1_hp2
Train: (800, 7), Test: (200, 7)
Accuracy: 0.4850, F1: 0.3522, ROC AUC: 0.4785




Carbon Emissions: 0.000005 kg CO2

[3/4] Experiment 3: V2 + HP1...
TRAINING: exp3_v2_hp1
Train: (800, 8), Test: (200, 8)
Accuracy: 0.5400, F1: 0.3947, ROC AUC: 0.5201




Carbon Emissions: 0.000006 kg CO2

[4/4] Experiment 4: V2 + HP2...
TRAINING: exp4_v2_hp2
Train: (800, 8), Test: (200, 8)
Accuracy: 0.5350, F1: 0.4294, ROC AUC: 0.5299




Carbon Emissions: 0.000007 kg CO2


experiment_name,feature_version,n_estimators,max_depth,accuracy,f1_score,roc_auc,carbon_emissions_kg
exp1_v1_hp1,v1,100,10,0.475,0.2953020134228188,0.4975757575757575,8.71059703869429e-06
exp2_v1_hp2,v1,200,20,0.485,0.3522012578616352,0.4784848484848484,5.418676692254616e-06
exp3_v2_hp1,v2,100,10,0.54,0.3947368421052631,0.5201010101010101,6.143828597676312e-06
exp4_v2_hp2,v2,200,20,0.535,0.4294478527607362,0.5298989898989899,6.975094372777969e-06


In [0]:
from mlflow.tracking import MlflowClient

# Look up the best run of the experiment (highest F1, ties broken by accuracy)
# and load its logged model.
client = MlflowClient()
exp = client.get_experiment_by_name("/Users/aigul.azamat7@gmail.com/athlete_prediction")

# get_experiment_by_name returns None when the experiment does not exist, so
# only search when there is an experiment to search in.
runs = []
if exp is not None:
    runs = client.search_runs(
        experiment_ids=[exp.experiment_id],
        # Only completed runs: a failed run may have no metrics or model artifact
        filter_string="attributes.status = 'FINISHED'",
        order_by=["metrics.f1_score DESC", "metrics.accuracy DESC"],
        max_results=1
    )

if runs:
    best = runs[0]

    print(f"Name: {best.data.tags.get('mlflow.runName', '')}")
    print(f"F1 Score: {best.data.metrics.get('f1_score', 0):.4f}")
    print(f"Accuracy: {best.data.metrics.get('accuracy', 0):.4f}")
    print(f"ROC AUC: {best.data.metrics.get('roc_auc', 0):.4f}")
    print(f"Carbon Emissions: {best.data.metrics.get('carbon_emissions_kg', 0):.6f} kg CO2")
    # Load the scikit-learn model logged under the "model" artifact path of that run
    model = mlflow.sklearn.load_model(f"runs:/{best.info.run_id}/model")

else:
    print("No experiments found")

Name: exp4_v2_hp2
F1 Score: 0.4294
Accuracy: 0.5350
ROC AUC: 0.5299
Carbon Emissions: 0.000007 kg CO2
