In [24]:
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types

In [2]:
# Start (or reuse) a Spark session in local mode with master "local[*]".
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

25/03/05 18:56:37 WARN Utils: Your hostname, MacBook-Air-2.local resolves to a loopback address: 127.0.0.1; using 10.23.121.1 instead (on interface en0)
25/03/05 18:56:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/05 18:56:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/05 18:56:39 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Question 1: Install Spark and PySpark

In [3]:
# Report the version of the running Spark session.
spark.version

'3.5.5'

## Question 2: Yellow October 2024

In [4]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-03-05 18:56:40--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 13.226.36.218, 13.226.36.73, 13.226.36.130, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|13.226.36.218|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet.1’


2025-03-05 18:56:43 (36.3 MB/s) - ‘yellow_tripdata_2024-10.parquet.1’ saved [64346071/64346071]



In [5]:
# Load the downloaded October 2024 Yellow Taxi trips into a Spark DataFrame.
df = spark.read.parquet('yellow_tripdata_2024-10.parquet')

                                                                                

In [8]:
# Repartition the trips into 4 partitions and write them as parquet under
# `partitions/`, replacing the output of any previous run.
# (Plain string literal: the original f-string had no placeholders.)
output_path = 'partitions/'
(
    df
    .repartition(4)
    .write
    .parquet(output_path, mode='overwrite')
)

                                                                                

In [8]:
# Size in bytes of every `.parquet` file directly inside output_path;
# any other directory entries are ignored by the suffix filter.
parquet_sizes = [
    os.path.getsize(os.path.join(output_path, entry))
    for entry in os.listdir(output_path)
    if entry.endswith(".parquet")
]

# Mean file size, first in bytes and then in MiB (1 MiB = 2**20 bytes).
average_size_bytes = sum(parquet_sizes) / len(parquet_sizes)
average_size_mb = average_size_bytes / (2 ** 20)

In [9]:
# Average size of the written parquet files, in MiB (bytes / (1024 * 1024)).
average_size_mb

22.3963520526886

## Question 3: Count records

In [9]:
# List the column names of the trips DataFrame.
df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']

In [10]:
# Add a date-typed `pickup_date` column derived from the pickup timestamp,
# so trips can be filtered by calendar day.
df = df.withColumn(
    'pickup_date',
    F.to_date(F.col('tpep_pickup_datetime')),
)

In [11]:
# Preview the first 10 rows, which now include the derived pickup_date column.
df.head(10)

[Row(VendorID=2, tpep_pickup_datetime=datetime.datetime(2024, 10, 1, 0, 30, 44), tpep_dropoff_datetime=datetime.datetime(2024, 10, 1, 0, 48, 26), passenger_count=1, trip_distance=3.0, RatecodeID=1, store_and_fwd_flag='N', PULocationID=162, DOLocationID=246, payment_type=1, fare_amount=18.4, extra=1.0, mta_tax=0.5, tip_amount=1.5, tolls_amount=0.0, improvement_surcharge=1.0, total_amount=24.9, congestion_surcharge=2.5, Airport_fee=0.0, pickup_date=datetime.date(2024, 10, 1)),
 Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2024, 10, 1, 0, 12, 20), tpep_dropoff_datetime=datetime.datetime(2024, 10, 1, 0, 25, 25), passenger_count=1, trip_distance=2.2, RatecodeID=1, store_and_fwd_flag='N', PULocationID=48, DOLocationID=236, payment_type=1, fare_amount=14.2, extra=3.5, mta_tax=0.5, tip_amount=3.8, tolls_amount=0.0, improvement_surcharge=1.0, total_amount=23.0, congestion_surcharge=2.5, Airport_fee=0.0, pickup_date=datetime.date(2024, 10, 1)),
 Row(VendorID=1, tpep_pickup_datetime=dat

In [12]:
# Number of trips whose pickup date is 2024-10-15.
(
    df
    .where(F.col('pickup_date') == '2024-10-15')
    .count()
)

                                                                                

128893

## Question 4: Longest trip

In [13]:
# Print the column names and types of the trips DataFrame.
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- pickup_date: date (nullable = true)



In [19]:
# Trip duration = dropoff minus pickup, expressed in seconds and then in hours.
# Show the 5 trips with the longest duration.
pickup_sec = F.unix_timestamp('tpep_pickup_datetime')
dropoff_sec = F.unix_timestamp('tpep_dropoff_datetime')

(
    df
    .withColumn('tripDurationSec', dropoff_sec - pickup_sec)
    .withColumn('tripDurationHrs', F.col('tripDurationSec') / 3600)
    .orderBy(F.col('tripDurationHrs').desc())
    .show(5)
)

                                                                                

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+---------------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|pickup_date|tripDurationSec|   tripDurationHrs|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+---------------+------------------+
|       2| 2024-10-16 13:03:49|  2024-10-23 07

## Question 5: User Interface

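- 4040 — the default port of Spark's web UI. (In this session port 4040 was already in use, so Spark fell back to 4041, as the startup log above shows.)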

## Question 6: Least frequent pickup location zone

In [21]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-05 19:14:27--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 13.226.36.196, 13.226.36.218, 13.226.36.73, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|13.226.36.196|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


2025-03-05 19:14:29 (905 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]



In [25]:
# Load the taxi zone lookup table; the first CSV row supplies the column names.
df_zone = spark.read.csv('taxi_zone_lookup.csv', header=True)

In [26]:
# Print the lookup table's column names and types.
df_zone.printSchema()

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [29]:
# Count trips per pickup zone and show the 5 least frequent zones.
# PULocationID is an integer column while LocationID was read from the CSV as a
# string, so cast the lookup key explicitly instead of relying on implicit
# type coercion inside the join condition.
# The left join keeps trips whose PULocationID has no match in the lookup
# table; those rows are grouped under a null Zone.
df \
    .join(df_zone, df.PULocationID == df_zone.LocationID.cast('int'), 'left') \
    .groupBy('Zone') \
    .agg(F.count("tpep_pickup_datetime").alias("count")) \
    .orderBy('count') \
    .show(5)

+--------------------+-----+
|                Zone|count|
+--------------------+-----+
|Governor's Island...|    1|
|       Rikers Island|    2|
|       Arden Heights|    2|
|         Jamaica Bay|    3|
| Green-Wood Cemetery|    3|
+--------------------+-----+
only showing top 5 rows

