In [2]:
import duckdb
import pandas as pd
import requests
import json
from datetime import datetime, timedelta
import os
from pathlib import Path
import geopandas as gpd
from pyspainmobility import Mobility

In [None]:
# Import required libraries
import duckdb
import pandas as pd
import requests
import json
from datetime import datetime, timedelta
import os
from pathlib import Path
import geopandas as gpd
from pyspainmobility import Mobility

# Initialize DuckDB and create schema
def initialize_duckdb():
    """Initialize DuckDB connection and create schemas for 3-tier architecture"""
    conn = duckdb.connect('spain_mobility_lakehouse.duckdb')
    
    # Create schemas for each tier
    schemas = ['bronze', 'silver', 'gold']
    for schema in schemas:
        conn.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
    
    return conn

conn = initialize_duckdb()
print("DuckDB initialized with bronze, silver, gold schemas")

# Create Bronze layer tables
def create_bronze_tables(conn):
    """Create raw data tables in Bronze layer"""
    
    # MITMA OD data table
    conn.execute("""
    CREATE TABLE IF NOT EXISTS bronze.mitma_od_daily (
        date DATE,
        hour INTEGER,
        id_origin VARCHAR,
        id_destination VARCHAR,
        n_trips DOUBLE,
        trips_total_length_km DOUBLE,
        activity_origin VARCHAR,
        activity_destination VARCHAR,
        loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    """)
    
    # MITMA overnight stays table
    conn.execute("""
    CREATE TABLE IF NOT EXISTS bronze.mitma_overnight_stays (
        date DATE,
        residence_area VARCHAR,
        overnight_stay_area VARCHAR,
        people DOUBLE,
        loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    """)
    
    # MITMA number of trips table
    conn.execute("""
    CREATE TABLE IF NOT EXISTS bronze.mitma_number_trips (
        date DATE,
        overnight_stay_area VARCHAR,
        age VARCHAR,
        gender VARCHAR,
        number_of_trips VARCHAR,
        people DOUBLE,
        loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    """)
    
    # INE demographic data table
    conn.execute("""
    CREATE TABLE IF NOT EXISTS bronze.ine_demographics (
        zone_id VARCHAR,
        zone_name VARCHAR,
        population INTEGER,
        density_km2 DOUBLE,
        gender VARCHAR,
        age_group VARCHAR,
        count INTEGER,
        year INTEGER,
        loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    """)

    #MITMA municiples names
    conn.execute("""
            CREATE OR REPLACE TABLE bronze.mitma_municiples_name (
                                                   id          VARCHAR,
                                                   name        VARCHAR(128),
                );
            """)
    #MITMA municiples population 2023
    conn.execute("""
            CREATE OR REPLACE TABLE bronze.mitma_municiples_2023_population (
                                                   id          VARCHAR,
                                                   population       INTEGER ,
                );
            """)

    conn.execute("""
            CREATE OR REPLACE TABLE bronze.mitma_ine_zone_code (
                                                   section_ine        VARCHAR,
                                                   district_ine        VARCHAR,
                                                   municiple_ine    VARCHAR,
                                                   district_mitma   VARCHAR,
                                                   municiple_mitma VARCHAR,
                                                   gau_mitma VARCHAR


                );
                    """)
    
    
    print("Bronze layer tables created")

create_bronze_tables(conn)

# Download and ingest 2 days of MITMA data
def download_mobility_data():
    """Download 2 days of mobility data using pyspainmobility"""
    
    # Calculate dates for 2 consecutive days
    start_date = "2023-01-01"  # Using 2023 as reference year
    end_date = "2023-01-02"
    
    print(f"Downloading data from {start_date} to {end_date}")
    
    # Initialize mobility object
    mobility_data = Mobility(
        version=2,
        zones='municipalities',
        start_date=start_date,
        end_date=end_date,
        output_directory='./data/raw',
        use_dask=False
    )
    
    # Download OD data
    print("Downloading OD data...")
    od_df = mobility_data.get_od_data(keep_activity=True, return_df=True)
    
    # Download overnight stays
    print("Downloading overnight stays data...")
    overnight_df = mobility_data.get_overnight_stays_data(return_df=True)
    
    # Download number of trips
    print("Downloading number of trips data...")
    trips_df = mobility_data.get_number_of_trips_data(return_df=True)
    
    return od_df, overnight_df, trips_df

