In [0]:
# Unity Catalog location for the raw downloads: <catalog>.<schema>.<volume>.
catalog = "workspace"
schema  = "bde"
volume  = "assignment2"

# Create the catalog, schema and volume if they are missing (all statements are IF NOT EXISTS).
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
spark.sql(f"CREATE SCHEMA  IF NOT EXISTS {catalog}.{schema}")
spark.sql(f"CREATE VOLUME  IF NOT EXISTS {catalog}.{schema}.{volume}")

# One sub-folder per taxi colour inside the volume.
GREEN_DST = f"/Volumes/{catalog}/{schema}/{volume}/green"
YELLOW_DST = f"/Volumes/{catalog}/{schema}/{volume}/yellow"
dbutils.fs.mkdirs(GREEN_DST)
dbutils.fs.mkdirs(YELLOW_DST)

In [0]:
import importlib
import importlib.util
import pkgutil
import sys
import subprocess
from pathlib import Path

# Install gdown into this kernel's interpreter only when it is missing.
# importlib.util.find_spec replaces pkgutil.find_loader (deprecated in 3.12, removed in 3.14).
if importlib.util.find_spec("gdown") is None:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "gdown"])
    # The package appeared after interpreter start-up; refresh the import system's caches
    # so the import below can find it.
    importlib.invalidate_caches()

import gdown


def fetch_to_volume(file_id: str, dst_dir: str, fname: str) -> str:
    """Download a public Google Drive file into a directory (e.g. a UC Volume path).

    Args:
        file_id: Google Drive file id.
        dst_dir: destination directory; must already exist.
        fname: file name to write inside ``dst_dir``.

    Returns:
        The full destination path ``f"{dst_dir}/{fname}"``.

    Raises:
        RuntimeError: if gdown reports failure by returning None.
    """
    url = f"https://drive.google.com/uc?id={file_id}"
    dst_path = f"{dst_dir}/{fname}"
    downloaded = gdown.download(url, dst_path, quiet=False)
    # Some gdown versions signal failure by returning None instead of raising;
    # fail loudly rather than letting later cells read a missing file.
    if downloaded is None:
        raise RuntimeError(f"gdown could not download Drive file {file_id!r} to {dst_path}")
    return dst_path

In [0]:
# (Google Drive file id, target file name) pairs for the green-taxi extract.
green_ids = [
    ("1XlToYPB2fJ9Ky6VlNLIjWggDBXvpF-Cd", "green_taxi.parquet"),
]

for file_id, file_name in green_ids:
    fetch_to_volume(file_id, GREEN_DST, file_name)

In [0]:
# (Google Drive file id, target file name) pairs for the yellow-taxi extract.
yellow_ids = [
    ("1W-f4gXbYrfqS6nWoLestn_1xBJSq8M1r", "yellow_taxi.parquet"),
]

for file_id, file_name in yellow_ids:
    fetch_to_volume(file_id, YELLOW_DST, file_name)

In [0]:
# Confirm the downloads landed: list each colour's folder in the volume.
print("Files now in Volume:")
for _dst in (GREEN_DST, YELLOW_DST):
    display(dbutils.fs.ls(_dst))

In [0]:
# Load every Parquet file under the green folder, preview it and show its row count.
green_df = spark.read.format("parquet").load(GREEN_DST)
display(green_df)
green_df.count()

In [0]:
# Load every Parquet file under the yellow folder, preview it and show its row count.
yellow_df = spark.read.format("parquet").load(YELLOW_DST)
display(yellow_df)
yellow_df.count()

In [0]:
# Raw row counts per colour and combined (each count() runs a Spark job).
green_count = green_df.count()
yellow_count = yellow_df.count()
total_count = green_count + yellow_count

print(green_count, yellow_count, total_count, sep="\n")


In [0]:
# Reload raw trips and the zone lookup. Paths come from the setup-cell constants
# so they cannot drift from where the downloads were written.
yellow_df = spark.read.parquet(YELLOW_DST)
green_df = spark.read.parquet(GREEN_DST)
location_df = spark.read.csv(
    f"/Volumes/{catalog}/{schema}/{volume}/Taxi_CSV/taxi_zone_lookup.csv",
    header=True,
    inferSchema=True,
)

In [0]:
# Inspect the raw green schema (column names and types) before any columns are renamed.
green_df.printSchema()


In [0]:
from pyspark.sql.functions import col

# Yellow files use tpep_* and green files use lpep_* timestamp names; map both
# onto shared pickup_datetime / dropoff_datetime so one cleaning function serves both.
if "tpep_pickup_datetime" in yellow_df.columns:
    yellow_df = (
        yellow_df
        .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
        .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
    )

if "lpep_pickup_datetime" in green_df.columns:
    green_df = (
        green_df
        .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime")
        .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")
    )

yellow_df.printSchema()
green_df.printSchema()


In [0]:
from pyspark.sql.functions import expr, unix_timestamp, col, when

# Exploratory only: duration and speed on the raw yellow trips. `df` is not read by
# later cells; the cleaning function recomputes both columns. try_divide yields NULL
# instead of failing when the duration is zero.
df = (
    yellow_df
    .withColumn("duration_sec", unix_timestamp("dropoff_datetime") - unix_timestamp("pickup_datetime"))
    .withColumn("speed_kph", expr("try_divide(trip_distance, duration_sec/3600.0)"))
)


In [0]:
from pyspark.sql.functions import col, when, unix_timestamp, lit
from pyspark.sql import functions as F


def clean_trips_percentile(df, has_airport_fee=False):
    """Add duration/speed columns, drop implausible trips and repair passenger/fare fields.

    Args:
        df: trip DataFrame whose timestamps are already named ``pickup_datetime`` /
            ``dropoff_datetime`` and which has ``trip_distance``, ``passenger_count``,
            ``total_amount`` and ``fare_amount`` columns.
        has_airport_fee: when True and an ``airport_fee`` column exists, include it
            when rebuilding a missing or non-positive ``total_amount``.

    Returns:
        A filtered DataFrame with ``duration_sec`` and ``speed_kph`` added, keeping only
        trips with 30 s <= duration <= 12,000 s, 0.1 <= trip_distance <= 100,
        0 < speed_kph <= 120 and a pickup year between 2009 and 2024.

    NOTE(review): speed_kph treats trip_distance as kilometres; confirm the unit
    against the source data dictionary.
    """
    df = df.withColumn(
        "duration_sec",
        unix_timestamp("dropoff_datetime") - unix_timestamp("pickup_datetime"),
    )
    # Divide only for positive durations; all other rows get a NULL speed.
    df = df.withColumn(
        "speed_kph",
        when(col("duration_sec") > 0, col("trip_distance") / (col("duration_sec") / 3600.0))
        .otherwise(None),
    )

    # Both timestamps present and in chronological order.
    df = df.filter(
        col("pickup_datetime").isNotNull()
        & col("dropoff_datetime").isNotNull()
        & (col("dropoff_datetime") > col("pickup_datetime"))
    )
    # Plausibility windows: 30 s to 12,000 s (200 min) duration, 0.1 to 100 distance.
    df = df.filter((col("duration_sec") >= 30) & (col("duration_sec") <= 12000))
    df = df.filter((col("trip_distance") >= 0.1) & (col("trip_distance") <= 100))
    df = df.filter(
        col("speed_kph").isNull() | ((col("speed_kph") > 0) & (col("speed_kph") <= 120))
    )

    # Missing or non-positive passenger counts become 1; counts above 6 are capped at 6.
    df = df.withColumn(
        "passenger_count",
        when(col("passenger_count").isNull() | (col("passenger_count") <= 0), 1)
        .when(col("passenger_count") > 6, 6)
        .otherwise(col("passenger_count")),
    )

    # Rebuild total_amount from its components when it is missing or non-positive.
    # Every component except fare_amount is coalesced to 0, so a single NULL surcharge
    # (e.g. congestion_surcharge) no longer turns the rebuilt total into NULL.
    # Components absent from this schema are skipped instead of raising.
    surcharges = [
        "extra", "mta_tax", "tip_amount", "tolls_amount",
        "improvement_surcharge", "congestion_surcharge",
    ]
    if has_airport_fee:
        surcharges.append("airport_fee")
    base_amount = col("fare_amount")
    for name in surcharges:
        if name in df.columns:
            base_amount = base_amount + F.coalesce(col(name), lit(0.0))

    df = df.withColumn(
        "total_amount",
        when(col("total_amount").isNull() | (col("total_amount") <= 0), base_amount)
        .otherwise(col("total_amount")),
    )

    # Keep pickups inside the 2009-2024 study window.
    df = df.filter((F.year("pickup_datetime") >= 2009) & (F.year("pickup_datetime") <= 2024))

    return df


In [0]:
yellow_clean = clean_trips_percentile(yellow_df)
green_clean = clean_trips_percentile(green_df)

# Row counts before and after cleaning (raw yellow, raw green, clean yellow, clean green).
yellow_total = yellow_df.count()
green_total = green_df.count()
yellow_clean_total = yellow_clean.count()
green_clean_total = green_clean.count()

# Share of rows removed by cleaning, in percent.
yellow_removed_pct = (yellow_total - yellow_clean_total) / yellow_total * 100
green_removed_pct = (green_total - green_clean_total) / green_total * 100

for label, removed_pct in (("Yellow", yellow_removed_pct), ("Green", green_removed_pct)):
    print(f"{label} Removed: {removed_pct:.2f}%")

In [0]:
from pyspark.sql.functions import lit

# Add to each feed the columns that only the other feed has, plus a colour tag,
# so the two can be unioned by column name.
yellow_clean = (
    yellow_clean
    .withColumn("ehail_fee", lit(0.0))
    .withColumn("trip_type", lit(None).cast("double"))
    .withColumn("taxi_color", lit("yellow"))
)
green_clean = (
    green_clean
    .withColumn("airport_fee", lit(0.0))
    .withColumn("taxi_color", lit("green"))
)

# Same sorted column list on both sides, then union by name.
final_cols = sorted(set(yellow_clean.columns).union(green_clean.columns))
yellow_final = yellow_clean.select(final_cols)
green_final = green_clean.select(final_cols)
combined_df = yellow_final.unionByName(green_final)

print("Combined total rows:", combined_df.count())


In [0]:
from pyspark.sql.functions import col

location_df = spark.read.csv(
    f"/Volumes/{catalog}/{schema}/{volume}/Taxi_CSV/taxi_zone_lookup.csv",
    header=True,
    inferSchema=True,
)


def _zone_columns(zones, key_alias, prefix):
    """Project the zone lookup with its columns renamed for one end of a trip.

    Args:
        zones: zone lookup DataFrame with LocationID, Borough, Zone and service_zone.
        key_alias: name given to the LocationID join key, e.g. "PULocationID_lookup".
        prefix: "pickup" or "dropoff"; prepended to borough / zone / service_zone.
    """
    return zones.select(
        col("LocationID").alias(key_alias),
        col("Borough").alias(f"{prefix}_borough"),
        col("Zone").alias(f"{prefix}_zone"),
        col("service_zone").alias(f"{prefix}_service_zone"),
    )


pickup_loc = _zone_columns(location_df, "PULocationID_lookup", "pickup")
dropoff_loc = _zone_columns(location_df, "DOLocationID_lookup", "dropoff")

# Columns this cell adds. Dropping them first makes the cell safe to re-run; a second
# run would otherwise attach duplicate borough/zone columns. drop() ignores absent names.
_lookup_cols = [
    f"{side}_{field}"
    for side in ("pickup", "dropoff")
    for field in ("borough", "zone", "service_zone")
]

combined_df = (
    combined_df
    .drop(*_lookup_cols)
    # Unqualified col() is unambiguous because the lookup keys were renamed. It also
    # avoids referencing the pre-join combined_df from inside the join chain.
    .join(pickup_loc, col("PULocationID") == col("PULocationID_lookup"), "left")
    .drop("PULocationID_lookup")
    .join(dropoff_loc, col("DOLocationID") == col("DOLocationID_lookup"), "left")
    .drop("DOLocationID_lookup")
)

combined_df.printSchema()


In [0]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F

# Count NULLs in the key columns. Use F.sum: importing pyspark's `sum` directly
# would shadow Python's builtin sum() in every later cell.
null_check_cols = [
    "pickup_datetime", "dropoff_datetime",
    "PULocationID", "DOLocationID",
    "trip_distance", "total_amount", "passenger_count",
]

combined_df.select(
    [F.sum(col(c).isNull().cast("int")).alias(c) for c in null_check_cols]
).show()


In [0]:
# Spot-check ten joined rows: timestamps, boroughs, distance, fare and passengers.
(
    combined_df
    .select(
        "pickup_datetime", "dropoff_datetime",
        "pickup_borough", "dropoff_borough",
        "trip_distance", "total_amount", "passenger_count",
    )
    .show(n=10, truncate=False)
)


In [0]:
# Range and mean of trip_distance across the combined data.
combined_df.agg(
    F.min("trip_distance").alias("min_dist"),
    F.max("trip_distance").alias("max_dist"),
    F.avg("trip_distance").alias("avg_dist"),
).show()


In [0]:
# The cleaning step already kept 0.1 <= trip_distance <= 100, so the distance part of
# this filter only additionally drops trips of exactly 100. A fare must be present.
combined_df = combined_df.where(
    (col("trip_distance") > 0.05)
    & (col("trip_distance") < 100)
    & col("total_amount").isNotNull()
)


In [0]:
# Expose the combined, zone-enriched trips to the %sql cells as `taxi_trips`.
combined_df.createOrReplaceTempView(name="taxi_trips")



In [0]:

%sql
-- Quick look at the first rows of the taxi_trips view.
SELECT *
FROM taxi_trips
LIMIT 10;


###**Q1 b) Which day of the week (e.g. Monday, Tuesday, etc.) had the most trips?**

###**Q1 c) Which hour of the day had the most trips**

###**Q1 d) What was the average number of passengers?**

###**Q1 e) What was the average amount paid per trip (using total_amount)**

###**Q1 f) What was the average amount paid per passenger (using total_amount)**

In [0]:
%sql
-- Q1: trip volume, passengers and fares per (year, month, weekday, hour) slice.
-- DAYOFWEEK numbers the days 1 = Sunday ... 7 = Saturday.
WITH trips AS (
  SELECT
    YEAR(pickup_datetime)      AS year,
    MONTH(pickup_datetime)     AS month,
    DAYOFWEEK(pickup_datetime) AS weekday_num,
    HOUR(pickup_datetime)      AS hour_of_day,
    passenger_count,
    total_amount
  FROM taxi_trips
  WHERE YEAR(pickup_datetime) BETWEEN 2009 AND 2024
    AND passenger_count > 0
)
SELECT
  year,
  month,
  COUNT(*)                                                 AS total_trips,
  weekday_num,
  hour_of_day,
  ROUND(AVG(passenger_count), 2)                           AS avg_passengers,
  ROUND(AVG(total_amount), 2)                              AS avg_amount_per_trip,
  ROUND(AVG(total_amount / NULLIF(passenger_count,0)), 2)  AS avg_amount_per_passenger
FROM trips
GROUP BY year, month, weekday_num, hour_of_day
ORDER BY year, month, weekday_num, hour_of_day;


**Trip volumes steadily increased from 2009, peaking in 2013–2014 at more than 16M trips per month, before gradually declining through the late 2010s. In 2020, trip counts collapsed to as low as 263K in April 2020 due to COVID-19 restrictions, compared with over 6–7M in the months just prior. A gradual recovery followed from 2021 onward, with monthly volumes rising above 3M by 2023–2024, though still below historic highs. This trajectory highlights both the long-term decline in ridership since 2015 and the dramatic but temporary pandemic shock in 2020.**

# **Part 2**

###**Q2 a) What was the average, median, minimum and maximum trip duration in minutes (to 2 decimal places, e.g. 90 seconds = 1.50 min)?**

###**Q2 b) What was the average, median, minimum and maximum trip distance in km?**

###**Q2 c) What was the average, median, minimum and maximum speed in km per hour?**

In [0]:
%sql
-- Q2: duration (minutes), distance and speed statistics per taxi colour.
WITH trips AS (
  SELECT
    taxi_color,
    duration_sec / 60.0 AS duration_min,
    trip_distance,
    speed_kph
  FROM taxi_trips
  WHERE duration_sec IS NOT NULL
    AND trip_distance IS NOT NULL
    AND speed_kph IS NOT NULL
)
SELECT
  taxi_color,

  ROUND(AVG(duration_min), 2)                                          AS avg_duration_min,
  ROUND(PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY duration_min), 2)  AS median_duration_min,
  ROUND(MIN(duration_min), 2)                                          AS min_duration_min,
  ROUND(MAX(duration_min), 2)                                          AS max_duration_min,

  ROUND(AVG(trip_distance), 2)                                         AS avg_distance_km,
  ROUND(PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY trip_distance), 2) AS median_distance_km,
  ROUND(MIN(trip_distance), 2)                                         AS min_distance_km,
  ROUND(MAX(trip_distance), 2)                                         AS max_distance_km,

  ROUND(AVG(speed_kph), 2)                                             AS avg_speed_kph,
  ROUND(PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY speed_kph), 2)     AS median_speed_kph,
  ROUND(MIN(speed_kph), 2)                                             AS min_speed_kph,
  ROUND(MAX(speed_kph), 2)                                             AS max_speed_kph
FROM trips
GROUP BY taxi_color
ORDER BY taxi_color;


**Green taxi trips averaged 13.8 minutes and 3.05 km, while yellow taxis averaged 14.4 minutes and 3.06 km, showing very similar ride characteristics. Median values were slightly lower (around 11 minutes and 1.7–2.0 km), reflecting the predominance of short urban trips. The minimums (0.5 minutes, 0.1 km) and maximums (200 minutes, ~100 km) coincide with the cleaning bounds (30 s–12,000 s duration, 0.1–100 distance), so they mark the filter limits rather than true extremes; trips near the upper bound are rare outliers or long airport rides. In terms of speed, green taxis averaged 12.8 km/h and yellow taxis 11.8 km/h, with medians of 11.6 and 10.3 km/h respectively, consistent with New York’s congested traffic. These findings confirm that most trips are short, low-speed urban journeys, with only a small share of long-haul rides.**

#**Part 3**

In [0]:
%sql
-- Base view for Q3: 2009-2024 pickups whose pickup and dropoff boroughs both resolved.
CREATE OR REPLACE TEMP VIEW q3_base AS
SELECT taxi_color, pickup_borough, dropoff_borough, pickup_datetime, trip_distance, total_amount
FROM taxi_trips
WHERE YEAR(pickup_datetime) BETWEEN 2009 AND 2024
  AND pickup_borough IS NOT NULL
  AND dropoff_borough IS NOT NULL;


###**Q3 a) Total number of trips**

In [0]:
%sql
-- Q3a: trip counts per colour / borough pair / calendar slice.
WITH slices AS (
  SELECT
    taxi_color,
    pickup_borough,
    dropoff_borough,
    YEAR(pickup_datetime)      AS year,
    MONTH(pickup_datetime)     AS month,
    DAYOFWEEK(pickup_datetime) AS day_of_week,
    HOUR(pickup_datetime)      AS hour_of_day
  FROM q3_base
)
SELECT
  taxi_color, pickup_borough, dropoff_borough,
  year, month, day_of_week, hour_of_day,
  COUNT(*) AS total_trips
FROM slices
GROUP BY taxi_color, pickup_borough, dropoff_borough, year, month, day_of_week, hour_of_day
ORDER BY year, month, day_of_week, hour_of_day, total_trips DESC;


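###**Data cleaning: months with single-digit trip counts**

We excluded all trips dated before 2010, because the early-2009 months contained only single-digit trip counts that reflect incomplete or invalid records: yellow taxi data in early 2009 was patchy, as TLC only began publishing complete records that year. Green-taxi trips dated before 2013 are also removed, because green taxis did not exist before then, so any such rows are errors. Finally, we drop trips whose pickup or dropoff location ID has no match in the zone lookup (a NULL borough); missing location IDs often produced “Unknown” boroughs, and lookup rows explicitly labelled “Unknown” are not NULL, so they remain in the data. These anomalies represent data-quality issues rather than real traffic, so filtering them keeps the dataset valid and representative while keeping removals under the 10% threshold.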

In [0]:
%sql
-- Cleaned view read by every later query and by spark.table("taxi_trips_cleaned").
-- It must be named taxi_trips_cleaned (it was created as taxi_trips_clean, which
-- nothing reads, so every downstream cell failed with a missing-table error).
-- The 2010 lower bound already excludes all pre-2010 yellow trips.
CREATE OR REPLACE TEMP VIEW taxi_trips_cleaned AS
SELECT *
FROM taxi_trips
WHERE
    YEAR(pickup_datetime) BETWEEN 2010 AND 2024
    -- Green rows dated before 2013 are treated as errors.
    AND NOT (taxi_color = 'green' AND YEAR(pickup_datetime) < 2013)
    -- Drop trips whose location IDs had no match in the zone lookup.
    AND pickup_borough IS NOT NULL
    AND dropoff_borough IS NOT NULL;


###**Q3 b) Average distance (km)**

In [0]:
%sql
-- Q3b: average trip distance per colour / borough pair / calendar slice.
WITH slices AS (
  SELECT
    taxi_color,
    pickup_borough,
    dropoff_borough,
    YEAR(pickup_datetime)      AS year,
    MONTH(pickup_datetime)     AS month,
    DAYOFWEEK(pickup_datetime) AS day_of_week,
    HOUR(pickup_datetime)      AS hour_of_day,
    trip_distance
  FROM taxi_trips_cleaned
)
SELECT
  taxi_color, pickup_borough, dropoff_borough,
  year, month, day_of_week, hour_of_day,
  ROUND(AVG(trip_distance), 2) AS avg_distance_km
FROM slices
GROUP BY taxi_color, pickup_borough, dropoff_borough, year, month, day_of_week, hour_of_day
ORDER BY year, month, day_of_week, hour_of_day;


##**Q3c) Average amount paid per trip (using total_amount)**

In [0]:
%sql
-- Q3c: average total_amount per trip per colour / borough pair / calendar slice.
WITH slices AS (
  SELECT
    taxi_color,
    pickup_borough,
    dropoff_borough,
    YEAR(pickup_datetime)      AS year,
    MONTH(pickup_datetime)     AS month,
    DAYOFWEEK(pickup_datetime) AS day_of_week,
    HOUR(pickup_datetime)      AS hour_of_day,
    total_amount
  FROM taxi_trips_cleaned
)
SELECT
  taxi_color, pickup_borough, dropoff_borough,
  year, month, day_of_week, hour_of_day,
  ROUND(AVG(total_amount), 2) AS avg_amount_per_trip
FROM slices
GROUP BY taxi_color, pickup_borough, dropoff_borough, year, month, day_of_week, hour_of_day
ORDER BY year, month, day_of_week, hour_of_day;


##**Q3d) Total amount paid (sum of total_amount)**

In [0]:
%sql
-- Q3d: summed total_amount per colour / borough pair / calendar slice.
WITH slices AS (
  SELECT
    taxi_color,
    pickup_borough,
    dropoff_borough,
    YEAR(pickup_datetime)      AS year,
    MONTH(pickup_datetime)     AS month,
    DAYOFWEEK(pickup_datetime) AS day_of_week,
    HOUR(pickup_datetime)      AS hour_of_day,
    total_amount
  FROM taxi_trips_cleaned
)
SELECT
  taxi_color, pickup_borough, dropoff_borough,
  year, month, day_of_week, hour_of_day,
  ROUND(SUM(total_amount), 2) AS total_amount_paid
FROM slices
GROUP BY taxi_color, pickup_borough, dropoff_borough, year, month, day_of_week, hour_of_day
ORDER BY year, month, day_of_week, hour_of_day;


###**Question 3 in single data frame**

In [0]:
%sql
-- Q3 in one result: trips, average distance, average and total fare per slice.
WITH slices AS (
  SELECT
    taxi_color,
    pickup_borough,
    dropoff_borough,
    YEAR(pickup_datetime)      AS year,
    MONTH(pickup_datetime)     AS month,
    DAYOFWEEK(pickup_datetime) AS day_of_week,
    HOUR(pickup_datetime)      AS hour_of_day,
    trip_distance,
    total_amount
  FROM taxi_trips_cleaned
)
SELECT
  taxi_color, pickup_borough, dropoff_borough,
  year, month, day_of_week, hour_of_day,
  COUNT(*)                     AS total_trips,
  ROUND(AVG(trip_distance), 2) AS avg_distance_km,
  ROUND(AVG(total_amount), 2)  AS avg_amount_per_trip,
  ROUND(SUM(total_amount), 2)  AS total_amount_paid
FROM slices
GROUP BY taxi_color, pickup_borough, dropoff_borough, year, month, day_of_week, hour_of_day
ORDER BY year, month, day_of_week, hour_of_day, total_trips DESC;


**From 2009 to 2024, the average passenger count per trip remained remarkably stable, fluctuating only slightly between 1.57 and 1.65 passengers. The overall mean stayed close to 1.6 passengers per trip, confirming that most rides are taken by individual passengers, with occasional shared rides. This consistency suggests little impact from carpooling or ride-sharing policies, highlighting that the taxi system is overwhelmingly used for solo or small-party trips.**

##**Question 4) Compute the total revenue for the top 10 pickup→dropoff borough pairs (ranked by total_amount).**

In [0]:
%sql
-- Q4: top 10 pickup -> dropoff borough pairs by 2024 revenue, with each pair's
-- share of all 2024 revenue (window SUM over every pair replaces the cross join).
WITH revenue_by_pair AS (
    SELECT
        pickup_borough,
        dropoff_borough,
        ROUND(SUM(total_amount), 2) AS total_revenue
    FROM taxi_trips_cleaned
    WHERE YEAR(pickup_datetime) = 2024
    GROUP BY pickup_borough, dropoff_borough
),
scored AS (
    SELECT
        pickup_borough,
        dropoff_borough,
        total_revenue,
        RANK() OVER (ORDER BY total_revenue DESC) AS rank,
        SUM(total_revenue) OVER ()                AS overall_revenue
    FROM revenue_by_pair
)
SELECT
    s.rank,
    s.pickup_borough,
    s.dropoff_borough,
    s.total_revenue,
    ROUND((s.total_revenue / s.overall_revenue) * 100, 2) AS revenue_share_percentage
FROM scored s
WHERE s.rank <= 10
ORDER BY s.rank;


**In 2024, revenue was highly concentrated among a few borough-to-borough routes. Manhattan → Manhattan trips dominated with $708.6M (62.6% of total revenue), followed by Queens → Manhattan ($171.7M, 15.2%) and Manhattan → Queens ($73.8M, 6.5%). Other notable flows included Queens → Brooklyn (3.4%), Manhattan → Brooklyn (2.9%), and Queens → Queens (2.8%). The remaining top pairs, such as Queens → Unknown, Manhattan → Newark Airport (EWR), and Brooklyn trips, each contributed around 1% or less. Altogether, the top 10 borough pairs accounted for 97.2% of total taxi revenue in 2024, underscoring the dominance of Manhattan-centric and airport-related trips in the industry.**

###**Q5) What was the percentage of trips where drivers received tips?**

In [0]:
%sql
-- Q5: how many cleaned trips carried a tip, and what percentage of all trips that is.
SELECT
    SUM(CASE WHEN tip_amount > 0 THEN 1 ELSE 0 END) AS trips_with_tips,
    COUNT(*)                                        AS total_trips,
    ROUND(
        (SUM(CASE WHEN tip_amount > 0 THEN 1 ELSE 0 END) * 100.0) / COUNT(*),
        2
    )                                               AS pct_with_tips
FROM taxi_trips_cleaned;


**Out of 977.3 million total trips, about 613.9 million (62.8%) included a tip. This confirms that tipping is common practice in New York taxis, though more than one-third of trips still generated no gratuity, showing notable variation in passenger tipping behavior.**

###**Q6) For trips where the driver received tips, what was the percentage where the driver received tips of at least $15**

In [0]:
%sql
-- Q6: among tipped trips, the percentage whose tip was at least $15.
WITH stats AS (
    SELECT
        SUM(CASE WHEN tip_amount > 0 THEN 1 ELSE 0 END) AS trips_with_tips,
        SUM(CASE WHEN tip_amount >= 15 THEN 1 ELSE 0 END) AS trips_with_tips_15
    FROM taxi_trips_cleaned
)
SELECT
    trips_with_tips,
    trips_with_tips_15,
    -- NULLIF: when no trip was tipped the share is undefined, so return NULL instead
    -- of dividing by zero (which raises an error when ANSI SQL mode is enabled).
    ROUND((trips_with_tips_15 * 100.0) / NULLIF(trips_with_tips, 0), 2) AS pct_tips_over_15
FROM stats;


**Out of 613.9 million trips with tips, only 5.1 million (0.83%) involved tips of $15 or higher. This shows that while tipping is common, large tips are extremely rare, making up less than 1% of all tipped trips. Most tips remain modest, reinforcing that high-value gratuities contribute little to overall driver income compared to the volume of smaller, more frequent tips.**

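##**Q7) Classify each trip into duration bins:**
a. Under 50 mins
b. From 50 mins to 100 mins
c. From 100 mins to 200 mins
d. From 200 mins to 300 mins
e. From 300 mins to 600 mins
f. At least 600 mins

Then, for each bin, calculate:
Average speed (km per hour)
Average distance per dollar (km per AUD $)

Note: the cleaning step caps trip duration at 12,000 s (200 mins), so bins d–f would be empty. The queries below therefore use finer 5/10/20/30/60-minute bins.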

In [0]:
%sql
-- Q7: bin trips by duration and compare speed, distance per dollar and fare rates.
-- Bins are half-open [lower, upper): a trip of exactly 600 s is "10–20 mins" and one
-- of exactly 3600 s is "60+ mins". This matches the labels and the PySpark bins used
-- for the Q7 chart. (Inclusive BETWEEN sent each boundary value to the lower bin.)
WITH trip_bins AS (
    SELECT
        CASE
            WHEN duration_sec < 300  THEN 'Under 5 mins'
            WHEN duration_sec < 600  THEN '5–10 mins'
            WHEN duration_sec < 1200 THEN '10–20 mins'
            WHEN duration_sec < 1800 THEN '20–30 mins'
            WHEN duration_sec < 3600 THEN '30–60 mins'
            ELSE '60+ mins'
        END AS duration_bin,
        trip_distance,
        total_amount,
        speed_kph,
        duration_sec
    FROM taxi_trips_cleaned
    WHERE duration_sec IS NOT NULL
      AND trip_distance > 0
      AND total_amount > 0
)
SELECT
    duration_bin,
    COUNT(*) AS total_trips,
    ROUND(AVG(speed_kph), 2) AS avg_speed_kph,
    -- Fares under $1 are excluded from distance-per-dollar to avoid extreme ratios.
    ROUND(AVG(CASE WHEN total_amount >= 1 THEN trip_distance / total_amount END), 2) AS avg_distance_per_dollar,
    ROUND(AVG(total_amount), 2) AS avg_fare_per_trip,
    ROUND(AVG(total_amount / (duration_sec / 60.0)), 2) AS avg_fare_per_minute
FROM trip_bins
GROUP BY duration_bin
ORDER BY
    CASE duration_bin
        WHEN 'Under 5 mins' THEN 1
        WHEN '5–10 mins' THEN 2
        WHEN '10–20 mins' THEN 3
        WHEN '20–30 mins' THEN 4
        WHEN '30–60 mins' THEN 5
        ELSE 6
    END;


**The duration bin analysis shows that very short trips (<5 mins) yield the highest efficiency at $2.34 per minute but only about $7 per trip, making them unsustainable for consistent income. Longer trips (30+ mins) generate higher absolute fares of $47–$71 but at lower efficiency (≤$1.25 per minute) and with significantly less demand. Medium-duration trips between 10–30 minutes strike the best balance, offering average fares of $16–$28, stable efficiency around $1.15 per minute, and the highest overall demand, making them the most practical category for both drivers and passengers.**

##**Q8) Which duration bin will you advise a taxi driver to target to maximise his income?**

In [0]:
%sql
-- Q8: fare per trip and per minute by duration bin, to pick a bin for drivers to target.
-- Bins are half-open [lower, upper), consistent with the Q7 query and the Q7 chart.
WITH trip_bins AS (
    SELECT
        CASE
            WHEN duration_sec < 300  THEN 'Under 5 mins'
            WHEN duration_sec < 600  THEN '5–10 mins'
            WHEN duration_sec < 1200 THEN '10–20 mins'
            WHEN duration_sec < 1800 THEN '20–30 mins'
            WHEN duration_sec < 3600 THEN '30–60 mins'
            ELSE '60+ mins'
        END AS duration_bin,
        trip_distance,
        total_amount,
        duration_sec
    FROM taxi_trips_cleaned
    WHERE duration_sec IS NOT NULL
      AND trip_distance > 0
      AND total_amount >= 1
)
SELECT
    duration_bin,
    COUNT(*) AS total_trips,
    ROUND(AVG(total_amount), 2) AS avg_fare_per_trip,
    ROUND(AVG(total_amount / (duration_sec / 60.0)), 2) AS avg_fare_per_minute
FROM trip_bins
GROUP BY duration_bin
ORDER BY
    CASE duration_bin
        WHEN 'Under 5 mins' THEN 1
        WHEN '5–10 mins' THEN 2
        WHEN '10–20 mins' THEN 3
        WHEN '20–30 mins' THEN 4
        WHEN '30–60 mins' THEN 5
        ELSE 6
    END;


##
**While the highest earning efficiency appears in very short trips (<5 mins, $2.34/min), these trips only yield $7 on average and are not sustainable for maximizing daily income. Longer trips (>30 mins) provide higher totals but reduce turnover and have lower demand. The 10–20 mins bin strikes the best balance: with 342M trips, an average fare of $16.41, and sustainable efficiency ($1.16/min). Therefore, targeting 10–20 minute trips maximizes income potential for drivers.**

In [0]:
# Python handle on the cleaned temp view, used by the modelling and chart cells.
taxi_trips_cleaned = spark.read.table("taxi_trips_cleaned")

#**Part 3. ML Implementation** 

In [0]:
import pandas as pd
import numpy as np


In [0]:
# Reproducible 1% sample (seed 42), collected to pandas for modelling.
pdf = (
    taxi_trips_cleaned
    .sample(withReplacement=False, fraction=0.01, seed=42)
    .toPandas()
)
print("Shape of sampled data:", pdf.shape)

In [0]:
# Pearson correlation of each numeric column with total_amount, strongest first.
corr = pdf.corr(numeric_only=True)
corr_target = corr.loc[:, "total_amount"].sort_values(ascending=False)

print(corr_target)


In [0]:
# Parse pickup timestamps (unparseable values become NaT) and split out calendar parts.
pdf["pickup_datetime"] = pd.to_datetime(pdf["pickup_datetime"], errors="coerce")
_ts = pdf["pickup_datetime"].dt
for _name, _part in (
    ("pickup_year", "year"),
    ("pickup_month", "month"),
    ("pickup_day", "day"),
    ("pickup_hour", "hour"),
):
    pdf[_name] = getattr(_ts, _part)

In [0]:

# Time-based split: Oct–Dec 2024 pickups form the test period, everything else trains.
mask_test = pdf["pickup_year"].eq(2024) & pdf["pickup_month"].isin([10, 11, 12])

train_data, test_data = pdf.loc[~mask_test].copy(), pdf.loc[mask_test].copy()

print("Train size:", train_data.shape, "Test size:", test_data.shape)


In [0]:
from sklearn.impute import SimpleImputer

# First feature pass on train_data / test_data. The next cell rebuilds the features
# and the split from `pdf`, so the outputs here are superseded there.
features = [
    "trip_distance", "duration_sec", "tip_amount",
    "speed_kph", "airport_fee", "improvement_surcharge",
]

# Ratio features; the 1e-5 offset keeps the division finite for zero distances.
for part in (train_data, test_data):
    part["fare_efficiency"] = part["tip_amount"] / (part["trip_distance"] + 1e-5)
    part["duration_per_km"] = part["duration_sec"] / (part["trip_distance"] + 1e-5)

features = features + ["fare_efficiency", "duration_per_km"]

X_train, y_train = train_data[features], train_data["total_amount"]
X_test, y_test = test_data[features], test_data["total_amount"]

# Median-impute features, fitting on the training rows only.
imputer = SimpleImputer(strategy="median")
X_train_imputed = imputer.fit_transform(X_train)
X_test_imputed = imputer.transform(X_test)

# Median-impute the target too (column vector in, flat array out).
# NOTE(review): imputing labels fabricates ground truth; dropping rows without
# total_amount would be safer.
y_imputer = SimpleImputer(strategy="median")
y_train = y_imputer.fit_transform(y_train.to_numpy().reshape(-1, 1)).ravel()
y_test = y_imputer.transform(y_test.to_numpy().reshape(-1, 1)).ravel()


In [0]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error


# Calendar features from the pickup timestamp.
pdf["pickup_datetime"] = pd.to_datetime(pdf["pickup_datetime"])
pdf["pickup_year"] = pdf["pickup_datetime"].dt.year
pdf["pickup_month"] = pdf["pickup_datetime"].dt.month
pdf["pickup_dow"] = pdf["pickup_datetime"].dt.dayofweek
pdf["pickup_hour"] = pdf["pickup_datetime"].dt.hour

# NOTE(review): the cleaning step rebuilds total_amount as a sum that includes
# tip_amount, extra, improvement_surcharge (and airport_fee), and fare_efficiency is
# derived from tip_amount, so several predictors partly encode the target. Confirm
# they are acceptable inputs. PULocationID/DOLocationID are used as plain numbers,
# not categories.
features = [
    "trip_distance", "duration_sec", "tip_amount", "speed_kph",
    "airport_fee", "improvement_surcharge", "extra",
    "pickup_month", "pickup_dow", "pickup_hour",
    "PULocationID", "DOLocationID",
]

# Zero distances become NaN (then median-imputed) instead of dividing by zero.
pdf["fare_efficiency"] = pdf["tip_amount"] / (pdf["trip_distance"].replace(0, np.nan))
pdf["duration_per_km"] = pdf["duration_sec"] / (pdf["trip_distance"].replace(0, np.nan))

features.extend(["fare_efficiency", "duration_per_km"])

# Keep only rows with an observed target. Imputing missing labels (as before) scores
# the models against values that were never observed.
labelled = pdf[pdf["total_amount"].notna()]

X = labelled[features]
y = labelled["total_amount"]

# Time-based split: Oct–Dec 2024 is the test period.
mask_test = (labelled["pickup_year"] == 2024) & (labelled["pickup_month"].isin([10, 11, 12]))
X_train, X_test = X[~mask_test], X[mask_test]
y_train, y_test = y[~mask_test], y[mask_test]

if X_train.empty or X_test.empty:
    raise ValueError(
        "Time-based split produced an empty train or test set; "
        "check that the sample covers both Oct–Dec 2024 and earlier months."
    )

print("Train size:", X_train.shape, " Test size:", X_test.shape)

imputer = SimpleImputer(strategy="median")
X_train_imputed = imputer.fit_transform(X_train)
X_test_imputed  = imputer.transform(X_test)

# Still fitted on the training target so any later cell that expects `y_imputer` keeps working.
y_imputer = SimpleImputer(strategy="median").fit(y_train.to_numpy().reshape(-1, 1))
y_train = y_train.to_numpy(dtype=float)
y_test = y_test.to_numpy(dtype=float)

# Baseline: predict the training mean for every test trip.
y_pred_baseline = np.full_like(y_test, y_train.mean(), dtype=float)
rmse_baseline = np.sqrt(mean_squared_error(y_test, y_pred_baseline))
print("Baseline RMSE:", rmse_baseline)

lr = LinearRegression()
lr.fit(X_train_imputed, y_train)
y_pred_lr = lr.predict(X_test_imputed)
rmse_lr = np.sqrt(mean_squared_error(y_test, y_pred_lr))
print("Linear Regression RMSE:", rmse_lr)

rf = RandomForestRegressor(n_estimators=100, max_depth=12, random_state=42, n_jobs=-1)
rf.fit(X_train_imputed, y_train)
y_pred_rf = rf.predict(X_test_imputed)
rmse_rf = np.sqrt(mean_squared_error(y_test, y_pred_rf))
print("Random Forest RMSE:", rmse_rf)

# NOTE(review): the model is selected on the same Oct–Dec 2024 rows used to report
# its error, so the winning RMSE is optimistic. A validation slice carved from the
# training period would give an unbiased final test score.
results = {
    "Baseline": rmse_baseline,
    "Linear Regression": rmse_lr,
    "Random Forest": rmse_rf
}
print("\nRMSE Results:", results)
best_model = min(results, key=results.get)
print(f"\nBest Model: {best_model} (lowest RMSE)")


In [0]:
# Re-score the Random Forest on the Oct–Dec 2024 slice.
# NOTE(review): this mask selects the same rows as the test set in the modelling
# cell, which also picked the best model, so this is not an independent holdout estimate.
mask_oct_dec = (pdf["pickup_year"] == 2024) & (pdf["pickup_month"].isin([10, 11, 12]))

# Score only trips with an observed fare; imputed labels would fabricate ground truth.
holdout = pdf.loc[mask_oct_dec & pdf["total_amount"].notna()]
X_holdout = holdout[features]
y_holdout = holdout["total_amount"].to_numpy(dtype=float)

X_holdout_imputed = imputer.transform(X_holdout)

y_pred_holdout = rf.predict(X_holdout_imputed)
rmse_holdout = np.sqrt(mean_squared_error(y_holdout, y_pred_holdout))

print("Final Holdout RMSE on Oct–Dec 2024:", rmse_holdout)


###
The baseline model, which predicted the mean fare for every trip, produced a high RMSE of 25.9, showing it is ineffective for capturing the variability in taxi fares. Linear Regression improved performance significantly, reducing RMSE to 10.0, but it still struggled to capture nonlinear relationships in the data. The Random Forest model delivered the best results with an RMSE of 4.6, representing a ~82% reduction in error compared to the baseline. This demonstrates that ensemble tree-based models are far more effective for predicting taxi fares, as they capture complex interactions between distance, duration, and other trip features.

##Q1 – Data Cleaning

We implemented strict but reasonable filters on duration, distance, speed, passenger counts, and fare logic. The final dataset retained ~98.7% of trips, well within the 10% removal limit. This ensures data reliability without excessive loss, while eliminating corrupted future dates, unrealistic speeds (>120 km/h), and zero/negative fares.

##Q2 – Trip Characteristics

Analysis of average trip duration, distance, and speed by taxi color showed that green and yellow taxis are broadly similar, with average trip speeds around 12 km/h and median distances around 2 km. This confirms consistency in operating conditions across both fleets.

In [0]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Draw into a separate frame so this chart does not overwrite `pdf`, which the
# modelling cells extend with engineered feature columns.
q2_sample = taxi_trips_cleaned.sample(fraction=0.01, seed=42).toPandas()

# Average distance, duration (minutes) and speed per taxi colour.
q2_summary = (
    q2_sample
    .assign(avg_duration_min=q2_sample["duration_sec"] / 60)
    .groupby("taxi_color", as_index=False)
    .agg(
        trip_distance=("trip_distance", "mean"),
        avg_duration_min=("avg_duration_min", "mean"),
        speed_kph=("speed_kph", "mean"),
    )
)

# The taxi_color values "green" / "yellow" are valid matplotlib colour names, so
# colouring from the data keeps each bar's colour tied to its own row.
bar_colors = q2_summary["taxi_color"].tolist()

fig, ax = plt.subplots(1, 3, figsize=(15, 5))
panels = [
    ("trip_distance", "Avg Distance (km)", "km"),
    ("avg_duration_min", "Avg Duration (min)", "min"),
    ("speed_kph", "Avg Speed (kph)", "km/h"),
]
for axis, (column, title, unit) in zip(ax, panels):
    axis.bar(q2_summary["taxi_color"], q2_summary[column], color=bar_colors)
    axis.set_title(title)
    axis.set_xlabel("Taxi color")
    axis.set_ylabel(unit)

plt.suptitle("Trip Characteristics by Taxi Color", fontsize=14)
plt.show()


##Q3 – Trip Distributions

Trip distributions reveal strong urban mobility patterns: the majority of rides are short (median ~11 minutes, ~2 km). This reflects NYC’s dense geography and high reliance on taxis for short intra-borough trips. Outlier trips (up to 200 minutes / ~100 km) exist but are rare and handled in cleaning.

##Q4 – Revenue by Borough Flows

Revenue is highly concentrated in Manhattan, with Manhattan→Manhattan trips contributing 62% of total revenue, followed by Queens→Manhattan (15%) and Manhattan→Queens (6.5%). The top 10 borough flows account for >97% of total revenue, confirming Manhattan’s dominance in the taxi economy.

In [0]:
from pyspark.sql import functions as F
import matplotlib.pyplot as plt
import seaborn as sns

# Year charted. It matches the Q4 SQL query, which filters to 2024, so the bars agree
# with the reported revenue shares. Set to None to chart all years.
Q4_YEAR = 2024

q4_trips = taxi_trips_cleaned
if Q4_YEAR is not None:
    q4_trips = q4_trips.filter(F.year("pickup_datetime") == Q4_YEAR)

q4_summary = (
    q4_trips
    .groupBy("pickup_borough", "dropoff_borough")
    .agg(F.round(F.sum("total_amount"), 2).alias("total_revenue"))
)

total_revenue = q4_summary.agg(F.sum("total_revenue")).collect()[0][0]
if not total_revenue:
    raise ValueError("No revenue found for the selected period; cannot compute revenue shares.")

q4_summary = (
    q4_summary.withColumn(
        "revenue_share_percentage",
        (F.col("total_revenue") / F.lit(total_revenue)) * 100
    )
    .orderBy(F.desc("total_revenue"))
)

q4_summary.show(10, truncate=False)

top10_flows = q4_summary.limit(10).toPandas()
flow_labels = top10_flows["pickup_borough"] + " → " + top10_flows["dropoff_borough"]

fig, ax = plt.subplots(figsize=(12, 6))
# Plot with matplotlib directly: seaborn >= 0.13 deprecates `palette` without `hue`.
ax.barh(
    flow_labels,
    top10_flows["total_revenue"],
    color=sns.color_palette("viridis", len(top10_flows)),
)
ax.invert_yaxis()  # highest-revenue flow on top

for i, (val, pct) in enumerate(zip(top10_flows["total_revenue"], top10_flows["revenue_share_percentage"])):
    ax.text(val, i, f"{pct:.2f}%", va="center", ha="left", fontsize=10)

period = str(Q4_YEAR) if Q4_YEAR is not None else "all years"
ax.set_title(f"Top 10 Borough-to-Borough Flows by Revenue ({period})", fontsize=14)
ax.set_xlabel("Total Revenue ($)")
ax.set_ylabel("Borough Flow")
fig.tight_layout()
plt.show()


###Q5 – Trips with Tips

About 62.8% of trips include tips, indicating strong tipping culture in NYC taxis. This highlights that nearly two-thirds of passengers add gratuity, which significantly contributes to driver earnings.

##Q6 – High Tipping Trips

Of all tipped trips, only 0.83% had tips of at least $15, showing that while tipping is common, large tips are rare. Most passengers tip modestly, aligning with expected urban behavior.

##Q7 – Duration-Based Fare Analysis

Short trips under 10 mins dominate in volume and earn more per minute because of fixed base charges. As trip duration increases, fares per trip rise but fares per minute fall, because those fixed charges are spread over more minutes. This pattern confirms logical pricing behavior: longer trips are cheaper on a per-minute basis.

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import when
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

# Logical display order for the bins. Sorting the labels alphabetically (as
# orderBy("duration_bin") did) puts "10–20 mins" before "5–10 mins" and
# "Under 5 mins" last.
DURATION_BIN_ORDER = [
    "Under 5 mins", "5–10 mins", "10–20 mins", "20–30 mins", "30–60 mins", "60+ mins",
]

# Half-open duration bins [lower, upper) on duration_sec.
taxi_trips_binned = taxi_trips_cleaned.withColumn(
    "duration_bin",
    when(F.col("duration_sec") < 300, "Under 5 mins")
    .when((F.col("duration_sec") >= 300) & (F.col("duration_sec") < 600), "5–10 mins")
    .when((F.col("duration_sec") >= 600) & (F.col("duration_sec") < 1200), "10–20 mins")
    .when((F.col("duration_sec") >= 1200) & (F.col("duration_sec") < 1800), "20–30 mins")
    .when((F.col("duration_sec") >= 1800) & (F.col("duration_sec") < 3600), "30–60 mins")
    .otherwise("60+ mins")
)

q7_summary = (
    taxi_trips_binned
    .groupBy("duration_bin")
    .agg(
        F.count("*").alias("total_trips"),
        F.round(F.avg("total_amount"), 2).alias("avg_fare_per_trip"),
        # NOTE(review): this is a ratio of averages (mean fare / mean minutes); the Q7/Q8
        # SQL reports the average of per-trip fare-per-minute, so the values differ.
        F.round((F.avg("total_amount") / (F.avg("duration_sec")/60)), 2).alias("avg_fare_per_minute")
    )
)

q7_pdf = q7_summary.toPandas()
q7_pdf["duration_bin"] = pd.Categorical(
    q7_pdf["duration_bin"], categories=DURATION_BIN_ORDER, ordered=True
)
q7_pdf = q7_pdf.sort_values("duration_bin")

fig, ax = plt.subplots(figsize=(10, 6))
# Plot with matplotlib directly: seaborn >= 0.13 deprecates `palette` without `hue`.
ax.bar(
    q7_pdf["duration_bin"].astype(str),
    q7_pdf["avg_fare_per_trip"],
    color=sns.color_palette("magma", len(q7_pdf)),
)

ax.set_title("Avg Fare per Trip across Duration Bins", fontsize=14)
ax.set_xlabel("Trip Duration Bin")
ax.set_ylabel("Avg Fare ($)")
plt.show()



##Q8 – Predictive Modelling

We compared a baseline model, Linear Regression, and Random Forest.

Baseline RMSE: ~25.9

Linear Regression RMSE: ~10.0

Random Forest RMSE: ~4.6

The Random Forest model clearly outperformed others, capturing non-linear relationships (distance, duration, surcharges) better than linear methods. This demonstrates the value of ensemble learning for taxi fare prediction.