# Lab Assignment 3

- Name - Aryan Gupta
- Roll No.- 230150003
- Date - 13 Aug, 2025
- Course - DA331 Big Data Analytics: Tools & Techniques

## Import Library

In [1]:
# !pip install pyspark
# !pip install opendatasets

In [2]:
# import pyspark.sql classes and functions
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType, TimestampType
from pyspark.sql.functions import col, array_contains, isnan, when, count
from pyspark.sql.functions import lit, concat_ws, concat, collect_list, udf
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import max, corr, avg, sum, dayofweek

import opendatasets as od
import numpy as np

spark = SparkSession.builder.appName("answers").getOrCreate()

## Load Data

In [3]:
# Get where the session is built
!pwd

/content


In [4]:
# od.download("https://www.kaggle.com/datasets/elemento/nyc-yellow-taxi-trip-data")

As the data is very big, let us work with the subset of the data for faster processing.

In [5]:
path = './nyc-yellow-taxi-trip-data/yellow_tripdata_2016-03.csv'

### Get the feel of the data

In [6]:
df = spark.read.option("header",'True').option('delimiter', ',').csv(path)
print(df.count())
print(len(df.columns))
print(df.printSchema())
print(df.show(5))

12210952
19
root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)

None
+--------+--------------------+---------------------+---------------+-------------+-------------------+-------

## Working with sampled data
 This is done for faster processing. We will use 1000 rows at random.

