In [25]:
%%writefile src\infrastructure\docker\airflow.Dockerfile
# Base image: official Apache Airflow 3.0.1 built on Python 3.10.
FROM apache/airflow:3.0.1-python3.10


# apt-get needs root; privileges are dropped again after the package install.
USER root

# Install system dependencies: a C toolchain plus the MySQL and PostgreSQL
# client development packages. The apt package lists are deleted in the same
# RUN instruction so they are not kept in the resulting layer.
RUN apt-get update \
    && apt-get install -y --no-install-recommends \
        build-essential \
        default-libmysqlclient-dev \
        libpq-dev \
    && apt-get autoremove -yqq --purge \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

# Everything below runs as the unprivileged airflow user.
USER airflow

# Set the [database] executemany_mode option to "batch" through Airflow's
# AIRFLOW__<SECTION>__<KEY> environment-variable override.
# NOTE(review): the stated goal is to avoid SQLAlchemy conflicts; confirm the
# option is still recognised by the Airflow version in the base image.
ENV AIRFLOW__DATABASE__EXECUTEMANY_MODE=batch 

# Install the project described by pyproject.toml.
# NOTE(review): only pyproject.toml is copied into /app, no package sources —
# confirm the build succeeds and that installing the declared dependencies is
# all that is intended here.
WORKDIR /app
COPY pyproject.toml /app/
RUN pip install --no-cache-dir /app/

# DAGs and plugins are copied after the dependency install, so editing them
# does not invalidate the cached pip layer above.
COPY src/orchestrators/airflow/dags /opt/airflow/dags
COPY src/orchestrators/airflow/plugins /opt/airflow/plugins

# Finish in /opt/airflow. This only sets the working directory; no Airflow
# initialisation command is executed by this Dockerfile.
WORKDIR /opt/airflow 


Overwriting src\infrastructure\docker\airflow.Dockerfile


In [18]:
%%writefile src\infrastructure\docker\init-db.sql



-- Bootstrap script: creates two extra databases and grants access to the
-- airflow role.
-- NOTE(review): CREATE DATABASE has no IF NOT EXISTS form in PostgreSQL, so
-- re-running this script against an initialised cluster fails — presumably it
-- is executed only once on first start; confirm.

-- Create kestra database
CREATE DATABASE kestra;

-- Create database for real-time pipeline
CREATE DATABASE real_time_pipeline;

-- Grant the airflow role all privileges on both new databases.
-- Assumes the role "airflow" already exists when this script runs — TODO confirm.
GRANT ALL PRIVILEGES ON DATABASE kestra TO airflow;
GRANT ALL PRIVILEGES ON DATABASE real_time_pipeline TO airflow; 




Overwriting src\infrastructure\docker\init-db.sql


In [19]:
%%writefile src\orchestrators\airflow\config\airflow.cfg

# Minimal Airflow configuration: only the settings listed here are overridden.
# NOTE(review): options not set here presumably fall back to Airflow's built-in
# defaults — confirm how this file is mounted into the containers.

[core]
# The folder where your DAGs are located
dags_folder = /opt/airflow/dags

# SQLAlchemy connection string for Airflow's database (PostgreSQL on host
# "postgres", port 5432, database "airflow"). The user name and password are
# embedded in the URL in plain text.
# NOTE(review): recent Airflow releases document this option under [database];
# confirm it is still honoured under [core] in the deployed version.
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres:5432/airflow

# The executor class that Airflow should use
executor = CeleryExecutor

# Whether to load the DAG examples that ship with Airflow
load_examples = False

# Whether newly created DAGs start in the paused state
dags_are_paused_at_creation = True

# The amount of parallelism as a setting to the executor
parallelism = 32

# The number of task instances allowed to run concurrently
# NOTE(review): newer Airflow releases name this limit max_active_tasks_per_dag;
# confirm dag_concurrency is still read by the deployed version.
dag_concurrency = 16

# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16

# Enable XCom pickling for TaskFlow API
# NOTE(review): unpickling XCom values can execute arbitrary code; confirm this
# is required and that the deployed Airflow version still supports the option.
enable_xcom_pickling = True

# Intended to enable AssetWatcher.
# NOTE(review): this key and value could not be matched to a documented Airflow
# option — confirm it has any effect.
task_defer_method = airflow.triggers.triggerer.DeferralTrigger

[webserver]
# The base URL of your Airflow web server
base_url = http://localhost:8080

# The timezone the UI displays by default
default_ui_timezone = UTC

# The DAG view that the UI opens by default
dag_default_view = graph

# Default page size when listing entities in the UI
page_size = 100

[scheduler]
# The number of seconds the scheduler should wait between heartbeats to the database
scheduler_heartbeat_sec = 5

# The number of seconds to wait between file-parsing loops
min_file_process_interval = 30

# The number of parallel workers used when parsing the DAG files
# NOTE(review): confirm which section the deployed version reads this from.
parsing_processes = 2

[celery]
# The Celery broker URL (Redis on host "redis", database 0)
broker_url = redis://redis:6379/0

# The Celery result backend (same PostgreSQL database as above; credentials
# are embedded in plain text)
result_backend = db+postgresql://airflow:airflow@postgres:5432/airflow

# The default queue that tasks get assigned to
default_queue = default

[assetwatcher]
# Intended to enable AssetWatcher.
# NOTE(review): this section name could not be matched to a documented Airflow
# configuration section — confirm it is read at all.
enable = True 

[dag_processor]
# Use GitDagBundle for versioned DAG source code - commented out as it causes parsing issues
# dag_bundle_config_list = [{
#   "name": "repo_dags",
#   "classpath": "airflow.providers.git.bundles.git.GitDagBundle",
#   "kwargs": {
#     "repo_url": "https://github.com/your-org/data_engineering_realtime_pipeline.git",
#     "tracking_ref": "main",
#     "git_conn_id": "git_default"
#   }
# }]

[database]
# Force a supported executemany_mode for this version of SQLAlchemy/psycopg2
executemany_mode = batch









Overwriting src\orchestrators\airflow\config\airflow.cfg


In [20]:
%%writefile src\orchestrators\airflow\dags\gtfs_data_pipeline.py
#!/usr/bin/env python3
"""
GTFS Realtime Data Pipeline DAG 

This DAG fetches GTFS-RT data from the MTA Bus Time API, processes it,
and loads it into the storage backend of choice: S3, BigQuery, Azure Blob, or DuckDB.
It also demonstrates SQL operations by loading data into PostgreSQL.

The DAG demonstrates the Airflow TaskFlow API (Python functions as tasks)
and parameterization for different cloud environments.
"""

import os
import sys
import json
import logging
from datetime import datetime, timedelta
from pathlib import Path

from airflow.decorators import dag, task
from airflow.models import Variable, Connection
from airflow.operators.python import get_current_context
from airflow.utils.dates import days_ago
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.empty import EmptyOperator

# Add the ingestion module to Python path
# Adjust this path based on your deployment
from ingestion.fetch_gtfs import GTFSFetcher

# Task-level defaults handed to the DAG and inherited by each of its tasks
default_args = dict(
    owner='data-engineering',
    depends_on_past=False,
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
    execution_timeout=timedelta(minutes=10),
)

