# Pipeline 01: Complete Production Walkthrough
## Kafka 4.0 → Flink 2.0.1 → Iceberg 1.10.1 → dbt

**Pipeline:** P01 - Kafka + Flink + Iceberg (Production-Hardened Template)
**Status:** Production-Grade (94/94 dbt tests passing)
**Stack:** Flink 2.0.1, Iceberg 1.10.1, Kafka 4.0 (KRaft), Lakekeeper REST Catalog (opt-in)
**Updated:** 2026-02-16

---

### What This Notebook Does

Every code cell uses `%%writefile` to create the **exact production files** on disk.
After running all cells top-to-bottom, you will have a complete, working pipeline
that you can start with `make up && make benchmark`.
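
`%%writefile` writes the cell body to the given path but, to our knowledge, does not create missing parent directories. The sketch below pre-creates the directories used in Sections 2 and 3. It assumes the notebook runs from a directory that sits next to `shared/` and `pipelines/`; the helper name `ensure_dirs` is hypothetical and not part of the pipeline.

```python
from pathlib import Path

# Target directories used by the %%writefile cells in Sections 2 and 3.
TARGET_DIRS = [
    "../shared/docker",
    "../shared/data-generator",
    "../shared/schemas",
    "../pipelines/01-kafka-flink-iceberg",
]


def ensure_dirs(base: Path, rel_dirs=TARGET_DIRS) -> list[Path]:
    """Create each directory (and its parents) relative to `base`; return the resolved paths."""
    created = []
    for rel in rel_dirs:
        path = (base / rel).resolve()
        path.mkdir(parents=True, exist_ok=True)  # no error if it already exists
        created.append(path)
    return created
```

Calling `ensure_dirs(Path.cwd())` in the first cell should be enough before running the rest of the notebook top-to-bottom.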

### Production Hardening Features

| Layer | Pattern | Purpose |
|-------|---------|---------|
| **Ingestion** | Idempotent producer (`acks=all`) | No duplicates from producer retries |
| **Ingestion** | Dead Letter Queue | Poison message capture |
| **Processing** | Event-time watermarks | Out-of-order handling |
| **Processing** | ROW_NUMBER dedup | Duplicate elimination |
| **Processing** | Batch + streaming modes | Same SQL for both |
| **Storage** | Lakekeeper REST catalog (opt-in) | No hardcoded S3 creds |
| **Quality** | dbt source freshness | Stale data detection |
| **Observability** | Prometheus metrics | Dashboard-ready monitoring |
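
To make the Dead Letter Queue row concrete, here is a minimal routing sketch. The topic names and the two required fields come from this notebook; where the routing actually runs is not shown in this section, so the validator function `route_event` and the envelope layout are assumptions for illustration only.

```python
import json

MAIN_TOPIC = "taxi.raw_trips"
DLQ_TOPIC = "taxi.raw_trips.dlq"
REQUIRED_FIELDS = ("tpep_pickup_datetime", "tpep_dropoff_datetime")


def _wrap(raw: bytes, reason: str) -> bytes:
    """Keep the original payload text alongside the rejection reason."""
    envelope = {"reason": reason, "raw": raw.decode("utf-8", errors="replace")}
    return json.dumps(envelope).encode("utf-8")


def route_event(raw: bytes) -> tuple[str, bytes]:
    """Return (topic, payload): valid events go to the main topic, the rest to the DLQ."""
    try:
        event = json.loads(raw)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        return DLQ_TOPIC, _wrap(raw, f"invalid JSON: {exc.__class__.__name__}")
    if not isinstance(event, dict):
        return DLQ_TOPIC, _wrap(raw, "payload is not a JSON object")
    missing = [f for f in REQUIRED_FIELDS if event.get(f) is None]
    if missing:
        return DLQ_TOPIC, _wrap(raw, f"missing required fields: {missing}")
    return MAIN_TOPIC, raw
```

The point of the pattern is that a poison message is parked with its reason instead of crashing the consumer or being silently dropped.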

### Table of Contents

| # | Section | What You'll Build |
|---|---------|-------------------|
| 1 | Architecture Overview | Understanding the data flow |
| 2 | Shared Infrastructure | Dockerfiles, data generator, schemas |
| 3 | Docker Compose | 7+4 service container orchestration |
| 4 | Kafka Layer | Topic creation, DLQ, event ingestion |
| 5 | Flink Configuration | Cluster config, Prometheus, S3/MinIO |
| 6 | Flink SQL - Init & Sources | Catalog + Kafka connector + watermarks |
| 6b | Flink SQL - REST Catalog Init | Lakekeeper alternative (opt-in) |
| 7 | Flink SQL - Bronze Layer | Kafka → Iceberg raw ingestion |
| 7b | Flink SQL - Streaming Bronze | Continuous streaming alternative |
| 8 | Flink SQL - Silver Layer | Data quality + dedup + enrichment |
| 9 | Flink SQL - Combined Pipeline | Single-file Bronze + Silver |
| 10 | dbt Project Configuration | Project, profiles, packages |
| 11 | dbt Seeds | Reference data (zones, payments, rates, vendors) |
| 12 | dbt Macros | Cross-database compatibility helpers |
| 13 | dbt Staging Models | Light transforms from Iceberg Silver |
| 14 | dbt Intermediate Models | Trip metrics, daily & hourly aggregations |
| 15 | dbt Core Marts | Fact & dimension tables (Gold layer) |
| 16 | dbt Analytics Marts | Revenue, demand, location performance |
| 17 | dbt Tests | Data quality assertions |
| 18 | Pipeline Makefile | One-command orchestration |
| 19 | Airflow DAGs | Production scheduling & maintenance |
| 20 | Running the Pipeline | Step-by-step execution guide |
| 21 | Production Operations | Monitoring, alerting, scaling |

---

## 1. Architecture Overview

### Technology Stack (2026 Production-Grade)

| Component | Technology | Version | Role |
|-----------|-----------|---------|------|
| **Ingestion** | Apache Kafka | 4.0.0 (KRaft) | Event streaming with idempotent delivery |
| **Processing** | Apache Flink | 2.0.1 (Java 17) | Stream/batch SQL processing |
| **Storage** | Apache Iceberg | 1.10.1 (V3 format) | Lakehouse table format on MinIO |
| **Catalog** | Hadoop (default) / Lakekeeper (opt-in) | Lakekeeper v0.11.2 | Table metadata management |
| **Transform** | dbt + DuckDB | dbt-core 1.8+ | Gold layer modeling (94 tests) |
| **Object Store** | MinIO | Latest | S3-compatible storage |
| **Schema** | Confluent Schema Registry | 7.9.0 | Data contract enforcement |

### Data Flow

```
Parquet File → [Data Generator] → Kafka (taxi.raw_trips) → Flink SQL
   (source)     (idempotent, acks=all)     (3 partitions)        │    │
                                              │                  │    │
                                     taxi.raw_trips.dlq       │    │
                                        (DLQ, 7-day)          │    │
                                                         Iceberg Bronze    Iceberg Silver
                                                         (raw_trips)      (cleaned_trips)
                                                              │           (+ ROW_NUMBER dedup)
                                                              │
                                                         dbt (DuckDB)
                                                              │
                                                    Gold Layer (94 tests)
                                                    ├─ fct_trips
                                                    ├─ dim_dates / dim_locations
                                                    ├─ dim_payment_types / dim_vendors
                                                    ├─ mart_daily_revenue
                                                    ├─ mart_hourly_demand
                                                    └─ mart_location_performance
```
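
The `ROW_NUMBER dedup` step in the Silver branch keeps one row per logical trip. The sketch below emulates `ROW_NUMBER() OVER (PARTITION BY key ORDER BY order_col) = 1` in plain Python. The key columns and the `ingested_at` ordering column are assumptions chosen for illustration; the real `PARTITION BY` and `ORDER BY` columns are defined in the Silver SQL.

```python
from typing import Iterable

# Hypothetical dedup key; the actual PARTITION BY columns live in the Silver SQL.
DEDUP_KEY = (
    "VendorID",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "PULocationID",
    "DOLocationID",
)


def dedup_first(rows: Iterable[dict], key_cols=DEDUP_KEY, order_col="ingested_at") -> list[dict]:
    """Keep, per key, the row with the smallest `order_col` value (row number 1)."""
    best: dict[tuple, dict] = {}
    for row in rows:
        key = tuple(row.get(col) for col in key_cols)
        current = best.get(key)
        # Strict "<" means ties keep the first row seen.
        if current is None or row[order_col] < current[order_col]:
            best[key] = row
    return list(best.values())
```

This layer complements the idempotent producer: it also catches duplicates that were already present in the source data or introduced by a replay.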

### Defense-in-Depth (Data Quality Layers)

```
Layer 1: Idempotent Producer    → Prevents duplicate writes from producer retries
Layer 2: Dead Letter Queue      → Captures poison messages without data loss
Layer 3: Event-Time Watermarks  → Handles out-of-order arrivals correctly
Layer 4: ROW_NUMBER Dedup       → Eliminates duplicates in Silver layer
Layer 5: dbt Tests (94 tests)   → Validates business logic and data contracts
Layer 6: Source Freshness       → Detects pipeline stalls
```
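
Layer 3 is the least intuitive of the six, so here is a small model of a bounded-out-of-orderness watermark: the watermark trails the largest event time seen so far by a fixed delay, and an event at or behind the watermark counts as late. The class name and the 10-second delay in the usage note are assumptions; Flink also emits watermarks periodically rather than per event, which this sketch ignores.

```python
from datetime import datetime, timedelta
from typing import Optional


class BoundedOutOfOrderWatermark:
    """Watermark = max event time seen so far minus a fixed allowed delay."""

    def __init__(self, max_delay: timedelta):
        self.max_delay = max_delay
        self.max_event_time: Optional[datetime] = None

    @property
    def watermark(self) -> Optional[datetime]:
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.max_delay

    def observe(self, event_time: datetime) -> bool:
        """Record an event; return True if it is on time, False if it is late."""
        current = self.watermark
        on_time = current is None or event_time > current
        # The watermark only ever moves forward.
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return on_time
```

With `BoundedOutOfOrderWatermark(timedelta(seconds=10))`, an event that arrives 5 seconds behind the newest one is still accepted, while one that is 15 seconds behind is flagged as late.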

## 2. Shared Infrastructure

These files live in `shared/` and are reused across multiple pipelines.
We create them first since Docker Compose references them.

### 2.1 Flink Dockerfile

Custom Flink image with **7 JARs** pre-installed:
- Kafka SQL connector (for reading Kafka topics as Flink tables)
- Iceberg Flink runtime (for writing to Iceberg tables)
- Iceberg AWS bundle (for S3FileIO with MinIO)
- Hadoop client API + runtime (for Iceberg Hadoop catalog)
- Hadoop AWS (for S3A filesystem)
- AWS SDK bundle (required by hadoop-aws)
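
The seven download URLs all follow the standard Maven Central layout, so they can be derived from the coordinates. The sketch below rebuilds them from the versions pinned in the Dockerfile; the helper name `maven_jar_url` is hypothetical and the code is only a checklist aid, not part of the image build.

```python
MAVEN_CENTRAL = "https://repo1.maven.org/maven2"


def maven_jar_url(group: str, artifact: str, version: str) -> str:
    """Build the Maven Central URL for group:artifact:version (standard repository layout)."""
    group_path = group.replace(".", "/")
    return f"{MAVEN_CENTRAL}/{group_path}/{artifact}/{version}/{artifact}-{version}.jar"


# Versions copied from the Dockerfile ARGs.
FLINK_MAJOR_MINOR = "2.0"
ICEBERG_VERSION = "1.10.1"
HADOOP_VERSION = "3.3.6"

JARS = [
    ("org.apache.flink", "flink-sql-connector-kafka", "4.0.1-2.0"),
    ("org.apache.iceberg", f"iceberg-flink-runtime-{FLINK_MAJOR_MINOR}", ICEBERG_VERSION),
    ("org.apache.iceberg", "iceberg-aws-bundle", ICEBERG_VERSION),
    ("org.apache.hadoop", "hadoop-client-api", HADOOP_VERSION),
    ("org.apache.hadoop", "hadoop-client-runtime", HADOOP_VERSION),
    ("org.apache.hadoop", "hadoop-aws", HADOOP_VERSION),
    ("com.amazonaws", "aws-java-sdk-bundle", "1.12.367"),
]

URLS = [maven_jar_url(*coords) for coords in JARS]
```

Printing `URLS` gives a list to compare against the contents of `/opt/flink/lib` in the built image when a version bump is needed.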

In [None]:
%%writefile ../shared/docker/flink.Dockerfile
# =============================================================================
# Shared Flink Image with Kafka + Iceberg Connectors
# =============================================================================
# Base: Flink 2.0.1 (Java 17)
# Adds: Kafka SQL connector, Iceberg Flink runtime, AWS S3 bundle
# Used by: Pipelines 01, 04, 07-09, 11-12, 16-18, 21, 23
# =============================================================================

FROM flink:2.0.1-java17

# Connector versions (Flink 2.0 requires new connector builds)
ARG FLINK_KAFKA_CONNECTOR_VERSION=4.0.1-2.0
ARG ICEBERG_VERSION=1.10.1
ARG FLINK_MAJOR_MINOR=2.0

# Download Kafka SQL connector (fat jar)
RUN wget -q -P /opt/flink/lib/ \
    "https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/${FLINK_KAFKA_CONNECTOR_VERSION}/flink-sql-connector-kafka-${FLINK_KAFKA_CONNECTOR_VERSION}.jar" \
    && echo "Kafka SQL connector downloaded"

# Download Iceberg Flink runtime
RUN wget -q -P /opt/flink/lib/ \
    "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_MAJOR_MINOR}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_MAJOR_MINOR}-${ICEBERG_VERSION}.jar" \
    && echo "Iceberg Flink runtime downloaded"

# Download Iceberg AWS bundle (for S3FileIO with MinIO)
RUN wget -q -P /opt/flink/lib/ \
    "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar" \
    && echo "Iceberg AWS bundle downloaded"

# Download Hadoop client (required for Iceberg Hadoop catalog)
ARG HADOOP_VERSION=3.3.6
RUN wget -q -P /opt/flink/lib/ \
    "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/${HADOOP_VERSION}/hadoop-client-api-${HADOOP_VERSION}.jar" \
    && wget -q -P /opt/flink/lib/ \
    "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/${HADOOP_VERSION}/hadoop-client-runtime-${HADOOP_VERSION}.jar" \
    && echo "Hadoop client jars downloaded"

# Download Hadoop AWS module (for S3A filesystem in Iceberg Hadoop catalog)
RUN wget -q -P /opt/flink/lib/ \
    "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar" \
    && echo "Hadoop AWS jar downloaded"

# Download AWS SDK v1 bundle (required by hadoop-aws)
ARG AWS_SDK_VERSION=1.12.367
RUN wget -q -P /opt/flink/lib/ \
    "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar" \
    && echo "AWS SDK bundle downloaded"

# Enable S3 filesystem plugin (for Flink checkpoints on S3)
RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop \
    && cp /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/s3-fs-hadoop/ 2>/dev/null || true

# Verify all JARs are present
RUN ls -la /opt/flink/lib/flink-sql-connector-kafka*.jar \
           /opt/flink/lib/iceberg-flink-runtime*.jar \
           /opt/flink/lib/iceberg-aws-bundle*.jar \
           /opt/flink/lib/hadoop-client-*.jar \
           /opt/flink/lib/hadoop-aws-*.jar \
           /opt/flink/lib/aws-java-sdk-bundle-*.jar


### 2.2 dbt Dockerfile

Slim Python image with dbt-core and dbt-duckdb for reading Iceberg tables via DuckDB's `iceberg_scan()` function.

In [None]:
%%writefile ../shared/docker/dbt.Dockerfile
FROM python:3.12-slim

# Build argument to select the dbt adapter
ARG DBT_ADAPTER=dbt-duckdb
ARG DBT_ADAPTER_VERSION=">=1.8"

WORKDIR /dbt

# Install dbt with the specified adapter
RUN pip install --no-cache-dir \
    "dbt-core>=1.8" \
    "${DBT_ADAPTER}${DBT_ADAPTER_VERSION}" \
    pyarrow \
    pandas

# For dbt-duckdb with Iceberg support
RUN if [ "$DBT_ADAPTER" = "dbt-duckdb" ]; then \
    pip install --no-cache-dir duckdb; \
    fi

# Copy dbt project (mounted or copied at build time)
COPY dbt_project/ /dbt/

# Install dbt packages
RUN dbt deps --profiles-dir . 2>/dev/null || true

ENTRYPOINT ["dbt"]
CMD ["build", "--profiles-dir", "."]

### 2.3 Data Generator

Reads NYC Yellow Taxi parquet data and produces JSON events to Kafka.
Three modes: `burst` (benchmarking), `realtime` (burst capped at 5,000 events/sec), `batch` (chunked with delays).

**Dependencies:**

In [None]:
%%writefile ../shared/data-generator/requirements.txt
pyarrow>=14.0.0
confluent-kafka>=2.3.0
orjson>=3.9.0

**Generator script** (~210 lines, production-grade with metrics output):

Key production features:
- **Idempotent producer:** `enable.idempotence: True` + `acks: all` prevents duplicates caused by producer retries
- **LZ4 compression** and batch tuning for throughput
- **Metrics output** to JSON for benchmark collection
- Three modes: `burst` (benchmarking), `realtime` (burst capped at 5,000 events/sec), `batch` (configurable size and delay)
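
The rate limiting used by the `burst` and `realtime` modes can be isolated into a pure function, which makes the pacing rule easier to reason about. This is a sketch that mirrors the check inside `produce_burst`; the function name `pacing_delay` is hypothetical and does not exist in the generator script.

```python
def pacing_delay(count: int, rate_limit: int, elapsed: float) -> float:
    """Seconds to sleep so that `count` events take at least `count / rate_limit` seconds."""
    if rate_limit <= 0:
        return 0.0  # 0 means unlimited, matching the RATE_LIMIT default
    if count % rate_limit != 0:
        return 0.0  # the limit is only checked once per `rate_limit` events
    expected = count / rate_limit  # seconds that should have passed by now
    return max(0.0, expected - elapsed)
```

Because the check fires once per `rate_limit` events, traffic arrives as one fast burst per second followed by a pause, rather than being evenly spaced. For example, after 5,000 events sent in 0.4 s with a limit of 5,000 events/sec, the function asks for roughly 0.6 s of sleep.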

In [None]:
%%writefile ../shared/data-generator/generator.py
"""Taxi trip event generator.

Reads NYC Yellow Taxi parquet data and produces events to a Kafka-compatible
broker (Kafka or Redpanda). Supports three modes:
  - burst:    As fast as possible (benchmarking)
  - realtime: Approximates live traffic by capping burst mode at 5,000 events/sec
  - batch:    Sends events in configurable batch sizes with delays

Configuration via environment variables:
  BROKER_URL    Kafka/Redpanda bootstrap servers  (default: localhost:9092)
  TOPIC         Target topic name                  (default: taxi.raw_trips)
  MODE          burst | realtime | batch           (default: burst)
  RATE_LIMIT    Max events/sec in burst mode, 0=unlimited (default: 0)
  BATCH_SIZE    Events per batch in batch mode     (default: 1000)
  BATCH_DELAY   Seconds between batches            (default: 1.0)
  DATA_PATH     Path to parquet file               (default: /data/yellow_tripdata_2024-01.parquet)
  MAX_EVENTS    Stop after N events, 0=all         (default: 0)
  METRICS_PATH  Where to write the metrics JSON    (default: /tmp/generator_metrics.json)

Usage:
    python generator.py
    python generator.py --mode burst --broker localhost:9092
"""

import argparse
import os
import sys
import time
from datetime import datetime

import orjson
import pyarrow.parquet as pq
from confluent_kafka import Producer


def delivery_callback(err, msg):
    if err is not None:
        print(f"  [ERROR] Delivery failed: {err}", file=sys.stderr)


def read_parquet(path: str, max_events: int = 0):
    """Yield rows from parquet file as dicts."""
    table = pq.read_table(path)
    total = table.num_rows if max_events == 0 else min(max_events, table.num_rows)
    print(f"  Source: {path} ({table.num_rows:,} rows, sending {total:,})")

    batches = table.to_batches(max_chunksize=10_000)
    sent = 0
    for batch in batches:
        for row in batch.to_pylist():
            if sent >= total:
                return
            # Convert timestamps to ISO strings for JSON serialization
            for key, val in row.items():
                if isinstance(val, datetime):
                    row[key] = val.isoformat()
            yield row
            sent += 1


def create_producer(broker_url: str) -> Producer:
    conf = {
        "bootstrap.servers": broker_url,
        "enable.idempotence": True,
        "acks": "all",
        "linger.ms": 5,
        "batch.num.messages": 10000,
        "queue.buffering.max.messages": 500000,
        "queue.buffering.max.kbytes": 1048576,
        "compression.type": "lz4",
    }
    return Producer(conf)


def produce_burst(producer: Producer, topic: str, rows, rate_limit: int):
    """Produce as fast as possible, optionally rate-limited."""
    count = 0
    start = time.perf_counter()
    last_report = start

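    for row in rows:
        key = str(row.get("PULocationID", "")).encode("utf-8")
        value = orjson.dumps(row)
        try:
            producer.produce(topic, value=value, key=key, callback=delivery_callback)
        except BufferError:
            # Local queue is full: serve delivery callbacks to free space, then retry once.
            producer.poll(1.0)
            producer.produce(topic, value=value, key=key, callback=delivery_callback)
        count += 1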

        if count % 10000 == 0:
            producer.poll(0)
            now = time.perf_counter()
            if now - last_report >= 5.0:
                elapsed = now - start
                rate = count / elapsed
                print(f"  Produced {count:,} events ({rate:,.0f} evt/s)")
                last_report = now

        # Rate limiting
        if rate_limit > 0 and count % rate_limit == 0:
            elapsed = time.perf_counter() - start
            expected = count / rate_limit
            if elapsed < expected:
                time.sleep(expected - elapsed)

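    remaining = producer.flush(timeout=30)
    if remaining:
        # flush() returns the number of messages still queued when the timeout expires.
        print(f"  [WARN] {remaining:,} messages undelivered after flush", file=sys.stderr)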
    elapsed = time.perf_counter() - start
    rate = count / elapsed if elapsed > 0 else 0
    return count, elapsed, rate


def produce_batch(producer: Producer, topic: str, rows, batch_size: int, batch_delay: float):
    """Produce in fixed-size batches with delays between them."""
    count = 0
    batch_count = 0
    start = time.perf_counter()

    batch_buffer = []
    for row in rows:
        batch_buffer.append(row)
        if len(batch_buffer) >= batch_size:
            for r in batch_buffer:
                key = str(r.get("PULocationID", "")).encode("utf-8")
                value = orjson.dumps(r)
                producer.produce(topic, value=value, key=key, callback=delivery_callback)
                count += 1
            producer.flush(timeout=30)
            batch_count += 1
            elapsed = time.perf_counter() - start
            rate = count / elapsed if elapsed > 0 else 0
            print(f"  Batch {batch_count}: {count:,} total ({rate:,.0f} evt/s)")
            batch_buffer = []
            time.sleep(batch_delay)

    # Final partial batch
    if batch_buffer:
        for r in batch_buffer:
            key = str(r.get("PULocationID", "")).encode("utf-8")
            value = orjson.dumps(r)
            producer.produce(topic, value=value, key=key, callback=delivery_callback)
            count += 1
        producer.flush(timeout=30)

    elapsed = time.perf_counter() - start
    rate = count / elapsed if elapsed > 0 else 0
    return count, elapsed, rate


def main():
    parser = argparse.ArgumentParser(description="Taxi trip event generator")
    parser.add_argument("--broker", default=os.environ.get("BROKER_URL", "localhost:9092"))
    parser.add_argument("--topic", default=os.environ.get("TOPIC", "taxi.raw_trips"))
    parser.add_argument("--mode", default=os.environ.get("MODE", "burst"),
                        choices=["burst", "realtime", "batch"])
    parser.add_argument("--rate-limit", type=int,
                        default=int(os.environ.get("RATE_LIMIT", "0")))
    parser.add_argument("--batch-size", type=int,
                        default=int(os.environ.get("BATCH_SIZE", "1000")))
    parser.add_argument("--batch-delay", type=float,
                        default=float(os.environ.get("BATCH_DELAY", "1.0")))
    parser.add_argument("--data-path",
                        default=os.environ.get("DATA_PATH", "/data/yellow_tripdata_2024-01.parquet"))
    parser.add_argument("--max-events", type=int,
                        default=int(os.environ.get("MAX_EVENTS", "0")))
    args = parser.parse_args()

    print("=" * 60)
    print("  Taxi Trip Event Generator")
    print("=" * 60)
    print(f"  Broker:     {args.broker}")
    print(f"  Topic:      {args.topic}")
    print(f"  Mode:       {args.mode}")
    print(f"  Data:       {args.data_path}")
    max_events_str = "all" if args.max_events == 0 else f"{args.max_events:,}"
    print(f"  Max events: {max_events_str}")
    print()

    producer = create_producer(args.broker)
    rows = read_parquet(args.data_path, args.max_events)

    if args.mode == "burst":
        count, elapsed, rate = produce_burst(producer, args.topic, rows, args.rate_limit)
    elif args.mode == "batch":
        count, elapsed, rate = produce_batch(
            producer, args.topic, rows, args.batch_size, args.batch_delay
        )
    else:
        # realtime mode: use burst with rate limiting to approximate real-time
        count, elapsed, rate = produce_burst(producer, args.topic, rows, rate_limit=5000)

    print()
    print("=" * 60)
    print("  GENERATOR COMPLETE")
    print(f"  Events:  {count:,}")
    print(f"  Elapsed: {elapsed:.2f}s")
    print(f"  Rate:    {rate:,.0f} events/sec")
    print("=" * 60)

    # Write metrics for benchmark collection
    metrics_path = os.environ.get("METRICS_PATH", "/tmp/generator_metrics.json")
    metrics = {
        "events": count,
        "elapsed_seconds": round(elapsed, 3),
        "events_per_second": round(rate, 1),
        "mode": args.mode,
        "broker": args.broker,
        "topic": args.topic,
    }
    with open(metrics_path, "wb") as f:
        f.write(orjson.dumps(metrics))
    print(f"  Metrics written to {metrics_path}")


if __name__ == "__main__":
    main()


### 2.4 Event Schema (JSON Schema)

Defines the contract for taxi trip events. Field names match the raw NYC TLC parquet source.
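
The schema only uses `type` (a name or a list of names) and `required`, so a small checker covers it without extra dependencies. The sketch below is an assumption-laden simplification, not a full JSON Schema validator: it ignores `format: date-time`, and the function names `matches_type` and `validate_event` are hypothetical.

```python
def _matches_one(value, name: str) -> bool:
    """Check a value against a single JSON Schema primitive type name."""
    if name == "null":
        return value is None
    if name == "string":
        return isinstance(value, str)
    # bool is a subclass of int in Python, but JSON booleans are not numbers.
    if isinstance(value, bool):
        return False
    if name == "number":
        return isinstance(value, (int, float))
    if name == "integer":
        # Draft-07 treats a float with a zero fractional part (1.0) as an integer.
        return isinstance(value, int) or (isinstance(value, float) and value.is_integer())
    raise ValueError(f"unsupported type in this sketch: {name}")


def matches_type(value, type_spec) -> bool:
    """`type_spec` is a type name or a list of names, as in the schema's `type` keyword."""
    names = [type_spec] if isinstance(type_spec, str) else list(type_spec)
    return any(_matches_one(value, name) for name in names)


def validate_event(event: dict, schema: dict) -> list[str]:
    """Return human-readable violations; an empty list means the event passes."""
    errors = [
        f"missing required field: {name}"
        for name in schema.get("required", [])
        if name not in event
    ]
    for name, spec in schema.get("properties", {}).items():
        if name in event and not matches_type(event[name], spec["type"]):
            errors.append(f"{name}: expected {spec['type']}, got {type(event[name]).__name__}")
    return errors
```

Loading `taxi_trip.json` with `json.load` and passing the result as `schema` should work unchanged, since the file only uses the keywords handled here plus `format` and `description`.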

In [None]:
%%writefile ../shared/schemas/taxi_trip.json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "TaxiTrip",
  "description": "NYC Yellow Taxi trip record. Field names match the raw parquet source exactly.",
  "type": "object",
  "properties": {
    "VendorID": {"type": ["integer", "null"], "description": "TPEP provider: 1=Creative Mobile Technologies, 2=VeriFone Inc."},
    "tpep_pickup_datetime": {"type": "string", "format": "date-time", "description": "Meter engaged timestamp (ISO 8601)"},
    "tpep_dropoff_datetime": {"type": "string", "format": "date-time", "description": "Meter disengaged timestamp (ISO 8601)"},
    "passenger_count": {"type": ["integer", "null"], "description": "Number of passengers (driver-entered)"},
    "trip_distance": {"type": ["number", "null"], "description": "Trip distance in miles from taximeter"},
    "RatecodeID": {"type": ["integer", "null"], "description": "Rate code: 1=Standard, 2=JFK, 3=Newark, 4=Nassau/Westchester, 5=Negotiated, 6=Group"},
    "store_and_fwd_flag": {"type": ["string", "null"], "description": "Y=stored then forwarded, N=not a store-and-forward trip"},
    "PULocationID": {"type": ["integer", "null"], "description": "TLC Taxi Zone pickup location ID"},
    "DOLocationID": {"type": ["integer", "null"], "description": "TLC Taxi Zone dropoff location ID"},
    "payment_type": {"type": ["integer", "null"], "description": "Payment method: 1=Credit, 2=Cash, 3=No charge, 4=Dispute, 5=Unknown, 6=Voided"},
    "fare_amount": {"type": ["number", "null"], "description": "Time-and-distance fare in dollars"},
    "extra": {"type": ["number", "null"], "description": "Misc extras and surcharges"},
    "mta_tax": {"type": ["number", "null"], "description": "MTA tax"},
    "tip_amount": {"type": ["number", "null"], "description": "Tip amount"},
    "tolls_amount": {"type": ["number", "null"], "description": "Total tolls paid during trip"},
    "improvement_surcharge": {"type": ["number", "null"], "description": "$0.30 improvement surcharge"},
    "total_amount": {"type": ["number", "null"], "description": "Total amount charged to passengers"},
    "congestion_surcharge": {"type": ["number", "null"], "description": "NYC congestion surcharge"},
    "Airport_fee": {"type": ["number", "null"], "description": "$1.25 for pickups at LaGuardia and JFK"}
  },
  "required": ["tpep_pickup_datetime", "tpep_dropoff_datetime"]
}

## 3. Docker Compose: Container Orchestration

**7 always-on services** + **4 opt-in Lakekeeper services** (REST catalog):

| Service | Image | Purpose | Port |
|---------|-------|---------|------|
| kafka | apache/kafka:4.0.0 | KRaft event streaming | 9092 |
| schema-registry | cp-schema-registry:7.9.0 | Data contract enforcement | 8085 |
| minio | minio/minio:latest | S3-compatible object storage | 9000/9001 |
| mc-init | minio/mc:latest | Create warehouse bucket | - |
| flink-jobmanager | Custom (Flink 2.0.1) | Flink SQL coordinator | 8083 |
| flink-taskmanager | Custom (Flink 2.0.1) | Flink SQL worker | - |
| data-generator | Custom (Python) | Parquet → Kafka producer | - |
| dbt | Custom (dbt-duckdb) | Silver → Gold transforms | - |

**Opt-in Lakekeeper services** (`docker compose --profile lakekeeper up -d`):

| Service | Image | Purpose | Port |
|---------|-------|---------|------|
| lakekeeper-db | postgres:17 | Catalog metadata store | - |
| lakekeeper-migrate | lakekeeper/catalog:v0.11.2 | Schema migration | - |
| lakekeeper | lakekeeper/catalog:v0.11.2 | REST catalog API | 8181 |
| lakekeeper-init | curlimages/curl | Bootstrap + warehouse init | - |

Key architecture decisions:
- **Profiles:** `generator`, `dbt`, `lakekeeper` keep services opt-in (not started by `make up`)
- **Resource limits:** Memory caps prevent Docker Desktop from running out of memory
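- **Health checks:** The long-running backing services (Kafka, Schema Registry, MinIO, Lakekeeper and its database, Flink JobManager) define health checks so dependants can wait for `service_healthy`; one-shot init containers are gated with `service_completed_successfully`
- **YAML anchors:** `x-flink-common` shares the build, environment, and network settings between the JobManager and TaskManager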
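
The dependency ordering that the health checks enforce can be checked offline. The sketch below copies the `depends_on` edges of the default (non-profile) services from the Compose file and derives one valid start order with the standard library; the `DEPENDS_ON` dictionary and `startup_order` helper are illustrative names, and Compose itself may start independent services in parallel.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# service -> services it waits for, copied from the depends_on entries.
DEPENDS_ON = {
    "kafka": set(),
    "minio": set(),
    "schema-registry": {"kafka"},
    "mc-init": {"minio"},
    "flink-jobmanager": {"kafka", "mc-init"},
    "flink-taskmanager": {"flink-jobmanager"},
}


def startup_order(graph: dict[str, set[str]]) -> list[str]:
    """Return one valid start order; raises graphlib.CycleError on circular dependencies."""
    return list(TopologicalSorter(graph).static_order())
```

A `CycleError` here would point to a circular `depends_on` chain, which Compose would also reject.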

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/docker-compose.yml
# =============================================================================
# Pipeline 01: Kafka + Flink + Iceberg (Production-Grade Template)
# =============================================================================
# Architecture: Kafka (KRaft) -> Flink SQL -> Iceberg (on MinIO) -> dbt (DuckDB)
# Compose V2+ (version key removed - deprecated since Docker Compose 2.x)
# =============================================================================

# ---------------------------------------------------------------------------
# Shared Flink configuration (YAML anchor)
# ---------------------------------------------------------------------------
x-flink-common: &flink-common
  build:
    context: .
    dockerfile: ../../shared/docker/flink.Dockerfile
  environment: &flink-env
    FLINK_PROPERTIES: |
      jobmanager.rpc.address: flink-jobmanager
      taskmanager.numberOfTaskSlots: 4
      parallelism.default: 2
      state.backend: hashmap
      state.checkpoints.dir: file:///tmp/flink-checkpoints
      execution.checkpointing.interval: 30s
      rest.flamegraph.enabled: true
      classloader.check-leaked-classloader: false
  networks:
    - pipeline-net

services:
  # ---------------------------------------------------------------------------
  # Kafka (KRaft mode - ZooKeeper fully removed in 4.0)
  # ---------------------------------------------------------------------------
  kafka:
    image: apache/kafka:4.0.0
    container_name: p01-kafka
    hostname: kafka
    restart: unless-stopped
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: "p01-kafka-flink-iceberg-001"
    deploy:
      resources:
        limits:
          memory: 2G
          cpus: '2.0'
        reservations:
          memory: 1G
    healthcheck:
      test: /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 || exit 1
      interval: 10s
      timeout: 10s
      retries: 15
      start_period: 30s
    networks:
      - pipeline-net

  # ---------------------------------------------------------------------------
  # Schema Registry (Confluent)
  # ---------------------------------------------------------------------------
  schema-registry:
    image: confluentinc/cp-schema-registry:7.9.0
    container_name: p01-schema-registry
    hostname: schema-registry
    restart: unless-stopped
    ports:
      - "8085:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    depends_on:
      kafka:
        condition: service_healthy
    healthcheck:
      test: curl -f http://localhost:8081/subjects || exit 1
      interval: 10s
      timeout: 5s
      retries: 10
      start_period: 20s
    networks:
      - pipeline-net

  # ---------------------------------------------------------------------------
  # MinIO (S3-compatible object storage for Iceberg warehouse)
  # ---------------------------------------------------------------------------
  minio:
    image: minio/minio:latest
    container_name: p01-minio
    hostname: minio
    restart: unless-stopped
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin}
      MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin}
    command: server /data --console-address ":9001"
    deploy:
      resources:
        limits:
          memory: 1G
          cpus: '1.0'
    healthcheck:
      test: mc ready local || exit 1
      interval: 10s
      timeout: 5s
      retries: 10
      start_period: 10s
    volumes:
      - minio-data:/data
    networks:
      - pipeline-net

  # ---------------------------------------------------------------------------
  # MinIO Client Init (create warehouse bucket)
  # ---------------------------------------------------------------------------
  mc-init:
    image: minio/mc:latest
    container_name: p01-mc-init
    depends_on:
      minio:
        condition: service_healthy
    entrypoint: >
      /bin/sh -c "
      mc alias set myminio http://minio:9000 ${MINIO_ROOT_USER:-minioadmin} ${MINIO_ROOT_PASSWORD:-minioadmin} &&
      mc mb myminio/warehouse --ignore-existing &&
      mc anonymous set download myminio/warehouse &&
      echo 'Bucket warehouse created successfully'
      "
    networks:
      - pipeline-net

  # ---------------------------------------------------------------------------
  # Lakekeeper REST Catalog (opt-in: docker compose --profile lakekeeper up -d)
  # ---------------------------------------------------------------------------
  lakekeeper-db:
    image: postgres:17
    container_name: p01-lakekeeper-db
    hostname: lakekeeper-db
    restart: unless-stopped
    environment:
      POSTGRES_USER: lakekeeper
      POSTGRES_PASSWORD: lakekeeper
      POSTGRES_DB: lakekeeper
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U lakekeeper"]
      interval: 5s
      timeout: 5s
      retries: 10
    volumes:
      - lakekeeper-db-data:/var/lib/postgresql/data
    profiles:
      - lakekeeper
    networks:
      - pipeline-net

  lakekeeper-migrate:
    image: quay.io/lakekeeper/catalog:v0.11.2
    container_name: p01-lakekeeper-migrate
    command: ["migrate"]
    environment:
      LAKEKEEPER__PG_DATABASE_URL_READ: postgresql://lakekeeper:lakekeeper@lakekeeper-db:5432/lakekeeper
      LAKEKEEPER__PG_DATABASE_URL_WRITE: postgresql://lakekeeper:lakekeeper@lakekeeper-db:5432/lakekeeper
      LAKEKEEPER__PG_ENCRYPTION_KEY: "this-is-NOT-secure-change-in-prod!!"
    depends_on:
      lakekeeper-db:
        condition: service_healthy
    profiles:
      - lakekeeper
    networks:
      - pipeline-net

  lakekeeper:
    image: quay.io/lakekeeper/catalog:v0.11.2
    container_name: p01-lakekeeper
    hostname: lakekeeper
    restart: unless-stopped
    command: ["serve"]
    ports:
      - "8181:8181"
    environment:
      LAKEKEEPER__PG_DATABASE_URL_READ: postgresql://lakekeeper:lakekeeper@lakekeeper-db:5432/lakekeeper
      LAKEKEEPER__PG_DATABASE_URL_WRITE: postgresql://lakekeeper:lakekeeper@lakekeeper-db:5432/lakekeeper
      LAKEKEEPER__PG_ENCRYPTION_KEY: "this-is-NOT-secure-change-in-prod!!"
      LAKEKEEPER__LISTEN_PORT: 8181
    depends_on:
      lakekeeper-migrate:
        condition: service_completed_successfully
    healthcheck:
      test: curl -f http://localhost:8181/health || exit 1
      interval: 5s
      timeout: 5s
      retries: 10
      start_period: 10s
    profiles:
      - lakekeeper
    networks:
      - pipeline-net

  lakekeeper-init:
    image: curlimages/curl:latest
    container_name: p01-lakekeeper-init
    depends_on:
      lakekeeper:
        condition: service_healthy
      mc-init:
        condition: service_completed_successfully
    entrypoint: /bin/sh
    command:
      - -c
      - |
        echo 'Bootstrapping Lakekeeper...' &&
        curl -sf -X POST http://lakekeeper:8181/management/v1/bootstrap \
          -H 'Content-Type: application/json' \
          -d '{"accept-terms-of-use": true}' &&
        echo '' &&
        echo 'Creating warehouse...' &&
        curl -sf -X POST http://lakekeeper:8181/management/v1/warehouse \
          -H 'Content-Type: application/json' \
          -d '{
            "warehouse-name": "warehouse",
            "storage-profile": {
              "type": "s3",
              "bucket": "warehouse",
              "region": "us-east-1",
              "endpoint": "http://minio:9000",
              "path-style-access": true,
              "flavor": "minio"
            },
            "storage-credential": {
              "type": "s3",
              "credential-type": "access-key",
              "aws-access-key-id": "${MINIO_ROOT_USER:-minioadmin}",
              "aws-secret-access-key": "${MINIO_ROOT_PASSWORD:-minioadmin}"
            }
          }' &&
        echo '' &&
        echo 'Lakekeeper initialized successfully'
    profiles:
      - lakekeeper
    networks:
      - pipeline-net

  # ---------------------------------------------------------------------------
  # Flink JobManager
  # ---------------------------------------------------------------------------
  flink-jobmanager:
    <<: *flink-common
    container_name: p01-flink-jobmanager
    hostname: flink-jobmanager
    restart: unless-stopped
    command: jobmanager
    ports:
      - "8083:8081"
    volumes:
      - ./flink/sql:/opt/flink/sql:ro
      - ./flink/conf/config.yaml:/opt/flink/conf/config.yaml:ro
      - ./flink/conf/core-site.xml:/opt/hadoop/conf/core-site.xml:ro
      - flink-checkpoints:/tmp/flink-checkpoints
    deploy:
      resources:
        limits:
          memory: 2G
          cpus: '1.0'
        reservations:
          memory: 1G
    depends_on:
      kafka:
        condition: service_healthy
      mc-init:
        condition: service_completed_successfully
    healthcheck:
      test: curl -f http://localhost:8081/overview || exit 1
      interval: 10s
      timeout: 5s
      retries: 15
      start_period: 30s
    environment:
      <<: *flink-env
      AWS_ACCESS_KEY_ID: ${MINIO_ACCESS_KEY:-minioadmin}
      AWS_SECRET_ACCESS_KEY: ${MINIO_SECRET_KEY:-minioadmin}
      AWS_REGION: us-east-1
      HADOOP_CONF_DIR: /opt/hadoop/conf

  # ---------------------------------------------------------------------------
  # Flink TaskManager
  # ---------------------------------------------------------------------------
  flink-taskmanager:
    <<: *flink-common
    container_name: p01-flink-taskmanager
    hostname: flink-taskmanager
    restart: unless-stopped
    command: taskmanager
    volumes:
      - ./flink/conf/core-site.xml:/opt/hadoop/conf/core-site.xml:ro
      - flink-checkpoints:/tmp/flink-checkpoints
    deploy:
      resources:
        limits:
          memory: 3G
          cpus: '2.0'
        reservations:
          memory: 2G
    depends_on:
      flink-jobmanager:
        condition: service_healthy
    environment:
      <<: *flink-env
      AWS_ACCESS_KEY_ID: ${MINIO_ACCESS_KEY:-minioadmin}
      AWS_SECRET_ACCESS_KEY: ${MINIO_SECRET_KEY:-minioadmin}
      AWS_REGION: us-east-1
      HADOOP_CONF_DIR: /opt/hadoop/conf

  # ---------------------------------------------------------------------------
  # dbt (DuckDB adapter - reads Iceberg tables from MinIO)
  # ---------------------------------------------------------------------------
  dbt:
    build:
      context: .
      dockerfile: ../../shared/docker/dbt.Dockerfile
      args:
        DBT_ADAPTER: dbt-duckdb
    container_name: p01-dbt
    volumes:
      - ./dbt_project:/dbt
    working_dir: /dbt
    entrypoint: ["/bin/sh", "-c"]
    command: ["dbt deps --profiles-dir . && dbt build --full-refresh --profiles-dir ."]
    environment:
      AWS_ACCESS_KEY_ID: ${MINIO_ACCESS_KEY:-minioadmin}
      AWS_SECRET_ACCESS_KEY: ${MINIO_SECRET_KEY:-minioadmin}
      AWS_ENDPOINT_URL: http://minio:9000
      AWS_REGION: us-east-1
      DBT_PROFILES_DIR: /dbt
    depends_on:
      minio:
        condition: service_healthy
    profiles:
      - dbt
    networks:
      - pipeline-net

  # ---------------------------------------------------------------------------
  # Data Generator (reads parquet, produces to Kafka)
  # ---------------------------------------------------------------------------
  data-generator:
    build:
      context: ../../shared/data-generator/
      dockerfile: Dockerfile
    container_name: p01-data-generator
    volumes:
      - ../../data:/data:ro
    environment:
      BROKER_URL: kafka:9092
      TOPIC: taxi.raw_trips
      MODE: burst
      DATA_PATH: /data/yellow_tripdata_2024-01.parquet
    depends_on:
      kafka:
        condition: service_healthy
    profiles:
      - generator
    networks:
      - pipeline-net

# =============================================================================
# Volumes
# =============================================================================
volumes:
  minio-data:
    driver: local
  flink-checkpoints:
    driver: local
  lakekeeper-db-data:
    driver: local

# =============================================================================
# Networks
# =============================================================================
networks:
  pipeline-net:
    name: p01-pipeline-net
    driver: bridge


## 4. Kafka Layer: Event Ingestion

The topic creation script creates two topics:
- `taxi.raw_trips` — Main event stream (3 partitions, 24h retention)
- `taxi.raw_trips.dlq` — Dead Letter Queue for failed events (1 partition, 7-day retention)

> **Production pattern:** The DLQ captures poison messages that fail validation. The 7-day retention
> gives operators time to investigate and replay them. The main topic uses 3 partitions so that
> up to three Flink source subtasks can consume it in parallel.
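
The `retention.ms` values passed to `kafka-topics.sh` in the script are these retention periods expressed in milliseconds. A quick Python check of the arithmetic (the helper name `retention_ms` exists only for this illustration):

```python
from datetime import timedelta


def retention_ms(period: timedelta) -> int:
    """Convert a retention period to the millisecond value Kafka expects."""
    return int(period.total_seconds() * 1000)


main_topic_ms = retention_ms(timedelta(hours=24))  # taxi.raw_trips
dlq_topic_ms = retention_ms(timedelta(days=7))     # taxi.raw_trips.dlq

print(main_topic_ms)  # 86400000
print(dlq_topic_ms)   # 604800000
```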

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/kafka/create-topics.sh
#!/bin/bash
# =============================================================================
# Pipeline 01: Create Kafka Topics
# =============================================================================
# Creates the required topics for the taxi trip streaming pipeline.
# Run this after Kafka is fully started and healthy.
#
# Usage:
#   docker compose exec kafka /bin/bash /opt/kafka/scripts/create-topics.sh
#   -- or --
#   make create-topics
# =============================================================================

set -euo pipefail

BOOTSTRAP_SERVER="${BOOTSTRAP_SERVER:-localhost:9092}"
KAFKA_BIN="/opt/kafka/bin"

echo "============================================================"
echo "  Creating Kafka Topics"
echo "  Bootstrap server: ${BOOTSTRAP_SERVER}"
echo "============================================================"

# Wait for Kafka to be ready
echo "Waiting for Kafka to be ready..."
MAX_RETRIES=30
RETRY=0
until ${KAFKA_BIN}/kafka-broker-api-versions.sh --bootstrap-server "${BOOTSTRAP_SERVER}" > /dev/null 2>&1; do
    RETRY=$((RETRY + 1))
    if [ "${RETRY}" -ge "${MAX_RETRIES}" ]; then
        echo "ERROR: Kafka not available after ${MAX_RETRIES} retries"
        exit 1
    fi
    echo "  Attempt ${RETRY}/${MAX_RETRIES} - waiting..."
    sleep 2
done
echo "Kafka is ready."
echo ""

# ---------------------------------------------------------------------------
# taxi.raw_trips - Main ingest topic
# ---------------------------------------------------------------------------
echo "Creating topic: taxi.raw_trips"
${KAFKA_BIN}/kafka-topics.sh \
    --bootstrap-server "${BOOTSTRAP_SERVER}" \
    --create \
    --topic taxi.raw_trips \
    --partitions 3 \
    --replication-factor 1 \
    --if-not-exists \
    --config retention.ms=86400000 \
    --config cleanup.policy=delete \
    --config segment.bytes=104857600

echo "  taxi.raw_trips created (3 partitions, 24h retention)"
echo ""

# ---------------------------------------------------------------------------
# taxi.raw_trips.dlq - Dead Letter Queue for failed/invalid events
# ---------------------------------------------------------------------------
echo "Creating topic: taxi.raw_trips.dlq"
${KAFKA_BIN}/kafka-topics.sh \
    --bootstrap-server "${BOOTSTRAP_SERVER}" \
    --create \
    --topic taxi.raw_trips.dlq \
    --partitions 1 \
    --replication-factor 1 \
    --if-not-exists \
    --config retention.ms=604800000 \
    --config cleanup.policy=delete

echo "  taxi.raw_trips.dlq created (1 partition, 7d retention)"
echo ""

# ---------------------------------------------------------------------------
# Verify
# ---------------------------------------------------------------------------
echo "============================================================"
echo "  Topics:"
${KAFKA_BIN}/kafka-topics.sh \
    --bootstrap-server "${BOOTSTRAP_SERVER}" \
    --list

echo ""
echo "  Topic Details:"
${KAFKA_BIN}/kafka-topics.sh \
    --bootstrap-server "${BOOTSTRAP_SERVER}" \
    --describe \
    --topic taxi.raw_trips

echo "============================================================"
echo "  Topic creation complete."
echo "============================================================"


## 5. Flink Configuration

### 5.1 Hadoop core-site.xml

**Critical for Flink → MinIO connectivity.** Without this, Flink cannot write Iceberg tables to S3-compatible storage.

Key settings:
- `fs.s3a.endpoint` → MinIO URL
- `fs.s3a.path.style.access` → `true` (required for MinIO; AWS S3 itself does not need it)
- `fs.s3a.impl` → S3AFileSystem (Hadoop's S3 adapter)
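
Because a missing or misspelled property here tends to surface only as an S3 error at job runtime, it can help to sanity-check the file first. The following is a minimal sketch using only the Python standard library; the inline XML is a shortened copy of the three key properties, and `load_properties` and `missing_or_wrong` are hypothetical helper names, not part of the pipeline.

```python
import xml.etree.ElementTree as ET

# Shortened inline copy of the key properties; in practice you would read
# flink/conf/core-site.xml from disk instead.
CORE_SITE = """<configuration>
    <property><name>fs.s3a.endpoint</name><value>http://minio:9000</value></property>
    <property><name>fs.s3a.path.style.access</name><value>true</value></property>
    <property><name>fs.s3a.impl</name><value>org.apache.hadoop.fs.s3a.S3AFileSystem</value></property>
</configuration>
"""

# The three settings called out above, with the values this pipeline uses.
REQUIRED = {
    "fs.s3a.endpoint": "http://minio:9000",
    "fs.s3a.path.style.access": "true",
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
}


def load_properties(xml_text: str) -> dict:
    """Return {name: value} for every <property> element in a Hadoop config."""
    root = ET.fromstring(xml_text)
    return {p.findtext("name"): p.findtext("value") for p in root.iter("property")}


def missing_or_wrong(props: dict, required: dict) -> list:
    """List the required keys that are absent or carry a different value."""
    return sorted(k for k, v in required.items() if props.get(k) != v)


problems = missing_or_wrong(load_properties(CORE_SITE), REQUIRED)
print(problems or "core-site.xml has all required S3A settings")
```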

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/flink/conf/core-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- MinIO (S3-compatible) Configuration for Hadoop S3A -->
    <property>
        <name>fs.s3a.endpoint</name>
        <value>http://minio:9000</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value>minioadmin</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>minioadmin</value>
    </property>
    <property>
        <name>fs.s3a.path.style.access</name>
        <value>true</value>
    </property>
    <property>
        <name>fs.s3a.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <property>
        <name>fs.s3a.connection.ssl.enabled</name>
        <value>false</value>
    </property>
</configuration>

### 5.2 Flink Cluster Configuration (`config.yaml`)

> **Flink 2.0 Change:** The configuration file was renamed from `flink-conf.yaml` to `config.yaml`
> (standard YAML 1.2 format). This file is bind-mounted into the JobManager only.
> The TaskManager receives its config via the `FLINK_PROPERTIES` environment variable.

Key settings:
- `classloader.check-leaked-classloader: false`: required for Iceberg with batch DML sync
- `HADOOP_CONF_DIR: /opt/hadoop/conf`: required for S3A filesystem access (a container environment variable set in Docker Compose, not a `config.yaml` key)
- `metrics.reporter.prom.*`: Prometheus metrics on port 9249
- `execution.checkpointing.interval: 30s`: checkpoint every 30 seconds in `EXACTLY_ONCE` mode
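
Since the TaskManager gets its settings through `FLINK_PROPERTIES` rather than the mounted file, the two can drift apart. The sketch below shows one way to derive the environment value from flat `key: value` lines. It assumes the official Flink Docker image convention of newline-separated `key: value` pairs; the helper names and the choice of which keys to forward are illustrative assumptions, not taken from this pipeline.

```python
def parse_flat_config(text: str) -> dict[str, str]:
    """Parse flat `key: value` lines, skipping blank lines and `#` comments."""
    config = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first ": " only, so URL values keep their colons.
        key, sep, value = line.partition(": ")
        if not sep:
            raise ValueError(f"not a flat key: value line: {raw!r}")
        config[key.strip()] = value.strip()
    return config


def to_flink_properties(config: dict[str, str], keys: list[str]) -> str:
    """Render the selected keys as newline-separated `key: value` pairs."""
    return "\n".join(f"{k}: {config[k]}" for k in keys)


SAMPLE = """\
# Cluster
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 4
s3.endpoint: http://minio:9000
state.checkpoints.dir: file:///tmp/flink-checkpoints
"""

config = parse_flat_config(SAMPLE)
# Hypothetical subset of keys to forward to the TaskManager container.
props = to_flink_properties(config, ["jobmanager.rpc.address", "taskmanager.numberOfTaskSlots"])
print(props)
```

This only handles the flat form used in this file; nested YAML would need a real YAML parser.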

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/flink/conf/config.yaml
# =============================================================================
# Pipeline 01: Flink Configuration
# =============================================================================
# Configuration for Flink 2.0.1 with Iceberg + Kafka connectors.
# This file is mounted into the JobManager container.
# =============================================================================

# Cluster
jobmanager.rpc.address: flink-jobmanager
jobmanager.rpc.port: 6123
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600m

taskmanager.bind-host: 0.0.0.0
taskmanager.host: flink-taskmanager
taskmanager.memory.process.size: 2048m
taskmanager.numberOfTaskSlots: 4

parallelism.default: 2

# REST API (Flink Dashboard)
rest.address: 0.0.0.0
rest.bind-address: 0.0.0.0
rest.port: 8081
rest.flamegraph.enabled: true

# Checkpointing
execution.checkpointing.interval: 30s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.min-pause: 10s
execution.checkpointing.timeout: 5min
state.backend: hashmap
state.checkpoints.dir: file:///tmp/flink-checkpoints
state.savepoints.dir: file:///tmp/flink-savepoints

# Table / SQL Configuration
table.exec.state.ttl: 0
table.exec.sink.not-null-enforcer: DROP

# Classloader (avoid Iceberg classloader leak with batch DML sync)
classloader.check-leaked-classloader: false

# S3 (MinIO) filesystem configuration
s3.endpoint: http://minio:9000
s3.access-key: minioadmin
s3.secret-key: minioadmin
s3.path.style.access: true

# Prometheus metrics (expose on :9249 for scraping)
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249

# Logging
env.log.max: 5
env.log.dir: /opt/flink/log


## 6. Flink SQL: Session Initialization

This is the most important SQL file — it creates the Kafka source table and Iceberg catalog
that all subsequent SQL files depend on. It's used as an init script (`-i` flag):

```bash
sql-client.sh embedded -i 00-init.sql -f 05-bronze.sql
```

Key features:
- **Batch mode** (`execution.runtime-mode = batch`): Process all available data, then stop
- **DML sync** (`table.dml-sync = true`): Wait for each INSERT to complete before next
- **Event-time watermark:** `WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND`
  enables correct out-of-order handling (no-op in batch mode, essential in streaming)
- **Bounded consumption:** `scan.bounded.mode = latest-offset` reads everything available then stops
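
To make the watermark rule concrete, here is a minimal Python sketch of bounded out-of-orderness with the same 10-second allowance. It is a simplified model (a single stream, with the watermark advanced on every event), not Flink's implementation; `flag_late_events` and the sample timestamps are made up for illustration.

```python
from datetime import datetime, timedelta

# Same allowance as WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
MAX_OUT_OF_ORDERNESS = timedelta(seconds=10)


def flag_late_events(event_times):
    """Return one boolean per event (arrival order): True if it arrived late.

    An event counts as late when its timestamp is at or behind the current
    watermark, i.e. the largest timestamp seen so far minus the allowance.
    """
    max_seen = None
    flags = []
    for ts in event_times:
        is_late = max_seen is not None and ts <= max_seen - MAX_OUT_OF_ORDERNESS
        flags.append(is_late)
        if max_seen is None or ts > max_seen:
            max_seen = ts
    return flags


t = datetime(2024, 1, 15, 8, 30, 0)
arrivals = [
    t,                          # first event
    t + timedelta(seconds=20),  # advances the watermark to 08:30:10
    t + timedelta(seconds=15),  # 5s out of order: still within the allowance
    t + timedelta(seconds=5),   # 15s out of order: behind the watermark
]
print(flag_late_events(arrivals))  # [False, False, False, True]
```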

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/flink/sql/00-init.sql
-- =============================================================================
-- Pipeline 01: Flink SQL - Session Initialization
-- =============================================================================
-- Creates the Kafka source table and Iceberg catalog. This file is used as
-- an init script (-i flag) for all subsequent SQL files so they have access
-- to the catalog within the same session.
--
-- Uses BATCH execution mode so jobs process all available data and terminate.
-- =============================================================================

-- Use batch mode (process available data, then stop)
SET 'execution.runtime-mode' = 'batch';

-- Wait for each INSERT to complete before proceeding to next statement
SET 'table.dml-sync' = 'true';

-- Create Kafka source table
-- NOTE: event_time computed column + WATERMARK enables event-time processing
-- in streaming mode. In batch mode (default), the watermark is simply ignored.
CREATE TABLE IF NOT EXISTS kafka_raw_trips (
    VendorID                BIGINT,
    tpep_pickup_datetime    STRING,
    tpep_dropoff_datetime   STRING,
    passenger_count         BIGINT,
    trip_distance           DOUBLE,
    RatecodeID              BIGINT,
    store_and_fwd_flag      STRING,
    PULocationID            BIGINT,
    DOLocationID            BIGINT,
    payment_type            BIGINT,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    congestion_surcharge    DOUBLE,
    Airport_fee             DOUBLE,
    -- Computed column for event-time processing
    event_time AS TO_TIMESTAMP(tpep_pickup_datetime, 'yyyy-MM-dd''T''HH:mm:ss'),
    -- Watermark: allow 10s late arrivals (ignored in batch mode)
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'taxi.raw_trips',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-consumer',
    'scan.startup.mode' = 'earliest-offset',
    'scan.bounded.mode' = 'latest-offset',
    'format' = 'json'
);

-- Create Iceberg catalog backed by MinIO
CREATE CATALOG iceberg_catalog WITH (
    'type' = 'iceberg',
    'catalog-type' = 'hadoop',
    'warehouse' = 's3a://warehouse/',
    'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
    's3.endpoint' = 'http://minio:9000',
    's3.access-key-id' = 'minioadmin',
    's3.secret-access-key' = 'minioadmin',
    's3.path-style-access' = 'true'
);


### 6b. REST Catalog Session Init (Lakekeeper Alternative)

> **When to use:** If you started with `make up-lakekeeper`, use this init file instead of `00-init.sql`.
> The REST catalog eliminates hardcoded S3 credentials in SQL because Lakekeeper handles credential vending.

```bash
# Usage with REST catalog:
sql-client.sh embedded -i 00-init-rest.sql -f 05-bronze.sql
```

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/flink/sql/00-init-rest.sql
-- =============================================================================
-- Pipeline 01: Flink SQL - Session Initialization (REST Catalog via Lakekeeper)
-- =============================================================================
-- Alternative to 00-init.sql that uses Lakekeeper REST catalog instead of
-- Hadoop catalog. Requires: docker compose --profile lakekeeper up -d
--
-- Usage:
--   sql-client.sh embedded -i 00-init-rest.sql -f 05-bronze.sql
--   sql-client.sh embedded -i 00-init-rest.sql -f 06-silver.sql
-- =============================================================================

-- Use batch mode (process available data, then stop)
SET 'execution.runtime-mode' = 'batch';

-- Wait for each INSERT to complete before proceeding to next statement
SET 'table.dml-sync' = 'true';

-- Create Kafka source table
-- NOTE: event_time computed column + WATERMARK enables event-time processing
-- in streaming mode. In batch mode (default), the watermark is simply ignored.
CREATE TABLE IF NOT EXISTS kafka_raw_trips (
    VendorID                BIGINT,
    tpep_pickup_datetime    STRING,
    tpep_dropoff_datetime   STRING,
    passenger_count         BIGINT,
    trip_distance           DOUBLE,
    RatecodeID              BIGINT,
    store_and_fwd_flag      STRING,
    PULocationID            BIGINT,
    DOLocationID            BIGINT,
    payment_type            BIGINT,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    congestion_surcharge    DOUBLE,
    Airport_fee             DOUBLE,
    -- Computed column for event-time processing
    event_time AS TO_TIMESTAMP(tpep_pickup_datetime, 'yyyy-MM-dd''T''HH:mm:ss'),
    -- Watermark: allow 10s late arrivals (ignored in batch mode)
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'taxi.raw_trips',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-consumer',
    'scan.startup.mode' = 'earliest-offset',
    'scan.bounded.mode' = 'latest-offset',
    'format' = 'json'
);

-- Create Iceberg catalog via Lakekeeper REST API
-- No S3 credentials needed here - Lakekeeper handles credential vending
CREATE CATALOG iceberg_catalog WITH (
    'type' = 'iceberg',
    'catalog-type' = 'rest',
    'uri' = 'http://lakekeeper:8181/catalog',
    'warehouse' = 'warehouse'
);

### 6.2 Kafka Source Table (Reference)

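This is the standalone version of the Kafka table definition, kept as documentation. `00-init.sql` defines the same columns and connector options and additionally declares the `event_time` computed column, the watermark, and `scan.bounded.mode`, so this standalone table is an unbounded source without event-time support.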

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/flink/sql/01-create-kafka-source.sql
-- =============================================================================
-- Pipeline 01: Flink SQL - Kafka Source Table
-- =============================================================================
-- Creates a Flink SQL table backed by the Kafka topic taxi.raw_trips.
-- The data generator produces JSON records with these exact field names
-- matching the NYC Yellow Taxi parquet schema.
-- =============================================================================

CREATE TABLE kafka_raw_trips (
    VendorID                BIGINT,
    tpep_pickup_datetime    STRING,
    tpep_dropoff_datetime   STRING,
    passenger_count         BIGINT,
    trip_distance           DOUBLE,
    RatecodeID              BIGINT,
    store_and_fwd_flag      STRING,
    PULocationID            BIGINT,
    DOLocationID            BIGINT,
    payment_type            BIGINT,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    congestion_surcharge    DOUBLE,
    Airport_fee             DOUBLE
) WITH (
    'connector' = 'kafka',
    'topic' = 'taxi.raw_trips',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-consumer',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

### 6.3 Iceberg Catalog (Reference)

Standalone catalog creation. Also included in `00-init.sql`.

Key properties:
- `catalog-type: hadoop` → Uses filesystem-based catalog metadata
- `warehouse: s3a://warehouse/` → MinIO bucket for all Iceberg data
- `io-impl: S3FileIO` → Iceberg's own S3 implementation

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/flink/sql/02-create-iceberg-catalog.sql
-- =============================================================================
-- Pipeline 01: Flink SQL - Iceberg Catalog
-- =============================================================================
-- Creates a Hadoop-based Iceberg catalog backed by MinIO (S3-compatible).
-- All Bronze and Silver tables will be created within this catalog.
-- =============================================================================

CREATE CATALOG iceberg_catalog WITH (
    'type' = 'iceberg',
    'catalog-type' = 'hadoop',
    'warehouse' = 's3a://warehouse/',
    'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
    's3.endpoint' = 'http://minio:9000',
    's3.access-key-id' = 'minioadmin',
    's3.secret-access-key' = 'minioadmin',
    's3.path-style-access' = 'true'
);

## 7. Flink SQL: Bronze Layer (Kafka → Iceberg)

### What Bronze Does
- Preserves **all** original fields from Kafka events
- Parses ISO 8601 timestamp strings → `TIMESTAMP(3)` type
- Adds `ingestion_ts` metadata column (when the event was processed)
- **No filtering**, no validation, no business logic
- Writes ACID Iceberg tables to MinIO (`s3a://warehouse/bronze/raw_trips/`)
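
As a rough picture of what this means per record, the Python sketch below applies the same two steps to one event: parse the ISO 8601 strings and stamp an ingestion time, leaving every other field untouched. The function name `to_bronze_row` and the sample values are illustrative assumptions. The sketch raises on a malformed timestamp, which does not model how Flink treats such input.

```python
from datetime import datetime, timezone

TIMESTAMP_COLUMNS = ("tpep_pickup_datetime", "tpep_dropoff_datetime")


def to_bronze_row(event: dict) -> dict:
    """Copy all fields, parse the two timestamp strings, add ingestion_ts."""
    row = dict(event)  # no filtering: every original field is preserved
    for col in TIMESTAMP_COLUMNS:
        # Python equivalent of the pattern yyyy-MM-dd'T'HH:mm:ss
        row[col] = datetime.strptime(event[col], "%Y-%m-%dT%H:%M:%S")
    row["ingestion_ts"] = datetime.now(timezone.utc)
    return row


# Hypothetical event with only a few of the source fields, for brevity.
event = {
    "VendorID": 1,
    "tpep_pickup_datetime": "2024-01-15T08:30:00",
    "tpep_dropoff_datetime": "2024-01-15T08:45:00",
    "fare_amount": -3.5,  # kept as-is: Bronze applies no validation
}
row = to_bronze_row(event)
print(row["tpep_pickup_datetime"], row["fare_amount"])
```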

### 7.1 Bronze with Documentation (03-bronze-raw-trips.sql)

The verbose version with inline comments explaining each decision:

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/flink/sql/03-bronze-raw-trips.sql
-- =============================================================================
-- Pipeline 01: Flink SQL - Bronze Layer (Raw Trips)
-- =============================================================================
-- Creates the Bronze Iceberg table and starts a continuous INSERT job
-- that reads from the Kafka source table.
--
-- Bronze layer preserves original column names from the source data.
-- Timestamps are parsed from ISO 8601 strings to TIMESTAMP type.
-- No filtering or cleaning is applied at this layer.
-- =============================================================================

-- Use the Iceberg catalog
USE CATALOG iceberg_catalog;

-- Create the Bronze database
CREATE DATABASE IF NOT EXISTS bronze;
USE bronze;

-- Create the Bronze raw trips table
CREATE TABLE IF NOT EXISTS raw_trips (
    VendorID                BIGINT,
    tpep_pickup_datetime    TIMESTAMP(3),
    tpep_dropoff_datetime   TIMESTAMP(3),
    passenger_count         BIGINT,
    trip_distance           DOUBLE,
    RatecodeID              BIGINT,
    store_and_fwd_flag      STRING,
    PULocationID            BIGINT,
    DOLocationID            BIGINT,
    payment_type            BIGINT,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    congestion_surcharge    DOUBLE,
    Airport_fee             DOUBLE,
    ingestion_ts            TIMESTAMP(3)
);

-- Switch back to default catalog for the Kafka source table reference
USE CATALOG default_catalog;
USE default_database;

-- Continuous INSERT from Kafka into Bronze Iceberg table
-- Timestamps are parsed from ISO 8601 string format (e.g. "2024-01-15T08:30:00")
INSERT INTO iceberg_catalog.bronze.raw_trips
SELECT
    VendorID,
    TO_TIMESTAMP(tpep_pickup_datetime, 'yyyy-MM-dd''T''HH:mm:ss')   AS tpep_pickup_datetime,
    TO_TIMESTAMP(tpep_dropoff_datetime, 'yyyy-MM-dd''T''HH:mm:ss')  AS tpep_dropoff_datetime,
    passenger_count,
    trip_distance,
    RatecodeID,
    store_and_fwd_flag,
    PULocationID,
    DOLocationID,
    payment_type,
    fare_amount,
    extra,
    mta_tax,
    tip_amount,
    tolls_amount,
    improvement_surcharge,
    total_amount,
    congestion_surcharge,
    Airport_fee,
    CURRENT_TIMESTAMP AS ingestion_ts
FROM kafka_raw_trips;

### 7.2 Bronze Standalone (05-bronze.sql)

The production version used by `make process-bronze`. Identical logic, minimal comments.
Run with: `sql-client.sh embedded -i 00-init.sql -f 05-bronze.sql`

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/flink/sql/05-bronze.sql
-- =============================================================================
-- Pipeline 01: Bronze Layer (Kafka → Iceberg)
-- =============================================================================
-- Run: sql-client.sh embedded -i 00-init.sql -f 05-bronze.sql
-- =============================================================================

USE CATALOG iceberg_catalog;
CREATE DATABASE IF NOT EXISTS bronze;

CREATE TABLE IF NOT EXISTS bronze.raw_trips (
    VendorID                BIGINT,
    tpep_pickup_datetime    TIMESTAMP(3),
    tpep_dropoff_datetime   TIMESTAMP(3),
    passenger_count         BIGINT,
    trip_distance           DOUBLE,
    RatecodeID              BIGINT,
    store_and_fwd_flag      STRING,
    PULocationID            BIGINT,
    DOLocationID            BIGINT,
    payment_type            BIGINT,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    congestion_surcharge    DOUBLE,
    Airport_fee             DOUBLE,
    ingestion_ts            TIMESTAMP(3)
);

USE CATALOG default_catalog;
USE default_database;

INSERT INTO iceberg_catalog.bronze.raw_trips
SELECT
    VendorID,
    TO_TIMESTAMP(tpep_pickup_datetime, 'yyyy-MM-dd''T''HH:mm:ss')   AS tpep_pickup_datetime,
    TO_TIMESTAMP(tpep_dropoff_datetime, 'yyyy-MM-dd''T''HH:mm:ss')  AS tpep_dropoff_datetime,
    passenger_count,
    trip_distance,
    RatecodeID,
    store_and_fwd_flag,
    PULocationID,
    DOLocationID,
    payment_type,
    fare_amount,
    extra,
    mta_tax,
    tip_amount,
    tolls_amount,
    improvement_surcharge,
    total_amount,
    congestion_surcharge,
    Airport_fee,
    CURRENT_TIMESTAMP AS ingestion_ts
FROM kafka_raw_trips;

## 8. Flink SQL: Silver Layer (Bronze → Cleaned Iceberg)

The Silver layer applies data quality rules and deduplication:

1. **ROW_NUMBER dedup:** `PARTITION BY natural_key ORDER BY ingestion_ts DESC` → keeps latest record
2. **Null filtering:** Removes rows with null passenger_count or trip_distance
3. **Negative filtering:** Removes negative fares, tips, tolls, totals
4. **Date range:** Only January 2024 data passes through
5. **Computed columns:** duration_minutes, avg_speed_mph, cost_per_mile, tip_percentage
6. **Date dimensions:** pickup_date, pickup_hour, is_weekend
7. **Surrogate key:** MD5 hash trip_id for downstream joins
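
The dedup, surrogate-key, and enrichment rules can be sketched in plain Python as follows. This is an illustration under stated assumptions, not the pipeline's code: the dedup key is assumed to be the same column set that is hashed into `trip_id`, Python's string formatting of timestamps and numbers differs from Flink's `CAST(... AS STRING)` so the hashes will not match Flink's output, and the helper names and sample rows are made up.

```python
import hashlib
from datetime import datetime

# Columns hashed into trip_id by the Silver SQL; reusing them as the dedup
# key is an assumption of this sketch.
NATURAL_KEY = (
    "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime",
    "PULocationID", "DOLocationID", "fare_amount", "total_amount",
)


def trip_id(row: dict) -> str:
    """MD5 hex digest of the '|'-joined natural key values."""
    joined = "|".join(str(row[c]) for c in NATURAL_KEY)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


def dedup_latest(rows: list) -> list:
    """Keep one row per natural key: the one with the greatest ingestion_ts."""
    latest = {}
    for row in rows:
        key = tuple(row[c] for c in NATURAL_KEY)
        if key not in latest or row["ingestion_ts"] > latest[key]["ingestion_ts"]:
            latest[key] = row
    return list(latest.values())


def enrich(row: dict) -> dict:
    """Compute a few of the Silver columns for one deduplicated row."""
    pickup, dropoff = row["tpep_pickup_datetime"], row["tpep_dropoff_datetime"]
    minutes = int((dropoff - pickup).total_seconds() // 60)
    # Guard against division by zero, as the SQL CASE expressions do.
    speed = round(row["trip_distance"] / (minutes / 60.0), 2) if minutes > 0 else None
    return {
        "trip_id": trip_id(row),
        "duration_minutes": minutes,
        "avg_speed_mph": speed,
        "is_weekend": pickup.weekday() >= 5,  # Python: Saturday=5, Sunday=6
    }


base = {
    "VendorID": 1,
    "tpep_pickup_datetime": datetime(2024, 1, 13, 8, 30),   # a Saturday
    "tpep_dropoff_datetime": datetime(2024, 1, 13, 8, 45),
    "PULocationID": 100, "DOLocationID": 200,
    "fare_amount": 12.5, "total_amount": 15.0, "trip_distance": 3.0,
}
rows = [
    {**base, "ingestion_ts": datetime(2024, 2, 1, 0, 0)},
    {**base, "ingestion_ts": datetime(2024, 2, 1, 0, 5)},  # duplicate, ingested later
]
deduped = dedup_latest(rows)
silver = [enrich(r) for r in deduped]
print(len(deduped), silver[0]["duration_minutes"], silver[0]["avg_speed_mph"])
```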

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/flink/sql/04-silver-cleaned-trips.sql
-- =============================================================================
-- Pipeline 01: Flink SQL - Silver Layer (Cleaned Trips)
-- =============================================================================
-- Creates the Silver Iceberg table and starts a continuous INSERT job
-- that reads from the Bronze table, applies data quality filters,
-- renames columns to snake_case, and computes enrichment columns.
--
-- Silver layer transformations:
--   1. Column renaming (VendorID -> vendor_id, PULocationID -> pickup_location_id, etc.)
--   2. Type casting (BIGINT -> INT where appropriate)
--   3. Data quality filters:
--      - Reject null timestamps
--      - Reject negative fare amounts and trip distances
--      - Reject pickup dates outside January 2024
--   4. Surrogate key: MD5 hash of composite natural key
--   5. Computed columns:
--      - duration_minutes
--      - avg_speed_mph
--      - cost_per_mile
--      - tip_percentage
--      - pickup_date, pickup_hour
--      - is_weekend
-- =============================================================================

-- Use the Iceberg catalog
USE CATALOG iceberg_catalog;

-- Create the Silver database
CREATE DATABASE IF NOT EXISTS silver;
USE silver;

-- Create the Silver cleaned trips table
CREATE TABLE IF NOT EXISTS cleaned_trips (
    -- surrogate key
    trip_id                 STRING,

    -- identifiers
    vendor_id               INT,
    rate_code_id            INT,
    pickup_location_id      INT,
    dropoff_location_id     INT,
    payment_type_id         INT,

    -- timestamps
    pickup_datetime         TIMESTAMP(3),
    dropoff_datetime        TIMESTAMP(3),

    -- trip info
    passenger_count         INT,
    trip_distance_miles     DOUBLE,
    store_and_fwd_flag      STRING,

    -- financials
    fare_amount             DECIMAL(10, 2),
    extra_amount            DECIMAL(10, 2),
    mta_tax                 DECIMAL(10, 2),
    tip_amount              DECIMAL(10, 2),
    tolls_amount            DECIMAL(10, 2),
    improvement_surcharge   DECIMAL(10, 2),
    total_amount            DECIMAL(10, 2),
    congestion_surcharge    DECIMAL(10, 2),
    airport_fee             DECIMAL(10, 2),

    -- computed: enrichments
    duration_minutes        BIGINT,
    avg_speed_mph           DOUBLE,
    cost_per_mile           DOUBLE,
    tip_percentage          DOUBLE,

    -- computed: time dimensions
    pickup_date             DATE,
    pickup_hour             INT,
    is_weekend              BOOLEAN
);

-- Continuous INSERT from Bronze into Silver with transformations
INSERT INTO iceberg_catalog.silver.cleaned_trips
SELECT
    -- Surrogate key: MD5 hash of composite natural key
    MD5(CONCAT_WS('|',
        CAST(VendorID AS STRING),
        CAST(tpep_pickup_datetime AS STRING),
        CAST(tpep_dropoff_datetime AS STRING),
        CAST(PULocationID AS STRING),
        CAST(DOLocationID AS STRING),
        CAST(fare_amount AS STRING),
        CAST(total_amount AS STRING)
    )) AS trip_id,

    -- Identifiers (renamed + cast)
    CAST(VendorID AS INT)       AS vendor_id,
    CAST(RatecodeID AS INT)     AS rate_code_id,
    CAST(PULocationID AS INT)   AS pickup_location_id,
    CAST(DOLocationID AS INT)   AS dropoff_location_id,
    CAST(payment_type AS INT)   AS payment_type_id,

    -- Timestamps
    tpep_pickup_datetime        AS pickup_datetime,
    tpep_dropoff_datetime       AS dropoff_datetime,

    -- Trip info
    CAST(passenger_count AS INT) AS passenger_count,
    trip_distance               AS trip_distance_miles,
    store_and_fwd_flag,

    -- Financials (rounded to 2 decimal places)
    CAST(ROUND(fare_amount, 2)             AS DECIMAL(10, 2)) AS fare_amount,
    CAST(ROUND(extra, 2)                   AS DECIMAL(10, 2)) AS extra_amount,
    CAST(ROUND(mta_tax, 2)                 AS DECIMAL(10, 2)) AS mta_tax,
    CAST(ROUND(tip_amount, 2)              AS DECIMAL(10, 2)) AS tip_amount,
    CAST(ROUND(tolls_amount, 2)            AS DECIMAL(10, 2)) AS tolls_amount,
    CAST(ROUND(improvement_surcharge, 2)   AS DECIMAL(10, 2)) AS improvement_surcharge,
    CAST(ROUND(total_amount, 2)            AS DECIMAL(10, 2)) AS total_amount,
    CAST(ROUND(congestion_surcharge, 2)    AS DECIMAL(10, 2)) AS congestion_surcharge,
    CAST(ROUND(Airport_fee, 2)             AS DECIMAL(10, 2)) AS airport_fee,

    -- Computed: duration in minutes
    TIMESTAMPDIFF(MINUTE, tpep_pickup_datetime, tpep_dropoff_datetime) AS duration_minutes,

    -- Computed: average speed in mph (avoid division by zero)
    CASE
        WHEN TIMESTAMPDIFF(MINUTE, tpep_pickup_datetime, tpep_dropoff_datetime) > 0
        THEN ROUND(
            trip_distance / (CAST(TIMESTAMPDIFF(MINUTE, tpep_pickup_datetime, tpep_dropoff_datetime) AS DOUBLE) / 60.0),
            2
        )
        ELSE NULL
    END AS avg_speed_mph,

    -- Computed: cost per mile (avoid division by zero)
    CASE
        WHEN trip_distance > 0
        THEN ROUND(fare_amount / trip_distance, 2)
        ELSE NULL
    END AS cost_per_mile,

    -- Computed: tip percentage (avoid division by zero)
    CASE
        WHEN fare_amount > 0
        THEN ROUND((tip_amount / fare_amount) * 100, 2)
        ELSE NULL
    END AS tip_percentage,

    -- Computed: date dimensions
    CAST(tpep_pickup_datetime AS DATE) AS pickup_date,
    EXTRACT(HOUR FROM tpep_pickup_datetime) AS pickup_hour,
    CASE
        WHEN DAYOFWEEK(tpep_pickup_datetime) IN (1, 7) THEN TRUE
        ELSE FALSE
    END AS is_weekend

FROM iceberg_catalog.bronze.raw_trips

-- Data quality filters
WHERE tpep_pickup_datetime IS NOT NULL
  AND tpep_dropoff_datetime IS NOT NULL
  AND trip_distance >= 0
  AND fare_amount >= 0
  AND CAST(tpep_pickup_datetime AS DATE) >= DATE '2024-01-01'
  AND CAST(tpep_pickup_datetime AS DATE) <  DATE '2024-02-01';
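
The computed columns above all follow the same guard-then-divide pattern. The sketch below restates them in plain Python so the arithmetic is easy to check by hand. It is an illustration only, not part of the pipeline: the function names are hypothetical, `TIMESTAMPDIFF(MINUTE, ...)` is assumed to truncate to whole minutes, and Python's `round` can differ from SQL `ROUND` on exact .5 ties.

```python
from datetime import datetime
from typing import Optional


def duration_minutes(pickup: datetime, dropoff: datetime) -> int:
    """Whole minutes between pickup and dropoff (assumes truncation)."""
    return int((dropoff - pickup).total_seconds() / 60)


def avg_speed_mph(distance_miles: float, minutes: int) -> Optional[float]:
    """Miles per hour, or None when the duration is not positive."""
    if minutes <= 0:
        return None
    return round(distance_miles / (minutes / 60.0), 2)


def safe_ratio(numerator: float, denominator: float, scale: float = 1.0) -> Optional[float]:
    """numerator / denominator * scale, or None when the denominator is not positive."""
    if denominator <= 0:
        return None
    return round(numerator / denominator * scale, 2)


def is_weekend(ts: datetime) -> bool:
    """Mirror DAYOFWEEK(...) IN (1, 7), where 1 = Sunday and 7 = Saturday."""
    flink_day_of_week = ts.isoweekday() % 7 + 1  # ISO Monday=1..Sunday=7 -> Sunday=1..Saturday=7
    return flink_day_of_week in (1, 7)


pickup = datetime(2024, 1, 6, 10, 0, 0)     # a Saturday
dropoff = datetime(2024, 1, 6, 10, 12, 30)
minutes = duration_minutes(pickup, dropoff)

print(minutes)                              # duration_minutes
print(avg_speed_mph(3.0, minutes))          # avg_speed_mph
print(safe_ratio(18.0, 3.0))                # cost_per_mile
print(safe_ratio(3.6, 18.0, scale=100))     # tip_percentage
print(is_weekend(pickup))                   # is_weekend
```

Returning `None` rather than `0` for the guarded cases matches the `ELSE NULL` branches in the SQL, which keeps zero-distance and zero-fare trips out of downstream averages.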

### 8.2 Silver Standalone (06-silver.sql)

Production version used by `make process-silver`.
Run with: `sql-client.sh embedded -i 00-init.sql -f 06-silver.sql`

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/flink/sql/06-silver.sql
-- =============================================================================
-- Pipeline 01: Silver Layer (Bronze Iceberg → Silver Iceberg)
-- =============================================================================
-- Run: sql-client.sh embedded -i 00-init.sql -f 06-silver.sql
-- =============================================================================

USE CATALOG iceberg_catalog;
CREATE DATABASE IF NOT EXISTS silver;

CREATE TABLE IF NOT EXISTS silver.cleaned_trips (
    trip_id                 STRING,
    vendor_id               INT,
    rate_code_id            INT,
    pickup_location_id      INT,
    dropoff_location_id     INT,
    payment_type_id         INT,
    pickup_datetime         TIMESTAMP(3),
    dropoff_datetime        TIMESTAMP(3),
    passenger_count         INT,
    trip_distance_miles     DOUBLE,
    store_and_fwd_flag      STRING,
    fare_amount             DECIMAL(10, 2),
    extra_amount            DECIMAL(10, 2),
    mta_tax                 DECIMAL(10, 2),
    tip_amount              DECIMAL(10, 2),
    tolls_amount            DECIMAL(10, 2),
    improvement_surcharge   DECIMAL(10, 2),
    total_amount            DECIMAL(10, 2),
    congestion_surcharge    DECIMAL(10, 2),
    airport_fee             DECIMAL(10, 2),
    duration_minutes        BIGINT,
    avg_speed_mph           DOUBLE,
    cost_per_mile           DOUBLE,
    tip_percentage          DOUBLE,
    pickup_date             DATE,
    pickup_hour             INT,
    is_weekend              BOOLEAN
);

-- Deduplication: ROW_NUMBER partitioned by natural key, keeping latest ingestion
INSERT INTO iceberg_catalog.silver.cleaned_trips
WITH deduped AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY VendorID, tpep_pickup_datetime, tpep_dropoff_datetime,
                         PULocationID, DOLocationID, fare_amount, total_amount
            ORDER BY ingestion_ts DESC
        ) AS rn
    FROM iceberg_catalog.bronze.raw_trips
    WHERE tpep_pickup_datetime IS NOT NULL
      AND tpep_dropoff_datetime IS NOT NULL
      AND trip_distance >= 0
      AND fare_amount >= 0
      AND CAST(tpep_pickup_datetime AS DATE) >= DATE '2024-01-01'
      AND CAST(tpep_pickup_datetime AS DATE) <  DATE '2024-02-01'
)
SELECT
    CAST(MD5(CONCAT_WS('|',
        CAST(VendorID AS STRING),
        CAST(tpep_pickup_datetime AS STRING),
        CAST(tpep_dropoff_datetime AS STRING),
        CAST(PULocationID AS STRING),
        CAST(DOLocationID AS STRING),
        CAST(fare_amount AS STRING),
        CAST(total_amount AS STRING)
    )) AS STRING) AS trip_id,
    CAST(VendorID AS INT)       AS vendor_id,
    CAST(RatecodeID AS INT)     AS rate_code_id,
    CAST(PULocationID AS INT)   AS pickup_location_id,
    CAST(DOLocationID AS INT)   AS dropoff_location_id,
    CAST(payment_type AS INT)   AS payment_type_id,
    tpep_pickup_datetime        AS pickup_datetime,
    tpep_dropoff_datetime       AS dropoff_datetime,
    CAST(passenger_count AS INT) AS passenger_count,
    trip_distance               AS trip_distance_miles,
    store_and_fwd_flag,
    CAST(ROUND(fare_amount, 2)             AS DECIMAL(10, 2)) AS fare_amount,
    CAST(ROUND(extra, 2)                   AS DECIMAL(10, 2)) AS extra_amount,
    CAST(ROUND(mta_tax, 2)                 AS DECIMAL(10, 2)) AS mta_tax,
    CAST(ROUND(tip_amount, 2)              AS DECIMAL(10, 2)) AS tip_amount,
    CAST(ROUND(tolls_amount, 2)            AS DECIMAL(10, 2)) AS tolls_amount,
    CAST(ROUND(improvement_surcharge, 2)   AS DECIMAL(10, 2)) AS improvement_surcharge,
    CAST(ROUND(total_amount, 2)            AS DECIMAL(10, 2)) AS total_amount,
    CAST(ROUND(congestion_surcharge, 2)    AS DECIMAL(10, 2)) AS congestion_surcharge,
    CAST(ROUND(Airport_fee, 2)             AS DECIMAL(10, 2)) AS airport_fee,
    CAST(TIMESTAMPDIFF(MINUTE, tpep_pickup_datetime, tpep_dropoff_datetime) AS BIGINT) AS duration_minutes,
    CASE
        WHEN TIMESTAMPDIFF(MINUTE, tpep_pickup_datetime, tpep_dropoff_datetime) > 0
        THEN ROUND(
            trip_distance / (CAST(TIMESTAMPDIFF(MINUTE, tpep_pickup_datetime, tpep_dropoff_datetime) AS DOUBLE) / 60.0),
            2
        )
        ELSE NULL
    END AS avg_speed_mph,
    CASE
        WHEN trip_distance > 0
        THEN ROUND(fare_amount / trip_distance, 2)
        ELSE NULL
    END AS cost_per_mile,
    CASE
        WHEN fare_amount > 0
        THEN ROUND((tip_amount / fare_amount) * 100, 2)
        ELSE NULL
    END AS tip_percentage,
    CAST(tpep_pickup_datetime AS DATE) AS pickup_date,
    CAST(EXTRACT(HOUR FROM tpep_pickup_datetime) AS INT) AS pickup_hour,
    CASE
        WHEN DAYOFWEEK(tpep_pickup_datetime) IN (1, 7) THEN TRUE
        ELSE FALSE
    END AS is_weekend
FROM deduped
WHERE rn = 1;
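
To make the deduplication step concrete, here is a minimal Python sketch of what `ROW_NUMBER() ... ORDER BY ingestion_ts DESC` followed by `WHERE rn = 1` achieves: one surviving row per natural key, the most recently ingested one. The helper names are hypothetical, the key columns are assumed to be non-null, and the digest will not match Flink's `trip_id` byte for byte, because `str()` renders numbers and timestamps differently from `CAST(... AS STRING)`.

```python
import hashlib
from datetime import datetime

# Same columns as the PARTITION BY clause in 06-silver.sql.
NATURAL_KEY = (
    "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime",
    "PULocationID", "DOLocationID", "fare_amount", "total_amount",
)


def dedupe_latest(rows):
    """Keep one row per natural key: the one with the greatest ingestion_ts."""
    latest = {}
    for row in rows:
        key = tuple(row[col] for col in NATURAL_KEY)
        kept = latest.get(key)
        if kept is None or row["ingestion_ts"] > kept["ingestion_ts"]:
            latest[key] = row
    return list(latest.values())


def trip_id(row):
    """MD5 hex digest of the pipe-joined natural key (illustrative rendering)."""
    joined = "|".join(str(row[col]) for col in NATURAL_KEY)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


base = {
    "VendorID": 1,
    "tpep_pickup_datetime": datetime(2024, 1, 15, 8, 30),
    "tpep_dropoff_datetime": datetime(2024, 1, 15, 8, 45),
    "PULocationID": 132, "DOLocationID": 230,
    "fare_amount": 52.0, "total_amount": 70.5,
}
rows = [
    {**base, "ingestion_ts": datetime(2024, 2, 1, 12, 0)},
    {**base, "ingestion_ts": datetime(2024, 2, 1, 12, 5)},   # replayed duplicate
    {**base, "VendorID": 2, "ingestion_ts": datetime(2024, 2, 1, 12, 1)},
]

for row in dedupe_latest(rows):
    print(trip_id(row), row["ingestion_ts"])
```

Because `trip_id` is derived from the same columns used for partitioning, replayed events hash to the same identifier, which is what lets downstream `unique` tests on `trip_id` pass after deduplication.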


## 9. Flink SQL: Combined Bronze + Silver Pipeline

This single file runs both layers sequentially. Useful for understanding the
complete Flink processing flow in one place.

Run with: `sql-client.sh embedded -i 00-init.sql -f 05-run-all.sql`

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/flink/sql/05-run-all.sql
-- =============================================================================
-- Pipeline 01: Flink SQL - Full Pipeline (Bronze + Silver)
-- =============================================================================
-- Run with init: sql-client.sh embedded -i 00-init.sql -f 05-run-all.sql
-- =============================================================================

-- ═══════════════════════════════════════════════════════════════════════════════
-- BRONZE LAYER: Raw data from Kafka → Iceberg
-- ═══════════════════════════════════════════════════════════════════════════════

USE CATALOG iceberg_catalog;
CREATE DATABASE IF NOT EXISTS bronze;

CREATE TABLE IF NOT EXISTS bronze.raw_trips (
    VendorID                BIGINT,
    tpep_pickup_datetime    TIMESTAMP(3),
    tpep_dropoff_datetime   TIMESTAMP(3),
    passenger_count         BIGINT,
    trip_distance           DOUBLE,
    RatecodeID              BIGINT,
    store_and_fwd_flag      STRING,
    PULocationID            BIGINT,
    DOLocationID            BIGINT,
    payment_type            BIGINT,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    congestion_surcharge    DOUBLE,
    Airport_fee             DOUBLE,
    ingestion_ts            TIMESTAMP(3)
);

-- Switch back to default catalog for Kafka source table reference
USE CATALOG default_catalog;
USE default_database;

-- Insert from Kafka into Bronze Iceberg table
INSERT INTO iceberg_catalog.bronze.raw_trips
SELECT
    VendorID,
    TO_TIMESTAMP(tpep_pickup_datetime, 'yyyy-MM-dd''T''HH:mm:ss')   AS tpep_pickup_datetime,
    TO_TIMESTAMP(tpep_dropoff_datetime, 'yyyy-MM-dd''T''HH:mm:ss')  AS tpep_dropoff_datetime,
    passenger_count,
    trip_distance,
    RatecodeID,
    store_and_fwd_flag,
    PULocationID,
    DOLocationID,
    payment_type,
    fare_amount,
    extra,
    mta_tax,
    tip_amount,
    tolls_amount,
    improvement_surcharge,
    total_amount,
    congestion_surcharge,
    Airport_fee,
    CURRENT_TIMESTAMP AS ingestion_ts
FROM kafka_raw_trips;

-- ═══════════════════════════════════════════════════════════════════════════════
-- SILVER LAYER: Cleaned + enriched data from Bronze → Silver
-- ═══════════════════════════════════════════════════════════════════════════════

USE CATALOG iceberg_catalog;
CREATE DATABASE IF NOT EXISTS silver;

CREATE TABLE IF NOT EXISTS silver.cleaned_trips (
    trip_id                 STRING,
    vendor_id               INT,
    rate_code_id            INT,
    pickup_location_id      INT,
    dropoff_location_id     INT,
    payment_type_id         INT,
    pickup_datetime         TIMESTAMP(3),
    dropoff_datetime        TIMESTAMP(3),
    passenger_count         INT,
    trip_distance_miles     DOUBLE,
    store_and_fwd_flag      STRING,
    fare_amount             DECIMAL(10, 2),
    extra_amount            DECIMAL(10, 2),
    mta_tax                 DECIMAL(10, 2),
    tip_amount              DECIMAL(10, 2),
    tolls_amount            DECIMAL(10, 2),
    improvement_surcharge   DECIMAL(10, 2),
    total_amount            DECIMAL(10, 2),
    congestion_surcharge    DECIMAL(10, 2),
    airport_fee             DECIMAL(10, 2),
    duration_minutes        BIGINT,
    avg_speed_mph           DOUBLE,
    cost_per_mile           DOUBLE,
    tip_percentage          DOUBLE,
    pickup_date             DATE,
    pickup_hour             INT,
    is_weekend              BOOLEAN
);

-- Deduplication: ROW_NUMBER partitioned by natural key, keeping latest ingestion
INSERT INTO iceberg_catalog.silver.cleaned_trips
WITH deduped AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY VendorID, tpep_pickup_datetime, tpep_dropoff_datetime,
                         PULocationID, DOLocationID, fare_amount, total_amount
            ORDER BY ingestion_ts DESC
        ) AS rn
    FROM iceberg_catalog.bronze.raw_trips
    WHERE tpep_pickup_datetime IS NOT NULL
      AND tpep_dropoff_datetime IS NOT NULL
      AND trip_distance >= 0
      AND fare_amount >= 0
      AND CAST(tpep_pickup_datetime AS DATE) >= DATE '2024-01-01'
      AND CAST(tpep_pickup_datetime AS DATE) <  DATE '2024-02-01'
)
SELECT
    CAST(MD5(CONCAT_WS('|',
        CAST(VendorID AS STRING),
        CAST(tpep_pickup_datetime AS STRING),
        CAST(tpep_dropoff_datetime AS STRING),
        CAST(PULocationID AS STRING),
        CAST(DOLocationID AS STRING),
        CAST(fare_amount AS STRING),
        CAST(total_amount AS STRING)
    )) AS STRING) AS trip_id,
    CAST(VendorID AS INT)       AS vendor_id,
    CAST(RatecodeID AS INT)     AS rate_code_id,
    CAST(PULocationID AS INT)   AS pickup_location_id,
    CAST(DOLocationID AS INT)   AS dropoff_location_id,
    CAST(payment_type AS INT)   AS payment_type_id,
    tpep_pickup_datetime        AS pickup_datetime,
    tpep_dropoff_datetime       AS dropoff_datetime,
    CAST(passenger_count AS INT) AS passenger_count,
    trip_distance               AS trip_distance_miles,
    store_and_fwd_flag,
    CAST(ROUND(fare_amount, 2)             AS DECIMAL(10, 2)) AS fare_amount,
    CAST(ROUND(extra, 2)                   AS DECIMAL(10, 2)) AS extra_amount,
    CAST(ROUND(mta_tax, 2)                 AS DECIMAL(10, 2)) AS mta_tax,
    CAST(ROUND(tip_amount, 2)              AS DECIMAL(10, 2)) AS tip_amount,
    CAST(ROUND(tolls_amount, 2)            AS DECIMAL(10, 2)) AS tolls_amount,
    CAST(ROUND(improvement_surcharge, 2)   AS DECIMAL(10, 2)) AS improvement_surcharge,
    CAST(ROUND(total_amount, 2)            AS DECIMAL(10, 2)) AS total_amount,
    CAST(ROUND(congestion_surcharge, 2)    AS DECIMAL(10, 2)) AS congestion_surcharge,
    CAST(ROUND(Airport_fee, 2)             AS DECIMAL(10, 2)) AS airport_fee,
    CAST(TIMESTAMPDIFF(MINUTE, tpep_pickup_datetime, tpep_dropoff_datetime) AS BIGINT) AS duration_minutes,
    CASE
        WHEN TIMESTAMPDIFF(MINUTE, tpep_pickup_datetime, tpep_dropoff_datetime) > 0
        THEN ROUND(
            trip_distance / (CAST(TIMESTAMPDIFF(MINUTE, tpep_pickup_datetime, tpep_dropoff_datetime) AS DOUBLE) / 60.0),
            2
        )
        ELSE NULL
    END AS avg_speed_mph,
    CASE
        WHEN trip_distance > 0
        THEN ROUND(fare_amount / trip_distance, 2)
        ELSE NULL
    END AS cost_per_mile,
    CASE
        WHEN fare_amount > 0
        THEN ROUND((tip_amount / fare_amount) * 100, 2)
        ELSE NULL
    END AS tip_percentage,
    CAST(tpep_pickup_datetime AS DATE) AS pickup_date,
    CAST(EXTRACT(HOUR FROM tpep_pickup_datetime) AS INT) AS pickup_hour,
    CASE
        WHEN DAYOFWEEK(tpep_pickup_datetime) IN (1, 7) THEN TRUE
        ELSE FALSE
    END AS is_weekend
FROM deduped
WHERE rn = 1;
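
The Bronze `INSERT` in this file parses the Kafka string fields with the pattern `'yyyy-MM-dd''T''HH:mm:ss'`. As a rough Python equivalent (a sketch, with a hypothetical function name), the same pattern corresponds to `%Y-%m-%dT%H:%M:%S`. The sketch returns `None` for unparseable input on the assumption that `TO_TIMESTAMP` yields `NULL` in that case; such rows would then be dropped by the Silver layer's `IS NOT NULL` filters.

```python
from datetime import datetime
from typing import Optional

# Python counterpart of the Flink pattern 'yyyy-MM-dd''T''HH:mm:ss'.
EVENT_TS_FORMAT = "%Y-%m-%dT%H:%M:%S"


def parse_event_ts(raw: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-like timestamp string; return None when it does not match."""
    if raw is None:
        return None
    try:
        return datetime.strptime(raw, EVENT_TS_FORMAT)
    except ValueError:
        return None


print(parse_event_ts("2024-01-15T08:30:00"))  # matches the pattern
print(parse_event_ts("2024-01-15 08:30:00"))  # space instead of 'T': no match
```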


## 10. dbt Project Configuration

dbt (data build tool) handles the **Silver → Gold** transformation layer.
It reads Iceberg Silver tables via DuckDB's `iceberg_scan()` function and
builds dimensional models (facts, dimensions, analytics marts).

### 10.1 Project Config (dbt_project.yml)

Defines project structure, materialization strategies, and seed column types.

Key patterns:
- `stg_yellow_trips` is materialized as `table` (not view) because it reads from Iceberg via DuckDB
- Intermediate models are `view` (lightweight, computed on-the-fly)
- Marts are `table` (materialized for query performance)

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/dbt_project.yml
name: 'nyc_taxi_dbt'
version: '1.0.0'
config-version: 2

profile: 'nyc_taxi_dbt'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets:
  - "target"
  - "dbt_packages"

seeds:
  nyc_taxi_dbt:
    +schema: raw
    taxi_zone_lookup:
      +column_types:
        LocationID: INTEGER
        Borough: VARCHAR
        Zone: VARCHAR
        service_zone: VARCHAR
    payment_type_lookup:
      +column_types:
        payment_type_id: INTEGER
        payment_type_name: VARCHAR
    rate_code_lookup:
      +column_types:
        rate_code_id: INTEGER
        rate_code_name: VARCHAR

models:
  nyc_taxi_dbt:
    +materialized: view
    staging:
      +materialized: view
      +schema: staging
      stg_yellow_trips:
        +materialized: table
    intermediate:
      +materialized: view
      +schema: intermediate
    marts:
      core:
        +materialized: table
        +schema: marts
      analytics:
        +materialized: table
        +schema: marts
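
The `models:` block relies on dbt's rule that a more specific config overrides a more general one. The sketch below walks a copy of that hierarchy to show which materialization a model ends up with. It is an illustration of the precedence rule only, not dbt's actual resolver, and the model names `stg_other` and `fct_trips` are hypothetical.

```python
# Mirror of the `models: nyc_taxi_dbt:` block, materialization settings only.
MODELS_CONFIG = {
    "+materialized": "view",
    "staging": {
        "+materialized": "view",
        "stg_yellow_trips": {"+materialized": "table"},
    },
    "intermediate": {"+materialized": "view"},
    "marts": {
        "core": {"+materialized": "table"},
        "analytics": {"+materialized": "table"},
    },
}


def resolve_materialization(path, config=MODELS_CONFIG):
    """Walk from the project root down `path`; the deepest setting wins."""
    effective = config.get("+materialized")
    node = config
    for part in path:
        node = node.get(part)
        if not isinstance(node, dict):
            break  # no more specific config below this level
        effective = node.get("+materialized", effective)
    return effective


print(resolve_materialization(["staging", "stg_yellow_trips"]))  # model-level override
print(resolve_materialization(["staging", "stg_other"]))         # inherits from staging
print(resolve_materialization(["marts", "core", "fct_trips"]))   # inherits from marts/core
```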

### 9b. Streaming Mode Alternative (`07-streaming-bronze.sql`)

> **Batch vs Streaming:** The default `05-bronze.sql` uses batch mode (process available data, stop).
> This file uses **streaming mode**: it runs continuously, processing events as they arrive in Kafka.
> Same SQL, same tables, same catalog; only the runtime mode changes.

When to use:
- **Batch mode** (`05-bronze.sql`): Catch-up processing, backfill, benchmarking
- **Streaming mode** (`07-streaming-bronze.sql`): Continuous real-time processing

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/flink/sql/07-streaming-bronze.sql
-- =============================================================================
-- Pipeline 01: Streaming Bronze Layer (Kafka → Iceberg, continuous)
-- =============================================================================
-- Alternative to 05-bronze.sql that runs in STREAMING mode.
-- Uses event_time watermarks defined in 00-init.sql for event-time processing.
--
-- Run: sql-client.sh embedded -i 00-init-streaming.sql -f 07-streaming-bronze.sql
--   (or override execution.runtime-mode inline)
--
-- NOTE: This job runs continuously until cancelled. It will process new Kafka
-- events as they arrive and write them to the Bronze Iceberg table.
-- =============================================================================

-- Override to streaming mode (00-init.sql sets batch by default)
SET 'execution.runtime-mode' = 'streaming';

-- Don't wait for each INSERT to complete (streaming jobs run indefinitely)
RESET 'table.dml-sync';

-- Checkpoint every 30s for exactly-once guarantees
SET 'execution.checkpointing.interval' = '30s';

USE CATALOG iceberg_catalog;
CREATE DATABASE IF NOT EXISTS bronze;

CREATE TABLE IF NOT EXISTS bronze.raw_trips (
    VendorID                BIGINT,
    tpep_pickup_datetime    TIMESTAMP(3),
    tpep_dropoff_datetime   TIMESTAMP(3),
    passenger_count         BIGINT,
    trip_distance           DOUBLE,
    RatecodeID              BIGINT,
    store_and_fwd_flag      STRING,
    PULocationID            BIGINT,
    DOLocationID            BIGINT,
    payment_type            BIGINT,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    congestion_surcharge    DOUBLE,
    Airport_fee             DOUBLE,
    ingestion_ts            TIMESTAMP(3)
);

-- Switch back to default catalog for Kafka source table reference
USE CATALOG default_catalog;
USE default_database;

-- Streaming INSERT: runs continuously, processing new Kafka events as they arrive
INSERT INTO iceberg_catalog.bronze.raw_trips
SELECT
    VendorID,
    TO_TIMESTAMP(tpep_pickup_datetime, 'yyyy-MM-dd''T''HH:mm:ss')   AS tpep_pickup_datetime,
    TO_TIMESTAMP(tpep_dropoff_datetime, 'yyyy-MM-dd''T''HH:mm:ss')  AS tpep_dropoff_datetime,
    passenger_count,
    trip_distance,
    RatecodeID,
    store_and_fwd_flag,
    PULocationID,
    DOLocationID,
    payment_type,
    fare_amount,
    extra,
    mta_tax,
    tip_amount,
    tolls_amount,
    improvement_surcharge,
    total_amount,
    congestion_surcharge,
    Airport_fee,
    CURRENT_TIMESTAMP AS ingestion_ts
FROM kafka_raw_trips;

### 10.2 Connection Profile (profiles.yml)

Connects dbt to DuckDB with Iceberg + S3 (MinIO) extensions.

Key settings:
- `extensions: [httpfs, parquet, iceberg]` → DuckDB can read Iceberg tables over S3
- `s3_endpoint: minio:9000` → Points to MinIO container
- `s3_url_style: path` → Required for MinIO (vs virtual-hosted for real S3)
- `memory_limit: 2GB` → DuckDB in-process memory

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/profiles.yml
# =============================================================================
# dbt Profile for Pipeline 01: Kafka + Flink + Iceberg
# =============================================================================
# Uses DuckDB with iceberg and httpfs extensions to read Iceberg tables
# from MinIO (S3-compatible) object storage.
# =============================================================================

nyc_taxi_dbt:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: /tmp/p01_dbt.duckdb
      schema: main
      threads: 4
      extensions:
        - httpfs
        - parquet
        - iceberg
      settings:
        memory_limit: "2GB"
        s3_endpoint: "minio:9000"
        s3_access_key_id: "minioadmin"
        s3_secret_access_key: "minioadmin"
        s3_url_style: "path"
        s3_use_ssl: false
        s3_region: "us-east-1"
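
When debugging the connection outside dbt, it can help to see the `settings` block as the DuckDB `SET` statements it roughly corresponds to. The helper below is a hypothetical sketch that renders a subset of the profile's settings as SQL text; it does not connect to DuckDB, and the exact statements issued by the dbt adapter may differ.

```python
# Subset of the `settings:` block from profiles.yml (credentials omitted).
SETTINGS = {
    "memory_limit": "2GB",
    "s3_endpoint": "minio:9000",
    "s3_url_style": "path",
    "s3_use_ssl": False,
    "s3_region": "us-east-1",
}


def to_set_statements(settings):
    """Render each setting as a DuckDB-style `SET name = value;` statement."""
    statements = []
    for name, value in settings.items():
        if isinstance(value, bool):
            literal = "true" if value else "false"
        else:
            # Quote strings and escape embedded single quotes.
            literal = "'" + str(value).replace("'", "''") + "'"
        statements.append(f"SET {name} = {literal};")
    return statements


for statement in to_set_statements(SETTINGS):
    print(statement)
```

Pasting the printed statements into a DuckDB shell inside the same Docker network is one way to check that the MinIO endpoint is reachable before running `dbt build`.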

### 10.3 Package Dependencies (packages.yml)

The only dependency is `dbt_utils`, which provides utility macros and generic tests (`date_spine`, `accepted_range`, etc.).

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/packages.yml
packages:
  - package: dbt-labs/dbt_utils
    version: [">=1.1.0", "<2.0.0"]

### 10.4 Source Definition (sources.yml)

> **Production feature:** `freshness` config enables `dbt source freshness` checks.
> - `warn_after: 30 days`: warns once the newest record is more than 30 days old (a generous window for batch/historical loads)
> - `error_after: 365 days`: hard failure once the newest record is more than 365 days old
> - `loaded_at_field: pickup_datetime`: the column whose maximum value dbt compares against the current time

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/sources/sources.yml
version: 2

sources:
  - name: raw_nyc_taxi
    description: "NYC TLC Yellow Taxi data - Iceberg Silver table via DuckDB iceberg_scan"
    schema: main
    freshness:
      # Adjust thresholds for your SLA (using generous window for historical data)
      warn_after: {count: 30, period: day}
      error_after: {count: 365, period: day}
    loaded_at_field: pickup_datetime
    tables:
      - name: raw_yellow_trips
        description: "Silver-layer cleaned trips from Iceberg (stream-processed by Flink)"
        config:
          external_location: "iceberg_scan('s3://warehouse/silver/cleaned_trips', allow_moved_paths = true)"
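
The freshness thresholds reduce to a comparison between the current time and the newest value of `loaded_at_field`. The sketch below (hypothetical function name, not dbt's implementation) applies the thresholds from `sources.yml` to a few evaluation dates. Note that `pickup_datetime` is an event time, so with the January 2024 data used in this pipeline the result depends heavily on when the check is run.

```python
from datetime import datetime, timedelta

WARN_AFTER = timedelta(days=30)     # warn_after: {count: 30, period: day}
ERROR_AFTER = timedelta(days=365)   # error_after: {count: 365, period: day}


def freshness_status(max_loaded_at: datetime, now: datetime) -> str:
    """Classify source freshness from the newest loaded_at_field value."""
    age = now - max_loaded_at
    if age > ERROR_AFTER:
        return "error"
    if age > WARN_AFTER:
        return "warn"
    return "pass"


newest_pickup = datetime(2024, 1, 31, 23, 59, 59)

print(freshness_status(newest_pickup, datetime(2024, 2, 10)))   # 9 days old
print(freshness_status(newest_pickup, datetime(2024, 3, 15)))   # about 43 days old
print(freshness_status(newest_pickup, datetime(2026, 2, 16)))   # more than 365 days old
```

For a continuously running stream, an ingestion timestamp is usually a better `loaded_at_field` than an event timestamp, since it measures when data last arrived rather than when the trip happened.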


## 11. dbt Seeds: Reference Data

Seeds are CSV files that dbt loads as tables. They provide lookup/reference data
for enriching trip records with human-readable names.

### 11.1 Payment Type Lookup

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/seeds/payment_type_lookup.csv
payment_type_id,payment_type_name
1,Credit card
2,Cash
3,No charge
4,Dispute
5,Unknown
6,Voided trip

### 11.2 Rate Code Lookup

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/seeds/rate_code_lookup.csv
rate_code_id,rate_code_name
1,Standard rate
2,JFK
3,Newark
4,Nassau or Westchester
5,Negotiated fare
6,Group ride
99,Unknown

### 11.3 Seed Properties (column types and tests)

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/seeds/seed_properties.yml
version: 2

seeds:
  - name: taxi_zone_lookup
    description: "NYC TLC Taxi Zone lookup table (~265 zones)"
    columns:
      - name: LocationID
        tests: [unique, not_null]
      - name: Borough
        tests: [not_null]

  - name: payment_type_lookup
    description: "Payment type ID to description mapping"
    columns:
      - name: payment_type_id
        tests: [unique, not_null]

  - name: rate_code_lookup
    description: "Rate code ID to description mapping"
    columns:
      - name: rate_code_id
        tests: [unique, not_null]
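
The `unique` and `not_null` tests declared above can be pictured as simple scans over the seed rows. The sketch below runs equivalent checks against the payment type seed using only the standard library. The function names are hypothetical, and treating an empty CSV cell as null is an assumption about how the seed is loaded.

```python
import csv
import io

PAYMENT_TYPE_CSV = """payment_type_id,payment_type_name
1,Credit card
2,Cash
3,No charge
4,Dispute
5,Unknown
6,Voided trip
"""


def not_null_failures(rows, column):
    """Rows whose value in `column` is missing or empty."""
    return [row for row in rows if row[column] in (None, "")]


def unique_failures(rows, column):
    """Non-null values that appear more than once in `column`."""
    seen, duplicates = set(), set()
    for row in rows:
        value = row[column]
        if value in (None, ""):
            continue  # nulls are left to the not_null check
        if value in seen:
            duplicates.add(value)
        seen.add(value)
    return sorted(duplicates)


rows = list(csv.DictReader(io.StringIO(PAYMENT_TYPE_CSV)))
print(not_null_failures(rows, "payment_type_id"))  # empty list means the test passes
print(unique_failures(rows, "payment_type_id"))    # empty list means the test passes
```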

### 11.4 Taxi Zone Lookup (265 NYC zones)

Maps LocationID to borough and zone name. This is the official NYC TLC taxi zone reference.

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/seeds/taxi_zone_lookup.csv
"LocationID","Borough","Zone","service_zone"
1,"EWR","Newark Airport","EWR"
2,"Queens","Jamaica Bay","Boro Zone"
3,"Bronx","Allerton/Pelham Gardens","Boro Zone"
4,"Manhattan","Alphabet City","Yellow Zone"
5,"Staten Island","Arden Heights","Boro Zone"
6,"Staten Island","Arrochar/Fort Wadsworth","Boro Zone"
7,"Queens","Astoria","Boro Zone"
8,"Queens","Astoria Park","Boro Zone"
9,"Queens","Auburndale","Boro Zone"
10,"Queens","Baisley Park","Boro Zone"
11,"Brooklyn","Bath Beach","Boro Zone"
12,"Manhattan","Battery Park","Yellow Zone"
13,"Manhattan","Battery Park City","Yellow Zone"
14,"Brooklyn","Bay Ridge","Boro Zone"
15,"Queens","Bay Terrace/Fort Totten","Boro Zone"
16,"Queens","Bayside","Boro Zone"
17,"Brooklyn","Bedford","Boro Zone"
18,"Bronx","Bedford Park","Boro Zone"
19,"Queens","Bellerose","Boro Zone"
20,"Bronx","Belmont","Boro Zone"
21,"Brooklyn","Bensonhurst East","Boro Zone"
22,"Brooklyn","Bensonhurst West","Boro Zone"
23,"Staten Island","Bloomfield/Emerson Hill","Boro Zone"
24,"Manhattan","Bloomingdale","Yellow Zone"
25,"Brooklyn","Boerum Hill","Boro Zone"
26,"Brooklyn","Borough Park","Boro Zone"
27,"Queens","Breezy Point/Fort Tilden/Riis Beach","Boro Zone"
28,"Queens","Briarwood/Jamaica Hills","Boro Zone"
29,"Brooklyn","Brighton Beach","Boro Zone"
30,"Queens","Broad Channel","Boro Zone"
31,"Bronx","Bronx Park","Boro Zone"
32,"Bronx","Bronxdale","Boro Zone"
33,"Brooklyn","Brooklyn Heights","Boro Zone"
34,"Brooklyn","Brooklyn Navy Yard","Boro Zone"
35,"Brooklyn","Brownsville","Boro Zone"
36,"Brooklyn","Bushwick North","Boro Zone"
37,"Brooklyn","Bushwick South","Boro Zone"
38,"Queens","Cambria Heights","Boro Zone"
39,"Brooklyn","Canarsie","Boro Zone"
40,"Brooklyn","Carroll Gardens","Boro Zone"
41,"Manhattan","Central Harlem","Boro Zone"
42,"Manhattan","Central Harlem North","Boro Zone"
43,"Manhattan","Central Park","Yellow Zone"
44,"Staten Island","Charleston/Tottenville","Boro Zone"
45,"Manhattan","Chinatown","Yellow Zone"
46,"Bronx","City Island","Boro Zone"
47,"Bronx","Claremont/Bathgate","Boro Zone"
48,"Manhattan","Clinton East","Yellow Zone"
49,"Brooklyn","Clinton Hill","Boro Zone"
50,"Manhattan","Clinton West","Yellow Zone"
51,"Bronx","Co-Op City","Boro Zone"
52,"Brooklyn","Cobble Hill","Boro Zone"
53,"Queens","College Point","Boro Zone"
54,"Brooklyn","Columbia Street","Boro Zone"
55,"Brooklyn","Coney Island","Boro Zone"
56,"Queens","Corona","Boro Zone"
57,"Queens","Corona","Boro Zone"
58,"Bronx","Country Club","Boro Zone"
59,"Bronx","Crotona Park","Boro Zone"
60,"Bronx","Crotona Park East","Boro Zone"
61,"Brooklyn","Crown Heights North","Boro Zone"
62,"Brooklyn","Crown Heights South","Boro Zone"
63,"Brooklyn","Cypress Hills","Boro Zone"
64,"Queens","Douglaston","Boro Zone"
65,"Brooklyn","Downtown Brooklyn/MetroTech","Boro Zone"
66,"Brooklyn","DUMBO/Vinegar Hill","Boro Zone"
67,"Brooklyn","Dyker Heights","Boro Zone"
68,"Manhattan","East Chelsea","Yellow Zone"
69,"Bronx","East Concourse/Concourse Village","Boro Zone"
70,"Queens","East Elmhurst","Boro Zone"
71,"Brooklyn","East Flatbush/Farragut","Boro Zone"
72,"Brooklyn","East Flatbush/Remsen Village","Boro Zone"
73,"Queens","East Flushing","Boro Zone"
74,"Manhattan","East Harlem North","Boro Zone"
75,"Manhattan","East Harlem South","Boro Zone"
76,"Brooklyn","East New York","Boro Zone"
77,"Brooklyn","East New York/Pennsylvania Avenue","Boro Zone"
78,"Bronx","East Tremont","Boro Zone"
79,"Manhattan","East Village","Yellow Zone"
80,"Brooklyn","East Williamsburg","Boro Zone"
81,"Bronx","Eastchester","Boro Zone"
82,"Queens","Elmhurst","Boro Zone"
83,"Queens","Elmhurst/Maspeth","Boro Zone"
84,"Staten Island","Eltingville/Annadale/Prince's Bay","Boro Zone"
85,"Brooklyn","Erasmus","Boro Zone"
86,"Queens","Far Rockaway","Boro Zone"
87,"Manhattan","Financial District North","Yellow Zone"
88,"Manhattan","Financial District South","Yellow Zone"
89,"Brooklyn","Flatbush/Ditmas Park","Boro Zone"
90,"Manhattan","Flatiron","Yellow Zone"
91,"Brooklyn","Flatlands","Boro Zone"
92,"Queens","Flushing","Boro Zone"
93,"Queens","Flushing Meadows-Corona Park","Boro Zone"
94,"Bronx","Fordham South","Boro Zone"
95,"Queens","Forest Hills","Boro Zone"
96,"Queens","Forest Park/Highland Park","Boro Zone"
97,"Brooklyn","Fort Greene","Boro Zone"
98,"Queens","Fresh Meadows","Boro Zone"
99,"Staten Island","Freshkills Park","Boro Zone"
100,"Manhattan","Garment District","Yellow Zone"
101,"Queens","Glen Oaks","Boro Zone"
102,"Queens","Glendale","Boro Zone"
103,"Manhattan","Governor's Island/Ellis Island/Liberty Island","Yellow Zone"
104,"Manhattan","Governor's Island/Ellis Island/Liberty Island","Yellow Zone"
105,"Manhattan","Governor's Island/Ellis Island/Liberty Island","Yellow Zone"
106,"Brooklyn","Gowanus","Boro Zone"
107,"Manhattan","Gramercy","Yellow Zone"
108,"Brooklyn","Gravesend","Boro Zone"
109,"Staten Island","Great Kills","Boro Zone"
110,"Staten Island","Great Kills Park","Boro Zone"
111,"Brooklyn","Green-Wood Cemetery","Boro Zone"
112,"Brooklyn","Greenpoint","Boro Zone"
113,"Manhattan","Greenwich Village North","Yellow Zone"
114,"Manhattan","Greenwich Village South","Yellow Zone"
115,"Staten Island","Grymes Hill/Clifton","Boro Zone"
116,"Manhattan","Hamilton Heights","Boro Zone"
117,"Queens","Hammels/Arverne","Boro Zone"
118,"Staten Island","Heartland Village/Todt Hill","Boro Zone"
119,"Bronx","Highbridge","Boro Zone"
120,"Manhattan","Highbridge Park","Boro Zone"
121,"Queens","Hillcrest/Pomonok","Boro Zone"
122,"Queens","Hollis","Boro Zone"
123,"Brooklyn","Homecrest","Boro Zone"
124,"Queens","Howard Beach","Boro Zone"
125,"Manhattan","Hudson Sq","Yellow Zone"
126,"Bronx","Hunts Point","Boro Zone"
127,"Manhattan","Inwood","Boro Zone"
128,"Manhattan","Inwood Hill Park","Boro Zone"
129,"Queens","Jackson Heights","Boro Zone"
130,"Queens","Jamaica","Boro Zone"
131,"Queens","Jamaica Estates","Boro Zone"
132,"Queens","JFK Airport","Airports"
133,"Brooklyn","Kensington","Boro Zone"
134,"Queens","Kew Gardens","Boro Zone"
135,"Queens","Kew Gardens Hills","Boro Zone"
136,"Bronx","Kingsbridge Heights","Boro Zone"
137,"Manhattan","Kips Bay","Yellow Zone"
138,"Queens","LaGuardia Airport","Airports"
139,"Queens","Laurelton","Boro Zone"
140,"Manhattan","Lenox Hill East","Yellow Zone"
141,"Manhattan","Lenox Hill West","Yellow Zone"
142,"Manhattan","Lincoln Square East","Yellow Zone"
143,"Manhattan","Lincoln Square West","Yellow Zone"
144,"Manhattan","Little Italy/NoLiTa","Yellow Zone"
145,"Queens","Long Island City/Hunters Point","Boro Zone"
146,"Queens","Long Island City/Queens Plaza","Boro Zone"
147,"Bronx","Longwood","Boro Zone"
148,"Manhattan","Lower East Side","Yellow Zone"
149,"Brooklyn","Madison","Boro Zone"
150,"Brooklyn","Manhattan Beach","Boro Zone"
151,"Manhattan","Manhattan Valley","Yellow Zone"
152,"Manhattan","Manhattanville","Boro Zone"
153,"Manhattan","Marble Hill","Boro Zone"
154,"Brooklyn","Marine Park/Floyd Bennett Field","Boro Zone"
155,"Brooklyn","Marine Park/Mill Basin","Boro Zone"
156,"Staten Island","Mariners Harbor","Boro Zone"
157,"Queens","Maspeth","Boro Zone"
158,"Manhattan","Meatpacking/West Village West","Yellow Zone"
159,"Bronx","Melrose South","Boro Zone"
160,"Queens","Middle Village","Boro Zone"
161,"Manhattan","Midtown Center","Yellow Zone"
162,"Manhattan","Midtown East","Yellow Zone"
163,"Manhattan","Midtown North","Yellow Zone"
164,"Manhattan","Midtown South","Yellow Zone"
165,"Brooklyn","Midwood","Boro Zone"
166,"Manhattan","Morningside Heights","Boro Zone"
167,"Bronx","Morrisania/Melrose","Boro Zone"
168,"Bronx","Mott Haven/Port Morris","Boro Zone"
169,"Bronx","Mount Hope","Boro Zone"
170,"Manhattan","Murray Hill","Yellow Zone"
171,"Queens","Murray Hill-Queens","Boro Zone"
172,"Staten Island","New Dorp/Midland Beach","Boro Zone"
173,"Queens","North Corona","Boro Zone"
174,"Bronx","Norwood","Boro Zone"
175,"Queens","Oakland Gardens","Boro Zone"
176,"Staten Island","Oakwood","Boro Zone"
177,"Brooklyn","Ocean Hill","Boro Zone"
178,"Brooklyn","Ocean Parkway South","Boro Zone"
179,"Queens","Old Astoria","Boro Zone"
180,"Queens","Ozone Park","Boro Zone"
181,"Brooklyn","Park Slope","Boro Zone"
182,"Bronx","Parkchester","Boro Zone"
183,"Bronx","Pelham Bay","Boro Zone"
184,"Bronx","Pelham Bay Park","Boro Zone"
185,"Bronx","Pelham Parkway","Boro Zone"
186,"Manhattan","Penn Station/Madison Sq West","Yellow Zone"
187,"Staten Island","Port Richmond","Boro Zone"
188,"Brooklyn","Prospect-Lefferts Gardens","Boro Zone"
189,"Brooklyn","Prospect Heights","Boro Zone"
190,"Brooklyn","Prospect Park","Boro Zone"
191,"Queens","Queens Village","Boro Zone"
192,"Queens","Queensboro Hill","Boro Zone"
193,"Queens","Queensbridge/Ravenswood","Boro Zone"
194,"Manhattan","Randalls Island","Yellow Zone"
195,"Brooklyn","Red Hook","Boro Zone"
196,"Queens","Rego Park","Boro Zone"
197,"Queens","Richmond Hill","Boro Zone"
198,"Queens","Ridgewood","Boro Zone"
199,"Bronx","Rikers Island","Boro Zone"
200,"Bronx","Riverdale/North Riverdale/Fieldston","Boro Zone"
201,"Queens","Rockaway Park","Boro Zone"
202,"Manhattan","Roosevelt Island","Boro Zone"
203,"Queens","Rosedale","Boro Zone"
204,"Staten Island","Rossville/Woodrow","Boro Zone"
205,"Queens","Saint Albans","Boro Zone"
206,"Staten Island","Saint George/New Brighton","Boro Zone"
207,"Queens","Saint Michaels Cemetery/Woodside","Boro Zone"
208,"Bronx","Schuylerville/Edgewater Park","Boro Zone"
209,"Manhattan","Seaport","Yellow Zone"
210,"Brooklyn","Sheepshead Bay","Boro Zone"
211,"Manhattan","SoHo","Yellow Zone"
212,"Bronx","Soundview/Bruckner","Boro Zone"
213,"Bronx","Soundview/Castle Hill","Boro Zone"
214,"Staten Island","South Beach/Dongan Hills","Boro Zone"
215,"Queens","South Jamaica","Boro Zone"
216,"Queens","South Ozone Park","Boro Zone"
217,"Brooklyn","South Williamsburg","Boro Zone"
218,"Queens","Springfield Gardens North","Boro Zone"
219,"Queens","Springfield Gardens South","Boro Zone"
220,"Bronx","Spuyten Duyvil/Kingsbridge","Boro Zone"
221,"Staten Island","Stapleton","Boro Zone"
222,"Brooklyn","Starrett City","Boro Zone"
223,"Queens","Steinway","Boro Zone"
224,"Manhattan","Stuy Town/Peter Cooper Village","Yellow Zone"
225,"Brooklyn","Stuyvesant Heights","Boro Zone"
226,"Queens","Sunnyside","Boro Zone"
227,"Brooklyn","Sunset Park East","Boro Zone"
228,"Brooklyn","Sunset Park West","Boro Zone"
229,"Manhattan","Sutton Place/Turtle Bay North","Yellow Zone"
230,"Manhattan","Times Sq/Theatre District","Yellow Zone"
231,"Manhattan","TriBeCa/Civic Center","Yellow Zone"
232,"Manhattan","Two Bridges/Seward Park","Yellow Zone"
233,"Manhattan","UN/Turtle Bay South","Yellow Zone"
234,"Manhattan","Union Sq","Yellow Zone"
235,"Bronx","University Heights/Morris Heights","Boro Zone"
236,"Manhattan","Upper East Side North","Yellow Zone"
237,"Manhattan","Upper East Side South","Yellow Zone"
238,"Manhattan","Upper West Side North","Yellow Zone"
239,"Manhattan","Upper West Side South","Yellow Zone"
240,"Bronx","Van Cortlandt Park","Boro Zone"
241,"Bronx","Van Cortlandt Village","Boro Zone"
242,"Bronx","Van Nest/Morris Park","Boro Zone"
243,"Manhattan","Washington Heights North","Boro Zone"
244,"Manhattan","Washington Heights South","Boro Zone"
245,"Staten Island","West Brighton","Boro Zone"
246,"Manhattan","West Chelsea/Hudson Yards","Yellow Zone"
247,"Bronx","West Concourse","Boro Zone"
248,"Bronx","West Farms/Bronx River","Boro Zone"
249,"Manhattan","West Village","Yellow Zone"
250,"Bronx","Westchester Village/Unionport","Boro Zone"
251,"Staten Island","Westerleigh","Boro Zone"
252,"Queens","Whitestone","Boro Zone"
253,"Queens","Willets Point","Boro Zone"
254,"Bronx","Williamsbridge/Olinville","Boro Zone"
255,"Brooklyn","Williamsburg (North Side)","Boro Zone"
256,"Brooklyn","Williamsburg (South Side)","Boro Zone"
257,"Brooklyn","Windsor Terrace","Boro Zone"
258,"Queens","Woodhaven","Boro Zone"
259,"Bronx","Woodlawn/Wakefield","Boro Zone"
260,"Queens","Woodside","Boro Zone"
261,"Manhattan","World Trade Center","Yellow Zone"
262,"Manhattan","Yorkville East","Yellow Zone"
263,"Manhattan","Yorkville West","Yellow Zone"
264,"Unknown","N/A","N/A"
265,"N/A","Outside of NYC","N/A"

### 11.4 Vendor Lookup

Maps VendorID to vendor name/abbreviation. NYC TLC has two TPEP providers:
- 1 = Creative Mobile Technologies (CMT)
- 2 = VeriFone Inc. (VFI)

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/seeds/vendor_lookup.csv
vendor_id,vendor_name,vendor_abbr
1,Creative Mobile Technologies,CMT
2,VeriFone Inc.,VFI

## 12. dbt Macros: Cross-Database Compatibility

These macros use dbt's `adapter.dispatch()` pattern to work across DuckDB, PostgreSQL (RisingWave),
and Spark. This means the same dbt models can be reused in Pipelines 01-11.
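
As a rough mental model of the dispatch lookup, the sketch below mimics how a call is routed to an adapter-prefixed implementation. It is a simplified Python illustration, not dbt's actual code: the `MACROS` registry and `resolve()` are hypothetical names, and the fallback to a `default__` prefix follows dbt's documented naming convention as understood here.

```python
# Simplified model of adapter.dispatch(): look up "<adapter>__<macro>",
# then fall back to "default__<macro>". Registry contents are illustrative.
MACROS = {
    "duckdb__dayname_compat": lambda col: f"dayname({col})",
    "postgres__dayname_compat": lambda col: f"trim(to_char({col}, 'Day'))",
    "spark__dayname_compat": lambda col: f"date_format({col}, 'EEEE')",
}


def resolve(macro_name: str, adapter_type: str):
    """Return the first matching implementation for the active adapter."""
    for prefix in (adapter_type, "default"):
        candidate = f"{prefix}__{macro_name}"
        if candidate in MACROS:
            return MACROS[candidate]
    raise LookupError(f"no implementation of {macro_name!r} for {adapter_type!r}")


def dayname_compat(col: str, adapter_type: str) -> str:
    """Render the SQL fragment for the given adapter."""
    return resolve("dayname_compat", adapter_type)(col)


print(dayname_compat("pickup_datetime", "duckdb"))
print(dayname_compat("pickup_datetime", "postgres"))
```

The macros in this section define no `default__` variant, so in this model an adapter outside the three listed would reach the error branch.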

### 12.1 cents_to_dollars

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/macros/cents_to_dollars.sql
/*
    Macro: Convert a cents column to dollars with rounding.

    Usage:
        {{ cents_to_dollars('fare_cents') }}
        {{ cents_to_dollars('fare_cents', 4) }}
*/

{% macro cents_to_dollars(column_name, precision=2) %}
    round(cast({{ column_name }} as decimal(10, {{ precision }})) / 100, {{ precision }})
{% endmacro %}

### 12.2 dayname_compat, monthname_compat, mode_compat

Three adapter-dispatched macros that handle DuckDB/PostgreSQL/Spark syntax differences:
- `dayname_compat()` → `dayname()` (DuckDB) vs `to_char(..., 'Day')` (Postgres) vs `date_format(..., 'EEEE')` (Spark)
- `monthname_compat()` → Same pattern for month names
- `mode_compat()` → Statistical mode (most common value)
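
One detail worth spelling out: the Postgres variant wraps `to_char` in `trim()` because `to_char(..., 'Day')` blank-pads the name to nine characters. The Python sketch below emulates that padding and shows the statistical mode that `mode_compat()` computes. The helper names and the hard-coded English day names are illustrative, not part of the project.

```python
from datetime import date
from statistics import mode

DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday",
             "Friday", "Saturday", "Sunday"]


def postgres_to_char_day(d: date) -> str:
    """Emulate to_char(d, 'Day'): the name is blank-padded to 9 characters."""
    return DAY_NAMES[d.weekday()].ljust(9)


def dayname(d: date) -> str:
    """Emulate dayname() / date_format(d, 'EEEE'): no padding."""
    return DAY_NAMES[d.weekday()]


d = date(2024, 1, 15)  # a Monday
padded = postgres_to_char_day(d)
print(repr(padded))                   # 'Monday   '
print(padded.strip() == dayname(d))   # True

# mode_compat(): the most frequent value in a group
most_common = mode([1, 1, 2, 1, 2])
print(most_common)                    # 1
```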

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/macros/dayname_compat.sql
/*
    Macro: Get day-of-week name from a timestamp.
    Adapter-dispatched for DuckDB, PostgreSQL (RisingWave), and Spark.

    Usage:
        {{ dayname_compat('pickup_datetime') }}
*/

{% macro dayname_compat(col) %}
    {{ return(adapter.dispatch('dayname_compat', 'nyc_taxi_dbt')(col)) }}
{% endmacro %}

{% macro duckdb__dayname_compat(col) %}
    dayname({{ col }})
{% endmacro %}

{% macro postgres__dayname_compat(col) %}
    trim(to_char({{ col }}, 'Day'))
{% endmacro %}

{% macro spark__dayname_compat(col) %}
    date_format({{ col }}, 'EEEE')
{% endmacro %}


/*
    Macro: Get month name from a timestamp.
    Adapter-dispatched for DuckDB, PostgreSQL (RisingWave), and Spark.
*/

{% macro monthname_compat(col) %}
    {{ return(adapter.dispatch('monthname_compat', 'nyc_taxi_dbt')(col)) }}
{% endmacro %}

{% macro duckdb__monthname_compat(col) %}
    monthname({{ col }})
{% endmacro %}

{% macro postgres__monthname_compat(col) %}
    trim(to_char({{ col }}, 'Month'))
{% endmacro %}

{% macro spark__monthname_compat(col) %}
    date_format({{ col }}, 'MMMM')
{% endmacro %}


/*
    Macro: Statistical mode (most common value).
    Adapter-dispatched for DuckDB, PostgreSQL, and Spark.
*/

{% macro mode_compat(col) %}
    {{ return(adapter.dispatch('mode_compat', 'nyc_taxi_dbt')(col)) }}
{% endmacro %}

{% macro duckdb__mode_compat(col) %}
    mode({{ col }})
{% endmacro %}

{% macro postgres__mode_compat(col) %}
    mode() WITHIN GROUP (ORDER BY {{ col }})
{% endmacro %}

{% macro spark__mode_compat(col) %}
    mode({{ col }})
{% endmacro %}

### 12.3 duration_minutes
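
The three dialect-specific bodies in the next cell may not agree on sub-minute remainders. The Python sketch here models the behavior assumed for each dialect: DuckDB's `datediff('minute', ...)` counting minute boundaries crossed, a Postgres-style cast to `bigint` rounding to the nearest integer, and Spark's `CAST(... AS BIGINT)` truncating. Treat these as assumptions to verify against your engine versions (RisingWave in particular may differ from PostgreSQL). The function names are illustrative only.

```python
import math
from datetime import datetime


def duckdb_style(start: datetime, end: datetime) -> int:
    """Count minute boundaries crossed: truncate both sides to the minute first."""
    def floor_min(t: datetime) -> datetime:
        return t.replace(second=0, microsecond=0)
    return int((floor_min(end) - floor_min(start)).total_seconds() // 60)


def postgres_style(start: datetime, end: datetime) -> int:
    """Elapsed seconds / 60, rounded to nearest (exact .5 ties are not modeled)."""
    return math.floor((end - start).total_seconds() / 60 + 0.5)


def spark_style(start: datetime, end: datetime) -> int:
    """Whole-second difference / 60, truncated toward zero."""
    return math.trunc(int((end - start).total_seconds()) / 60)


cases = {
    "20s across a minute boundary": (datetime(2024, 1, 1, 10, 0, 50),
                                     datetime(2024, 1, 1, 10, 1, 10)),
    "45s within one minute": (datetime(2024, 1, 1, 10, 0, 0),
                              datetime(2024, 1, 1, 10, 0, 45)),
}
for label, (s, e) in cases.items():
    print(label, duckdb_style(s, e), postgres_style(s, e), spark_style(s, e))
```

This matters because `int_trip_metrics` keeps only trips whose duration is between 1 and 720 minutes, so a 45-second trip could pass the filter on one engine and be dropped on another.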

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/macros/duration_minutes.sql
/*
    Macro: Calculate duration between two timestamps in minutes.
    Adapter-dispatched for DuckDB, PostgreSQL (RisingWave), and Spark.

    Usage:
        {{ duration_minutes('pickup_datetime', 'dropoff_datetime') }}
*/

{% macro duration_minutes(start_col, end_col) %}
    {{ return(adapter.dispatch('duration_minutes', 'nyc_taxi_dbt')(start_col, end_col)) }}
{% endmacro %}

{% macro duckdb__duration_minutes(start_col, end_col) %}
    datediff('minute', {{ start_col }}, {{ end_col }})
{% endmacro %}

{% macro postgres__duration_minutes(start_col, end_col) %}
    (EXTRACT(EPOCH FROM ({{ end_col }} - {{ start_col }})) / 60)::bigint
{% endmacro %}

{% macro spark__duration_minutes(start_col, end_col) %}
    CAST((UNIX_TIMESTAMP({{ end_col }}) - UNIX_TIMESTAMP({{ start_col }})) / 60 AS BIGINT)
{% endmacro %}

### 12.4 test_positive_value (custom generic test)
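
dbt treats a generic test as passing when its query returns zero rows. The sketch here runs the same predicate as the test in the next cell against an in-memory SQLite table. SQLite and the sample `trips` table are stand-ins chosen for illustration, not part of the pipeline.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table trips (fare_amount real)")
conn.executemany("insert into trips values (?)", [(12.5,), (0.0,), (-3.0,)])

# Same shape as the compiled test: select the rows that violate the rule.
failures = conn.execute(
    "select fare_amount as invalid_value from trips where fare_amount < 0"
).fetchall()

print(failures)  # [(-3.0,)]
print("PASS" if not failures else f"FAIL ({len(failures)} row(s))")
```

Despite the test's name, a value of zero passes: only strictly negative values come back as failures.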

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/macros/test_positive_value.sql
/*
    Custom generic test: Asserts that all values in a column are >= 0.

    Usage in schema.yml:
        columns:
          - name: fare_amount
            tests:
              - positive_value
*/

{% test positive_value(model, column_name) %}

select
    {{ column_name }} as invalid_value
from {{ model }}
where {{ column_name }} < 0

{% endtest %}

## 13. dbt Staging Models

Staging models are **thin wrappers** over sources. In this pipeline, Flink already
did the heavy lifting (column renaming, type casting, quality filtering), so staging
is mostly a passthrough with minor re-casting for DuckDB type compatibility.

### dbt Lineage (DAG)

```
source(raw_yellow_trips)  seed(payment_type_lookup)  seed(rate_code_lookup)  seed(taxi_zone_lookup)
         │                         │                        │                       │
         ▼                         ▼                        ▼                       ▼
  stg_yellow_trips          stg_payment_types        stg_rate_codes          stg_taxi_zones
         │                         │                        │                       │
         ▼                         │                        │                       │
  int_trip_metrics                 │                        │                       │
    │       │                      │                        │                       │
    ▼       ▼                      │                        │                       │
int_daily  int_hourly              │                        │                       │
 _summary   _patterns              │                        │                       │
    │       │                      │                        │                       │
    ▼       ▼                      ▼                        │                       ▼
mart_daily  mart_hourly      dim_payment_types              │               dim_locations
 _revenue    _demand               │                        │                  │
                                   │                        │                  │
                                   └──────────┬─────────────┘                  │
                                              ▼                                │
                                          fct_trips ◄──────────────────────────┘
                                              │
                                              ▼
                                   mart_location_performance
```
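
dbt derives its build order from these edges. As a rough illustration, the sketch below feeds a subset of the graph (only edges that appear as `ref()` or `source()` calls in this notebook) to Python's `graphlib.TopologicalSorter` (Python 3.9+). The `source.` and `seed.` prefixes on node names are a labeling convention invented here.

```python
from graphlib import TopologicalSorter

# node -> set of upstream dependencies, taken from the ref()/source() calls
deps = {
    "stg_yellow_trips": {"source.raw_yellow_trips"},
    "stg_taxi_zones": {"seed.taxi_zone_lookup"},
    "int_trip_metrics": {"stg_yellow_trips"},
    "int_daily_summary": {"int_trip_metrics"},
    "int_hourly_patterns": {"int_trip_metrics"},
    "dim_locations": {"stg_taxi_zones"},
    "fct_trips": {"int_trip_metrics", "dim_locations"},
    "mart_location_performance": {"fct_trips"},
}

# static_order() yields each node only after all of its dependencies.
build_order = list(TopologicalSorter(deps).static_order())
print(build_order)
```

Several valid orders exist; the only guarantee is that every model appears after everything it depends on.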

### 13.1 stg_yellow_trips.sql

The main staging model. Since Flink already cleaned the data, this is a passthrough
with safety-net null filtering and DuckDB type re-casting.

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/staging/stg_yellow_trips.sql
{#
    Staging model: Yellow taxi trip records (Iceberg pipeline variant)

    This is a simple passthrough since Flink already performed the heavy lifting:
      - Column renaming (VendorID -> vendor_id, etc.)
      - Type casting
      - Data quality filtering (nulls, negative fares, date range)
      - Surrogate key generation (MD5 hash)
      - Computed columns (duration, speed, cost, tip %)

    The source reads the Silver Iceberg table via DuckDB iceberg_scan().
#}

with source as (
    select * from {{ source('raw_nyc_taxi', 'raw_yellow_trips') }}
),

final as (
    select
        -- Flink already generated the surrogate key
        trip_id,

        -- identifiers (already renamed and cast by Flink)
        vendor_id,
        rate_code_id,
        pickup_location_id,
        dropoff_location_id,
        payment_type_id,

        -- timestamps (already parsed by Flink)
        cast(pickup_datetime as timestamp) as pickup_datetime,
        cast(dropoff_datetime as timestamp) as dropoff_datetime,

        -- trip info
        passenger_count,
        trip_distance_miles,
        store_and_fwd_flag,

        -- financials (already rounded by Flink)
        round(cast(fare_amount as decimal(10, 2)), 2) as fare_amount,
        round(cast(extra_amount as decimal(10, 2)), 2) as extra_amount,
        round(cast(mta_tax as decimal(10, 2)), 2) as mta_tax,
        round(cast(tip_amount as decimal(10, 2)), 2) as tip_amount,
        round(cast(tolls_amount as decimal(10, 2)), 2) as tolls_amount,
        round(cast(improvement_surcharge as decimal(10, 2)), 2) as improvement_surcharge,
        round(cast(total_amount as decimal(10, 2)), 2) as total_amount,
        round(cast(congestion_surcharge as decimal(10, 2)), 2) as congestion_surcharge,
        round(cast(airport_fee as decimal(10, 2)), 2) as airport_fee

    from source
    -- Flink already applied quality filters; this is a safety net
    where pickup_datetime is not null
      and dropoff_datetime is not null
)

select * from final

### 13.2 stg_payment_types.sql

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/staging/stg_payment_types.sql
/*
    Staging model: Payment type lookup
    Maps payment_type_id to human-readable names.
*/

with source as (
    select * from {{ ref('payment_type_lookup') }}
),

renamed as (
    select
        payment_type_id,
        payment_type_name
    from source
)

select * from renamed

### 13.3 stg_rate_codes.sql

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/staging/stg_rate_codes.sql
/*
    Staging model: Rate code lookup
    Maps rate_code_id to human-readable names.
*/

with source as (
    select * from {{ ref('rate_code_lookup') }}
),

renamed as (
    select
        rate_code_id,
        rate_code_name
    from source
)

select * from renamed

### 13.4 stg_taxi_zones.sql

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/staging/stg_taxi_zones.sql
/*
    Staging model: Taxi zone lookup
    Maps LocationID to borough and zone name.
*/

with source as (
    select * from {{ ref('taxi_zone_lookup') }}
),

renamed as (
    select
        cast("LocationID" as integer) as location_id,
        "Borough" as borough,
        "Zone" as zone_name,
        service_zone
    from source
)

select * from renamed

### 13.5 stg_vendors.sql

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/staging/stg_vendors.sql
/*
    Staging model: Vendor lookup
    Maps vendor_id to vendor name and abbreviation.
*/

with source as (
    select * from {{ ref('vendor_lookup') }}
),

renamed as (
    select
        vendor_id,
        vendor_name,
        vendor_abbr
    from source
)

select * from renamed

### 13.6 staging.yml (schema + tests)

Defines **25 tests** across the four staging models declared below: uniqueness, not-null, accepted values, relationships.
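
Of these test types, `relationships` is the least obvious: it is essentially an anti-join that reports child values with no match in the parent. The sketch below reproduces that logic with SQLite as a stand-in engine; the table contents are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table stg_taxi_zones (location_id integer);
    create table stg_yellow_trips (pickup_location_id integer);
    insert into stg_taxi_zones values (1), (2), (3);
    insert into stg_yellow_trips values (1), (3), (999), (null);
""")

# Child values that are non-null and have no matching parent row.
orphans = conn.execute("""
    select t.pickup_location_id
    from stg_yellow_trips t
    left join stg_taxi_zones z on t.pickup_location_id = z.location_id
    where t.pickup_location_id is not null
      and z.location_id is null
""").fetchall()

print(orphans)  # [(999,)]
```

With `severity: warn`, rows like these are reported as a warning and do not fail the run.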

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/staging/staging.yml
version: 2

models:
  - name: stg_yellow_trips
    description: "Cleaned and renamed yellow taxi trip records from Flink Silver Iceberg table. Flink already applied quality filters and column renaming."
    columns:
      - name: trip_id
        description: "Surrogate key generated by Flink via MD5 hash of VendorID + timestamps + locations + fare/total amounts"
        tests:
          - not_null
          - unique
      - name: vendor_id
        description: "TPEP provider: 1=Creative Mobile Technologies, 2=VeriFone Inc., 6=Unknown/Other"
        tests:
          - not_null
          - accepted_values:
              arguments:
                values: [1, 2, 6]
              config:
                severity: warn
      - name: rate_code_id
        description: "Rate code: 1=Standard, 2=JFK, 3=Newark, 4=Nassau/Westchester, 5=Negotiated, 6=Group"
        tests:
          - accepted_values:
              arguments:
                values: [1, 2, 3, 4, 5, 6, 99]
              config:
                severity: warn
      - name: pickup_location_id
        description: "TLC Taxi Zone ID for pickup"
        tests:
          - not_null
          - relationships:
              arguments:
                to: ref('stg_taxi_zones')
                field: location_id
              config:
                severity: warn
      - name: dropoff_location_id
        description: "TLC Taxi Zone ID for dropoff"
        tests:
          - not_null
          - relationships:
              arguments:
                to: ref('stg_taxi_zones')
                field: location_id
              config:
                severity: warn
      - name: payment_type_id
        description: "Payment method"
        tests:
          - accepted_values:
              arguments:
                values: [0, 1, 2, 3, 4, 5, 6]
              config:
                severity: warn
      - name: pickup_datetime
        tests:
          - not_null
      - name: dropoff_datetime
        tests:
          - not_null
      - name: trip_distance_miles
        tests:
          - not_null
      - name: fare_amount
        tests:
          - not_null
      - name: total_amount
        tests:
          - not_null

  - name: stg_taxi_zones
    description: "Taxi zone reference mapping location IDs to borough and zone names"
    columns:
      - name: location_id
        tests:
          - unique
          - not_null
      - name: borough
        tests:
          - not_null
      - name: zone_name
        tests:
          - not_null

  - name: stg_payment_types
    description: "Payment type reference"
    columns:
      - name: payment_type_id
        tests:
          - unique
          - not_null
      - name: payment_type_name
        tests:
          - not_null

  - name: stg_rate_codes
    description: "Rate code reference"
    columns:
      - name: rate_code_id
        tests:
          - unique
          - not_null
      - name: rate_code_name
        tests:
          - not_null

## 14. dbt Intermediate Models

Intermediate models add **business logic** on top of staging. They compute metrics,
aggregate data, and apply final quality filters.

### 14.1 int_trip_metrics.sql

Enriches each trip with calculated fields:
- `trip_duration_minutes` (using adapter-dispatched macro)
- `avg_speed_mph` (with division-by-zero protection)
- `cost_per_mile`, `tip_percentage`
- `pickup_day_of_week`, `is_weekend`

Also applies final quality gates:
- Duration between 1-720 minutes
- Speed under 100 mph
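
To make the calculations and gates concrete, here is a minimal Python sketch for a single trip. It assumes duration is elapsed whole minutes (the real value depends on the `duration_minutes` macro for your engine), and `trip_metrics` and `passes_quality_gate` are hypothetical helper names.

```python
from datetime import datetime


def trip_metrics(pickup: datetime, dropoff: datetime, miles: float,
                 fare: float, tip: float) -> dict:
    """Compute the derived columns, returning None where SQL would yield NULL."""
    minutes = int((dropoff - pickup).total_seconds() // 60)
    speed = round(miles / (minutes / 60.0), 2) if minutes > 0 else None
    return {
        "trip_duration_minutes": minutes,
        "avg_speed_mph": speed,
        "cost_per_mile": round(fare / miles, 2) if miles > 0 else None,
        "tip_percentage": round(tip / fare * 100, 2) if fare > 0 else None,
        # SQL dow is 0=Sunday..6=Saturday; isoweekday() % 7 gives the same numbering
        "is_weekend": pickup.isoweekday() % 7 in (0, 6),
    }


def passes_quality_gate(m: dict) -> bool:
    """Duration between 1 and 720 minutes, speed NULL or under 100 mph."""
    return 1 <= m["trip_duration_minutes"] <= 720 and (
        m["avg_speed_mph"] is None or m["avg_speed_mph"] < 100
    )


ok = trip_metrics(datetime(2024, 1, 6, 10, 0), datetime(2024, 1, 6, 10, 30),
                  miles=6.0, fare=20.0, tip=4.0)
too_short = trip_metrics(datetime(2024, 1, 6, 10, 0), datetime(2024, 1, 6, 10, 0, 30),
                         miles=1.0, fare=5.0, tip=0.0)
print(ok, passes_quality_gate(ok))
print(too_short, passes_quality_gate(too_short))
```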

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/intermediate/int_trip_metrics.sql
/*
    Intermediate model: Trip-level enrichment with calculated metrics.
    Uses adapter-dispatched macros for cross-dialect compatibility.
*/

with trips as (
    select * from {{ ref('stg_yellow_trips') }}
),

enriched as (
    select
        trip_id,
        vendor_id,
        rate_code_id,
        pickup_location_id,
        dropoff_location_id,
        payment_type_id,
        pickup_datetime,
        dropoff_datetime,
        passenger_count,
        trip_distance_miles,
        store_and_fwd_flag,

        -- calculated: duration in minutes
        {{ duration_minutes('pickup_datetime', 'dropoff_datetime') }} as trip_duration_minutes,

        -- calculated: average speed (avoid division by zero)
        case
            when {{ duration_minutes('pickup_datetime', 'dropoff_datetime') }} > 0
            then round(
                trip_distance_miles / ({{ duration_minutes('pickup_datetime', 'dropoff_datetime') }} / 60.0),
                2
            )
            else null
        end as avg_speed_mph,

        -- calculated: cost per mile
        case
            when trip_distance_miles > 0
            then round(fare_amount / trip_distance_miles, 2)
            else null
        end as cost_per_mile,

        -- calculated: tip percentage
        case
            when fare_amount > 0
            then round((tip_amount / fare_amount) * 100, 2)
            else null
        end as tip_percentage,

        -- time dimensions (using adapter-dispatched macros)
        date_trunc('day', pickup_datetime)::date as pickup_date,
        extract(hour from pickup_datetime) as pickup_hour,
        {{ dayname_compat('pickup_datetime') }} as pickup_day_of_week,
        case
            when extract(dow from pickup_datetime) in (0, 6) then true
            else false
        end as is_weekend,

        -- financials passthrough
        fare_amount,
        extra_amount,
        mta_tax,
        tip_amount,
        tolls_amount,
        improvement_surcharge,
        total_amount,
        congestion_surcharge,
        airport_fee

    from trips
)

select *
from enriched
where trip_duration_minutes between 1 and 720
  and (avg_speed_mph is null or avg_speed_mph < 100)

### 14.2 int_daily_summary.sql

One row per day with aggregated counts, averages, and revenue totals.
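
The per-payment-type counts rely on the `count(case when ... then 1 end)` idiom: `count` skips NULLs, so only matching rows are counted. A minimal sketch with SQLite as a stand-in engine and invented sample rows:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table int_trip_metrics (
        pickup_date text, payment_type_id integer, total_amount real
    );
    insert into int_trip_metrics values
        ('2024-01-01', 1, 10.0),
        ('2024-01-01', 2, 20.0),
        ('2024-01-01', 1, 30.0),
        ('2024-01-02', 2, 5.0);
""")

rows = conn.execute("""
    select
        pickup_date,
        count(*) as total_trips,
        round(sum(total_amount), 2) as total_revenue,
        count(case when payment_type_id = 1 then 1 end) as credit_card_trips,
        count(case when payment_type_id = 2 then 1 end) as cash_trips
    from int_trip_metrics
    group by pickup_date
    order by pickup_date
""").fetchall()

for row in rows:
    print(row)
```

Grouping by `pickup_day_of_week` and `is_weekend` as well, as the model does, still yields one row per day because both are derived from `pickup_date`.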

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/intermediate/int_daily_summary.sql
/*
    Intermediate model: Daily aggregated trip and revenue metrics.
    One row per day with counts, averages, and revenue totals.
*/

with trip_metrics as (
    select * from {{ ref('int_trip_metrics') }}
),

daily_agg as (
    select
        pickup_date,
        pickup_day_of_week,
        is_weekend,

        count(*) as total_trips,
        sum(passenger_count) as total_passengers,

        round(avg(trip_distance_miles), 2) as avg_trip_distance,
        round(avg(trip_duration_minutes), 2) as avg_trip_duration_min,
        round(avg(avg_speed_mph), 2) as avg_speed_mph,

        round(sum(fare_amount), 2) as total_fare_revenue,
        round(sum(tip_amount), 2) as total_tip_revenue,
        round(sum(total_amount), 2) as total_revenue,
        round(avg(total_amount), 2) as avg_trip_revenue,
        round(avg(tip_percentage), 2) as avg_tip_percentage,

        count(case when payment_type_id = 1 then 1 end) as credit_card_trips,
        count(case when payment_type_id = 2 then 1 end) as cash_trips

    from trip_metrics
    group by pickup_date, pickup_day_of_week, is_weekend
)

select * from daily_agg

### 14.3 int_hourly_patterns.sql

One row per date+hour combination for demand analysis.

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/intermediate/int_hourly_patterns.sql
/*
    Intermediate model: Hourly trip patterns by date.
    One row per date + hour combination.
*/

with trip_metrics as (
    select * from {{ ref('int_trip_metrics') }}
),

hourly_agg as (
    select
        pickup_date,
        pickup_hour,
        pickup_day_of_week,
        is_weekend,

        count(*) as total_trips,
        round(avg(trip_distance_miles), 2) as avg_distance,
        round(avg(trip_duration_minutes), 2) as avg_duration_min,
        round(sum(total_amount), 2) as total_revenue,
        round(avg(total_amount), 2) as avg_revenue

    from trip_metrics
    group by pickup_date, pickup_hour, pickup_day_of_week, is_weekend
)

select * from hourly_agg

### 14.4 intermediate.yml (schema + tests)

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/intermediate/intermediate.yml
version: 2

models:
  - name: int_trip_metrics
    description: "Trip records enriched with calculated metrics."
    columns:
      - name: trip_id
        tests:
          - not_null
      - name: trip_duration_minutes
        tests:
          - not_null
          - dbt_utils.accepted_range:
              arguments:
                min_value: 1
                max_value: 720
      - name: pickup_date
        tests:
          - not_null
      - name: pickup_hour
        tests:
          - not_null
          - dbt_utils.accepted_range:
              arguments:
                min_value: 0
                max_value: 23
      - name: is_weekend
        tests:
          - not_null

  - name: int_daily_summary
    description: "Daily aggregated trip counts, revenue, and average metrics"
    columns:
      - name: pickup_date
        tests:
          - unique
          - not_null
      - name: total_trips
        tests:
          - not_null
          - dbt_utils.accepted_range:
              arguments:
                min_value: 0
      - name: total_revenue
        tests:
          - not_null

  - name: int_hourly_patterns
    description: "Hourly trip aggregations by date"
    columns:
      - name: pickup_date
        tests:
          - not_null
      - name: pickup_hour
        tests:
          - not_null
          - dbt_utils.accepted_range:
              arguments:
                min_value: 0
                max_value: 23
      - name: total_trips
        tests:
          - not_null

## 15. dbt Core Marts (Gold Layer - Facts & Dimensions)

The core marts form the **Gold layer** — the final, query-ready tables for analytics.

### Star Schema Design

```
               dim_dates
                  │
                  │
dim_locations ─── fct_trips ─── dim_payment_types
                  │
                  │
           dim_locations (dropoff)
```

### 15.1 fct_trips.sql

The central **fact table**. Joins trip metrics with location dimensions.
Uses `incremental` materialization with `delete+insert` strategy for efficient rebuilds.
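
Conceptually, one incremental run stages the rows newer than the current high-water mark, deletes any target rows that share a `trip_id` with them, and appends the staged rows. The sketch below walks through that sequence in SQLite as a stand-in engine; the sample rows and the `new_rows` temp table are invented, and the exact SQL dbt generates will differ by adapter.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table fct_trips (trip_id text, pickup_datetime text, total_amount real);
    create table int_trip_metrics (trip_id text, pickup_datetime text, total_amount real);
    insert into fct_trips values ('a', '2024-01-01 10:00:00', 10.0);
    insert into int_trip_metrics values
        ('a', '2024-01-01 10:00:00', 10.0),   -- already loaded
        ('b', '2024-01-01 09:00:00', 7.0),    -- late arrival, older than the max
        ('c', '2024-01-01 11:00:00', 12.0);   -- new
""")

# 1. Stage only rows newer than the high-water mark (the is_incremental() filter).
conn.execute("""
    create temp table new_rows as
    select * from int_trip_metrics
    where pickup_datetime > (select max(pickup_datetime) from fct_trips)
""")
# 2. delete+insert: drop target rows sharing the unique_key, then append.
conn.execute("delete from fct_trips where trip_id in (select trip_id from new_rows)")
conn.execute("insert into fct_trips select * from new_rows")

loaded = [r[0] for r in conn.execute("select trip_id from fct_trips order by trip_id")]
print(loaded)  # ['a', 'c']
```

Note that trip `b` is never loaded: with a strict `>` filter on `pickup_datetime`, rows that arrive late with an older timestamp are skipped until a full refresh.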

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/marts/core/fct_trips.sql
/*
    Fact table: Fully enriched trip records with location names.
    Incremental with delete+insert strategy.
*/

{{
  config(
    materialized='incremental',
    unique_key='trip_id',
    incremental_strategy='delete+insert',
    on_schema_change='fail'
  )
}}

with trip_metrics as (
    select * from {{ ref('int_trip_metrics') }}
),

pickup_locations as (
    select * from {{ ref('dim_locations') }}
),

dropoff_locations as (
    select * from {{ ref('dim_locations') }}
),

final as (
    select
        t.trip_id,
        t.vendor_id,
        t.rate_code_id,
        t.payment_type_id,
        t.pickup_location_id,
        t.dropoff_location_id,
        t.pickup_datetime,
        t.dropoff_datetime,
        t.pickup_date,
        t.pickup_hour,
        t.pickup_day_of_week,
        t.is_weekend,
        t.passenger_count,
        t.trip_distance_miles,
        t.trip_duration_minutes,
        t.avg_speed_mph,
        t.cost_per_mile,
        t.fare_amount,
        t.extra_amount,
        t.mta_tax,
        t.tip_amount,
        t.tip_percentage,
        t.tolls_amount,
        t.improvement_surcharge,
        t.total_amount,
        t.congestion_surcharge,
        t.airport_fee,

        -- enriched from dimensions
        pu.borough as pickup_borough,
        pu.zone_name as pickup_zone,
        do_loc.borough as dropoff_borough,
        do_loc.zone_name as dropoff_zone

    from trip_metrics t
    left join pickup_locations pu
        on t.pickup_location_id = pu.location_id
    left join dropoff_locations do_loc
        on t.dropoff_location_id = do_loc.location_id

    {% if is_incremental() %}
    where t.pickup_datetime > (select max(pickup_datetime) from {{ this }})
    {% endif %}
)

select * from final

### 15.2 dim_dates.sql

Date dimension for January 2024. Uses `dbt_utils.date_spine()` to generate all dates,
then enriches with day-of-week, month name, weekend/holiday flags.
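
The sketch below mirrors the spine and flags in plain Python. It assumes `date_spine` treats `end_date` as exclusive, which is how dbt_utils documents it, so a start of 2024-01-01 and an end of 2024-02-01 give the 31 days of January. The variable names are illustrative.

```python
from datetime import date, timedelta

start, end = date(2024, 1, 1), date(2024, 2, 1)   # end is exclusive
holidays = {date(2024, 1, 1), date(2024, 1, 15)}  # the two dates flagged in the model

dim_dates = []
day = start
while day < end:
    dow = day.isoweekday() % 7                    # 0=Sunday .. 6=Saturday
    dim_dates.append({
        "date_key": day,
        "day_of_week_num": dow,
        "is_weekend": dow in (0, 6),
        "is_holiday": day in holidays,
    })
    day += timedelta(days=1)

print(len(dim_dates))                             # 31
print(sum(r["is_weekend"] for r in dim_dates))    # 8
```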

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/marts/core/dim_dates.sql
/*
    Dimension table: Calendar dates for January 2024.
    Uses adapter-dispatched macros for dayname/monthname.
*/

with date_spine as (
    {{ dbt_utils.date_spine(
        datepart="day",
        start_date="cast('2024-01-01' as date)",
        end_date="cast('2024-02-01' as date)"
    ) }}
),

final as (
    select
        cast(date_day as date) as date_key,
        extract(year from date_day) as year,
        extract(month from date_day) as month,
        extract(day from date_day) as day_of_month,
        extract(dow from date_day) as day_of_week_num,
        {{ dayname_compat('date_day') }} as day_of_week_name,
        {{ monthname_compat('date_day') }} as month_name,
        extract(week from date_day) as week_of_year,
        case
            when extract(dow from date_day) in (0, 6) then true
            else false
        end as is_weekend,
        case
            when cast(date_day as date) in (
                cast('2024-01-01' as date),
                cast('2024-01-15' as date)
            ) then true
            else false
        end as is_holiday

    from date_spine
)

select * from final

### 15.3 dim_locations.sql

Taxi zone dimension from seed data.

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/marts/core/dim_locations.sql
/*
    Dimension table: TLC Taxi Zone locations.
*/

with zones as (
    select * from {{ ref('stg_taxi_zones') }}
),

final as (
    select
        location_id,
        borough,
        zone_name,
        service_zone
    from zones
)

select * from final

### 15.4 dim_payment_types.sql

Payment method dimension from seed data.

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/marts/core/dim_payment_types.sql
/*
    Dimension table: Payment type descriptions.
*/

with payment_types as (
    select * from {{ ref('stg_payment_types') }}
),

final as (
    select
        payment_type_id,
        payment_type_name
    from payment_types
)

select * from final

### 15.5 dim_vendors.sql

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/marts/core/dim_vendors.sql
/*
    Dimension table: Taxi vendor descriptions.
    TPEP provider: 1=Creative Mobile Technologies (CMT), 2=VeriFone Inc. (VFI)
*/

with vendors as (
    select * from {{ ref('stg_vendors') }}
),

final as (
    select
        vendor_id,
        vendor_name,
        vendor_abbr
    from vendors
)

select * from final

### 15.6 core.yml (contracts + tests)

Enforces **data contracts** on the core models declared below: every column has a declared `data_type`.
This catches schema drift at build time.
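
Conceptually, an enforced contract compares the declared columns and types against what the model's query returns and fails the build on any mismatch. The sketch below is a hypothetical Python illustration of that comparison, not dbt's implementation; `contract_violations` and the sample type maps are invented.

```python
def contract_violations(declared: dict, actual: dict) -> list:
    """Compare declared vs. actual column types; return readable mismatches."""
    problems = []
    for col in sorted(declared.keys() | actual.keys()):
        want, got = declared.get(col), actual.get(col)
        if want is None:
            problems.append(f"{col}: present in model but not declared")
        elif got is None:
            problems.append(f"{col}: declared but missing from model")
        elif want != got:
            problems.append(f"{col}: declared {want}, model returns {got}")
    return problems


declared = {"payment_type_id": "integer", "payment_type_name": "varchar"}
drifted = {"payment_type_id": "bigint", "payment_type_name": "varchar"}

print(contract_violations(declared, declared))  # []
print(contract_violations(declared, drifted))
```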

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/marts/core/core.yml
version: 2

models:
  - name: fct_trips
    description: "Fact table with fully enriched trip records."
    config:
      contract:
        enforced: true
    columns:
      - name: trip_id
        data_type: varchar
        tests: [not_null, unique]
      - name: vendor_id
        data_type: integer
      - name: rate_code_id
        data_type: integer
      - name: payment_type_id
        data_type: integer
      - name: pickup_location_id
        data_type: integer
      - name: dropoff_location_id
        data_type: integer
      - name: pickup_datetime
        data_type: timestamp
        tests: [not_null]
      - name: dropoff_datetime
        data_type: timestamp
      - name: pickup_date
        data_type: date
      - name: pickup_hour
        data_type: bigint
      - name: pickup_day_of_week
        data_type: varchar
      - name: is_weekend
        data_type: boolean
      - name: passenger_count
        data_type: integer
      - name: trip_distance_miles
        data_type: double
      - name: trip_duration_minutes
        data_type: bigint
      - name: avg_speed_mph
        data_type: double
      - name: cost_per_mile
        data_type: double
      - name: fare_amount
        data_type: "decimal(10,2)"
      - name: extra_amount
        data_type: "decimal(10,2)"
      - name: mta_tax
        data_type: "decimal(10,2)"
      - name: tip_amount
        data_type: "decimal(10,2)"
      - name: tip_percentage
        data_type: double
      - name: tolls_amount
        data_type: "decimal(10,2)"
      - name: improvement_surcharge
        data_type: "decimal(10,2)"
      - name: total_amount
        data_type: "decimal(10,2)"
        tests: [not_null]
      - name: congestion_surcharge
        data_type: "decimal(10,2)"
      - name: airport_fee
        data_type: "decimal(10,2)"
      - name: pickup_borough
        data_type: varchar
      - name: pickup_zone
        data_type: varchar
      - name: dropoff_borough
        data_type: varchar
      - name: dropoff_zone
        data_type: varchar

  - name: dim_locations
    description: "Location dimension"
    config:
      contract:
        enforced: true
    columns:
      - name: location_id
        data_type: integer
        tests: [unique, not_null]
      - name: borough
        data_type: varchar
        tests: [not_null]
      - name: zone_name
        data_type: varchar
        tests: [not_null]
      - name: service_zone
        data_type: varchar

  - name: dim_dates
    description: "Date dimension for January 2024"
    config:
      contract:
        enforced: true
    columns:
      - name: date_key
        data_type: date
        tests: [unique, not_null]
      - name: year
        data_type: bigint
      - name: month
        data_type: bigint
      - name: day_of_month
        data_type: bigint
      - name: day_of_week_num
        data_type: bigint
      - name: day_of_week_name
        data_type: varchar
        tests: [not_null]
      - name: month_name
        data_type: varchar
      - name: week_of_year
        data_type: bigint
      - name: is_weekend
        data_type: boolean
        tests: [not_null]
      - name: is_holiday
        data_type: boolean
        tests: [not_null]

  - name: dim_payment_types
    description: "Payment type dimension"
    config:
      contract:
        enforced: true
    columns:
      - name: payment_type_id
        data_type: integer
        tests: [unique, not_null]
      - name: payment_type_name
        data_type: varchar
        tests: [not_null]

## 16. dbt Analytics Marts (Gold Layer - Business KPIs)

Analytics marts are purpose-built aggregations for specific business questions.

### 16.1 mart_daily_revenue.sql

Daily revenue metrics with **running totals** and **day-over-day change**.
Joins with `dim_dates` for calendar context (weekends, holidays).
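
The two window functions can be read as a running sum and a one-row look-back over rows ordered by date. A minimal Python sketch with invented revenue figures:

```python
from itertools import accumulate

daily_revenue = [("2024-01-01", 100.0), ("2024-01-02", 150.0), ("2024-01-03", 120.0)]

totals = [rev for _, rev in daily_revenue]
cumulative = list(accumulate(totals))    # sum(...) over (order by date_key)
previous = [None] + totals[:-1]          # lag(...) over (order by date_key)
change = [None if p is None else t - p for t, p in zip(totals, previous)]

for (day, rev), cum, chg in zip(daily_revenue, cumulative, change):
    print(day, rev, cum, chg)
```

The first day has no prior row, so its change is NULL. `lag` looks at the previous row that exists, so if a day had no trips the change would span more than one calendar day.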

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/marts/analytics/mart_daily_revenue.sql
/*
    Analytics mart: Daily revenue metrics with running totals.
*/

with daily as (
    select * from {{ ref('int_daily_summary') }}
),

dates as (
    select * from {{ ref('dim_dates') }}
),

final as (
    select
        d.date_key,
        d.day_of_week_name,
        d.is_weekend,
        d.is_holiday,
        d.week_of_year,

        daily.total_trips,
        daily.total_passengers,
        daily.total_fare_revenue,
        daily.total_tip_revenue,
        daily.total_revenue,
        daily.avg_trip_revenue,
        daily.avg_tip_percentage,
        daily.credit_card_trips,
        daily.cash_trips,
        daily.avg_trip_distance,
        daily.avg_trip_duration_min,

        -- running total
        sum(daily.total_revenue) over (order by d.date_key) as cumulative_revenue,

        -- day-over-day change
        daily.total_revenue - lag(daily.total_revenue) over (order by d.date_key) as revenue_change_vs_prior_day

    from daily
    inner join dates d
        on daily.pickup_date = d.date_key
)

select * from final

### 16.2 mart_hourly_demand.sql

Hourly demand patterns aggregated across all days. Answers: "What's the average trip count at 8 AM on weekdays vs weekends?"
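
The grouping can be sketched in Python as follows. This assumes the upstream model holds one row per day and hour (which is what makes `count(*)` a count of days); the sample rows and the function name `hourly_demand` are hypothetical.

```python
from collections import defaultdict
from statistics import mean

# (pickup_date, pickup_hour, is_weekend, total_trips): made-up rows, one per day and hour
hourly = [
    ("2024-01-01", 8, False, 120),
    ("2024-01-02", 8, False, 140),
    ("2024-01-06", 8, True, 40),
    ("2024-01-07", 8, True, 60),
]


def hourly_demand(rows):
    """Group by (pickup_hour, is_weekend) and average the per-day trip totals."""
    groups = defaultdict(list)
    for _date, hour, is_weekend, trips in rows:
        groups[(hour, is_weekend)].append(trips)
    return {
        key: {
            "days_observed": len(trips),
            "avg_trips_per_period": round(mean(trips)),
            "total_trips_all_days": sum(trips),
        }
        for key, trips in groups.items()
    }


for key, stats in sorted(hourly_demand(hourly).items(), key=lambda kv: (kv[0][1], kv[0][0])):
    print(key, stats)
```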

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/marts/analytics/mart_hourly_demand.sql
/*
    Analytics mart: Hourly demand patterns.
*/

with hourly as (
    select * from {{ ref('int_hourly_patterns') }}
),

final as (
    select
        pickup_hour,
        is_weekend,

        count(*) as days_observed,
        round(avg(total_trips), 0) as avg_trips_per_period,
        round(avg(avg_distance), 2) as avg_distance,
        round(avg(avg_duration_min), 2) as avg_duration_min,
        round(avg(total_revenue), 2) as avg_revenue_per_period,
        sum(total_trips) as total_trips_all_days

    from hourly
    group by pickup_hour, is_weekend
)

select * from final
order by is_weekend, pickup_hour

### 16.3 mart_location_performance.sql

Per-zone performance summary. Uses the adapter-dispatched `mode_compat()` macro to find the most common dropoff zone and the peak pickup hour for each pickup zone.
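
Conceptually, a "most common value" aggregate returns the value with the highest frequency in each group. The sketch below shows that idea in plain Python; the helper `mode_or_none`, the zone names and the tie-breaking rule are assumptions, since the macro's implementation is not shown in this section and SQL engines may break ties differently.

```python
from collections import Counter


def mode_or_none(values):
    """Return the most frequent non-null value, or None for empty input.

    Ties are broken by first occurrence here; a SQL engine may choose differently.
    """
    counts = Counter(v for v in values if v is not None)
    if not counts:
        return None
    return counts.most_common(1)[0][0]


dropoff_zones = ["Midtown Center", "JFK Airport", "Midtown Center", None]
pickup_hours = [8, 18, 18, 9]
print(mode_or_none(dropoff_zones))
print(mode_or_none(pickup_hours))
```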

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/marts/analytics/mart_location_performance.sql
/*
    Analytics mart: Location-level performance summary.
    Uses adapter-dispatched mode_compat() for cross-dialect support.
*/

with trips as (
    select * from {{ ref('fct_trips') }}
),

final as (
    select
        pickup_location_id,
        pickup_borough,
        pickup_zone,

        count(*) as total_pickups,
        round(avg(trip_distance_miles), 2) as avg_trip_distance,
        round(avg(trip_duration_minutes), 2) as avg_trip_duration_min,
        round(sum(total_amount), 2) as total_revenue,
        round(avg(total_amount), 2) as avg_revenue_per_trip,
        round(avg(tip_percentage), 2) as avg_tip_pct,
        round(avg(passenger_count), 2) as avg_passengers,

        -- most common dropoff destination
        {{ mode_compat('dropoff_zone') }} as most_common_dropoff_zone,

        -- busiest hour
        {{ mode_compat('pickup_hour') }} as peak_pickup_hour

    from trips
    where pickup_zone is not null
    group by pickup_location_id, pickup_borough, pickup_zone
)

select * from final
order by total_pickups desc

### 16.4 analytics.yml (contracts + tests)

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/models/marts/analytics/analytics.yml
version: 2

models:
  - name: mart_daily_revenue
    description: "Daily revenue metrics with running totals."
    config:
      contract:
        enforced: true
    columns:
      - name: date_key
        data_type: date
        tests: [unique, not_null]
      - name: day_of_week_name
        data_type: varchar
      - name: is_weekend
        data_type: boolean
      - name: is_holiday
        data_type: boolean
      - name: week_of_year
        data_type: bigint
      - name: total_trips
        data_type: bigint
      - name: total_passengers
        data_type: hugeint
      - name: total_fare_revenue
        data_type: "decimal(38,2)"
      - name: total_tip_revenue
        data_type: "decimal(38,2)"
      - name: total_revenue
        data_type: "decimal(38,2)"
        tests: [not_null]
      - name: avg_trip_revenue
        data_type: double
      - name: avg_tip_percentage
        data_type: double
      - name: credit_card_trips
        data_type: bigint
      - name: cash_trips
        data_type: bigint
      - name: avg_trip_distance
        data_type: double
      - name: avg_trip_duration_min
        data_type: double
      - name: cumulative_revenue
        data_type: "decimal(38,2)"
      - name: revenue_change_vs_prior_day
        data_type: "decimal(38,2)"

  - name: mart_location_performance
    description: "Per-zone performance summary."
    config:
      contract:
        enforced: true
    columns:
      - name: pickup_location_id
        data_type: integer
        tests: [unique, not_null]
      - name: pickup_borough
        data_type: varchar
      - name: pickup_zone
        data_type: varchar
      - name: total_pickups
        data_type: bigint
        tests: [not_null]
      - name: avg_trip_distance
        data_type: double
      - name: avg_trip_duration_min
        data_type: double
      - name: total_revenue
        data_type: "decimal(38,2)"
      - name: avg_revenue_per_trip
        data_type: double
      - name: avg_tip_pct
        data_type: double
      - name: avg_passengers
        data_type: double
      - name: most_common_dropoff_zone
        data_type: varchar
      - name: peak_pickup_hour
        data_type: bigint

  - name: mart_hourly_demand
    description: "Hourly demand patterns."
    config:
      contract:
        enforced: true
    columns:
      - name: pickup_hour
        data_type: bigint
        tests: [not_null]
      - name: is_weekend
        data_type: boolean
        tests: [not_null]
      - name: days_observed
        data_type: bigint
      - name: avg_trips_per_period
        data_type: double
      - name: avg_distance
        data_type: double
      - name: avg_duration_min
        data_type: double
      - name: avg_revenue_per_period
        data_type: double
      - name: total_trips_all_days
        data_type: hugeint

## 17. dbt Tests: Data Quality Assertions

dbt tests come in two flavors:
1. **Generic tests** (in YAML files): the built-in `unique`, `not_null`, `accepted_values`, `relationships`, plus `accepted_range` from the `dbt_utils` package
2. **Singular tests** (SQL files): Custom queries that return rows **only if there's a problem**
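
The "returns rows only if there's a problem" contract can be sketched outside dbt with an in-memory SQLite table. This is only an illustration of the pattern: the table name `trips`, the sample rows and the helper `run_singular_test` are hypothetical, and the query borrows the fare check from section 17.1.

```python
import sqlite3

# In-memory stand-in for a staging table; table name and rows are illustrative only
conn = sqlite3.connect(":memory:")
conn.execute("create table trips (trip_id text, fare_amount real, total_amount real)")
conn.executemany(
    "insert into trips values (?, ?, ?)",
    [("t1", 10.0, 14.5), ("t2", 25.0, 20.0), ("t3", 8.0, 8.0)],
)

# A singular test is a query that selects the offending rows
SINGULAR_TEST = """
    select trip_id, fare_amount, total_amount
    from trips
    where fare_amount > total_amount + 0.01
      and total_amount > 0
"""


def run_singular_test(connection, sql):
    """Return (passed, failing_rows); the test passes when no rows come back."""
    failures = connection.execute(sql).fetchall()
    return (len(failures) == 0, failures)


passed, failures = run_singular_test(conn, SINGULAR_TEST)
print("PASS" if passed else f"FAIL: {failures}")
```

Here trip `t2` is returned because its fare exceeds its total, so the test fails; removing that row would make the query return nothing and the test pass.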

### Test Summary: 91 tests across all layers

| Layer | Tests | What They Check |
|-------|-------|-----------------|
| Staging | 32 | Uniqueness, nulls, accepted values, referential integrity |
| Intermediate | 15 | Ranges (duration 1-720 min, hour 0-23), totals > 0 |
| Core Marts | 24 | Data contracts (column types), key uniqueness |
| Analytics | 12 | Aggregation integrity, non-null results |
| Singular | 2 | fare ≤ total, duration ≥ 0 |
| Seeds | 6 | Reference data integrity |

### 17.1 assert_fare_not_exceeds_total.sql

Fare amount should never exceed total amount (which includes tips, taxes, surcharges).

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/tests/assert_fare_not_exceeds_total.sql
/*
    Singular test: fare_amount should not exceed total_amount.
*/

select
    trip_id,
    fare_amount,
    total_amount
from {{ ref('stg_yellow_trips') }}
where fare_amount > total_amount + 0.01
  and total_amount > 0

### 17.2 assert_trip_duration_positive.sql

No trip should have a negative duration (dropoff before pickup).
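
A negative duration typically points at swapped or skewed timestamps. The sketch below shows the check in Python on two invented trips; how `int_trip_metrics` actually derives `trip_duration_minutes` is not shown here, so the `duration_minutes` helper is an assumption.

```python
from datetime import datetime


def duration_minutes(pickup, dropoff):
    """Trip duration in minutes; negative when dropoff precedes pickup."""
    return (dropoff - pickup).total_seconds() / 60


trips = [
    ("t1", datetime(2024, 1, 1, 8, 0), datetime(2024, 1, 1, 8, 25)),
    ("t2", datetime(2024, 1, 1, 9, 30), datetime(2024, 1, 1, 9, 10)),  # dropoff before pickup
]

# Rows that a "duration must not be negative" assertion would return
bad = [(trip_id, duration_minutes(p, d)) for trip_id, p, d in trips if duration_minutes(p, d) < 0]
print(bad)
```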

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/dbt_project/tests/assert_trip_duration_positive.sql
/*
    Singular test: No trip should have negative duration.
*/

select
    trip_id,
    pickup_datetime,
    dropoff_datetime,
    trip_duration_minutes
from {{ ref('int_trip_metrics') }}
where trip_duration_minutes < 0

## 18. Pipeline Makefile: One-Command Orchestration

The Makefile orchestrates the entire pipeline lifecycle with `make` commands:

| Command | What It Does |
|---------|-------------|
| `make up` | Start all infrastructure services |
| `make down` | Stop and remove volumes (includes Lakekeeper profile) |
| `make create-topics` | Create Kafka topics (raw + DLQ) |
| `make generate` | Produce taxi events to Kafka (burst mode) |
| `make generate-limited` | Produce 10k events for testing |
| `make process` | Submit Bronze + Silver Flink SQL jobs |
| `make dbt-build` | Run dbt build with full-refresh |
| `make benchmark` | Full E2E benchmark (down → up → create-topics → generate → process → dbt-build → down) |
| `make health` | Quick health check of all services |
| `make check-lag` | Show Kafka consumer group lag |
| `make up-lakekeeper` | Start with Lakekeeper REST catalog |
| `make process-rest` | Run Flink SQL via REST catalog |

> **Windows note:** The `MSYS_NO_PATHCONV=1` prefix on Docker commands stops Git Bash (MSYS)
> from rewriting container paths such as `/opt/flink/bin/sql-client.sh` into Windows paths.

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/Makefile
SHELL := bash
# =============================================================================
# Pipeline 01: Kafka + Flink + Iceberg (Production-Grade Template)
# =============================================================================
# Makefile for orchestrating the complete streaming pipeline lifecycle.
# =============================================================================

COMPOSE = docker compose
FLINK_SQL_CLIENT = MSYS_NO_PATHCONV=1 $(COMPOSE) exec -T flink-jobmanager /opt/flink/bin/sql-client.sh embedded

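.PHONY: help up down clean restart ps status logs logs-kafka logs-flink logs-flink-tm \
        create-topics generate generate-limited \
        process process-bronze process-silver \
        dbt-build dbt-test dbt-docs benchmark health check-lag \
        up-lakekeeper process-rest process-bronze-rest process-silver-rest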

help: ## Show this help
	@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | \
		awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'

# =============================================================================
# Lifecycle
# =============================================================================

up: ## Start all infrastructure services
	$(COMPOSE) up -d
	@echo ""
	@echo "=== Pipeline 01: Kafka + Flink + Iceberg ==="
	@echo "Kafka:            localhost:9092"
	@echo "Schema Registry:  http://localhost:8085"
	@echo "Flink Dashboard:  http://localhost:8083"
	@echo "MinIO Console:    http://localhost:9001  (minioadmin/minioadmin)"
	@echo ""
	@echo "Next steps:"
	@echo "  make create-topics   # Create Kafka topics"
	@echo "  make generate        # Produce taxi events to Kafka"
	@echo "  make process         # Submit Flink SQL jobs"
	@echo "  make dbt-build       # Run dbt transformations"

down: ## Stop all services and remove volumes
	$(COMPOSE) --profile generator --profile dbt --profile lakekeeper down -v
	@echo "Pipeline 01 stopped and volumes removed."

clean: ## Stop everything and prune all related resources
	$(COMPOSE) --profile generator --profile dbt --profile lakekeeper down -v --remove-orphans
	docker network rm p01-pipeline-net 2>/dev/null || true
	@echo "Pipeline 01 fully cleaned."

restart: ## Restart all services
	$(MAKE) down
	$(MAKE) up

# =============================================================================
# Topic Management
# =============================================================================

create-topics: ## Create Kafka topics (raw + DLQ)
	$(COMPOSE) exec kafka /opt/kafka/bin/kafka-topics.sh \
		--bootstrap-server localhost:9092 \
		--create \
		--topic taxi.raw_trips \
		--partitions 3 \
		--replication-factor 1 \
		--if-not-exists
	@echo "Topic taxi.raw_trips created (3 partitions)."
	$(COMPOSE) exec kafka /opt/kafka/bin/kafka-topics.sh \
		--bootstrap-server localhost:9092 \
		--create \
		--topic taxi.raw_trips.dlq \
		--partitions 1 \
		--replication-factor 1 \
		--if-not-exists
	@echo "Topic taxi.raw_trips.dlq created (1 partition, dead letter queue)."

# =============================================================================
# Data Generation
# =============================================================================

generate: ## Produce taxi trip events to Kafka (burst mode)
	MSYS_NO_PATHCONV=1 $(COMPOSE) run --rm data-generator
	@echo "Data generation complete."

generate-limited: ## Produce limited events for testing (10k)
	MSYS_NO_PATHCONV=1 $(COMPOSE) run --rm -e MAX_EVENTS=10000 data-generator
	@echo "Limited data generation complete (10k events)."

# =============================================================================
# Flink SQL Processing
# =============================================================================

process: process-bronze process-silver ## Submit all Flink SQL jobs (Bronze + Silver)
	@echo "All Flink SQL jobs complete."

process-bronze: ## Submit Bronze layer Flink SQL jobs (batch mode)
	@echo "=== Bronze: Kafka → Iceberg ==="
	$(FLINK_SQL_CLIENT) -i /opt/flink/sql/00-init.sql -f /opt/flink/sql/05-bronze.sql
	@echo "Bronze layer complete."

process-silver: ## Submit Silver layer Flink SQL jobs (batch mode)
	@echo "=== Silver: Bronze → Cleaned Iceberg ==="
	$(FLINK_SQL_CLIENT) -i /opt/flink/sql/00-init.sql -f /opt/flink/sql/06-silver.sql
	@echo "Silver layer complete."

# =============================================================================
# dbt Transformations
# =============================================================================

dbt-build: ## Run dbt build (full-refresh) on Iceberg Silver data
	MSYS_NO_PATHCONV=1 $(COMPOSE) run --rm --entrypoint /bin/sh dbt -c "dbt deps --profiles-dir . && dbt build --full-refresh --profiles-dir ."
	@echo "dbt build complete."

dbt-test: ## Run dbt tests only
	MSYS_NO_PATHCONV=1 $(COMPOSE) run --rm dbt test --profiles-dir .

dbt-docs: ## Generate dbt documentation
	MSYS_NO_PATHCONV=1 $(COMPOSE) run --rm dbt docs generate --profiles-dir .

# =============================================================================
# Benchmark (Full E2E)
# =============================================================================

benchmark: ## Full end-to-end benchmark: down → up → topics → generate → process → dbt → down
	@echo "============================================================"
	@echo "  Pipeline 01 Benchmark: Kafka + Flink + Iceberg"
	@echo "============================================================"
	@START_TIME=$$(date +%s) && \
	$(MAKE) down 2>/dev/null || true && \
	$(MAKE) up && \
	echo "Waiting for services to stabilize..." && \
	sleep 15 && \
	$(MAKE) create-topics && \
	$(MAKE) generate && \
	echo "Waiting for Flink processing to catch up..." && \
	sleep 10 && \
	$(MAKE) process && \
	echo "Waiting for streaming jobs to process data..." && \
	sleep 30 && \
	$(MAKE) dbt-build && \
	END_TIME=$$(date +%s) && \
	ELAPSED=$$((END_TIME - START_TIME)) && \
	echo "" && \
	echo "============================================================" && \
	echo "  BENCHMARK COMPLETE" && \
	echo "  Total elapsed: $${ELAPSED}s" && \
	echo "============================================================" && \
	mkdir -p benchmark_results && \
	echo "{\"pipeline\": \"01-kafka-flink-iceberg\", \"elapsed_seconds\": $$ELAPSED, \"timestamp\": \"$$(date -Iseconds)\"}" > benchmark_results/latest.json && \
	echo "Results saved to benchmark_results/latest.json" && \
	$(MAKE) down

# =============================================================================
# Observability
# =============================================================================

logs: ## Tail logs from all services
	$(COMPOSE) logs -f --tail=100

logs-kafka: ## Tail Kafka logs
	$(COMPOSE) logs -f kafka

logs-flink: ## Tail Flink JobManager logs
	$(COMPOSE) logs -f flink-jobmanager

logs-flink-tm: ## Tail Flink TaskManager logs
	$(COMPOSE) logs -f flink-taskmanager

status: ## Show service status
	@echo "=== Pipeline 01: Service Status ==="
	$(COMPOSE) ps
	@echo ""
	@echo "=== Kafka Topics ==="
	$(COMPOSE) exec kafka /opt/kafka/bin/kafka-topics.sh \
		--bootstrap-server localhost:9092 --list 2>/dev/null || echo "(Kafka not running)"
	@echo ""
	@echo "=== Flink Jobs ==="
	@curl -s http://localhost:8083/jobs/overview 2>/dev/null | python3 -m json.tool 2>/dev/null || echo "(Flink not running)"

ps: ## Show running containers
	$(COMPOSE) ps

# =============================================================================
# Health & Diagnostics
# =============================================================================

health: ## Quick health check of all services
	@echo "=== Pipeline 01: Health Check ==="
	@echo -n "Kafka:           " && $(COMPOSE) exec -T kafka /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 > /dev/null 2>&1 && echo "OK" || echo "FAIL"
	@echo -n "Schema Registry: " && curl -sf http://localhost:8085/subjects > /dev/null 2>&1 && echo "OK" || echo "FAIL"
	@echo -n "MinIO:           " && curl -sf http://localhost:9000/minio/health/live > /dev/null 2>&1 && echo "OK" || echo "FAIL"
	@echo -n "Flink Dashboard: " && curl -sf http://localhost:8083/overview > /dev/null 2>&1 && echo "OK" || echo "FAIL"

check-lag: ## Show Kafka consumer group lag
	$(COMPOSE) exec kafka /opt/kafka/bin/kafka-consumer-groups.sh \
		--bootstrap-server localhost:9092 \
		--describe --group flink-consumer 2>/dev/null || echo "(No active consumer group 'flink-consumer')"

# =============================================================================
# Lakekeeper REST Catalog (opt-in)
# =============================================================================

up-lakekeeper: ## Start with Lakekeeper REST catalog
	$(COMPOSE) --profile lakekeeper up -d
	@echo ""
	@echo "=== Pipeline 01: Kafka + Flink + Iceberg (Lakekeeper REST Catalog) ==="
	@echo "Lakekeeper UI:    http://localhost:8181"
	@echo "Flink Dashboard:  http://localhost:8083"
	@echo ""
	@echo "Use 'make process-rest' to run Flink SQL with REST catalog"

process-rest: process-bronze-rest process-silver-rest ## Submit Flink SQL via REST catalog
	@echo "All Flink SQL jobs complete (REST catalog)."

process-bronze-rest: ## Submit Bronze layer via REST catalog
	@echo "=== Bronze: Kafka → Iceberg (REST catalog) ==="
	$(FLINK_SQL_CLIENT) -i /opt/flink/sql/00-init-rest.sql -f /opt/flink/sql/05-bronze.sql
	@echo "Bronze layer complete."

process-silver-rest: ## Submit Silver layer via REST catalog
	@echo "=== Silver: Bronze → Cleaned Iceberg (REST catalog) ==="
	$(FLINK_SQL_CLIENT) -i /opt/flink/sql/00-init-rest.sql -f /opt/flink/sql/06-silver.sql
	@echo "Silver layer complete."


## 19. Airflow DAGs: Production Scheduling

These DAGs implement the **control plane** — scheduling dbt runs and Iceberg maintenance.

> **Note:** These are reference implementations. Pipeline 01 runs Airflow via the
> optional docker-compose services. For a dedicated orchestrated pipeline, see
> Pipeline 08 (Airflow/Astronomer).

### 19.1 Pipeline DAG (every 10 minutes)

```
check_flink_health
    ├─ healthy → run_dbt → run_dbt_tests
    └─ unhealthy → alert_flink_down
```
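
The branch decision itself can be isolated as a pure function, which makes it easy to exercise without a running cluster. The sketch below assumes a payload shaped like Flink's `/jobs/overview` response (a `jobs` list whose entries carry a `state` field); the function name `choose_branch` and the sample payloads are hypothetical.

```python
def choose_branch(jobs_overview):
    """Pick the downstream task id from a /jobs/overview-style payload.

    Returns 'run_dbt' when at least one job is RUNNING, otherwise 'alert_flink_down'.
    """
    jobs = jobs_overview.get("jobs", [])
    running = [job for job in jobs if job.get("state") == "RUNNING"]
    return "run_dbt" if running else "alert_flink_down"


healthy = {"jobs": [{"name": "bronze", "state": "RUNNING"}, {"name": "old", "state": "FINISHED"}]}
unhealthy = {"jobs": [{"name": "bronze", "state": "FAILED"}]}
print(choose_branch(healthy))
print(choose_branch(unhealthy))
```

Keeping the HTTP call separate from the decision is a design choice of this sketch, not necessarily how the DAG is organized.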

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/airflow/dags/taxi_pipeline_dag.py
"""NYC Taxi Pipeline DAG - Production Orchestration.

Runs every 10 minutes:
  1. Check Flink cluster health
  2. Run dbt build (Silver → Gold)
  3. Run dbt tests (91 data quality assertions)
  4. Alert if Flink is down
"""
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import requests


default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'nyc_taxi_pipeline',
    default_args=default_args,
    description='NYC Taxi Real-Time Pipeline Orchestration',
    schedule_interval='*/10 * * * *',  # Every 10 minutes
    start_date=days_ago(1),
    catchup=False,
    max_active_runs=1,
    tags=['production', 'nyc-taxi', 'real-time'],
)


def check_flink_health(**context):
    """Check if Flink cluster is healthy and jobs are running."""
    try:
        # A timeout keeps the task from hanging if the JobManager is unreachable
        response = requests.get('http://flink-jobmanager:8081/jobs/overview', timeout=10)
        response.raise_for_status()
        jobs = response.json()['jobs']
        running_jobs = [j for j in jobs if j['state'] == 'RUNNING']
        if not running_jobs:
            raise ValueError("No running Flink jobs found")
        print(f"Flink healthy: {len(running_jobs)} jobs running")
        return 'run_dbt'
    except Exception as e:
        print(f"Flink health check failed: {e}")
        return 'alert_flink_down'


# Task 1: Health check (branch based on result)
health_check = BranchPythonOperator(
    task_id='check_flink_health',
    python_callable=check_flink_health,
    dag=dag,
)

# Task 2: Run dbt build (Silver → Gold)
run_dbt = BashOperator(
    task_id='run_dbt',
    bash_command='cd /opt/airflow/dbt && dbt build --profiles-dir . --target prod',
    dag=dag,
)

# Task 3: Run dbt tests
run_dbt_tests = BashOperator(
    task_id='run_dbt_tests',
    bash_command='cd /opt/airflow/dbt && dbt test --profiles-dir . --target prod',
    dag=dag,
)

# Task 4: Alert if Flink is unhealthy
alert_flink_down = BashOperator(
    task_id='alert_flink_down',
    bash_command='echo "ALERT: Flink cluster unhealthy" && exit 1',
    dag=dag,
)

# Dependencies
health_check >> [run_dbt, alert_flink_down]
run_dbt >> run_dbt_tests

### 19.2 Maintenance DAG (daily at 2 AM)

```
compact_silver → expire_snapshots → remove_orphan_files
```
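
To make the interplay of an age cutoff and a "retain last N" guard concrete, here is a simplified Python model of snapshot expiration. It assumes a linear snapshot history and uses the 7-day cutoff and `retain_last` of 5 from this DAG; the real procedure works on the table's snapshot ancestry, and `snapshots_to_expire` is a hypothetical helper.

```python
from datetime import datetime, timedelta


def snapshots_to_expire(snapshot_times, now, max_age=timedelta(days=7), retain_last=5):
    """Return snapshot timestamps eligible for expiration (simplified model).

    A snapshot expires only if it is older than the cutoff AND it is not
    among the `retain_last` most recent snapshots.
    """
    cutoff = now - max_age
    newest_first = sorted(snapshot_times, reverse=True)
    protected = set(newest_first[:retain_last])
    return sorted(t for t in snapshot_times if t < cutoff and t not in protected)


now = datetime(2024, 1, 31, 2, 0)
# One snapshot per day for the last 10 days (illustrative)
snapshots = [now - timedelta(days=d) for d in range(1, 11)]
expired = snapshots_to_expire(snapshots, now)
print(len(expired), "of", len(snapshots), "snapshots would expire")
```

With daily snapshots only those strictly older than seven days are candidates; on a table that is written rarely, the `retain_last` guard is what keeps recent history available.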

In [None]:
%%writefile ../pipelines/01-kafka-flink-iceberg/airflow/dags/iceberg_maintenance_dag.py
"""Iceberg Maintenance DAG - Daily at 2 AM.

Operations:
  1. Compact Silver table (merge small files for query performance)
  2. Expire old snapshots (cleanup metadata, keep last 5)
  3. Remove orphan files (reclaim unreferenced storage)
"""
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta


default_args = {
    'owner': 'data-engineering',
    'retries': 1,
    'retry_delay': timedelta(minutes=10),
}

dag = DAG(
    'iceberg_maintenance',
    default_args=default_args,
    description='Iceberg table maintenance (compaction, expiration)',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=days_ago(1),
    catchup=False,
    tags=['maintenance', 'iceberg'],
)

compact_silver = BashOperator(
    task_id='compact_silver_table',
    bash_command="""
    docker exec p01-flink-jobmanager /opt/flink/bin/sql-client.sh \
      -i /opt/flink/sql/00-init.sql \
      -e "CALL iceberg_catalog.system.rewrite_data_files(
            table => 'nyc_taxi.silver.cleaned_trips',
            strategy => 'sort',
            sort_order => 'pickup_date,pickup_hour'
          );"
    """,
    dag=dag,
)

expire_snapshots = BashOperator(
    task_id='expire_old_snapshots',
    bash_command="""
    docker exec p01-flink-jobmanager /opt/flink/bin/sql-client.sh \
      -i /opt/flink/sql/00-init.sql \
      -e "CALL iceberg_catalog.system.expire_snapshots(
            table => 'nyc_taxi.silver.cleaned_trips',
            older_than => CURRENT_TIMESTAMP - INTERVAL '7' DAY,
            retain_last => 5
          );"
    """,
    dag=dag,
)

remove_orphans = BashOperator(
    task_id='remove_orphan_files',
    bash_command="""
    docker exec p01-flink-jobmanager /opt/flink/bin/sql-client.sh \
      -i /opt/flink/sql/00-init.sql \
      -e "CALL iceberg_catalog.system.remove_orphan_files(
            table => 'nyc_taxi.silver.cleaned_trips',
            older_than => CURRENT_TIMESTAMP - INTERVAL '7' DAY
          );"
    """,
    dag=dag,
)

compact_silver >> expire_snapshots >> remove_orphans

## 20. Running the Pipeline

### Quick Start (Full Benchmark)

```bash
cd pipelines/01-kafka-flink-iceberg
make benchmark
```

This runs the complete E2E flow: `down → up → create-topics → generate → process → dbt-build → down`

### Step-by-Step Execution

```bash
# 1. Start infrastructure (Kafka, Flink, MinIO, Schema Registry)
make up
# Wait ~15-30s for all health checks to pass

# 2. Verify all services are healthy
make health
# Expected: Kafka=OK, Schema Registry=OK, MinIO=OK, Flink Dashboard=OK

# 3. Create Kafka topics (raw + DLQ)
make create-topics
# Creates taxi.raw_trips (3 partitions) + taxi.raw_trips.dlq (1 partition)

# 4. Produce events
make generate
# Sends 10k events in burst mode with idempotent delivery

# 5. Give the broker a moment to finish persisting the produced events
sleep 10

# 6. Process Bronze layer (Kafka → Iceberg)
make process-bronze
# Runs: sql-client.sh -i 00-init.sql -f 05-bronze.sql

# 7. Process Silver layer (Bronze → Silver with dedup)
make process-silver
# Runs: sql-client.sh -i 00-init.sql -f 06-silver.sql

# 8. Run dbt (Silver → Gold, 94 tests)
make dbt-build
# Runs: dbt deps && dbt build --full-refresh

# 9. Check consumer lag
make check-lag

# 10. Clean up
make down
```

### With Lakekeeper REST Catalog (Optional)

```bash
# Start with REST catalog services
make up-lakekeeper
# Additional: Lakekeeper UI at http://localhost:8181

# Process using REST catalog (no S3 creds in SQL)
make process-rest

# Same dbt-build step
make dbt-build
```

### Monitoring During Execution

| Service | URL | What to Check |
|---------|-----|---------------|
| **Flink Dashboard** | http://localhost:8083 | Running jobs, task metrics, backpressure |
| **MinIO Console** | http://localhost:9001 | Iceberg data files in `warehouse` bucket |
| **Schema Registry** | http://localhost:8085 | Registered schemas |
| **Lakekeeper** | http://localhost:8181 | REST catalog (if `--profile lakekeeper`) |
| **Prometheus** | port 9249 on Flink containers | Metrics scraping endpoint |
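
Instead of a fixed sleep, readiness can be polled. The sketch below is one possible approach, not part of the pipeline: the HTTP endpoints are the ones used by the Makefile's `health` target, while `http_ok` and `wait_until_healthy` are hypothetical helpers. Kafka is left out because its check is not an HTTP call.

```python
import time
import urllib.request

# Endpoints taken from the Makefile's `health` target
HEALTH_URLS = {
    "Schema Registry": "http://localhost:8085/subjects",
    "MinIO": "http://localhost:9000/minio/health/live",
    "Flink Dashboard": "http://localhost:8083/overview",
}


def http_ok(url, timeout=3):
    """True when the URL answers with a 2xx status, False on network or HTTP errors."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return 200 <= response.status < 300
    except OSError:
        return False


def wait_until_healthy(checks, timeout=60, interval=2, sleep=time.sleep, clock=time.monotonic):
    """Poll every check until all pass or `timeout` seconds elapse.

    `checks` maps a service name to a zero-argument callable returning bool.
    Returns the names still failing (an empty list means everything is up).
    """
    deadline = clock() + timeout
    while True:
        failing = [name for name, check in checks.items() if not check()]
        if not failing or clock() >= deadline:
            return failing
        sleep(interval)
```

After `make up`, calling `wait_until_healthy({name: (lambda u=url: http_ok(u)) for name, url in HEALTH_URLS.items()})` would return an empty list once all three endpoints respond. The `sleep` and `clock` parameters exist only so the loop can be tested without real waiting.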

## 21. Production Operations

### Performance Summary (Flink 2.0.1 + Iceberg 1.10.1)

| Phase | Duration | What Happens |
|-------|----------|-------------|
| **Infrastructure startup** | 15-30s | Services healthy, buckets created |
| **Ingestion** | 0.3-0.7s | 10k events to Kafka (idempotent, acks=all) |
| **Bronze processing** | ~10s | Kafka → Iceberg (with watermarks) |
| **Silver processing** | ~14s | Bronze → Silver (ROW_NUMBER dedup + filters) |
| **dbt build** | ~21s | Silver → Gold (15 models, 94 tests) |
| **Total E2E** | **~75s** | First event → Gold tables ready |

### Defense-in-Depth Monitoring

| Layer | What to Monitor | Alert Threshold |
|-------|----------------|-----------------|
| **Kafka** | Consumer lag (`make check-lag`) | lag > 10,000 events for > 5 min |
| **Kafka** | DLQ message count | Any message in DLQ |
| **Flink** | Backpressure (Dashboard) | > 10% for > 10 min |
| **Flink** | Checkpoint duration | > 60s |
| **Flink** | Prometheus metrics (port 9249) | Task failures > 0 |
| **Iceberg** | File count per table | > 1,000 (needs compaction) |
| **dbt** | Test results (94 expected) | Any FAIL or ERROR |
| **dbt** | Source freshness | Exceeds `warn_after` threshold |
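
Thresholds of the form "above X for longer than Y" need a small amount of state so that brief spikes do not page anyone. A minimal sketch for the consumer-lag row, using the 10,000-event and 5-minute values from the table; the function `lag_alert` and the sample series are assumptions for illustration.

```python
from datetime import datetime, timedelta


def lag_alert(samples, threshold=10_000, sustained=timedelta(minutes=5)):
    """Return True when lag stays above `threshold` for longer than `sustained`.

    `samples` is a time-ordered list of (timestamp, lag) pairs.
    """
    breach_start = None
    for ts, lag in samples:
        if lag > threshold:
            breach_start = breach_start or ts
            if ts - breach_start > sustained:
                return True
        else:
            breach_start = None  # lag recovered, reset the window
    return False


t0 = datetime(2024, 1, 1, 12, 0)
spike = [(t0 + timedelta(minutes=m), lag) for m, lag in [(0, 500), (1, 15_000), (3, 12_000), (4, 800)]]
sustained_breach = [(t0 + timedelta(minutes=m), 20_000) for m in range(0, 8, 2)]
print(lag_alert(spike), lag_alert(sustained_breach))
```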

### Scaling Considerations

| Component | Horizontal | Vertical |
|-----------|-----------|----------|
| **Kafka** | Add partitions (Flink source parallelism above the partition count leaves subtasks idle) | Increase broker memory |
| **Flink** | Add task managers | Increase task slots, memory |
| **MinIO** | Add nodes (distributed mode) | Increase disk |
| **dbt** | Increase threads in profiles.yml | Increase DuckDB memory_limit |
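
For the Kafka row, the relationship between partitions and Flink source parallelism can be pictured with a simplified round-robin model: each partition is read by exactly one source subtask, so the partition count caps useful source parallelism. Flink's real split assignment differs in detail, and both function names below are hypothetical.

```python
def assign_partitions(num_partitions, parallelism):
    """Spread partitions over source subtasks round-robin (simplified model)."""
    assignment = {subtask: [] for subtask in range(parallelism)}
    for partition in range(num_partitions):
        assignment[partition % parallelism].append(partition)
    return assignment


def idle_subtasks(num_partitions, parallelism):
    """Subtasks that receive no partition and therefore read nothing."""
    return [s for s, parts in assign_partitions(num_partitions, parallelism).items() if not parts]


# taxi.raw_trips is created with 3 partitions in this pipeline
print(assign_partitions(3, 2))
print(idle_subtasks(3, 4))
```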

### Batch vs Streaming Mode

The same SQL works in both modes — only the runtime setting changes:

```sql
-- Batch mode (default, for catch-up/backfill)
SET 'execution.runtime-mode' = 'batch';
SET 'table.dml-sync' = 'true';

-- Streaming mode (continuous processing)
SET 'execution.runtime-mode' = 'streaming';
-- Do NOT set table.dml-sync (would block forever)
```

Use `07-streaming-bronze.sql` for continuous streaming. Use `05-bronze.sql` for batch catch-up.
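
If the mode is chosen at submit time, the settings could be generated rather than duplicated across scripts. This is a sketch under that assumption only; `runtime_settings`, `with_runtime_mode` and the table names in the example statement are hypothetical and not part of the pipeline's SQL files.

```python
def runtime_settings(mode):
    """Return the Flink SQL SET statements for the given runtime mode."""
    if mode not in ("batch", "streaming"):
        raise ValueError(f"unknown runtime mode: {mode!r}")
    statements = [f"SET 'execution.runtime-mode' = '{mode}';"]
    if mode == "batch":
        # A continuous streaming INSERT never finishes, so dml-sync is set for batch only
        statements.append("SET 'table.dml-sync' = 'true';")
    return statements


def with_runtime_mode(mode, sql_body):
    """Prepend the mode-specific settings to an otherwise unchanged SQL script."""
    return "\n".join(runtime_settings(mode) + [sql_body])


print(with_runtime_mode("streaming", "INSERT INTO bronze_trips SELECT * FROM kafka_trips;"))
```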

### Backfill Pattern

```bash
# 1. Reset Kafka consumer offset (the consumer group must have no active members)
docker compose exec kafka /opt/kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group flink-consumer --topic taxi.raw_trips \
  --reset-offsets --to-earliest --execute

# 2. Re-run Flink processing (batch mode)
make process

# 3. Rebuild dbt from scratch
make dbt-build
```

### Version Matrix (Validated)

| Component | Version | JAR/Image |
|-----------|---------|-----------|
| Flink | 2.0.1 | `flink:2.0.1-java17` |
| Iceberg | 1.10.1 | `iceberg-flink-runtime-2.0-1.10.1.jar` |
| Kafka Connector | 4.0.1-2.0 | `flink-sql-connector-kafka-4.0.1-2.0.jar` |
| Kafka | 4.0.0 | `apache/kafka:4.0.0` |
| Hadoop Client | 3.3.6 | `hadoop-client-api/runtime-3.3.6.jar` |
| AWS SDK | 1.12.367 | `aws-java-sdk-bundle-1.12.367.jar` |
| Lakekeeper | 0.11.2 | `quay.io/lakekeeper/catalog:v0.11.2` |