In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import matplotlib.pyplot as plt
import os

jar_name = "postgresql-42.6.0.jar"
jar_path = os.path.abspath(jar_name)

if not os.path.exists(jar_path):
    print(f"❌ Le fichier est toujours absent du dossier {os.getcwd()}")
else:
    print(f"✅ Fichier trouvé à : {jar_path}")

spark = SparkSession.builder \
    .appName("PostgresFinalTest") \
    .config("spark.jars", jar_path) \
    .config("spark.driver.extraClassPath", jar_path) \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

url = "jdbc:postgresql://localhost:5432/silver_data"
properties = {
        "user": "silver_user",
        "password": "silver_pass123", # Vérifiez bien ce mot de passe dans votre docker-compose
        "driver": "org.postgresql.Driver"
    }
    
# Charger les données depuis Postgres vers un nouveau DataFrame
df_silver = spark.read.jdbc(url=url, table="silver_data", properties=properties)

# Afficher les 5 premières lignes
df_silver.show(5)
    

✅ Fichier trouvé à : /home/saida/Smart-LogiTrack/ml/notebooks/postgresql-42.6.0.jar


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/14 15:28:38 WARN Utils: Your hostname, DESKTOP-A9FN519, resolves to a loopback address: 127.0.1.1; using 172.30.212.146 instead (on interface eth0)
26/01/14 15:28:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
26/01/14 15:28:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+------------------+-----------+-----------+-----+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|     trip_duration|pickup_hour|day_of_week|month|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+------------------+-----------+--

In [2]:
df_silver.describe().show()

26/01/13 20:11:06 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-------------------+-----------------+-------------------+---------------------+------------------+--------------------+------------------+-------------------+--------------------+------------------+------------------+------------------+
|summary|           VendorID|   passenger_count|     trip_distance|        RatecodeID|store_and_fwd_flag|      PULocationID|      DOLocationID|       payment_type|       fare_amount|             extra|            mta_tax|       tip_amount|       tolls_amount|improvement_surcharge|      total_amount|congestion_surcharge|       Airport_fee| cbd_congestion_fee|       trip_duration|       pickup_hour|       day_of_week|             month|
+-------+-------------------+------------------+------------------+------------------+------------------+-----------------

In [3]:

features_to_keep = [
    'trip_distance',
    'RatecodeID',
    'tolls_amount',
    'fare_amount',
    'tip_amount',
    'total_amount',
    'Airport_fee',
    'pickup_hour',
    'day_of_week',
    "trip_duration"
]


print(f"\n✅ Features sélectionnées: {len(features_to_keep) - 1}")
print(f"✅ Dataset prêt avec {df_silver.count():,} lignes")


✅ Features sélectionnées: 9


[Stage 4:>                                                          (0 + 1) / 1]

✅ Dataset prêt avec 2,616,166 lignes


                                                                                

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, round as spark_round
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor, LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from datetime import datetime
import json
import pickle


In [5]:
feature_columns = [
    'trip_distance',
    'RatecodeID',
    'tolls_amount',
    'fare_amount',
    'tip_amount',
    'total_amount',
    'Airport_fee',
    'pickup_hour',
    'day_of_week',
    
]
target_column = 'trip_duration'
df_Gold = df_silver.select(feature_columns + [target_column])

In [6]:
train_data, test_data = df_Gold.randomSplit([0.8, 0.2], seed=42)
print(f"Nombre d'exemples d'entraînement: {train_data.count()}")
print(f"Nombre d'exemples de test: {test_data.count()}")

                                                                                

Nombre d'exemples d'entraînement: 2092729


[Stage 10:>                                                         (0 + 1) / 1]

Nombre d'exemples de test: 523437


                                                                                

In [7]:
train_data.cache()
test_data.cache()

DataFrame[trip_distance: double, RatecodeID: int, tolls_amount: double, fare_amount: double, tip_amount: double, total_amount: double, Airport_fee: double, pickup_hour: int, day_of_week: int, trip_duration: double]

In [8]:
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol='features_raw'
)

In [9]:
scaler = StandardScaler(
    inputCol='features_raw',
    outputCol='features',
    withStd=True,
    withMean=False
)

In [10]:
# lr = LinearRegression(
#     featuresCol='features',
#     labelCol=target_column,
#     maxIter=10,
#     regParam=0.3,
#     elasticNetParam=0.8
# )

rf = RandomForestRegressor(
    featuresCol='features',
    labelCol=target_column,
    numTrees=50,
    maxDepth=10,
    seed=42
)

In [11]:
pipeline_lr = Pipeline(stages=[assembler, scaler, rf])

In [12]:
model_lr = pipeline_lr.fit(train_data)

