# Analysis of Citibikes - Big Data

Author : Ophiase

In [None]:
# At the first execution those options should all be set to True.
# Once those operations are performed, you can rely on the baked version and disable them.
#
# If you already have the https://s3.amazonaws.com/tripdata/{year}-citibike-tripdata.zip
# You can directly disable ENABLE_DOWNLOAD and put the zip files into data
#
# For the graphics, we do not use the star schema. 
# Thus if you disable "DO_NOT_IGNORE_STAR_SCHEMA", 
# then "ENABLE_COMPUTER" and "ENABLE_COMPUTE_EVENT" will be ignored.

# EXTRACT
ENABLE_DOWNLOAD = True
ENABLE_UNZIP = True

# TRANSFORM
ENABLE_COMPUTE_ZIP_TO_PARQUET = True
DO_NOT_IGNORE_STAR_SCHEMA = True
ENABLE_COMPUTE_DIMENSION = True
ENABLE_COMPUTE_EVENT = True

# AGGREGATE
ENABLE_COMPUTE_GRAPHICS_DATA = True 

### Dependencies

In [None]:
import shutil
import os
import requests
import pandas as pd
import numpy as np
import re
import zipfile
from functools import reduce
import math
import json

import warnings
import logging
warnings.simplefilter(action='ignore', category=FutureWarning)
logging.getLogger("pyspark").setLevel(logging.ERROR)
# os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 2g"

import pyspark
import pyspark.pandas as ps
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F
from pyspark.sql.functions import when, mean, stddev, skewness, kurtosis, expr, date_format, count
from pyspark.sql.functions import col, desc, lit, udf, dayofweek, avg, year, month, unix_timestamp, hour

import seaborn as sns
import altair as alt
import plotly
import plotly.express as px
# import plotly.graph_objects as go

import matplotlib.pyplot as plt
from plotly.subplots import make_subplots
import plotly.graph_objs as go

from geopy.geocoders import Nominatim
from urllib.request import urlopen

In [None]:
spark = SparkSession.builder \
    .appName("Big_Data_Project") \
    .config("spark.driver.memory", "3g") \
    .config("spark.sql.files.maxPartitionBytes", "2g") \
    .getOrCreate()

## Download the DATA

In [None]:
# from 2014 to 2023

zip_files = []

def download_data():
    trip_urls = [
        (year, f"https://s3.amazonaws.com/tripdata/{year}-citibike-tripdata.zip")
        for year in range(2014, 2023 + 1)
    ]

    if not os.path.exists(os.path.join('data')) :
        os.makedirs('data')

    for year, url in trip_urls:
        basename = os.path.join('data', str(year) + "_" + 'citibike_tripdata')
        zip_filename = basename + ".zip"
        csv_filename = basename + ".csv"
        zip_files.append((year, zip_filename, csv_filename))

        if not ENABLE_DOWNLOAD : continue
        print(f'Check {basename} ...')

        response = requests.get(url, stream=True)
        total_size = int(response.headers.get('content-length', 0))
        block_size = 1024
        progress = 0

        if not os.path.exists(zip_filename) and not os.path.exists(csv_filename) :
            with open(zip_filename, 'wb') as f:
                for data in response.iter_content(block_size):
                    if data:
                        f.write(data)
                        progress += len(data)
                        print(f'\rDownloaded {progress}/{total_size} bytes', end='')

            print(f'\nDownload complete: {zip_filename}')

    print("Finished")

if ENABLE_UNZIP or ENABLE_DOWNLOAD:
    download_data()

In [None]:
def unzip_data():
    for (year, zip_filename, csv_filename) in zip_files:
        # if year < 2018: continue # WARNING : DISABLE THIS LINE

        if not zipfile.is_zipfile(zip_filename):
            print("Corrupted zip file.")
            break

        if os.path.exists("tmp"):
            shutil.rmtree("tmp")

        print("Unzip : ", zip_filename)
        with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
            zip_ref.extractall("tmp")
        print("Process ..")

        # find the folder in tmp
        items = os.listdir("tmp")
        for folder in items :
            if not os.path.isdir(os.path.join("tmp", folder)) or \
                folder.startswith("__") :
                continue
            
            # find all the folder in this folder
            sub_folders = os.listdir(os.path.join("tmp", folder))
            for sub_folder in sub_folders :
                if not os.path.isdir(os.path.join("tmp", folder, sub_folder)) or \
                    sub_folder.startswith(".") : 
                    continue
                
                sub_item = os.listdir(os.path.join("tmp", folder, sub_folder))
                for leaf in sub_item :
                    # move the csv inside to data
                    from_path = os.path.join("tmp", folder, sub_folder, leaf)
                    dest_path = os.path.join("data", leaf)
                    if os.path.exists(dest_path) : 
                        os.remove(dest_path)

                    shutil.move(from_path, "data")

    if os.path.exists("tmp"):
        shutil.rmtree("tmp")

if ENABLE_UNZIP :
    unzip_data()

