In [13]:
import os
from pathlib import Path


import pyspark

In [14]:
FHVH_DATA = Path('fhvh-data')

# Q1

In [2]:
pyspark.__version__

'3.3.2'

# Q2

In [3]:
from pyspark.sql import SparkSession

 
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.driver.memory", "15g") \
    .appName("homework5"). \
    getOrCreate()

23/02/26 12:18:57 WARN Utils: Your hostname, gajop-desktop resolves to a loopback address: 127.0.1.1; using 192.168.10.87 instead (on interface enp2s0)
23/02/26 12:18:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/26 12:18:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
df = spark.read \
    .option("header", "true") \
    .csv("fhvhv_tripdata_2021-06.csv").repartition(12)

In [5]:
df.show()

[Stage 1:>                                                        (0 + 20) / 20]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-02 17:19:43|2021-06-02 17:47:47|          80|          65|      N|                B02764|
|              B02510|2021-06-02 16:36:18|2021-06-02 16:48:05|          39|          39|      N|                  null|
|              B02876|2021-06-01 02:11:30|2021-06-01 02:18:01|          39|          91|      N|                B02876|
|              B02510|2021-06-02 13:00:02|2021-06-02 13:04:07|          61|          61|      N|                  null|
|              B02835|2021-06-01 21:29:47|2021-06-01 21:53:40|          81|         212|      N|                B02835|
|              B02882|2021-06-01 22:34:1

                                                                                

In [6]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [8]:
from pyspark.sql import types

In [9]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [10]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv("fhvhv_tripdata_2021-06.csv").repartition(12)

In [16]:
df.write.parquet(str(FHVH_DATA))

AnalysisException: path file:/home/gajop/projects/learn/data-engineering-zoomcamp/cohorts/2023/week_5_batch_processing/fhvh-data already exists.

In [26]:
files = [FHVH_DATA / f for f in os.listdir(FHVH_DATA) if f.endswith(".parquet")]
sizes = [f.stat().st_size for f in files]
average_size_mb = sum(sizes) / len(sizes) / 1024 / 1024

print(f'Partitions: {len(files)}')
assert(len(files) == 12)
average_size_mb

Partitions: 12


21.572226126988728

# Q3

In [47]:
df = spark.read.parquet("fhvh-data")

In [48]:
df.createOrReplaceTempView('fhvh_trips')

In [50]:
spark.sql("""
SELECT COUNT(*)
FROM
fhvh_trips
""").collect()

[Row(count(1)=14961892)]

# Q4

In [51]:
spark.sql("""
SELECT (dropoff_datetime - pickup_datetime) as duration
FROM
fhvh_trips
ORDER BY duration
DESC LIMIT 1
""").collect()

                                                                                

[Row(duration=datetime.timedelta(days=2, seconds=67964))]

In [55]:
days = 2
seconds = 67964
days * 24 + seconds / 3600

66.8788888888889

# Q6

In [58]:
df_zones = spark.read.option("header", "true").csv("taxi_zone_lookup.csv")

In [63]:
df_zones.head(n=10)

[Row(LocationID='1', Borough='EWR', Zone='Newark Airport', service_zone='EWR'),
 Row(LocationID='2', Borough='Queens', Zone='Jamaica Bay', service_zone='Boro Zone'),
 Row(LocationID='3', Borough='Bronx', Zone='Allerton/Pelham Gardens', service_zone='Boro Zone'),
 Row(LocationID='4', Borough='Manhattan', Zone='Alphabet City', service_zone='Yellow Zone'),
 Row(LocationID='5', Borough='Staten Island', Zone='Arden Heights', service_zone='Boro Zone'),
 Row(LocationID='6', Borough='Staten Island', Zone='Arrochar/Fort Wadsworth', service_zone='Boro Zone'),
 Row(LocationID='7', Borough='Queens', Zone='Astoria', service_zone='Boro Zone'),
 Row(LocationID='8', Borough='Queens', Zone='Astoria Park', service_zone='Boro Zone'),
 Row(LocationID='9', Borough='Queens', Zone='Auburndale', service_zone='Boro Zone'),
 Row(LocationID='10', Borough='Queens', Zone='Baisley Park', service_zone='Boro Zone')]

In [60]:
df_zones.schema

StructType([StructField('LocationID', StringType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True)])

In [64]:
zone_schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True)
])

In [66]:
df_zones = spark.read.option("header", "true").schema(zone_schema).csv("taxi_zone_lookup.csv")

In [68]:
df_zones.head(n=10)

[Row(LocationID=1, Borough='EWR', Zone='Newark Airport', service_zone='EWR'),
 Row(LocationID=2, Borough='Queens', Zone='Jamaica Bay', service_zone='Boro Zone'),
 Row(LocationID=3, Borough='Bronx', Zone='Allerton/Pelham Gardens', service_zone='Boro Zone'),
 Row(LocationID=4, Borough='Manhattan', Zone='Alphabet City', service_zone='Yellow Zone'),
 Row(LocationID=5, Borough='Staten Island', Zone='Arden Heights', service_zone='Boro Zone'),
 Row(LocationID=6, Borough='Staten Island', Zone='Arrochar/Fort Wadsworth', service_zone='Boro Zone'),
 Row(LocationID=7, Borough='Queens', Zone='Astoria', service_zone='Boro Zone'),
 Row(LocationID=8, Borough='Queens', Zone='Astoria Park', service_zone='Boro Zone'),
 Row(LocationID=9, Borough='Queens', Zone='Auburndale', service_zone='Boro Zone'),
 Row(LocationID=10, Borough='Queens', Zone='Baisley Park', service_zone='Boro Zone')]

In [69]:
df_zones.createOrReplaceTempView('zones')

In [88]:
df_zone_frequency = spark.sql("""
SELECT 
    PULocationID,
    MAX(zones.Zone) as name,
    COUNT(*)
FROM
    fhvh_trips, zones
WHERE
    fhvh_trips.PULocationID == zones.LocationID
GROUP BY
    fhvh_trips.PULocationID
ORDER BY 3 DESC
""").collect()

                                                                                

In [89]:
df_zone_frequency

[Row(PULocationID=61, name='Crown Heights North', count(1)=231279),
 Row(PULocationID=79, name='East Village', count(1)=221244),
 Row(PULocationID=132, name='JFK Airport', count(1)=188867),
 Row(PULocationID=37, name='Bushwick South', count(1)=187929),
 Row(PULocationID=76, name='East New York', count(1)=186780),
 Row(PULocationID=231, name='TriBeCa/Civic Center', count(1)=164344),
 Row(PULocationID=138, name='LaGuardia Airport', count(1)=161596),
 Row(PULocationID=234, name='Union Sq', count(1)=158937),
 Row(PULocationID=249, name='West Village', count(1)=154698),
 Row(PULocationID=7, name='Astoria', count(1)=152493),
 Row(PULocationID=148, name='Lower East Side', count(1)=151020),
 Row(PULocationID=68, name='East Chelsea', count(1)=147673),
 Row(PULocationID=42, name='Central Harlem North', count(1)=146402),
 Row(PULocationID=255, name='Williamsburg (North Side)', count(1)=143683),
 Row(PULocationID=181, name='Park Slope', count(1)=143594),
 Row(PULocationID=225, name='Stuyvesant Hei