In [0]:

%pyspark
# Importation des bibliothèques
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth, hour, to_timestamp, col, when, median, expr, sum as Fsum
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.ml.feature import (
    StringIndexer,
    OneHotEncoder,
    VectorAssembler,
    StandardScaler,
    MinMaxScaler
)

from pyspark.ml import Pipeline

In [1]:
%pyspark
# Create the SparkSession for this notebook (getOrCreate reuses an
# already-running session instead of starting a second one).
spark = (
    SparkSession.builder
    .appName("TrafficVolumeEDA")
    .getOrCreate()
)

In [2]:


%pyspark
# Load the raw dataset from CSV: the first row holds the column names and
# Spark infers each column's type from the data.
traffic_df = spark.read.csv(
    "/data/raw/Metro_Interstate_Traffic_Volume.csv",
    header=True,
    inferSchema=True,
)

# Quick sanity check of the inferred schema and the first rows.
traffic_df.printSchema()
traffic_df.show(5)


In [3]:

%pyspark
# Persist the data to HDFS in Parquet format, replacing any previous copy.
traffic_df.write.parquet(
    "hdfs://namenode:9000/traffic_volume_parquet",
    mode="overwrite",
)

# Read the Parquet data back and preview it to confirm the write.
df_hdfs = spark.read.parquet("hdfs://namenode:9000/traffic_volume_parquet")
df_hdfs.show(5)

In [4]:

%pyspark
# From here on, work from the Parquet copy stored in HDFS.
traffic_df = (
    spark.read
    .format("parquet")
    .load("hdfs://namenode:9000/traffic_volume_parquet")
)

# Display the schema.
traffic_df.printSchema()

# Display a few rows.
traffic_df.show(5)

# Number of rows.
print(f"Nombre de lignes : {traffic_df.count()}")

In [5]:
%pyspark
# Basic summary statistics computed by DataFrame.describe().
summary_stats_df = traffic_df.describe()
summary_stats_df.show()

In [6]:
%pyspark
# Check for nulls: for every column, sum a 0/1 "is null" flag so the single
# output row holds the null count of each column under its own name.
null_count_exprs = [
    Fsum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in traffic_df.columns
]
traffic_df.select(null_count_exprs).show()

In [7]:
%pyspark
# Convert to pandas for the EDA plots.
# Only the numeric columns listed here are collected to the driver.
num_cols = ['clouds_all','rain_1h','snow_1h','temp','traffic_volume']
eda_df = traffic_df.select(*num_cols).toPandas()

In [8]:
%pyspark
%matplotlib inline
# Correlation matrix of the numeric columns, drawn as an annotated heatmap.
corr = eda_df.corr()

fig, ax = plt.subplots(figsize=(10,4))
sns.heatmap(corr, annot=True, linewidths=0.5, cmap='twilight', ax=ax)
plt.show()

In [9]:
%pyspark
%matplotlib inline
# Histograms: one panel per column of the pandas frame.
hist_axes = eda_df.hist(figsize=(12,8))
plt.show()

In [10]:
%pyspark
%matplotlib inline
# Outlier analysis: one boxplot per numeric column, laid out on a 3x2 grid.
boxplot_fig = plt.figure(figsize=(14,15))

for position, column_name in enumerate(num_cols, start=1):
    panel = boxplot_fig.add_subplot(3, 2, position)
    panel.boxplot(eda_df[column_name])
    panel.set_title(column_name)

plt.show()

In [11]:
%pyspark
# Identify the maximum hourly rainfall and show the row(s) that carry it.
# The aggregation yields a single row whose only field is the maximum.
rain_peak = traffic_df.agg({"rain_1h": "max"}).first()[0]
traffic_df.where(col('rain_1h') == rain_peak).show()

In [12]:
%pyspark
# Filter the rows recorded under "very heavy rain" conditions and preview them.
# DataFrame.show() only prints and returns None, so the filtered DataFrame is
# bound to the variable first; chaining .show() onto the assignment would
# leave very_heavy_rain_df holding None instead of the data.
very_heavy_rain_df = traffic_df.filter(col('weather_description') == 'very heavy rain')
very_heavy_rain_df.show()

In [13]:
%pyspark
# Find the minimum temperature and show the row(s) that carry it.
# The aggregation yields a single row whose only field is the minimum.
temp_floor = traffic_df.agg({"temp": "min"}).first()[0]
traffic_df.where(col('temp') == temp_floor).show()