# Configurable parameters with defaults.
# Each value is read from an Airflow Variable while this module is being
# imported; the literal passed as default_var is used when the Variable is unset.
# NOTE(review): Variable.get at module level runs on every parse of this file —
# confirm the extra lookups are acceptable.
CLOUD_PROVIDER = Variable.get("CLOUD_PROVIDER", default_var="local")  # aws, gcp, azure, or local
STORAGE_TYPE = Variable.get("STORAGE_TYPE", default_var="duckdb")  # s3, gcs, azure_blob, bigquery, duckdb
API_URL = Variable.get("GTFS_API_URL", default_var="https://gtfsrt.prod.obanyc.com/vehiclePositions")
OUTPUT_FORMAT = Variable.get("OUTPUT_FORMAT", default_var="json")
USE_SQL_DB = Variable.get("USE_SQL_DB", default_var="true").lower() == "true"  # True only for the string "true" (any case)

# Cloud-specific settings with defaults.
# Only the names belonging to the selected CLOUD_PROVIDER are defined; the
# names of the other branches do not exist as module globals.
if CLOUD_PROVIDER == "aws":
    S3_BUCKET = Variable.get("S3_BUCKET", default_var="gtfs-data")
    S3_PREFIX = Variable.get("S3_PREFIX", default_var="vehicle_positions")
elif CLOUD_PROVIDER == "gcp":
    GCS_BUCKET = Variable.get("GCS_BUCKET", default_var="gtfs-data")
    GCS_PREFIX = Variable.get("GCS_PREFIX", default_var="vehicle_positions")
    BQ_DATASET = Variable.get("BQ_DATASET", default_var="gtfs_data")
    BQ_TABLE = Variable.get("BQ_TABLE", default_var="vehicle_positions")
elif CLOUD_PROVIDER == "azure":
    AZURE_CONTAINER = Variable.get("AZURE_CONTAINER", default_var="gtfs-data")
    AZURE_PREFIX = Variable.get("AZURE_PREFIX", default_var="vehicle_positions")
else:  # any other value, including the default "local"
    DUCKDB_PATH = Variable.get("DUCKDB_PATH", default_var="/tmp/gtfs.duckdb")
    DUCKDB_TABLE = Variable.get("DUCKDB_TABLE", default_var="vehicle_positions")

# Define an asset for asset-driven scheduling
from airflow.sdk import Asset, AssetWatcher
from airflow.providers.standard.triggers.file import FileSensorTrigger

# Trigger bound to the flag file /data/gtfs/new_data.flag.
# NOTE(review): confirm the installed Airflow version accepts FileSensorTrigger
# as an AssetWatcher trigger (watchers may require event-style triggers).
gtfs_file_trigger = FileSensorTrigger(filepath="/data/gtfs/new_data.flag")
# Asset used as the schedule of the DAG below; its single watcher wraps the
# trigger defined above.
gtfs_asset = Asset(
    "gtfs_data_asset", 
    watchers=[AssetWatcher(name="gtfs_data_watcher", trigger=gtfs_file_trigger)]
)

# DAG-specific Markdown for the UI. It is built at module level because a
# ``"""...""".format(...)`` expression placed first in a function body is an
# ordinary expression whose result is discarded, not a docstring.
_GTFS_DAG_DOC_MD = """
### GTFS-RT Data Pipeline

This DAG demonstrates how to fetch and process GTFS-RT data with Airflow,
using different cloud providers and storage backends.

#### Environment configuration
* Cloud Provider: {cloud_provider}
* Storage Type: {storage_type}
* Data Format: {format}
* Also Load to SQL DB: {use_sql_db}
* Schedule: Asset-driven (file trigger)
""".format(
    cloud_provider=CLOUD_PROVIDER,
    storage_type=STORAGE_TYPE,
    format=OUTPUT_FORMAT,
    use_sql_db=USE_SQL_DB
)