In [7]:
# Define the schema with correct data types
schema = StructType([
    StructField("VendorID", IntegerType(), True),
    StructField("tpep_pickup_datetime", TimestampType(), True),
    StructField("tpep_dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("pickup_longitude", DoubleType(), True),
    StructField("pickup_latitude", DoubleType(), True),
    StructField("RatecodeID", IntegerType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("dropoff_longitude", DoubleType(), True),
    StructField("dropoff_latitude", DoubleType(), True),
    StructField("payment_type", IntegerType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("extra", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("improvement_surcharge", DoubleType(), True),
    StructField("total_amount", DoubleType(), True)
])

# Read the CSV with the defined schema and sample the data
df_sampled = spark.read.option("header",'True').option('delimiter', ',').schema(schema).csv(path) \
            .sample(withReplacement=False, fraction=1000 / df.count(), seed=69)

print(df_sampled.count())
df_sampled.printSchema()
df_sampled.show(5)

989
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+------------------+-----------

In [8]:
# put store_and_fwd_flag column as boolean type
df_sampled = df_sampled.withColumn(
    "store_and_fwd_flag",
    when(col("store_and_fwd_flag") == "Y", True).otherwise(False).cast(BooleanType())
)

df_sampled = df_sampled.withColumn("store_and_fwd_flag", col("store_and_fwd_flag").cast(StringType()))

df_sampled.printSchema()
df_sampled.select("store_and_fwd_flag").show(5)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = false)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)

+------------------+
|store_and_fwd_flag|
+------------------+
|             false|
|             false|
|          

### Q1 - Which drivers had the most number of passengers during weekends?

In [9]:
df_weekends = df_sampled.filter(dayofweek(col("tpep_pickup_datetime")).isin([1, 7]))
# 1 - Sunday, 7 - Saturday
print(df_weekends.count())

270


In [10]:
df_passenger_counts = df_weekends.groupBy("VendorID").agg(sum("passenger_count").alias("total_weekend_passengers"))
df_passenger_counts.show()

+--------+------------------------+
|VendorID|total_weekend_passengers|
+--------+------------------------+
|       1|                     160|
|       2|                     303|
+--------+------------------------+



In [11]:
df_ordered = df_passenger_counts.orderBy(col("total_weekend_passengers").desc())
print('Drivers with the most number of passengers during weekends.')
print(df_ordered.show(1))

Drivers with the most number of passengers during weekends.
+--------+------------------------+
|VendorID|total_weekend_passengers|
+--------+------------------------+
|       2|                     303|
+--------+------------------------+
only showing top 1 row

None


### Q2 - Which drivers had the most tip to fare ratio during weekdays?

In [12]:
# not weekend
df_weekdays_full = df_sampled.filter(~dayofweek(col("tpep_pickup_datetime")).isin([1, 7]))
#select relevant columns
df_weekdays = df_weekdays_full.select("VendorID", "tip_amount", "fare_amount")

print(df_weekdays.count())

719


In [13]:
# tip-to-fare ratio for each trip
df_weekdays_with_ratio = df_weekdays.withColumn(
    "tip_fare_ratio",
    when(col("fare_amount") > 0, col("tip_amount") / col("fare_amount")).otherwise(0.0)
) # when used to avoid division by 0

# Group by VendorID and average
df_avg_tip_ratio_by_vendor = df_weekdays_with_ratio.groupBy("VendorID") \
            .agg(avg("tip_fare_ratio").alias("average_tip_fare_ratio"))

# get best drivers
df_avg_tip_ratio_ordered = df_avg_tip_ratio_by_vendor.orderBy(col("average_tip_fare_ratio").desc())

print("Driver with most tip to fare ratio during weekdays:-")
print(df_avg_tip_ratio_ordered.show(1))

Driver with most tip to fare ratio during weekdays:-
+--------+----------------------+
|VendorID|average_tip_fare_ratio|
+--------+----------------------+
|       2|   0.14448068088927166|
+--------+----------------------+
only showing top 1 row

None


### Q3 - Which drivers avoided tolls the most?

In [14]:
# Mark each trip as 1(toll given) or 0(not given)
toll_counts_by_vendor = df_sampled.groupBy('VendorID') \
.agg(
    sum(when(col("tolls_amount") > 0, 1).otherwise(0)).alias("trips_with_tolls")
)

# Get best drivers
vendors_avoiding_tolls = toll_counts_by_vendor.orderBy(col("trips_with_tolls").asc())
vendors_avoiding_tolls.show(1)

+--------+----------------+
|VendorID|trips_with_tolls|
+--------+----------------+
|       1|              20|
+--------+----------------+
only showing top 1 row



### Q4 - Which rate code had the most trips?

In [15]:
# Group by RatecodeID and count no. of trips
rate_code_counts = df_sampled.groupBy("RatecodeID").agg(
    count("*").alias("total_trips")
)

# get best rate code
most_frequent_rate_code = rate_code_counts.orderBy(col("total_trips").desc())
print('Rate code with most trips:-')
most_frequent_rate_code.show(1)
best_rate_code = most_frequent_rate_code.first()["RatecodeID"] # for next question

Rate code with most trips:-
+----------+-----------+
|RatecodeID|total_trips|
+----------+-----------+
|         1|        965|
+----------+-----------+
only showing top 1 row



### Q5 - Which drivers rode the most best rate code?

In [16]:
# Filter best trips
best_rate_code_trips = df_sampled.filter(col("RatecodeID") == best_rate_code)

# Group by VendorID and count no. of trips
trips_at_best_rate_by_vendor = best_rate_code_trips.groupBy('VendorID').agg(
    count("*").alias("trips_at_best_rate")
)

# get best drivers
drivers_with_most_best_rate_trips = trips_at_best_rate_by_vendor.orderBy(col("trips_at_best_rate").desc())
drivers_with_most_best_rate_trips.show(1)

+--------+------------------+
|VendorID|trips_at_best_rate|
+--------+------------------+
|       2|               528|
+--------+------------------+
only showing top 1 row



## Working with full data

In [17]:
file_paths = [
    "/content/nyc-yellow-taxi-trip-data/yellow_tripdata_2015-01.csv",
    "/content/nyc-yellow-taxi-trip-data/yellow_tripdata_2016-01.csv",
    "/content/nyc-yellow-taxi-trip-data/yellow_tripdata_2016-02.csv",
    "/content/nyc-yellow-taxi-trip-data/yellow_tripdata_2016-03.csv"
]

df_full = spark.read.option("header",'True').option('delimiter', ',').schema(schema).csv(file_paths[0])

# Read and concatenate the rest
for path in file_paths[1:]:
    temp_df = spark.read.option("header",'True').option('delimiter', ',').schema(schema).csv(path)
    df_full = df_full.unionByName(temp_df)

In [18]:
# Show schema and first few rows of the full DataFrame
df_full.printSchema()
df_full.show(5)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+------------------+---------------

### Answer q1 - weekend passengers

In [19]:
df_weekends_full = df_full.filter(dayofweek(col("tpep_pickup_datetime")).isin([1, 7]))
df_passenger_counts_full = df_weekends_full.groupBy("VendorID").agg(sum("passenger_count").alias("total_weekend_passengers"))
df_ordered_full = df_passenger_counts_full.orderBy(col("total_weekend_passengers").desc())
print('Drivers with the most number of passengers during weekends (full dataset):')
df_ordered_full.show(1)

Drivers with the most number of passengers during weekends (full dataset):
+--------+------------------------+
|VendorID|total_weekend_passengers|
+--------+------------------------+
|       2|                14858253|
+--------+------------------------+
only showing top 1 row



Answer - The driver with `VendorID = 2` had the most number of passengers during weekends.

### Answer q2 - tip to fare ratio on weekdays

In [20]:
# 1. Filter for weekday trips
df_weekdays_full = df_full.filter(~dayofweek(col("tpep_pickup_datetime")).isin([1, 7]))

# 2. Select relevant columns and 3. Calculate tip-to-fare ratio
df_weekdays_with_ratio_full = df_weekdays_full.select("VendorID", "tip_amount", "fare_amount") \
    .withColumn(
    "tip_fare_ratio",
    when(col("fare_amount") > 0, col("tip_amount") / col("fare_amount")).otherwise(0.0)
)

# 4. Group by VendorID and 5. Calculate average ratio
df_avg_tip_ratio_by_vendor_full = df_weekdays_with_ratio_full.groupBy("VendorID") \
            .agg(avg("tip_fare_ratio").alias("average_tip_fare_ratio"))

# 6. Order by average ratio
df_avg_tip_ratio_ordered_full = df_avg_tip_ratio_by_vendor_full.orderBy(col("average_tip_fare_ratio").desc())

# 7. Display the top driver(s)
print("Driver with most tip to fare ratio during weekdays (full dataset):-")
df_avg_tip_ratio_ordered_full.show(1)

# 8. Delete temporary DataFrames
df_weekdays_full.unpersist()
df_weekdays_with_ratio_full.unpersist()
df_avg_tip_ratio_by_vendor_full.unpersist()
df_avg_tip_ratio_ordered_full.unpersist()

Driver with most tip to fare ratio during weekdays (full dataset):-
+--------+----------------------+
|VendorID|average_tip_fare_ratio|
+--------+----------------------+
|       1|   0.18602175713362476|
+--------+----------------------+
only showing top 1 row



DataFrame[VendorID: int, average_tip_fare_ratio: double]

Answer - The driver with `VendorID = 1` had the most tip to fare ratio during weekdays.

### Answer q3 - drivers avoiding tolls

In [21]:
# 1. Select relevant columns
df_tolls = df_full.select("VendorID", "tolls_amount")

# 2. Group by VendorID and 3. Count trips with tolls
toll_counts_by_vendor_full = df_tolls.groupBy('VendorID') \
.agg(
    sum(when(col("tolls_amount") > 0, 1).otherwise(0)).alias("trips_with_tolls")
)

# 4. Order by trips_with_tolls in ascending order
vendors_avoiding_tolls_full = toll_counts_by_vendor_full.orderBy(col("trips_with_tolls").asc())

# 5. Show the top driver(s)
print('Drivers who avoided tolls the most (full dataset):-')
vendors_avoiding_tolls_full.show(1)

# 6. Unpersist temporary DataFrame
df_tolls.unpersist()
toll_counts_by_vendor_full.unpersist()
vendors_avoiding_tolls_full.unpersist()

Drivers who avoided tolls the most (full dataset):-
+--------+----------------+
|VendorID|trips_with_tolls|
+--------+----------------+
|       1|         1016350|
+--------+----------------+
only showing top 1 row



DataFrame[VendorID: int, trips_with_tolls: bigint]

Answer - The driver with `VendorID = 1` avoided tolls the most.

### Answer q4 - most frequent rate code

In [22]:
# 1. Select the RatecodeID column and 2. Group by RatecodeID and 3. Count trips
rate_code_counts = df_full.groupBy("RatecodeID").agg(
    count("*").alias("total_trips")
)

# 4. Order by total_trips in descending order
most_frequent_rate_code = rate_code_counts.orderBy(col("total_trips").desc())

# 5. Display the top result
print('Rate code with most trips (full dataset):-')
most_frequent_rate_code.show(1)

# 6. Unpersist temporary DataFrames
rate_code_counts.unpersist()
most_frequent_rate_code.unpersist()

Rate code with most trips (full dataset):-
+----------+-----------+
|RatecodeID|total_trips|
+----------+-----------+
|         1|   46091898|
+----------+-----------+
only showing top 1 row



DataFrame[RatecodeID: int, total_trips: bigint]

Answer - The Rate Code with `RatecodeID = 1`, i. e., the Standard Rate, had the most trips.

### Answer q5 - drivers with most trips at best rate code

In [23]:
# 1. Retrieve the value of the best_rate_code
best_rate_code = 1

# 2. Filter the df_full DataFrame
best_rate_code_trips = df_full.filter(col("RatecodeID") == best_rate_code)

# 3. Group by "VendorID" and 4. Count the number of trips
trips_at_best_rate_by_vendor = best_rate_code_trips.groupBy('VendorID').agg(
    count("*").alias("trips_at_best_rate")
)

# 5. Order the resulting DataFrame
drivers_with_most_best_rate_trips = trips_at_best_rate_by_vendor.orderBy(col("trips_at_best_rate").desc())

# 6. Display the top driver(s)
print("Drivers with the most best rate code trips (full dataset):-")
drivers_with_most_best_rate_trips.show(1)

# Delete temporary dataframes
best_rate_code_trips.unpersist()
trips_at_best_rate_by_vendor.unpersist()
drivers_with_most_best_rate_trips.unpersist()

Drivers with the most best rate code trips (full dataset):-
+--------+------------------+
|VendorID|trips_at_best_rate|
+--------+------------------+
|       2|          24387232|
+--------+------------------+
only showing top 1 row



DataFrame[VendorID: int, trips_at_best_rate: bigint]

Answer - The driver with `VendorID = 2`  rode the most best rate code, or the standard rate.