# New York Shared Bike systems

## Téléchargement des données

On télécharge les fichiers à partir des URLs fournies et les enregistre localement.

In [None]:
import requests
import os

# Liste des années pour lesquelles nous voulons télécharger les données
years = range(2014, 2024)

# URL de base pour les fichiers de données
base_url = "https://s3.amazonaws.com/tripdata"

# Dossier où les fichiers seront enregistrés
data_dir = "citibike_data"
os.makedirs(data_dir, exist_ok=True)

# Fonction pour télécharger un fichier
def download_file(url, dest_path):
    if not os.path.exists(dest_path):
        response = requests.get(url, stream=True)
        if response.status_code == 200:
            with open(dest_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=1024):
                    if chunk:
                        f.write(chunk)
            print(f"Downloaded: {dest_path}")
        else:
            print(f"Failed to download: {url}")
    else:
        print(f"File already exists: {dest_path}")

# Télécharger les fichiers pour chaque année
for year in years:
        # Construire les différentes variations de noms de fichiers possibles
        possible_file_names = [
            f"{year}-citibike-tripdata.zip",
        ]

        downloaded = False
        for file_name in possible_file_names:
            file_url = f"{base_url}/{file_name}"
            
            # Chemin de destination pour enregistrer le fichier
            dest_path = os.path.join(data_dir, file_name)
            
            # Télécharger le fichier
            download_file(file_url, dest_path)
            if os.path.exists(dest_path):
                downloaded = True
                break

Extraction des fichiers ZIP

In [None]:
import zipfile
import glob

zip_dir = "citibike_data"
csv_dir = "citibike_data_csv"
os.makedirs(csv_dir, exist_ok=True)

# Fonction pour extraire les fichiers ZIP et imprimer les fichiers CSV extraits
def extract_zip_files(zip_dir, csv_dir):
    zip_files = glob.glob(os.path.join(zip_dir, "*.zip"))
    for zip_file in zip_files:
        # Obtenir le nom du fichier sans extension
        base_name = os.path.basename(zip_file).replace('.zip', '')
        # Vérifier si le fichier CSV existe déjà dans le répertoire cible
        if not any(glob.glob(os.path.join(csv_dir, f"{base_name}*.csv"))):
            with zipfile.ZipFile(zip_file, 'r') as zip_ref:
                zip_ref.extractall(csv_dir)
            print(f"Extracted: {zip_file}")
            # Lister les fichiers CSV extraits
            extracted_files = glob.glob(os.path.join(csv_dir, f"{base_name}*.csv"))
            for extracted_file in extracted_files:
                print(f"Fichier CSV extrait : {os.path.basename(extracted_file)}")
        else:
            print(f"File already extracted: {zip_file}")

# Extraire les fichiers ZIP et imprimer les fichiers CSV extraits
extract_zip_files(zip_dir, csv_dir)

Création d'une session Spark

In [None]:
import os
import glob
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Créer une session Spark
spark = SparkSession.builder \
    .appName("CitiBikeData") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.sql.files.maxPartitionBytes", "128MB") \
    .config("spark.num.executors", "4") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

### Sauvegarde des données en un format parquet

In [None]:
from pyspark.sql.types import *
# Colonnes avant 2020
columns_2014_2020 = [
    "tripduration", "starttime", "stoptime", "start station id", "start station name",
    "start station latitude", "start station longitude", "end station id", "end station name",
    "end station latitude", "end station longitude", "bikeid", "usertype", "birth year", "gender"
]

# Colonnes après 2020
column_mapping_2021 = {
    "ride_id": "trip_id",
    "rideable_type": "bike_type",
    "started_at": "starttime",
    "ended_at": "stoptime",
    "start_station_name": "start_station_name",
    "start_station_id": "start_station_id",
    "end_station_name": "end_station_name",
    "end_station_id": "end_station_id",
    "start_lat": "start_station_latitude",
    "start_lng": "start_station_longitude",
    "end_lat": "end_station_latitude",
    "end_lng": "end_station_longitude",
    "member_casual": "usertype"
}

# Colonnes avec noms en majuscules et espaces
column_mapping_uppercase = {
    "Trip Duration": "tripduration",
    "Start Time": "starttime",
    "Stop Time": "stoptime",
    "Start Station ID": "start_station_id",
    "Start Station Name": "start_station_name",
    "Start Station Latitude": "start_station_latitude",
    "Start Station Longitude": "start_station_longitude",
    "End Station ID": "end_station_id",
    "End Station Name": "end_station_name",
    "End Station Latitude": "end_station_latitude",
    "End Station Longitude": "end_station_longitude",
    "Bike ID": "bikeid",
    "User Type": "usertype",
    "Birth Year": "birth_year",
    "Gender": "gender"
}