@dag(
    default_args=default_args,
    schedule=[gtfs_asset],  # asset-driven scheduling
    # Static start date, as used by the other DAGs in this project. A value
    # computed at parse time (days_ago) moves on every parse, and
    # airflow.utils.dates.days_ago is not available in Airflow 3.
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=['gtfs', 'realtime', 'sql', CLOUD_PROVIDER],
    # Module description followed by the environment-specific summary.
    doc_md=(__doc__ or "") + _GTFS_DAG_DOC_MD
)
def gtfs_data_pipeline():
    """Assemble the GTFS-RT pipeline tasks and their dependencies.

    The Markdown shown for this DAG is supplied through ``doc_md`` on the
    decorator (module docstring plus ``_GTFS_DAG_DOC_MD``).
    """

    @task()
    def fetch_gtfs():
        """Fetch GTFS-RT data from the configured API.

        The API key is the password of the ``gtfs_api`` Airflow connection when
        that resolves to a non-empty value; otherwise the ``MTA_API_KEY``
        environment variable is used.

        Returns:
            The value returned by ``GTFSFetcher.fetch_and_parse()``.

        Raises:
            Exception: any error raised while fetching is logged and re-raised
            so that the task fails.
        """
        api_key = None
        try:
            conn = Connection.get_connection_from_secrets("gtfs_api")
            api_key = conn.password if conn else None
        except Exception as exc:
            # Best-effort lookup: an unresolved connection is not fatal.
            # ``except Exception`` (instead of a bare ``except``) lets
            # SystemExit/KeyboardInterrupt propagate.
            logging.info(f"Connection 'gtfs_api' not available ({exc}); falling back to MTA_API_KEY")

        # Also fall back when the connection exists but carries no password.
        if not api_key:
            api_key = os.getenv("MTA_API_KEY")

        # Initialize fetcher
        fetcher = GTFSFetcher(api_url=API_URL, api_key=api_key)

        # Get the data
        logging.info(f"Fetching GTFS data from {API_URL}")
        try:
            data = fetcher.fetch_and_parse()
            logging.info(f"Successfully fetched {len(data)} GTFS entities")
            return data
        except Exception as e:
            logging.error(f"Error fetching GTFS data: {e}")
            raise

    @task()
    def process_data(data):
        """Stamp every entity with the time at which this task processed it.

        Each entity receives a ``_processing_time`` key holding one shared
        ISO-8601 timestamp; the stamped entities are returned as a list.
        """
        stamped_at = datetime.now().isoformat()

        def _stamp(item):
            # Tag the entity in place and hand the same object back
            item['_processing_time'] = stamped_at
            return item

        stamped_entities = [_stamp(item) for item in data]

        logging.info(f"Processed {len(stamped_entities)} GTFS entities")
        return stamped_entities

    @task()
    def transform_for_sql(data):
        """Flatten vehicle-position entities into tuples ready for SQL insertion.

        Entities lacking ``vehicle`` or ``vehicle.position`` are ignored.
        Each resulting tuple is ``(vehicle_id, latitude, longitude, bearing,
        speed, timestamp, processing_time)``. An entity whose fields cannot be
        read (KeyError/TypeError) is logged as a warning and left out.
        """
        if not data:
            return []

        rows = []
        for entity in data:
            # Guard clause: only entities carrying a position are of interest
            if not ('vehicle' in entity and 'position' in entity['vehicle']):
                continue

            try:
                vehicle = entity['vehicle']
                # Prefer the entity id, then the nested vehicle id, then 'unknown'
                vehicle_id = entity.get('id', '') or vehicle.get('vehicle', {}).get('id', 'unknown')
                position = vehicle['position']
                timestamp = vehicle.get('timestamp', '')

                rows.append((
                    vehicle_id,
                    position.get('latitude', 0),
                    position.get('longitude', 0),
                    position.get('bearing', 0),
                    position.get('speed', 0),
                    timestamp,
                    entity.get('_processing_time', ''),
                ))
            except (KeyError, TypeError) as e:
                logging.warning(f"Could not extract position data from entity: {e}")

        logging.info(f"Transformed {len(rows)} entities for SQL insertion")
        return rows

    @task()
    def prepare_sql_values(sql_data):
        """Render position records as the body of a SQL ``VALUES`` clause.

        Args:
            sql_data: sequence of 7-item records laid out as
                ``(vehicle_id, latitude, longitude, bearing, speed, timestamp,
                processing_time)``.

        Returns:
            str: comma-separated row literals such as ``('id', 1.0, ...)``, or
            the sentinel ``"''"`` when there is nothing to insert.

        The values originate from an external feed, so every text value has its
        single quotes doubled and every numeric value is forced through
        ``float``; anything that is not a usable number, and any empty
        timestamp, is rendered as ``NULL`` instead of producing invalid SQL.
        """
        if not sql_data:
            return "''"  # Sentinel understood by the insert task: no data

        def _text(value):
            # Quote as a SQL string literal; embedded quotes are doubled
            return "'" + str(value).replace("'", "''") + "'"

        def _text_or_null(value):
            # Missing/empty values cannot be cast to TIMESTAMP, so emit NULL
            return "NULL" if value is None or value == '' else _text(value)

        def _number(value):
            try:
                number = float(value)
            except (TypeError, ValueError):
                return "NULL"
            # NaN and the infinities have no unquoted SQL literal
            if number != number or number in (float("inf"), float("-inf")):
                return "NULL"
            return repr(number)

        # NOTE(review): the format of the feed timestamp (record[5]) is not
        # visible here; confirm it is accepted by the TIMESTAMP column.
        rendered_rows = [
            "({}, {}, {}, {}, {}, {}, {})".format(
                _text(record[0]),
                _number(record[1]),
                _number(record[2]),
                _number(record[3]),
                _number(record[4]),
                _text_or_null(record[5]),
                _text_or_null(record[6]),
            )
            for record in sql_data
        ]

        return ", ".join(rendered_rows)

    @task()
    def store_data(data):
        """Store the data in the backend selected by ``CLOUD_PROVIDER``.

        Args:
            data: processed GTFS entities; an empty/None value short-circuits.

        Returns:
            dict: a status dictionary. ``status`` is ``"warning"`` when there is
            no data, ``"not_implemented"`` for Azure, otherwise ``"success"``
            together with ``location`` (S3/GCS) or ``rows`` (BigQuery/DuckDB).

        Raises:
            Exception: any storage error is logged and re-raised so that the
            task fails.
        """
        if not data:
            logging.warning("No data to store")
            return {"status": "warning", "message": "No data to store"}

        # The storage helpers live on GTFSFetcher, which is constructed with
        # the API settings; resolve the key from the connection first, then
        # from the environment.
        api_key = None
        try:
            conn = Connection.get_connection_from_secrets("gtfs_api")
            api_key = conn.password if conn else None
        except Exception as exc:
            # Best-effort lookup; ``except Exception`` (not a bare ``except``)
            # lets SystemExit/KeyboardInterrupt propagate.
            logging.info(f"Connection 'gtfs_api' not available ({exc}); falling back to MTA_API_KEY")

        # Also fall back when the connection exists but carries no password.
        if not api_key:
            api_key = os.getenv("MTA_API_KEY")

        fetcher = GTFSFetcher(api_url=API_URL, api_key=api_key)

        # Store based on the configured backend
        try:
            if CLOUD_PROVIDER == "aws":
                location = fetcher.save_to_s3(
                    data,
                    bucket=S3_BUCKET,
                    prefix=S3_PREFIX,
                    fmt=OUTPUT_FORMAT
                )
                logging.info(f"Data saved to S3: {location}")
                return {"status": "success", "location": location}

            elif CLOUD_PROVIDER == "gcp":
                # Within GCP, STORAGE_TYPE selects BigQuery; anything else is GCS
                if STORAGE_TYPE == "bigquery":
                    rows = fetcher.save_to_bigquery(data, BQ_DATASET, BQ_TABLE)
                    logging.info(f"Data saved to BigQuery: {rows} rows")
                    return {"status": "success", "rows": rows}
                else:
                    location = fetcher.save_to_gcs(
                        data,
                        bucket=GCS_BUCKET,
                        prefix=GCS_PREFIX,
                        fmt=OUTPUT_FORMAT
                    )
                    logging.info(f"Data saved to GCS: {location}")
                    return {"status": "success", "location": location}

            elif CLOUD_PROVIDER == "azure":
                # Placeholder: nothing is written for Azure yet
                logging.info("Azure storage not yet implemented")
                return {"status": "not_implemented", "message": "Azure storage not yet implemented"}

            else:  # local/duckdb
                rows = fetcher.save_to_duckdb(data, table=DUCKDB_TABLE, db_path=DUCKDB_PATH)
                logging.info(f"Data saved to DuckDB: {DUCKDB_PATH}, table: {DUCKDB_TABLE}, {rows} rows")
                return {"status": "success", "rows": rows, "database": DUCKDB_PATH}

        except Exception as e:
            logging.error(f"Error storing data: {e}")
            raise

    # Create the target table for vehicle positions on the "postgres_default"
    # connection. IF NOT EXISTS keeps the statement safe to repeat on every run.
    # The primary key is the pair (vehicle_id, processing_time).
    # NOTE(review): this operator is created whether or not USE_SQL_DB is set —
    # confirm that is intended.
    # NOTE(review): recent postgres provider releases favour
    # SQLExecuteQueryOperator; confirm PostgresOperator is still shipped by the
    # installed provider.
    create_pg_table = PostgresOperator(
        task_id="create_gtfs_table",
        postgres_conn_id="postgres_default",
        sql="""
        CREATE TABLE IF NOT EXISTS public.gtfs_vehicle_positions (
            vehicle_id TEXT,
            latitude DOUBLE PRECISION,
            longitude DOUBLE PRECISION,
            bearing DOUBLE PRECISION,
            speed DOUBLE PRECISION,
            timestamp TIMESTAMP,
            processing_time TIMESTAMP,
            PRIMARY KEY (vehicle_id, processing_time)
        );
        """
    )
    
    # Insert task with dynamic SQL
    @task()
    def insert_to_postgres(values):
        """Upsert pre-rendered rows into ``public.gtfs_vehicle_positions``.

        Args:
            values: comma-separated SQL row literals (the body of a ``VALUES``
                clause). An empty value or the sentinel ``"''"`` means there is
                nothing to insert. The text is embedded in the statement as-is,
                so it must already be safely escaped by the producer.

        Returns:
            dict: ``{"rows_inserted": n}`` where ``n`` is the row count reported
            by the database cursor (0 when there was nothing to insert).

        Raises:
            Exception: database errors propagate; the connection is closed
            without committing in that case.
        """
        if not values or values == "''":
            logging.warning("No values to insert into PostgreSQL")
            return {"rows_inserted": 0}

        # Local import: the hook is only needed inside this task.
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        insert_sql = f"""
            INSERT INTO public.gtfs_vehicle_positions
            (vehicle_id, latitude, longitude, bearing, speed, timestamp, processing_time)
            VALUES {values}
            ON CONFLICT (vehicle_id, processing_time) 
            DO UPDATE SET
                latitude = EXCLUDED.latitude,
                longitude = EXCLUDED.longitude,
                bearing = EXCLUDED.bearing,
                speed = EXCLUDED.speed;
            """

        # Execute through a hook connection instead of constructing an operator
        # inside a running task and calling execute() with an empty context.
        # The cursor's rowcount replaces counting "),": that count is wrong as
        # soon as a text value contains those characters.
        connection = PostgresHook(postgres_conn_id="postgres_default").get_conn()
        try:
            with connection.cursor() as cursor:
                cursor.execute(insert_sql)
                rows_inserted = cursor.rowcount
            connection.commit()
        finally:
            connection.close()

        return {"rows_inserted": rows_inserted}
    
    # Removes the marker file whose appearance started this run
    @task()
    def cleanup():
        """Delete the GTFS flag file; problems are logged and never raised."""
        flag_file = "/data/gtfs/new_data.flag"
        try:
            if not os.path.exists(flag_file):
                logging.warning(f"Flag file not found: {flag_file}")
                return
            os.remove(flag_file)
            logging.info(f"Removed flag file: {flag_file}")
        except Exception as e:
            # Cleanup is best-effort: record the failure and let the run finish
            logging.error(f"Error removing flag file: {e}")
    
    # Marker task whose id records whether the SQL branch is enabled for this
    # parse of the DAG. It is placed between processing and the SQL branch so
    # that it is part of the graph instead of an unconnected task.
    sql_branch = EmptyOperator(task_id="skip_sql_branch") if not USE_SQL_DB else EmptyOperator(task_id="use_sql_branch")

    # Main flow: fetch -> process -> store. Passing the results as arguments
    # already creates these dependencies.
    raw_data = fetch_gtfs()
    processed_data = process_data(raw_data)
    storage_result = store_data(processed_data)

    # The flag file must be removed on every run, not only when the SQL branch
    # is enabled, so cleanup always follows storage.
    cleanup_done = cleanup()
    storage_result >> cleanup_done

    processed_data >> sql_branch

    # SQL branch: table creation -> insert, with cleanup waiting for the insert
    if USE_SQL_DB:
        sql_data = transform_for_sql(processed_data)
        sql_values = prepare_sql_values(sql_data)
        sql_branch >> create_pg_table >> insert_to_postgres(sql_values) >> cleanup_done

    # Nothing is returned: the value returned by a @dag-decorated function is
    # not used when the DAG is built.

