# Model Training 

In [1]:

import os
import pickle
from datetime import datetime

import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession

## 1. Initialisation de Spark

In [2]:
# Location of the PostgreSQL JDBC driver jar handed to Spark through "spark.jars".
# The POSTGRES_JDBC_JAR environment variable overrides it, so the notebook is not
# tied to one machine's directory layout; the fallback is the original path.
jdbc_path = os.environ.get(
    "POSTGRES_JDBC_JAR",
    "/mnt/c/Users/user/Desktop/taxi-eta-prediction/libs/postgresql-42.6.0.jar",
)

# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("ETA_Model_Training")
    .config("spark.driver.memory", "4g")
    .config("spark.jars", jdbc_path)
    .getOrCreate()
)

print("‚úÖ Spark Session cr√©√©e")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/13 14:21:24 WARN Utils: Your hostname, DESKTOP-Q0IAP8C, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/01/13 14:21:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
26/01/13 14:21:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


‚úÖ Spark Session cr√©√©e


## 2. Chargement des données

In [3]:

# JDBC connection settings for the Silver database. Each value can be overridden
# with an environment variable so connection details do not have to live in the
# notebook; the fallbacks are the values the notebook originally hard-coded.
postgres_url = os.environ.get(
    "SILVER_DB_URL", "jdbc:postgresql://localhost:5432/silver_data"
)
postgres_properties = {
    "user": os.environ.get("SILVER_DB_USER", "silver_user"),
    # NOTE(review): this fallback is a credential committed with the notebook.
    # Set SILVER_DB_PASSWORD and drop the default once every environment defines it.
    "password": os.environ.get("SILVER_DB_PASSWORD", "silver_pass123"),
    "driver": "org.postgresql.Driver"
}

print("üì• Chargement des donn√©es Silver...")
# Read the whole "silver_table" table through JDBC into a Spark DataFrame.
df = spark.read.jdbc(
    url=postgres_url,
    table="silver_table",
    properties=postgres_properties
)

# count() is a Spark action: it executes the JDBC read to obtain the row count.
print(f"‚úÖ {df.count()} lignes charg√©es")
print(f"üìä Colonnes disponibles: {df.columns}")

# Show a preview of the data and its schema
df.show(5)
df.printSchema()


üì• Chargement des donn√©es Silver...


                                                                                

‚úÖ 2630868 lignes charg√©es
üìä Colonnes disponibles: ['VendorID', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'Airport_fee', 'cbd_congestion_fee', 'trip_duration', 'pickup_hour', 'pickup_day_of_week', 'pickup_day', 'pickup_month', 'time_of_day', 'is_weekend', 'is_rush_hour']


26/01/13 14:22:09 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+-------------+-----------+------------------+----------+------------+-----------+----------+------------+
|VendorID|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|trip_duration|pickup_hour|pickup_day_of_week|pickup_day|pickup_month|time_of_day|is_weekend|is_rush_hour|
+--------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+-------------+-----------+------------------+-----

In [4]:
# Display the number of rows in df as the cell output (count() is a Spark action).
df.count()

                                                                                

2630868

## Drop useless columns

In [5]:
# Columns removed from df before modelling; every column not listed here is kept.
columns_to_drop = [
    'VendorID',
    'store_and_fwd_flag',
    'total_amount',
    'extra',
    'mta_tax',
    'congestion_surcharge',
    'improvement_surcharge',
    'is_rush_hour',
    'time_of_day',
    'cbd_congestion_fee',
    'payment_type',
    'pickup_day',
    'pickup_day_of_week',
    'PULocationID',
    'DOLocationID',
    'pickup_month',
    'is_weekend',
]

# Rebind df to the narrowed DataFrame.
df = df.drop(*columns_to_drop)

In [6]:
# Display the column names currently present in df as the cell output.
df.columns

['passenger_count',
 'trip_distance',
 'RatecodeID',
 'fare_amount',
 'tip_amount',
 'tolls_amount',
 'Airport_fee',
 'trip_duration',
 'pickup_hour']

## 3. Split train/test, entraînement et évaluation des modèles

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor, LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
import pickle
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

# ============================================
# 2. FEATURE IDENTIFICATION (UPDATED)
# ============================================

# Label column: trip_duration is the prediction target, not an input feature.
target_column = 'trip_duration'

# Continuous numeric features (to be normalised).
# NOTE(review): fare_amount, tip_amount and tolls_amount look like values that are
# only known once a trip has ended — confirm they are available at prediction time.
numerical_features = [
    'passenger_count', 'trip_distance',
    'fare_amount', 'tip_amount', 'tolls_amount', 'Airport_fee',
    'pickup_hour',
]

