In [None]:
# Databricks notebook source
import pandas as pd
from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType,IntegerType
from pyspark.sql.functions import to_date, year, month, day, hour, minute, when, avg, regexp_replace, mean, count, round
from pyspark.sql import SparkSession


# Mounting data lake
# Credentials are NOT kept in the notebook. An earlier version hard-coded the storage
# account key and a SAS token; treat those values as leaked and rotate them in Azure.
# The SAS is read from a Databricks secret scope instead.
# NOTE(review): the scope/key names below are placeholders — create them
# (`databricks secrets create-scope` / `put-secret`) or point them at existing ones.
storageAccountName = "azemourstorage"
blobContainerName = "public-transport-data"
mountPoint = "/mnt/public-transport-data/"
secretScope = "azemour-storage"
sasTokenSecretKey = "public-transport-data-sas"

if not any(mount.mountPoint == mountPoint for mount in dbutils.fs.mounts()):
  try:
    # Fetched only when a mount is actually needed, so re-running an already-mounted
    # workspace does not require access to the secret scope.
    sasToken = dbutils.secrets.get(scope=secretScope, key=sasTokenSecretKey)
    dbutils.fs.mount(
      source = "wasbs://{}@{}.blob.core.windows.net".format(blobContainerName, storageAccountName),
      mount_point = mountPoint,
      extra_configs = {'fs.azure.sas.' + blobContainerName + '.' + storageAccountName + '.blob.core.windows.net': sasToken}
    )
    print("mount succeeded!")
  except Exception as e:
    # Best-effort: report and continue (e.g. missing secret or storage error).
    print("mount exception", e)

# Define Azure Blob Storage paths
# NOTE(review): later revisions of this notebook keep their state under ".../processed_fils/",
# not ".../processed_files/" — confirm which location is canonical.
raw_folder_path = "/mnt/public-transport-data/raw/"
processed_folder_path = "/mnt/public-transport-data/processed/"
csv_file_path = "/mnt/public-transport-data/processed_files/processing_state.csv"

# Function to extract the month number from the filename
def extract_month(filename):
    """Return the first all-digit ``_``-separated token of ``filename`` as an int.

    Used as the sort key that puts raw files in month order, e.g.
    ``"public_transport_data_month_9.csv"`` -> ``9``.

    The extension is stripped before splitting. Without this, the last token is
    ``"9.csv"``, which is not all digits. Every file then got key 0, and the sort
    silently fell back to lexicographic listing order (month_1, month_10, month_11,
    month_2, ...).

    Args:
        filename: Bare file name (no directory part).

    Returns:
        The number found, or 0 if no token is purely numeric.
    """
    stem = filename.rsplit(".", 1)[0]
    for part in stem.split("_"):
        # isdecimal (not isdigit) so that characters like '²' cannot reach int() and raise.
        if part.isdecimal():
            return int(part)
    return 0  # Return 0 if month number is not found

# Raw file names imported by this run of the cell.
imported_datasets = []

# File names already recorded in the processing-state CSV. If the state cannot be read
# (e.g. it does not exist yet), the error is printed and the set stays empty.
existing_datasets = set()
try:
    existing_df = spark.read.option("header", True).csv(csv_file_path)
    existing_datasets.update(row[0] for row in existing_df.select("name_file").collect())
except Exception as e:
    print("Error reading existing datasets:", str(e))

# Raw files not yet recorded, ordered by the number embedded in their name.
files_in_raw_folder = dbutils.fs.ls(raw_folder_path)
data_files = sorted(
    (info.name for info in files_in_raw_folder
     if info.isFile() and info.name not in existing_datasets),
    key=extract_month,
)

# Function to import a new dataset
def import_new_dataset():
    """Import the next pending raw file, if there is one.

    Pops the head of ``data_files``. If the name is not in ``existing_datasets``, it:
    appends the name to ``imported_datasets``, records it with today's date via
    ``save_to_csv``, and copies the file with ``copy_to_processed``.
    """
    if not data_files:
        print("No more datasets to import.")
        return

    next_name = data_files.pop(0)

    if next_name in existing_datasets:
        print(f"Dataset {next_name} already imported. Skipping...")
        return

    imported_datasets.append(next_name)
    today = datetime.now().strftime('%Y-%m-%d')
    save_to_csv(next_name, today)
    copy_to_processed(next_name)

    # TODO: Import the dataset (use next_name)
    # Replace the following print statement with your dataset import logic
    print(f"Imported dataset: {next_name}")

