In [1]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/06 23:40:56 WARN Utils: Your hostname, boyka.local, resolves to a loopback address: 127.0.0.1; using 192.168.1.2 instead (on interface en0)
26/02/06 23:40:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/06 23:40:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


**Q1**: Install Spark and PySpark

In [3]:
# Q1: display the version string of the running Spark session.
spark.version

'4.1.1'

**Q2**: Yellow October 2024

In [5]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2026-02-06 23:42:04--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 13.227.235.160, 13.227.235.199, 13.227.235.205, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|13.227.235.160|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2026-02-06 23:42:05 (52.7 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]



In [11]:
# Show the size of the downloaded parquet file in human-readable units.
!ls -lh yellow_tripdata_2024-10.parquet

-rw-r--r--@ 1 boyka  staff    61M Dec 19  2024 yellow_tripdata_2024-10.parquet


In [59]:
# Load the downloaded October 2024 trip file into a Spark DataFrame.
df = spark.read.parquet('yellow_tripdata_2024-10.parquet')

In [60]:
# Peek at a single row to check the columns and value formats.
df.show(1)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [62]:
# Q2: rewrite the month as four uncompressed parquet part files.
# The source file is read here directly instead of going through `df`: a later
# cell rebinds `df` to this output directory, and re-running this cell would
# then ask Spark to overwrite the very path it is reading from, which it rejects.
# Not rebinding `df` here also keeps the cell idempotent.
(
    spark.read.parquet('yellow_tripdata_2024-10.parquet')
    .repartition(4)
    .write.parquet('data/pq/hw/2024/10/', compression="none", mode="overwrite")
)

                                                                                

In [63]:
# From here on `df` refers to the repartitioned copy written to data/pq/hw/2024/10/.
df = spark.read.parquet('data/pq/hw/2024/10/')

In [64]:
# Q2: list the written part files with their sizes.
!ls -lh data/pq/hw/2024/10/

total 213504
-rw-r--r--@ 1 boyka  staff     0B Feb  7 00:28 _SUCCESS
-rw-r--r--@ 1 boyka  staff    25M Feb  7 00:28 part-00000-d8b10b17-ba2e-4d47-a4cb-47cd4b2d123e-c000.parquet
-rw-r--r--@ 1 boyka  staff    25M Feb  7 00:28 part-00001-d8b10b17-ba2e-4d47-a4cb-47cd4b2d123e-c000.parquet
-rw-r--r--@ 1 boyka  staff    25M Feb  7 00:28 part-00002-d8b10b17-ba2e-4d47-a4cb-47cd4b2d123e-c000.parquet
-rw-r--r--@ 1 boyka  staff    25M Feb  7 00:28 part-00003-d8b10b17-ba2e-4d47-a4cb-47cd4b2d123e-c000.parquet


**Q3**: How many taxi trips were there on October 15?

In [28]:
from pyspark.sql import functions as F

In [84]:
# Q3 (DataFrame API): number of trips whose pickup falls on 2024-10-15.
# The pickup timestamp is truncated to a date before comparing.
(
    df
    .withColumn('pickup_datetime', F.to_date(df['tpep_pickup_datetime']))
    .filter("pickup_datetime = '2024-10-15'")
    .count()
)

128893

In [70]:
# Register the trips DataFrame as a temp view so it can be queried with spark.sql.
df.createOrReplaceTempView('yellow_tripdata_2024_10')

In [66]:
# Q3 (SQL): same count as the DataFrame version — rows whose pickup date is
# 2024-10-15 in the yellow_tripdata_2024_10 temp view.
spark.sql("""
SELECT
    COUNT(1)
FROM 
    yellow_tripdata_2024_10
WHERE
    to_date(tpep_pickup_datetime) = '2024-10-15';
""").show()

+--------+
|count(1)|
+--------+
|  128893|
+--------+



**Q4**: Longest trip for each day

In [67]:
# List the trip columns available for the duration calculation.
df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']

In [46]:
# Q4 (DataFrame API): longest trip per pickup date, in hours.
# Duration is dropoff minus pickup in epoch seconds, divided by 3600;
# only the five dates with the largest maximum are shown.
(
    df
    .withColumn('pickup_date', F.to_date(df['tpep_pickup_datetime']))
    .withColumn(
        'duration',
        (
            F.unix_timestamp(df['tpep_dropoff_datetime'])
            - F.unix_timestamp(df['tpep_pickup_datetime'])
        ) / 3600,
    )
    .groupBy('pickup_date')
    .max('duration')
    .orderBy('max(duration)', ascending=False)
    .limit(5)
    .show()
)


