In [None]:
"""
=========================================================================================
Topic: Analyzing New York City Taxi Data
Author: Benalouache Sassi
Date: 23/10/2023
=========================================================================================
"""

In [22]:
# ==========================
# Dependencies
# ==========================
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, rank, expr, unix_timestamp, udf, StringType, rand, broadcast
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.window import Window
from shapely.geometry import Point, shape
from pyspark.sql import functions as F
import json

In [2]:
###############################################################################################################################################################################################

In [23]:
# ==========================
# 1. Initialization and Data Loading
# ==========================

# Initialize a Spark session
spark = SparkSession.builder.appName('TaxiDataAnalysis').getOrCreate()

# Load the taxi ride data
taxi_data = spark.read.option("header", "true").option("inferSchema", "true").csv('./Taxi_Data/Sample NYC Data.csv')

In [4]:
###############################################################################################################################################################################################

In [24]:
# ==========================
# 2. Data Exploration
# ==========================

# Display the schema and the first few rows to understand the data
taxi_data.printSchema()
taxi_data.show()

# Display the number of rows
row_count = taxi_data.count()
print("Number of Rows:", row_count)
old_row_count = row_count

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)

+--------------------+--------------------+---------+---------+------------------+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+
|           medallion|        hack_license|vendor_id|rate_code|store_and_fwd_flag|pickup_datetime|dropoff_datetime|passenger_count|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+--------------------+-------------------

In [6]:
###############################################################################################################################################################################################

In [25]:
# ==========================
# 3. Data Cleaning and Preprocessing
# ==========================

# Group by 'medallion' and 'hack_license' to count the number of rides for each taxi
taxi_counts = taxi_data.groupBy("medallion", "hack_license").agg(count("*").alias("ride_count"))

# Extract unique taxis and their ride counts
taxis = taxi_counts.select("medallion", "hack_license", "ride_count").distinct()

# Rank taxis by ride_count within each 'hack_license' group
window_spec = Window.partitionBy("hack_license").orderBy(col("ride_count").desc())
ranked_taxis = taxis.withColumn("rank", rank().over(window_spec))

# Retain only the top-ranked taxi for each 'hack_license' (the one with the highest ride_count)
filtered_taxis = ranked_taxis.filter("rank == 1").drop("rank")

# Display the valid taxis and their count
print("The list of the valid taxis:", filtered_taxis.count())
filtered_taxis.show()

# Extract taxis that have duplicate 'hack_license' values
dupe_taxis = ranked_taxis.filter("rank > 1").drop("rank")
print("The list of false taxis:", dupe_taxis.count())
dupe_taxis.show()

# Compute the total number of rides associated with false taxis
total_rides = dupe_taxis.agg(spark_sum("ride_count").alias("total rides with false Taxi"))

# Identify unique combinations of 'medallion' and 'hack_license' among the false taxis
unique_taxi_combinations = dupe_taxis.select("medallion", "hack_license").distinct()

# Use broadcasting for optimizing the join operation (assuming 'unique_taxi_combinations' is small)
taxi_data = taxi_data.join(broadcast(unique_taxi_combinations), on=['medallion', 'hack_license'], how='left_anti')