# Function to save dataset name and date to the CSV file
def save_to_csv(dataset_name, import_date):
    """Record one imported dataset in the processing-state CSV at ``csv_file_path``.

    The record is appended as a new part file. The previous approach was: read the
    whole state, union the new row, and write it back with ``mode("overwrite")`` to
    the same path. That approach had two problems:

    * Spark evaluates lazily, so the overwrite deletes the files the union still
      has to read. This can fail or lose the existing rows.
    * On the first run the state path does not exist, so the read raised and no
      record was ever written.

    Readers using ``option("header", True)`` skip the header line of every part file.

    Args:
        dataset_name: Raw file name, stored in the ``name_file`` column.
        import_date: Import date string (callers pass ``'%Y-%m-%d'``), stored in ``date``.
    """
    try:
        new_record_df = spark.createDataFrame([(dataset_name, import_date)], ["name_file", "date"])
        new_record_df.write.option("header", True).mode("append").csv(csv_file_path)
    except Exception as e:
        print("Error saving to CSV:", str(e))

# Function to copy the dataset to the 'processed' folder
def copy_to_processed(dataset_name):
    """Copy ``dataset_name`` from the raw folder into the processed folder.

    Errors are printed rather than raised, so the caller always continues.
    """
    src = f"{raw_folder_path}{dataset_name}"
    dst = f"{processed_folder_path}{dataset_name}"
    try:
        dbutils.fs.cp(src, dst)
        print(f"Dataset {dataset_name} copied to 'processed' folder.")
    except Exception as err:
        print(f"Failed to copy dataset {dataset_name} to 'processed' folder. Error: {str(err)}")

# Process up to two pending datasets, in the order computed above.
for import_round in range(2):
    import_new_dataset()


Dataset public_transport_data_month_9.csv copied to 'processed' folder.
Imported dataset: public_transport_data_month_9.csv
No more datasets to import.


In [None]:
# Databricks notebook source
import pandas as pd
from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql.functions import to_date, year, month, day, hour, minute, when, avg, regexp_replace, mean, count, round
from pyspark.sql import SparkSession

# Define Azure Blob Storage paths
# NOTE(review): "processed_fils" differs from the "processed_files" folder used by the first
# revision of this notebook, so the two revisions track state separately — confirm intent.
raw_folder_path = "/mnt/public-transport-data/raw/"
processed_folder_path = "/mnt/public-transport-data/processed/"
csv_file_path = "/mnt/public-transport-data/processed_fils/processing_state.csv"

# Schema of the processing-state CSV: one nullable string column per field.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("name_file", "date")
])

# Function to extract the month number from the filename
def extract_month(filename):
    """Return the first all-digit ``_``-separated token of ``filename`` as an int.

    Used as the sort key that puts raw files in month order, e.g.
    ``"public_transport_data_month_9.csv"`` -> ``9``.

    The extension is stripped before splitting. Without this, the last token is
    ``"9.csv"``, which is not all digits. Every file then got key 0, and the sort
    silently fell back to lexicographic listing order (month_1, month_10, month_11,
    month_2, ...).

    Args:
        filename: Bare file name (no directory part).

    Returns:
        The number found, or 0 if no token is purely numeric.
    """
    stem = filename.rsplit(".", 1)[0]
    for part in stem.split("_"):
        # isdecimal (not isdigit) so that characters like '²' cannot reach int() and raise.
        if part.isdecimal():
            return int(part)
    return 0  # Return 0 if month number is not found

# Raw file names imported by this run of the cell.
imported_datasets = []

# File names already recorded in the processing-state CSV. If the state cannot be read
# (e.g. it does not exist yet), the error is printed and the set stays empty.
existing_datasets = set()
try:
    existing_df = spark.read.option("header", True).schema(schema).csv(csv_file_path)
    existing_datasets.update(row[0] for row in existing_df.select("name_file").collect())