In [14]:
%pyspark
# Conditional median of rain_1h, restricted to "very heavy rain" rows whose
# traffic_volume exceeds 4000. approxQuantile is asked for the 0.5 quantile
# with a relative error of 0.01, so the value is an approximation.
heavy_rain_high_traffic = (
    (col('weather_description')=='very heavy rain')
    & (col('traffic_volume') > 4000)
)
median_val = (
    traffic_df
    .filter(heavy_rain_high_traffic)
    .approxQuantile("rain_1h", [0.5], 0.01)[0]
)

In [15]:
%pyspark
# Replace the maximum rain_1h value with the conditional median (median_val).
# NOTE(review): traffic_df is overwritten here and the maximum is recomputed
# on every execution, so running this cell a second time would also replace
# the next-largest value. Run it once per fresh load of the data.
max_rain = traffic_df.agg({"rain_1h": "max"}).first()[0]
is_max_rain = col("rain_1h") == max_rain
traffic_df = traffic_df.withColumn(
    "rain_1h",
    when(is_max_rain, median_val).otherwise(col("rain_1h")),
)

In [16]:
%pyspark
# Convert date_time to a timestamp using the "yyyy-MM-dd HH:mm:ss" pattern.
traffic_df = traffic_df.withColumn(
    "date_time",
    to_timestamp("date_time", "yyyy-MM-dd HH:mm:ss"),
)

In [17]:
%pyspark
# Extract Year, Month, Day and Hour from date_time as separate columns.
# The dict preserves insertion order, so the columns are appended in the
# order Year, Month, Day, Hour.
date_part_extractors = {
    "Year": year,
    "Month": month,
    "Day": dayofmonth,
    "Hour": hour,
}
for part_name, extractor in date_part_extractors.items():
    traffic_df = traffic_df.withColumn(part_name, extractor(col("date_time")))

In [18]:
%pyspark
# Temperature clean-up: readings of exactly 0.0 on 2014-01-31 are replaced
# with 255.93; every other row keeps its original temp.
# NOTE(review): 255.93 is presumably a Kelvin value chosen from neighbouring
# readings — confirm the source of this constant.
is_zero_temp_reading = (
    (col("Year") == 2014)
    & (col("Month") == 1)
    & (col("Day") == 31)
    & (col("temp") == 0.0)
)
traffic_df = traffic_df.withColumn(
    "temp",
    when(is_zero_temp_reading, 255.93).otherwise(col("temp")),
)

In [19]:
%pyspark
# Keep only the columns used downstream (date_time is represented by the
# Year/Month/Day/Hour columns; clouds_all is not retained).
kept_columns = [
    'holiday', 'temp', 'rain_1h', 'snow_1h',
    'Year', 'Month', 'Day', 'Hour',
    'weather_main', 'weather_description', 'traffic_volume',
]
traffic_df = traffic_df.select(*kept_columns)

In [20]:
%pyspark
# Save the cleaned data to HDFS as Parquet, replacing any previous copy.
traffic_df.write.parquet(
    "hdfs://namenode:9000/traffic_volume_cleaned",
    mode="overwrite",
)

In [21]:
%pyspark
# Read the cleaned data back from HDFS.
traffic_df = (
    spark.read
    .format("parquet")
    .load("hdfs://namenode:9000/traffic_volume_cleaned")
)

In [22]:
%pyspark

# Step 1: build one StringIndexer per categorical column. Each writes to a
# temporary "<column>_tmp" output; handleInvalid="keep" assigns unseen or
# invalid labels an extra index instead of raising an error.
categorical_columns = ["holiday", "weather_main", "weather_description"]
indexers = [
    StringIndexer(
        inputCol=column_name,
        outputCol=f"{column_name}_tmp",
        handleInvalid="keep",
    )
    for column_name in categorical_columns
]

# Step 2: chain the indexers in a single pipeline.
pipeline = Pipeline(stages=indexers)

# Step 3: fit the pipeline on the data and apply it.
fitted_pipeline = pipeline.fit(traffic_df)
traffic_df = fitted_pipeline.transform(traffic_df)

# Step 4: drop the original string columns, then rename the encoded columns
# so they take over the original names.
traffic_df = traffic_df.drop(*categorical_columns)
for column_name in categorical_columns:
    traffic_df = traffic_df.withColumnRenamed(f"{column_name}_tmp", column_name)

# Step 5: check the result.
traffic_df.select(*categorical_columns).show(10, truncate=False)


# Step 6: write the encoded data to Parquet, replacing any previous copy.
traffic_df.write.parquet(
    "hdfs://namenode:9000/traffic_volume_cleaned_encoded.parquet",
    mode="overwrite",
)

# Step 7: read it back later (optional).
encoded_df = spark.read.parquet(
    "hdfs://namenode:9000/traffic_volume_cleaned_encoded.parquet"
)
encoded_df.show(5)