# Download the data
od_data, overnight_data, trips_data = download_mobility_data()

# Ingest data into Bronze layer
def ingest_bronze_data(conn, od_data, overnight_data, trips_data):
    """Ingest downloaded data into Bronze tables"""
    
    # Insert OD data
    conn.register('od_data_df', od_data)
    conn.execute("""
    INSERT INTO bronze.mitma_od_daily 
    (date, hour, id_origin, id_destination, n_trips, trips_total_length_km, activity_origin, activity_destination)
    SELECT 
        date, hour, id_origin, id_destination, n_trips, trips_total_length_km, 
        activity_origin, activity_destination
    FROM od_data_df
    """)
    
    # Insert overnight stays data
    conn.register('overnight_data_df', overnight_data)
    conn.execute("""
    INSERT INTO bronze.mitma_overnight_stays 
    (date, residence_area, overnight_stay_area, people)
    SELECT date, residence_area, overnight_stay_area, people
    FROM overnight_data_df
    """)
    
    # Insert number of trips data
    conn.register('trips_data_df', trips_data)
    conn.execute("""
    INSERT INTO bronze.mitma_number_trips 
    (date, overnight_stay_area, age, gender, number_of_trips, people)
    SELECT date, overnight_stay_area, age, gender, number_of_trips, people
    FROM trips_data_df
    """)
    # Insert municiples names from csv
    conn.execute(f"""--sql
        INSERT INTO bronze.mitma_municiples_name 
        SELECT 
            CAST(ID AS VARCHAR) as id,
            CAST(name AS VARCHAR) as name,

        FROM read_csv('MITMA_data/nombres_municipios.csv');
        
    """)
    # Insert municiples population from csv
    conn.execute(f"""--sql
        INSERT INTO bronze.mitma_municiples_2023_population 
        SELECT 
            CAST(ID AS VARCHAR) as id,
            TRY_CAST(poblacion AS INTEGER) as population ,

        FROM read_csv('MITMA_data/poblacion_municipios.csv');
        
    """)
    # Insert gau/district/municiples codes MITMA INE equivalence
    conn.execute(f"""--sql
        INSERT INTO bronze.mitma_ine_zone_code
        SELECT 
            CAST(seccion_ine AS VARCHAR) as section_ine,
            CAST(distrito_ine AS VARCHAR) as district_ine,
            CAST(municipio_ine AS VARCHAR) as municiple_ine,
            CAST(distrito_mitma AS VARCHAR) as district_mitma,
            CAST(municipio_mitma AS VARCHAR) as municiple_mitma,
            CAST(gau_mitma AS VARCHAR) as gau_mitma,

        FROM read_csv('MITMA_data/relacion_ine_zonificacionMitma.csv');
        
    """)
    
    print("Data ingested into Bronze layer")

ingest_bronze_data(conn, od_data, overnight_data, trips_data)