# Instantiate the DAG: calling the @dag-decorated function builds the DAG
# object, which is kept in a module-level name.
gtfs_pipeline = gtfs_data_pipeline() 


Overwriting src\orchestrators\airflow\dags\gtfs_data_pipeline.py


In [21]:
%%writefile src\orchestrators\airflow\dags\nba_ingest_pipeline.py
#!/usr/bin/env python3
"""
NBA Data Ingestion Pipeline DAG with AssetWatcher

This DAG fetches NBA game data from an API, processes it, and loads it into PostgreSQL.
It utilizes Airflow 3.0's AssetWatcher for event-driven runs and explicit SQL operations.
"""

import os
import json
import logging
from datetime import datetime, timedelta
from pathlib import Path
import requests

from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.models.connection import Connection
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.sdk import Asset, AssetWatcher
from airflow.providers.standard.triggers.file import FileSensorTrigger
from ingestion.fetch_gtfs import GTFSFetcher

# Defaults shared by every task of the NBA DAG
default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=10),
}

# Trigger bound to the local flag file /data/nba/new_data.flag (a filesystem
# path; nothing in this definition refers to MinIO or any object store).
# NOTE(review): confirm the installed Airflow version accepts FileSensorTrigger
# as an AssetWatcher trigger (watchers may require event-style triggers).
nba_file_sensor_trigger = FileSensorTrigger(filepath="/data/nba/new_data.flag")
# Asset used as the schedule of the DAG below; its single watcher wraps the
# trigger defined above.
nba_data_asset = Asset(
    "nba_data_asset", 
    watchers=[AssetWatcher(name="nba_data_watcher", trigger=nba_file_sensor_trigger)]
)

