# NYC Taxi Intelligence Analytical Engineering Pipeline



> **Environment & Cloud Setup**.

This block initializes the analytical environment by connecting to an in-memory DuckDB instance, then installing and loading the httpfs extension. The extension matters because it lets the engine read data directly from remote cloud storage over HTTP(S). Importing Pydantic alongside DuckDB sets up a hybrid architecture that combines Python-side data validation with SQL's processing power.



In [1]:
import duckdb
import pandas as pd
import time
from pydantic import BaseModel, Field, ValidationError
from typing import List

con = duckdb.connect(database=':memory:')
con.execute("INSTALL httpfs; LOAD httpfs;")

<duckdb.duckdb.DuckDBPyConnection at 0x78ba6a7a1af0>


> **Data Contract (Schema Guard)**.

This cell implements a "Contract-First" engineering approach by defining a Pydantic schema to enforce data types and business constraints on the incoming data. Since processing millions of rows through Python is inefficient, the script validates a 100-row sample to ensure the remote source hasn't undergone an unexpected schema change. This serves as a defensive layer that prevents "garbage" data from entering the warehouse and breaking downstream transformations.



In [2]:
from datetime import datetime

# Define the contract
class TaxiTripContract(BaseModel):
    VendorID: int
    tpep_pickup_datetime: datetime  # pandas Timestamps are datetime subclasses
    trip_distance: float = Field(ge=0)  # Must be >= 0
    fare_amount: float
    tip_amount: float

# 1. Fetch a 100-row sample from the cloud
remote_url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet'
sample_df = con.sql(f"SELECT * FROM '{remote_url}' LIMIT 100").df()

# 2. Validate the sample
try:
    [TaxiTripContract(**r) for r in sample_df.to_dict(orient='records')]
    print("✅ Schema Contract Verified: Cloud data matches expected types.")
except ValidationError as e:
    print(f"❌ Schema Contract Violated: {e}")

✅ Schema Contract Verified: Cloud data matches expected types.
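
The validation cell stops at the first failing row. As a minimal sketch (not the notebook's own code), the helper below validates every record and reports which rows fail and why. The name `validate_sample` and the two hand-written rows are hypothetical, and the pickup column is typed as `datetime` here as an assumption.

```python
from datetime import datetime
from pydantic import BaseModel, Field, ValidationError


class TaxiTripContract(BaseModel):
    VendorID: int
    tpep_pickup_datetime: datetime  # assumption: a datetime-typed pickup column
    trip_distance: float = Field(ge=0)  # must be >= 0
    fare_amount: float
    tip_amount: float


def validate_sample(records):
    """Hypothetical helper: return (row_index, messages) for each failing record."""
    failures = []
    for i, record in enumerate(records):
        try:
            TaxiTripContract(**record)
        except ValidationError as exc:
            failures.append((i, [err["msg"] for err in exc.errors()]))
    return failures


# Made-up rows: the second one violates the trip_distance >= 0 constraint.
rows = [
    {"VendorID": 1, "tpep_pickup_datetime": datetime(2024, 1, 1, 0, 5),
     "trip_distance": 1.7, "fare_amount": 10.0, "tip_amount": 2.0},
    {"VendorID": 2, "tpep_pickup_datetime": datetime(2024, 1, 1, 0, 9),
     "trip_distance": -3.0, "fare_amount": 8.5, "tip_amount": 0.0},
]
failures = validate_sample(rows)
for index, messages in failures:
    print(f"row {index}: {messages}")
```

In the notebook, the same helper could be called as `validate_sample(sample_df.to_dict(orient='records'))`. Columns not named in the contract are ignored by Pydantic's default settings.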




> **Bronze Layer (Remote Ingestion)**.

The Bronze layer establishes a virtual link to the NYC Taxi Parquet file hosted on a public cloud bucket without downloading the file to disk. Creating a view rather than a table demonstrates a "Zero-ETL" pattern: nothing is stored locally, and every query against the view reads from the remote file at execution time.


In [3]:
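# Register the remote Parquet file as a view; OR REPLACE keeps the cell re-runnable
con.execute(f"CREATE OR REPLACE VIEW bronze_taxi AS SELECT * FROM '{remote_url}';")
row_count = con.sql("SELECT count(*) FROM bronze_taxi").fetchone()[0]
print(f"Bronze View Linked. Total potential rows: {row_count}")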

Bronze View Linked. Total potential rows: 2964624



> **Silver Layer (Cleaning & Filtering)**.

In the Silver layer, raw data is transformed into a clean, typed format suitable for internal analysis. This block handles the core data-wrangling tasks: casting the pickup and dropoff columns to TIMESTAMP and calculating total revenue as fare plus tip. It also applies basic filters, removing records with a non-positive distance or fare, to protect the integrity of the analytical dataset.



In [4]:
con.execute("""
CREATE OR REPLACE TABLE silver_taxi AS
SELECT
    VendorID,
    tpep_pickup_datetime::TIMESTAMP as pickup_time,
    tpep_dropoff_datetime::TIMESTAMP as dropoff_time,
    trip_distance,
    fare_amount,
    tip_amount,
    (fare_amount + tip_amount) as total_revenue
FROM bronze_taxi
WHERE trip_distance > 0
  AND fare_amount > 0
""")

<duckdb.duckdb.DuckDBPyConnection at 0x78ba6a7a1af0>



> **Gold Layer (Analytical Modeling)**.

The Gold layer materializes the final business "Marts" where raw numbers are turned into actionable insights. The query aggregates revenue and distance per vendor per day, derives "Revenue per Mile", and uses a SQL window function to compare each vendor against the same-day market average, that is, the unweighted mean of all vendors' revenue per mile on that date.




In [6]:
con.execute("""
CREATE OR REPLACE TABLE mart_vendor_efficiency AS
WITH daily_metrics AS (
    SELECT
        VendorID,
        pickup_time::DATE as service_date,
        SUM(total_revenue) as daily_rev,
        SUM(trip_distance) as daily_dist
    FROM silver_taxi
    GROUP BY ALL
)
SELECT
    *,
    daily_rev / NULLIF(daily_dist, 0) as rev_per_mile,
    AVG(daily_rev / NULLIF(daily_dist, 0)) OVER(PARTITION BY service_date) as market_avg_rev_per_mile
FROM daily_metrics
""")

# Preview the mart, capping the display at 5 rows
con.table("mart_vendor_efficiency").show(max_rows=5)

┌──────────┬──────────────┬────────────────────┬────────────────────┬───────────────────┬─────────────────────────┐
│ VendorID │ service_date │     daily_rev      │     daily_dist     │   rev_per_mile    │ market_avg_rev_per_mile │
│  int32   │     date     │       double       │       double       │      double       │         double          │
├──────────┼──────────────┼────────────────────┼────────────────────┼───────────────────┼─────────────────────────┤
│        2 │ 2024-01-08   │ 1331720.1199999903 │ 209850.29999999993 │ 6.346048206745431 │        5.63428376795682 │
│        1 │ 2024-01-08   │ 421069.22000000125 │  67581.30000000002 │ 6.230558157360115 │        5.63428376795682 │
│        6 │ 2024-01-08   │  887.0100000000001 │             205.03 │ 4.326244939764913 │        5.63428376795682 │
│        · │     ·        │          ·         │                ·   │         ·         │                ·        │
└──────────┴──────────────┴────────────────────┴────────────────────┴───────────────────┴─────────────────────────┘
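
The `market_avg_rev_per_mile` column is a plain mean of the vendors' ratios for each date, so a small vendor counts as much as a large one. The sketch below reproduces the 2024-01-08 figure from the three rows shown in the output and, as a hypothetical alternative that is not part of the notebook, computes a distance-weighted figure for comparison. It assumes those three vendors are the only ones recorded for that date.

```python
# Values copied from the three 2024-01-08 rows in the mart preview.
rows = [
    {"VendorID": 2, "daily_rev": 1331720.1199999903, "daily_dist": 209850.29999999993},
    {"VendorID": 1, "daily_rev": 421069.22000000125, "daily_dist": 67581.30000000002},
    {"VendorID": 6, "daily_rev": 887.0100000000001, "daily_dist": 205.03},
]

# Per-vendor ratio, matching the mart's rev_per_mile column.
rev_per_mile = [r["daily_rev"] / r["daily_dist"] for r in rows]

# What AVG(...) OVER (PARTITION BY service_date) yields: a plain mean of vendor ratios.
unweighted_avg = sum(rev_per_mile) / len(rev_per_mile)

# Hypothetical alternative: the day's total revenue divided by its total distance.
weighted_avg = sum(r["daily_rev"] for r in rows) / sum(r["daily_dist"] for r in rows)

print(f"unweighted mean of vendor ratios: {unweighted_avg:.4f}")
print(f"distance-weighted market rate:    {weighted_avg:.4f}")
```

The unweighted result agrees with the 5.634 shown in the table, while the distance-weighted rate comes out higher (roughly 6.32) because Vendor 6, with only about 205 miles that day, no longer pulls the figure down as strongly. Which definition is appropriate depends on whether "market" means the typical vendor or the typical mile driven.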


> **Data Quality Testing**.

This block functions as an automated Quality Gate by running a suite of dbt-style integrity tests against the final output. Each test counts offending rows: days with zero total distance (which would leave revenue per mile undefined), null daily revenue, and outliers whose revenue per mile exceeds ten times the market average. A count of zero means the test passes.


In [7]:
tests = {
    "Zero Distance Check": "SELECT COUNT(*) FROM mart_vendor_efficiency WHERE daily_dist = 0",
    "Null Revenue Check": "SELECT COUNT(*) FROM mart_vendor_efficiency WHERE daily_rev IS NULL",
    "Market Comparison Check": "SELECT COUNT(*) FROM mart_vendor_efficiency WHERE rev_per_mile > (market_avg_rev_per_mile * 10)"
}

for name, query in tests.items():
    err_count = con.sql(query).fetchone()[0]
    print(f"{'✅' if err_count == 0 else '❌'} {name}: {err_count} errors")

✅ Zero Distance Check: 0 errors
✅ Null Revenue Check: 0 errors
✅ Market Comparison Check: 0 errors
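
The loop above only prints results, so a failing test would not stop a scheduled run. One possible extension, sketched under assumptions, is a gate that raises when any test returns a nonzero count. The helper `run_quality_gate` is hypothetical, and the demo uses Python's built-in `sqlite3` with a made-up `mart` table in place of DuckDB purely so the example runs on its own.

```python
import sqlite3
from typing import Callable, Dict


def run_quality_gate(count_failures: Callable[[str], int],
                     tests: Dict[str, str]) -> Dict[str, int]:
    """Hypothetical helper: run every test query, raise if any count is nonzero."""
    results = {name: count_failures(query) for name, query in tests.items()}
    failed = {name: n for name, n in results.items() if n != 0}
    if failed:
        raise ValueError(f"Quality gate failed: {failed}")
    return results


# Stand-in database (sqlite3, not DuckDB) with one deliberately bad row.
demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE mart (daily_rev REAL, daily_dist REAL)")
demo.executemany("INSERT INTO mart VALUES (?, ?)", [(100.0, 20.0), (50.0, 0.0)])

tests = {
    "Zero Distance Check": "SELECT COUNT(*) FROM mart WHERE daily_dist = 0",
    "Null Revenue Check": "SELECT COUNT(*) FROM mart WHERE daily_rev IS NULL",
}

try:
    run_quality_gate(lambda q: demo.execute(q).fetchone()[0], tests)
    gate_passed = True
except ValueError as exc:
    gate_passed = False
    print(exc)
```

With the notebook's connection, the callable would be `lambda q: con.sql(q).fetchone()[0]` and the `tests` dictionary could be passed unchanged.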



> **Performance Benchmarking**.

This final block audits the system's efficiency by timing an aggregation over the materialized Silver table, which is filtered from roughly 3 million source rows. A single timed run is an indication rather than proof: it shows that DuckDB answers this query in a fraction of a second, but it includes no CSV baseline, so any comparison with CSV-based workflows would need its own measurement. The benchmark is included to show deliberate attention to compute cost and pipeline latency.



In [8]:
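import time

# perf_counter is a monotonic, high-resolution clock suited to timing short operations
start = time.perf_counter()
con.sql("SELECT VendorID, AVG(total_revenue) FROM silver_taxi GROUP BY 1").show(max_rows=10)
# Elapsed time includes rendering the result table
print(f"Aggregation completed in {round(time.perf_counter() - start, 4)} seconds.")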

┌──────────┬────────────────────┐
│ VendorID │ avg(total_revenue) │
│  int32   │       double       │
├──────────┼────────────────────┤
│        1 │   20.5866409254181 │
│        2 │ 22.318927916087564 │
│        6 │  46.89934615384613 │
└──────────┴────────────────────┘

Aggregation completed in 0.0339 seconds.
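
A single timing can be skewed by caching or background load. As a hedged sketch of a more repeatable measurement, the hypothetical helper `benchmark` below runs a workload several times after a warm-up and reports the spread. The stand-in workload is a pure-Python sum so the example needs no database.

```python
import statistics
import time
from typing import Callable, List


def benchmark(workload: Callable[[], object], runs: int = 5, warmup: int = 1) -> List[float]:
    """Hypothetical helper: time `workload` repeatedly and return per-run seconds."""
    for _ in range(warmup):
        workload()  # warm caches so the first timed run is not an outlier
    timings = []
    for _ in range(runs):
        start = time.perf_counter()
        workload()
        timings.append(time.perf_counter() - start)
    return timings


# Stand-in workload; replace with a query call when a connection is available.
timings = benchmark(lambda: sum(i * i for i in range(100_000)), runs=5)
print(f"median {statistics.median(timings):.4f}s, "
      f"min {min(timings):.4f}s, max {max(timings):.4f}s")
```

In the notebook, the workload could be `lambda: con.sql("SELECT VendorID, AVG(total_revenue) FROM silver_taxi GROUP BY 1").fetchall()`, which forces the query to execute without including table rendering in the measurement.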


### Cloud Infrastructure Analysis Demo

> **Visualization (Client Explanation)**.

But how do we explain this to non-technical clients and customers? This visualization acts as the "final reveal" of the project, taking millions of rows of raw, messy taxi logs and turning them into a clear story about market share and revenue. To keep the chart readable, I used a 3-day rolling average to smooth out the daily spikes, which makes it much easier for a client to spot the underlying trend instead of getting lost in the noise.

I also applied a strict date filter to cut out the "ghost data" caused by glitchy taxi meters (trips stamped with dates outside January 2024), so the timeline doesn't stretch across empty decades. With a Stacked Area Chart and a unified hover feature, anyone looking at the dashboard can see the total market volume and, at the same time, how much of that "pie" each vendor owns. The goal is to bridge the gap between low-level engineering and high-level business strategy.



In [12]:
import plotly.express as px

viz_df = con.sql("""
    SELECT
        service_date,
        VendorID::VARCHAR as Vendor,
        -- Calculate 3-day rolling average to smooth out daily volatility
        AVG(daily_rev) OVER (
            PARTITION BY VendorID
            ORDER BY service_date
            ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
        ) as smoothed_revenue,
        daily_dist
    FROM mart_vendor_efficiency
    WHERE service_date BETWEEN '2024-01-01' AND '2024-01-31'
    ORDER BY 1
""").df()

# Stacked area chart with a clean professional color palette
fig = px.area(
    viz_df,
    x="service_date",
    y="smoothed_revenue",
    color="Vendor",
    title="<b>Cloud Infrastructure Spend: Market Share Analysis</b><br><sup>January 2024 Operations - 3-Day Rolling Average</sup>",
    labels={
        "smoothed_revenue": "Daily Revenue ($)",
        "service_date": "Timeline",
        "Vendor": "Vendor ID"
    },
    color_discrete_sequence=px.colors.qualitative.Bold
)

fig.update_layout(
    hovermode="x unified",
    yaxis_tickprefix="$",
    xaxis_range=['2024-01-01', '2024-01-31'], # Force focus on the valid data
    legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
    template="plotly_white"
)

fig.show()
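
The window frame `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW` averages each day with up to two earlier days, so the first two points of each vendor's series are averaged over fewer than three values. A minimal pure-Python sketch of that behavior follows, using a hypothetical helper `trailing_average` and made-up revenue figures:

```python
def trailing_average(values, window=3):
    """Mean of the current value and up to `window - 1` preceding values."""
    smoothed = []
    for i in range(len(values)):
        # The frame shrinks at the start of the series, as the SQL frame does.
        frame = values[max(0, i - window + 1): i + 1]
        smoothed.append(sum(frame) / len(frame))
    return smoothed


daily_rev = [90.0, 120.0, 60.0, 150.0]  # made-up daily revenue for one vendor
print(trailing_average(daily_rev))
# → [90.0, 105.0, 90.0, 110.0]
```

The first value is unchanged and the second is a 2-day mean, which is worth keeping in mind when reading the left edge of the chart.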