# Colonnes uniformisées
uniform_columns = [
    "trip_id", "tripduration", "starttime", "stoptime", "start_station_id", "start_station_name",
    "start_station_latitude", "start_station_longitude", "end_station_id", "end_station_name",
    "end_station_latitude", "end_station_longitude", "bikeid", "bike_type", "usertype", 
    "birth_year", "gender", "year", "month"
]

In [None]:
def ensure_columns_exist(df, columns):
    for col_name in columns:
        if col_name not in df.columns:
            df = df.withColumn(col_name, lit(None))
    return df

def process_csv_to_parquet_2014_2020(spark_df, year, month):
    spark_df = spark_df.toDF(*columns_2014_2020).withColumnRenamed("start station id", "start_station_id") \
        .withColumnRenamed("start station name", "start_station_name") \
        .withColumnRenamed("start station latitude", "start_station_latitude") \
        .withColumnRenamed("start station longitude", "start_station_longitude") \
        .withColumnRenamed("end station id", "end_station_id") \
        .withColumnRenamed("end station name", "end_station_name") \
        .withColumnRenamed("end station latitude", "end_station_latitude") \
        .withColumnRenamed("end station longitude", "end_station_longitude") \
        .withColumnRenamed("birth year", "birth_year")

    spark_df = spark_df.withColumn("starttime", col("starttime").cast("timestamp")) \
        .withColumn("stoptime", col("stoptime").cast("timestamp"))
    
    spark_df = spark_df.withColumn("bike_type", lit("classic"))
    spark_df = spark_df.withColumn("trip_id", monotonically_increasing_id().cast("string"))

    spark_df = spark_df.withColumn("tripduration", col("tripduration").cast("integer"))
    spark_df = spark_df.withColumn("birth_year", col("birth_year").cast("integer"))
    spark_df = spark_df.withColumn("gender", col("gender").cast("integer"))

    spark_df = spark_df.withColumn("year", lit(year)).withColumn("month", lit(month))
    
    spark_df = ensure_columns_exist(spark_df, uniform_columns)
    
    spark_df = spark_df.select(*uniform_columns)
    
    return spark_df

In [None]:
def process_csv_to_parquet_2021(spark_df, year, month):
    for original, new in column_mapping_2021.items():
        spark_df = spark_df.withColumnRenamed(original, new)
    
    spark_df = spark_df.withColumn("starttime", col("starttime").cast("timestamp")) \
        .withColumn("stoptime", col("stoptime").cast("timestamp"))
    
    spark_df = spark_df.withColumn("tripduration", (col("stoptime").cast("long") - col("starttime").cast("long")).cast("int"))
    spark_df = spark_df.withColumn("bikeid", concat_ws("_", col("trip_id"), col("bike_type")))

    spark_df = spark_df.withColumn("year", lit(year)).withColumn("month", lit(month))
    
    spark_df = spark_df.withColumn("birth_year", lit(None).cast("integer")) \
        .withColumn("gender", lit(None).cast("integer"))
    
    spark_df = ensure_columns_exist(spark_df, uniform_columns)
    
    spark_df = spark_df.select(*uniform_columns)
    
    return spark_df


In [None]:
def process_csv(file_path, year, month):
    spark_df = spark.read.option("header", "true").csv(file_path)
    
    columns = spark_df.columns
    if set(columns_2014_2020).issubset(columns):
        return process_csv_to_parquet_2014_2020(spark_df, year, month)
    elif set(column_mapping_2021.keys()).issubset(columns):
        return process_csv_to_parquet_2021(spark_df, year, month)
    elif set(column_mapping_uppercase.keys()).issubset(columns):
        for original, new in column_mapping_uppercase.items():
            spark_df = spark_df.withColumnRenamed(original, new)
        return process_csv_to_parquet_2014_2020(spark_df, year, month)
    else:
        raise ValueError(f"Unknown columns in file: {file_path}")

In [None]:
data_dir = "citibike_data_csv"

for year in range(2014, 2024):
    year_path = os.path.join(data_dir, f"{year}-citibike-tripdata")
    if os.path.exists(year_path):
        for month_folder in os.listdir(year_path):
            month_path = os.path.join(year_path, month_folder)
            try:
                print(f"Processing: {month_folder}")
                month = month_folder.split('_')[1]
            except IndexError:
                continue
            for file in glob.glob(f"{month_path}/*.csv"):
                processed_df = process_csv(file, year, month)
                if processed_df is not None:
                    # Sauvegarder le DataFrame directement dans les répertoires partitionnés
                    parquet_path = f"parquet_files/year={year}/month={month}"
                    processed_df.write.mode("overwrite").parquet(parquet_path)
                    print(f"Saved {parquet_path}")