except Exception as e:
    print("Error reading existing datasets:", str(e))

# Raw files not yet recorded, ordered by the number embedded in their name.
files_in_raw_folder = dbutils.fs.ls(raw_folder_path)
data_files = sorted(
    (info.name for info in files_in_raw_folder
     if info.isFile() and info.name not in existing_datasets),
    key=extract_month,
)

# Function to import a new dataset
def import_new_dataset():
    """Import the next pending raw file, if there is one.

    Pops the head of ``data_files``. If the name is not in ``existing_datasets``, it:
    appends the name to ``imported_datasets``, records it with today's date via
    ``save_to_csv``, and copies the file with ``copy_to_processed``.
    """
    if not data_files:
        print("No more datasets to import.")
        return

    next_name = data_files.pop(0)

    if next_name in existing_datasets:
        print(f"Dataset {next_name} already imported. Skipping...")
        return

    imported_datasets.append(next_name)
    today = datetime.now().strftime('%Y-%m-%d')
    save_to_csv(next_name, today)
    copy_to_processed(next_name)

    # TODO: Import the dataset (use next_name)
    # Replace the following print statement with your dataset import logic
    print(f"Imported dataset: {next_name}")

# Function to save dataset name and date to the CSV file
def save_to_csv(dataset_name, import_date):
    """Record one imported dataset in the processing-state CSV at ``csv_file_path``.

    The record is appended as a new part file. The previous approach was: read the
    whole state, union the new row, and write it back with ``mode("overwrite")`` to
    the same path. That approach had two problems:

    * Spark evaluates lazily, so the overwrite deletes the files the union still
      has to read. This can fail or lose the existing rows.
    * On the first run the state path does not exist, so the read raised and no
      record was ever written.

    Readers using ``option("header", True)`` skip the header line of every part file.

    Args:
        dataset_name: Raw file name, stored in the ``name_file`` column.
        import_date: Import date string (callers pass ``'%Y-%m-%d'``), stored in ``date``.
    """
    try:
        new_record_df = spark.createDataFrame([(dataset_name, import_date)], schema)
        new_record_df.write.option("header", True).mode("append").csv(csv_file_path)
    except Exception as e:
        print("Error saving to CSV:", str(e))

# Function to copy the dataset to the 'processed' folder
def copy_to_processed(dataset_name):
    """Copy ``dataset_name`` from the raw folder into the processed folder.

    Errors are printed rather than raised, so the caller always continues.
    """
    src = f"{raw_folder_path}{dataset_name}"
    dst = f"{processed_folder_path}{dataset_name}"
    try:
        dbutils.fs.cp(src, dst)
        print(f"Dataset {dataset_name} copied to 'processed' folder.")
    except Exception as err:
        print(f"Failed to copy dataset {dataset_name} to 'processed' folder. Error: {str(err)}")

# Process up to two pending datasets, in the order computed above.
for import_round in range(2):
    import_new_dataset()


Dataset public_transport_data_month_10.csv copied to 'processed' folder.
Imported dataset: public_transport_data_month_10.csv
Dataset public_transport_data_month_11.csv copied to 'processed' folder.
Imported dataset: public_transport_data_month_11.csv


In [None]:
# Databricks notebook source
import pandas as pd
from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql.functions import to_date, year, month, day, hour, minute, when, avg, regexp_replace, mean, count, round
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format
from pyspark.sql.functions import col, regexp_extract, to_timestamp
from pyspark.sql.functions import unix_timestamp, col, expr

# Define Azure Blob Storage paths
# NOTE(review): csv_file_path is now the whole "processed_fils/" directory. The previous
# revision wrote its state to "processed_fils/processing_state.csv" inside it — confirm
# how Spark should treat that nested directory when reading this path.
raw_folder_path = "/mnt/public-transport-data/raw/"
processed_folder_path = "/mnt/public-transport-data/processed/"
csv_file_path = "/mnt/public-transport-data/processed_fils/"

# Schema of the processing-state CSV: one nullable string column per field.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("name_file", "date")
])