+-----------+------------------+
|pickup_date|     max(duration)|
+-----------+------------------+
| 2024-10-16|162.61777777777777|
| 2024-10-03|           143.325|
| 2024-10-22|137.76055555555556|
| 2024-10-18|114.83472222222223|
| 2024-10-21| 89.89833333333333|
+-----------+------------------+



In [47]:
# Q4 (SQL): maximum trip duration in hours per pickup date, ten largest first.
# GROUP BY 1 / ORDER BY 2 refer to the select-list positions
# (pickup_date and duration respectively).
spark.sql("""
SELECT
    to_date(tpep_pickup_datetime) AS pickup_date,
    MAX((unix_timestamp(tpep_dropoff_datetime) - unix_timestamp(tpep_pickup_datetime)) / 3600) AS duration
FROM 
    yellow_tripdata_2024_10
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 10;
""").show()

+-----------+------------------+
|pickup_date|          duration|
+-----------+------------------+
| 2024-10-16|162.61777777777777|
| 2024-10-03|           143.325|
| 2024-10-22|137.76055555555556|
| 2024-10-18|114.83472222222223|
| 2024-10-21| 89.89833333333333|
| 2024-10-20| 89.44611111111111|
| 2024-10-12| 67.57333333333334|
| 2024-10-17| 66.06666666666666|
| 2024-10-24| 38.47416666666667|
| 2024-10-23| 33.95111111111111|
+-----------+------------------+



**Q6**: Least frequent pickup location zone

In [48]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2026-02-07 00:22:03--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 13.227.235.160, 13.227.235.181, 13.227.235.205, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|13.227.235.160|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


2026-02-07 00:22:04 (67.6 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]



In [49]:
# List the working directory (taxi_zone_lookup.csv should now be present).
!ls

03_test.ipynb                   Untitled.ipynb
04_pyspark.ipynb                cloud.md
05_taxi_schema.ipynb            data
06_spark_sql.ipynb              download_data.sh
06_spark_sql.py                 hello-hw.py
06_spark_sql_big_query.py       homework.ipynb
07_groupby_join.ipynb           taxi_zone_lookup.csv
08_rdds.ipynb                   yellow_tripdata_2024-10.parquet
09_spark_gcs.ipynb


In [50]:
# Explicit schema for taxi_zone_lookup.csv: an integer LocationID followed by
# three string columns; every field is nullable.
schema = types.StructType([
    types.StructField(column, dtype, True)
    for column, dtype in (
        ('LocationID', types.IntegerType()),
        ('Borough', types.StringType()),
        ('Zone', types.StringType()),
        ('service_zone', types.StringType()),
    )
])

In [55]:
# Load the zone lookup using the explicit schema; the first CSV row is a header.
df_zones = spark.read.csv('taxi_zone_lookup.csv', schema=schema, header=True)

In [57]:
# Confirm the explicit schema was applied to the loaded lookup table.
df_zones.schema

StructType([StructField('LocationID', IntegerType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True)])

In [47]:
# Prefer a cached parquet copy of the zone lookup only when one exists.
# This notebook never writes a 'zones' directory, so the unconditional read
# would fail on a fresh run; when the cache is absent, keep the df_zones that
# was loaded from taxi_zone_lookup.csv.
if os.path.isdir('zones'):
    df_zones = spark.read.parquet('zones')

In [58]:
# Column names of the zone lookup (LocationID is the join key used below).
df_zones.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [68]:
# Column names of the trips table (PULocationID is the join key used below).
df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']

In [69]:
# Register the zone lookup as a temp view so it can be joined in SQL.
df_zones.createOrReplaceTempView('zones')

In [76]:
# Q6: trips per pickup zone name, five smallest counts first.
# The LEFT JOIN keeps trips whose PULocationID has no match in the lookup;
# such rows are grouped under a NULL Zone.
# GROUP BY 1 / ORDER BY 2 refer to the select-list positions (Zone and the count).
spark.sql("""
SELECT
    pul.Zone,
    COUNT(1)
FROM 
    yellow_tripdata_2024_10 tripdata LEFT JOIN zones pul ON tripdata.PULocationID = pul.LocationID
GROUP BY 
    1
ORDER BY
    2 ASC
LIMIT 5;
""").show(truncate=False)

+---------------------------------------------+--------+
|Zone                                         |count(1)|
+---------------------------------------------+--------+
|Governor's Island/Ellis Island/Liberty Island|1       |
|Rikers Island                                |2       |
|Arden Heights                                |2       |
|Jamaica Bay                                  |3       |
|Green-Wood Cemetery                          |3       |
+---------------------------------------------+--------+