# Create Silver layer tables (cleaned and integrated)
def create_silver_tables(conn):
    """Create Silver layer tables with cleaned and integrated data"""
    
    # Integrated OD table with basic cleaning
    conn.execute("""
    CREATE TABLE IF NOT EXISTS silver.integrated_od AS
    SELECT 
        date,
        hour,
        id_origin,
        id_destination,
        -- Remove negative trips and handle nulls
        CASE WHEN n_trips < 0 THEN 0 ELSE COALESCE(n_trips, 0) END as n_trips,
        CASE WHEN trips_total_length_km < 0 THEN 0 ELSE COALESCE(trips_total_length_km, 0) END as trips_total_length_km,
        COALESCE(activity_origin, 'unknown') as activity_origin,
        COALESCE(activity_destination, 'unknown') as activity_destination,
        -- Add derived columns
        CASE 
            WHEN hour BETWEEN 6 AND 9 THEN 'morning_peak'
            WHEN hour BETWEEN 17 AND 20 THEN 'evening_peak' 
            ELSE 'off_peak'
        END as time_period,
        loaded_at
    FROM bronze.mitma_od_daily
    WHERE date IS NOT NULL
    """)
    
    # Zone metrics table
    conn.execute("""
    CREATE TABLE IF NOT EXISTS silver.zone_metrics AS
    SELECT 
        overnight_stay_area as zone_id,
        date,
        SUM(people) as total_people,
        COUNT(DISTINCT residence_area) as unique_residence_areas
    FROM bronze.mitma_overnight_stays
    GROUP BY overnight_stay_area, date
    """)
    
    # Demographic aggregates
    conn.execute("""
    CREATE TABLE IF NOT EXISTS silver.demographic_aggregates AS
    SELECT 
        overnight_stay_area as zone_id,
        date,
        age,
        gender,
        number_of_trips,
        SUM(people) as total_people
    FROM bronze.mitma_number_trips
    GROUP BY overnight_stay_area, date, age, gender, number_of_trips
    """)
    conn.execute("""
    CREATE TABLE IF NOT EXISTS silver.mitma_municiples_2023 AS
    SELECT 
        id as zone_id,
        name,
        population

    FROM bronze.mitma_municiples_name JOIN bronze.mitma_municiples_2023_population USING(id)
    """)
    
    
    print("Silver layer tables created")

create_silver_tables(conn)

# Create Gold layer tables 
def create_gold_tables(conn):
    """Create Gold layer tables with business-ready aggregates"""
    
    # Typical day patterns - hourly aggregates
    conn.execute("""
    CREATE TABLE IF NOT EXISTS gold.typical_day_patterns AS
    SELECT 
        id_origin,
        id_destination,
        hour,
        time_period,
        AVG(n_trips) as avg_trips,
        AVG(trips_total_length_km) as avg_distance_km,
        COUNT(*) as observation_count
    FROM silver.integrated_od
    GROUP BY id_origin, id_destination, hour, time_period
    """)
    
    # Zone mobility summary
    conn.execute("""
    CREATE TABLE IF NOT EXISTS gold.zone_mobility_summary AS
    SELECT 
        zone_id,
        AVG(total_people) as avg_daily_people,
        MAX(total_people) as max_daily_people,
        MIN(total_people) as min_daily_people,
        COUNT(*) as days_observed
    FROM silver.zone_metrics
    GROUP BY zone_id
    """)
    
    # Trip distance distribution
    conn.execute("""
    CREATE TABLE IF NOT EXISTS gold.trip_distance_analysis AS
    SELECT 
        CASE 
            WHEN trips_total_length_km < 5 THEN '0-5km'
            WHEN trips_total_length_km < 15 THEN '5-15km'
            WHEN trips_total_length_km < 50 THEN '15-50km'
            ELSE '50+km'
        END as distance_category,
        time_period,
        COUNT(*) as trip_count,
        AVG(n_trips) as avg_trips_per_od,
        SUM(n_trips) as total_trips
    FROM silver.integrated_od
    GROUP BY distance_category, time_period
    """)
    
    print("Gold layer tables created")

create_gold_tables(conn)

# Cell 8: Verify data ingestion
def verify_ingestion(conn):
    """Verify data has been properly ingested across all tiers"""
    
    print("=== BRONZE LAYER COUNTS ===")
    bronze_tables = ['mitma_od_daily', 'mitma_overnight_stays', 'mitma_number_trips']
    for table in bronze_tables:
        count = conn.execute(f"SELECT COUNT(*) FROM bronze.{table}").fetchone()[0]
        print(f"{table}: {count} records")
    
    print("\n=== SILVER LAYER COUNTS ===")
    silver_tables = ['integrated_od', 'zone_metrics', 'demographic_aggregates']
    for table in silver_tables:
        count = conn.execute(f"SELECT COUNT(*) FROM silver.{table}").fetchone()[0]
        print(f"{table}: {count} records")
    
    print("\n=== GOLD LAYER COUNTS ===")
    gold_tables = ['typical_day_patterns', 'zone_mobility_summary', 'trip_distance_analysis']
    for table in gold_tables:
        count = conn.execute(f"SELECT COUNT(*) FROM gold.{table}").fetchone()[0]
        print(f"{table}: {count} records")