# Function to extract the month number from the filename
def extract_month(filename):
    """Return the first all-digit ``_``-separated token of ``filename`` as an int.

    Used as the sort key that puts raw files in month order, e.g.
    ``"public_transport_data_month_9.csv"`` -> ``9``.

    The extension is stripped before splitting. Without this, the last token is
    ``"9.csv"``, which is not all digits. Every file then got key 0, and the sort
    silently fell back to lexicographic listing order (month_1, month_10, month_11,
    month_2, ...).

    Args:
        filename: Bare file name (no directory part).

    Returns:
        The number found, or 0 if no token is purely numeric.
    """
    stem = filename.rsplit(".", 1)[0]
    for part in stem.split("_"):
        # isdecimal (not isdigit) so that characters like '²' cannot reach int() and raise.
        if part.isdecimal():
            return int(part)
    return 0  # Return 0 if month number is not found


# Raw file names imported by this run of the cell. import_new_dataset() appends to it.
# It was not defined in this revision, so the cell only worked with a leftover kernel
# variable from an earlier cell (NameError after a restart).
imported_datasets = []

# File names already recorded in the processing-state CSV. If the state cannot be read
# (e.g. it does not exist yet), the error is printed and the set stays empty.
existing_datasets = set()
try:
    existing_df = spark.read.option("header", True).schema(schema).csv(csv_file_path)
    existing_datasets.update(row[0] for row in existing_df.select("name_file").collect())
except Exception as e:
    print("Error reading existing datasets:", str(e))

# Raw files not yet recorded, ordered by the number embedded in their name.
files_in_raw_folder = dbutils.fs.ls(raw_folder_path)
data_files = sorted(
    (info.name for info in files_in_raw_folder
     if info.isFile() and info.name not in existing_datasets),
    key=extract_month,
)

