# Homework - Week 05 - Batch Processing

```bash
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
```


## Question 1: Install Spark and PySpark

- Install Spark
- Run PySpark
- Create a local spark session
- Execute spark.version.

What's the output?
> '3.5.4'

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

25/02/25 10:50:41 WARN Utils: Your hostname, Gabis-iMac-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.178.241 instead (on interface en1)
25/02/25 10:50:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/25 10:50:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark.version

'3.5.4'

## Question 2: Yellow October 2024

Read the October 2024 Yellow into a Spark Dataframe.

Repartition the Dataframe to 4 partitions and save it to parquet.

What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? Select the answer which most closely matches.

> 25MB


In [4]:
df_yellow_oct_24 = spark.read.parquet('./yellow_tripdata_2024-10.parquet') # read the yellow taxi data


In [None]:
df_yellow_oct_24.printSchema() # print the schema of the data, just to check

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [8]:
# repartition the data into 4 partitions and save it in the output path
df_yellow_oct_24 \
        .repartition(4) \
        .write.mode("overwrite").parquet("output_yellow_oct_24")  # overwrite so the cell can be re-run


We can check the sizes of the written part files manually (e.g. with `ls -lh output_yellow_oct_24`) or via Spark:

<img src="./question-2.png" width="70%">
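To compute the average programmatically instead of reading it off a file listing, a minimal plain-Python sketch could look like this. It assumes the output directory `output_yellow_oct_24` written above and counts only files ending in `.parquet`, which skips Spark's `_SUCCESS` marker and the hidden `.crc` checksum files. The helper name `average_parquet_size_mb` is hypothetical, and "MB" is taken to mean MiB (1024 * 1024 bytes); the answer options are coarse enough that the distinction should not matter.

```python
from pathlib import Path


def average_parquet_size_mb(directory: str) -> float:
    """Return the mean size (in MiB) of the *.parquet files directly inside `directory`.

    Hypothetical helper: Spark's `_SUCCESS` marker and the hidden `.crc`
    checksum files do not end in `.parquet`, so they are excluded.
    """
    sizes = [p.stat().st_size for p in Path(directory).glob("*.parquet") if p.is_file()]
    if not sizes:
        raise ValueError(f"no .parquet files found in {directory!r}")
    # Total bytes / number of files, converted from bytes to MiB.
    return sum(sizes) / len(sizes) / (1024 * 1024)
```

Usage, assuming the cell above has run: `average_parquet_size_mb("output_yellow_oct_24")`.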


## Question 3: Count records 

How many taxi trips were there on the 15th of October?

Consider only trips that started on the 15th of October.

> 125,567 (the query below returns 128,893, so this is the closest of the answer options)


In [15]:
from pyspark.sql import functions as F

df_yellow_oct_24 = spark.read.parquet("yellow_tripdata_2024-10.parquet") # re-read the original October 2024 file

df_yellow_oct_24 \
    .withColumn('pickup_date', F.to_date(df_yellow_oct_24.tpep_pickup_datetime)) \
    .filter("pickup_date = '2024-10-15'") \
    .count()

128893
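Only the pick-up time decides whether a trip counts, so a trip that starts on 15 October and ends after midnight is still included. Comparing `to_date(pickup)` with the target day is equivalent to a half-open timestamp range [2024-10-15 00:00, 2024-10-16 00:00). The plain-Python sketch below checks that both rules select the same trips at the midnight boundaries; the helper names and sample timestamps are hypothetical, not taken from the dataset.

```python
from datetime import date, datetime, timedelta

TARGET_DAY = date(2024, 10, 15)


def started_on_by_date(pickup: datetime, day: date = TARGET_DAY) -> bool:
    # Same idea as to_date(tpep_pickup_datetime) = '2024-10-15': drop the time, compare dates.
    return pickup.date() == day


def started_on_by_range(pickup: datetime, day: date = TARGET_DAY) -> bool:
    # Half-open interval: includes midnight at the start, excludes the next midnight.
    start = datetime(day.year, day.month, day.day)
    end = start + timedelta(days=1)
    return start <= pickup < end


# Hypothetical pick-up times around the day boundaries.
pickups = [
    datetime(2024, 10, 14, 23, 59, 59),  # previous day: excluded
    datetime(2024, 10, 15, 0, 0, 0),     # first second of the day: included
    datetime(2024, 10, 15, 18, 30, 0),   # included
    datetime(2024, 10, 16, 0, 0, 0),     # next midnight: excluded
]
count_by_date = sum(started_on_by_date(p) for p in pickups)
count_by_range = sum(started_on_by_range(p) for p in pickups)
print(count_by_date, count_by_range)  # prints "2 2"
```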

## Question 4: Longest trip

What is the length of the longest trip in the dataset in hours?

> 162

*Explanation:*
- `F.unix_timestamp()`: Converts a timestamp column into seconds since the Unix epoch.
- Duration calculation: Subtracts the pick-up time from the drop-off time and divides by 3600 to get the value in hours.
- `F.to_date()` and `groupBy()`: Derive the pick-up date and group the trips by it.
- `.agg()`: Aggregates with `F.max()` and renames the output column with `alias()`.
- `orderBy()`: Sorts by maximum duration in descending order.
- `limit()`: Restricts the output to the top 5 rows.

In [18]:
from pyspark.sql import functions as F

df_yellow_oct_24 \
    .withColumn('duration_hours', 
                (F.unix_timestamp(df_yellow_oct_24.tpep_dropoff_datetime) - 
                 F.unix_timestamp(df_yellow_oct_24.tpep_pickup_datetime)) / 3600) \
    .withColumn('pickup_date', F.to_date(df_yellow_oct_24.tpep_pickup_datetime)) \
    .groupBy('pickup_date') \
    .agg(F.max('duration_hours').alias('max_duration_hours')) \
    .orderBy('max_duration_hours', ascending=False) \
    .limit(5) \
    .show()


+-----------+------------------+
|pickup_date|max_duration_hours|
+-----------+------------------+
| 2024-10-16|162.61777777777777|
| 2024-10-03|           143.325|
| 2024-10-22|137.76055555555556|
| 2024-10-18|114.83472222222223|
| 2024-10-21| 89.89833333333333|
+-----------+------------------+
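The first row holds the overall maximum, about 162.6 hours. The duration arithmetic can be sanity-checked outside Spark with a minimal plain-Python sketch: the difference between the two timestamps in seconds, divided by 3600. The helper name `duration_hours` is hypothetical, and the pick-up/drop-off timestamps are made up, chosen only so that the gap (585,424 seconds) reproduces the top value in the table.

```python
from datetime import datetime


def duration_hours(pickup: datetime, dropoff: datetime) -> float:
    """Trip length in hours: (dropoff - pickup) in seconds, divided by 3600.

    Hypothetical helper; naive datetimes are subtracted as wall-clock values.
    """
    return (dropoff - pickup).total_seconds() / 3600


# Hypothetical timestamps: 6 days, 18 h, 37 min, 4 s apart.
pickup = datetime(2024, 10, 16, 0, 0, 0)
dropoff = datetime(2024, 10, 22, 18, 37, 4)
hours = duration_hours(pickup, dropoff)
print(hours)
```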



## Question 5: User Interface

Spark’s User Interface, which shows the application's dashboard, runs on which local port?

> 4040
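4040 is the default value of `spark.ui.port`; if that port is already taken (for example by a second running session), Spark tries the following ports (4041, 4042, ...). From PySpark, `spark.sparkContext.uiWebUrl` reports the URL actually in use. To check from outside Spark whether anything is listening on a local port while the session is running, a minimal plain-Python sketch could look like this; the helper name `port_is_open` is hypothetical.

```python
import socket


def port_is_open(port: int, host: str = "127.0.0.1", timeout: float = 0.5) -> bool:
    """Return True if a TCP connection to host:port succeeds, i.e. something is listening."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # connect_ex returns 0 on success instead of raising an exception.
        return sock.connect_ex((host, port)) == 0
```

Usage, while the Spark session is active: `port_is_open(4040)`.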


## Question 6: Least frequent pickup location zone

Load the zone lookup data into a temp view in Spark:

```bash
wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
```

Using the zone lookup data and the Yellow October 2024 data, what is the name of the LEAST frequent pickup location Zone?

> Governor's Island/Ellis Island/Liberty Island


In [21]:
# read the location data
df_location = spark.read.csv("taxi_zone_lookup.csv", header=True, inferSchema=True)

df_location.createOrReplaceTempView("zone_lookup") # create a temporary view for the location data
df_yellow_oct_24.createOrReplaceTempView("yellow_trip_data") # create a temporary view for the yellow data



In [None]:
# SQL query to find the 5 zones with the fewest pickups
result = spark.sql("""
    SELECT z.Zone, COUNT(y.PULocationID) AS pickup_count
    FROM yellow_trip_data y
    JOIN zone_lookup z ON y.PULocationID = z.LocationID
    GROUP BY z.Zone
    ORDER BY pickup_count ASC
    LIMIT 5
""")

# Display the result
result.show()

+--------------------+------------+
|                Zone|pickup_count|
+--------------------+------------+
|Governor's Island...|           1|
|       Rikers Island|           2|
|       Arden Heights|           2|
|         Jamaica Bay|           3|
| Green-Wood Cemetery|           3|
+--------------------+------------+



In [25]:
# ALTERNATIVE SOLUTION 
# Step 1: Perform the join between yellow_trip_data and zone_lookup
joined_df = df_yellow_oct_24.join(df_location, df_yellow_oct_24.PULocationID == df_location.LocationID, "inner")

# Step 2: Count pickups by zone
pickup_counts = joined_df.groupBy("Zone").agg(F.count("PULocationID").alias("pickup_count"))

# Step 3: Find the 5 locations with the least pickups
result = pickup_counts.orderBy("pickup_count").limit(5)

# Step 4: Show the result
result.show()

+--------------------+------------+
|                Zone|pickup_count|
+--------------------+------------+
|Governor's Island...|           1|
|       Rikers Island|           2|
|       Arden Heights|           2|
|         Jamaica Bay|           3|
| Green-Wood Cemetery|           3|
+--------------------+------------+
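Both versions perform the same three steps: an inner join on `PULocationID = LocationID`, a count per zone, and an ascending sort. A minimal plain-Python sketch of that logic with `collections.Counter` follows, using hypothetical location IDs and zone names rather than the real lookup table; IDs missing from the lookup are dropped, just as the inner join drops unmatched rows. Spark does not guarantee the order of tied rows (such as Rikers Island and Arden Heights, both with 2 pickups above), so the sketch breaks ties by zone name to stay deterministic; only the single least frequent zone is a stable answer.

```python
from collections import Counter


def least_frequent_zones(pickup_ids, zone_by_id, n=5):
    """Return the n (zone, count) pairs with the fewest pickups.

    Hypothetical helper mirroring: inner join on PULocationID = LocationID,
    GROUP BY zone, ORDER BY count ASC, LIMIT n.
    """
    # Unmatched IDs are skipped, like rows without a partner in an inner join.
    counts = Counter(zone_by_id[i] for i in pickup_ids if i in zone_by_id)
    # Ascending by count; ties broken by zone name for a deterministic result.
    return sorted(counts.items(), key=lambda item: (item[1], item[0]))[:n]


# Hypothetical sample data, not the real taxi zone lookup.
zone_by_id = {1: "Zone A", 2: "Zone B", 3: "Zone C"}
pickup_ids = [1, 1, 1, 2, 2, 3, 99]  # 99 has no match in the lookup
print(least_frequent_zones(pickup_ids, zone_by_id, n=2))  # [('Zone C', 1), ('Zone B', 2)]
```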



In [26]:
# stop the Spark session
spark.stop()