In [None]:
from pyspark.sql.types import *
schema = StructType([
    StructField("trip_id", StringType(), True),
    StructField("tripduration", IntegerType(), True),  # Forcing tripduration to String
    StructField("starttime", TimestampType(), True),
    StructField("stoptime", TimestampType(), True),
    StructField("start_station_id", StringType(), True),
    StructField("start_station_name", StringType(), True),
    StructField("start_station_latitude", StringType(), True),
    StructField("start_station_longitude", StringType(), True),
    StructField("end_station_id", StringType(), True),
    StructField("end_station_name", StringType(), True),
    StructField("end_station_latitude", StringType(), True),
    StructField("end_station_longitude", StringType(), True),
    StructField("bikeid", StringType(), True),
    StructField("bike_type", StringType(), True),
    StructField("usertype", StringType(), True),
    StructField("birth_year", IntegerType(), True),
    StructField("gender", IntegerType(), True),
    StructField("year", IntegerType(), True),
    StructField("month", StringType(), True)
])

In [None]:
global_df = None

data_dir = "citibike_data_csv"

for year in range(2014, 2024):
    year_path = os.path.join(data_dir, f"{year}-citibike-tripdata")
    if os.path.exists(year_path):
        for month_folder in os.listdir(year_path):
            month_path = os.path.join(year_path, month_folder)
            try:
                print(f"Processing: {month_folder}")
                month = month_folder.split('_')[1]
            except IndexError:
                continue
            for file in glob.glob(f"{month_path}/*.csv"):
                processed_df = process_csv(file, year, month)
                if processed_df is not None:
                    # Fusionner le DataFrame actuel avec le DataFrame global
                    if global_df is None:
                        global_df = processed_df
                    else:
                        global_df = global_df.unionByName(processed_df)

In [None]:
global_df.storageLevel

In [None]:
global_df.rdd.getNumPartitions()

#### Organiser les données en un schéma en étoile

In [None]:
years = range(2014, 2024)
months = ["January", "February", "March", "April", "May", "June", "July", "August", "September", "October", "November", "December"]


df = None

for year in years:
    for month in months:
        parquet_file_path = f"parquet_files/year={year}/month={month}"
        if os.path.exists(parquet_file_path):
            try:
                month_df = spark.read.schema(schema).parquet(parquet_file_path)
                if df is None:
                    df = month_df
                else:
                    df = df.unionByName(month_df)
                print(f"Loaded data for {year} {month}")
            except Exception as e:
                print(f"Failed to read Parquet files from {parquet_file_path}: {e}")

In [None]:
df = spark.read.schema(schema).parquet("parquet_files")

In [None]:
#What is the StorageLevel of the dataframe after reading the csv files?
df.storageLevel

In [None]:
#What is the number of partitions of the dataframe?
print(df.rdd.getNumPartitions())
#Is it possible to tune this number at loading time?
#Yes, it is possible to tune the number of partitions at loading time by using the `option("spark.sql.files.maxPartitionBytes", "128MB")` configuration. This configuration sets the maximum number of bytes to pack into a single partition when reading files.
#Why would we want to modify the number of partitions when creating the parquet files?
#We may want to modify the number of partitions when creating the Parquet files to ensure that the data is evenly distributed across the partitions. This can help improve the performance of queries and operations that are performed on the data, especially when the data is large and needs to be processed in parallel.

table de dimension regroupant des informations concernant les stations (nom, identifiant(s), latitude, longitude, ...)

In [None]:

    # Créer la table des stations à partir des informations de départ et d'arrivée des trajets
start_stations_df = df.select(
    col("start_station_id").alias("station_id"),
    col("start_station_name").alias("station_name"),
    col("start_station_latitude").alias("latitude"),
    col("start_station_longitude").alias("longitude")
).distinct()

end_stations_df = df.select(
    col("end_station_id").alias("station_id"),
    col("end_station_name").alias("station_name"),
    col("end_station_latitude").alias("latitude"),
    col("end_station_longitude").alias("longitude")
).distinct()

    # Union des deux DataFrames pour obtenir toutes les stations
stations_df = start_stations_df.union(end_stations_df).distinct()

    # Sauvegarder la table des stations en format Parquet
stations_df.write.mode("overwrite").parquet("stations_parquet")
print("Saved stations dimension table as Parquet")

In [None]:
df_stations = spark.read.parquet("stations_parquet")

df_stations.show(5)

 table d'événements regroupant des informations sur les voyages avec des références à la table de dimension (les identifiants des stations de départ/arrivée).

In [None]:
trips_event_df = df.select(
    col("trip_id"),
    col("tripduration"),
    col("starttime"),
    col("stoptime"),
    col("start_station_id").alias("start_station_id"),
    col("end_station_id").alias("end_station_id"),
    col("bikeid"),
    col("bike_type"),
    col("usertype"),
    col("birth_year"),
    col("gender"),
    col("year"),
    col("month")
)

    # Sauvegarder la table des trajets en format Parquet avec partitionnement
