In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/07 19:34:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Question 1: Install Spark and PySpark

In [3]:
spark.version

'3.5.5'

## Question 2: Yellow October 2024

In [4]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-03-06 17:03:02--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 2600:9000:2070:c000:b:20a5:b140:21, 2600:9000:2070:8200:b:20a5:b140:21, 2600:9000:2070:b800:b:20a5:b140:21, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|2600:9000:2070:c000:b:20a5:b140:21|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2025-03-06 17:03:12 (6,76 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]



In [5]:
# Specify the path to your Parquet file
file_path = '05-batch/yellow_tripdata_2024-10.parquet'

# Read the Parquet file into a DataFrame
df = spark.read.parquet(file_path)

df.show(5)


                                                                                

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [6]:
# Repartition the DataFrame into 4 partitions
df = df.repartition(4)

# Specify the output path for the repartitioned Parquet file
output_path = '05-batch/yellow_tripdata_2024-10-repartitioned.parquet'

# Save the repartitioned DataFrame to Parquet
df.write.parquet(output_path)


                                                                                

**Q3**: How many taxi trips were there on October 15?

In [3]:
from pyspark.sql import functions as F

In [9]:
file_path = '05-batch/yellow_tripdata_2024-10.parquet'
df = spark.read.parquet(file_path)

df \
    .withColumn('pickup_date', F.to_date(
        F.from_utc_timestamp(df.tpep_pickup_datetime, 'Europe/Berlin')
        )) \
    .filter("pickup_date = '2024-10-15'") \
    .count()

                                                                                

125567

In [12]:
df.registerTempTable('test')



In [15]:
spark.sql("""
SELECT
    COUNT(1)
FROM 
    test
WHERE
    to_date(tpep_pickup_datetime) = '2024-10-15';
""").show()

+--------+
|count(1)|
+--------+
|  128893|
+--------+



**Q4**: Longest trip

In [16]:
df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']

In [22]:
# Calculate duration and order by it, then show the 5 longest trips
df \
    .withColumn('duration_hours', 
               (F.unix_timestamp('tpep_dropoff_datetime') - F.unix_timestamp('tpep_pickup_datetime')) / 3600) \
    .select('duration_hours', 'tpep_pickup_datetime', 'tpep_dropoff_datetime') \
    .orderBy('duration_hours', ascending=False) \
    .limit(5) \
    .show()

+------------------+--------------------+---------------------+
|    duration_hours|tpep_pickup_datetime|tpep_dropoff_datetime|
+------------------+--------------------+---------------------+
|162.61777777777777| 2024-10-16 13:03:49|  2024-10-23 07:40:53|
|           143.325| 2024-10-03 18:47:25|  2024-10-09 18:06:55|
|137.76055555555556| 2024-10-22 16:00:55|  2024-10-28 09:46:33|
|114.83472222222223| 2024-10-18 09:53:32|  2024-10-23 04:43:37|
| 89.89833333333333| 2024-10-21 00:36:24|  2024-10-24 18:30:18|
+------------------+--------------------+---------------------+



## Question 6: Least frequent pickup location zone

In [23]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-06 17:33:55--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 2600:9000:2070:c400:b:20a5:b140:21, 2600:9000:2070:c800:b:20a5:b140:21, 2600:9000:2070:6a00:b:20a5:b140:21, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|2600:9000:2070:c400:b:20a5:b140:21|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


2025-03-06 17:33:56 (4,53 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]



In [27]:
df_zones = spark.read.option("header", "true").csv('05-batch/taxi_zone_lookup.csv')

In [28]:
df_zones.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [29]:
# Join the Yellow October data with zone lookup data on PULocationID and LocationID
df_with_zone = df.join(df_zones, df.PULocationID == df_zones.LocationID, 'left') \
    .select('Zone')  # Only select the Zone column for counting

# Group by Zone and count the number of pickups
zone_counts = df_with_zone.groupBy('Zone').count()

# Sort by count in ascending order to find the least frequent zone
least_frequent_zone = zone_counts.orderBy('count').limit(5)

# Show the result
least_frequent_zone.show()



+--------------------+-----+
|                Zone|count|
+--------------------+-----+
|Governor's Island...|    1|
|       Rikers Island|    2|
|       Arden Heights|    2|
|         Jamaica Bay|    3|
| Green-Wood Cemetery|    3|
+--------------------+-----+



                                                                                