In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth, when
from pyspark.sql import functions as F

# Créez la session Spark
spark = SparkSession.builder \
    .appName("PublicTransport") \
    .getOrCreate()

storageaccount = "fidelistorage"
container = "public-transport"
key = "Mea8hwXHm0gWaa3zhrgriV5EjqqECFidwHiUdaN3cqf0wfdP2/SmSEUaxCrNJuApXFIqQno2zJjG+AStx50KwQ=="


spark.conf.set(
    f"fs.azure.account.key.{storageaccount}.dfs.core.windows.net", 
    key
)

# Spécifiez le chemin du répertoire raw dans Azure Data Lake Storage Gen2
raw_data_path = "abfss://public-transport@fidelistorage.dfs.core.windows.net/raw/"
processed_path = "abfss://public-transport@fidelistorage.dfs.core.windows.net/processed/"

# Utilisez dbutils.fs.ls pour lister les fichiers CSV bruts
raw_files = dbutils.fs.ls(raw_data_path)


In [None]:
import pandas as pd

# Fonction pour effectuer les transformations sur un fichier
def process_file(csv_file,count):
    # Chargez le fichier CSV brut dans un DataFrame
    raw_data = spark.read.format('csv').option('header', True).option('delimiter', ',').load(csv_file)

    # Transformations de Date: Extraire l'année, le mois, le jour et le jour de la semaine de la date
    transformed_data = raw_data.withColumn("Annee", year("Date")) \
                               .withColumn("Mois", month("Date")) \
                               .withColumn("Jour", dayofmonth("Date"))

    # Catégorisation des retards
    transformed_data = transformed_data.withColumn("CatégorieRetard",
        when(transformed_data["Delay"] <= 0, "Pas de Retard")
        .when((transformed_data["Delay"] >= 1) & (transformed_data["Delay"] <= 10), "Retard Court")
        .when((transformed_data["Delay"] >= 11) & (transformed_data["Delay"] <= 20), "Retard Moyen")
        .when(transformed_data["Delay"] > 20, "Long Retard")
        .otherwise("Autre"))

    # Détermination des heures de pointe
    seuil_passagers_pointe = 75
    transformed_data = transformed_data.withColumn("HeureDePointe",
        when(transformed_data["Passengers"] > seuil_passagers_pointe, "Oui")
        .otherwise("Non"))
    
    # Analyse des itinéraires
    analytical_data = transformed_data.groupBy("Route").agg(
        F.mean("Delay").alias("RetardMoyen"),
        F.mean("Passengers").alias("NombreMoyenPassagers"),
        F.count("*").alias("NombreTotalVoyages"))

    csv_file_path = processed_path + "data/transformed_data"+str(count)
    transformed_data.write.mode("overwrite").option("header", "true").csv(csv_file_path)

    return analytical_data


In [None]:
import os
import time

raw_csv_files = [f.path for f in raw_files if f.name.endswith(".csv")]

# Définissez le nombre de fichiers à traiter à chaque itération
batch_size = 2
count = 1
while raw_csv_files:
    # Prenez les prochains fichiers à traiter dans la liste
    batch_files = raw_csv_files[:batch_size]
    
    # Mettez à jour la liste des fichiers restants
    raw_csv_files = raw_csv_files[batch_size:]
    
    # Boucle sur les fichiers pour effectuer les transformations sur chaque fichier
    for csv_file in batch_files:
        
        # Chargez le fichier CSV brut dans un DataFrame
        raw_data = spark.read.format('csv').option('header', True).option('delimiter', ',').load(csv_file)
        
        analytical_data = process_file(csv_file,count)
        count = count + 1
        print(analytical_data)
        # Extrayez le nom du fichier (data2.csv) à partir du chemin complet
        filename = os.path.basename(csv_file)
        # print(filename)
        # Enregistrez le DataFrame transformé dans un répertoire de données traitées avec le nom de fichier
        analytical_data.write.mode("overwrite").option("header", "true").csv(processed_path + "analyse/" + filename)


    # Attendez 15 secondes avant de traiter le prochain lot de fichiers
    print("Attendez 15 secondes avant de traiter le prochain lot de fichiers !")
    time.sleep(15)

# Fermez la session Spark
# spark.stop()
