In [0]:
def transform_dataframe(df):
    """Return ``df`` enriched with calendar, duration and passenger-traffic columns.

    Added columns:

    * ``Year``, ``Month``, ``Day``, ``DayOfWeek`` -- parts of ``Date``.
    * ``Duration`` -- ``ArrivalTime`` minus ``DepartureTime`` in minutes, both
      parsed with the ``'HH:mm'`` pattern.
    * ``PassengersTraffic`` -- ``'Non'`` when ``Passengers`` is at or below the
      average of ``Passengers`` over ``df``, ``'Oui'`` otherwise (including
      rows where the comparison evaluates to null).

    Args:
        df: Spark DataFrame with ``Date``, ``DepartureTime``, ``ArrivalTime``
            and ``Passengers`` columns.

    Returns:
        A new DataFrame with the columns above appended. Computing the average
        runs a Spark job over ``df``.
    """
    # Import locally so the function does not rely on a later notebook cell
    # having already imported these names.
    from pyspark.sql.functions import avg, col, dayofmonth, dayofweek, expr, month, when, year

    df = (
        df.withColumn("Year", year(col("Date")))
        .withColumn("Month", month(col("Date")))
        .withColumn("Day", dayofmonth(col("Date")))
        .withColumn("DayOfWeek", dayofweek(col("Date")))
        # Minutes between the two times of day. NOTE(review): only the time of
        # day is parsed, so a trip that arrives after midnight gets a negative
        # duration -- confirm whether such rows occur in the data.
        .withColumn(
            "Duration",
            expr("(unix_timestamp(ArrivalTime, 'HH:mm') - unix_timestamp(DepartureTime, 'HH:mm')) / 60"),
        )
    )

    avg_passengers = df.select(avg("Passengers")).first()[0]
    # Compare with a Column expression instead of formatting the Python value
    # into SQL text: when the average is None (no non-null Passengers values)
    # the formatted string became "Passengers <= None", which Spark could not
    # resolve. A null comparison now falls through to 'Oui', like the ELSE
    # branch of the original CASE expression.
    return df.withColumn(
        "PassengersTraffic",
        when(col("Passengers") <= avg_passengers, "Non").otherwise("Oui"),
    )

def calcul_avg(df):
    """Aggregate per-route statistics.

    Groups ``df`` by ``Route`` and produces, for each route:

    * ``AvgPassengers`` -- mean of ``Passengers``;
    * ``AvgDelay`` -- mean of ``Delay``;
    * ``RouteCount`` -- number of non-null ``Route`` values in the group.

    Returns:
        A new Spark DataFrame with one row per ``Route`` value.
    """
    aggregates = [
        avg("Passengers").alias("AvgPassengers"),
        avg("Delay").alias("AvgDelay"),
        count("Route").alias("RouteCount"),
    ]
    return df.groupBy("Route").agg(*aggregates)


In [0]:
def _ls_or_empty(path):
    """Return ``dbutils.fs.ls(path)``, or ``[]`` when ``path`` does not exist."""
    try:
        return dbutils.fs.ls(path)
    except Exception as err:
        # dbutils does not raise a dedicated Python exception type for a
        # missing path; the error message carries the JVM's
        # java.io.FileNotFoundException. Any other failure is re-raised.
        if "java.io.FileNotFoundException" in str(err):
            return []
        raise


def get_file_path(storage_account_name, storage_account_access_key, container_name):
    """Register the storage account key and list the pipeline's data folders.

    Sets the account key for ``storage_account_name`` in the Spark
    configuration, then lists the ``raw/``, ``processed/`` and ``archived/``
    folders under ``public_transport_data`` in ``container_name``.

    Args:
        storage_account_name: ADLS Gen2 storage account name.
        storage_account_access_key: access key for that account.
        container_name: container holding ``public_transport_data``.

    Returns:
        ``[raw_files, processed_files, archived_files]``, each the list of
        entries returned by ``dbutils.fs.ls``. ``processed_files`` and
        ``archived_files`` are empty lists when their folder does not exist
        yet (e.g. before any output has been written).

    Raises:
        Whatever ``dbutils.fs.ls`` raises for ``raw/`` (a missing input folder
        is treated as an error), or for the other folders on any failure other
        than "not found".
    """
    spark.conf.set(
        f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
        storage_account_access_key,
    )

    base = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/public_transport_data"

    raw_files = dbutils.fs.ls(f"{base}/raw/")
    processed_files = _ls_or_empty(f"{base}/processed/")
    archived_files = _ls_or_empty(f"{base}/archived/")

    return [raw_files, processed_files, archived_files]

In [0]:
from pyspark.sql.functions import year,month,dayofmonth,dayofweek,to_timestamp,from_unixtime,unix_timestamp,expr,avg,count,when,col

import os

storage_account_name = "tarifihicham1cs"
# SECURITY: storage account keys must never be committed to source control.
# The key that was previously hard-coded here is exposed and must be rotated;
# the replacement is read from the AZURE_STORAGE_ACCOUNT_KEY environment
# variable (on Databricks, a cluster environment variable can reference a
# secret-scope entry).
storage_account_access_key = os.environ["AZURE_STORAGE_ACCOUNT_KEY"]
container_name = "tarifihichamcontainer"

# Process at most this many new raw entries per run.
max_batches_per_run = 2

base_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/public_transport_data"

files_paths = get_file_path(storage_account_name, storage_account_access_key, container_name)
# A raw entry is skipped when processed/ already holds an entry with the same
# name (outputs below are written under processed/{raw.name}...).
files_processed = {processed.name for processed in files_paths[1]}

counter = 0
for raw in files_paths[0]:
    if counter >= max_batches_per_run:
        break
    if raw.name in files_processed:
        continue

    # Load every CSV of this raw entry as one DataFrame. Processing them one at
    # a time wrote each file to the same output paths with mode("overwrite"),
    # so only the last CSV of a multi-file entry survived.
    csv_paths = [
        entry.path for entry in dbutils.fs.ls(raw.path) if entry.name.endswith(".csv")
    ]
    if not csv_paths:
        continue

    df = spark.read.format("csv").option("header", "true").load(csv_paths)

    df = transform_dataframe(df)
    dfm = calcul_avg(df)

    # Reduce to a single partition so each output folder gets one part file.
    df = df.coalesce(1)
    dfm = dfm.coalesce(1)

    # Export the transformed rows.
    (
        df.write.format("csv")
        .option("header", "true")
        .mode("overwrite")
        .save(f"{base_path}/processed/{raw.name}data")
    )
    # Export the per-route aggregates.
    (
        dfm.write.format("csv")
        .option("header", "true")
        .mode("overwrite")
        .save(f"{base_path}/processed/{raw.name}analyse")
    )
    counter += 1