To optimize disk usage, we could have unziped one file at a time and convert its content instantaneously to `.parquet`.

## Cleaning / Convert the Data

### Technical/Raw Analysis

In [None]:
csv_reader = spark.read.option("header", "true") \
            .option("inferSchema", "true").csv

In [None]:
def find_all_csv():
    all_csv = []
    for item in os.listdir("data"):
        if not item.endswith(".csv") :
            continue
        all_csv.append(item)
    return sorted(all_csv)

all_csv = find_all_csv()

if False: # check column_names.txt
    for item in all_csv:    
        df = csv_reader(os.path.join("data", item))
        print(f"item {item} : {df.columns}")

By looking at the previous code output *(cached in `column_names.txt`)*, \
we notice the following columns between 2014-01 $\to$ 2021-01 (included) :
- `['tripduration', 'starttime', 'stoptime', 'start station id', 'start station name', `\
`'start station latitude', 'start station longitude', 'end station id', 'end station name', `\
`'end station latitude', 'end station longitude', 'bikeid', 'usertype', 'birth year', 'gender']`
    - The naming convention is not exactly the same between : `201610-citibike-tripdata_1.csv` $\to$ `201703-citibike-tripdata.csv_1.csv` : \
    `['Trip Duration', 'Start Time', 'Stop Time', 'Start Station ID',` \
    `'Start Station Name', 'Start Station Latitude', 'Start Station Longitude',` \
    `'End Station ID', 'End Station Name', 'End Station Latitude',` \
    `'End Station Longitude', 'Bike ID', 'User Type', 'Birth Year', 'Gender']`

The columns change between 2021-02 $\to$ 2023-12 (included) :
- `['ride_id', 'rideable_type', 'started_at', 'ended_at', 'start_station_name', `\
`'start_station_id', 'end_station_name', 'end_station_id', 'start_lat', `\
`'start_lng', 'end_lat', 'end_lng', 'member_casual']`

We decide the following matching (**O.** as before 2021-02, **N.** as after 2021-02):

- `O.'tripduration'` as function : `stoptime` - `startime`
    - Renamed as `trip_duration`

- `N.'started_at'` $\leftarrow$ `O.'starttime'`
- `N.'ended_at'` $\leftarrow$ `O.'stoptime'` 
- `N.'start_station_id'` $\leftarrow$ `O.'start station id'`
    - type : string
- `N.'start_station_name'` $\leftarrow$ `O.'start station name'`
- `N.'start_lat'` $\leftarrow$ `O.'start station latitude'`
- `N.'start_lng'` $\leftarrow$ `O.'start station longitude'`
- `N.'end_station_id'` $\leftarrow$ `O.'end station id'`
    - type : string
- `N.'end_station_name'` $\leftarrow$ `O.'end station name'`
- `N.'end_lat'` $\leftarrow$ `O.'end station latitude'`
- `N.'end_lng'` $\leftarrow$ `O.'end station longitude'`
`
- `N.'ride_id'` $\leftarrow$ `O.'bikeid'` (format is not the same)
    - Both type of ID can registered as string

- `N.'member_casual'` $\leftarrow$ `O.'usertype'` (format is not the same)
    - Mapping : `O.Subscriber`, `O.Customer` $\to$ `N.member`, `N.casual`

- `O.'birth year` : (None) for elements of N
    - Renamed as `birth_year`
- `O.'gender'` : (None) for elements of N
- `N.'rideable_type'` : (None) for elements of O

- We will also add a binary column `old_format` to indicate if the data comes from `O` or `N` as defined above.

In [None]:
col_mapping_1 = {
    'tripduration': 'trip_duration',
    'usertype': 'member_casual',
    'birth year': 'birth_year',

    'starttime': 'started_at',
    'stoptime': 'ended_at',
    'start station id': 'start_station_id',
    'start station name': 'start_station_name',
    'start station latitude': 'start_lat',
    'start station longitude': 'start_lng',
    'end station id': 'end_station_id',
    'end station name': 'end_station_name',
    'end station latitude': 'end_lat',
    'end station longitude': 'end_lng',
    'bikeid': 'ride_id',
}

col_mapping_2 = {
    'Trip Duration': 'tripduration',
    'Start Time': 'starttime',
    'Stop Time': 'stoptime',
    
    'Start Station ID': 'start station id',
    'Start Station Name': 'start station name',
    'Start Station Latitude': 'start station latitude',
    'Start Station Longitude': 'start station longitude',

    'End Station ID': 'end station id',
    'End Station Name' : 'end station name',
    'End Station Latitude' : 'end station latitude',
    'End Station Longitude' : 'end station longitude',
    
    'Bike ID' : 'bikeid',
    'User Type' : 'usertype',
    'Birth Year' : 'birth year',
    'Gender' : 'gender'
}


In [None]:
def check_unique_values(df, column):
    return df.select(column).dropDuplicates().rdd.map(lambda row: row[0]).collect()

