# 🧭 NYC Yellow Taxi Data Analysis with Spark (PySpark)

This notebook reads the **NYC Yellow Taxi dataset** in **Parquet** format directly from **MinIO/S3 (`s3a://`)** and performs sampling and exploratory data analysis (EDA) with **Spark**.

Dataset path used:
- `s3a://okdp/examples/data/raw/tripdata/yellow/`


## 📘 Notebook Documentation (README)

### What this notebook does
This notebook demonstrates a practical Spark-based EDA workflow on the NYC Yellow Taxi dataset stored in **Parquet** on **S3-compatible storage (MinIO)**.

You will:
- Configure Spark to access MinIO via `s3a://`
- Load Parquet into a Spark DataFrame
- Inspect schema with both DataFrame and SQL approaches
- Build a **balanced per-day random sample** (to reduce temporal skew)
- Engineer time features (hour of day, day of week)
- Run simple data quality checks
- Perform aggregations and visualize results with **Altair** (via small Pandas extracts)

### Why balanced sampling?
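A simple random sample selects rows in proportion to volume, so high-volume days contribute more rows than quiet ones. The per-day, window-based sampling step takes up to the same number of rows from each day, which keeps representation more even across days.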
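To make the skew concrete, here is a small plain-Python calculation comparing the expected rows per day under uniform sampling and under a per-day quota. The function name `expected_rows_per_day` and the day volumes are made up for illustration; they are not taken from the dataset.

```python
def expected_rows_per_day(volumes, fraction=None, cap=None):
    """Expected sampled rows per day under one of two strategies.

    volumes:  dict mapping day -> number of trips that day (illustrative numbers)
    fraction: uniform sampling rate (every row kept with this probability)
    cap:      per-day quota, as in the "up to N trips per day" approach
    """
    if (fraction is None) == (cap is None):
        raise ValueError("pass exactly one of fraction or cap")
    if fraction is not None:
        # Uniform sampling: each day contributes in proportion to its volume.
        return {day: n * fraction for day, n in volumes.items()}
    # Per-day quota: every day contributes min(volume, cap) rows.
    return {day: min(n, cap) for day, n in volumes.items()}


# Hypothetical volumes: a busy weekday vs. a quiet holiday.
volumes = {"busy_day": 120_000, "quiet_day": 30_000}
print(expected_rows_per_day(volumes, fraction=0.001))  # busy_day ~120, quiet_day ~30 (4:1)
print(expected_rows_per_day(volumes, cap=100))         # {'busy_day': 100, 'quiet_day': 100}
```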

### Key constraints
- **Avoid converting large Spark DataFrames to Pandas.** Only convert small samples/aggregations.

### Inputs
- Parquet path: `s3a://okdp/examples/data/raw/tripdata/yellow/`
- Credentials from mounted files:
  - `/var/run/secrets/examples/s3/S3_ACCESS_KEY`
  - `/var/run/secrets/examples/s3/S3_SECRET_KEY`
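Reading these files with an explicit check gives a clearer error than a later S3A authentication failure when a secret is missing or empty. This is a minimal sketch with a hypothetical `read_secret` helper. The Spark session cell itself reads the same paths directly with `Path(...).read_text().strip()`.

```python
from pathlib import Path


def read_secret(path):
    """Read a mounted secret file and strip surrounding whitespace/newlines.

    Hypothetical helper: raises a clear error if the file is missing or empty,
    instead of letting Spark fail later with a less obvious S3A error.
    """
    p = Path(path)
    if not p.is_file():
        raise FileNotFoundError(f"secret file not found: {p}")
    value = p.read_text().strip()
    if not value:
        raise ValueError(f"secret file is empty: {p}")
    return value


# Usage with the paths listed above:
# s3AccessKey = read_secret("/var/run/secrets/examples/s3/S3_ACCESS_KEY")
# s3SecretKey = read_secret("/var/run/secrets/examples/s3/S3_SECRET_KEY")
```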


## 🛠️ 1. Setup and Imports

Imports used in this notebook:
- **SparkSession / functions / Window**: core Spark APIs
- **pandas**: used only for small extracts to enable tabular visualization
- **altair**: interactive charts

Notes:
- Keep visualization datasets small to avoid driver memory pressure.
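One way to enforce this is a small guard that refuses to collect anything above a row budget. The helper below (`to_pandas_capped`) is hypothetical, not part of PySpark. Its 5,000-row default is an assumption chosen to match Altair's default `max_rows` limit, above which Altair raises `MaxRowsError`.

```python
def to_pandas_capped(sdf, max_rows=5000):
    """Collect a Spark DataFrame to pandas only if it has at most max_rows rows.

    Hypothetical helper: works with any object exposing limit(), count() and
    toPandas(), such as a pyspark.sql.DataFrame.
    """
    # Counting a limited DataFrame never reports more than max_rows + 1 rows,
    # which is enough to tell whether the budget is exceeded.
    n = sdf.limit(max_rows + 1).count()
    if n > max_rows:
        raise ValueError(
            f"refusing toPandas(): more than {max_rows} rows; aggregate or sample first"
        )
    return sdf.toPandas()


# Usage in a later cell (assumes a small Spark DataFrame such as `hourly`):
# hourly_pdf = to_pandas_capped(hourly, max_rows=100)
```

This evaluates the DataFrame twice (once for the count, once for the collect), which is usually acceptable for small aggregated results.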


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

import pandas as pd
import altair as alt

alt.renderers.enable('default')

## 🛠️ 1b. Initialize Spark Session

This SparkSession is configured for **MinIO/S3A** access:
- `spark.hadoop.fs.s3a.endpoint`: MinIO endpoint URL
- `SimpleAWSCredentialsProvider`: uses explicit access/secret key values
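- `path.style.access=true`: addresses objects as `endpoint/bucket/key` instead of the virtual-hosted style `bucket.endpoint/key`, which many MinIO setups require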

In production, you would typically use IAM roles or workload identity instead of static keys.
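Keeping the S3A settings in one dictionary makes it easier to swap the static-key entries for a different credentials setup per environment. This is a minimal sketch: `s3a_options` and `apply_options` are hypothetical helpers. The provider class you would use instead of `SimpleAWSCredentialsProvider` depends on your Hadoop and AWS SDK versions, so it is not shown.

```python
def s3a_options(endpoint, access_key, secret_key, path_style=True):
    """Collect the S3A settings used in this notebook in one dict.

    Keys mirror the .config(...) calls in the SparkSession cell; values are
    strings, as Spark/Hadoop configuration values are.
    """
    return {
        "spark.hadoop.fs.s3a.endpoint": endpoint,
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
        "spark.hadoop.fs.s3a.aws.credentials.provider":
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.path.style.access": "true" if path_style else "false",
    }


def apply_options(builder, options):
    """Apply each key/value pair with builder.config(key, value)."""
    for key, value in options.items():
        builder = builder.config(key, value)
    return builder


# Usage (assumes SparkSession is imported and the keys were read as in the next cell):
# spark = apply_options(
#     SparkSession.builder.appName("NYC Tripdata - PySpark"),
#     s3a_options("https://minio-default.okdp.sandbox:443", s3AccessKey, s3SecretKey),
# ).getOrCreate()
```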


In [None]:
from pathlib import Path

# Read MinIO credentials from the mounted secret files (strip trailing newlines)
s3AccessKey = Path("/var/run/secrets/examples/s3/S3_ACCESS_KEY").read_text().strip()
s3SecretKey = Path("/var/run/secrets/examples/s3/S3_SECRET_KEY").read_text().strip()

# Note: if a session already exists, getOrCreate() returns it and static
# settings such as executor sizing are not re-applied.
spark = (
    SparkSession.builder
    .appName("NYC Tripdata - PySpark")
    # Executor sizing (adjust to your cluster)
    .config("spark.executor.memory", "2000M")
    .config("spark.executor.cores", "2")
    .config("spark.executor.instances", "1")
    # S3A connector settings for MinIO
    .config("spark.hadoop.fs.s3a.endpoint", "https://minio-default.okdp.sandbox:443")
    .config("spark.hadoop.fs.s3a.access.key", s3AccessKey)
    .config("spark.hadoop.fs.s3a.secret.key", s3SecretKey)
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")  # bucket in the URL path, not the hostname
    .config("spark.hadoop.fs.s3a.connection.maximum", "100")  # max pooled S3 connections
    .getOrCreate()
)

## 📦 2. Load the Yellow Taxi Dataset (Parquet) from S3A

We load the Parquet dataset into a Spark DataFrame.

Tips:
- `printSchema()` is a fast way to validate expected columns and types.
- `limit(...).toPandas()` is safe only for small previews.
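Beyond eyeballing `printSchema()`, you can fail fast when a column that later cells rely on is missing. This is a minimal sketch with a hypothetical `check_columns` helper; the required list is simply the set of columns this notebook uses. The comparison is case-insensitive to match Spark's default column resolution (`spark.sql.caseSensitive=false`).

```python
def check_columns(actual_columns, required):
    """Return the names in `required` that are absent from `actual_columns`.

    Case-insensitive, like Spark's default column resolution.
    """
    present = {c.lower() for c in actual_columns}
    return [c for c in required if c.lower() not in present]


# Columns referenced later in this notebook.
REQUIRED = [
    "month", "tpep_pickup_datetime", "passenger_count", "trip_distance",
    "fare_amount", "total_amount", "PULocationID", "DOLocationID",
]

# Usage (assumes `df` has been loaded as in this section):
# missing = check_columns(df.columns, REQUIRED)
# assert not missing, f"missing columns: {missing}"
```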


In [None]:
df = spark.read.parquet("s3a://okdp/examples/data/raw/tripdata/yellow/")
df.printSchema()
df.limit(5).toPandas()

## 🧱 3. Inspect Table Schema (DESCRIBE TABLE equivalent)

This section creates a temporary SQL view and runs `DESCRIBE TABLE`.
It is useful when you want a SQL-native workflow (similar to Trino/Hive).


In [None]:
df.createOrReplaceTempView("yellow")
spark.sql("DESCRIBE TABLE yellow").show(truncate=False)

## 🔍 4. Query a Random Sample (Daily Balanced Sampling)

Goal: build a small dataset that is still representative across time.

Approach:
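- Filter on the `month` column to **2025-01..2025-03**
- Use a window partitioned by pickup date
- Assign a random order within each day and keep up to **100 trips per day** (`row_number() <= 100`)
- Cap at **3000 rows** to keep downstream operations (Pandas/Altair) lightweight. Q1 2025 has 90 days, so the per-day step can yield up to 9,000 rows; `.limit(3000)` then keeps an arbitrary subset that may drop whole days. Lower the per-day quota (for example to 33) if strict balance across all days matters.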
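The window logic can be checked on toy data with a plain-Python equivalent: shuffling each day's rows and keeping the first N is the same selection rule as `row_number()` over `Window.partitionBy(date).orderBy(rand())` filtered to `rn <= N`. The function name `balanced_sample` and the toy rows are illustrative; this runs on the driver and is not a substitute for the Spark version on real data.

```python
import random
from collections import defaultdict


def balanced_sample(rows, day_of, per_day, seed=None):
    """Keep up to `per_day` randomly chosen rows from each day.

    Shuffle each day's rows, then keep the first `per_day` of them, which
    selects the same way as row_number() over a random order within each day.
    """
    rng = random.Random(seed)
    by_day = defaultdict(list)
    for row in rows:
        by_day[day_of(row)].append(row)
    out = []
    for day in sorted(by_day):          # deterministic day order for readability
        group = list(by_day[day])
        rng.shuffle(group)              # random order within the day
        out.extend(group[:per_day])     # equivalent of rn <= per_day
    return out


trips = [("2025-01-01", i) for i in range(250)] + [("2025-01-02", i) for i in range(40)]
picked = balanced_sample(trips, day_of=lambda r: r[0], per_day=100, seed=7)
# 2025-01-01 contributes 100 rows; 2025-01-02 contributes all 40 of its rows
```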


In [None]:
months = ["2025-01", "2025-02", "2025-03"]

# Filter to Q1 2025
base = df.where(F.col("month").isin(months))

# Balanced random sample: up to 100 rows per pickup day, in random order
w = Window.partitionBy(F.to_date("tpep_pickup_datetime")).orderBy(F.rand())

sample = (
    base
    .withColumn("rn", F.row_number().over(w))  # 1..N within each day
    .where(F.col("rn") <= 100)
    .limit(3000)  # hard cap; which rows survive is not guaranteed
)

sample.limit(5).toPandas()

## 🕒 5. Time-Based Feature Engineering

We derive time features from pickup datetime:
- `hour`: hour of day (0–23)
- `day`: weekday name (Monday..Sunday)

These features make it possible to analyze intra-day (hourly) and weekly demand patterns.
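For reference, here are the same two features computed in plain Python for a single timestamp. `F.hour` corresponds to `datetime.hour`. For the weekday name, the sketch indexes a fixed English list with `datetime.weekday()` (Monday is 0), so the output does not depend on the machine's locale. The helper name `time_features` is illustrative.

```python
from datetime import datetime

# Fixed English names so the result does not depend on the system locale.
WEEKDAYS = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]


def time_features(ts):
    """Return (hour, weekday name) for a pickup timestamp."""
    return ts.hour, WEEKDAYS[ts.weekday()]  # weekday(): Monday == 0


print(time_features(datetime(2025, 1, 1, 8, 30)))  # (8, 'Wednesday')
```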


In [None]:
sample_fe = (
    sample
    .withColumn("hour", F.hour("tpep_pickup_datetime"))
    .withColumn("day", F.date_format("tpep_pickup_datetime", "EEEE"))
)

sample_fe.select("tpep_pickup_datetime", "hour", "day").show(5, truncate=False)

In [None]:
sample_fe.groupBy("day").count().orderBy(F.desc("count")).show(truncate=False)

## 🧹 6. Data Quality Check

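We inspect records where `passenger_count == 0`.
Such records may be:
- data errors
- test trips
- unusual edge cases

Whether to drop or keep them depends on your downstream needs. Here we filter to `passenger_count > 0` for cleaner EDA; this also drops rows where `passenger_count` is null, because a comparison with null never evaluates to true.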
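Before filtering, it can help to see how many rows fall into each case, including nulls, which neither `== 0` nor `> 0` matches. In Spark you would count `F.col("passenger_count").isNull()` separately. The plain-Python sketch below (hypothetical `passenger_bucket` helper, made-up values) shows the split.

```python
from collections import Counter


def passenger_bucket(value):
    """Classify a passenger_count value the way the filters in this section see it."""
    if value is None:
        return "null"       # matched by neither == 0 nor > 0
    if value == 0:
        return "zero"
    if value > 0:
        return "positive"
    return "negative"


counts = Counter(passenger_bucket(v) for v in [1, 2, 0, None, 1, 5, None])
print(counts)  # positive: 4, null: 2, zero: 1
```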


In [None]:
bad = sample_fe.where(F.col("passenger_count") == 0)

bad.select("trip_distance", "fare_amount", "total_amount").describe().show(truncate=False)

In [None]:
sample_clean = sample_fe.where(F.col("passenger_count") > 0)
sample_clean.count()

## 📊 7. Visualize Fare vs Distance (Altair)

Altair works with in-memory data, so we convert a small Spark subset to Pandas.

Visualization goal:
- Explore the relationship between `trip_distance` and `fare_amount`
- Use interactive highlighting by `passenger_count`


In [None]:
pdf = sample_clean.select(
    "tpep_pickup_datetime", "trip_distance", "fare_amount", "passenger_count"
).toPandas()

pdf.head()

In [None]:
highlight = alt.selection_point(fields=['passenger_count'], bind='legend')

chart = (
    alt.Chart(pdf)
    .mark_circle(size=40)
    .encode(
        x='trip_distance:Q',
        y='fare_amount:Q',
        color=alt.condition(
            highlight,
            alt.Color('passenger_count:O', scale=alt.Scale(scheme='tableau10')),
            alt.value('lightgray')
        ),
        tooltip=['tpep_pickup_datetime', 'trip_distance', 'fare_amount', 'passenger_count']
    )
    .add_params(highlight)
    .properties(title='NYC Yellow Taxi: Interactive Highlight by Passenger Count')
)
chart

## ⏰ 8. Trips by Hour of Day

We aggregate trips by `hour` to understand hourly demand patterns.
The output is kept small and then visualized with Altair.
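`groupBy("hour")` only returns hours that actually occur in the data, so a small sample can leave gaps in the bar chart. One option in Spark is a left join from a 0..23 hour table (for example built with `spark.range(24)`, whose column is named `id`). The plain-Python sketch below shows the intended result, using a hypothetical `hourly_counts` helper that fills missing hours with zero.

```python
from collections import Counter


def hourly_counts(hours):
    """Count trips per hour, including hours with zero trips.

    Returns a list of (hour, count) pairs for hours 0..23.
    """
    counts = Counter(hours)
    return [(h, counts.get(h, 0)) for h in range(24)]


result = hourly_counts([0, 0, 8, 8, 8, 23])
print(result[0], result[8], result[12])  # (0, 2) (8, 3) (12, 0)
```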


In [None]:
hourly = (
    sample_clean.groupBy("hour")
    .agg(F.count("*").alias("trip_count"))
    .orderBy("hour")
)

hourly.show(24, truncate=False)

In [None]:
hourly_pdf = hourly.toPandas()

alt.Chart(hourly_pdf).mark_bar().encode(
    x=alt.X('hour:O', title='Hour of Day'),
    y=alt.Y('trip_count:Q', title='Number of Trips'),
    tooltip=[
        alt.Tooltip('hour:O', title='Hour of Day'),
        alt.Tooltip('trip_count:Q', title='Trips')
    ]
).properties(
    title='NYC Trips by Hour of Day'
)

## 📅 9. Average Fare by Day of Week

We compute the average `fare_amount` per weekday.
Because weekday names are strings, we define a custom order for clean chart sorting.
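The same ordering idea in plain Python: compute the mean fare per weekday and return the rows in Monday..Sunday order rather than alphabetical order. The helper `avg_by_weekday` and the sample values are illustrative. On the Spark side, an alternative is to sort by a numeric key such as `F.dayofweek(...)`, which returns 1 for Sunday through 7 for Saturday, so Monday-first ordering would still need a remap.

```python
from collections import defaultdict

ORDER = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]


def avg_by_weekday(pairs):
    """Average fare per weekday, returned in Monday..Sunday order.

    pairs: iterable of (weekday_name, fare) tuples. Days without trips are omitted.
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for day, fare in pairs:
        totals[day] += fare
        counts[day] += 1
    return [(d, totals[d] / counts[d]) for d in ORDER if counts[d]]


print(avg_by_weekday([("Sunday", 10.0), ("Monday", 20.0), ("Monday", 30.0)]))
# [('Monday', 25.0), ('Sunday', 10.0)]
```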


In [None]:
daily = (
    sample_clean.groupBy("day")
    .agg(F.avg("fare_amount").alias("fare_amount"))
)

daily.show(truncate=False)

In [None]:
daily_pdf = daily.toPandas()
order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']

alt.Chart(daily_pdf).mark_bar().encode(
    x=alt.X('day:N', sort=order, title='Day of Week'),
    y=alt.Y('fare_amount:Q', title='Average Fare ($)'),
    tooltip=[
        alt.Tooltip('day:N', title='Day of Week'),
        alt.Tooltip('fare_amount:Q', title='Average Fare ($)', format='.2f')
    ]
).properties(
    title='Average NYC Taxi Fare by Day of Week'
)

## 🎯 10. Random Sample (~1%) for Visualization (Spark equivalent)

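Some SQL engines (for example Trino) expose row-level sampling as `TABLESAMPLE BERNOULLI (1)`.
In Spark, the closest equivalent is:
- `df.sample(withReplacement=False, fraction=0.01, seed=...)`

This keeps each row independently with probability `fraction`, so the result is approximately, not exactly, 1% of the rows. We still apply a `.limit(3000)` to keep the Pandas conversion safe.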
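To see why the output size varies, here is the row-by-row coin flip in plain Python: each row is kept independently with probability `fraction`. The function `bernoulli_sample` is illustrative, not Spark's implementation. For a SQL-native workflow, Spark SQL also accepts a `TABLESAMPLE (1 PERCENT)` clause on a table or view such as `yellow`.

```python
import random


def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction`.

    The output size is random, around fraction * len(rows), not exactly that.
    """
    if not 0.0 <= fraction <= 1.0:
        raise ValueError("fraction must be in [0, 1]")
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


kept = bernoulli_sample(range(100_000), fraction=0.01, seed=42)
print(len(kept))  # close to 1,000, but not exactly
```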


In [None]:
viz_sample = (
    df.where(F.col("month").isin(months))
    .select("trip_distance", "fare_amount", "total_amount", "passenger_count")
    .sample(withReplacement=False, fraction=0.01, seed=42)
    .limit(3000)
)

viz_pdf = viz_sample.toPandas()
viz_pdf.head()

In [None]:
highlight = alt.selection_point(fields=['passenger_count'], bind='legend')

chart2 = (
    alt.Chart(viz_pdf)
    .mark_circle(size=40, opacity=0.7)
    .encode(
        x='trip_distance:Q',
        y='fare_amount:Q',
        color=alt.condition(
            highlight,
            alt.Color('passenger_count:O', scale=alt.Scale(scheme='tableau10')),
            alt.value('lightgray')
        ),
        tooltip=['trip_distance', 'fare_amount', 'total_amount', 'passenger_count']
    )
    .add_params(highlight)
    .properties(title='NYC Yellow Taxi: Fare vs Distance (Interactive Sample Highlight)')
)

chart2

## 🚕 11. Top Pickup–Dropoff Pairs (March 2025)

We compute the most frequent pickup/dropoff location pairs for March 2025.

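Note:
- Some versions of the dataset use lowercase column names (`pulocationid`, `dolocationid`).
- Spark resolves column names case-insensitively by default (`spark.sql.caseSensitive=false`), so the query works with either spelling. If case sensitivity is enabled and you get an `AnalysisException`, adjust the column names accordingly.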
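Here is what the query computes, sketched in plain Python on a few made-up location-ID pairs: count each (pickup, dropoff) pair and keep the `k` most frequent. `Counter.most_common` breaks ties by first occurrence; Spark's `orderBy(F.desc("trips"))` makes no such guarantee, so add a secondary sort column if you need a stable top 20. The helper name `most_frequent_pairs` is illustrative.

```python
from collections import Counter


def most_frequent_pairs(trips, k=20):
    """Most frequent (pickup, dropoff) location pairs.

    trips: iterable of (PULocationID, DOLocationID) tuples.
    Ties keep first-seen order (Counter.most_common behavior).
    """
    return Counter(trips).most_common(k)


print(most_frequent_pairs([(132, 236), (237, 236), (132, 236), (161, 237)], k=2))
# [((132, 236), 2), ((237, 236), 1)]
```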


In [None]:
# Standard NYC TLC column names (mixed case). Spark matches them case-insensitively
# by default; with spark.sql.caseSensitive=true, use the spelling found in your files.
top_pairs = (
    df.where(F.col("month") == "2025-03")
    .groupBy("PULocationID", "DOLocationID")
    .agg(F.count("*").alias("trips"))
    .orderBy(F.desc("trips"))
    .limit(20)
)

top_pairs.show(truncate=False)

## 🧾 12. Statistical Summary

`describe()` computes count, mean, stddev, min, and max for all numeric and string columns.
The result has only five rows, so converting it to Pandas for easier display is safe.
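For a single column, the statistics `describe()` reports can be reproduced in plain Python, which is handy for sanity-checking a few values. This sketch assumes nulls are skipped and that stddev is the sample standard deviation (n - 1 denominator), which is how Spark's `stddev` aggregate behaves. The helper name `describe_column` is illustrative. For quartiles as well, PySpark's `DataFrame.summary()` adds approximate 25%/50%/75% percentiles.

```python
import statistics


def describe_column(values):
    """count/mean/stddev/min/max for one numeric column.

    None values are excluded; stddev is the sample standard deviation and is
    None when fewer than two values remain.
    """
    xs = [v for v in values if v is not None]
    return {
        "count": len(xs),
        "mean": statistics.mean(xs) if xs else None,
        "stddev": statistics.stdev(xs) if len(xs) > 1 else None,
        "min": min(xs) if xs else None,
        "max": max(xs) if xs else None,
    }


print(describe_column([2.0, 4.0, None, 6.0]))
```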


In [None]:
sample_clean.describe().toPandas()

## ⚙️ 13. Parameterized Query Example (Spark)

Example of using a variable to parameterize a filter.
This pattern is useful for building reusable notebooks and jobs.
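The same idea extends to ranges: instead of hard-coding a `months` list, it can be generated from two parameters. `month_range` is a hypothetical helper. It assumes the `month` column holds `YYYY-MM` strings, as the filters in this notebook imply.

```python
import re

_MONTH = re.compile(r"\d{4}-(0[1-9]|1[0-2])")


def month_range(start, end):
    """Inclusive list of 'YYYY-MM' strings from start to end (empty if start > end)."""
    for m in (start, end):
        if not _MONTH.fullmatch(m):
            raise ValueError(f"expected YYYY-MM, got {m!r}")
    y, mo = map(int, start.split("-"))
    end_y, end_mo = map(int, end.split("-"))
    out = []
    while (y, mo) <= (end_y, end_mo):
        out.append(f"{y:04d}-{mo:02d}")
        mo += 1
        if mo == 13:            # roll over into the next year
            y, mo = y + 1, 1
    return out


print(month_range("2024-11", "2025-02"))  # ['2024-11', '2024-12', '2025-01', '2025-02']

# Usage in this notebook (assumes `df` and `F` from earlier cells):
# df.where(F.col("month").isin(month_range("2025-01", "2025-03"))).count()
```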


In [None]:
month = "2025-03"
df.where(F.col("month") == month).count()

## 🧹 14. Stop the Spark Session

Stop the Spark session to free CPU/memory and release any open connections (e.g., to S3/MinIO).
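In scheduled jobs (as opposed to interactive sessions), it is worth making sure `stop()` runs even when a step raises. This is a minimal sketch using a hypothetical `stopping` context manager built on `contextlib.contextmanager`. It works with any object that has a `stop()` method, such as a `SparkSession`.

```python
from contextlib import contextmanager


@contextmanager
def stopping(session):
    """Yield `session` and call session.stop() on exit, even if the body raises."""
    try:
        yield session
    finally:
        session.stop()


# Usage (assumes SparkSession is imported):
# with stopping(SparkSession.builder.appName("NYC Tripdata - PySpark").getOrCreate()) as spark:
#     spark.read.parquet("s3a://okdp/examples/data/raw/tripdata/yellow/").count()
```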

In [None]:
spark.stop()

## ✅ 15. Summary
- Read Parquet data from `s3a://okdp/...` with Spark
- Per-day balanced random sampling using window functions
- Feature engineering (hour/day)
- Aggregations + Altair visualizations (via small Pandas extracts)
