In [6]:
import pyspark
from pyspark.sql import SparkSession

In [7]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

## Q1

In [8]:
spark.version

'3.3.2'

## Q2

In [28]:
!ls -lh data/yellow_tripdata_2024-10.parquet

-rw-r--r-- 1 lan lan 62M Dec 19 00:21 data/yellow_tripdata_2024-10.parquet


In [9]:
df = spark.read.parquet('data/yellow_tripdata_2024-10.parquet')

In [26]:
df \
.repartition(4) \
.write.parquet('data/q2', mode='overwrite')

                                                                                

In [29]:
!ls -lh data/q2

total 97M
-rw-r--r-- 1 lan lan   0 Mar  1 17:26 _SUCCESS
-rw-r--r-- 1 lan lan 25M Mar  1 17:26 part-00000-30c6876f-0a3c-4670-9f1b-069b7c55b717-c000.snappy.parquet
-rw-r--r-- 1 lan lan 25M Mar  1 17:26 part-00001-30c6876f-0a3c-4670-9f1b-069b7c55b717-c000.snappy.parquet
-rw-r--r-- 1 lan lan 25M Mar  1 17:26 part-00002-30c6876f-0a3c-4670-9f1b-069b7c55b717-c000.snappy.parquet
-rw-r--r-- 1 lan lan 25M Mar  1 17:26 part-00003-30c6876f-0a3c-4670-9f1b-069b7c55b717-c000.snappy.parquet


## Q3

In [40]:
df.schema

StructType([StructField('VendorID', IntegerType(), True), StructField('tpep_pickup_datetime', TimestampType(), True), StructField('tpep_dropoff_datetime', TimestampType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True)])

In [41]:
df.registerTempTable('trips_data')

In [42]:
spark.sql("""
SELECT
    count(1)
FROM
    trips_data
WHERE
    DATE(tpep_pickup_datetime) = '2024-10-15'
""").show()

+--------+
|count(1)|
+--------+
|  122561|
+--------+



## Q4

In [47]:
spark.sql("""
SELECT
    max((bigint(tpep_dropoff_datetime) - bigint(tpep_pickup_datetime )) / 3600 ) AS hour_difference
FROM
    trips_data;
""").show()



+------------------+
|   hour_difference|
+------------------+
|162.61777777777777|
+------------------+



                                                                                

## Q4

In [49]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-01 18:06:04--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 52.85.114.180, 52.85.114.39, 52.85.114.54, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|52.85.114.180|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


2025-03-01 18:06:04 (324 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]



In [51]:
df_zones = spark.read \
    .option("header", "true") \
    .csv('taxi_zone_lookup.csv')

In [52]:
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [62]:
df_aggr = spark.sql("""
SELECT
    PULocationID
  , count(*) as trip_qty
FROM
    trips_data
GROUP BY PULocationID
ORDER BY trip_qty ASC LIMIT 1;
""")

In [63]:
df_result = df_aggr.join(df_zones, df_aggr.PULocationID == df_zones.LocationID)

In [64]:
df_result.show()



+------------+--------+----------+---------+--------------------+------------+
|PULocationID|trip_qty|LocationID|  Borough|                Zone|service_zone|
+------------+--------+----------+---------+--------------------+------------+
|         105|       1|       105|Manhattan|Governor's Island...| Yellow Zone|
+------------+--------+----------+---------+--------------------+------------+



                                                                                