# Function to import a new dataset
def import_new_dataset():
    """Read, enrich and display the next pending raw file, then record it.

    Pops the head of ``data_files``. Names already in ``existing_datasets`` are
    skipped. Otherwise the raw CSV is read, derived columns are added, the result is
    shown and the raw file is copied to the processed folder. Only then is the name
    appended to ``imported_datasets`` and written to the processing state.

    Previously the state was written first. A dataset whose processing raised was
    therefore already marked as imported and would never be retried.

    Derived columns:
      * DepartureTime / ArrivalTime: "h:mm:ss AM|PM" strings rewritten as "HH:mm:ss".
      * DurationSeconds: ArrivalTime minus DepartureTime, in seconds. It is negative
        when the arrival clock time is earlier than the departure.
      * Duration: DurationSeconds modulo 24 h, formatted as "HH:mm:ss".
      * DelayCategory: bucket of the Delay column.
      * Hour: departure hour.
      * HourCategory: label derived from Passengers.
    """
    if not data_files:
        print("No more datasets to import.")
        return

    # Get the next dataset to import
    dataset_to_import = data_files.pop(0)

    # Check if dataset is already imported
    if dataset_to_import in existing_datasets:
        print(f"Dataset {dataset_to_import} already imported. Skipping...")
        return

    # Display the dataset (use dataset_to_import)
    print(f"Displaying dataset: {dataset_to_import}")
    dataset_df = spark.read.option("header", True).csv(raw_folder_path + dataset_to_import)

    def convert_to_24_hour(time_str):
        # Parse a 12-hour clock column ("2:41:00 PM") and return it as a timestamp string.
        return to_timestamp(time_str, 'h:mm:ss a').cast('string')

    # Keep only the "h:mm:ss AM|PM" part of the raw time strings.
    dataset_df = (dataset_df.withColumn('DepartureTime', regexp_extract(col('DepartureTime'), r'(\d+:\d+:\d+ [APM]+)', 1))
                  .withColumn('ArrivalTime', regexp_extract(col('ArrivalTime'), r'(\d+:\d+:\d+ [APM]+)', 1)))

    # Convert DepartureTime and ArrivalTime to 24-hour "HH:mm:ss" strings.
    dataset_df = (dataset_df.withColumn('DepartureTime', convert_to_24_hour(col('DepartureTime')))
                  .withColumn('ArrivalTime', convert_to_24_hour(col('ArrivalTime'))))
    dataset_df = (dataset_df.withColumn('DepartureTime', date_format(col('DepartureTime'), 'HH:mm:ss'))
                  .withColumn('ArrivalTime', date_format(col('ArrivalTime'), 'HH:mm:ss')))

    # Calculate the duration in seconds.
    dataset_df = dataset_df.withColumn('DurationSeconds',
                                       (unix_timestamp(col('ArrivalTime'), 'HH:mm:ss') -
                                        unix_timestamp(col('DepartureTime'), 'HH:mm:ss')))

    # Format the duration with plain arithmetic. from_unixtime() renders epoch seconds
    # in the session time zone, so it shifted every duration by that zone's UTC offset
    # unless spark.sql.session.timeZone was UTC. pmod keeps the earlier wrap-around for
    # negative values (e.g. -85800 s -> "00:10:00"); nulls stay null.
    dataset_df = dataset_df.withColumn('Duration', expr(
        "CASE WHEN DurationSeconds IS NOT NULL THEN format_string('%02d:%02d:%02d', "
        "pmod(DurationSeconds, 86400) DIV 3600, "
        "(pmod(DurationSeconds, 86400) % 3600) DIV 60, "
        "pmod(DurationSeconds, 86400) % 60) END"))

    dataset_df = dataset_df.withColumn('DelayCategory',
                                       when(col('Delay') == 0, 'Pas de Retard')
                                       .when((col('Delay') >= 1) & (col('Delay') <= 10), 'Retard Court')
                                       .when((col('Delay') >= 11) & (col('Delay') <= 20), 'Retard Moyen')
                                       .when(col('Delay') > 20, 'Long Retard')
                                       .otherwise('Unknown'))

    peak_passenger_threshold = 70

    # NOTE(review): HourCategory is driven by the passenger count of each row, not by the hour.
    dataset_df = (dataset_df.withColumn('Hour', hour(col('DepartureTime')))
                  .withColumn('HourCategory',
                              when(col('Passengers') >= peak_passenger_threshold, 'Peak Hour')
                              .otherwise('Off-Peak Hour')))

    dataset_df.show()  # Show the contents of the dataset

    # Copy the dataset to the 'processed' folder
    copy_to_processed(dataset_to_import)

    # Record the import only after processing succeeded, so a failure above leaves
    # the dataset pending for the next run.
    imported_datasets.append(dataset_to_import)
    save_to_csv(dataset_to_import, datetime.now().strftime('%Y-%m-%d'))

    print(f"Imported dataset: {dataset_to_import}")

# Function to save dataset name and date to the CSV file
def save_to_csv(dataset_name, import_date):
    """Record one imported dataset in the processing-state CSV at ``csv_file_path``.

    The record is appended as a new part file. The previous approach was: read the
    whole state, union the new row, and write it back with ``mode("overwrite")`` to
    the same path. That approach had two problems:

    * Spark evaluates lazily, so the overwrite deletes the files the union still
      has to read. This can fail or lose the existing rows.
    * On the first run the state path does not exist, so the read raised and no
      record was ever written.

    Overwriting the whole directory also wiped anything else stored under it.
    Readers using ``option("header", True)`` skip the header line of every part file.

    Args:
        dataset_name: Raw file name, stored in the ``name_file`` column.
        import_date: Import date string (callers pass ``'%Y-%m-%d'``), stored in ``date``.
    """
    try:
        new_record_df = spark.createDataFrame([(dataset_name, import_date)], schema)
        new_record_df.write.option("header", True).mode("append").csv(csv_file_path)
    except Exception as e:
        print("Error saving to CSV:", str(e))

# Function to copy the dataset to the 'processed' folder
def copy_to_processed(dataset_name):
    """Copy ``dataset_name`` from the raw folder into the processed folder.

    Errors are printed rather than raised, so the caller always continues.
    """
    src = f"{raw_folder_path}{dataset_name}"
    dst = f"{processed_folder_path}{dataset_name}"
    try:
        dbutils.fs.cp(src, dst)
        print(f"Dataset {dataset_name} copied to 'processed' folder.")
    except Exception as err:
        print(f"Failed to copy dataset {dataset_name} to 'processed' folder. Error: {str(err)}")

