# NYC Taxi Analytics with dbt + DuckDB: A Complete Walkthrough

This notebook walks through the **design decisions and thought process** behind building a production-grade dbt project from scratch. We'll connect directly to the DuckDB database that dbt built and explore every layer of the pipeline.

**What you'll learn:**
1. Why we chose DuckDB + dbt for local analytics
2. How raw NYC taxi data flows through staging → intermediate → marts
3. The thinking behind dimensional modeling (facts vs. dimensions)
4. Testing strategies for real-world messy data
5. Advanced features: incremental models, contracts, Python models, parquet exports

---

## Prerequisites

Make sure you've run the full build first:
```bash
uv sync
uv run python scripts/download_data.py
uv run python scripts/setup_project.py
cd nyc_taxi_dbt && uv run dbt build --full-refresh --profiles-dir .
```

## Setup: Connect to Our DuckDB Warehouse

In [None]:
import duckdb
from pathlib import Path

# Connect to the dbt-managed database
DB_PATH = Path("dev.duckdb")
assert DB_PATH.exists(), "Run 'make build' first to create the database"

con = duckdb.connect(str(DB_PATH), read_only=True)

# Helper to run queries and display results
def q(sql):
    """Run a SQL query and return a pandas DataFrame."""
    return con.execute(sql).fetchdf()

# What schemas did dbt create?
q("""
    SELECT schema_name, count(*) as tables
    FROM information_schema.tables
    WHERE schema_name NOT IN ('information_schema', 'pg_catalog')
    GROUP BY schema_name
    ORDER BY schema_name
""")

dbt organized our data into **separate schemas by layer**:
- `main_raw` -- Seed data (CSV reference tables loaded by `dbt seed`)
- `main_staging` -- Cleaned, renamed versions of raw data
- `main_intermediate` -- Calculated metrics and aggregations
- `main_marts` -- Business-ready facts, dimensions, and analytics
- `snapshots` -- SCD Type 2 historical tracking

This separation is a **dbt best practice**. Each layer has a single responsibility, making the pipeline easy to debug and maintain.

---

## Part 1: The Raw Data -- What Are We Working With?

### Thought Process

Before writing any dbt models, we need to understand our data. The NYC Taxi & Limousine Commission publishes trip records monthly as parquet files. January 2024 has ~3 million yellow taxi trips.

**Why parquet?** Column-oriented, compressed, includes schema metadata. DuckDB reads it natively without importing -- perfect for analytics.

**Why DuckDB?** Zero infrastructure. No Docker, no server, no credentials. Just a single file. It handles 3M rows in under 2 seconds on a laptop.

In [None]:
# Let's peek at the raw parquet file directly
raw_stats = q("""
    SELECT
        count(*) as total_rows,
        min(tpep_pickup_datetime) as earliest_pickup,
        max(tpep_pickup_datetime) as latest_pickup,
        count(DISTINCT PULocationID) as unique_pickup_zones,
        count(DISTINCT DOLocationID) as unique_dropoff_zones
    FROM read_parquet('data/yellow_tripdata_2024-01.parquet')
""")
print(f"Raw parquet: {raw_stats['total_rows'][0]:,} rows")
print(f"Date range: {raw_stats['earliest_pickup'][0]} to {raw_stats['latest_pickup'][0]}")
print(f"Unique zones: {raw_stats['unique_pickup_zones'][0]} pickup, {raw_stats['unique_dropoff_zones'][0]} dropoff")

In [None]:
# What does the raw data look like? (column names are messy!)
q("SELECT * FROM read_parquet('data/yellow_tripdata_2024-01.parquet') LIMIT 3")

**Problems we can already see:**
- Column names are inconsistent: `VendorID`, `PULocationID`, `tpep_pickup_datetime`, `Airport_fee` (mixed case)
- Location IDs are just numbers -- meaningless without a lookup table
- Payment types are coded (1, 2, 3...) -- not human-readable
- Financial amounts need rounding to 2 decimal places
- There might be null values, negative fares, or out-of-range dates