# Categorical features (to be encoded).
categorical_features = ['RatecodeID']

# Every model input: numeric columns first, then categorical ones.
all_features = [*numerical_features, *categorical_features]

# Print a short summary of the configuration, one line per entry.
feature_summary_lines = [
    f"\nüéØ CONFIGURATION DES FEATURES:",
    f"   Num√©riques: {len(numerical_features)} features - {numerical_features}",
    f"   Cat√©gorielles: {len(categorical_features)} features - {categorical_features}",
    f"   Target: {target_column}",
    f"   Total features: {len(all_features)}",
]
for feature_summary_line in feature_summary_lines:
    print(feature_summary_line)

# ============================================
# 5. TRAIN/TEST SPLIT
# ============================================

# Random 80/20 split with a fixed seed so the split is reproducible; both parts
# are marked for caching.
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
train_df = train_df.cache()
test_df = test_df.cache()

# Every count() is a Spark action, so run each one exactly once and reuse the
# value. The original computed each count twice, and df is not cached in the
# visible cells, so each df.count() recomputes it from its source.
total_count = df.count()
train_count = train_df.count()
test_count = test_df.count()

print(f"\nüìä SPLIT DES DONN√âES:")
print(f"   Train: {train_count:,} lignes ({train_count/total_count*100:.1f}%)")
print(f"   Test:  {test_count:,} lignes ({test_count/total_count*100:.1f}%)")

# ============================================
# 6. PREPROCESSING PIPELINE STAGES
# ============================================

print("\nüîß CR√âATION DES STAGES DE PREPROCESSING...")

def create_preprocessing_stages(numerical_cols=None, categorical_cols=None):
    """Build a fresh list of preprocessing stages to put in front of a model.

    Args:
        numerical_cols: names of the numeric columns to assemble and standardise.
            Defaults to the notebook-level ``numerical_features`` list.
        categorical_cols: names of the categorical columns to index and one-hot
            encode. Defaults to the notebook-level ``categorical_features`` list,
            so adding a column there takes effect here.

    Returns:
        A list of unfitted stages in this order: one StringIndexer per
        categorical column, one OneHotEncoder (omitted when there are no
        categorical columns), the numeric VectorAssembler, the StandardScaler,
        and the final VectorAssembler that writes the ``features`` column.
    """
    # Resolve the defaults at call time so edits to the notebook lists are seen.
    if numerical_cols is None:
        numerical_cols = numerical_features
    if categorical_cols is None:
        categorical_cols = categorical_features
    numerical_cols = list(numerical_cols)
    categorical_cols = list(categorical_cols)

    indexed_cols = [f'{name}_indexed' for name in categorical_cols]
    encoded_cols = [f'{name}_encoded' for name in categorical_cols]

    stages = []

    # STEP 1: encode the categorical columns (index, then one-hot).
    for name, indexed_name in zip(categorical_cols, indexed_cols):
        stages.append(
            StringIndexer(
                inputCol=name,
                outputCol=indexed_name,
                handleInvalid='keep'
            )
        )
    if categorical_cols:
        stages.append(
            OneHotEncoder(
                inputCols=indexed_cols,
                outputCols=encoded_cols,
                dropLast=True
            )
        )

    # STEP 2: assemble the numeric columns into one vector.
    # handleInvalid='skip' is kept from the original configuration.
    stages.append(
        VectorAssembler(
            inputCols=numerical_cols,
            outputCol='numerical_features_vec',
            handleInvalid='skip'
        )
    )

    # STEP 3: standardise the numeric vector (centre and scale).
    stages.append(
        StandardScaler(
            inputCol='numerical_features_vec',
            outputCol='numerical_features_scaled',
            withMean=True,
            withStd=True
        )
    )

    # STEP 4: combine scaled numeric and encoded categorical features.
    stages.append(
        VectorAssembler(
            inputCols=['numerical_features_scaled'] + encoded_cols,
            outputCol='features',
            handleInvalid='skip'
        )
    )

    return stages

# ============================================
# 7. MODEL DEFINITIONS
# ============================================

print("\nü§ñ CONFIGURATION DES MOD√àLES...")

# Column wiring shared by every regressor: input vector, label and output column.
shared_columns = dict(
    featuresCol='features',
    labelCol=target_column,
    predictionCol='prediction',
)

# Candidate regressors keyed by display name; insertion order is the training order.
models = {}