# Process up to two pending datasets, in the order computed above.
for import_round in range(2):
    import_new_dataset()


Displaying dataset: public_transport_data_month_11.csv
+----------+-------------+--------+-------------+-----------+----------+----------------+--------------+-----+---------------+--------+-------------+----+-------------+
|      Date|TransportType|   Route|DepartureTime|ArrivalTime|Passengers|DepartureStation|ArrivalStation|Delay|DurationSeconds|Duration|DelayCategory|Hour| HourCategory|
+----------+-------------+--------+-------------+-----------+----------+----------------+--------------+-----+---------------+--------+-------------+----+-------------+
|2023-11-01|          Bus| Route_4|     14:41:00|   15:10:00|        49|      Station_10|     Station_8|   13|           1740|00:29:00| Retard Moyen|  14|Off-Peak Hour|
|2023-11-01|        Metro| Route_4|     12:12:00|   13:24:00|        18|      Station_14|     Station_3|    6|           4320|01:12:00| Retard Court|  12|Off-Peak Hour|
|2023-11-01|        Metro| Route_2|     13:17:00|   13:37:00|        30|      Station_13|    Station

In [None]:
# Databricks notebook source
import pandas as pd
from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql.functions import to_date, year, month, day, hour, minute, when, avg, regexp_replace, mean, count, round
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format
from pyspark.sql.functions import col, regexp_extract, to_timestamp
from pyspark.sql.functions import unix_timestamp, col, expr

# Define Azure Blob Storage paths
# NOTE(review): csv_file_path is the whole "processed_fils/" directory. An earlier revision
# wrote its state to "processed_fils/processing_state.csv" inside it — confirm how Spark
# should treat that nested directory when reading this path.
raw_folder_path = "/mnt/public-transport-data/raw/"
processed_folder_path = "/mnt/public-transport-data/processed/"
csv_file_path = "/mnt/public-transport-data/processed_fils/"

# Schema of the processing-state CSV: one nullable string column per field.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("name_file", "date")
])

# Function to extract the month number from the filename
def extract_month(filename):
    """Return the first all-digit ``_``-separated token of ``filename`` as an int.

    Used as the sort key that puts raw files in month order, e.g.
    ``"public_transport_data_month_9.csv"`` -> ``9``.

    The extension is stripped before splitting. Without this, the last token is
    ``"9.csv"``, which is not all digits. Every file then got key 0, and the sort
    silently fell back to lexicographic listing order (month_1, month_10, month_11,
    month_2, ...).

    Args:
        filename: Bare file name (no directory part).

    Returns:
        The number found, or 0 if no token is purely numeric.
    """
    stem = filename.rsplit(".", 1)[0]
    for part in stem.split("_"):
        # isdecimal (not isdigit) so that characters like '²' cannot reach int() and raise.
        if part.isdecimal():
            return int(part)
    return 0  # Return 0 if month number is not found

# Raw file names imported by this run of the cell. import_new_dataset() appends to it.
# It was not defined in this revision, so the cell only worked with a leftover kernel
# variable from an earlier cell (NameError after a restart).
imported_datasets = []

# File names already recorded in the processing-state CSV. If the state cannot be read
# (e.g. it does not exist yet), the error is printed and the set stays empty.
existing_datasets = set()
try:
    existing_df = spark.read.option("header", True).schema(schema).csv(csv_file_path)
    existing_datasets.update(row[0] for row in existing_df.select("name_file").collect())
except Exception as e:
    print("Error reading existing datasets:", str(e))

# Raw files not yet recorded, ordered by the number embedded in their name.
files_in_raw_folder = dbutils.fs.ls(raw_folder_path)
data_files = sorted(
    (info.name for info in files_in_raw_folder
     if info.isFile() and info.name not in existing_datasets),
    key=extract_month,
)