In [None]:
def fast_check():
    df_o = spark.read.csv(os.path.join("data", all_csv[0]), header=True, inferSchema=True)
    df_o = df_o.select(
        [col(old_col).alias(col_mapping_1.get(old_col, old_col)) for old_col in df_o.columns]
        )

    df_n = spark.read.csv(os.path.join("data", all_csv[-1]), header=True, inferSchema=True)
    
    print(check_unique_values(df_o, "member_casual"))
    print(check_unique_values(df_o, "gender"))
    print(check_unique_values(df_n, "rideable_type"))

    df_o.printSchema()
    df_n.printSchema()
    
    # df.show()

fast_check()

### File format Selection : Parquet

<img src="resources/file-formats.png" alt="Drawing" style="width: 400px;"/>

Our [course](https://stephane-v-boucheron.fr/slides/tbd/slides06_file-formats.html#/title-slide) on Big Data file formats.

Parquet suits our needs for the project:

- Suitable for laptop execution
- 37GB dataset size manageable with Parquet's compression
- Supports splitability for processing subsets of data on minimal configuration
- Compatible with Apache Spark

### CSV $\to$ Parquet

In [None]:
if not os.path.exists(os.path.join('computed')) :
    os.makedirs('computed')

In [None]:
factorial_columns = ["member_casual", "gender", "rideable_type"]

In [None]:
# Lower Case O. part

# add missing columns
def csv_o_process(df):
    df = df.select(
        [col(old_col).alias(col_mapping_1.get(old_col, old_col)) for old_col in df.columns])

    df = df.withColumn("birth_year", 
                       when(col("birth_year") == r"\N", lit(None))
                       .otherwise(col("birth_year"))
                       .cast("integer")
                       )

    df = df.withColumn("start_station_id", col("start_station_id").cast("string"))\
            .withColumn("end_station_id", col("end_station_id").cast("string")) \
            .withColumn("ended_at", col("ended_at").cast("timestamp")) \
            .withColumn("started_at", col("started_at").cast("timestamp")) \
            .withColumn("ride_id", col("ride_id").cast("string"))

    df = df.withColumn("member_casual",
                        when(col("member_casual") == "Subscriber", lit("member")) \
                        .otherwise(lit("casual")))
    
    df = df.withColumn("rideable_type", lit(None).cast('string'))
    df = df.withColumn("old_format", lit(True))
    
    df = df.withColumn("gender", 
         when(col('gender') == 0, lit(None) ) \
        .when(col('gender') == 1, lit("Male")) \
        .when(col('gender') == 2, lit("Female")).cast("string"))

    df = df.select(*sorted(df.columns))

    # df.write.mode("append").parquet(parquet_file)
    return df

def csv_to_parquet_part_1(csv_file) :
    df = spark.read.csv(csv_file, header=True, inferSchema=True)
    return csv_o_process(df)

# Lower case O. part (with wrong the date format)
def csv_to_parquet_part_4(csv_file) :
    df = spark.read.csv(csv_file, header=True, inferSchema=True)
    df = df.withColumn("starttime", F.to_timestamp(df.starttime, "M/d/yyyy H:mm:ss")) \
       .withColumn("stoptime", F.to_timestamp(df.stoptime, "M/d/yyyy H:mm:ss"))
    return csv_o_process(df)

In [None]:
# Upper case O. part

def csv_to_parquet_part_2(csv_file) :
    df = spark.read.csv(csv_file, header=True, inferSchema=True)
    
    df = df.select(
        [col(old_col).alias(col_mapping_2.get(old_col, old_col)) for old_col in df.columns])
    
    return csv_o_process(df)

In [None]:
# N. Part

def csv_to_parquet_part_3(csv_file) :
    df = spark.read.csv(csv_file, header=True, inferSchema=True)

    # df = df.withColumn("trip_duration", 
    #     (unix_timestamp(col("ended_at")) - unix_timestamp(col("started_at"))) \
    #     .cast("integer"))
    
    df = df.withColumn("trip_duration", 
        F.when(
            (col("started_at").isNotNull()) & (col("ended_at").isNotNull()) & (col("ended_at") > col("started_at")),
            (F.unix_timestamp(col("ended_at")) - F.unix_timestamp(col("started_at"))).cast("integer")
        ).otherwise(None))
    
    df = df.withColumn("old_format", lit(False))
    df = df.withColumn("birth_year", lit(None).cast("integer"))

    df = df.withColumn("gender", lit(None).cast("string"))

    df = df.withColumn("start_station_id", col("start_station_id").cast("string"))\
            .withColumn("end_station_id", col("end_station_id").cast("string")) \
            .withColumn("ended_at", col("ended_at").cast("timestamp")) \
            .withColumn("started_at", col("started_at").cast("timestamp")) \
            .withColumn("ride_id", col("ride_id").cast("string"))

    df = df.select(*sorted(df.columns))

    # df.write.mode("append").parquet(parquet_file)
    return df

#### File Partitioning

We decide to partition columns on `"year(started_at)"`, `"month(started_at)"`, `"start_station_id"`

Even with Lazy Loading of the CSV file, dataset doesn't seems to fit in RAM, thus we partition by hand the .parquet file intos multiples files

In [None]:
# Scheduler

schedule = [
    ("201401-citibike-tripdata_1.csv", 1),
    ("201409-citibike-tripdata_1.csv", 4),
    ("201610-citibike-tripdata_1.csv", 2),
    ("201704-citibike-tripdata.csv_1.csv", 1),
    ("202102-citibike-tripdata_1.csv", 3)
]

def process_csv_files(csv_files, schedule):
    schedule_pointer = 0
    df_buffer = []

    for index, csv_file in enumerate(csv_files):
        csv_path = os.path.join("data", csv_file)
        
        if index % 20 == 0 :
            print(f"[{schedule_pointer}, {index}] {csv_file}")
        if (schedule_pointer < len(schedule) - 1) and (csv_file == schedule[schedule_pointer + 1][0]) :
            schedule_pointer += 1
            print(f"{csv_file} | {schedule_pointer} : {schedule[schedule_pointer]}")

        schedule_mode = schedule[schedule_pointer][1]
        target_function = [csv_to_parquet_part_1, csv_to_parquet_part_2, csv_to_parquet_part_3, csv_to_parquet_part_4][schedule_mode - 1]

        df_to_add = target_function(csv_path)
        df_buffer.append(df_to_add)

    print ("REDUCE STEP")
    
    df = reduce(lambda df1, df2: df1.union(df2), df_buffer)
    df = df.withColumn("year",
        when(col("started_at").isNotNull(), 
                year(col("started_at")).cast("integer")
            ).otherwise(lit(None).cast("integer"))
    )

    print ("WITH year/month")

    parquet_path = os.path.join(
        'computed', 'travels', f'travels.parquet')

    df.write \
        .partitionBy("year") \
        .mode("overwrite").parquet(parquet_path)
        # .write.option("maxRecordsPerFile", 70000000) \

if ENABLE_COMPUTE_ZIP_TO_PARQUET :
    process_csv_files(all_csv, schedule)

### Verifying Parquet is working

In [None]:
df = spark.read.parquet(os.path.join("computed", "travels", "*.parquet"))

print("Number of rows : " + str(df.count()))
df.show()

### Star Schema

### Dimension (Stations)

In [None]:
df_dimension = df.select("start_station_id", "start_station_name", 
                                "start_lat", "start_lng") \
                        .withColumnRenamed("start_station_id", "station_id") \
                        .withColumnRenamed("start_station_name", "station_name") \
                        .withColumnRenamed("start_lat", "lat") \
                        .withColumnRenamed("start_lng", "lng")

df_dimension = df_dimension.unionByName(
    df.select("end_station_id", "end_station_name", 
                    "end_lat", "end_lng") \
             .withColumnRenamed("end_station_id", "station_id") \
             .withColumnRenamed("end_station_name", "station_name") \
             .withColumnRenamed("end_lat", "lat") \
             .withColumnRenamed("end_lng", "lng")
            ).distinct()

                #, allowMissingColumns=True

df_dimension = df_dimension.groupBy("station_id").agg(
    F.first("station_name").alias("station_name"),
    F.first("lat").alias("lat"),
    F.first("lng").alias("lng")
)

df_dimension.explain()

In [None]:
if DO_NOT_IGNORE_STAR_SCHEMA and ENABLE_COMPUTE_DIMENSION:
    df_dimension.write \
                .mode("overwrite") \
                .parquet(os.path.join("computed", "dimension", "dimension.parquet"))
    #.option("maxRecordsPerFile", 70000000) \

In [None]:
if DO_NOT_IGNORE_STAR_SCHEMA :
    df_dimension = spark.read.parquet(os.path.join("computed", "dimension", "dimension.parquet"))
    # df_station_dim.show()
    df_dimension.count()

#### Events

In [None]:
def make_event_table(df_event):
    columns = [
        "start_station_name", "start_lng", "start_lat", 
        "end_station_name", "end_lng", "end_lat"
        ]
    
    for x in columns :
        df_event = df_event.drop(col(x))

    return df_event

df_event = make_event_table(df)
df_event.describe()

In [None]:
if DO_NOT_IGNORE_STAR_SCHEMA and ENABLE_COMPUTE_EVENT :
    df_event.write \
                .partitionBy("year") \
                .mode("overwrite") \
                .parquet(os.path.join("computed", "event", "event.parquet"))


In [None]:
if DO_NOT_IGNORE_STAR_SCHEMA :
    df_event = spark.read.parquet(os.path.join("computed", "event", "event.parquet"))
    df_event.count()
    df_event.show()

### Star schema

In [None]:
def build_df_star(df_event, df_dimension):
    df_dimension_start = df_dimension.selectExpr(
        "station_id as start_station_id",
        "station_name as start_station_name",
        "lat as start_lat",
        "lng as start_lng"
    )

    df_dimension_end = df_dimension.selectExpr(
        "station_id as end_station_id",
        "station_name as end_station_name",
        "lat as end_lat",
        "lng as end_lng"
    )

    return df_event \
        .join(df_dimension_start, df_event.start_station_id == df_dimension_start.start_station_id, "left") \
        .join(df_dimension_end, df_event.end_station_id == df_dimension_end.end_station_id, "left")

df_star = build_df_star(df_event, df_dimension)
df_star.explain()

## Analysis

In [None]:
if not os.path.exists(os.path.join('computed', 'graphics')) :
    os.makedirs(os.path.join('computed', 'graphics'))

### Split before/after Covid-19

In [None]:
end_date_pre_covid = '2020-02-29'
start_date_post_covid = '2020-03-01'

df_pre_covid = df.filter(col('ended_at') <= end_date_pre_covid)
df_2018 = df.filter((col('ended_at') <= '2019-01-01') & (col('started_at') > '2018-01-01')) 

df_post_covid = df.filter(col('started_at') >= start_date_post_covid)
df_2022 = df.filter((col('ended_at') <= '2023-01-01') & (col('started_at') > '2022-01-01')) 

In [None]:
# print("Before Covid-19")
# df_pre_covid.show(5)
# print("After Covid-19")
# df_post_covid.show(5)

#### Compute distances and durations

In [None]:
def haversine(lat1, lon1, lat2, lon2):
    R = 6371  # Rayon de la Terre en kilomètres
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat / 2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2)**2
    c = 2 * math.asin(math.sqrt(a))
    return R * c

