# Batch

In [27]:
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, unix_timestamp, max, count, asc

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

25/02/28 14:17:54 WARN Utils: Your hostname, Aspire resolves to a loopback address: 127.0.1.1; using 192.168.0.164 instead (on interface wlp3s0)
25/02/28 14:17:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/28 14:17:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Question 1

In [3]:
spark.version

'3.5.5'

25/02/28 14:18:14 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


## Question 2

In [8]:
df = spark.read.parquet('yellow_tripdata_2024-10.parquet')

In [9]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [15]:
output_dir = "output_directory" 

In [12]:
df.repartition(4) \
  .write \
  .parquet(output_dir)

                                                                                

In [18]:
parquet_files = [f for f in os.listdir(output_dir) if f.endswith(".parquet")]
total_size = sum(os.path.getsize(os.path.join(output_dir, f)) for f in parquet_files)
average_size = total_size / len(parquet_files) if parquet_files else 0
f"Average file size: {average_size / (1024 * 1024):.2f} MB"

'Average file size: 22.39 MB'

## Question 3

In [19]:
df_filtered = df.filter(to_date(col("tpep_pickup_datetime")) == "2024-10-15")
trip_count = df_filtered.count()
f"Number of taxi trips on October 15th: {trip_count}"

'Number of taxi trips on October 15th: 128893'

## Question 4

In [21]:
df_with_duration = df.withColumn(
    "trip_duration_hours",
    (unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime"))) / 3600
)
longest_trip = df_with_duration.select(max("trip_duration_hours")).collect()[0][0]
f"Longest trip duration: {longest_trip:.2f} hours"

'Longest trip duration: 162.62 hours'

## Question 6

In [24]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-02-28 14:41:48--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
13.227.153.93, 13.227.153.16, 13.227.153.43, ...zurychx.cloudfront.net)... 
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|13.227.153.93|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


2025-02-28 14:41:51 (3,37 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]



In [29]:
df_zone = spark.read.option("header", "true").csv('taxi_zone_lookup.csv')
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [42]:
df_trips = spark.read.parquet("yellow_tripdata_2024-10.parquet")
df_trips = df_trips.withColumn("tpep_pickup_datetime", col("tpep_pickup_datetime").cast("timestamp")) \
                   .withColumn("tpep_dropoff_datetime", col("tpep_dropoff_datetime").cast("timestamp"))

Column<'VendorID'>

In [44]:
df_joined = df_trips.join(df_zone, df_trips.PULocationID == df_zone.LocationID, "left")
df_zone_counts = df_joined.groupBy("Zone").agg(count("*").alias("trip_count"))
least_frequent_zone = df_zone_counts.orderBy(asc("trip_count")).limit(3)
least_frequent_zone.show()

+--------------------+----------+
|                Zone|trip_count|
+--------------------+----------+
|Governor's Island...|         1|
|       Rikers Island|         2|
|       Arden Heights|         2|
+--------------------+----------+