models['RandomForest'] = RandomForestRegressor(
    numTrees=100,
    maxDepth=10,
    minInstancesPerNode=5,
    subsamplingRate=0.8,
    featureSubsetStrategy='sqrt',
    seed=42,
    **shared_columns,
)

models['GradientBoostedTrees'] = GBTRegressor(
    maxIter=100,
    maxDepth=8,
    stepSize=0.1,
    subsamplingRate=0.8,
    featureSubsetStrategy='sqrt',
    seed=42,
    **shared_columns,
)

models['LinearRegression'] = LinearRegression(
    maxIter=100,
    regParam=0.1,
    elasticNetParam=0.5,
    standardization=False,  # Already scaled
    **shared_columns,
)

print(f"‚úÖ {len(models)} mod√®les configur√©s: {list(models.keys())}")

# ============================================
# 8. TRAINING AND EVALUATION
# ============================================

# Per-model metrics and fitted pipelines, keyed by model name.
results = {}
trained_models = {}

# The evaluators do not depend on the model being trained, so build them once
# instead of on every loop iteration.
evaluator_rmse = RegressionEvaluator(
    labelCol=target_column,
    predictionCol='prediction',
    metricName='rmse'
)
evaluator_mae = RegressionEvaluator(
    labelCol=target_column,
    predictionCol='prediction',
    metricName='mae'
)
evaluator_r2 = RegressionEvaluator(
    labelCol=target_column,
    predictionCol='prediction',
    metricName='r2'
)

for model_name, model in models.items():
    print(f"\n{'='*70}")
    print(f"üöÄ ENTRA√éNEMENT: {model_name}")
    print(f"{'='*70}")

    # Build the pipeline: fresh preprocessing stages followed by the model.
    preprocessing_stages = create_preprocessing_stages()
    pipeline = Pipeline(stages=preprocessing_stages + [model])

    # Train, measuring the wall-clock time spent in fit().
    start_time = datetime.now()
    trained_model = pipeline.fit(train_df)
    training_time = (datetime.now() - start_time).total_seconds()

    print(f"‚è±Ô∏è  Temps d'entra√Ænement: {training_time:.2f}s")

    # Predict on the held-out set (lazy: nothing is computed yet).
    predictions = trained_model.transform(test_df)

    # Evaluate. Each evaluate() call is its own Spark job, so without caching
    # the whole pipeline transform would be recomputed for every metric. Only
    # the label and prediction columns are needed, so cache just that pair.
    scored = predictions.select(target_column, 'prediction').cache()
    try:
        rmse = evaluator_rmse.evaluate(scored)
        mae = evaluator_mae.evaluate(scored)
        r2 = evaluator_r2.evaluate(scored)
    finally:
        # Release the cached pair; `predictions` itself stays usable.
        scored.unpersist()

    print(f"\nüìà M√âTRIQUES:")
    print(f"   RMSE: {rmse:.2f} minutes")
    print(f"   MAE:  {mae:.2f} minutes")
    print(f"   R¬≤:   {r2:.4f}")

    # Store the results
    results[model_name] = {
        'rmse': rmse,
        'mae': mae,
        'r2': r2,
        'training_time': training_time,
        'predictions': predictions
    }
    trained_models[model_name] = trained_model

# ============================================
# 9. MODEL COMPARISON
# ============================================

print(f"\n{'='*70}")
print("üìä COMPARAISON DES MOD√àLES")
print(f"{'='*70}")

# One record per trained model, turned into a table ordered by ascending RMSE.
comparison_rows = [
    {
        'Mod√®le': name,
        'RMSE': metrics['rmse'],
        'MAE': metrics['mae'],
        'R¬≤': metrics['r2'],
        'Temps (s)': metrics['training_time'],
    }
    for name, metrics in results.items()
]
comparison_df = pd.DataFrame(
    comparison_rows,
    columns=['Mod√®le', 'RMSE', 'MAE', 'R¬≤', 'Temps (s)'],
)
comparison_df = comparison_df.sort_values('RMSE')
print(comparison_df.to_string(index=False))

# After sorting, the first row is the model with the lowest RMSE.
best_model_name = comparison_df['Mod√®le'].iloc[0]
print(f"\nüèÜ MEILLEUR MOD√àLE: {best_model_name}")

# ============================================
# 10. SAVING THE BEST MODEL
# ============================================

print(f"\nüíæ Sauvegarde du meilleur mod√®le: {best_model_name}")
best_model = trained_models[best_model_name]

