In [1]:
import pandas as pd 
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Local Spark session using all available cores; getOrCreate() returns the
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

25/03/04 11:13:49 WARN Utils: Your hostname, Davids-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.11 instead (on interface en0)
25/03/04 11:13:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/04 11:13:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Q1: Spark version

In [3]:
# Q1: version string of the running Spark session (shown as the cell output).
spark.version

'3.5.5'

In [4]:
# Parquet files carry their own schema and column names, so CSV-style options
# such as `header` do nothing for the Parquet reader; read the file directly.
df = spark.read.parquet('yellow_tripdata_2024-10.parquet')

In [5]:
# Preview the raw October 2024 trips: first 20 rows, long cell values truncated.
df.show(n=20, truncate=True)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [6]:
# Rewrite the trips as 4 Parquet partitions, replacing any previous output in data/pq/.
df.repartition(4).write.mode('overwrite').parquet('data/pq/')

                                                                                

# Q2: Parquet file size after repartitioning into 4 partitions (`data/pq/`) — about 25MB

In [7]:
# Reload the repartitioned trips written to data/pq by the previous step.
df = spark.read.format('parquet').load('data/pq')

In [8]:
# Column names of the reloaded trips DataFrame.
df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']

In [9]:
# Peek at a single row of the reloaded data.
df.show(n=1)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-04 23:19:27|  2024-10-04 23:43:28|              1|         6.07|         1|                 N|         229|         231|           1|       31.0|  1.0|    0.5|       4.

In [10]:
from pyspark.sql import functions as F

In [11]:
# Derive calendar dates (time of day dropped) for pickup and dropoff.
df = (
    df
    .withColumn('pickup_date', F.to_date(F.col('tpep_pickup_datetime')))
    .withColumn('dropoff_date', F.to_date(F.col('tpep_dropoff_datetime')))
)

In [12]:
# Q3: number of trips picked up on 2024-10-15

In [13]:
# Q3: number of trips whose pickup date is 2024-10-15.
df.where(df.pickup_date == "2024-10-15").count()

128893

# Q3 answer: 128,893 trips picked up on 2024-10-15

In [14]:
# Trip duration as a Spark interval: dropoff time minus pickup time.
df = df.withColumn(
    'duration',
    F.col('tpep_dropoff_datetime') - F.col('tpep_pickup_datetime'),
)


In [15]:
# Import only `col` and `unix_timestamp` by name. Spark's aggregate `max` is used
# as `F.max` so that Python's built-in `max` is not shadowed for the rest of the notebook.
from pyspark.sql.functions import col, unix_timestamp

# Q4: trip duration in hours = (dropoff - pickup) in seconds / 3600.
df_with_duration = df.withColumn(
    "trip_duration_hours",
    (unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime"))) / 3600,
)

# Longest trip over the whole dataset (None if the DataFrame has no rows).
max_trip_duration = df_with_duration.agg(F.max("trip_duration_hours")).first()[0]

print(f"Longest trip duration in hours: {max_trip_duration}")

Longest trip duration in hours: 162.61777777777777


# Q4 answer: longest trip ≈ 162.62 hours (162.61777777777777)

# Q5 answer: 4040

In [16]:
# Taxi zone lookup table; the first CSV line supplies the column names.
df_zones = spark.read.csv('data/taxi_zone_lookup.csv', header="true")

In [17]:
# Schema of the zone lookup. Every column is a string because the CSV was read without schema inference.
df_zones.schema

StructType([StructField('LocationID', StringType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True)])

In [18]:
# Schema of the trips DataFrame, including the derived pickup_date / dropoff_date / duration columns.
df.schema

StructType([StructField('VendorID', IntegerType(), True), StructField('tpep_pickup_datetime', TimestampNTZType(), True), StructField('tpep_dropoff_datetime', TimestampNTZType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True), StructField('pickup_date',

In [19]:
# Attach zone metadata to each trip by pickup location. This is an inner join,
# so trips whose PULocationID has no match in the lookup are dropped.
df_result = df.join(
    df_zones,
    on=df.PULocationID == df_zones.LocationID,
    how="inner",
)

In [20]:
# Display the joined DataFrame's column names and types.
df_result

DataFrame[VendorID: int, tpep_pickup_datetime: timestamp_ntz, tpep_dropoff_datetime: timestamp_ntz, passenger_count: bigint, trip_distance: double, RatecodeID: bigint, store_and_fwd_flag: string, PULocationID: int, DOLocationID: int, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, Airport_fee: double, pickup_date: date, dropoff_date: date, duration: interval day to second, LocationID: string, Borough: string, Zone: string, service_zone: string]

# Q6: least frequent pickup zone

In [21]:
# Q6: number of trips per pickup zone.
pickup_counts = df_result.groupBy("PULocationID", "Zone").agg(F.count("*").alias("trip_count"))

# Smallest per-zone trip count (None if there are no joined trips at all).
min_trip_count = pickup_counts.agg(F.min("trip_count")).first()[0]

# Keep every zone that ties for the minimum, not just one of them.
least_frequent_pickup = pickup_counts.filter(F.col("trip_count") == min_trip_count)

# truncate=False so the full zone name (the answer) is not cut off at 20 characters.
least_frequent_pickup.show(truncate=False)

+------------+--------------------+----------+
|PULocationID|                Zone|trip_count|
+------------+--------------------+----------+
|         105|Governor's Island...|         1|
+------------+--------------------+----------+