trips_event_df.write.mode("overwrite").partitionBy("year", "month").parquet("trips_parquet")
print("Saved trips event table as Parquet")

In [None]:
df_trips_event = spark.read.parquet("trips_parquet")

df_trips_event.show(5)

## Analyser une année de données avant la COVID et une année après la COVID

Premierement, nous allons charger les données des années 2019 et 2021

In [None]:
def load_data_for_year(year):
    df = spark.read.schema(schema).parquet(f"parquet_files/year={year}")
    return df

In [None]:
df_2019 = load_data_for_year(2019)
df_2021 = load_data_for_year(2021)

On calcule la distance parcourue

In [None]:
from pyspark.sql.functions import sqrt, pow

def add_trip_distance(df):
    df = df.withColumn("trip_distance", sqrt(
        pow(col("end_station_latitude") - col("start_station_latitude"), 2) +
        pow(col("end_station_longitude") - col("start_station_longitude"), 2)
    ))
    return df

def convert_distance_to_km(df):
    km_per_degree = 111  # Approximatif
    df = df.withColumn("trip_distance_km", col("trip_distance") * lit(km_per_degree))
    return df


df_2019 = add_trip_distance(df_2019)
df_2021 = add_trip_distance(df_2021)

df_2019 = convert_distance_to_km(df_2019)
df_2021 = convert_distance_to_km(df_2021)

In [None]:
#import all from pyspark.sql.functions
df_2019.select("gender").distinct().show()
df_2021.select("gender").distinct().show()

Pour chaque jour de la semaine de l'année 2019, on affiche la mediane de la distance parcourue

In [None]:
import matplotlib.pyplot as plt

def plot_trip_distance_distribution(df, year):
    df = df.withColumn("day_of_week", dayofweek(col("starttime")))
    distance_distribution = df.groupBy("day_of_week").agg(expr("percentile_approx(trip_distance_km, 0.5)").alias("median_distance")).collect()
    day_of_week = [row["day_of_week"] for row in distance_distribution]
    median_distance = [row["median_distance"] for row in distance_distribution]

    plt.figure(figsize=(10, 6))
    plt.bar(day_of_week, median_distance, color='blue', alpha=0.7)
    plt.xlabel('Day of Week')
    plt.ylabel('Median Trip Distance (km)')
    plt.title(f'Median Trip Distance Distribution by Day of Week in {year}')
    plt.show()

plot_trip_distance_distribution(df_2019, 2019)

et pour l'année 2021

In [None]:
plot_trip_distance_distribution(df_2021, 2021)

#### Compte le nombre de trajets pour chaque couple de stations de départ/arrivée

On va d'abord affiché le nombre de couple de stations départ/arrivée distinctes pour l'année 2019

In [None]:
# afficher les couples départ/arrivée distincts
station_pairs_df = df_2019.select("start_station_name", "end_station_name").distinct()
print("Nombre de couples de stations distincts en 2019 : ", station_pairs_df.count())

On remarque que le nombre est extrêmement , on pourra donc pas toutes les affichées. On va se limité aux 20 couples de stations les plus fréquenté 

In [None]:
import matplotlib.pyplot as plt

def plot_trip_count_by_location(df, year):
    # Grouper par noms des stations de départ et d'arrivée et compter le nombre de trajets
    trip_count = df.groupBy("start_station_name", "end_station_name").count().orderBy("count", ascending=False).limit(20).collect()
    
    # Préparer les données pour la visualisation
    start_end_stations = [f"{row['start_station_name']} -> {row['end_station_name']}" for row in trip_count]
    counts = [row["count"] for row in trip_count]

    # Créer un graphique en barres horizontales
    plt.figure(figsize=(12, 6))
    plt.barh(start_end_stations, counts, color='green', alpha=0.7)
    plt.xlabel('Number of Trips')
    plt.ylabel('Start Station -> End Station')
    plt.title(f'Top 20 Trip Counts by Pickup/Dropoff Location in {year}')
    plt.show()

plot_trip_count_by_location(df_2019, 2019)

Pour l'année 2021

In [None]:
# afficher les couples départ/arrivée distincts
station_pairs_df = df_2021.select("start_station_name", "end_station_name").distinct()
print("Nombre de couples de stations distincts en 2021 : ", station_pairs_df.count())

Pareil, le nombre est élevé, on se limitra aussi aux 20 couples les plus fréquenté

In [None]:
plot_trip_count_by_location(df_2021, 2021)

#### Calcule la répartition de la distance parcourue selon le sexe

Pour l'année 2019

In [None]:
import matplotlib.pyplot as plt