This is exactly what the **staging layer** is for.

---

## Part 2: Staging -- Clean Once, Use Everywhere

### Thought Process

The staging layer is a **contract between raw data and the rest of the pipeline**. Its jobs:
1. **Rename** columns to consistent `snake_case`
2. **Cast** types explicitly (don't trust implicit casting)
3. **Filter** obviously bad data (nulls in required fields, negative fares)
4. **Generate** surrogate keys for deduplication

The rule: **one staging model per source**. Every downstream model references staging, never raw data directly.

Here's what `stg_yellow_trips` does:

```sql
-- Key parts of stg_yellow_trips.sql
select
    -- Surrogate key for deduplication
    {{ dbt_utils.generate_surrogate_key([
        'VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
        'PULocationID', 'DOLocationID', 'fare_amount', 'total_amount'
    ]) }} as trip_id,

    -- Consistent naming + explicit casting
    cast("VendorID" as integer) as vendor_id,
    cast("PULocationID" as integer) as pickup_location_id,
    round(cast(fare_amount as decimal(10, 2)), 2) as fare_amount,
    ...
from source
where tpep_pickup_datetime is not null    -- filter bad data
  and fare_amount >= 0                     -- no negative fares
  and cast(tpep_pickup_datetime as date) >= date '2024-01-01'
```

In [None]:
# How many rows survived staging filters?
raw_count = q("SELECT count(*) as n FROM read_parquet('data/yellow_tripdata_2024-01.parquet')")['n'][0]
staged_count = q("SELECT count(*) as n FROM main_staging.stg_yellow_trips")['n'][0]
filtered = raw_count - staged_count

print(f"Raw rows:     {raw_count:>12,}")
print(f"After staging:{staged_count:>12,}")
print(f"Filtered out: {filtered:>12,} ({filtered/raw_count*100:.2f}%)")

In [None]:
# Clean column names -- compare raw vs staged
print("=== Raw Columns ===")
raw_cols = q("SELECT column_name FROM (DESCRIBE SELECT * FROM read_parquet('data/yellow_tripdata_2024-01.parquet'))")
print(", ".join(raw_cols['column_name'].tolist()))

print("\n=== Staged Columns ===")
staged_cols = q("SELECT column_name FROM information_schema.columns WHERE table_schema='main_staging' AND table_name='stg_yellow_trips' ORDER BY ordinal_position")
print(", ".join(staged_cols['column_name'].tolist()))

In [None]:
# Seed tables -- small reference CSVs loaded by dbt seed
print("=== Payment Types ===")
display(q("SELECT * FROM main_raw.payment_type_lookup ORDER BY payment_type_id"))

print("\n=== Rate Codes ===")
display(q("SELECT * FROM main_raw.rate_code_lookup ORDER BY rate_code_id"))

print("\n=== Taxi Zones (sample) ===")
display(q("SELECT * FROM main_raw.taxi_zone_lookup ORDER BY LocationID LIMIT 10"))

### Key Decision: Why Include fare_amount + total_amount in the Surrogate Key?

NYC taxi data has **adjustment records** -- the same vendor, same timestamps, same locations, but different fare amounts (one original, one corrected). Without including financial columns in the key, these would collide and we'd lose data. This is a real-world data quality issue you only discover by testing.

### Data Quality Findings
- ~37K rows with negative fares (adjustment records) -- filtered out
- ~18 rows with dates outside January 2024 -- filtered out
- ~140K rows with null passenger_count -- kept (non-critical field)
- Location IDs 264/265 exist in trips but not in zone lookup -- handled with `severity: warn`

---

## Part 3: Intermediate -- Calculate Metrics

### Thought Process

The intermediate layer does **transformations that multiple downstream models need**. Instead of calculating trip duration in 5 different places, we calculate it once here.

`int_trip_metrics` adds:
- `trip_duration_minutes` -- using a custom macro: `{{ duration_minutes('pickup_datetime', 'dropoff_datetime') }}`
- `avg_speed_mph` -- distance / (duration / 60)
- `cost_per_mile` -- fare / distance
- `tip_percentage` -- tip / fare * 100
- Time dimensions: `pickup_date`, `pickup_hour`, `pickup_day_of_week`, `is_weekend`

It also **filters impossible trips**: duration < 1 minute, > 12 hours, or speed > 100 mph.

In [None]:
# Row counts through the pipeline
counts = q("""
    SELECT
        (SELECT count(*) FROM main_staging.stg_yellow_trips) as staging,
        (SELECT count(*) FROM main_intermediate.int_trip_metrics) as intermediate,
        (SELECT count(*) FROM main_marts.fct_trips) as marts
""")

stg = counts['staging'][0]
intm = counts['intermediate'][0]
mart = counts['marts'][0]

print(f"Staging:      {stg:>12,} rows")
print(f"Intermediate: {intm:>12,} rows  (filtered {stg - intm:,} impossible trips)")
print(f"Marts:        {mart:>12,} rows  (1:1 with intermediate -- joins don't add/remove rows)")

In [None]:
# What does the enriched data look like?
q("""
    SELECT
        trip_id,
        pickup_date,
        pickup_hour,
        pickup_day_of_week,
        is_weekend,
        trip_duration_minutes,
        trip_distance_miles,
        round(avg_speed_mph, 1) as avg_speed_mph,
        fare_amount,
        tip_amount,
        round(tip_percentage, 1) as tip_pct,
        round(cost_per_mile, 2) as cost_per_mile
    FROM main_intermediate.int_trip_metrics
    WHERE trip_distance_miles > 0
    LIMIT 10
""")

In [None]:
# Distribution of calculated metrics
q("""
    SELECT
        round(avg(trip_duration_minutes), 1) as avg_duration_min,
        round(median(trip_duration_minutes), 1) as median_duration_min,
        round(avg(avg_speed_mph), 1) as avg_speed,
        round(avg(tip_percentage), 1) as avg_tip_pct,
        round(avg(cost_per_mile), 2) as avg_cost_per_mile,
        count(CASE WHEN is_weekend THEN 1 END) as weekend_trips,
        count(CASE WHEN NOT is_weekend THEN 1 END) as weekday_trips
    FROM main_intermediate.int_trip_metrics
""")

### The Pre-Aggregation Models

Two more intermediate models aggregate trip-level data:

- **`int_daily_summary`** -- One row per day (31 rows for January). Total trips, revenue, averages.
- **`int_hourly_patterns`** -- One row per date + hour (~744 rows). For understanding demand curves.

In [None]:
# Daily summary -- what does January look like?
q("""
    SELECT
        pickup_date,
        pickup_day_of_week,
        is_weekend,
        total_trips,
        round(total_revenue, 0) as total_revenue,
        round(avg_trip_distance, 1) as avg_dist,
        round(avg_tip_percentage, 1) as avg_tip_pct
    FROM main_intermediate.int_daily_summary
    ORDER BY pickup_date
""")

---

## Part 4: Marts -- The Business Layer

### Thought Process: Dimensional Modeling

The marts layer follows **dimensional modeling** (Kimball methodology):

- **Fact tables** contain events/transactions with numeric measures (fares, distances, durations)
- **Dimension tables** contain descriptive attributes for filtering and grouping (locations, dates, payment types)

This design answers: *"Who took a trip, where, when, how did they pay, and how much did it cost?"*

```
                    dim_locations
                         |
int_trip_metrics --> fct_trips <-- dim_dates
                         |
                    dim_payment_types
```

### Why Not Just One Big Table?

You could, and for a small project it works fine. But dimensional modeling:
1. **Reduces redundancy** -- zone names stored once in `dim_locations`, not 2.9M times in the fact table
2. **Enables flexible analysis** -- join any dimension to the fact table
3. **Scales** -- when you add a new dimension (e.g., `dim_weather`), you only add one join
4. **Is the industry standard** -- every analytics engineer is expected to know this pattern

In [None]:
# Explore the dimension tables
print(f"dim_locations:      {q('SELECT count(*) as n FROM main_marts.dim_locations')['n'][0]} zones")
print(f"dim_dates:          {q('SELECT count(*) as n FROM main_marts.dim_dates')['n'][0]} days")
print(f"dim_payment_types:  {q('SELECT count(*) as n FROM main_marts.dim_payment_types')['n'][0]} types")

print("\n=== dim_dates (sample) ===")
display(q("SELECT * FROM main_marts.dim_dates LIMIT 7"))

print("\n=== dim_locations (top boroughs) ===")
display(q("""
    SELECT borough, count(*) as zones
    FROM main_marts.dim_locations
    GROUP BY borough ORDER BY zones DESC
"""))

In [None]:
# The fact table -- trips enriched with location names
q("""
    SELECT
        trip_id,
        pickup_datetime,
        pickup_borough,
        pickup_zone,
        dropoff_borough,
        dropoff_zone,
        trip_duration_minutes,
        trip_distance_miles,
        fare_amount,
        tip_amount,
        total_amount
    FROM main_marts.fct_trips
    WHERE pickup_borough = 'Manhattan'
    LIMIT 10
""")

### Analytics Marts -- Pre-Aggregated for Speed

While the fact table supports any ad-hoc query, common business questions deserve **pre-computed answers**:

| Mart | Question It Answers |
|------|---------------------|
| `mart_daily_revenue` | How much revenue per day? What's the trend? |
| `mart_location_performance` | Which zones are most profitable? |
| `mart_hourly_demand` | When do people take taxis? Weekday vs weekend? |

In [None]:
# Daily revenue trend
daily = q("""
    SELECT
        date_key,
        day_of_week_name,
        is_weekend,
        total_trips,
        round(total_revenue, 0) as revenue,
        round(cumulative_revenue, 0) as cumulative,
        round(revenue_change_vs_prior_day, 0) as day_over_day
    FROM main_marts.mart_daily_revenue
    ORDER BY date_key
""")
daily

In [None]:
# Top 15 pickup zones by total pickups
q("""
    SELECT
        pickup_borough,
        pickup_zone,
        total_pickups,
        round(total_revenue, 0) as total_revenue,
        round(avg_revenue_per_trip, 2) as avg_revenue,
        round(avg_tip_pct, 1) as avg_tip_pct,
        most_common_dropoff_zone,
        peak_pickup_hour
    FROM main_marts.mart_location_performance
    ORDER BY total_pickups DESC
    LIMIT 15
""")

In [None]:
# Hourly demand: weekday vs weekend
q("""
    SELECT
        pickup_hour,
        CASE WHEN is_weekend THEN 'Weekend' ELSE 'Weekday' END as day_type,
        round(avg_trips_per_period, 0) as avg_trips,
        round(avg_revenue_per_period, 0) as avg_revenue,
        round(avg_distance, 1) as avg_distance,
        round(avg_duration_min, 1) as avg_duration
    FROM main_marts.mart_hourly_demand
    ORDER BY is_weekend, pickup_hour
""")

---

## Part 5: Testing -- How We Trust the Data

### Thought Process

Real-world data is messy. Tests are how we define and enforce our expectations. dbt supports:

| Test Type | What It Does | Example |
|-----------|-------------|----------|
| **Generic** (YAML) | Reusable checks | `not_null`, `unique`, `accepted_values` |
| **Singular** (SQL) | Custom queries that return failing rows | `assert_trip_duration_positive.sql` |
| **Unit** (YAML) | Test transformations with mock data | Verify column renaming, calculation logic |
| **Custom generic** (macro) | Your own reusable test | `test_positive_value` |

Our project has **91 tests**: 84 data tests + 7 unit tests.

In [None]:
# Let's verify key data quality expectations manually
print("=== Data Quality Checks ===")

# 1. No null trip IDs
null_ids = q("SELECT count(*) as n FROM main_staging.stg_yellow_trips WHERE trip_id IS NULL")['n'][0]
print(f"Null trip_ids in staging: {null_ids} (expected: 0)")

# 2. Trip IDs are unique
total = q("SELECT count(*) as n FROM main_staging.stg_yellow_trips")['n'][0]
distinct = q("SELECT count(DISTINCT trip_id) as n FROM main_staging.stg_yellow_trips")['n'][0]
print(f"Total rows: {total:,}, Distinct trip_ids: {distinct:,}, Duplicates: {total - distinct}")

# 3. All trip durations are positive (after intermediate filtering)
neg_dur = q("SELECT count(*) as n FROM main_intermediate.int_trip_metrics WHERE trip_duration_minutes < 0")['n'][0]
print(f"Negative durations: {neg_dur} (expected: 0)")

# 4. All fares are non-negative
neg_fares = q("SELECT count(*) as n FROM main_staging.stg_yellow_trips WHERE fare_amount < 0")['n'][0]
print(f"Negative fares in staging: {neg_fares} (expected: 0 -- filtered in staging)")

# 5. Mart contracts -- check column types
contracts = q("""
    SELECT table_name, count(*) as columns
    FROM information_schema.columns
    WHERE table_schema = 'main_marts'
    GROUP BY table_name
    ORDER BY table_name
""")
print(f"\nMart models with enforced contracts:")
for _, row in contracts.iterrows():
    print(f"  {row['table_name']}: {row['columns']} columns")

### Unit Tests: Testing Logic Without Real Data

Unit tests are powerful because they test **transformation logic in isolation**. You provide mock inputs and assert expected outputs.

Example from our project -- testing that the staging model renames columns correctly:

```yaml
# _unit_tests.yml (staging)
unit_tests:
  - name: test_stg_trips_renames_and_casts
    model: stg_yellow_trips
    given:
      - input: source('raw_nyc_taxi', 'raw_yellow_trips')
        format: sql  # needed for dbt-duckdb external sources
        rows: |
          SELECT 2 AS "VendorID", 161 AS "PULocationID", ...
    expect:
      rows:
        - {vendor_id: 2, pickup_location_id: 161, ...}
```

Key insight: For dbt-duckdb external sources, you must use `format: sql` for mock inputs because the source doesn't exist as a physical table.

---

## Part 6: Advanced Features

### Incremental Models: Only Process New Data

`fct_trips` is materialized as `incremental` with `delete+insert` strategy. On subsequent runs, it only processes trips newer than what's already in the table.

```sql
-- fct_trips.sql (key part)
{{ config(
    materialized='incremental',
    unique_key='trip_id',
    incremental_strategy='delete+insert',
    on_schema_change='fail'
) }}

...
{% if is_incremental() %}
where t.pickup_datetime > (select max(pickup_datetime) from {{ this }})
{% endif %}
```

In [None]:
# fct_trips materialization type from dbt
fct_count = q("SELECT count(*) as n FROM main_marts.fct_trips")['n'][0]
print(f"fct_trips: {fct_count:,} rows")
print("\nOn a full refresh: processes all ~2.9M rows (~4 seconds)")
print("On incremental run: processes 0 new rows (~0.2 seconds)")
print("\nThis is the difference between 'rebuild everything' and 'process only changes'.")

### Model Contracts: Enforce Schema at Build Time

All 7 mart models have **contracts enforced**. This means dbt checks the output schema (column names and data types) at build time. If a model produces a column with the wrong type, the build fails.

```yaml
# core.yml (excerpt)
- name: fct_trips
  config:
    contract:
      enforced: true
  columns:
    - name: trip_id
      data_type: varchar
    - name: fare_amount
      data_type: decimal(10,2)
```

This catches bugs early -- if someone changes a column type upstream, the build breaks with a clear error instead of silently corrupting data.

### Python Models: Anomaly Detection

`anomaly_daily_trips` is a native **dbt Python model** that uses pandas for statistical analysis. It applies two anomaly detection methods:

1. **Z-score**: Flag days where trips/revenue are >2 standard deviations from the mean
2. **IQR (Interquartile Range)**: Flag days outside 1.5x the IQR -- more robust to outliers

This demonstrates when to reach for Python over SQL: statistical functions, ML pipelines, or complex transformations that are awkward in pure SQL.

In [None]:
# Anomaly detection results
anomalies = q("""
    SELECT
        pickup_date,
        pickup_day_of_week,
        is_weekend,
        total_trips,
        total_revenue,
        total_trips_z_score,
        total_revenue_z_score,
        is_anomaly
    FROM main_marts.anomaly_daily_trips
    ORDER BY total_trips_z_score ASC
""")

print(f"Total days analyzed: {len(anomalies)}")
print(f"Anomalous days: {anomalies['is_anomaly'].sum()}")

print("\n=== Flagged Anomalies ===")
display(anomalies[anomalies['is_anomaly'] == True])

print("\n=== Most Extreme Z-Scores (top 5 each direction) ===")
display(anomalies.head(5)[['pickup_date', 'pickup_day_of_week', 'total_trips', 'total_trips_z_score']])
display(anomalies.tail(5)[['pickup_date', 'pickup_day_of_week', 'total_trips', 'total_trips_z_score']])

### External Materialization: Parquet Export

`export_daily_revenue` uses dbt-duckdb's `external` materialization to write data directly to a parquet file. This is useful when you need to share data with tools outside DuckDB (Jupyter, Polars, Spark, S3 uploads).

In [None]:
import os

parquet_path = "exports/daily_revenue.parquet"
if os.path.exists(parquet_path):
    size_kb = os.path.getsize(parquet_path) / 1024
    # Read directly with DuckDB -- no need for the dbt-managed database
    exported = duckdb.query(f"SELECT * FROM read_parquet('{parquet_path}') LIMIT 5").fetchdf()
    rows = duckdb.query(f"SELECT count(*) FROM read_parquet('{parquet_path}')").fetchone()[0]
    print(f"Exported parquet: {parquet_path} ({size_kb:.1f} KB, {rows} rows)")
    print("\nThis file can be read by any tool that supports parquet:")
    print("  - pandas: pd.read_parquet('exports/daily_revenue.parquet')")
    print("  - polars: pl.read_parquet('exports/daily_revenue.parquet')")
    print("  - DuckDB: SELECT * FROM read_parquet('exports/daily_revenue.parquet')")
    display(exported)
else:
    print(f"File not found: {parquet_path}")
    print("Run: cd nyc_taxi_dbt && uv run dbt run --select export_daily_revenue --profiles-dir .")

---

## Part 7: Snapshots -- Tracking Changes Over Time

`snap_locations` implements **SCD Type 2** (Slowly Changing Dimensions) for taxi zone definitions. If a zone's borough or name changes, the old record gets a `dbt_valid_to` timestamp and a new record is inserted.

In practice, taxi zones rarely change -- but the pattern is essential for production systems tracking things like product prices, customer addresses, or employee roles.

In [None]:
# Snapshot: SCD Type 2 columns
snap = q("""
    SELECT
        location_id,
        borough,
        zone_name,
        dbt_valid_from,
        dbt_valid_to,
        dbt_scd_id
    FROM snapshots.snap_locations
    ORDER BY location_id
    LIMIT 10
""")

total_snap = q("SELECT count(*) as n FROM snapshots.snap_locations")['n'][0]
current = q("SELECT count(*) as n FROM snapshots.snap_locations WHERE dbt_valid_to IS NULL")['n'][0]
historical = total_snap - current

print(f"Snapshot records: {total_snap} total ({current} current, {historical} historical)")
display(snap)

---

## Part 8: Putting It All Together -- The Full DAG

Here's how everything connects:

```
┌─────────────┐     ┌──────────────────┐     ┌──────────────────┐     ┌─────────────────────────┐
│  SOURCES    │     │    STAGING       │     │  INTERMEDIATE    │     │        MARTS            │
│             │     │                  │     │                  │     │                         │
│ parquet ────┼────>│ stg_yellow_trips ┼────>│ int_trip_metrics ┼────>│ fct_trips (incremental) │
│             │     │                  │     │     │            │     │   │                     │
│ seeds:      │     │ stg_taxi_zones ──┼─────┼─────┼────────────┼────>│ dim_locations           │
│ zone_lookup │     │ stg_payments ────┼─────┼─────┼────────────┼────>│ dim_payment_types       │
│ payment_lkp │     │ stg_rate_codes   │     │     │            │     │ dim_dates               │
│ rate_lkp    │     │                  │     │     v            │     │                         │
│             │     │                  │     │ int_daily_summary┼────>│ mart_daily_revenue      │
│             │     │                  │     │     │            │     │   └─> export (parquet)  │
│             │     │                  │     │     └────────────┼────>│ anomaly_daily (Python)  │
│             │     │                  │     │ int_hourly_patt. ┼────>│ mart_hourly_demand      │
│             │     │                  │     │                  │     │                         │
│             │     │                  │     │                  │     │ mart_location_perf      │
└─────────────┘     └──────────────────┘     └──────────────────┘     └─────────────────────────┘
```

**18 models. 91 tests. 1 snapshot. 2 exposures. Zero infrastructure.**

That's the power of dbt + DuckDB: a complete, tested, documented analytics pipeline running entirely on your laptop.

In [None]:
# Final summary: what did we build?
print("=" * 60)
print("  PROJECT SUMMARY")
print("=" * 60)

tables = q("""
    SELECT table_schema, table_name, table_type
    FROM information_schema.tables
    WHERE table_schema NOT IN ('information_schema', 'pg_catalog')
    ORDER BY table_schema, table_name
""")

for schema in tables['table_schema'].unique():
    schema_tables = tables[tables['table_schema'] == schema]
    print(f"\n  {schema}:")
    for _, row in schema_tables.iterrows():
        count = q(f"SELECT count(*) as n FROM {schema}.{row['table_name']}")['n'][0]
        print(f"    {row['table_name']:<35} {count:>12,} rows  ({row['table_type']})")

print("\n" + "=" * 60)
total_tests = 91
print(f"  Tests: {total_tests} (84 data + 7 unit)")
print(f"  Materializations: view, table, incremental, external, python")
print(f"  Advanced: contracts, snapshots, exposures, source freshness")
print("=" * 60)

In [None]:
# Clean up
con.close()
print("Connection closed. Happy learning!")

---

## Next Steps

Now that you understand the full pipeline, try these exercises:

1. **Add a new dimension**: Create `dim_rate_codes` from `stg_rate_codes` and join it into `fct_trips`
2. **Create a new mart**: `mart_borough_comparison` comparing metrics across boroughs
3. **Write a unit test**: Test that `int_daily_summary` correctly counts credit card vs cash trips
4. **Try the CLI tools**:
   ```bash
   make shell       # Explore the database interactively
   make validate    # Run the validation suite
   make benchmark   # See performance numbers
   make docs        # Browse the auto-generated documentation site
   ```

## Resources

- [dbt Documentation](https://docs.getdbt.com/)
- [dbt Best Practices](https://docs.getdbt.com/best-practices/how-we-structure/1-guide-overview)
- [DuckDB Documentation](https://duckdb.org/docs/)
- [NYC TLC Trip Data](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page)