# Function to import a new dataset
def import_new_dataset():
    """Transform the next pending raw file, analyse it, export it, then record it.

    Pops the head of ``data_files``. Names already in ``existing_datasets`` are
    skipped. Otherwise the raw CSV is read and derived columns are added. A per-route
    summary and the dataset are shown, and the transformed data is written with
    ``export_to_processed``. Only then is the name appended to ``imported_datasets``
    and written to the processing state.

    Previously the state was written first. A dataset whose processing raised was
    therefore already marked as imported and would never be retried.

    Derived columns:
      * DepartureTime / ArrivalTime: "h:mm:ss AM|PM" strings rewritten as "HH:mm:ss".
      * DurationSeconds: ArrivalTime minus DepartureTime, in seconds. It is negative
        when the arrival clock time is earlier than the departure.
      * Duration: DurationSeconds modulo 24 h, formatted as "HH:mm:ss".
      * DelayCategory: bucket of the Delay column.
      * Hour: departure hour.
      * HourCategory: label derived from Passengers.
    """
    if not data_files:
        print("No more datasets to import.")
        return

    # Get the next dataset to import
    dataset_to_import = data_files.pop(0)

    # Check if dataset is already imported
    if dataset_to_import in existing_datasets:
        print(f"Dataset {dataset_to_import} already imported. Skipping...")
        return

    # Display the dataset (use dataset_to_import)
    print(f"Displaying dataset: {dataset_to_import}")
    dataset_df = spark.read.option("header", True).csv(raw_folder_path + dataset_to_import)

    def convert_to_24_hour(time_str):
        # Parse a 12-hour clock column ("2:41:00 PM") and return it as a timestamp string.
        return to_timestamp(time_str, 'h:mm:ss a').cast('string')

    # Keep only the "h:mm:ss AM|PM" part of the raw time strings.
    dataset_df = (dataset_df.withColumn('DepartureTime', regexp_extract(col('DepartureTime'), r'(\d+:\d+:\d+ [APM]+)', 1))
                  .withColumn('ArrivalTime', regexp_extract(col('ArrivalTime'), r'(\d+:\d+:\d+ [APM]+)', 1)))

    # Convert DepartureTime and ArrivalTime to 24-hour "HH:mm:ss" strings.
    dataset_df = (dataset_df.withColumn('DepartureTime', convert_to_24_hour(col('DepartureTime')))
                  .withColumn('ArrivalTime', convert_to_24_hour(col('ArrivalTime'))))
    dataset_df = (dataset_df.withColumn('DepartureTime', date_format(col('DepartureTime'), 'HH:mm:ss'))
                  .withColumn('ArrivalTime', date_format(col('ArrivalTime'), 'HH:mm:ss')))

    # Calculate the duration in seconds.
    dataset_df = dataset_df.withColumn('DurationSeconds',
                                       (unix_timestamp(col('ArrivalTime'), 'HH:mm:ss') -
                                        unix_timestamp(col('DepartureTime'), 'HH:mm:ss')))

    # Format the duration with plain arithmetic. from_unixtime() renders epoch seconds
    # in the session time zone, so it shifted every duration by that zone's UTC offset
    # unless spark.sql.session.timeZone was UTC. pmod keeps the earlier wrap-around for
    # negative values (e.g. -85800 s -> "00:10:00"); nulls stay null.
    dataset_df = dataset_df.withColumn('Duration', expr(
        "CASE WHEN DurationSeconds IS NOT NULL THEN format_string('%02d:%02d:%02d', "
        "pmod(DurationSeconds, 86400) DIV 3600, "
        "(pmod(DurationSeconds, 86400) % 3600) DIV 60, "
        "pmod(DurationSeconds, 86400) % 60) END"))

    dataset_df = dataset_df.withColumn('DelayCategory',
                                       when(col('Delay') == 0, 'Pas de Retard')
                                       .when((col('Delay') >= 1) & (col('Delay') <= 10), 'Retard Court')
                                       .when((col('Delay') >= 11) & (col('Delay') <= 20), 'Retard Moyen')
                                       .when(col('Delay') > 20, 'Long Retard')
                                       .otherwise('Unknown'))

    peak_passenger_threshold = 70

    # NOTE(review): HourCategory is driven by the passenger count of each row, not by the hour.
    dataset_df = (dataset_df.withColumn('Hour', hour(col('DepartureTime')))
                  .withColumn('HourCategory',
                              when(col('Passengers') >= peak_passenger_threshold, 'Peak Hour')
                              .otherwise('Off-Peak Hour')))

    # Per-route averages of Delay and Passengers plus the number of journeys.
    route_analysis = (dataset_df.groupBy('Route')
                      .agg(avg('Delay').alias('AvgDelay'),
                           avg('Passengers').alias('AvgPassengers'),
                           count('*').alias('TotalJourneys')))

    # Show the DataFrame with route analysis
    route_analysis.show()

    dataset_df.show()  # Show the contents of the dataset

    # Write the transformed dataset to the 'processed' folder
    export_to_processed(dataset_to_import, dataset_df)

    # Record the import only after processing succeeded, so a failure above leaves
    # the dataset pending for the next run.
    imported_datasets.append(dataset_to_import)
    save_to_csv(dataset_to_import, datetime.now().strftime('%Y-%m-%d'))

    print(f"Imported dataset: {dataset_to_import}")