# Define the DAG with the @dag decorator. Runs are scheduled on nba_data_asset,
# not on a time interval; catchup is disabled.
@dag(
    default_args=default_args,
    schedule=[nba_data_asset],  # Schedule based on asset availability
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['nba', 'event-driven', 'assetwatcher', 'sql'],
)
def nba_ingest_pipeline():
    """
    ### NBA Data Ingestion Pipeline with AssetWatcher

    This DAG demonstrates how to use Airflow 3.0's AssetWatcher for event-driven processing.
    It watches for a flag file indicating new NBA data is available, then processes
    and loads the data into PostgreSQL using explicit SQL operations.

    #### Triggers:
    - File sensor watching for "/data/nba/new_data.flag"
    """

    @task()
    def fetch_nba_games():
        """Fetch today's NBA games from the API, or fall back to sample data.

        The API key and URL come from the ``nba_api`` Airflow connection
        (password and host). When the connection cannot be resolved, the key
        is read from ``NBA_API_KEY`` and a placeholder URL is used.

        Returns:
            list: the ``games`` entry of the JSON response, or two built-in
            sample games when the request fails for any reason, including an
            HTTP error status.
        """
        try:
            conn = Connection.get_connection_from_secrets("nba_api")
            api_key = conn.password
            api_url = conn.host
        except Exception as exc:
            # Best-effort lookup; ``except Exception`` (not a bare ``except``)
            # lets SystemExit/KeyboardInterrupt propagate.
            logging.info(f"Connection 'nba_api' not available ({exc}); using NBA_API_KEY and the default URL")
            api_key = os.getenv("NBA_API_KEY")
            api_url = "https://api.example.com/nba/games"

        # Get current date in format YYYY-MM-DD
        today = datetime.now().strftime("%Y-%m-%d")

        # Make API request
        headers = {"API-Key": api_key} if api_key else {}
        params = {"date": today}

        logging.info(f"Fetching NBA games data from {api_url} for date {today}")

        # For demo purposes, using sample data if API call fails
        try:
            response = requests.get(api_url, headers=headers, params=params, timeout=10)
            # Treat 4xx/5xx answers as failures instead of parsing an error
            # body as if it were game data.
            response.raise_for_status()
            data = response.json().get("games", [])
        except Exception as e:
            logging.warning(f"Failed to fetch real NBA data: {e}. Using sample data instead.")
            # Sample data for demonstration
            data = [
                {
                    "id": "20240101-LAL-GSW",
                    "date": today,
                    "home_team": "LAL",
                    "away_team": "GSW",
                    "score_home": 120,
                    "score_away": 115,
                },
                {
                    "id": "20240101-BOS-MIA",
                    "date": today,
                    "home_team": "BOS",
                    "away_team": "MIA",
                    "score_home": 105,
                    "score_away": 98,
                }
            ]

        logging.info(f"Retrieved {len(data)} NBA games")
        return data

    @task()
    def process_games(games_data):
        """Stamp every game with the time at which this task processed it.

        Each game receives a ``processing_time`` key holding one shared
        ISO-8601 timestamp; the stamped games are returned as a list.
        """
        stamped_at = datetime.now().isoformat()

        def _stamp(game):
            # Tag the game in place and hand the same object back
            game['processing_time'] = stamped_at
            return game

        stamped_games = [_stamp(game) for game in games_data]

        logging.info(f"Processed {len(stamped_games)} NBA games")
        return stamped_games
    
    @task()
    def prepare_sql_values(games):
        """Render games as the body of a SQL ``VALUES`` clause.

        Args:
            games: sequence of dicts with the keys ``id``, ``date``,
                ``home_team``, ``away_team``, ``score_home`` and ``score_away``.

        Returns:
            str: comma-separated row literals ending in ``CURRENT_TIMESTAMP``,
            or the sentinel ``"''"`` when there are no games.

        Raises:
            KeyError: when a game lacks one of the keys listed above.

        The values come from an external API, so text values have their single
        quotes doubled and scores are forced through ``int``; a missing text
        value or an unusable score is rendered as ``NULL`` instead of producing
        invalid SQL.
        """
        if not games:
            return "''"

        def _text(value):
            # Quote as a SQL string literal; embedded quotes are doubled
            if value is None:
                return "NULL"
            return "'" + str(value).replace("'", "''") + "'"

        def _score(value):
            try:
                return str(int(value))
            except (TypeError, ValueError):
                return "NULL"

        rendered_rows = [
            "({}, {}, {}, {}, {}, {}, CURRENT_TIMESTAMP)".format(
                _text(game['id']),
                _text(game['date']),
                _text(game['home_team']),
                _text(game['away_team']),
                _score(game['score_home']),
                _score(game['score_away']),
            )
            for game in games
        ]

        return ", ".join(rendered_rows)

    # Create the "nba" schema on the "postgres_default" connection.
    # IF NOT EXISTS keeps the statement safe to repeat on every run.
    # NOTE(review): recent postgres provider releases favour
    # SQLExecuteQueryOperator; confirm PostgresOperator is still shipped by the
    # installed provider.
    create_schema = PostgresOperator(
        task_id="create_nba_schema",
        postgres_conn_id="postgres_default",
        sql="""
        CREATE SCHEMA IF NOT EXISTS nba;
        """
    )
    
    # Create the nba.games table, keyed by game_id. updated_at defaults to the
    # insertion time when no value is supplied.
    create_table = PostgresOperator(
        task_id="create_nba_games_table",
        postgres_conn_id="postgres_default",
        sql="""
        CREATE TABLE IF NOT EXISTS nba.games (
            game_id TEXT PRIMARY KEY,
            date DATE,
            home_team TEXT,
            away_team TEXT,
            score_home INTEGER,
            score_away INTEGER,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """
    )
    
    # Upsert task fed by the rendered VALUES text of the previous task
    @task()
    def insert_games_to_db(values):
        """Upsert pre-rendered rows into ``nba.games``.

        Args:
            values: comma-separated SQL row literals (the body of a ``VALUES``
                clause). An empty value or the sentinel ``"''"`` means there is
                nothing to save. The text is embedded in the statement as-is,
                so it must already be safely escaped by the producer.

        Returns:
            dict: ``{"status": "warning", ...}`` when there is nothing to save,
            otherwise ``{"status": "success", "count": n}`` where ``n`` is the
            row count reported by the database cursor.

        Raises:
            Exception: database errors propagate; the connection is closed
            without committing in that case.
        """
        if not values or values == "''":
            logging.warning("No NBA games data to save")
            return {"status": "warning", "message": "No data to save"}

        insert_sql = f"""
            INSERT INTO nba.games 
            (game_id, date, home_team, away_team, score_home, score_away, updated_at)
            VALUES {values}
            ON CONFLICT (game_id) 
            DO UPDATE SET 
                score_home = EXCLUDED.score_home,
                score_away = EXCLUDED.score_away,
                updated_at = CURRENT_TIMESTAMP;
            """

        # Execute through a hook connection instead of constructing an operator
        # inside a running task and calling execute() with an empty context.
        # The cursor's rowcount replaces counting "),": that count is wrong as
        # soon as a text value contains those characters.
        connection = PostgresHook(postgres_conn_id="postgres_default").get_conn()
        try:
            with connection.cursor() as cursor:
                cursor.execute(insert_sql)
                affected_rows = cursor.rowcount
            connection.commit()
        finally:
            connection.close()

        return {"status": "success", "count": affected_rows}
    
    # (Re)create the view nba.games_summary: one row per date with the number
    # of games, the average combined score, the highest home and away scores,
    # and the counts of home and away wins. Tied games count for neither side.
    create_summary_view = PostgresOperator(
        task_id="create_nba_summary_view",
        postgres_conn_id="postgres_default",
        sql="""
        CREATE OR REPLACE VIEW nba.games_summary AS
        SELECT
            date,
            COUNT(*) AS game_count,
            AVG(score_home + score_away) AS avg_total_score,
            MAX(score_home) AS max_home_score,
            MAX(score_away) AS max_away_score,
            SUM(CASE WHEN score_home > score_away THEN 1 ELSE 0 END) AS home_wins,
            SUM(CASE WHEN score_home < score_away THEN 1 ELSE 0 END) AS away_wins
        FROM nba.games
        GROUP BY date
        ORDER BY date DESC;
        """
    )
    
    # Run a read-only analytics query over nba.games.
    # The two halves are combined with UNION ALL, so a team that has played
    # both at home and away appears in two separate rows (one per role) rather
    # than in one combined row.
    # NOTE(review): the original comment says the result "will show up in UI" —
    # confirm where the rows of this SELECT are surfaced.
    run_analytics = PostgresOperator(
        task_id="run_nba_analytics",
        postgres_conn_id="postgres_default",
        sql="""
        -- Calculate team performance metrics
        SELECT 
            home_team AS team,
            COUNT(*) AS games_played,
            SUM(CASE WHEN score_home > score_away THEN 1 ELSE 0 END) AS wins,
            SUM(CASE WHEN score_home < score_away THEN 1 ELSE 0 END) AS losses,
            AVG(score_home) AS avg_points_scored
        FROM nba.games
        GROUP BY home_team
        
        UNION ALL
        
        SELECT 
            away_team AS team,
            COUNT(*) AS games_played,
            SUM(CASE WHEN score_away > score_home THEN 1 ELSE 0 END) AS wins,
            SUM(CASE WHEN score_away < score_home THEN 1 ELSE 0 END) AS losses,
            AVG(score_away) AS avg_points_scored
        FROM nba.games
        GROUP BY away_team
        ORDER BY wins DESC, team;
        """
    )

    @task()
    def cleanup():
        """Delete the NBA flag file and report success.

        Problems while deleting are logged and never raised; the returned
        status is ``"success"`` in every case.
        """
        flag_file = "/data/nba/new_data.flag"
        try:
            flag_present = os.path.exists(flag_file)
            if not flag_present:
                logging.warning(f"Flag file not found: {flag_file}")
            else:
                os.remove(flag_file)
                logging.info(f"Removed flag file: {flag_file}")
        except Exception as e:
            # Cleanup is best-effort: record the failure and carry on
            logging.error(f"Error removing flag file: {e}")

        return {"status": "success"}

    # Data path: fetch -> process -> render VALUES text -> insert
    fetched_games = fetch_nba_games()
    stamped_games = process_games(fetched_games)
    rendered_values = prepare_sql_values(stamped_games)
    inserted = insert_games_to_db(rendered_values)
    finished = cleanup()

    # Spell the data path out as explicit ordering as well
    fetched_games >> stamped_games >> rendered_values >> inserted

    # Schema and table must exist before the insert; the summary view and the
    # analytics query follow it, and the flag-file cleanup comes last.
    create_schema >> create_table >> inserted >> create_summary_view >> run_analytics >> finished

