In [None]:
import os
import sys
from pyspark.sql import SparkSession
import pyspark.sql

# Create (or reuse, if one is already running) the Spark session shared by every cell below.
spark = SparkSession.builder.appName("BIG DATA PROJECT").getOrCreate()

In [None]:
#!pip install geojson geopandas plotly

In [None]:
#!pip install ipyleaflet


## New York taxis trips

In [None]:
# import all the packages we need
import geopandas as gpd
import plotly.graph_objects as go
import pandas as pd
from plotly.subplots import make_subplots
import plotly.express as px
from pyspark.sql.functions import *
import seaborn as sns
import matplotlib.pyplot as plt

# All raw inputs live in one directory. Set the NYC_TAXI_DATA_DIR environment variable to
# point elsewhere instead of editing hard-coded absolute paths; the default is the original location.
DATA_DIR = os.environ.get("NYC_TAXI_DATA_DIR", "/home/marius/Téléchargements")

# read the parquet files (March 2019)
df_yel = spark.read.parquet(os.path.join(DATA_DIR, "yellow_tripdata_2019-03.parquet"))
df_fhv = spark.read.parquet(os.path.join(DATA_DIR, "fhv_tripdata_2019-03.parquet"))

# taxi-zone polygons, used later to attach coordinates and names to the location IDs
df_zones = gpd.read_file(os.path.join(DATA_DIR, "taxi_zones", "taxi_zones.shp"))

# Only the last expression of a cell is rendered, so the column list is printed explicitly.
print(df_yel.columns)

df_zones


## Using data as parquet files

1. What is the number of partitions of the dataframe?

In [None]:
# Yellow taxis: number of partitions Spark split the DataFrame into
yellow_rdd = df_yel.rdd
num_partitions_yel = yellow_rdd.getNumPartitions()
num_partitions_yel

In [None]:
# FHV: number of partitions Spark split the DataFrame into
fhv_rdd = df_fhv.rdd
num_partitions_fhv = fhv_rdd.getNumPartitions()
num_partitions_fhv

In [None]:
# Drop rows missing the fields the analysis depends on. `na.drop()` returns a NEW DataFrame,
# so the result must be reassigned; previously it was discarded and nothing was dropped.
# The check is restricted to the timestamps and zone keys: a whole-row drop would also discard
# rows whose unused optional columns are null (NOTE(review): some optional columns may be null
# for most records in these files — confirm before widening the subset).
# Column names resolve case-insensitively by default (spark.sql.caseSensitive=false).
YELLOW_REQUIRED = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'PULocationID', 'DOLocationID']
FHV_REQUIRED = ['pickup_datetime', 'dropOff_datetime', 'PULocationID', 'DOLocationID']
df_yel = df_yel.na.drop(subset=YELLOW_REQUIRED)
df_fhv = df_fhv.na.drop(subset=FHV_REQUIRED)

In [None]:
# Preview the first rows of the yellow-taxi trips
df_yel.show(n=20)


In [None]:
# Preview the first rows of the FHV trips
df_fhv.show(n=20)

## Investigate (at least) one month of data in 2019

1. Using these boundaries, filter the 2019 data (using pickup and dropoff longitude and
latitude) and count the number of trips for each value of passenger_count and make a
plot of that.

In [None]:
  
# One representative point per taxi zone, expressed as WGS84 (EPSG:4326) latitude/longitude.
# Centroids must be computed in a projected CRS, so the polygons are first projected to
# EPSG:2263 (NAD83 / New York Long Island, US feet) -- a no-op if the shapefile already uses it
# (NOTE(review): presumably it does; check df_zones.crs) -- and only then converted to EPSG:4326.
# The previous version reprojected to EPSG:3310 (California Albers) and stored a "centroid"
# column that was never read: `df_zones.centroid` is the GeoDataFrame property, not that column.
zone_centroids = df_zones.to_crs(epsg=2263).geometry.centroid.to_crs(epsg=4326)

# Keep only the attributes needed to enrich the trips; building a new frame avoids
# mutating the GeoDataFrame in place.
zones_pd = pd.DataFrame({
    'LocationID': df_zones['LocationID'],
    'latitude': zone_centroids.y,
    'longitude': zone_centroids.x,
    'zone': df_zones['zone'],
    'borough': df_zones['borough'],
})

# convert the zones table to a Spark DataFrame so it can be joined with the trips
df_zones = spark.createDataFrame(zones_pd)

df_zones.show()

In [None]:
# rename columns
def renameColumns(df, old_columns, new_columns):
    """Return ``df`` with each name in ``old_columns`` renamed to the matching entry of ``new_columns``.

    Pairs are matched by position; surplus entries of the longer list are ignored (``zip``).
    """
    for old_col, new_col in zip(old_columns, new_columns):
        df = df.withColumnRenamed(old_col, new_col)
    return df


def renameFullColumns(df):
    """Build pickup- and dropoff-side copies of the zones table, ready to be joined with trips.

    ``df`` must have the columns LocationID, latitude, longitude, zone and borough.
    Returns ``(zones_pickup, zones_dropoff)``: the pickup copy is keyed by ``PULocationID`` and the
    dropoff copy by ``DOLocationID``, so each trip receives the attributes of the zone where it
    actually started / ended. (Previously the keys were swapped, so every ``pickup_*`` column held
    the dropoff zone's values and vice versa.)
    """
    old_columns = ['LocationID', 'latitude', 'longitude', 'zone', 'borough']
    new_columns_pickup = ['PULocationID', 'pickup_latitude', 'pickup_longitude',
                          'zones_pickup_location', 'borough_pickup_location']
    new_columns_dropoff = ['DOLocationID', 'dropoff_latitude', 'dropoff_longitude',
                           'zones_dropoff_location', 'borough_dropoff_location']
    zones_pickup = renameColumns(df, old_columns, new_columns_pickup)
    zones_dropoff = renameColumns(df, old_columns, new_columns_dropoff)
    return zones_pickup, zones_dropoff


# Join trips data with df_zones in order to have longitudes and latitudes
def joinDataframes(df, zones_pickup, zones_dropoff):
    """Attach pickup and dropoff zone attributes to the trips DataFrame ``df``.

    Performs default (inner) equi-joins on ``PULocationID`` for the pickup side and on
    ``DOLocationID`` for the dropoff side, so trips whose IDs have no zone match are dropped.
    """
    df = df.join(zones_pickup, ['PULocationID'])
    df = df.join(zones_dropoff, ['DOLocationID'])
    return df

# Pickup- and dropoff-keyed copies of the zones table, reused by every join below.
# (A bare `df_zones.show` without parentheses only referenced the method and did nothing.)
zones_pickup, zones_dropoff = renameFullColumns(df_zones)

In [None]:
# Yellow taxis: attach pickup/dropoff zone attributes to every trip, then preview
full_data_yel = joinDataframes(df=df_yel, zones_pickup=zones_pickup, zones_dropoff=zones_dropoff)
full_data_yel.show(n=20)

In [None]:
# FHV: attach pickup/dropoff zone attributes to every trip, then preview
full_data_fhv = joinDataframes(df=df_fhv, zones_pickup=zones_pickup, zones_dropoff=zones_dropoff)
full_data_fhv.show(n=20)

In [None]:

def filterData(df, lat_bounds=(40.58, 40.90), lon_bounds=(-74.10, -73.70)):
    """Keep trips whose pickup AND dropoff coordinates both lie inside a lat/lon box.

    Parameters
    ----------
    df : DataFrame
        Trips with pickup_latitude, pickup_longitude, dropoff_latitude and
        dropoff_longitude columns.
    lat_bounds, lon_bounds : tuple of float
        Inclusive ``(min, max)`` bounds. The defaults are the New York City box used
        throughout this notebook.

    Returns
    -------
    DataFrame
        The rows of ``df`` inside the box (bounds included).
    """
    lat_min, lat_max = lat_bounds
    lon_min, lon_max = lon_bounds
    # `between` is inclusive on both ends, matching explicit >= / <= comparisons.
    inside_box = (
        df['pickup_latitude'].between(lat_min, lat_max)
        & df['dropoff_latitude'].between(lat_min, lat_max)
        & df['pickup_longitude'].between(lon_min, lon_max)
        & df['dropoff_longitude'].between(lon_min, lon_max)
    )
    return df[inside_box]


In [None]:
# Yellow taxis: keep trips inside the NYC bounding box, then preview
df_yel_filtered = filterData(df=full_data_yel)
df_yel_filtered.show(n=20)

In [None]:
# FHV: keep trips inside the NYC bounding box, then preview
df_fhv_filtered = filterData(df=full_data_fhv)
df_fhv_filtered.show(n=20)

count the number of trips for each value of passenger_count

In [None]:
# Yellow taxis: one row per passenger_count value with the number of trips carrying it
df_passenger = (df_yel_filtered
                .groupBy('passenger_count')
                .count())

Plot the trip counts

In [None]:

# The aggregate is tiny, so collect it to pandas and draw one bar per passenger_count value
df_passenger_pd = df_passenger.toPandas()
fig, ax = plt.subplots()
ax.bar(df_passenger_pd["passenger_count"], df_passenger_pd["count"])
ax.set_xlabel("Passenger Count")
ax.set_ylabel("Number of Trips")
ax.set_title("Number of Trips by Passenger Count")
plt.show()

1. What’s special with trips with zero passengers?

In [None]:
# Trips recorded with zero passengers, kept aside for inspection
zero_passenger_data = df_yel_filtered.where(col("passenger_count") == 0)

2. What’s special with trips with more than 6 passengers?


In [None]:
# Trips recorded with more than 6 passengers, kept aside for inspection
more_passenger_data = df_yel_filtered.where(col("passenger_count") > 6)

3. What is the largest distance travelled during this month? Is it the first taxi on the moon?

In [None]:
# Longest trip_distance among the geo-filtered yellow trips of the month
max_distance = df_yel_filtered.selectExpr("MAX(trip_distance) AS max_distance").collect()[0]["max_distance"]
print("The largest distance travelled during this month is:", max_distance, "miles")

4. Plot the distribution of the trip_distance (using a histogram, for instance) during year 2019. Focus on trips with a non-zero trip distance of less than 30 miles.

In [None]:
# Keep trips with a strictly positive distance below 30 miles
short_trips = df_yel_filtered.filter((col("trip_distance") > 0) & (col("trip_distance") < 30))

# Collect only the needed column to pandas, under its own name so the Spark
# DataFrame above is not overwritten by a pandas one.
short_trips_pd = short_trips.select("trip_distance").toPandas()

fig, ax = plt.subplots()
ax.hist(short_trips_pd["trip_distance"], bins=30, edgecolor="black")
ax.set_xlabel("Trip Distance (miles)")
ax.set_ylabel("Frequency")
# The 2019 data loaded above is the March 2019 file only, not the whole year.
ax.set_title("Distribution of Trip Distances (March 2019)")
plt.show()

1. Use the explain method or have a look at the Spark UI to analyze the job.

In [None]:
# Print the parsed, analyzed and optimized logical plans plus the physical plan
df_yel_filtered.explain(extended=True)

2. Compare the Analyzed Logical Plan and Optimized Logical Plan: 
Analyzed Logical Plan: the logical plan after the analysis phase, in which Spark resolves column and table references against the schema and checks types; no optimizations have been applied yet. Optimized Logical Plan: the same query after Catalyst's rule-based optimizations, such as predicate pushdown, column pruning, and constant folding. Comparing the two shows what the optimizer changed — for example, whether the coordinate filter is pushed below the joins.


3. Compare the Physical Plan with the Optimized Logical Plan. The Physical Plan describes the actual execution strategy Spark will use to process the query: how the data is read, transformed, and distributed across the worker nodes.

3-2. Keywords and concepts in the Physical Plan that you would not expect in an RDBMS include:
- Shuffle / Exchange: data is redistributed across partitions or nodes, as required by joins and aggregations; shuffles appear in the plan as `Exchange` operators (e.g. `Exchange hashpartitioning(...)`).
- Sort: sorting performed during query execution, for example ahead of a sort-merge join.
- Broadcast: a small dataset is copied to every worker node (`BroadcastExchange`, `BroadcastHashJoin`), typically for joins with a small table such as the taxi zones.
- Code generation: operators prefixed with `*(n)` are compiled together by whole-stage code generation to improve performance.

In an RDBMS, the physical plan instead involves index scans, table scans, join algorithms, and disk-based operations; the exact keywords vary with the RDBMS implementation.

4. Tasks are the individual units of work in Spark. Each task processes exactly one partition of the data within a stage, so the number of tasks in a stage equals the number of partitions that stage processes (which depends on the input size and file layout, or on the shuffle-partition setting after an exchange). The number of tasks for each stage is shown in the Spark UI under the "Stages" tab.

1. Break down the trip distance distribution for each day of week

In [None]:
# Spark's dayofweek numbers days from 1 (Sunday) to 7 (Saturday)
df_with_day_of_week = df_yel_filtered.withColumn(
    'pickup_day_of_week', dayofweek('tpep_pickup_datetime'))

# Mean trip distance for each day of the week
distance_breakdown = (df_with_day_of_week
                      .groupBy('pickup_day_of_week')
                      .agg(mean('trip_distance').alias('avg_distance')))

# A handful of rows: safe to collect to pandas for plotting
distance_breakdown_pd = distance_breakdown.toPandas()

fig, ax = plt.subplots(figsize=(8, 6))
sns.barplot(x='pickup_day_of_week', y='avg_distance', data=distance_breakdown_pd, ax=ax)
ax.set_xlabel('Day of Week')
ax.set_ylabel('Average Trip Distance')
ax.set_title('Breakdown of Trip Distance by Day of Week')
plt.show()

2. Count the number of distinct pickup locations

In [None]:
# Number of different pickup zones that appear in the filtered yellow trips
pickup_ids = df_yel_filtered.select('PUlocationID').dropDuplicates()
distinct_pickup_locations = pickup_ids.count()
print("Number of distinct pickup locations:", distinct_pickup_locations)

3. Compute and display tips and profits as a function of the pickup location

In [None]:
# Per pickup location: total tips and total amount charged, largest earners first.
# NOTE(review): per the TLC yellow-trip data dictionary `extra` only holds miscellaneous
# surcharges, so it was a poor proxy for profit; `total_amount` (everything charged to the
# passenger) is used instead — confirm this is the intended definition of "profits".
tips_profits_by_location = (
    df_yel_filtered
    .groupBy('PUlocationID')
    .agg(sum('tip_amount').alias('total_tips'),
         sum('total_amount').alias('total_profits'))
    .orderBy(col('total_profits').desc())
)

# Display the results
tips_profits_by_location.show()

## Investigate one month of trips data in 2019, 2020, 2021, 2022

## For year 2020

In [None]:
# Read the August 2020 trips (directory overridable with the NYC_TAXI_DATA_DIR environment variable)
DATA_DIR = os.environ.get("NYC_TAXI_DATA_DIR", "/home/marius/Téléchargements")
df_yel_2020 = spark.read.parquet(os.path.join(DATA_DIR, "yellow_tripdata_2020-08.parquet"))
df_fhv_2020 = spark.read.parquet(os.path.join(DATA_DIR, "fhv_tripdata_2020-08.parquet"))

In [None]:
# Drop rows missing the fields the analysis depends on. `na.drop()` returns a NEW DataFrame,
# so the result must be reassigned; previously it was discarded and nothing was dropped.
# The check is restricted to the timestamps and zone keys: a whole-row drop would also discard
# rows whose unused optional columns are null (NOTE(review): some optional columns may be null
# for most records in these files — confirm before widening the subset).
# Column names resolve case-insensitively by default (spark.sql.caseSensitive=false).
YELLOW_REQUIRED = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'PULocationID', 'DOLocationID']
FHV_REQUIRED = ['pickup_datetime', 'dropOff_datetime', 'PULocationID', 'DOLocationID']
df_yel_2020 = df_yel_2020.na.drop(subset=YELLOW_REQUIRED)
df_fhv_2020 = df_fhv_2020.na.drop(subset=FHV_REQUIRED)

In [None]:
# Yellow taxis 2020: attach pickup/dropoff zone attributes, then preview
full_data_yel_2020 = joinDataframes(df=df_yel_2020, zones_pickup=zones_pickup, zones_dropoff=zones_dropoff)
full_data_yel_2020.show(n=20)

In [None]:
# FHV 2020: attach pickup/dropoff zone attributes, then preview
full_data_fhv_2020 = joinDataframes(df=df_fhv_2020, zones_pickup=zones_pickup, zones_dropoff=zones_dropoff)
full_data_fhv_2020.show(n=20)

In [None]:
# Yellow taxis 2020: keep trips inside the NYC bounding box, then preview
df_yel_filtered_2020 = filterData(df=full_data_yel_2020)
df_yel_filtered_2020.show(n=20)

In [None]:
# FHV 2020: keep trips inside the NYC bounding box, then preview
df_fhv_filtered_2020 = filterData(df=full_data_fhv_2020)
df_fhv_filtered_2020.show(n=20)

In [None]:
# Preview the filtered yellow trips of 2020
df_yel_filtered_2020.show(n=20)

In [None]:
# Preview the filtered FHV trips of 2020
df_fhv_filtered_2020.show(n=20)

1. Filter and cache/persist the result

In [None]:
# Filter to the NYC bounding box and cache the result so the aggregations below reuse it.
# Previously the raw, un-joined frames were cached and assigned to the *_filtered_* names,
# silently discarding the zone join and the geographic filter.
# Re-deriving from full_data_* keeps this cell safe to re-run.
df_yel_filtered_2020 = filterData(full_data_yel_2020).cache()
df_fhv_filtered_2020 = filterData(full_data_fhv_2020).cache()

## Assessing seasonalities and looking at time series

1. The number of pickups

In [None]:
# Yellow taxis 2020: pickups per (day of week, hour of day)
counts_pickup_yel_2020 = (
    df_yel_filtered_2020
    .groupBy(dayofweek('tpep_pickup_datetime').alias('pickup_day'),
             hour('tpep_pickup_datetime').alias('pickup_hour'))
    .count()
)
#display the result


In [None]:
# FHV 2020: pickups per (day of week, hour of day)
counts_pickup_fhv_2020 = (
    df_fhv_filtered_2020
    .groupBy(dayofweek('pickup_datetime').alias('pickup_day'),
             hour('pickup_datetime').alias('pickup_hour'))
    .count()
)
#display the result

2. The average fare

In [None]:
# Yellow taxis 2020: mean fare per (day of week, hour of day), collected to pandas and ordered by hour
avg_fare_yel_2020 = (
    df_yel_filtered_2020
    .groupBy(dayofweek('tpep_pickup_datetime').alias('pickup_day'),
             hour('tpep_pickup_datetime').alias('pickup_hour'))
    .avg('fare_amount')
    .toPandas()
    .sort_values(by=['pickup_hour'])
)
avg_fare_yel_2020


3. The average trip duration

In [None]:
# Yellow taxis 2020: mean trip duration in seconds (dropoff - pickup) per (day of week, hour).
# Previously this averaged trip_distance despite the "duration" name.
avg_trip_duration_yel_2020 = (
    df_yel_filtered_2020
    .withColumn('trip_duration',
                unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime'))
    .groupBy(dayofweek('tpep_pickup_datetime').alias('pickup_day'),
             hour('tpep_pickup_datetime').alias('pickup_hour'))
    .agg(avg('trip_duration').alias('avg_trip_duration'))
)
#display the result


In [None]:
# FHV 2020: mean trip duration in seconds (dropoff - pickup) per (day of week, hour).
# Previously the cell stopped at groupby(), which yields GroupedData without any aggregate.
avg_trip_duration_fhv_2020 = (
    df_fhv_filtered_2020
    .withColumn('trip_duration',
                unix_timestamp('dropOff_datetime') - unix_timestamp('pickup_datetime'))
    .groupBy(dayofweek('pickup_datetime').alias('pickup_day'),
             hour('pickup_datetime').alias('pickup_hour'))
    .agg(avg('trip_duration').alias('avg_trip_duration'))
)
#display the result

4. Plot the average of ongoing trips

In [None]:

# Mean trip duration by pickup hour (yellow taxis, August 2020).
# NOTE(review): the question asks about *ongoing* trips; this plots the mean duration per hour.
# The aggregation runs in Spark and only one row per hour is collected, instead of converting the
# whole month with toPandas() and replacing the Spark DataFrame df_yel_filtered_2020 with it.
hourly_duration_yel_2020 = (
    df_yel_filtered_2020
    .withColumn('trip_duration',
                unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime'))
    .groupBy(hour('tpep_pickup_datetime').alias('pickup_hour'))
    .agg(avg('trip_duration').alias('avg_trip_duration'))
    .orderBy('pickup_hour')
    .toPandas()
    .dropna(subset=['pickup_hour'])  # trips without a pickup time have no hour
)
average_trip_duration = hourly_duration_yel_2020.set_index('pickup_hour')['avg_trip_duration']

# the plot
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(average_trip_duration.index, average_trip_duration.values)
ax.set_xlabel('Hour')
ax.set_ylabel('Average Trip Duration (seconds)')
ax.set_title('Average Trip Duration by Hour (yellow taxis, August 2020)')
ax.set_xticks(average_trip_duration.index)
ax.grid(True)
plt.show()


In [None]:
# Mean trip duration by pickup hour (FHV, August 2020).
# NOTE(review): the question asks about *ongoing* trips; this plots the mean duration per hour.
# The aggregation runs in Spark and only one row per hour is collected, instead of converting the
# whole month with toPandas() and replacing the Spark DataFrame df_fhv_filtered_2020 with it.
hourly_duration_fhv_2020 = (
    df_fhv_filtered_2020
    .withColumn('trip_duration',
                unix_timestamp('dropOff_datetime') - unix_timestamp('pickup_datetime'))
    .groupBy(hour('pickup_datetime').alias('pickup_hour'))
    .agg(avg('trip_duration').alias('avg_trip_duration'))
    .orderBy('pickup_hour')
    .toPandas()
    .dropna(subset=['pickup_hour'])  # trips without a pickup time have no hour
)
average_trip_duration = hourly_duration_fhv_2020.set_index('pickup_hour')['avg_trip_duration']

# the plot
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(average_trip_duration.index, average_trip_duration.values)
ax.set_xlabel('Hour')
ax.set_ylabel('Average Trip Duration (seconds)')
ax.set_title('Average Trip Duration by Hour (FHV, August 2020)')
ax.set_xticks(average_trip_duration.index)
ax.grid(True)
plt.show()


## Rides to the airports

1. Median duration of taxi trip leaving Midtown (Southern Manhattan) headed for JFK Airport

In [None]:
# convert into pandas dataframe
#df_yel_filtered_2020 = df_yel_filtered_2020.toPandas()

# Convert the 'tpep_dropoff_datetime' and 'tpep_pickup_datetime' columns to datetime format
#df_yel_filtered['tpep_dropoff_datetime'] = pd.to_datetime(df_yel_filtered['tpep_dropoff_datetime'])
#df_yel_filtered['tpep_pickup_datetime'] = pd.to_datetime(df_yel_filtered['tpep_pickup_datetime'])

# Calculate the duration of each trip by subtracting the pickup datetime 
# from the dropoff datetime to get the trip duration in seconds.

#df_yel_filtered['trip_duration'] = (df_yel_filtered['tpep_dropoff_datetime'] - 
 #                                   df_yel_filtered['tpep_pickup_datetime']).dt.total_seconds()

In [None]:
# Median duration (seconds) of yellow-taxi trips from Midtown to JFK Airport, August 2020.
# Trips are selected by taxi-zone name: exact equality with hand-picked coordinates (the
# earlier commented-out attempt) would never match the zone centroids attached by the joins.
# NOTE(review): zone names assumed to follow the TLC taxi-zone table (`zone` column) — confirm.
# The trips are re-derived from full_data_yel_2020 because df_yel_filtered_2020 may have been
# converted to a pandas DataFrame by an earlier cell.
MIDTOWN_ZONES = ['Midtown Center', 'Midtown East', 'Midtown North', 'Midtown South']
JFK_ZONE = 'JFK Airport'

midtown_to_jfk_2020 = (
    filterData(full_data_yel_2020)
    .filter(col('zones_pickup_location').isin(MIDTOWN_ZONES)
            & (col('zones_dropoff_location') == JFK_ZONE))
    .withColumn('trip_duration',
                unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime'))
)
median_quantiles = midtown_to_jfk_2020.approxQuantile('trip_duration', [0.5], 0.01)
# approxQuantile may return an empty list (e.g. when no trip matches)
median_trip_duration = median_quantiles[0] if median_quantiles else None
print("Median Midtown -> JFK trip duration, August 2020 (seconds):", median_trip_duration)

## Geographic information

In [None]:
# Load the GeoJSON map as a GeoDataFrame (directory overridable with NYC_TAXI_DATA_DIR)
DATA_DIR = os.environ.get("NYC_TAXI_DATA_DIR", "/home/marius/Téléchargements")
data_geojson = gpd.read_file(os.path.join(DATA_DIR, "map.geojson"))

# preview a few rows rather than printing the whole frame
data_geojson.head()

In [None]:
# Heatmap aggregates for August 2020: pickups and dropoffs per zone, and pickups per zone
# for trips ending at an airport. Previously this used df_yel_filtered (March 2019 data).
# The trips are re-derived from full_data_yel_2020 because df_yel_filtered_2020 may have been
# converted to a pandas DataFrame by an earlier cell.
# NOTE(review): airport zones assumed to be named "<name> Airport" in the TLC zone table, so the
# bare names 'JFK'/'LaGuardia'/'Newark' presumably never matched; Newark's centroid may also
# fall outside filterData's longitude bounds — confirm.
AIRPORT_ZONES = ['JFK Airport', 'LaGuardia Airport', 'Newark Airport']
trips_2020 = filterData(full_data_yel_2020)

heatmap_pickups = trips_2020.groupBy("zones_pickup_location").count()
heatmap_dropoffs = trips_2020.groupBy("zones_dropoff_location").count()
heatmap_airport_pickups = (trips_2020
                           .filter(col("zones_dropoff_location").isin(AIRPORT_ZONES))
                           .groupBy("zones_pickup_location")
                           .count())



In [None]:
# Per-pickup-zone choropleth aggregates for August 2020: number of pickups, ratio of card to cash
# payments, and total fare per second of trip duration. Previously this used df_yel_filtered
# (March 2019), divided by trip_distance, and grouped the fare ratio by *dropoff* zone.
# The trips are re-derived from full_data_yel_2020 because df_yel_filtered_2020 may have been
# converted to a pandas DataFrame by an earlier cell.
trips_2020 = filterData(full_data_yel_2020).withColumn(
    'trip_duration',
    unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime'))

choropleth_pickups = trips_2020.groupBy("zones_pickup_location").count()

# payment_type 1 = card and 2 = cash, as assumed by the card/cash ratio.
# The when(... > 0, ...) guards yield null instead of dividing by zero (an error under ANSI mode).
card_trips = sum(when(col("payment_type") == 1, 1).otherwise(0))
cash_trips = sum(when(col("payment_type") == 2, 1).otherwise(0))
choropleth_payment_ratio = trips_2020.groupBy("zones_pickup_location").agg(
    when(cash_trips > 0, card_trips / cash_trips).alias("payment_ratio")
)

total_fare = sum("total_amount")
total_duration = sum("trip_duration")
choropleth_fare_duration_ratio = trips_2020.groupBy("zones_pickup_location").agg(
    when(total_duration > 0, total_fare / total_duration).alias("fare_duration_ratio")
)

## For year 2021

In [None]:
# Read the January 2021 trips (directory overridable with the NYC_TAXI_DATA_DIR environment variable)
DATA_DIR = os.environ.get("NYC_TAXI_DATA_DIR", "/home/marius/Téléchargements")
df_yel_2021 = spark.read.parquet(os.path.join(DATA_DIR, "yellow_tripdata_2021-01.parquet"))
df_fhv_2021 = spark.read.parquet(os.path.join(DATA_DIR, "fhv_tripdata_2021-01.parquet"))

In [None]:
# Drop rows missing the fields the analysis depends on. `na.drop()` returns a NEW DataFrame,
# so the result must be reassigned; previously it was discarded and nothing was dropped.
# The check is restricted to the timestamps and zone keys: a whole-row drop would also discard
# rows whose unused optional columns are null (NOTE(review): some optional columns may be null
# for most records in these files — confirm before widening the subset).
# Column names resolve case-insensitively by default (spark.sql.caseSensitive=false).
YELLOW_REQUIRED = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'PULocationID', 'DOLocationID']
FHV_REQUIRED = ['pickup_datetime', 'dropOff_datetime', 'PULocationID', 'DOLocationID']
df_yel_2021 = df_yel_2021.na.drop(subset=YELLOW_REQUIRED)
df_fhv_2021 = df_fhv_2021.na.drop(subset=FHV_REQUIRED)

In [None]:
# Yellow taxis 2021: attach pickup/dropoff zone attributes, then preview
full_data_yel_2021 = joinDataframes(df=df_yel_2021, zones_pickup=zones_pickup, zones_dropoff=zones_dropoff)
full_data_yel_2021.show(n=20)

In [None]:
# FHV 2021: attach pickup/dropoff zone attributes, then preview
full_data_fhv_2021 = joinDataframes(df=df_fhv_2021, zones_pickup=zones_pickup, zones_dropoff=zones_dropoff)
full_data_fhv_2021.show(n=20)

In [None]:
# Yellow taxis 2021: keep trips inside the NYC bounding box, then preview
df_yel_filtered_2021 = filterData(df=full_data_yel_2021)
df_yel_filtered_2021.show(n=20)

In [None]:
# FHV 2021: keep trips inside the NYC bounding box, then preview.
# Previously this filtered the *yellow* join (full_data_yel_2021), so every "FHV" result
# for 2021 was actually computed on yellow-taxi data.
df_fhv_filtered_2021 = filterData(full_data_fhv_2021)
df_fhv_filtered_2021.show()

In [None]:
# Preview the filtered yellow trips of 2021
df_yel_filtered_2021.show(n=20)

In [None]:
# Preview the filtered FHV trips of 2021
df_fhv_filtered_2021.show(n=20)

1. Filter and cache/persist the result

In [None]:
# Mark both filtered frames for caching; DataFrame.cache() returns the same DataFrame object.
df_yel_filtered_2021 = df_yel_filtered_2021.cache()
df_fhv_filtered_2021 = df_fhv_filtered_2021.cache()

In [None]:
# Yellow taxis 2021: pickups per (day of week, hour of day)
counts_pickup_yel_2021 = (
    df_yel_filtered_2021
    .groupBy(dayofweek('tpep_pickup_datetime').alias('pickup_day'),
             hour('tpep_pickup_datetime').alias('pickup_hour'))
    .count()
)
#display the result

In [None]:
# FHV 2021: pickups per (day of week, hour of day).
# FHV records name the pickup timestamp `pickup_datetime` (as in the 2020 FHV cells); the
# yellow-taxi name `tpep_pickup_datetime` is only a fallback in case df_fhv_filtered_2021 was
# built from yellow data.
fhv_cols = {c.lower() for c in df_fhv_filtered_2021.columns}
fhv_pickup_col = 'pickup_datetime' if 'pickup_datetime' in fhv_cols else 'tpep_pickup_datetime'
counts_pickup_fhv_2021 = (
    df_fhv_filtered_2021
    .groupBy(dayofweek(fhv_pickup_col).alias('pickup_day'),
             hour(fhv_pickup_col).alias('pickup_hour'))
    .count()
)
#display the result

2. The average fare

In [None]:
# Yellow taxis 2021: mean fare per (day of week, hour of day), collected to pandas and ordered by hour
avg_fare_yel_2021 = (
    df_yel_filtered_2021
    .groupBy(dayofweek('tpep_pickup_datetime').alias('pickup_day'),
             hour('tpep_pickup_datetime').alias('pickup_hour'))
    .avg('fare_amount')
    .toPandas()
    .sort_values(by=['pickup_hour'])
)
avg_fare_yel_2021

3. The average trip duration

In [None]:
# Yellow taxis 2021: mean trip duration in seconds (dropoff - pickup) per (day of week, hour).
# Previously this averaged trip_distance despite the "duration" name.
avg_trip_duration_yel_2021 = (
    df_yel_filtered_2021
    .withColumn('trip_duration',
                unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime'))
    .groupBy(dayofweek('tpep_pickup_datetime').alias('pickup_day'),
             hour('tpep_pickup_datetime').alias('pickup_hour'))
    .agg(avg('trip_duration').alias('avg_trip_duration'))
)
#display the result

In [None]:
# FHV 2021: mean trip duration in seconds (dropoff - pickup) per (day of week, hour).
# Previously the cell stopped at groupby(), which yields GroupedData without any aggregate.
# FHV records use `pickup_datetime` / `dropOff_datetime` (as in the 2020 FHV cells); the
# yellow-taxi names are only a fallback in case df_fhv_filtered_2021 was built from yellow data.
fhv_cols = {c.lower() for c in df_fhv_filtered_2021.columns}
fhv_pickup_col = 'pickup_datetime' if 'pickup_datetime' in fhv_cols else 'tpep_pickup_datetime'
fhv_dropoff_col = 'dropoff_datetime' if 'dropoff_datetime' in fhv_cols else 'tpep_dropoff_datetime'
avg_trip_duration_fhv_2021 = (
    df_fhv_filtered_2021
    .withColumn('trip_duration', unix_timestamp(fhv_dropoff_col) - unix_timestamp(fhv_pickup_col))
    .groupBy(dayofweek(fhv_pickup_col).alias('pickup_day'),
             hour(fhv_pickup_col).alias('pickup_hour'))
    .agg(avg('trip_duration').alias('avg_trip_duration'))
)
#display the result

4. Plot the average of ongoing trips

In [None]:
# convert into pandas dataframe
#df_yel_filtered_2021 = df_yel_filtered_2021.toPandas()

# Convert the 'tpep_dropoff_datetime' and 'tpep_pickup_datetime' columns to datetime format
#df_yel_filtered_2021['tpep_dropoff_datetime'] = pd.to_datetime(df_yel_filtered_2021['tpep_dropoff_datetime'])
#df_yel_filtered_2021['tpep_pickup_datetime'] = pd.to_datetime(df_yel_filtered_2021['tpep_pickup_datetime'])

# Calculate the duration of each trip by subtracting the pickup datetime 
# from the dropoff datetime to get the trip duration in seconds.

#df_yel_filtered_2021['trip_duration'] = (df_yel_filtered_2021['tpep_dropoff_datetime'] - 
                             #       df_yel_filtered_2021['tpep_pickup_datetime']).dt.total_seconds()

# Group the data by the desired time interval and calculate the average trip duration for each interval.

#average_trip_duration = df_yel_filtered_2021.groupby(df_yel_filtered_2021['tpep_pickup_datetime'].dt.hour)['trip_duration'].mean()

In [None]:
# for fhv taxis
# convert into pandas dataframe
#df_fhv_filtered_2021 = df_fhv_filtered_2021.toPandas()

# Convert the 'tpep_dropoff_datetime' and 'tpep_pickup_datetime' columns to datetime format
#df_fhv_filtered_2021['tpep_dropoff_datetime'] = pd.to_datetime(df_fhv_filtered_2021['tpep_dropoff_datetime'])
#df_fhv_filtered_2021['tpep_pickup_datetime'] = pd.to_datetime(df_fhv_filtered_2021['tpep_pickup_datetime'])

# Calculate the duration of each trip by subtracting the pickup datetime 
# from the dropoff datetime to get the trip duration in seconds.

#df_fhv_filtered_2021['trip_duration'] = (df_fhv_filtered_2021['tpep_dropoff_datetime'] - 
                                    #df_fhv_filtered_2021['tpep_pickup_datetime']).dt.total_seconds()

# Group the data by the desired time interval and calculate the average trip duration for each interval.

#average_trip_duration = df_fhv_filtered_2021.groupby(df_fhv_filtered_2021['tpep_dropoff_datetime'].dt.hour)['trip_duration'].mean()

## Rides to the airports

1. Median duration of taxi trip leaving Midtown (Southern Manhattan) headed for JFK Airport

In [None]:
# Median duration (seconds) of yellow-taxi trips from Midtown to JFK Airport, January 2021.
# Trips are selected by taxi-zone name: exact equality with hand-picked coordinates (the
# earlier commented-out attempt) would never match the zone centroids attached by the joins.
# NOTE(review): zone names assumed to follow the TLC taxi-zone table (`zone` column) — confirm.
MIDTOWN_ZONES = ['Midtown Center', 'Midtown East', 'Midtown North', 'Midtown South']
JFK_ZONE = 'JFK Airport'

midtown_to_jfk_2021 = (
    df_yel_filtered_2021
    .filter(col('zones_pickup_location').isin(MIDTOWN_ZONES)
            & (col('zones_dropoff_location') == JFK_ZONE))
    .withColumn('trip_duration',
                unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime'))
)
median_quantiles = midtown_to_jfk_2021.approxQuantile('trip_duration', [0.5], 0.01)
# approxQuantile may return an empty list (e.g. when no trip matches)
median_trip_duration = median_quantiles[0] if median_quantiles else None
print("Median Midtown -> JFK trip duration, January 2021 (seconds):", median_trip_duration)

## Geographic information

In [None]:
# Heatmap aggregates for January 2021: pickups and dropoffs per zone, and pickups per zone
# for trips ending at an airport. Previously this used df_yel_filtered (March 2019 data).
# NOTE(review): airport zones assumed to be named "<name> Airport" in the TLC zone table, so the
# bare names 'JFK'/'LaGuardia'/'Newark' presumably never matched; Newark's centroid may also
# fall outside filterData's longitude bounds — confirm.
AIRPORT_ZONES = ['JFK Airport', 'LaGuardia Airport', 'Newark Airport']

heatmap_pickups = df_yel_filtered_2021.groupBy("zones_pickup_location").count()
heatmap_dropoffs = df_yel_filtered_2021.groupBy("zones_dropoff_location").count()
heatmap_airport_pickups = (df_yel_filtered_2021
                           .filter(col("zones_dropoff_location").isin(AIRPORT_ZONES))
                           .groupBy("zones_pickup_location")
                           .count())

In [None]:
# Per-pickup-zone choropleth aggregates for January 2021: number of pickups, ratio of card to
# cash payments, and total fare per second of trip duration. Previously this used
# df_yel_filtered (March 2019), divided by trip_distance, and grouped the fare ratio by
# *dropoff* zone.
trips_2021 = df_yel_filtered_2021.withColumn(
    'trip_duration',
    unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime'))

choropleth_pickups = trips_2021.groupBy("zones_pickup_location").count()

# payment_type 1 = card and 2 = cash, as assumed by the card/cash ratio.
# The when(... > 0, ...) guards yield null instead of dividing by zero (an error under ANSI mode).
card_trips = sum(when(col("payment_type") == 1, 1).otherwise(0))
cash_trips = sum(when(col("payment_type") == 2, 1).otherwise(0))
choropleth_payment_ratio = trips_2021.groupBy("zones_pickup_location").agg(
    when(cash_trips > 0, card_trips / cash_trips).alias("payment_ratio")
)

total_fare = sum("total_amount")
total_duration = sum("trip_duration")
choropleth_fare_duration_ratio = trips_2021.groupBy("zones_pickup_location").agg(
    when(total_duration > 0, total_fare / total_duration).alias("fare_duration_ratio")
)

## For year 2022

In [None]:
# Read the August 2022 trips (directory overridable with the NYC_TAXI_DATA_DIR environment variable)
DATA_DIR = os.environ.get("NYC_TAXI_DATA_DIR", "/home/marius/Téléchargements")
df_yel_2022 = spark.read.parquet(os.path.join(DATA_DIR, "yellow_tripdata_2022-08.parquet"))
df_fhv_2022 = spark.read.parquet(os.path.join(DATA_DIR, "fhv_tripdata_2022-08.parquet"))

In [None]:
# Drop rows missing the fields the analysis depends on. `na.drop()` returns a NEW DataFrame,
# so the result must be reassigned; previously it was discarded and nothing was dropped.
# The check is restricted to the timestamps and zone keys: a whole-row drop would also discard
# rows whose unused optional columns are null (NOTE(review): some optional columns may be null
# for most records in these files — confirm before widening the subset).
# Column names resolve case-insensitively by default (spark.sql.caseSensitive=false).
YELLOW_REQUIRED = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'PULocationID', 'DOLocationID']
FHV_REQUIRED = ['pickup_datetime', 'dropOff_datetime', 'PULocationID', 'DOLocationID']
df_yel_2022 = df_yel_2022.na.drop(subset=YELLOW_REQUIRED)
df_fhv_2022 = df_fhv_2022.na.drop(subset=FHV_REQUIRED)

In [None]:
# Yellow taxis 2022: attach pickup/dropoff zone attributes, then preview
full_data_yel_2022 = joinDataframes(df=df_yel_2022, zones_pickup=zones_pickup, zones_dropoff=zones_dropoff)
full_data_yel_2022.show(n=20)

In [None]:
# FHV 2022: attach pickup/dropoff zone attributes, then preview
full_data_fhv_2022 = joinDataframes(df=df_fhv_2022, zones_pickup=zones_pickup, zones_dropoff=zones_dropoff)
full_data_fhv_2022.show(n=20)

In [None]:
# Yellow taxis 2022: keep trips inside the NYC bounding box, then preview
df_yel_filtered_2022 = filterData(df=full_data_yel_2022)
df_yel_filtered_2022.show(n=20)

In [None]:
# FHV 2022: keep trips inside the NYC bounding box, then preview.
# Previously this filtered the *yellow* join (full_data_yel_2022), so every "FHV" result
# for 2022 was actually computed on yellow-taxi data.
df_fhv_filtered_2022 = filterData(full_data_fhv_2022)
df_fhv_filtered_2022.show()

In [None]:
# Preview the filtered yellow trips of 2022
df_yel_filtered_2022.show(n=20)

In [None]:
# Preview the filtered FHV trips of 2022
df_fhv_filtered_2022.show(n=20)

1. Filter and cache/persist the result

In [None]:
# Mark both filtered frames for caching; DataFrame.cache() returns the same DataFrame object.
df_yel_filtered_2022 = df_yel_filtered_2022.cache()
df_fhv_filtered_2022 = df_fhv_filtered_2022.cache()

In [None]:
# Yellow taxis 2022: pickups per (day of week, hour of day)
counts_pickup_yel_2022 = (
    df_yel_filtered_2022
    .groupBy(dayofweek('tpep_pickup_datetime').alias('pickup_day'),
             hour('tpep_pickup_datetime').alias('pickup_hour'))
    .count()
)
#display the result

In [None]:
# FHV 2022: pickups per (day of week, hour of day).
# FHV records name the pickup timestamp `pickup_datetime` (as in the 2020 FHV cells); the
# yellow-taxi name `tpep_pickup_datetime` is only a fallback in case df_fhv_filtered_2022 was
# built from yellow data.
fhv_cols = {c.lower() for c in df_fhv_filtered_2022.columns}
fhv_pickup_col = 'pickup_datetime' if 'pickup_datetime' in fhv_cols else 'tpep_pickup_datetime'
counts_pickup_fhv_2022 = (
    df_fhv_filtered_2022
    .groupBy(dayofweek(fhv_pickup_col).alias('pickup_day'),
             hour(fhv_pickup_col).alias('pickup_hour'))
    .count()
)
#display the result

2. The average fare

In [None]:
# Yellow taxis 2022: mean fare per (day of week, hour of day), collected to pandas and ordered by hour
avg_fare_yel_2022 = (
    df_yel_filtered_2022
    .groupBy(dayofweek('tpep_pickup_datetime').alias('pickup_day'),
             hour('tpep_pickup_datetime').alias('pickup_hour'))
    .avg('fare_amount')
    .toPandas()
    .sort_values(by=['pickup_hour'])
)
avg_fare_yel_2022

3. The average trip duration

In [None]:
# Yellow taxis 2022: mean trip duration in seconds (dropoff - pickup) per (day of week, hour).
# Previously this averaged trip_distance despite the "duration" name.
avg_trip_duration_yel_2022 = (
    df_yel_filtered_2022
    .withColumn('trip_duration',
                unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime'))
    .groupBy(dayofweek('tpep_pickup_datetime').alias('pickup_day'),
             hour('tpep_pickup_datetime').alias('pickup_hour'))
    .agg(avg('trip_duration').alias('avg_trip_duration'))
)
#display the result

In [None]:
# FHV 2022: mean trip duration in seconds (dropoff - pickup) per (day of week, hour).
# Previously the cell stopped at groupby(), which yields GroupedData without any aggregate.
# FHV records use `pickup_datetime` / `dropOff_datetime` (as in the 2020 FHV cells); the
# yellow-taxi names are only a fallback in case df_fhv_filtered_2022 was built from yellow data.
fhv_cols = {c.lower() for c in df_fhv_filtered_2022.columns}
fhv_pickup_col = 'pickup_datetime' if 'pickup_datetime' in fhv_cols else 'tpep_pickup_datetime'
fhv_dropoff_col = 'dropoff_datetime' if 'dropoff_datetime' in fhv_cols else 'tpep_dropoff_datetime'
avg_trip_duration_fhv_2022 = (
    df_fhv_filtered_2022
    .withColumn('trip_duration', unix_timestamp(fhv_dropoff_col) - unix_timestamp(fhv_pickup_col))
    .groupBy(dayofweek(fhv_pickup_col).alias('pickup_day'),
             hour(fhv_pickup_col).alias('pickup_hour'))
    .agg(avg('trip_duration').alias('avg_trip_duration'))
)
#display the result

4. Plot the average of ongoing trips

In [None]:
# convert into pandas dataframe
#df_yel_filtered_2022 = df_yel_filtered_2022.toPandas()

# Convert the 'tpep_dropoff_datetime' and 'tpep_pickup_datetime' columns to datetime format
#df_yel_filtered_2022['tpep_dropoff_datetime'] = pd.to_datetime(df_yel_filtered_2022['tpep_dropoff_datetime'])
#df_yel_filtered_2022['tpep_pickup_datetime'] = pd.to_datetime(df_yel_filtered_2022['tpep_pickup_datetime'])

# Calculate the duration of each trip by subtracting the pickup datetime 
# from the dropoff datetime to get the trip duration in seconds.

#df_yel_filtered_2022['trip_duration'] = (df_yel_filtered_2022['tpep_dropoff_datetime'] - 
 #                                   df_yel_filtered_2022['tpep_pickup_datetime']).dt.total_seconds()

# Group the data by the desired time interval and calculate the average trip duration for each interval.

#average_trip_duration = df_yel_filtered_2022.groupby(df_yel_filtered_2022['tpep_pickup_datetime'].dt.hour)['trip_duration'].mean()

In [None]:
# for fhv taxis
# convert into pandas dataframe
#df_fhv_filtered_2022 = df_fhv_filtered_2022.toPandas()

# Convert the 'tpep_dropoff_datetime' and 'tpep_pickup_datetime' columns to datetime format
#df_fhv_filtered_2022['tpep_dropoff_datetime'] = pd.to_datetime(df_fhv_filtered_2022['tpep_dropoff_datetime'])
#df_fhv_filtered_2022['tpep_pickup_datetime'] = pd.to_datetime(df_fhv_filtered_2022['tpep_pickup_datetime'])

# Calculate the duration of each trip by subtracting the pickup datetime 
# from the dropoff datetime to get the trip duration in seconds.

#df_fhv_filtered_2022['trip_duration'] = (df_fhv_filtered_2022['tpep_dropoff_datetime'] - 
 #                                   df_fhv_filtered_2022['tpep_pickup_datetime']).dt.total_seconds()

# Group the data by the desired time interval and calculate the average trip duration for each interval.

#average_trip_duration = df_fhv_filtered_2022.groupby(df_fhv_filtered_2022['tpep_dropoff_datetime'].dt.hour)['trip_duration'].mean()

## Rides to the airports

1. Median duration of taxi trip leaving Midtown (Southern Manhattan) headed for JFK Airport

In [None]:
# Median duration (seconds) of yellow-taxi trips from Midtown to JFK Airport, August 2022.
# Trips are selected by taxi-zone name: exact equality with hand-picked coordinates (the
# earlier commented-out attempt, which also pointed at the 2021 data) would never match the
# zone centroids attached by the joins.
# NOTE(review): zone names assumed to follow the TLC taxi-zone table (`zone` column) — confirm.
MIDTOWN_ZONES = ['Midtown Center', 'Midtown East', 'Midtown North', 'Midtown South']
JFK_ZONE = 'JFK Airport'

midtown_to_jfk_2022 = (
    df_yel_filtered_2022
    .filter(col('zones_pickup_location').isin(MIDTOWN_ZONES)
            & (col('zones_dropoff_location') == JFK_ZONE))
    .withColumn('trip_duration',
                unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime'))
)
median_quantiles = midtown_to_jfk_2022.approxQuantile('trip_duration', [0.5], 0.01)
# approxQuantile may return an empty list (e.g. when no trip matches)
median_trip_duration = median_quantiles[0] if median_quantiles else None
print("Median Midtown -> JFK trip duration, August 2022 (seconds):", median_trip_duration)

## Geographic information

In [None]:
# Heatmap aggregates for August 2022: pickups and dropoffs per zone, and pickups per zone
# for trips ending at an airport. Previously this used df_yel_filtered (March 2019 data).
# NOTE(review): airport zones assumed to be named "<name> Airport" in the TLC zone table, so the
# bare names 'JFK'/'LaGuardia'/'Newark' presumably never matched; Newark's centroid may also
# fall outside filterData's longitude bounds — confirm.
AIRPORT_ZONES = ['JFK Airport', 'LaGuardia Airport', 'Newark Airport']

heatmap_pickups = df_yel_filtered_2022.groupBy("zones_pickup_location").count()
heatmap_dropoffs = df_yel_filtered_2022.groupBy("zones_dropoff_location").count()
heatmap_airport_pickups = (df_yel_filtered_2022
                           .filter(col("zones_dropoff_location").isin(AIRPORT_ZONES))
                           .groupBy("zones_pickup_location")
                           .count())

In [None]:
# Per-pickup-zone choropleth aggregates for August 2022: number of pickups, ratio of card to
# cash payments, and total fare per second of trip duration. Previously this used
# df_yel_filtered (March 2019), divided by trip_distance, and grouped the fare ratio by
# *dropoff* zone.
trips_2022 = df_yel_filtered_2022.withColumn(
    'trip_duration',
    unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime'))

choropleth_pickups = trips_2022.groupBy("zones_pickup_location").count()

# payment_type 1 = card and 2 = cash, as assumed by the card/cash ratio.
# The when(... > 0, ...) guards yield null instead of dividing by zero (an error under ANSI mode).
card_trips = sum(when(col("payment_type") == 1, 1).otherwise(0))
cash_trips = sum(when(col("payment_type") == 2, 1).otherwise(0))
choropleth_payment_ratio = trips_2022.groupBy("zones_pickup_location").agg(
    when(cash_trips > 0, card_trips / cash_trips).alias("payment_ratio")
)

total_fare = sum("total_amount")
total_duration = sum("trip_duration")
choropleth_fare_duration_ratio = trips_2022.groupBy("zones_pickup_location").agg(
    when(total_duration > 0, total_fare / total_duration).alias("fare_duration_ratio")
)

## Covid impact

In [None]:
# NOTE(review): this cell is disabled — the whole snippet is a bare string literal, so nothing
# runs. It would not work as written: it indexes a 'tpep_pickup_time' column that no cell in this
# notebook creates, and plt.table expects a 2-D sequence of cell texts, not a single column.
'''
fig, ax = plt.subplots(figsize=(8, 4))  # Create a figure and axes
table = plt.table(cellText=df_yel_filtered['tpep_pickup_time'], colLabels=df_yel_filtered.columns, loc='center')  # Plot the table
table.auto_set_font_size(False)  # Set font size manually
table.set_fontsize(10)  # Set font size for the table
ax.axis('off')  # Hide axes
plt.show()  # Display the table

'''

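Based on our study, the COVID-19 pandemic appears to have had a major impact on taxi trips in New York, especially in the months studied for 2020 (August) and 2021 (January).
By contrast, in the months studied for 2019 (March) and 2022 (August), traffic looked normal. This conclusion rests on comparing the per-month aggregates above; a side-by-side table or plot of the monthly trip counts would make the comparison explicit.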