def plot_trip_distance_by_gender(df, year):
    # Convertir la colonne 'gender' en entier si nécessaire
    df = df.withColumn("gender", col("gender").cast("int"))

    # Filtrer pour exclure les trajets où le genre est non spécifié (0) ou NULL
    filtered_df = df.filter((col("gender") != 0) & (col("gender").isNotNull()))
    
    # Calculer la distance médiane par genre
    distance_by_gender = filtered_df.groupBy("gender").agg(expr("percentile_approx(trip_distance_km, 0.5)").alias("median_distance")).collect()
    
    # Mapper les valeurs de genre aux étiquettes
    gender_labels = {1: "Male", 2: "Female"}
    genders = [gender_labels[row["gender"]] for row in distance_by_gender]
    median_distance = [row["median_distance"] for row in distance_by_gender]

    # Créer un graphique en barres
    plt.figure(figsize=(10, 6))
    plt.bar(genders, median_distance, color='purple', alpha=0.7)
    plt.xlabel('Gender')
    plt.ylabel('Median Trip Distance (km)')
    plt.title(f'Median Trip Distance Distribution by Gender in {year}')
    plt.show()

# Générer les graphiques
plot_trip_distance_by_gender(df_2019, 2019)


Pour l'année 2021

In [None]:
plot_trip_distance_by_gender(df_2021, 2021)

#### Calcule la répartition de la distance parcourue pour les tranches d'âge (15-24,25-44,45-54,55-64,65+)

Pour l'année 2019

In [None]:
def add_age_column(df, year):
    # Calculer l'âge en fonction de l'année des données
    df = df.withColumn("age", (lit(year) - col("birth_year")).cast("integer"))
    return df

# Ajouter la colonne "age" aux DataFrames 2019 et 2021
df_2019 = add_age_column(df_2019, 2019)
df_2021 = add_age_column(df_2021, 2021)

def plot_trip_distance_by_age_range(df, year):
    df = df.withColumn("age_range", 
                       when(col("age").between(15, 24), "15-24")
                       .when(col("age").between(25, 44), "25-44")
                       .when(col("age").between(45, 54), "45-54")
                       .when(col("age").between(55, 64), "55-64")
                       .when(col("age") >= 65, "65+")
                       .otherwise("Unknown"))

    distance_by_age_range = df.groupBy("age_range").agg(expr("percentile_approx(trip_distance_km, 0.5)").alias("median_distance")).collect()
    
    # Trier les âges
    sorted_age_ranges = sorted(distance_by_age_range, key=lambda row: ["15-24", "25-44", "45-54", "55-64", "65+", "Unknown"].index(row["age_range"]))
    
    age_ranges = [row["age_range"] for row in sorted_age_ranges if row["age_range"] != "Unknown"]
    median_distance = [row["median_distance"] for row in sorted_age_ranges if row["age_range"] != "Unknown"]

    plt.figure(figsize=(10, 6))
    plt.bar(age_ranges, median_distance, color='orange', alpha=0.7)
    plt.xlabel('Age Range')
    plt.ylabel('Median Trip Distance (km)')
    plt.title(f'Median Trip Distance Distribution by Age Range in {year}')
    plt.show()

# Générer les graphiques
plot_trip_distance_by_age_range(df_2019, 2019)


Pour l'année 2021

In [None]:
plot_trip_distance_by_age_range(df_2021, 2021)

#### Calcule la répartition de la distance parcourue pour différents types de vélos

Pour l'année 2019 (Les données avant 2021 n'ont pas de type de vélo, donc il y'aura que le type `classic` qu'on a mis par defaut qui sera affiché)

In [None]:
def plot_trip_distance_by_bike_type(df, year):
    distance_by_bike_type = df.groupBy("bike_type").agg(expr("percentile_approx(trip_distance_km, 0.5)").alias("median_distance")).collect()
    bike_types = [row["bike_type"] for row in distance_by_bike_type]
    median_distance = [row["median_distance"] for row in distance_by_bike_type]

    plt.figure(figsize=(10, 6))
    plt.bar(bike_types, median_distance, color='red', alpha=0.7)
    plt.xlabel('Bike Type')
    plt.ylabel('Median Trip Distance (km)')
    plt.title(f'Median Trip Distance Distribution by Bike Type in {year}')
    plt.show()

plot_trip_distance_by_bike_type(df_2019, 2019)

Pour l'année 2021 (il y'a aussi quelque mois qui n'ont pas de type de vélo, qui ont donc aussi la valeur par défaut `classic`, ce qui biaise le résultat)

In [None]:
plot_trip_distance_by_bike_type(df_2021, 2021)

In [None]:
import matplotlib.pyplot as plt
from pyspark.sql.functions import when

# Ajouter une colonne "duration_category" basée sur "tripduration"
def add_duration_category(df):
    df = df.withColumn("duration_category", 
                       when(col("tripduration") <= 300, "Very Short")
                       .when((col("tripduration") > 300) & (col("tripduration") <= 900), "Short")
                       .when((col("tripduration") > 900) & (col("tripduration") <= 1800), "Medium")
                       .when((col("tripduration") > 1800) & (col("tripduration") <= 3600), "Long")
                       .when(col("tripduration") > 3600, "Very Long"))
    return df