# Remove rows with missing values in essential columns
initial_count = taxi_data.count()
taxi_data = taxi_data.dropna(subset=["medallion", "hack_license", "pickup_datetime", "dropoff_datetime", "passenger_count", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"])
print("Number of null rows:", initial_count - taxi_data.count())

# Filter out rows with invalid passenger counts
initial_count = taxi_data.count()
T = taxi_data.filter((col("passenger_count") > 0))
print("Rows with invalid passenger count:", initial_count - T.count())

# Drop unnecessary columns
T = T.drop("medallion", "passenger_count", "vendor_id", "rate_code", "store_and_fwd_flag")

# Display the final row count and the total number of rows cleaned
print("Number of Rows:", T.count())
print("Total rows cleaned:", old_row_count - T.count())
old_row_count_2 = T.count()

The list of the valid taxis: 10013
+--------------------+--------------------+----------+
|           medallion|        hack_license|ride_count|
+--------------------+--------------------+----------+
|B91CBA168CDFF2963...|02856AFC22881ABCA...|         3|
|D27A0F1D7A560D03E...|03A2D28F831C5C3E5...|         8|
|A47A97A6E57264A1C...|069B5562096AF7684...|         1|
|204BAB16D3382C5A5...|0FBF11956EE14B253...|        12|
|3CACE6A20EB462544...|130328475AD7427AF...|         1|
|CD89A39EA4C16B62D...|138B0A7B7D3B898E4...|         1|
|FB8C95542F2EA53BD...|13CD9D132F9DFE9BD...|         3|
|0D5744C51DAABD952...|28A7C858D9231A3EC...|         2|
|9DDC978C91239BCDB...|2E18539FA05E802C2...|         5|
|299D366EE8BC7D8C3...|31195E1D3AA1EC26D...|        11|
|7EE820827C3806BB6...|3183016714F5E253E...|        11|
|97A12ACC40B7F500B...|428AE5AF18511D16B...|        29|
|0797C43D0AB24B4F3...|42D2B75CA34A867A4...|         4|
|99DC8B92B9DD3926C...|44D39A75B5BADD81E...|         5|
|2D582E009BC6D0DC1...|49EE1E8E

                                                                                

Number of Rows: 99773
Total rows cleaned: 226


In [26]:
# Unpersist the data to free up memory
taxis.unpersist()
ranked_taxis.unpersist()
filtered_taxis.unpersist()
dupe_taxis.unpersist()
unique_taxi_combinations.unpersist()

T.show()

+--------------------+---------------+----------------+----------------+---------------+-----------------+----------------+
|        hack_license|pickup_datetime|dropoff_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+--------------------+---------------+----------------+----------------+---------------+-----------------+----------------+
|BA96DE419E711691B...| 01-01-13 15:11|  01-01-13 15:18|      -73.978165|      40.757977|       -73.989838|       40.751171|
|9FD8F69F0804BDB55...|  06-01-13 0:18|   06-01-13 0:22|      -74.006683|      40.731781|       -73.994499|        40.75066|
|9FD8F69F0804BDB55...| 05-01-13 18:49|  05-01-13 18:54|      -74.004707|       40.73777|       -74.009834|       40.726002|
|51EE87E3205C985EF...| 07-01-13 23:54|  07-01-13 23:58|      -73.974602|      40.759945|       -73.984734|       40.759388|
|51EE87E3205C985EF...| 07-01-13 23:25|  07-01-13 23:34|       -73.97625|      40.748528|       -74.002586|       40.747868|
|598CCE5

In [9]:
###############################################################################################################################################################################################

In [27]:
# ==========================
# 4. Enrichment and Feature Engineering
# ==========================

# Time enrichment
# UDF to compute timestamps
def compute_timestamps(df):
    df = df.withColumn("pickup_timestamp", unix_timestamp("pickup_datetime", "dd-MM-yy H:mm"))
    df = df.withColumn("dropoff_timestamp", unix_timestamp("dropoff_datetime", "dd-MM-yy H:mm"))
    return df

# UDF to filter duration
def filter_duration(df):
    df = df.withColumn("Duration", (col("dropoff_timestamp") - col("pickup_timestamp")) / 60)  # Duration in minutes
    df = df.filter((col("Duration") > 0) & (col("Duration") < 240.0))
    return df


# Borough enrichment
# UDF to read GeoJSON data
def read_geojson_data(file_path):
    with open(file_path, "r") as file:
        return file.read()
    
# UDF to get borough based on longitude and latitude
def get_borough(longitude, latitude, sorted_polygons):
    point = Point(longitude, latitude)
    for feature in sorted_polygons:
        polygon = shape(feature['geometry'])
        if polygon.contains(point):
            return feature['properties']['borough']
    return None

# UDF to enrich data with borough information
def enrich_with_borough(df, sorted_polygons):
    borough_udf = udf(lambda x, y: get_borough(x, y, sorted_polygons), StringType())
    df = df.withColumn("pickup_borough", borough_udf(df["pickup_longitude"], df["pickup_latitude"])) \
           .withColumn("dropoff_borough", borough_udf(df["dropoff_longitude"], df["dropoff_latitude"]))
    return df

# UDF to clean data
def clean_data(df):
    df = df.dropna(subset=["Duration", "pickup_borough", "dropoff_borough", "pickup_timestamp", "dropoff_timestamp"])
    df = df.drop("pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "pickup_datetime", "dropoff_datetime")
    return df



# Load and parse GeoJSON data
geojson_data = read_geojson_data("./Taxi_Data/nyc-boroughs.geojson")
geojson = json.loads(geojson_data)
sorted_polygons = sorted(geojson['features'], key=lambda feature: shape(feature['geometry']).area, reverse=True)

# Apply transformations
T = compute_timestamps(T)
T = filter_duration(T)
T = enrich_with_borough(T, sorted_polygons)
T2 = clean_data(T)

T.unpersist()
T2.show()

[Stage 110:>                                                        (0 + 1) / 1]

+--------------------+----------------+-----------------+--------+--------------+---------------+
|        hack_license|pickup_timestamp|dropoff_timestamp|Duration|pickup_borough|dropoff_borough|
+--------------------+----------------+-----------------+--------+--------------+---------------+
|BA96DE419E711691B...|      1357053060|       1357053480|     7.0|     Manhattan|      Manhattan|
|9FD8F69F0804BDB55...|      1357431480|       1357431720|     4.0|     Manhattan|      Manhattan|
|9FD8F69F0804BDB55...|      1357411740|       1357412040|     5.0|     Manhattan|      Manhattan|
|51EE87E3205C985EF...|      1357602840|       1357603080|     4.0|     Manhattan|      Manhattan|
|51EE87E3205C985EF...|      1357601100|       1357601640|     9.0|     Manhattan|      Manhattan|
|598CCE5B9C1918568...|      1357572420|       1357573080|    11.0|     Manhattan|      Manhattan|
|513189AD756FF14FE...|      1357642860|       1357643280|     7.0|     Manhattan|      Manhattan|
|CCD4367B417ED6634..

Traceback (most recent call last):                                              
  File "/usr/local/lib/python3.9/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/lib/python3.9/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/lib/python3.9/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 642, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/lib/python3.9/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 595, in read_int
    raise EOFError
EOFError


In [1]:
!pwd

/opt/workspace/local


In [8]:
# Reduce the number of partitions to one
T2 = T2.coalesce(1)
# Save the DataFrame as a CSV file in one single file
T2.write.csv("./Taxi_Data/taxi_data_clean_enriched.csv", header=True, mode="overwrite")

                                                                                

In [None]:
########################################################################################################################################################################################

In [12]:
########################################################################################################################################################################################
# Verification

spark.stop()
spark = SparkSession.builder.appName('TaxiDataAnalysis').getOrCreate()

# Load the taxi ride data
taxi_data = spark.read.option("header", "true").option("inferSchema", "true").csv('./Taxi_Data/taxi_data_clean_enriched.csv')

# Display the schema and the first few rows to understand the data
taxi_data.printSchema()

print("Number of Rows:", taxi_data.count())
print("Total rows cleaned:", old_row_count_2 - taxi_data.count())

root
 |-- hack_license: string (nullable = true)
 |-- pickup_timestamp: integer (nullable = true)
 |-- dropoff_timestamp: integer (nullable = true)
 |-- Duration: double (nullable = true)
 |-- pickup_borough: string (nullable = true)
 |-- dropoff_borough: string (nullable = true)

Number of Rows: 97167
Total rows cleaned: 2606


In [13]:
# boroughs verification
df = taxi_data

# Define a list of valid New York boroughs
valid_boroughs = ['Manhattan', 'Brooklyn', 'Queens', 'Bronx', 'Staten Island']

# Filter the DataFrame to include only rows where both pickup and dropoff boroughs are valid
filtered_df = df.filter((col('pickup_borough').isin(valid_boroughs)) & (col('dropoff_borough').isin(valid_boroughs)))

# Check if all rows passed the filter
all_rows_are_in_ny = df.count() == filtered_df.count()

# Display the result
if all_rows_are_in_ny:
    print("All pickup and dropoff boroughs are in New York.")
else:
    print("Some pickup or dropoff boroughs are not in New York.")

All pickup and dropoff boroughs are in New York.


In [None]:
#########################################################################################################################################################################################

In [None]:
#########################################################################################################################################################################################
# Query 1 :

In [16]:
# ==========================
# 1. Sorting and Time Difference Calculation
# ==========================

# Sort the DataFrame by driver and pickup_timestamp for accurate time difference calculation
sorted_df = taxiData.orderBy("hack_license", "pickup_timestamp")

# Using a window function to calculate the time difference in seconds between the current ride's pickup 
# and the previous ride's dropoff for the same driver
window_spec = Window.partitionBy("hack_license").orderBy("pickup_timestamp")
df_with_time_diff = sorted_df.withColumn("time_diff_seconds", 
                                         (F.col("pickup_timestamp") - F.lag("dropoff_timestamp").over(window_spec)))

# ==========================
# 2. Filtering Idle Times
# ==========================

# Filter out rows where the idle time (time_diff_seconds) is greater than 0 seconds and less than 4 hours (14400 seconds)
df_filtered_idle = df_with_time_diff.filter((F.col("time_diff_seconds") > 0) & (F.col("time_diff_seconds") < 14400))

# Convert the "Duration" column from minutes to seconds for uniformity
df_filtered_idle = df_filtered_idle.withColumn("Duration_seconds", F.col("Duration") * 60)

# ==========================
# 3. Aggregation and Utilization Calculation
# ==========================

# Group by driver (hack_license) and destination borough (dropoff_borough)
# Sum the ride duration (in seconds) and idle time (in seconds) for each group
result = df_filtered_idle.groupBy("hack_license", "dropoff_borough").agg(
    F.sum("Duration_seconds").alias("rides_duration"),
    F.sum("time_diff_seconds").alias("idle_time")
)

# Calculate the total work time for each driver in each borough as the sum of ride duration and idle time
result = result.withColumn("total_work_time", F.col("idle_time") + F.col("rides_duration"))

# Calculate the utilization, which represents the percentage of time a taxi is occupied with a passenger 
# compared to the total work time
result = result.withColumn("utilization", (F.col("rides_duration") / F.col("total_work_time")) * 100)

# Display the final result
result.show()

+--------------------+---------------+--------------+---------+---------------+------------------+
|        hack_license|dropoff_borough|rides_duration|idle_time|total_work_time|       utilization|
+--------------------+---------------+--------------+---------+---------------+------------------+
|02856AFC22881ABCA...|      Manhattan|        1320.0|     2640|         3960.0| 33.33333333333333|
|03A2D28F831C5C3E5...|      Manhattan|        3600.0|    12480|        16080.0|22.388059701492537|
|03A2D28F831C5C3E5...|         Queens|        3000.0|     6540|         9540.0|31.446540880503143|
|0FBF11956EE14B253...|      Manhattan|        4980.0|    20100|        25080.0| 19.85645933014354|
|0FBF11956EE14B253...|         Queens|        1320.0|     5460|         6780.0|19.469026548672566|
|13CD9D132F9DFE9BD...|      Manhattan|         300.0|      300|          600.0|              50.0|
|13CD9D132F9DFE9BD...|       Brooklyn|        1320.0|     2640|         3960.0| 33.33333333333333|
|28A7C858D

In [None]:
#########################################################################################################################################################################################
# Query 2 :

In [18]:
# ==========================
# 1. Time Difference Calculation
# ==========================

# Using a window function to calculate the time difference in seconds between the current ride's pickup 
# and the previous ride's dropoff for the same driver
window_spec = Window.partitionBy("hack_license").orderBy("pickup_timestamp")
time_diff = (F.col("pickup_timestamp").cast("long") - F.lag("dropoff_timestamp").over(window_spec)).alias("time_diff")

# Add the calculated time difference to the DataFrame. If the time difference is null (e.g., for the first ride of a driver), 
# set it to 0.
enriched_data = taxiData.withColumn("time_diff", F.when(F.isnull(time_diff), 0).otherwise(time_diff))

# ==========================
# 2. Filtering Data
# ==========================

# Filter out rides where the time difference between the current ride's pickup and the previous ride's dropoff 
# is more than 4 hours (14,400 seconds)
filtered_data = enriched_data.filter(F.col("time_diff") <= 14400)

# ==========================
# 3. Aggregation
# ==========================

# Group the data by the destination borough (dropoff_borough) and calculate the average time difference 
# (representing the average time taken by a taxi to find its next fare after dropping off a passenger in that borough)
result = filtered_data.groupBy("dropoff_borough").agg(F.avg("time_diff").alias("avg_time_to_find_fare"))

# Display the final result
result.show()

+---------------+---------------------+
|dropoff_borough|avg_time_to_find_fare|
+---------------+---------------------+
|         Queens|   1394.4181646168402|
|       Brooklyn|   1175.2196531791908|
|  Staten Island|                320.0|
|      Manhattan|    893.0988468064967|
|          Bronx|   1173.6315789473683|
+---------------+---------------------+



In [19]:
# ==========================
# Bonus : Pivot and Aggregation
# ==========================

# Pivot the data based on the 'dropoff_borough' column to get the average time difference for each borough.
# The pivot function will create a separate column for each borough.
pivoted_data = filtered_data.groupBy("hack_license").pivot("dropoff_borough", ["Queens", "Brooklyn", "Staten Island", "Manhattan", "Bronx"]).avg("time_diff")

# Fill any null values with 0 (indicating no rides to that borough for the driver)
pivoted_data = pivoted_data.fillna(0)

# Display the final result
pivoted_data.show()

# Print the number of rows in the final DataFrame
row_count = pivoted_data.count()
print("Number of Rows:", row_count)

+--------------------+------+--------+-------------+------------------+-----+
|        hack_license|Queens|Brooklyn|Staten Island|         Manhattan|Bronx|
+--------------------+------+--------+-------------+------------------+-----+
|02856AFC22881ABCA...|   0.0|     0.0|          0.0|             880.0|  0.0|
|03A2D28F831C5C3E5...|3270.0|     0.0|          0.0|            2080.0|  0.0|
|069B5562096AF7684...|   0.0|     0.0|          0.0|               0.0|  0.0|
|0FBF11956EE14B253...|5460.0|     0.0|          0.0|2233.3333333333335|  0.0|
|130328475AD7427AF...|   0.0|     0.0|          0.0|               0.0|  0.0|
|138B0A7B7D3B898E4...|   0.0|     0.0|          0.0|               0.0|  0.0|
|13CD9D132F9DFE9BD...|   0.0|  2640.0|          0.0|             150.0|  0.0|
|28A7C858D9231A3EC...|1920.0|     0.0|          0.0|               0.0|  0.0|
|2E18539FA05E802C2...| 540.0|     0.0|          0.0|             495.0|  0.0|
|31195E1D3AA1EC26D...| 960.0|  1050.0|          0.0| 1062.857142

In [None]:
#########################################################################################################################################################################################
# Query 3 :

In [20]:
# ==========================
# 1. Filtering Data
# ==========================

# Filter out trips where the pickup and dropoff boroughs are the same.
# This helps in identifying trips that started and ended within the same borough.
same_borough_trips = taxiData.filter(F.col("pickup_borough") == F.col("dropoff_borough"))

# ==========================
# 2. Counting Trips
# ==========================

# Count the number of trips that started and ended in the same borough.
# This gives an insight into the number of intra-borough trips.
num_same_borough_trips = same_borough_trips.count()

# Display the result
print("Number of trips that started and ended in the same borough:", num_same_borough_trips)

Number of trips that started and ended in the same borough: 85773


In [None]:
#########################################################################################################################################################################################
# Query 4 :

In [21]:
# ==========================
# 1. Filtering Data
# ==========================

# Filter out trips where the pickup and dropoff boroughs are different.
# This helps in identifying trips that started in one borough and ended in another.
inter_borough_trips = taxiData.filter(F.col("pickup_borough") != F.col("dropoff_borough"))

# ==========================
# 2. Counting Trips
# ==========================

# Count the number of trips that started in one borough and ended in another.
# This gives an insight into the number of inter-borough trips.
num_inter_borough_trips = inter_borough_trips.count()

# Display the result
print("Number of trips that started in one borough and ended in another:", num_inter_borough_trips)

# ==========================
# 3. Alternative Method
# ==========================

# An alternative method to calculate the number of inter-borough trips is to subtract the number of 
# intra-borough trips (same borough trips) from the total number of trips.
total_trips = taxiData.count()
num_inter_borough_trips_alt = total_trips - num_same_borough_trips

# Display the result from the alternative method
print("Number of trips using alternative method:", num_inter_borough_trips_alt)

Number of trips that started in one borough and ended in another: 11394
Number of trips using alternative method: 11394