haversine_udf = udf(haversine, DoubleType())

In [None]:
def compte_trip_distance(df, verbose=False):
    if verbose : print("compute distance")

    df = df.filter(
        col("start_lat").isNotNull() & 
        col("start_lng").isNotNull() & 
        col("end_lat").isNotNull() & 
        col("end_lng").isNotNull()
    )

    df = df.withColumn(
        "distance_km", haversine_udf(col("start_lat"), 
                                     col("start_lng"), 
                                     col("end_lat"), 
                                     col("end_lng")))
    
    return df

df_2018 = compte_trip_distance(df_2018)
df_2022 = compte_trip_distance(df_2022)
df_post_covid = compte_trip_distance(df_post_covid)

In [None]:
# generic function
def visualize_trip_distance_over_something(metric, metric_name, df_1, df_2=None, description_1="", description_2="", min_factor=0.99):
    def specific_chart(df, description_bonus):
        min_value = df['avg_distance_km'].min() * min_factor
        max_value = df['avg_distance_km'].max() * 1.02

        return alt.Chart(df).mark_bar().encode(
                x=alt.X(metric, title=metric_name, sort=None),
                y=alt.Y('avg_distance_km:Q', title='Average distance (km)',
                        scale=alt.Scale(domain=(min_value, max_value))),
                color=alt.Color(metric, title=metric_name),
                tooltip=['avg_distance_km', metric]
            ).properties(
                title= f'Average trip distance per {metric_name} {description_bonus}',
                width= 300,
                height= 200
            )
        

    if df_2 is None:
        chart = alt.hconcat(specific_chart(df_1, description_1))
    else :
        chart = alt.hconcat(
            specific_chart(df_1, description_1),
            specific_chart(df_2, description_2)
        )

    chart = chart.configure_axis(
        labelAngle=45,
        titleFontSize=12,
        labelFontSize=10,
        labelOverlap='parity'
    ).configure_legend(
        titleFontSize=12,
        labelFontSize=10
    )

    return chart