verify_ingestion(conn)

# Cell 9: Close connection
conn.close()
print("Setup completed successfully!")

DuckDB initialized with bronze, silver, gold schemas
Bronze layer tables created
Downloading data from 2023-01-01 to 2023-01-02
Downloading OD data...
Downloading file from https://movilidad-opendata.mitma.es/estudios_basicos/por-municipios/viajes/ficheros-diarios/2023-01/20230101_Viajes_municipios.csv.gz
Downloading file from https://movilidad-opendata.mitma.es/estudios_basicos/por-municipios/viajes/ficheros-diarios/2023-01/20230102_Viajes_municipios.csv.gz
Generating parquet file for ODs....


  0%|          | 0/2 [00:00<?, ?it/s]

Processing file: C:\Users\Jorge\./data/raw\20230101_Viajes_municipios_v2.csv.gz
Reading gzipped file...


 50%|█████     | 1/2 [00:23<00:23, 23.63s/it]

Processing file: C:\Users\Jorge\./data/raw\20230102_Viajes_municipios_v2.csv.gz
Reading gzipped file...


100%|██████████| 2/2 [00:47<00:00, 23.55s/it]


Concatenating all the dataframes....
Writing the parquet file....
Parquet file generated successfully at  C:\Users\Jorge\./data/raw\Viajes_municipios_2023-01-01_2023-01-02_v2.parquet
Downloading overnight stays data...
Downloading file from https://movilidad-opendata.mitma.es/estudios_basicos/por-municipios/pernoctaciones/ficheros-diarios/2023-01/20230101_Pernoctaciones_municipios.csv.gz
Downloading file from https://movilidad-opendata.mitma.es/estudios_basicos/por-municipios/pernoctaciones/ficheros-diarios/2023-01/20230102_Pernoctaciones_municipios.csv.gz
Generating parquet file for Overnight Stays....


100%|██████████| 2/2 [00:00<00:00,  5.85it/s]


Concatenating all the dataframes....
Writing the parquet file....
Parquet file generated successfully at  C:\Users\Jorge\./data/raw\Pernoctaciones_municipios_2023-01-01_2023-01-02_v2.parquet
Downloading number of trips data...
Downloading file from https://movilidad-opendata.mitma.es/estudios_basicos/por-municipios/personas/ficheros-diarios/2023-01/20230101_Personas_dia_municipios.csv.gz
Downloading file from https://movilidad-opendata.mitma.es/estudios_basicos/por-municipios/personas/ficheros-diarios/2023-01/20230102_Personas_dia_municipios.csv.gz
Generating parquet file for Number of Trips....


100%|██████████| 2/2 [00:00<00:00, 19.68it/s]

Concatenating all the dataframes....
Writing the parquet file....
Parquet file generated successfully at  C:\Users\Jorge\./data/raw\Personas_municipios_2023-01-01_2023-01-02_v2.parquet





Data ingested into Bronze layer
Silver layer tables created
Gold layer tables created
=== BRONZE LAYER COUNTS ===
mitma_od_daily: 8650011 records
mitma_overnight_stays: 1025130 records
mitma_number_trips: 162534 records

=== SILVER LAYER COUNTS ===
integrated_od: 8650011 records
zone_metrics: 5123 records
demographic_aggregates: 162534 records

=== GOLD LAYER COUNTS ===
typical_day_patterns: 2190136 records
zone_mobility_summary: 2562 records
trip_distance_analysis: 12 records
Setup completed successfully!