# Appliquer la fonction aux DataFrames 2019 et 2021
df_2019 = add_duration_category(df_2019)
df_2021 = add_duration_category(df_2021)

def plot_trip_distance_by_duration_category(df, year):
    distance_by_duration = df.groupBy("duration_category").agg(expr("percentile_approx(trip_distance_km, 0.5)").alias("median_distance")).collect()
    
    duration_categories = [row["duration_category"] for row in distance_by_duration if row["duration_category"] is not None]
    median_distance = [row["median_distance"] for row in distance_by_duration if row["duration_category"] is not None]
    
    # Trier les catégories de durée
    sorted_categories = ["Very Short", "Short", "Medium", "Long", "Very Long"]
    duration_categories, median_distance = zip(*sorted(zip(duration_categories, median_distance), key=lambda x: sorted_categories.index(x[0])))
    
    plt.figure(figsize=(10, 6))
    plt.bar(duration_categories, median_distance, color='teal', alpha=0.7)
    plt.xlabel('Trip Duration Category')
    plt.ylabel('Median Trip Distance (km)')
    plt.title(f'Median Trip Distance Distribution by Trip Duration in {year}')
    plt.show()

# Générer les graphiques
plot_trip_distance_by_duration_category(df_2019, 2019)
plot_trip_distance_by_duration_category(df_2021, 2021)


### Évaluer les saisonnalités et examiner les séries temporelles

Ici, on va calculer et tracer des série temporelle indexée par jour de la semaine et heure de la journée

On commance d'abord par ajouter les colonnes `day_of_week` et `hour_of_day`

In [None]:
import seaborn as sns
import pandas as pd

# Ajouter les colonnes "day_of_week" et "hour_of_day"
def add_time_columns(df):
    df = df.withColumn("day_of_week", dayofweek(col("starttime")))
    df = df.withColumn("hour_of_day", hour(col("starttime")))
    return df

# Appliquer la fonction aux DataFrames 2019 et 2021
df_2019 = add_time_columns(df_2019)
df_2021 = add_time_columns(df_2021)

Ensuite, on calcule les métriques et les trier par jour de la semaine et heure de la journée

In [None]:
def compute_metrics(df):
    metrics = df.groupBy("day_of_week", "hour_of_day").agg(
        count("trip_id").alias("pickup_dock_count"),
        avg("trip_distance_km").alias("avg_distance"),
        avg("tripduration").alias("avg_duration")
    )
    return metrics

In [None]:
metrics_2019 = compute_metrics(df_2019).orderBy("day_of_week", "hour_of_day").collect()
metrics_2021 = compute_metrics(df_2021).orderBy("day_of_week", "hour_of_day").collect()

On convertie ces métriques au format `Pandas` 

In [None]:
def convert_to_pandas(metrics):
    data = [(row["day_of_week"], row["hour_of_day"], row["pickup_dock_count"], row["avg_distance"], row["avg_duration"]) for row in metrics]
    df = pd.DataFrame(data, columns=["day_of_week", "hour_of_day", "pickup_dock_count", "avg_distance", "avg_duration"])
    return df

metrics_2019_pd = convert_to_pandas(metrics_2019)
metrics_2021_pd = convert_to_pandas(metrics_2021)

Fonction qui sera utilisé pour tracer une série temporelle

In [None]:
def plot_time_series(df, metric, title, ylabel, year):
    pivot_df = df.pivot(index='hour_of_day', columns='day_of_week', values=metric)
    plt.figure(figsize=(12, 8))
    sns.heatmap(pivot_df, cmap="YlGnBu", annot=True, fmt=".2f", cbar_kws={'label': ylabel})
    plt.title(f'{title} in {year}')
    plt.xlabel('Day of Week')
    plt.ylabel('Hour of Day')
    plt.xticks(ticks=[0.5,1.5,2.5,3.5,4.5,5.5,6.5], labels=["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"])
    plt.show()

Trace le nombre de départ/arrivé pour les années 2019 et 2021

In [None]:
plot_time_series(metrics_2019_pd, 'pickup_dock_count', 'Number of Pickups/Docks', 'Number of Pickups/Docks', 2019)
plot_time_series(metrics_2021_pd, 'pickup_dock_count', 'Number of Pickups/Docks', 'Number of Pickups/Docks', 2021)

Trace la distance moyenne pour les années 2019 et 2021

In [None]:
plot_time_series(metrics_2019_pd, 'avg_distance', 'Average Trip Distance (km)', 'Average Trip Distance (km)', 2019)
plot_time_series(metrics_2021_pd, 'avg_distance', 'Average Trip Distance (km)', 'Average Trip Distance (km)', 2021)

Trace la durée moyenne des trajets pour les années 2019 et 2021

