In [1]:
import pyspark
from pyspark.sql import SparkSession, types

In [2]:
# WGET download data
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-02-24 21:01:00--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 54.230.209.72, 54.230.209.126, 54.230.209.140, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|54.230.209.72|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2025-02-24 21:01:01 (67.9 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]



In [3]:
# start spark sessions
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/02/24 21:01:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark.version

'3.3.2'

In [5]:
# Define Schema
yellow_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("tpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("tpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.LongType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("RatecodeID", types.LongType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.LongType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

In [6]:
# Read the data
df = spark.read \
        .schema(yellow_schema) \
        .parquet('yellow_tripdata_2024-10.parquet')

df_deduplicated = df.dropDuplicates(["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "PULocationID", "DOLocationID"])
print(df_deduplicated.count(), df.count())
df.show()

                                                                                

3772648 3833771
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.5|         0.0|     

In [7]:
# Repartition the parquet file
output_path = './output_data/'

df.repartition(4) \
    .write.parquet(output_path, mode='overwrite')

                                                                                

In [8]:
df_deduplicated.createOrReplaceTempView("trips_data")

In [9]:
# How many taxi trips were there on the 15th of October?
spark.sql("""
SELECT 
    count(1)
FROM
    trips_data
WHERE
    DATE(tpep_pickup_datetime) = '2024-10-15'
""").show()



+--------+
|count(1)|
+--------+
|  126112|
+--------+



                                                                                

In [10]:
# What is the length of the longest trip in the dataset in hours?
spark.sql("""
SELECT 
    TIMESTAMPDIFF(HOUR, tpep_pickup_datetime, tpep_dropoff_datetime) AS trip_duration
FROM
    trips_data
ORDER by trip_duration DESC
LIMIT 3
""").show()

[Stage 21:>                                                         (0 + 4) / 4]

+-------------+
|trip_duration|
+-------------+
|          162|
|          143|
|          137|
+-------------+



                                                                                

In [11]:
# Download zone data
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-02-24 21:02:17--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 54.230.209.140, 54.230.209.72, 54.230.209.126, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|54.230.209.140|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


2025-02-24 21:02:17 (20.4 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]



In [12]:
df_zone = spark.read.csv("taxi_zone_lookup.csv", header=True, inferSchema=True)
df_zone.createOrReplaceTempView("taxi_zone")

In [13]:
df_zone.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [14]:
# What is the name of the LEAST frequent pickup location Zone?
spark.sql("""
SELECT
    tz.Zone as zone,
    count(1) as trip_cnt
FROM trips_data
LEFT JOIN taxi_zone tz
ON trips_data.PULocationID = tz.LocationID
GROUP BY zone
ORDER BY trip_cnt ASC
LIMIT 3
""").show()



+--------------------+--------+
|                zone|trip_cnt|
+--------------------+--------+
|Governor's Island...|       1|
|       Rikers Island|       2|
|       Arden Heights|       2|
+--------------------+--------+



                                                                                

In [15]:
# Clean-up
!rm yellow_tripdata_2024-10.parquet
!rm taxi_zone_lookup.csv