In [0]:
# Importer les modules PySpark nécessaires :
# - functions : pour les opérations sur les colonnes
# - types : pour la définition des schémas
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [0]:
# Lister tous les conteneurs et fichiers disponibles dans le point de montage
# Utile pour vérifier l'accès aux données et la structure du stockage
dbutils.fs.ls("/mnt/")

[FileInfo(path='dbfs:/mnt/container/', name='container/', size=0, modificationTime=0)]

In [0]:
# Lister le contenu du conteneur
# Affiche tous les fichiers et dossiers dans le conteneur spécifié
dbutils.fs.ls("/mnt/container")

[FileInfo(path='dbfs:/mnt/container/API/', name='API/', size=0, modificationTime=1736443008000),
 FileInfo(path='dbfs:/mnt/container/Azure_SQL/', name='Azure_SQL/', size=0, modificationTime=1736113228000),
 FileInfo(path='dbfs:/mnt/container/DataBricks/', name='DataBricks/', size=0, modificationTime=1736458654000),
 FileInfo(path='dbfs:/mnt/container/api-dataset/', name='api-dataset/', size=0, modificationTime=1737398786000),
 FileInfo(path='dbfs:/mnt/container/azcopyinputdata/', name='azcopyinputdata/', size=0, modificationTime=1736091229000),
 FileInfo(path='dbfs:/mnt/container/df-tuto-instance/', name='df-tuto-instance/', size=0, modificationTime=1736806021000),
 FileInfo(path='dbfs:/mnt/container/df-tuto-instance.csv/', name='df-tuto-instance.csv/', size=0, modificationTime=1736805905000),
 FileInfo(path='dbfs:/mnt/container/fichier txt/', name='fichier txt/', size=0, modificationTime=1736090394000),
 FileInfo(path='dbfs:/mnt/container/france_traffic/', name='france_traffic/', size

In [0]:
# Lire le fichier CSV
# Note : Ajuster le chemin selon votre environnement
data = spark.read.csv('/mnt/container/france_traffic/Travel_titles_validations_in_Paris_and_suburbs.csv',
                      header=True, inferSchema=True)

# Afficher les premières lignes
# Il inclut le nombre de personnes qui ont pris le transport par jour, par station (code postal) et par catégorie de titre
data.show()

+----------+--------------------+-----------+--------------+-----------+
|      DATE|        STATION_NAME|ID_REFA_LDA|TITLE_CATEGORY|   NB_VALID|
+----------+--------------------+-----------+--------------+-----------+
|2019-07-21|    LA TOUR MAUBOURG|      71242|        NAVIGO|       1141|
|2019-07-21|          PARMENTIER|      71801|   NOT DEFINED|Less than 5|
|2019-07-21|          PARMENTIER|      71801|           TST|         97|
|2019-07-21|   PEREIRE-LEVALLOIS|      71453|           FGT|         53|
|2019-07-21|             PERNETY|     412687|         OTHER|         36|
|2019-07-21|              PICPUS|      71639|           FGT|         25|
|2019-07-21|        PIERRE CURIE|      70537|     IMAGINE R|        264|
|2019-07-21|             PIGALLE|      71409|  DAILY NAVIGO|          7|
|2019-07-21|     PLACE DES FETES|      71885|             ?|         20|
|2019-07-21|     PLACE DES FETES|      71885|     AMETHYSTE|        297|
|2019-07-21|      PLACE D'ITALIE|      71033|      

In [0]:
# Nombre de lignes et de colonnes
print(f"Nombre de lignes : {data.count()}")
print(f"Nombre de colonnes : {len(data.columns)}")

Number of rows: 883958
Number of columns: 5


In [0]:
# Afficher la structure des données
data.printSchema()

root
 |-- DATE: date (nullable = true)
 |-- STATION_NAME: string (nullable = true)
 |-- ID_REFA_LDA: integer (nullable = true)
 |-- TITLE_CATEGORY: string (nullable = true)
 |-- NB_VALID: string (nullable = true)



In [0]:
# Compter le nombre de valeurs nulles dans chaque colonne du DataFrame
for col in data.columns:
    null_count = data.filter(F.col(col).isNull()).count()
    print(f"Valeurs nulles dans {col}: {null_count}")

Null values in DATE: 0
Null values in STATION_NAME: 0
Null values in ID_REFA_LDA: 1499
Null values in TITLE_CATEGORY: 0
Null values in NB_VALID: 0


In [0]:
# Supprimer les valeurs nulles
data = data.dropna()

# Supprimer les doublons
data = data.dropDuplicates()

In [0]:
# Supprimer les lignes où ID_REFA_LDA est égal à -1
data = data.filter(F.col("ID_REFA_LDA") != -1)

# Remplacer 'Less than 5' par 5 dans la colonne NB_VALID
data = data.withColumn("NB_VALID",
    F.when(F.col("NB_VALID") == "Less than 5", 5)
    .otherwise(F.col("NB_VALID")))

In [0]:
# Convertir NB_VALID en format numérique
data = data.withColumn("NB_VALID", F.col("NB_VALID").cast("double"))

In [0]:
# Convertir la colonne DATE et extraire le mois et le jour
data = data.withColumn("DATE", F.to_timestamp("DATE"))
data = data.withColumn("month", F.month("DATE"))
data = data.withColumn("Day", F.dayofmonth("DATE"))

In [0]:
# Afficher la structure des données
data.printSchema()

root
 |-- DATE: timestamp (nullable = true)
 |-- STATION_NAME: string (nullable = true)
 |-- ID_REFA_LDA: integer (nullable = true)
 |-- TITLE_CATEGORY: string (nullable = true)
 |-- NB_VALID: double (nullable = true)
 |-- month: integer (nullable = true)
 |-- Day: integer (nullable = true)



In [0]:
# Grouper par mois et calculer les sommes
monthly_stats = data.groupBy("month") \
    .agg(F.sum("NB_VALID").alias("total_valid"),
         F.sum("ID_REFA_LDA").alias("total_id"))
monthly_stats.show()

+-----+------------+-----------+
|month| total_valid|   total_id|
+-----+------------+-----------+
|   12| 4.5083173E7| 6186871989|
|    9|1.25184301E8|10254267287|
|    8| 9.0006039E7|10187666606|
|    7|1.19062443E8|10330256665|
|   10|1.40334259E8|10935676848|
|   11|1.33983102E8|10888002954|
+-----+------------+-----------+



In [0]:
# Créer des catégories NB en utilisant les quartiles
quartiles = data.select(
    F.min("NB_VALID").alias("min"),
    F.percentile_approx("NB_VALID", 0.25).alias("q1"),
    F.percentile_approx("NB_VALID", 0.5).alias("q2"),
    F.percentile_approx("NB_VALID", 0.75).alias("q3"),
    F.max("NB_VALID").alias("max")
).collect()[0]

# Créer les catégories en utilisant when-otherwise
data = data.withColumn("NB_Category",
    F.when(F.col("NB_VALID") <= quartiles.q1, 1)
    .when(F.col("NB_VALID") <= quartiles.q2, 2)
    .when(F.col("NB_VALID") <= quartiles.q3, 3)
    .otherwise(4)
)

In [0]:
data.printSchema()

root
 |-- DATE: timestamp (nullable = true)
 |-- STATION_NAME: string (nullable = true)
 |-- ID_REFA_LDA: integer (nullable = true)
 |-- TITLE_CATEGORY: string (nullable = true)
 |-- NB_VALID: double (nullable = true)
 |-- month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- NB_Category: integer (nullable = false)



In [0]:
def path_exists(path):
    try:
        dbutils.fs.ls(path)
        return True
    except Exception as e:
        return False

In [0]:
base_file_path = "/mnt/container/france_traffic/processed_data_new"
if not path_exists(base_file_path):
    print("PATH Not found")
    dbutils.fs.mkdirs(base_file_path)

PATH Not found


In [0]:
import os
data.write.csv(os.path.join(base_file_path, "OutPut_Travel_titles_validations.csv"), header=True)