In [None]:
plot_time_series(metrics_2019_pd, 'avg_duration', 'Average Trip Duration (sec)', 'Average Trip Duration (sec)', 2019)
plot_time_series(metrics_2021_pd, 'avg_duration', 'Average Trip Duration (sec)', 'Average Trip Duration (sec)', 2021)

In [None]:
# Afficher les plans d'exécution
metrics_2019 = compute_metrics(df_2019)
metrics_2019.explain(True)

## Explorer les problèmes liés à l'information spatiale

Construire une carte thermique indexée par couple de stations où la couleur est une fonction du nombre de trajets d'une station à une autre.

In [None]:
import pandas as pd
import plotly.express as px

# Calcul du nombre de trajets entre chaque couple de stations directement dans Spark
trip_counts_spark = df_2019.groupBy('start_station_name', 'end_station_name').count().withColumnRenamed('count', 'trip_count')

# Filtrer les données pour les stations les plus fréquentées (facultatif)
top_stations = [row['start_station_name'] for row in trip_counts_spark.groupBy('start_station_name').sum('trip_count').orderBy('sum(trip_count)', ascending=False).limit(20).collect()]

trip_counts_filtered = trip_counts_spark.filter(trip_counts_spark.start_station_name.isin(top_stations) & trip_counts_spark.end_station_name.isin(top_stations))

# Convertir les données réduites en Pandas
trip_counts_pd = trip_counts_filtered.toPandas()

fig = px.density_heatmap(trip_counts_pd, x='start_station_name', y='end_station_name', z='trip_count',
                         labels={'start_station_name': 'Start Station', 'end_station_name': 'End Station', 'trip_count': 'Number of Trips'},
                         title='Heatmap of Trips between Top Stations')

# Ajuster la taille de la figure et l'orientation des étiquettes
fig.update_layout(width=1000, height=1000, margin=dict(l=200, r=200, b=200, t=100))
fig.update_xaxes(tickangle=45, tickfont=dict(size=10))
fig.update_yaxes(tickfont=dict(size=10))

fig.show()

Construire une carte thermique interactive avec un curseur permettant à l'utilisateur de sélectionner une heure de la journée et où la couleur est une fonction du nombre de trajets d'une station à une autre.

In [None]:
df_2019 = df_2019.withColumn('hour_of_day', hour(df_2019['starttime']))

# Calculer le nombre de trajets entre chaque couple de stations pour chaque heure de la journée
trip_counts_hourly = df_2019.groupBy('start_station_name', 'end_station_name', 'hour_of_day').count().withColumnRenamed('count', 'trip_count')

# Filtrer les données pour les stations les plus fréquentées (facultatif)
top_stations = [row['start_station_name'] for row in trip_counts_hourly.groupBy('start_station_name').sum('trip_count').orderBy('sum(trip_count)', ascending=False).limit(20).collect()]

trip_counts_filtered_hourly = trip_counts_hourly.filter(trip_counts_hourly.start_station_name.isin(top_stations) & trip_counts_hourly.end_station_name.isin(top_stations))

# Convertir les données en Pandas
trip_counts_hourly_pd = trip_counts_filtered_hourly.toPandas()

In [None]:
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Créer une liste des figures pour chaque heure
figures = []

for hour in range(24):
    hourly_data = trip_counts_hourly_pd[trip_counts_hourly_pd['hour_of_day'] == hour]
    fig = px.density_heatmap(hourly_data, x='start_station_name', y='end_station_name', z='trip_count',
                             labels={'start_station_name': 'Start Station', 'end_station_name': 'End Station', 'trip_count': 'Number of Trips'},
                             title=f'Heatmap of Trips between Top Stations at Hour {hour}')
    
    # Ajuster la taille de la figure et l'orientation des étiquettes
    fig.update_layout(width=1000, height=1000, margin=dict(l=200, r=200, b=200, t=100))
    fig.update_xaxes(tickangle=45, tickfont=dict(size=10))
    fig.update_yaxes(tickfont=dict(size=10))
    
    figures.append(fig)

# Créer une figure avec des sliders
fig = make_subplots(rows=1, cols=1, shared_xaxes=True, shared_yaxes=True, subplot_titles=['Heatmap of Trips between Top Stations'])

# Ajouter des traces pour chaque heure
for hour, hourly_fig in enumerate(figures):
    for trace in hourly_fig.data:
        fig.add_trace(trace)

# Ajouter un slider
steps = []
for i in range(24):
    step = dict(
        method="update",
        args=[{"visible": [False] * len(fig.data)}],  # Tout rendre invisible
        label=f'Hour {i}'
    )
    for j in range(i * len(hourly_fig.data), (i + 1) * len(hourly_fig.data)):
        step["args"][0]["visible"][j] = True  # Rendre visible la trace correspondante
    steps.append(step)