In [None]:
def save_graphic_data(graphic_data_generator, graphic_name, ENABLE_COMPUTE_GRAPHICS_DATA=ENABLE_COMPUTE_GRAPHICS_DATA) :
    file_name = os.path.join("computed", "graphics", f"{graphic_name}.h5")
    if ENABLE_COMPUTE_GRAPHICS_DATA :
        graphic_data = graphic_data_generator()
        graphic_data.to_hdf(file_name, key='df', mode='w')
        return graphic_data
    return pd.read_hdf(file_name, key='df')

### Trip distance by day

In [None]:
def compute_trip_distance_over_week(df, verbose=False):
    '''
        Returns in the pandas format
    '''
    if verbose : print("compute day of week")
    df = df.withColumn("day_of_the_week", dayofweek(col("started_at")))

    if verbose : print("group by")
    
    avg_distance_by_day_of_the_week = df.groupBy("day_of_the_week") \
        .agg(avg("distance_km").alias("avg_distance_km"))
    
    avg_distance_by_day_of_the_week = avg_distance_by_day_of_the_week.withColumn(
        "day_of_the_week",
        when(col("day_of_the_week") == 1, "Sunday")
        .when(col("day_of_the_week") == 2, "Monday")
        .when(col("day_of_the_week") == 3, "Tuesday")
        .when(col("day_of_the_week") == 4, "Wednesday")
        .when(col("day_of_the_week") == 5, "Thursday")
        .when(col("day_of_the_week") == 6, "Friday")
        .when(col("day_of_the_week") == 7, "Saturday")
    )

    return avg_distance_by_day_of_the_week.toPandas()