# Instantiate the DAG: calling the @dag-decorated function builds the DAG
# object, which is kept in a module-level name.
nba_pipeline = nba_ingest_pipeline() 







Overwriting src\orchestrators\airflow\dags\nba_ingest_pipeline.py


In [22]:
%%writefile src\orchestrators\airflow\dags\weather_kafka_pipeline.py
#!/usr/bin/env python3
"""
Weather Data Kafka Pipeline DAG with AssetWatcher

This DAG processes weather data from Kafka, transforms it, and loads it into PostgreSQL.
It demonstrates Airflow 3.0's AssetWatcher for Kafka-driven events and explicit SQL operations.
"""

import os
import json
import logging
from datetime import datetime, timedelta
from pathlib import Path
import time

from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.sdk import Asset, AssetWatcher
from airflow.providers.standard.triggers.file import FileSensorTrigger
from airflow.providers.kafka.hooks.kafka import KafkaHook
from ingestion.fetch_gtfs import GTFSFetcher

# Defaults shared by every task of the weather DAG
default_args = dict(
    owner='data-engineering',
    depends_on_past=False,
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(seconds=5 * 60),
    execution_timeout=timedelta(seconds=10 * 60),
)

# Trigger bound to the flag file /data/weather/new_data.flag. The file acts as
# the signal that new Kafka messages are waiting; Kafka itself is not watched
# by this trigger.
# NOTE(review): confirm the installed Airflow version accepts FileSensorTrigger
# as an AssetWatcher trigger (watchers may require event-style triggers).
weather_file_trigger = FileSensorTrigger(filepath="/data/weather/new_data.flag")
# Asset used as the schedule of the DAG below; its single watcher wraps the
# trigger defined above.
weather_data_asset = Asset(
    "weather_data_asset", 
    watchers=[AssetWatcher(name="weather_data_watcher", trigger=weather_file_trigger)]
)

