In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
# Initialisation de Spark
spark = SparkSession.builder.appName("NYC_Taxi_Trip_Analysis")  \
    .config("spark.master","spark://spark-master:7077")\
    .getOrCreate()

In [4]:
# Charger les données depuis HDFS (ou une autre source)
data_path = "hdfs://namenode:9000/user/data/tripdata.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

In [5]:
# Afficher les colonnes pour comprendre la structure des données
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RateCodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



In [6]:
# Filtrer et sélectionner les colonnes utiles
df = df.select(
    "trip_distance",   # Distance du trajet
    "passenger_count", # Nombre de passagers
    "fare_amount",     # Montant payé
    "payment_type",    # Type de paiement (catégorique)
    "tip_amount",      # Pourboire (valeur cible)
)

In [7]:
# Supprimer les lignes avec des valeurs nulles
df = df.dropna()

In [9]:
# Encodage des variables catégoriques (ex. payment_type)
indexer = StringIndexer(inputCol="payment_type", outputCol="payment_type_index")
df = indexer.fit(df).transform(df)

In [10]:
# Assembleur pour créer un vecteur de features
assembler = VectorAssembler(
    inputCols=["trip_distance", "passenger_count", "fare_amount", "payment_type_index"],
    outputCol="features"
)
df = assembler.transform(df)

In [11]:
# Diviser les données en ensembles d'entraînement et de test
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

In [12]:
# Modèle de régression linéaire pour prédire le montant des pourboires
lr = LinearRegression(featuresCol="features", labelCol="tip_amount")
model = lr.fit(train_data)

In [13]:
# Évaluation sur l'ensemble de test
predictions = model.transform(test_data)

In [14]:
# Évaluation des performances du modèle
evaluator = RegressionEvaluator(
    labelCol="tip_amount", 
    predictionCol="prediction", 
    metricName="rmse"
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

Root Mean Squared Error (RMSE): 1.781677257048296


In [15]:
# Résultats du modèle
predictions.select("trip_distance", "fare_amount", "tip_amount", "prediction").show(10)

+-------------+-----------+----------+-------------------+
|trip_distance|fare_amount|tip_amount|         prediction|
+-------------+-----------+----------+-------------------+
|          0.0|        0.0|       0.0|-3.3664751490020435|
|          0.0|        1.0|       0.0|-1.0226114205335999|
|          0.0|        2.5|       0.0|-3.0870190477283823|
|          0.0|        8.8|       0.0| 2.0813729033992026|
|          0.0|       19.5|       0.0|  3.277445016850472|
|          0.0|      -68.0|       0.0|-10.979599898782944|
|          0.0|      -52.0|       0.0| -9.191080850631511|
|          0.0|      -10.8|       0.0| -6.817725589600556|
|          0.0|       -3.0|       0.0| -3.713741265667753|
|          0.0|       -2.5|       0.0|-1.4257687574540419|
+-------------+-----------+----------+-------------------+
only showing top 10 rows



In [None]:
# Arrêter la session Spark
spark.stop()