trip_distance_over_week_2018 = save_graphic_data(lambda: compute_trip_distance_over_week(df_2018), "trip_distance_over_week_2018")
trip_distance_over_week_2022 = save_graphic_data(lambda: compute_trip_distance_over_week(df_2022), "trip_distance_over_week_2022")

In [None]:
visualize_trip_distance_over_something(
    "day_of_the_week", "day",
    trip_distance_over_week_2018, trip_distance_over_week_2022, 
    " (2018)", " (2022)"
).interactive()

###  Number of trips for each pickup/dropoff location couple

In [None]:
def aggregate_on_pickup_dropoff(df):
    return df.groupBy(
        "start_station_id", "end_station_id"
        ).agg(
            count("*").alias("count"),
            F.concat_ws(" -> ", 
                      F.first("start_station_name"), 
                      F.first("end_station_name")
                      ).alias("station_pair")
        )
    
def reduce_pickup_dropoff(df):
    return df.groupBy("count") \
        .agg(count("*").alias("frequency")) \
        .toPandas()

def top_10_pickup_dropoff(agg_df):
    top_10_df = agg_df.orderBy(desc("count")).limit(10)
    return top_10_df.toPandas()

def visualize_number_of_pickup_drop(ax, df, date):
    sns.histplot(df["count"], bins=30, kde=True, ax=ax)
    ax.set_xlabel("Number of couple (start_station_id, end_station_id) during " + date)
    ax.set_ylabel("Number of occurrences the couple")
    ax.set_title("(start_station_id, end_station_id)")

def visualize_top_10_pickup_dropoff(ax, top_10_df, date):
    sns.barplot(x="count", y="station_pair", data=top_10_df, dodge=False, ax=ax)
    ax.set_xlabel("Number of Occurrences")
    ax.set_ylabel("Station Pair")
    ax.set_title("Top 10 Most Frequent Start-End Station Pairs during " + date)
    
    labels = [item.get_text() for item in ax.get_yticklabels()]
    wrapped_labels = ["\n".join(label.split(" -> ")) for label in labels]
    ax.set_yticklabels(wrapped_labels)

def full_visualization(df, date):
    agg = aggregate_on_pickup_dropoff(df)

    df_reduce_pickup_dropoff = save_graphic_data(lambda: reduce_pickup_dropoff(agg), f"reduce_pickup_dropoff_{date}")
    df_top_10_pickup_dropoff = save_graphic_data(lambda: top_10_pickup_dropoff(agg), f"top_10_pickup_dropoff_{date}")
    
    fig, axes = plt.subplots(1, 2, figsize=(20, 8))
    visualize_number_of_pickup_drop(axes[0], df_reduce_pickup_dropoff, date)
    visualize_top_10_pickup_dropoff(axes[1], df_top_10_pickup_dropoff, date)
    
    plt.tight_layout()
    plt.show()


In [None]:
full_visualization(df_2018, "2018")

In [None]:
full_visualization(df_2022, "2022")

### Trip distance distribution for gender

In [None]:
def compute_trip_distance_over_genders(df, verbose=False):
    '''
        Returns in Pandas format
    '''
    df = df.filter(col("gender").isNotNull())
    
    avg_distance = df.groupBy("gender") \
        .agg(avg("distance_km").alias("avg_distance_km"))
    
    return avg_distance.toPandas()

trip_distance_over_genders_2018 = save_graphic_data(lambda: compute_trip_distance_over_genders(df_2018), "trip_distance_over_genders_2018")
trip_distance_over_genders_post_covid = save_graphic_data(lambda: compute_trip_distance_over_genders(df_post_covid), "trip_distance_over_genders_post_covid")

In [None]:
visualize_trip_distance_over_something(
    "gender", "Gender",
    trip_distance_over_genders_2018, trip_distance_over_genders_post_covid,
    " (2018)", " (post covid)", min_factor=0.0
    ).interactive()


### Trip distance distribution for age ranges

In [None]:
def compute_trip_distance_over_ages(df, verbose=False):
    '''
        Returns in Pandas format
    '''
        
    df = df.filter(col("birth_year").isNotNull())
    df = df.withColumn("age", year(col('started_at')) - col('birth_year'))

    df = df.withColumn(
        "age_range", 
         when(col('age') <= 14, lit(None) ) \
        .when((15 <= col('age')) & (col('age') <= 24), lit('15-24')) \
        .when((25 <= col('age')) & (col('age') <= 44), lit('25-45')) \
        .when((45 <= col('age')) & (col('age') <= 54), lit('45-54')) \
        .when((55 <= col('age')) & (col('age') <= 64), lit('55-64')) \
        .when(65 <= col('age'), lit('65+')) \
    )

    avg_distance = df.groupBy("age_range") \
        .agg(avg("distance_km").alias("avg_distance_km"))

    return avg_distance.toPandas()