@dag(
    default_args=default_args,
    schedule=[weather_data_asset],  # Event-driven scheduling
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['weather', 'kafka', 'event-driven', 'assetwatcher', 'sql'],
)
def weather_kafka_pipeline():
    """
    ### Weather Data Kafka Pipeline with AssetWatcher

    This DAG demonstrates how to process weather data from Kafka using
    Airflow 3.0's AssetWatcher for event-driven runs. It consumes messages
    from a Kafka topic and stores them in PostgreSQL using explicit SQL operations.

    #### Triggers:
    - File sensor watching for "/data/weather/new_data.flag"
    """

    @task()
    def consume_kafka_messages():
        """Consume a bounded batch of weather messages from Kafka.

        The topic name and the batch cap are read from the Airflow Variables
        ``WEATHER_KAFKA_TOPIC`` (default ``weather-updates``) and
        ``WEATHER_MAX_MESSAGES`` (default ``100``).

        Returns:
            list[dict]: the decoded JSON objects, each extended with a
            ``_metadata`` dict holding the record's topic, partition, offset and
            timestamp. At most ``WEATHER_MAX_MESSAGES`` entries are returned.

        Records that are empty, not valid UTF-8, not valid JSON, or whose JSON is
        not an object are logged and skipped.
        """
        topic = Variable.get("WEATHER_KAFKA_TOPIC", default_var="weather-updates")
        max_messages = int(Variable.get("WEATHER_MAX_MESSAGES", default_var="100"))

        kafka_hook = KafkaHook(kafka_conn_id="kafka_default")
        consumer = kafka_hook.get_consumer(
            topics=[topic],
            group_id="weather-pipeline-group",
            auto_offset_reset="earliest",
            enable_auto_commit=False,
            consumer_timeout_ms=10000  # 10 seconds timeout
        )

        messages = []

        logging.info(f"Starting to consume messages from Kafka topic: {topic}")
        start_time = time.time()

        # Use poll to get better control over consumption
        try:
            while len(messages) < max_messages:
                # Only ask for what is still missing; requesting max_messages on
                # every poll would let a later poll push the batch past the cap.
                remaining = max_messages - len(messages)
                poll_result = consumer.poll(timeout_ms=5000, max_records=remaining)
                if not poll_result:
                    break

                # Process all partitions and messages
                for records in poll_result.values():
                    for record in records:
                        raw_value = record.value
                        if raw_value is None:
                            # A record without a value has nothing to decode.
                            logging.warning("Skipping message with empty value")
                            continue
                        try:
                            message = json.loads(raw_value.decode('utf-8'))
                        except (UnicodeDecodeError, json.JSONDecodeError):
                            logging.warning(f"Skipping non-JSON message: {raw_value}")
                            continue
                        if not isinstance(message, dict):
                            # Downstream code reads fields by key, so only JSON
                            # objects are usable.
                            logging.warning(f"Skipping JSON message that is not an object: {raw_value}")
                            continue

                        message['_metadata'] = {
                            'topic': record.topic,
                            'partition': record.partition,
                            'offset': record.offset,
                            'timestamp': record.timestamp
                        }
                        messages.append(message)

                # Commit offsets after processing.
                # NOTE(review): this commit happens before the later tasks have
                # stored the data, so a downstream failure will not cause these
                # messages to be re-read — confirm that is acceptable.
                consumer.commit()

            logging.info(f"Consumed {len(messages)} messages in {time.time() - start_time:.2f} seconds")
        finally:
            consumer.close()

        return messages

    @task()
    def process_weather_data(messages):
        """Map raw weather messages onto the flat observation records used downstream.

        Args:
            messages: iterable of decoded message dicts; ``None`` or an empty
                list yields an empty result.

        Returns:
            list[dict]: one record per usable message with the keys ``location``,
            ``latitude``, ``longitude``, ``obs_time``, ``temperature``,
            ``humidity``, ``pressure``, ``wind_speed``, ``wind_direction``,
            ``conditions`` and ``_processing_time``. Missing source fields become
            ``None`` (``location`` falls back to ``'unknown'``).

        A message that cannot be processed is logged and skipped; it never aborts
        the batch.
        """
        if not messages:
            logging.warning("No weather messages to process")
            return []

        processed_data = []
        # One timestamp for the whole batch so all rows of a run share it.
        processing_time = datetime.now().isoformat()

        for message in messages:
            try:
                # `condition` may be an object carrying `text`, a bare string, or
                # null/absent. Only the object form has `.get`, so handle the other
                # shapes explicitly instead of losing the whole observation.
                condition = message.get('condition')
                if isinstance(condition, dict):
                    condition_text = condition.get('text')
                elif isinstance(condition, str):
                    condition_text = condition
                else:
                    condition_text = None

                # Extract weather observation data
                observation = {
                    'location': message.get('location', 'unknown'),
                    'latitude': message.get('lat'),
                    'longitude': message.get('lon'),
                    'obs_time': message.get('observation_time'),
                    'temperature': message.get('temp_c'),
                    'humidity': message.get('humidity'),
                    'pressure': message.get('pressure_mb'),
                    'wind_speed': message.get('wind_kph'),
                    'wind_direction': message.get('wind_dir'),
                    'conditions': condition_text,
                    '_processing_time': processing_time
                }
                processed_data.append(observation)
            except Exception as e:
                # Best effort: e.g. a message that is not a dict has no `.get`.
                logging.error(f"Error processing weather message: {e}")

        logging.info(f"Processed {len(processed_data)} weather observations")
        return processed_data
    
    @task()
    def prepare_sql_values(observations):
        """Render observations as the tuple list of an SQL ``VALUES`` clause.

        Args:
            observations: list of observation dicts with the keys ``location``,
                ``latitude``, ``longitude``, ``obs_time``, ``temperature``,
                ``humidity``, ``pressure``, ``wind_speed``, ``wind_direction``
                and ``conditions``.

        Returns:
            str: ``"(...), (...)"`` with one parenthesised tuple per observation
            in the column order listed above, or the sentinel ``"''"`` when
            there is nothing to insert.

        The values come from external messages, so nothing is interpolated raw:
        text is quoted with embedded single quotes doubled, numeric fields must
        parse as finite numbers, and missing values become ``NULL`` instead of
        the literal string ``'None'``.
        """
        import math

        if not observations:
            return "''"

        def sql_text(value):
            # Doubling single quotes is the standard SQL escape. This assumes
            # PostgreSQL's default standard_conforming_strings=on, where
            # backslashes are literal — confirm if the server overrides it.
            if value is None:
                return 'NULL'
            return "'" + str(value).replace("'", "''") + "'"

        def sql_number(value):
            if value is None:
                return 'NULL'
            try:
                # bool is an int subclass, but True/False are not measurements.
                if isinstance(value, bool):
                    raise ValueError(value)
                number = float(value)
            except (TypeError, ValueError):
                logging.warning(f"Non-numeric value replaced with NULL: {value!r}")
                return 'NULL'
            # NaN/Infinity are not valid unquoted numeric literals.
            if not math.isfinite(number):
                logging.warning(f"Non-finite value replaced with NULL: {value!r}")
                return 'NULL'
            return repr(number)

        values_strings = []
        for obs in observations:
            location = obs.get('location')
            fields = [
                # The target column is NOT NULL; fall back to the same default
                # the processing step uses for a missing location.
                sql_text(location if location is not None else 'unknown'),
                sql_number(obs.get('latitude')),
                sql_number(obs.get('longitude')),
                sql_text(obs.get('obs_time')),
                sql_number(obs.get('temperature')),
                sql_number(obs.get('humidity')),
                sql_number(obs.get('pressure')),
                sql_number(obs.get('wind_speed')),
                sql_text(obs.get('wind_direction')),
                # Missing conditions are stored as an empty string, as before.
                sql_text(obs.get('conditions') or ''),
            ]
            values_strings.append("(" + ", ".join(fields) + ")")

        return ", ".join(values_strings)

    # Create schema and tables with PostgresOperator
    # First DDL step: make sure the `weather` schema exists. IF NOT EXISTS keeps
    # the statement safe to run on every DAG run.
    # NOTE(review): PostgresOperator is believed to be removed from recent
    # postgres provider releases in favour of SQLExecuteQueryOperator — confirm
    # against the provider version installed in the image.
    create_schema = PostgresOperator(
        task_id="create_weather_schema",
        postgres_conn_id="postgres_default",
        sql="""
        CREATE SCHEMA IF NOT EXISTS weather;
        """
    )
    
    # Second DDL step: the raw observations table the insert task writes to.
    # Only `location` is mandatory; every measurement column is nullable, and
    # `id` / `created_at` are filled in by the database. IF NOT EXISTS keeps the
    # statement safe to run on every DAG run.
    create_weather_table = PostgresOperator(
        task_id="create_weather_table",
        postgres_conn_id="postgres_default",
        sql="""
        CREATE TABLE IF NOT EXISTS weather.observations (
            id SERIAL PRIMARY KEY,
            location TEXT NOT NULL,
            latitude DOUBLE PRECISION,
            longitude DOUBLE PRECISION,
            obs_time TIMESTAMP,
            temperature DOUBLE PRECISION,
            humidity DOUBLE PRECISION,
            pressure DOUBLE PRECISION,
            wind_speed DOUBLE PRECISION,
            wind_direction TEXT,
            conditions TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """
    )
    
    # Task that inserts the VALUES string produced by the previous task
    @task()
    def insert_weather_data(values):
        """Insert the prepared rows into ``weather.observations``.

        Args:
            values: the tuple list for the ``VALUES`` clause as one string, e.g.
                ``"(...), (...)"``. An empty value or the sentinel ``"''"``
                means there is nothing to insert.

        Returns:
            dict: ``{"status": "warning", "message": ...}`` when there is nothing
            to insert, otherwise ``{"status": "success", "count": n}``. ``n`` is
            an estimate derived from the tuple separators in ``values`` and will
            be off if a text value itself contains ``),``.
        """
        if not values or values == "''":
            logging.warning("No weather data to insert")
            return {"status": "warning", "message": "No data to insert"}

        insert_sql = f"""
            INSERT INTO weather.observations
            (location, latitude, longitude, obs_time, temperature, humidity,
             pressure, wind_speed, wind_direction, conditions)
            VALUES {values};
            """

        # Run the statement through the hook instead of building an operator
        # inside a running task and calling execute() with an empty context:
        # operators are meant to be driven by the task runner, not invoked by hand.
        PostgresHook(postgres_conn_id="postgres_default").run(insert_sql)

        return {"status": "success", "count": values.count('),') + 1}
    
    # Create summary tables and views
    # Third DDL step: one row per calendar day (summary_date is the primary key),
    # which is what lets the analytics task upsert with ON CONFLICT (summary_date).
    # IF NOT EXISTS keeps the statement safe to run on every DAG run.
    create_daily_summary = PostgresOperator(
        task_id="create_weather_daily_summary",
        postgres_conn_id="postgres_default",
        sql="""
        CREATE TABLE IF NOT EXISTS weather.daily_summary (
            summary_date DATE PRIMARY KEY,
            location_count INTEGER,
            avg_temperature DOUBLE PRECISION,
            min_temperature DOUBLE PRECISION,
            max_temperature DOUBLE PRECISION,
            avg_humidity DOUBLE PRECISION,
            avg_pressure DOUBLE PRECISION,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """
    )
    
    # Run analytics to summarize weather data
    # Two statements are sent in one call:
    #   1. Recompute the daily aggregates for observations from yesterday onwards
    #      and upsert them into weather.daily_summary, so re-running the task
    #      refreshes existing days instead of failing on the primary key.
    #   2. A per-location SELECT over the last seven days. The SQL does not store
    #      this result anywhere.
    #      NOTE(review): whether the operator keeps the returned rows (e.g. as
    #      its return value) depends on operator settings not visible here — confirm.
    run_analytics = PostgresOperator(
        task_id="run_weather_analytics",
        postgres_conn_id="postgres_default",
        sql="""
        -- Insert into daily summary table
        INSERT INTO weather.daily_summary
        (summary_date, location_count, avg_temperature, min_temperature, 
         max_temperature, avg_humidity, avg_pressure)
        SELECT 
            DATE(obs_time) AS summary_date,
            COUNT(DISTINCT location) AS location_count,
            AVG(temperature) AS avg_temperature,
            MIN(temperature) AS min_temperature,
            MAX(temperature) AS max_temperature,
            AVG(humidity) AS avg_humidity,
            AVG(pressure) AS avg_pressure
        FROM weather.observations
        WHERE obs_time >= CURRENT_DATE - INTERVAL '1 day'
        GROUP BY DATE(obs_time)
        ON CONFLICT (summary_date) 
        DO UPDATE SET
            location_count = EXCLUDED.location_count,
            avg_temperature = EXCLUDED.avg_temperature,
            min_temperature = EXCLUDED.min_temperature,
            max_temperature = EXCLUDED.max_temperature,
            avg_humidity = EXCLUDED.avg_humidity,
            avg_pressure = EXCLUDED.avg_pressure,
            created_at = CURRENT_TIMESTAMP;
            
        -- Calculate location-level statistics
        SELECT 
            location,
            COUNT(*) AS observation_count,
            AVG(temperature) AS avg_temp,
            AVG(humidity) AS avg_humidity,
            MIN(temperature) AS min_temp,
            MAX(temperature) AS max_temp
        FROM weather.observations
        WHERE obs_time >= CURRENT_DATE - INTERVAL '7 days'
        GROUP BY location
        ORDER BY observation_count DESC;
        """
    )

    @task()
    def cleanup(flag_file="/data/weather/new_data.flag"):
        """Remove the flag file that signalled new data, if it is still there.

        Args:
            flag_file: path of the flag file to remove. Defaults to the path the
                DAG's file trigger watches.

        Returns:
            dict: ``{"status": "success"}`` when the file was removed or was
            already gone, ``{"status": "error", "message": ...}`` when removal
            failed. Failures are logged and reported, never raised, so a leftover
            flag cannot fail the run.
        """
        try:
            # Remove directly instead of checking existence first: the file may
            # disappear between the check and the removal.
            os.remove(flag_file)
        except FileNotFoundError:
            logging.warning(f"Flag file not found: {flag_file}")
        except OSError as e:
            logging.error(f"Error removing flag file: {e}")
            # Report the failure instead of claiming success.
            return {"status": "error", "message": str(e)}
        else:
            logging.info(f"Removed flag file: {flag_file}")

        return {"status": "success"}

    # Data path: each TaskFlow call receives the previous task's output.
    raw_messages = consume_kafka_messages()
    observations = process_weather_data(raw_messages)
    values_clause = prepare_sql_values(observations)
    load_result = insert_weather_data(values_clause)
    flag_cleanup = cleanup()

    # Schema -> observations table -> summary table must all exist before rows
    # are inserted; analytics runs on the inserted rows, and the flag file is
    # cleaned up last.
    (
        create_schema
        >> create_weather_table
        >> create_daily_summary
        >> load_result
        >> run_analytics
        >> flag_cleanup
    )

    # Spell out the consume -> process -> prepare -> insert order explicitly,
    # in addition to the data hand-offs above.
    raw_messages >> observations >> values_clause >> load_result