# Persist the fitted PySpark pipeline, replacing any earlier save at the same path.
model_path = f"./models/{best_model_name}_model"
model_writer = best_model.write()
model_writer.overwrite().save(model_path)
print(f"‚úÖ Mod√®le sauvegard√©: {model_path}")

üìä Chargement des donn√©es depuis PostgreSQL...


                                                                                

‚úÖ Donn√©es charg√©es: 2,630,868 lignes
üìã Colonnes: ['VendorID', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'Airport_fee', 'cbd_congestion_fee', 'trip_duration', 'pickup_hour', 'pickup_day_of_week', 'pickup_day', 'pickup_month', 'time_of_day', 'is_weekend', 'is_rush_hour']
root
 |-- VendorID: integer (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = tr

                                                                                

   Train: 2,104,448 lignes (80.0%)


                                                                                

   Test:  526,420 lignes (20.0%)

üîß CR√âATION DES STAGES DE PREPROCESSING...

ü§ñ CONFIGURATION DES MOD√àLES...
‚úÖ 3 mod√®les configur√©s: ['RandomForest', 'GradientBoostedTrees', 'LinearRegression']

üöÄ ENTRA√éNEMENT: RandomForest


26/01/13 14:27:48 WARN DAGScheduler: Broadcasting large task binary with size 1060.4 KiB
26/01/13 14:28:50 WARN DAGScheduler: Broadcasting large task binary with size 1898.9 KiB
26/01/13 14:30:06 WARN DAGScheduler: Broadcasting large task binary with size 3.4 MiB
26/01/13 14:31:33 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
26/01/13 14:33:59 WARN DAGScheduler: Broadcasting large task binary with size 1587.8 KiB
26/01/13 14:34:01 WARN DAGScheduler: Broadcasting large task binary with size 9.7 MiB
26/01/13 14:36:46 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
                                                                                

‚è±Ô∏è  Temps d'entra√Ænement: 775.07s


                                                                                


üìà M√âTRIQUES:
   RMSE: 2.08 minutes
   MAE:  1.35 minutes
   R¬≤:   0.9166

üöÄ ENTRA√éNEMENT: GradientBoostedTrees


26/01/13 14:41:29 WARN DAGScheduler: Broadcasting large task binary with size 1007.3 KiB
26/01/13 14:41:29 WARN DAGScheduler: Broadcasting large task binary with size 1024.4 KiB
26/01/13 14:41:31 WARN DAGScheduler: Broadcasting large task binary with size 1023.5 KiB
26/01/13 14:41:33 WARN DAGScheduler: Broadcasting large task binary with size 1024.0 KiB
26/01/13 14:41:33 WARN DAGScheduler: Broadcasting large task binary with size 1024.7 KiB
26/01/13 14:41:34 WARN DAGScheduler: Broadcasting large task binary with size 1026.0 KiB
26/01/13 14:41:35 WARN DAGScheduler: Broadcasting large task binary with size 1028.6 KiB
26/01/13 14:41:36 WARN DAGScheduler: Broadcasting large task binary with size 1033.0 KiB
26/01/13 14:41:37 WARN DAGScheduler: Broadcasting large task binary with size 1040.6 KiB
26/01/13 14:41:37 WARN DAGScheduler: Broadcasting large task binary with size 1054.4 KiB
26/01/13 14:41:38 WARN DAGScheduler: Broadcasting large task binary with size 1053.3 KiB
26/01/13 14:41:40 WAR

‚è±Ô∏è  Temps d'entra√Ænement: 860.82s


26/01/13 14:52:08 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                


üìà M√âTRIQUES:
   RMSE: 1.73 minutes
   MAE:  1.04 minutes
   R¬≤:   0.9421

üöÄ ENTRA√éNEMENT: LinearRegression


                                                                                

‚è±Ô∏è  Temps d'entra√Ænement: 28.87s


                                                                                


üìà M√âTRIQUES:
   RMSE: 3.77 minutes
   MAE:  2.55 minutes
   R¬≤:   0.7245

üìä COMPARAISON DES MOD√àLES
              Mod√®le     RMSE      MAE       R¬≤  Temps (s)
GradientBoostedTrees 1.729451 1.035113 0.942134 860.817631
        RandomForest 2.076438 1.352545 0.916584 775.067789
    LinearRegression 3.773379 2.548275 0.724532  28.868071

üèÜ MEILLEUR MOD√àLE: GradientBoostedTrees

üíæ Sauvegarde du meilleur mod√®le: GradientBoostedTrees


                                                                                

‚úÖ Mod√®le sauvegard√©: ./models/GradientBoostedTrees_model