sliders = [dict(
    active=0,
    pad={"t": 50},
    steps=steps
)]

fig.update_layout(
    sliders=sliders
)

fig.show()

## Superviser l'exécution des tâches

##### Utilisation de la méthode explain.

In [None]:
df_2021.explain(True)

##### Les difference entre le plan logique analysé et le plan logique optimisé :
- Optimisation des Projections : Le Plan Logique Optimisé combine plusieurs projections en une seule projection lorsque c'est possible, réduisant ainsi la surcharge des projections intermédiaires.

- Simplification des Constantes : Les constantes et les expressions déterministes sont calculées durant l'optimisation pour simplifier le plan.

- Réduction des Colonnes : Seules les colonnes nécessaires sont conservées dans les projections, ce qui peut réduire la quantité de données traitées. 

##### Comment un SGBDR procéderait avec une telle requête

- Analyse Syntaxique : La requête SQL est analysée en un arbre syntaxique abstrait (AST) ou une structure similaire.
- Analyse Sémantique : La requête analysée subit une analyse sémantique où les références de colonnes sont résolues et les types sont vérifiés.
- Plan Logique : Un plan logique est généré, similaire au Plan Logique Analysé de Spark. Il décrit les opérations nécessaires pour exécuter la requête.
- Optimisation : Le plan logique est optimisé.

##### Différences entre le Plan Physique et le Plan Logique Optimisé

- Plan Logique Optimisé :  
Le Plan Logique Optimisé représente une version simplifiée et rationalisée des opérations nécessaires pour exécuter la requête. Il supprime les opérations redondantes et combine les transformations lorsque c'est possible.

- Plan Physique :  
Le Plan Physique détaille la manière exacte dont chaque opération sera exécutée sur le cluster Spark. Il inclut des informations sur l'exécution réelle des tâches, comme l'utilisation de mémoire, les partitions, et les stratégies de traitement spécifiques.

##### Mots-clés Non Attendus dans un SGBDR et Leur Signification

- FileScan parquet :  
Signification : Lecture de fichiers Parquet, un format de stockage en colonnes optimisé pour les requêtes analytiques.
RDBMS : Les SGBDR traditionnels utilisent des fichiers de table internes ou des tablespaces plutôt que des fichiers de format spécifique comme Parquet.

- Batched :  
Signification : Traitement des données en lots pour améliorer l'efficacité.
RDBMS : Bien que les SGBDR puissent utiliser des opérations par lots, ce terme spécifique est plus courant dans le traitement distribué comme Spark.

- Union :  
Signification : Combinaison de plusieurs ensembles de données.
RDBMS : Bien que les SGBDR aient des opérations UNION, dans le contexte Spark, cela peut impliquer des manipulations de partitions supplémentaires pour gérer les données distribuées.

- Project :  
Signification : Sélection et transformation des colonnes.
RDBMS : Bien que les projections existent dans les SGBDR, le terme Project en tant qu'opération distincte de l'optimisation logique est plus spécifique à Spark et aux frameworks de traitement de données distribués.

##### Nombre de Stages Nécessaires pour Compléter le Job Spark

Dans notre cas, le nombre de stages nécessaires est : 204

##### Rôles de `HashAggregate` et `Exchange hashpartitioning``

- HashAggregate : 
* * HashAggregate est une étape d'agrégation dans laquelle les données sont agrégées en utilisant une fonction de hachage. Cela signifie que les valeurs des colonnes de regroupement sont transformées en codes de hachage, qui sont ensuite utilisés pour effectuer des agrégations.
* * Typiquement utilisé pour des opérations comme groupBy, sum, avg, etc.

- Exchange hashpartitioning : 
* * Exchange avec hashpartitioning indique un shuffle dans Spark où les données sont redistribuées à travers les partitions en utilisant une fonction de hachage sur une clé spécifique.
* * Par exemple, lors d'un join ou d'un groupBy, les données doivent être partitionnées de manière à ce que les mêmes clés soient envoyées aux mêmes partitions

##### Nombre d'operations `shuffle` utilisé par le plan physique

Les opérations de `shuffle` sont indiquées par des `Exchange` dans le plan physique. Cependant, dans notre plan physique plus haut, il n'y a aucune mention explicite d'opérations Exchange. Cela signifie que le plan physique ne montre pas de redistributions de données explicites.

##### Les Tâches dans le Contexte des Stages dans Spark

Une tâche est une unité de travail qui exécute une fraction d'un stage. Chaque tâche traite une partition spécifique des données et est exécutée sur un nœud de travail dans le cluster. Le nombre de tâches par stage dépend du nombre de partitions des données d'entrée pour ce stage.

##### Analyse des Tâches dans nos Stages

Nos stages sont constitué d'un nombre de tâches qui diffère en fonction des stages, entre 1, 2, 4, 5, 8, 55 et 60