# Instantiate the DAG: calling the @dag-decorated function builds the DAG object;
# it is bound to a module-level name so it exists when this file is imported.
weather_pipeline = weather_kafka_pipeline() 





Overwriting src\orchestrators\airflow\dags\weather_kafka_pipeline.py


In [23]:
%%writefile src\orchestrators\airflow\dags\user_metrics_etl.py

# src/orchestrators/airflow/dags/user_metrics_etl.py
"""
User metrics ETL DAG (skeleton).

Extracts user metrics, turns them into an SQL VALUES string and loads them into
the ``user_metrics`` table once a day. The extract/transform bodies and the
table definition are still placeholders (marked with ``…``).
"""
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Defaults applied to every task; owner/retries/retry_delay are the fields the
# DAG integrity test in this project requires.
default_args = {
    "owner": "data-engineering",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


# Airflow 3 dropped `schedule_interval` (use `schedule`) and
# `airflow.utils.dates.days_ago` (use a fixed start_date instead).
@dag(
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["metrics", "sql"],
)
def user_metrics_etl():
    """Daily ETL: extract -> transform -> load into ``user_metrics``."""

    @task()
    def extract():
        """Fetch the raw user metrics. TODO: body not written yet; it must define `data`."""
        # …
        return data

    @task()
    def transform(data):
        """Turn the raw metrics into an SQL VALUES string. TODO: body not written yet; it must define `sql_values`."""
        # …
        return sql_values

    # NOTE(review): PostgresOperator is believed to be removed from recent
    # postgres provider releases in favour of SQLExecuteQueryOperator — confirm
    # against the provider version installed in the image.
    create_table = PostgresOperator(
        task_id="create_user_metrics_table",
        postgres_conn_id="postgres_default",
        # TODO: the column list is still a placeholder.
        sql="""
        CREATE TABLE IF NOT EXISTS user_metrics (…);
        """
    )

    # The VALUES text is pulled from the transform task's XCom while the SQL
    # template is rendered. (`params.values` would have resolved to the dict
    # method, and `parameters` are DB-API bind values, not template params.)
    # NOTE(review): this splices task output straight into SQL; transform must
    # emit properly escaped literals.
    load = PostgresOperator(
        task_id="load_user_metrics",
        postgres_conn_id="postgres_default",
        sql="INSERT INTO user_metrics VALUES {{ ti.xcom_pull(task_ids='transform') }};",
    )

    extracted = extract()
    vals = transform(extracted)

    # load needs both the table and the transformed values to exist first.
    create_table >> load
    vals >> load


user_metrics_etl_dag = user_metrics_etl()


Overwriting src\orchestrators\airflow\dags\user_metrics_etl.py


In [24]:
%%writefile src\orchestrators\airflow\tests\test_dag_integrity.py


#!/usr/bin/env python3
"""
Tests for Airflow DAG integrity.

This module contains tests to ensure our DAGs load and have a valid structure.
"""

import os
import logging
import unittest
from pathlib import Path
from airflow.models import DagBag

# Get DAGs directory: parents[1] is the directory one level above the folder
# containing this test module, so this points at that directory's "dags" folder.
DAGS_DIR = Path(__file__).parents[1] / "dags"


class TestDagIntegrity(unittest.TestCase):
    """Test DAG integrity."""

    @classmethod
    def setUpClass(cls):
        """Set up test class."""
        cls.dagbag = DagBag(dag_folder=str(DAGS_DIR), include_examples=False)

    def test_no_import_errors(self):
        """Test there are no import errors in our DAGs."""
        import_errors = self.dagbag.import_errors
        self.assertEqual(
            len(import_errors), 0, f"DAG import errors: {import_errors}"
        )

    def test_all_dags_have_required_fields(self):
        """Test that all DAGs have the required fields."""
        for dag_id, dag in self.dagbag.dags.items():
            self.assertIsNotNone(dag.default_args.get('owner', None),
                                f"{dag_id}: Missing owner in default_args")
            self.assertIsNotNone(dag.default_args.get('retries', None),
                                f"{dag_id}: Missing retries in default_args")
            self.assertIsNotNone(dag.default_args.get('retry_delay', None),
                                f"{dag_id}: Missing retry_delay in default_args")

    def test_all_dags_have_tags(self):
        """Test that all DAGs have tags."""
        for dag_id, dag in self.dagbag.dags.items():
            tags = dag.tags
            self.assertTrue(
                len(tags) > 0, f"{dag_id}: No tags specified for DAG"
            )


# Allow running this module directly as a script; unittest.main() discovers and
# runs the test cases defined in this file.
if __name__ == '__main__':
    unittest.main() 

Overwriting src\orchestrators\airflow\tests\test_dag_integrity.py