# Function to save dataset name and date to the CSV file
def save_to_csv(dataset_name, import_date):
    """Record one imported dataset in the processing-state CSV at ``csv_file_path``.

    The record is appended as a new part file. The previous approach was: read the
    whole state, union the new row, and write it back with ``mode("overwrite")`` to
    the same path. That approach had two problems:

    * Spark evaluates lazily, so the overwrite deletes the files the union still
      has to read. This can fail or lose the existing rows.
    * On the first run the state path does not exist, so the read raised and no
      record was ever written.

    Overwriting the whole directory also wiped anything else stored under it.
    Readers using ``option("header", True)`` skip the header line of every part file.

    Args:
        dataset_name: Raw file name, stored in the ``name_file`` column.
        import_date: Import date string (callers pass ``'%Y-%m-%d'``), stored in ``date``.
    """
    try:
        new_record_df = spark.createDataFrame([(dataset_name, import_date)], schema)
        new_record_df.write.option("header", True).mode("append").csv(csv_file_path)
    except Exception as e:
        print("Error saving to CSV:", str(e))

def export_to_processed(dataset_name, dataset_df):
    """Write ``dataset_df`` as CSV (with header) under the processed folder.

    Spark writes a directory of part files at ``processed_folder_path + dataset_name``.
    A name such as ``..._month_4.csv`` therefore becomes a folder, not a single file.
    ``mode("overwrite")`` replaces any earlier output at that path. Failures are
    printed, not raised.

    Args:
        dataset_name: Raw file name, reused as the output directory name.
        dataset_df: The transformed DataFrame to persist.
    """
    destination_path = processed_folder_path + dataset_name
    try:
        dataset_df.write.option("header", True).mode("overwrite").csv(destination_path)
        print(f"Dataset {dataset_name} exported to 'processed' folder.")
    except Exception as e:
        print(f"Failed to export dataset {dataset_name} to 'processed' folder. Error: {str(e)}")
    
# Process up to two pending datasets, in the order computed above.
for import_round in range(2):
    import_new_dataset()


Displaying dataset: public_transport_data_month_4.csv
+----------+-------------+--------+-------------+-----------+----------+----------------+--------------+-----+---------------+--------+-------------+----+-------------+
|      Date|TransportType|   Route|DepartureTime|ArrivalTime|Passengers|DepartureStation|ArrivalStation|Delay|DurationSeconds|Duration|DelayCategory|Hour| HourCategory|
+----------+-------------+--------+-------------+-----------+----------+----------------+--------------+-----+---------------+--------+-------------+----+-------------+
|2023-04-01|          Bus| Route_6|     10:05:00|   11:20:00|        65|       Station_9|     Station_9|   12|           4500|01:15:00| Retard Moyen|  10|Off-Peak Hour|
|2023-04-01|        Metro| Route_6|     08:03:00|   10:05:00|        30|      Station_11|     Station_7|    4|           7320|02:02:00| Retard Court|   8|Off-Peak Hour|
|2023-04-01|        Metro| Route_3|     07:30:00|   09:32:00|        58|      Station_17|    Station_