In [12]:
# Import libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
# Create the spark cluster
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [5]:
# Download datasets
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-02 15:15:38--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 3.167.84.228, 3.167.84.127, 3.167.84.131, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|3.167.84.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2025-03-02 15:15:38 (332 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]

--2025-03-02 15:15:38--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 3.167.84.86, 3.167.84.131, 3.167.84.127, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|3.167.84.86|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


202

In [6]:
!ls

download_data.sh      Untitled.ipynb
taxi_zone_lookup.csv  yellow_tripdata_2024-10.parquet


In [8]:
# Load the data files
df_yellow = spark.read.option("header", "true").parquet("yellow_tripdata_2024-10.parquet")
df_zone = spark.read.option("header", "true").csv("taxi_zone_lookup.csv")

In [9]:
# Check the df schema
df_yellow.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [10]:
df_zone.printSchema()

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [31]:
# The schema looks alright for yellow taxi but LocationID in df_zone should be an integer instead of a string. We will pass the schema manually
# The True is for nullable i.e. A null value is acceptable for this column
yellow_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("tpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("tpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.LongType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("RatecodeID", types.LongType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.LongType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])
zone_schema = types.StructType([
    types.StructField("LocationID", types.IntegerType(), True),
    types.StructField("Borough", types.StringType(), True),
    types.StructField("Zone", types.StringType(), True),
    types.StructField("service_zone", types.StringType(), True)
])

In [32]:
# Reloading data with the specified datatypes
df_yellow = spark.read.option("header", "true").schema(yellow_schema).parquet("yellow_tripdata_2024-10.parquet")
df_zone = spark.read.option("header", "true").schema(zone_schema).csv("taxi_zone_lookup.csv")

In [33]:
# Check the df schemas
df_yellow.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [34]:
df_zone.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [36]:
# Repartitioning yellow_taxi data and writing 
df_yellow.repartition(4).write.parquet(path="yellow_taxi", mode="overwrite")

In [38]:
!ls -lh yellow_taxi

total 97M
-rw-r--r-- 1 jovyan users 25M Mar  2 15:36 part-00000-189129b0-8b51-4efc-b6c8-c85d6dbcf9df-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 25M Mar  2 15:36 part-00001-189129b0-8b51-4efc-b6c8-c85d6dbcf9df-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 25M Mar  2 15:36 part-00002-189129b0-8b51-4efc-b6c8-c85d6dbcf9df-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 25M Mar  2 15:36 part-00003-189129b0-8b51-4efc-b6c8-c85d6dbcf9df-c000.snappy.parquet
-rw-r--r-- 1 jovyan users   0 Mar  2 15:36 _SUCCESS


### Question 1: Install Spark and PySpark

In [40]:
pyspark.__version__

'3.5.0'

### Question 2: Yellow October 2024

In [41]:
df_yellow.repartition(4).write.parquet(path="yellow_taxi", mode="overwrite")
!ls -lh yellow_taxi

total 97M
-rw-r--r-- 1 jovyan users 25M Mar  2 15:40 part-00000-f1fcc67d-d6fd-4321-909a-0cfaab778cf0-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 25M Mar  2 15:40 part-00001-f1fcc67d-d6fd-4321-909a-0cfaab778cf0-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 25M Mar  2 15:40 part-00002-f1fcc67d-d6fd-4321-909a-0cfaab778cf0-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 25M Mar  2 15:40 part-00003-f1fcc67d-d6fd-4321-909a-0cfaab778cf0-c000.snappy.parquet
-rw-r--r-- 1 jovyan users   0 Mar  2 15:40 _SUCCESS


### Question 3: Count records

In [45]:
# Creates or replaces a local temporary view with this DataFrame.
df_yellow.createOrReplaceTempView('trips_data')
df_zone.createOrReplaceTempView('trips_zone')

In [46]:
# The dateformat is: **YYY-MM--DD**
query = """SELECT COUNT(*) AS total_trips FROM trips_data WHERE DATE(tpep_pickup_datetime)='2024-10-15';"""
df_result = spark.sql(query)
df_result.show()

+-----------+
|total_trips|
+-----------+
|     128893|
+-----------+



### Question 4: Longest trip

In [47]:
query = """SELECT (UNIX_TIMESTAMP(tpep_dropoff_datetime) - UNIX_TIMESTAMP(tpep_pickup_datetime)) / 3600 AS diff FROM trips_data ORDER BY diff DESC LIMIT 5;"""
df_result = spark.sql(query)
df_result.show(truncate=False)

+------------------+
|diff              |
+------------------+
|162.61777777777777|
|143.325           |
|137.76055555555556|
|114.83472222222223|
|89.89833333333333 |
+------------------+



### Question 5: User Interface

Open `localhost:4040` or any of the forwarded port on the host system.

### Question 6: Least frequent pickup location zone

In [49]:
query = """
WITH lest_freq_pu_id AS
(
    SELECT PULocationID as pu_id, COUNT(*) AS total_count
    FROM trips_data
    GROUP BY pu_id
    ORDER BY total_count ASC
    LIMIT 1
)
SELECT Zone, LocationID FROM trips_zone WHERE LocationID = (SELECT pu_id FROM lest_freq_pu_id);
"""
df_result = spark.sql(query)
df_result.show(truncate=False)

+---------------------------------------------+----------+
|Zone                                         |LocationID|
+---------------------------------------------+----------+
|Governor's Island/Ellis Island/Liberty Island|105       |
+---------------------------------------------+----------+

