### Question 1: Install Spark and PySpark

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

spark = SparkSession.builder \
    .appName('homework') \
    .getOrCreate()

spark.version

25/03/06 04:41:19 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


'3.5.1'

### Question 2: Yellow October 2024

In [2]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-03-06 04:41:21--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 65.8.245.50, 65.8.245.171, 65.8.245.51, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|65.8.245.50|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet.1’


2025-03-06 04:41:22 (183 MB/s) - ‘yellow_tripdata_2024-10.parquet.1’ saved [64346071/64346071]



In [3]:
from pyspark.sql import types

In [13]:
yellow_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("tpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("tpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.LongType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("RatecodeID", types.LongType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.LongType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

In [14]:
df = spark.read.schema(yellow_schema).parquet("file:///yellow_tripdata_2024-10.parquet")
df.show(5)

                                                                                

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.5|         0.0|                  1.0

In [15]:
df = df.repartition(4)

In [16]:
output_path = 'gs://huiling-qiao-kestra-bucket/pq/yellow/2024/10/'
df.write.parquet(output_path)

                                                                                

### Question 3: Count records 

In [17]:
from pyspark.sql import functions as F

In [19]:
df \
    .withColumn('pickup_date', F.to_date(df.tpep_pickup_datetime)) \
    .filter("pickup_date = '2024-10-15'").count()

128893

In [20]:
df.registerTempTable('yellow_trip_2024_10')



In [21]:
spark.sql("""
SELECT
    COUNT(1)
FROM 
    yellow_trip_2024_10
WHERE
    to_date(tpep_pickup_datetime) = '2024-10-15';
""").show()

+--------+
|count(1)|
+--------+
|  128893|
+--------+



### Question 4: Longest trip

In [26]:
spark.sql("""
SELECT
    MAX((CAST(tpep_dropoff_datetime AS LONG) - CAST(tpep_pickup_datetime AS LONG))) / 60 / 60
FROM 
    yellow_trip_2024_10
""").show()



+-------------------------------------------------------------------------------------------------+
|((max((CAST(tpep_dropoff_datetime AS BIGINT) - CAST(tpep_pickup_datetime AS BIGINT))) / 60) / 60)|
+-------------------------------------------------------------------------------------------------+
|                                                                                162.6177777777778|
+-------------------------------------------------------------------------------------------------+



                                                                                

### Question 6: Least frequent pickup location zone

In [27]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-06 05:04:32--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 65.8.245.50, 65.8.245.51, 65.8.245.178, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|65.8.245.50|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


2025-03-06 05:04:32 (11.7 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]



In [29]:
!head taxi_zone_lookup.csv

"LocationID","Borough","Zone","service_zone"
1,"EWR","Newark Airport","EWR"
2,"Queens","Jamaica Bay","Boro Zone"
3,"Bronx","Allerton/Pelham Gardens","Boro Zone"
4,"Manhattan","Alphabet City","Yellow Zone"
5,"Staten Island","Arden Heights","Boro Zone"
6,"Staten Island","Arrochar/Fort Wadsworth","Boro Zone"
7,"Queens","Astoria","Boro Zone"
8,"Queens","Astoria Park","Boro Zone"
9,"Queens","Auburndale","Boro Zone"


In [31]:
df_zones = spark.read \
    .option("header", "true") \
    .csv('file:///taxi_zone_lookup.csv')

In [32]:
df_join = df.join(df_zones, df.PULocationID == df_zones.LocationID)

In [33]:
df_join.registerTempTable('yellow_trip_zones_2024_10')



In [36]:
spark.sql("""
SELECT
    PULocationID, Zone, COUNT(1) AS frequency
FROM 
    yellow_trip_zones_2024_10
GROUP BY 1, 2
ORDER BY 3;
""").show()

                                                                                

+------------+--------------------+---------+
|PULocationID|                Zone|frequency|
+------------+--------------------+---------+
|         105|Governor's Island...|        1|
|         199|       Rikers Island|        2|
|           5|       Arden Heights|        2|
|         111| Green-Wood Cemetery|        3|
|           2|         Jamaica Bay|        3|
|          44|Charleston/Totten...|        4|
|         204|   Rossville/Woodrow|        4|
|         245|       West Brighton|        4|
|         187|       Port Richmond|        4|
|          84|Eltingville/Annad...|        4|
|          59|        Crotona Park|        6|
|         109|         Great Kills|        6|
|         118|Heartland Village...|        7|
|         156|     Mariners Harbor|        7|
|         176|             Oakwood|        9|
|         206|Saint George/New ...|        9|
|         172|New Dorp/Midland ...|       10|
|          30|       Broad Channel|       10|
|         251|         Westerleigh