26/01/12 14:39:51 WARN DAGScheduler: Broadcasting large task binary with size 1047.7 KiB
26/01/12 14:41:54 WARN DAGScheduler: Broadcasting large task binary with size 2028.5 KiB
26/01/12 14:44:44 WARN DAGScheduler: Broadcasting large task binary with size 3.8 MiB
26/01/12 14:48:02 WARN DAGScheduler: Broadcasting large task binary with size 1197.9 KiB
26/01/12 14:48:16 WARN DAGScheduler: Broadcasting large task binary with size 7.3 MiB
26/01/12 14:51:52 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
                                                                                

In [13]:
predictions_lr = model_lr.transform(test_data)

In [14]:
evaluator_mae = RegressionEvaluator(labelCol=target_column, predictionCol='prediction', metricName='mae')
evaluator_rmse = RegressionEvaluator(labelCol=target_column, predictionCol='prediction', metricName='rmse')
evaluator_r2 = RegressionEvaluator(labelCol=target_column, predictionCol='prediction', metricName='r2')

In [15]:
mae_lr = evaluator_mae.evaluate(predictions_lr)
rmse_lr = evaluator_rmse.evaluate(predictions_lr)
r2_lr = evaluator_r2.evaluate(predictions_lr)

                                                                                

In [16]:
print(f"\n✅ RÉSULTATS - Régression Linéaire:")
print(f"   MAE:  {mae_lr:.2f} minutes")
print(f"   RMSE: {rmse_lr:.2f} minutes")
print(f"   R²:   {r2_lr:.4f}")


✅ RÉSULTATS - Régression Linéaire:
   MAE:  1.09 minutes
   RMSE: 1.76 minutes
   R²:   0.9374


### GBTRegressor

In [17]:
import joblib
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features"
)


gbt = GBTRegressor(
    featuresCol="features",
    labelCol=target_column,
    maxIter=50,        # nombre d'arbres
    maxDepth=6,        # profondeur (⚠️ augmente = plus lent)
    stepSize=0.1,      # learning rate
    subsamplingRate=0.8,
    seed=42
)

pipeline_gbt = Pipeline(stages=[assembler, gbt])


model_gbt = pipeline_gbt.fit(train_data)


predictions_gbt = model_gbt.transform(test_data)



                                                                                

In [18]:
evaluator_mae = RegressionEvaluator(
    labelCol=target_column,
    predictionCol="prediction",
    metricName="mae"
)

evaluator_rmse = RegressionEvaluator(
    labelCol=target_column,
    predictionCol="prediction",
    metricName="rmse"
)

evaluator_r2 = RegressionEvaluator(
    labelCol=target_column,
    predictionCol="prediction",
    metricName="r2"
)

print("\n✅ RÉSULTATS - GBT")
print(f"MAE  : {evaluator_mae.evaluate(predictions_gbt):.2f}")
print(f"RMSE : {evaluator_rmse.evaluate(predictions_gbt):.2f}")
print(f"R²   : {evaluator_r2.evaluate(predictions_gbt):.4f}")



✅ RÉSULTATS - GBT


26/01/12 15:11:36 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

MAE  : 0.99


                                                                                

RMSE : 1.61


[Stage 650:>                                                        (0 + 1) / 1]

R²   : 0.9474


                                                                                

In [23]:
# Sauvegarder le modèle
model_gbt.write().overwrite().save('../models/eta_spark')

                                                                                

In [None]:
test_df = spark.createDataFrame([
    (9.55, 1, 0, 41.5, 10.25, 63.25, 1.75, 13, 6)
],
[
    'trip_distance',
    'RatecodeID',
    'tolls_amount',
    'fare_amount',
    'tip_amount',
    'total_amount',
    'Airport_fee',
    'pickup_hour',
    'day_of_week'
])

from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("../models/eta_spark")

predictions = loaded_model.transform(test_df)
predictions.show()
predictions.select("prediction").show()
pred_value = predictions.select("prediction").collect()[0]["prediction"]
print(pred_value)



                                                                                

+-------------+----------+------------+-----------+----------+------------+-----------+-----------+-----------+--------------------+-----------------+
|trip_distance|RatecodeID|tolls_amount|fare_amount|tip_amount|total_amount|Airport_fee|pickup_hour|day_of_week|            features|       prediction|
+-------------+----------+------------+-----------+----------+------------+-----------+-----------+-----------+--------------------+-----------------+
|         9.55|         1|           0|       41.5|     10.25|       63.25|       1.75|         13|          6|[9.55,1.0,0.0,41....|26.23872154637569|
+-------------+----------+------------+-----------+----------+------------+-----------+-----------+-----------+--------------------+-----------------+



                                                                                

+-----------------+
|       prediction|
+-----------------+
|26.23872154637569|
+-----------------+





26.23872154637569


                                                                                