In [1]:
# Cell 1: Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import matplotlib.pyplot as plt

In [2]:
# Cell 2: Create Spark session
# Local-mode session for exploratory work. getOrCreate() hands back the
# already-active session when this cell is re-run, so it is safe to repeat.
spark = (
    SparkSession.builder
    .appName("NYC Taxi Exploration")
    .master("local[*]")  # in-process Spark, one worker thread per logical core
    .config("spark.driver.memory", "4g")
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)

# Emoji are written as escape sequences so the source stays ASCII-only and
# cannot be corrupted by an encoding round-trip (the literals were previously
# stored as mojibake).
print(f"\u2705 Spark {spark.version} started!")  # U+2705 check mark
print(f"\U0001F4CA Using {spark.sparkContext.defaultParallelism} cores")  # U+1F4CA bar chart

✅ Spark 3.5.1 started!
📊 Using 16 cores


In [3]:
# Cell 3: Load one month of data
# The path is relative to the notebook's working directory.
df = spark.read.parquet("../data/raw/yellow_tripdata_2024-01.parquet")

# Emoji are written as escape sequences so the source stays ASCII-only and
# cannot be corrupted by an encoding round-trip.
# df.count() is an action: it runs a Spark job over the file.
print(f"\U0001F4E6 Loaded {df.count():,} rows")  # U+1F4E6 package
print(f"\U0001F4CB Columns: {len(df.columns)}")  # U+1F4CB clipboard

📦 Loaded 2,964,624 rows
📋 Columns: 19


In [4]:
# Print the column names, data types and nullability of the loaded DataFrame.
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [5]:
# Preview the first 5 rows as a text table (show() truncates long cell
# values by default).
df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-01-01 00:57:55|  2024-01-01 01:17:43|              1|         1.72|         1|                 N|         186|          79|           2|       17.7|  1.0|    0.5|       0.

In [6]:
# Basic summary statistics (count, mean, stddev, min, max) per column.
# describe() covers numeric and string columns only, so the timestamp
# columns do not appear in the result.
df.describe().show()

+-------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+---------------------+------------------+--------------------+-------------------+
|summary|          VendorID|   passenger_count|     trip_distance|       RatecodeID|store_and_fwd_flag|      PULocationID|      DOLocationID|      payment_type|       fare_amount|             extra|            mta_tax|        tip_amount|      tolls_amount|improvement_surcharge|      total_amount|congestion_surcharge|        Airport_fee|
+-------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+---------------------+------------------+--------------------+----

In [7]:
from pyspark.sql.functions import col, sum as spark_sum

# Per-column NULL tally: isNull() yields a boolean, which is cast to 0/1 and
# summed; each aggregate keeps the name of the column it was computed from.
null_flags = [
    spark_sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in df.columns
]
null_count = df.select(null_flags)
null_count.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       0|                   0|                    0|         140162|            0|    140162|            140162|           0|           0|           0|          0|    0|      0|         

In [8]:
# Cell 8: Data quality checks

# Thresholds used to flag a trip as suspicious (named so they can be tuned
# in one place instead of being buried in the filter expression).
MAX_TRIP_DISTANCE = 100  # miles
MIN_PASSENGERS = 1
MAX_PASSENGERS = 6

# 1. Check trip distance distribution
print("Trip Distance Statistics:")
df.select("trip_distance").summary().show()

# 2. Find suspicious trips
# NOTE: a comparison against NULL evaluates to NULL rather than True, so a row
# whose passenger_count is missing is flagged only when one of the distance or
# fare conditions matches; missing passenger counts alone are not counted here.
suspicious = df.filter(
    (col("trip_distance") < 0)                      # Negative distance
    | (col("trip_distance") > MAX_TRIP_DISTANCE)    # Implausibly long trip
    | (col("fare_amount") < 0)                      # Negative fare
    | (col("passenger_count") < MIN_PASSENGERS)     # No passengers
    | (col("passenger_count") > MAX_PASSENGERS)     # Too many passengers
)

# The emoji is written as an escape sequence so the source stays ASCII-only
# and cannot be corrupted by an encoding round-trip.
print(f"\U0001F6A8 Found {suspicious.count():,} suspicious trips")  # U+1F6A8 warning light
suspicious.show(10)

# 3. Check payment types (most frequent first)
payment_dist = (
    df.groupBy("payment_type")
    .count()
    .orderBy(col("count").desc())
)

print("Payment Type Distribution:")
payment_dist.show()

# 4. Pickup hour distribution
# hour() extracts the hour of day from the pickup timestamp.
df_with_hour = df.withColumn(
    "pickup_hour",
    hour(col("tpep_pickup_datetime")),
)

hourly_trips = (
    df_with_hour.groupBy("pickup_hour")
    .count()
    .orderBy("pickup_hour")
)

# 24 rows so every hour of the day is displayed (show() defaults to 20).
hourly_trips.show(24)

Trip Distance Statistics:
+-------+------------------+
|summary|     trip_distance|
+-------+------------------+
|  count|           2964624|
|   mean|3.6521691789583146|
| stddev|225.46257238220082|
|    min|               0.0|
|    25%|               1.0|
|    50%|              1.68|
|    75%|              3.11|
|    max|          312722.3|
+-------+------------------+

🚨 Found 69,022 suspicious trips
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+-----------------