trip_distance_over_ages_2018 = save_graphic_data(lambda: compute_trip_distance_over_ages(df_2018), "trip_distance_over_ages_2018")
trip_distance_over_ages_post_covid = save_graphic_data(lambda: compute_trip_distance_over_ages(df_post_covid), "trip_distance_over_ages_post_covid")

In [None]:
visualize_trip_distance_over_something(
    "age_range", "Age Range",
    trip_distance_over_ages_2018, trip_distance_over_ages_post_covid,
    " (2018) ", " (post covid) ", min_factor=0.0
    ).interactive()


### Trip distance distribution for different kind of bikes

In [None]:
def compute_trip_distance_over_bikes(df, verbose=False):
    df = df.filter(col("rideable_type").isNotNull())
    
    avg_distance = df.groupBy("rideable_type") \
        .agg(avg("distance_km").alias("avg_distance_km"))

    return avg_distance.toPandas()

trip_distance_over_bikes_post_covid = save_graphic_data(lambda: compute_trip_distance_over_bikes(df_post_covid), "trip_distance_over_bikes_post_covid")

In [None]:
visualize_trip_distance_over_something(
    "rideable_type", "Type of bike",
    trip_distance_over_bikes_post_covid, None,
    " (after covid) ", min_factor=0.0
    ).interactive()


### Hour of the week

In [None]:
def compute_hour_of_the_week(df):
    df = df.dropna(subset=["started_at"])
    df = df.withColumn("day_of_the_week", dayofweek(col("started_at")) - 1)
    df = df.withColumn("hour_of_the_day", hour(col("started_at")))
    df = df.withColumn("hour_of_the_week", 
                col("day_of_the_week")*24 + col("hour_of_the_day"))
    
    df = df.withColumn(
        "day_of_the_week",
        when(col("day_of_the_week") == 0, "Sunday")
        .when(col("day_of_the_week") == 1, "Monday")
        .when(col("day_of_the_week") == 2, "Tuesday")
        .when(col("day_of_the_week") == 3, "Wednesday")
        .when(col("day_of_the_week") == 4, "Thursday")
        .when(col("day_of_the_week") == 5, "Friday")
        .when(col("day_of_the_week") == 6, "Saturday")
    )
    
    return df

In [None]:
def compute_data_per_hour_of_the_week(df):
    result = df.groupBy("hour_of_the_week", "day_of_the_week", "hour_of_the_day") \
    .agg(
        F.countDistinct(
            "start_station_id", "end_station_id"
            ).alias("distinct_station_pairs"),
        avg("distance_km").alias("avg_distance_km"),
        avg("trip_duration").alias("avg_trip_duration"),
        count("*").alias("row_count")
    )

    return result.toPandas().sort_values("hour_of_the_week").reset_index(drop=True)

f = lambda x: compute_data_per_hour_of_the_week(compute_hour_of_the_week(x))

data_phw_2018 = save_graphic_data(lambda: f(df_2018), "data_phw_2018")
data_phw_2022 = save_graphic_data(lambda: f(df_2022), "data_phw_2022")

In [None]:
print("2018");data_phw_2018.head()

In [None]:
print("2022");data_phw_2022.head()

In [None]:
def full_visualization_over_the_week(metric, metric_name, df_1, df_2, description_1="", description_2=""):
    def specific_chart(df, description_bonus):
        min_value = 0 # df['avg_distance_km'].min() * 0.80
        max_value = df['avg_distance_km'].max() * 1.02

        return alt.Chart(df).mark_bar().encode(
                x=alt.X('hour_of_the_week:N', title='Hour of the week', sort=None),
                y=alt.Y(metric, title=metric_name), #scale=alt.Scale(domain=(min_value, max_value))),
                color=alt.Color('day_of_the_week', title='Day of the week'),
                tooltip=[metric, 'day_of_the_week', 'hour_of_the_day']
            ).properties(
                title= f'{metric_name} per hour of the week ' + description_bonus,
                width= 300,
                height= 200
            )
        
    chart = alt.hconcat(
        specific_chart(df_1, description_1),
        specific_chart(df_2, description_2)
    ).configure_axis(
        labelAngle=45
    )

    chart = chart.configure_axis(
        titleFontSize=12,
        labelFontSize=10,
        labelOverlap='parity'
    ).configure_legend(
        titleFontSize=12,
        labelFontSize=10
    )

    return chart

In [None]:
full_visualization_over_the_week_1822 = lambda x, y : full_visualization_over_the_week(x, y, data_phw_2018, data_phw_2022, "(2018)", "(2022)")

### The number of pickups/docks

In [None]:
full_visualization_over_the_week_1822("distinct_station_pairs", "Distinct Station Pairs")

### The average distance

In [None]:
full_visualization_over_the_week_1822("avg_distance_km", "Average distance (km)")

### The average trip duration

In [None]:
full_visualization_over_the_week_1822("avg_trip_duration", "Average trip duration (seconds)")

### The average number of ongoing trips

In [None]:
full_visualization_over_the_week_1822("row_count", "Number of ongoing trips")

## Monitoring

### Relevant Job to analyze

In [None]:
# df_to_analyze = spark.read.parquet(os.path.join("computed", "travels", "*.parquet"))
# df_to_analyze.show()

# df_star.show()

We'll also analyze some toPandas performed for visualization graphics above in the notebook.

### Monitor Travels

In [None]:
df.explain(True)

2. Relation does not differ between Analyzed and Optimized Logical Plan. The query optimizer in the RDBMS did not find any opportunities to further optimize the logical plan after the initial analysis phase.


3. The Physical Plan differs from the Optimized Logical Plan in that it includes the ColumnarToRow operator, which is a Spark-specific operator that converts data from the columnar format used for storage and processing to the row format used for execution. The FileScan parquet operator is also specific to Spark and is used to scan Parquet data files.

4.
    - It requires :
        - One stage to read the parquet file of `travels`.
        - One stage to show it.
        - Two stages (over two jobs) to show the df_star 
    - **HashAggregate** efficiently performs aggregations by grouping data using hash tables, optimizing memory usage and processing speed. 
    - **Exchange hashpartitioning** redistributes data across nodes based on hashed keys, enabling parallel processing and necessary data colocation for operations like joins, albeit with the cost of shuffle operations.

5. The above operations do not perform shuffle operations. But, neverthless, if we look down on jobs performing at .toPandas operation, they do perform a shuffle operation to do so.

6. 
    - Tasks are the smallest units of work within a stage, each processing a single data partition.
    - Again, for the 3 operations shown in this part, there is one task (except for df_star which uses 2 stages in one of its tasks). Nevertheless, most toPandas stages perform **8** task (because, the `travels.parquet` is partionned into the **8**`** years present in the dataset) 

## Spatial Informations

### Heat Map

In [None]:
def build_heatmap_data(df, fraction):
    trip_counts = df.groupBy("start_station_id", "end_station_id").count()
    trip_counts = trip_counts.withColumnRenamed("count", "trip_count")
    if fraction != 1.0 :
        trip_counts = trip_counts.sample(withReplacement=False, fraction=fraction)
    trip_counts = trip_counts.toPandas()
    return trip_counts.sort_values("trip_count") # sort by count ?

def preview_heat_map(heatmap_data):
    heatmap_data = heatmap_data.pivot(index='start_station_id', columns='end_station_id', values='trip_count')
    return px.imshow(heatmap_data, title="Heatmap of Start/End stations in 2018", labels=dict(x="End Station", y="Start Station", color="Number of Trips"))

FRACTION = 1.0 # my computer is not powerful enough
heatmap_data = save_graphic_data(lambda: build_heatmap_data(df_2018, FRACTION), "heatmap_data_2018")
preview_heat_map(heatmap_data).show()

### Build Spatial Data

### Interactive Map

In [None]:
def build_interactive_map_data(df):
    start_df = df.select(col("start_station_id").alias("station_id"),
                        hour(col("started_at")).alias("hour"),
                        col("start_lat").alias("lat"),
                        col("start_lng").alias("lng"))

    end_df = df.select(col("end_station_id").alias("station_id"),
                        hour(col("ended_at")).alias("hour"),
                        col("end_lat").alias("lat"),
                        col("end_lng").alias("lng"))

    union_df = start_df.union(end_df)

    hourly_trip_counts = union_df.groupBy("hour", "station_id", "lat", "lng").count()

    return hourly_trip_counts.toPandas()

interactive_map_data = save_graphic_data(lambda: build_interactive_map_data(df_2018), "interactive_map_data_2018")

In [None]:
def display_heatmap(df, bonus=""):
    df = df.sort_values("hour")
    fig = px.density_mapbox(df, lat='lat', lon='lng', z='count', radius=30,
                            center=dict(lat=40.7128, lon=-74.0060), zoom=10,
                            mapbox_style="carto-positron", animation_frame='hour',
                            range_color=[0, df['count'].max()*1.0])

    fig.update_layout(
        title="Citibikes Station Frequency Heatmap of New York City by Hour" + bonus,
        updatemenus=[dict(
            type="buttons",
            showactive=False,
            buttons=[
                dict(label="Play",
                     method="animate",
                     args=[None, dict(frame=dict(duration=500, redraw=True), fromcurrent=True)]),
                dict(label="Pause",
                     method="animate",
                     args=[[None], dict(frame=dict(duration=0, redraw=False), mode="immediate", fromcurrent=True)])
            ]
        )],
        sliders = [dict(
            active=0,
            currentvalue={"prefix": "Hour: "},
            pad={"t": 1}
        )],
        height=700, width=700 # margin=dict(l=0, r=0, t=10.0, b=0)
        
    )

    fig.show(frame=0)

display_heatmap(interactive_map_data, " (2018)")