# Sprint 2: Schema Design

#### Initial configuration and connection to DuckDB and DuckLake

In [None]:
import duckdb
import pandas as pd
import os
import glob

# --- 1. Configuration & Paths ---
# Relative locations of the raw input files and of the DuckLake-managed lakehouse.
RAW_DATA_PATH = '../data/raw/'
LAKEHOUSE_PATH = '../data/lakehouse'
METADATA_PATH = os.path.join(LAKEHOUSE_PATH, 'metadata.duckdb')

# Create the base directory if it doesn't exist
os.makedirs(LAKEHOUSE_PATH, exist_ok=True)

# --- 2. Initialize DuckDB & Load DuckLake Extension ---
# Connect to in-memory DuckDB (Compute Layer)
con = duckdb.connect(database=':memory:')

print("--- Initializing DuckLake Extension ---")

# INSTALL fetches the extension if it is missing; LOAD activates it in this session.
try:
    con.execute("INSTALL ducklake;")
    con.execute("LOAD ducklake;")
    print("‚úÖ Extension 'ducklake' loaded successfully.")
except Exception as e:
    print(f"‚ùå Error loading 'ducklake'. Make sure the extension is available in your environment.\nError: {e}")
    # Every later cell needs the extension, so stop here rather than failing
    # further down with a less obvious ATTACH error.
    raise

#### Attach the Catalog and Schema Management

In [None]:
# --- 3. Attach the Catalog ---
# Attach the persistent DuckLake catalog by pointing at the metadata file.
# Note: depending on the DuckLake version, the ATTACH syntax might vary slightly.
# IF NOT EXISTS keeps this cell re-runnable: a plain ATTACH raises once the
# 'lakehouse' alias is already attached in the current session.
# Single quotes in the path are doubled so they cannot terminate the SQL literal.
metadata_path_sql = METADATA_PATH.replace("'", "''")
con.execute(f"ATTACH IF NOT EXISTS 'ducklake:{metadata_path_sql}' AS lakehouse")
print(f"‚úÖ Lakehouse catalog attached at: {METADATA_PATH}")

# --- 4. Schema Management ---
# Create logical schemas within the managed catalog
schemas = ['bronze', 'silver', 'gold']
for schema in schemas:
    con.execute(f"CREATE SCHEMA IF NOT EXISTS lakehouse.{schema}")
print(f"‚úÖ Schemas ready: {', '.join(schemas)}")

---
# Bronze Layer

### Create table of mobility without data

In [None]:
# --- 5. Ingestion: Mobility Data (Bronze Layer) ---
# Collect every gzipped municipal-trips CSV under RAW_DATA_PATH/mitma.
mitma_raw_glob_path = os.path.join(RAW_DATA_PATH, 'mitma', '*_Viajes_municipios.csv.gz')
mobility_files = glob.glob(mitma_raw_glob_path)

print(f"\n--- Ingesting Mobility Data ---")
if not mobility_files:
    print("‚ùå No mobility files found!")
else:
    print(f"-> Found {len(mobility_files)} files.")
    
    # Use CREATE TABLE to let DuckLake manage the table.
    # The Python list is formatted into the SQL text through its repr, and
    # LIMIT 0 means only the table structure is created here (no rows); the
    # data itself is inserted by a later cell, after partitioning is set.
    # Two audit columns are appended: ingestion_timestamp and source_url.
    query_mobility = f"""
        CREATE OR REPLACE TABLE lakehouse.bronze.mobility_sample_week 
        AS
        SELECT 
            *,
            CURRENT_TIMESTAMP AS ingestion_timestamp,
            'https://www.transportes.gob.es/ministerio/proyectos-singulares/estudios-de-movilidad-con-big-data/opendata-movilidad' AS source_url
        FROM read_csv_auto({mobility_files}, filename=true, all_varchar=true)
        LIMIT 0;
    """
    con.execute(query_mobility)
    print(f"‚úÖ Table created: lakehouse.bronze.mobility_sample_week ")

#### Partitioning the mobility table by date

In [None]:
# Declare the raw 'fecha' column as the partition key of the Bronze mobility
# table. This runs while the table is still empty, before the INSERT cell.
con.execute(f"""
        ALTER TABLE lakehouse.bronze.mobility_sample_week  
        SET PARTITIONED BY (fecha);
    """)

#### Inserting the data into the partitioned table

In [None]:
# Load every matched mobility file into the partitioned Bronze table, adding
# the same audit columns that were used when the table structure was created.
# When no files were found the table was never created, so skip the INSERT
# with the same message instead of failing inside DuckDB.
# Note: INSERT appends, so re-running only this cell loads the rows again;
# re-run the CREATE OR REPLACE cell first to start from an empty table.
if not mobility_files:
    print("‚ùå No mobility files found!")
else:
    query_mobility = f"""
        INSERT INTO lakehouse.bronze.mobility_sample_week 
        SELECT 
            *,
            CURRENT_TIMESTAMP AS ingestion_timestamp,
            'https://www.transportes.gob.es/ministerio/proyectos-singulares/estudios-de-movilidad-con-big-data/opendata-movilidad' AS source_url
        FROM read_csv_auto({mobility_files}, filename=true, all_varchar=true, ignore_errors=true);
    """
    con.execute(query_mobility)
    print(f"‚úÖ Transformed & Ingested: lakehouse.bronze.mobility_sample_week")

#### Data and Schema Preview of Mobility Files

In [None]:
# --- INSPECTION & METADATA CHECK ---
print("\n--- üîç INSPECTION: Mobility Table ---")

# 1. Content Preview
# Verify that 'origen' and 'destino' columns look like municipal codes (5 digits)
# The DataFrame is the cell's last expression, so the notebook renders it as a table.
print("\n[1] Data Preview (First 5 rows):")
con.execute("SELECT * FROM lakehouse.bronze.mobility_sample_week  LIMIT 5").df()

In [None]:
# 2. Schema Check
# Confirm column names and ensure the CSV columns are currently VARCHAR
# (the files were read with all_varchar=true, as expected for Bronze).
# The audit column ingestion_timestamp comes from CURRENT_TIMESTAMP, so it is not VARCHAR.
print("\n[2] Schema (Columns & Types):")
con.execute("DESCRIBE lakehouse.bronze.mobility_sample_week ").df()

In [None]:
# 3. Quality Profile
# Check for 100% nulls or weird values. This might take a moment.
# SUMMARIZE returns one row of statistics per column of the table.
print("\n[3] Data Quality Profile (Nulls & Unique Values):")
con.execute("SUMMARIZE lakehouse.bronze.mobility_sample_week ").df()

### Ingesting other tables

In [None]:
# --- 6. Ingestion: Auxiliary Tables (refactored with lineage) ---

# Source URLs; each one is stored in the source_url column of the Bronze
# tables ingested from that provider.
URL_MITMA = "https://www.transportes.gob.es/ministerio/proyectos-singulares/estudios-de-movilidad-con-big-data/opendata-movilidad"
URL_INE = "https://www.ine.es/"
URL_CNIG = "https://centrodedescargas.cnig.es/CentroDescargas/index.jsp"
URL_MTDFP = "https://datos.gob.es/es/catalogo/l01280796-calendario-laboral"

def ingest_dimension(table_name, filename, source_url, folder='mitma', sep=';', encoding='utf-8', **kwargs):
    """Load one raw CSV file into a Bronze table, adding lineage columns.

    The file at RAW_DATA_PATH/<folder>/<filename> is read with read_csv_auto
    (all columns as VARCHAR, plus the source filename column) and written to
    lakehouse.bronze.<table_name> with CREATE OR REPLACE TABLE. Two audit
    columns are appended: ingestion_timestamp and source_url. If the file does
    not exist, a warning is printed and no table is created.

    Args:
        table_name: Target table name inside lakehouse.bronze. It is placed in
            the SQL as an identifier, so only trusted names should be passed.
        filename: Name of the CSV file inside the folder.
        source_url: URL recorded in the source_url column.
        folder: Sub-folder of RAW_DATA_PATH that contains the file.
        sep: Column delimiter passed to read_csv_auto.
        encoding: File encoding passed to read_csv_auto.
        **kwargs: Extra read_csv_auto options. Booleans are rendered as
            true/false, ints and floats as bare numbers, and any other value
            as a quoted SQL string.

    Returns:
        None.
    """
    def _sql_literal(value):
        # Render a Python value as a SQL literal for the reader options.
        # bool is checked first because bool is a subclass of int.
        if isinstance(value, bool):
            return str(value).lower()
        if isinstance(value, (int, float)):
            return str(value)
        escaped = str(value).replace("'", "''")
        return f"'{escaped}'"

    path = os.path.join(RAW_DATA_PATH, folder, filename)

    if not os.path.exists(path):
        print(f"‚ö†Ô∏è Missing file: {filename}")
        return

    option_parts = [
        "filename=true",
        "all_varchar=true",
        f"sep={_sql_literal(sep)}",
        f"encoding={_sql_literal(encoding)}",
    ]
    option_parts.extend(f"{key}={_sql_literal(value)}" for key, value in kwargs.items())
    options = ", ".join(option_parts)

    print(f"-> Ingesting {table_name} with options: [{options}]")

    # Read the CSV and append audit columns.
    # all_varchar=true keeps every source column as text, so irregular
    # characters in numeric-looking columns cannot break the load.
    con.execute(f"""
            CREATE OR REPLACE TABLE lakehouse.bronze.{table_name} AS
            SELECT 
                *,
                CURRENT_TIMESTAMP AS ingestion_timestamp,
                {_sql_literal(source_url)} AS source_url
            FROM read_csv_auto({_sql_literal(path)}, {options});
        """)
    print(f"‚úÖ Ingested: lakehouse.bronze.{table_name} (Source: {source_url})")

print("\n--- Ingesting Dictionaries & Dimensions ---")

# 1. Municipality names (MITMA)
# Source: Open Data Movilidad
ingest_dimension('zoning_municipalities', 'nombres_municipios.csv', source_url=URL_MITMA, folder='mitma', sep='|', header=True)

# 2. Population by municipality (MITMA)
# Source: Open Data Movilidad
ingest_dimension('population_municipalities', 'poblacion_municipios.csv', source_url=URL_MITMA, folder='mitma', sep='|')

# 3. MITMA zoning <-> INE mapping
# Source: Open Data Movilidad
ingest_dimension('mapping_ine_mitma', 'relacion_ine_zonificacionMitma.csv', source_url=URL_MITMA, folder='mitma', sep='|')

# 4. Average income (INE)
# Source: Instituto Nacional de Estadística
ingest_dimension('ine_rent_municipalities', 'ine_renta.csv', source_url=URL_INE, folder='ine', sep=';')

# 5. Municipal coordinates (IGN/CNIG)
# Source: CNIG download centre
# NOTE(review): the file is read from the 'ine' folder although the source is CNIG - confirm.
ingest_dimension('municipal_coordinates', 'municipios_coordenadas.csv', source_url=URL_CNIG, folder='ine', sep=';')

# 6. Work calendars (MTDFP)
# Source: MTDFP download centre
# NOTE(review): also read from the 'ine' folder - confirm the file location.
ingest_dimension('work_calendars', 'calendario.csv', source_url=URL_MTDFP, folder='ine', sep=';')

#### Inspection: INE-MITMA Mapping Table

In [None]:
# Define the table we want to inspect
target_table = "lakehouse.bronze.mapping_ine_mitma"

print(f"\n--- üîç INSPECTING: {target_table} ---")

# 1. Content Preview
# Check if the columns were separated correctly (look for separate columns, not one big text blob)
# Also verify the 'source_url' is correct
# The filter shows only rows whose municipio_ine is the literal text 'NA'
# (LIKE without wildcards behaves as an exact match).
print("\n[1] Content Preview (First 5 rows):")
con.execute(f"SELECT * FROM {target_table} WHERE municipio_ine LIKE 'NA' LIMIT 5").df()

In [None]:
# 2. Schema Metadata (Structure)
# Shows column names and types. Since we used 'all_varchar=true', the CSV columns should be VARCHAR.
# Uses target_table as set by the preceding inspection cell.
print("\n[2] Schema Metadata (Columns & Types):")
con.execute(f"DESCRIBE {target_table}").df()

In [None]:
# 3. Quality Profile (Statistics)
# Check 'approx_unique' to see how many municipalities have data
# Check 'null_percentage' to ensure the ingestion didn't fail silently
# Uses target_table as set by the preceding inspection cell.
print("\n[3] Quality Statistics (Nulls & Uniques):")
con.execute(f"SUMMARIZE {target_table}").df()

#### Final Check: table names and schema name

In [None]:
# --- 7. Final Check ---
print("\n--- Current Lakehouse State (Bronze Layer) ---")

# We use the internal system function 'duckdb_tables()'
# This function sees EVERYTHING connected to the current session, regardless of the extension used.
# The filter keeps only tables in the 'bronze' schema of the attached 'lakehouse' catalog.
query_check = """
    SELECT table_name, schema_name
    FROM duckdb_tables()
    WHERE database_name = 'lakehouse' 
      AND schema_name = 'bronze';
"""
df_result = con.execute(query_check).df()
print(df_result)

---
# Silver Layer

### Zone table

In [None]:
# Check column names to ensure our JOIN uses the right keys.
# dim_zones joins the MITMA zoning table with the INE <-> MITMA mapping table,
# so those are the two tables whose columns are listed here.
print("--- Checking Columns ---")
print("Zones (MITMA):", con.execute("DESCRIBE lakehouse.bronze.zoning_municipalities").fetch_df()['column_name'].tolist())
print("Mapping (INE):", con.execute("DESCRIBE lakehouse.bronze.mapping_ine_mitma").fetch_df()['column_name'].tolist())

In [None]:
print("\n--- Building Silver Table: dim_zones ---")

# Builds the zone dimension from the MITMA zoning names and the INE mapping:
#   - unique_mapping keeps one INE code (the minimum) per MITMA code and drops
#     rows where either code is NULL or the literal 'NA'.
#   - raw_zones INNER JOINs the zoning table to that mapping, so zones without
#     a valid INE code are not part of dim_zones; the row with ID = 'ID' is
#     excluded as well.
#   - zone_id is a ROW_NUMBER() ordered by mitma_code, i.e. a surrogate key
#     that is recomputed every time this table is rebuilt.
query_dim_zones = """--sql
    CREATE OR REPLACE TABLE lakehouse.silver.dim_zones AS
    WITH unique_mapping AS (
        -- CRITICAL CHANGE: We GROUP BY the MITMA code.
        -- We take the FIRST (Minimum) INE code found as the 'Representative' code.
        -- This ensures 1 MITMA Zone = 1 Row.
        SELECT DISTINCT 
            CAST(municipio_mitma AS VARCHAR) as mitma_ref,
            MIN(CAST(municipio_ine AS VARCHAR)) as ine_ref
        FROM lakehouse.bronze.mapping_ine_mitma
        WHERE municipio_mitma IS NOT NULL
            AND municipio_ine IS NOT NULL
            AND municipio_ine NOT LIKE 'NA'
            AND municipio_mitma NOT LIKE 'NA'
        GROUP BY municipio_mitma
    ),
    raw_zones AS (
        SELECT 
            TRIM(z.ID) AS mitma_code,
            TRIM(m.ine_ref)  AS ine_code,
            TRIM(z.name) AS zone_name
        FROM lakehouse.bronze.zoning_municipalities z
        INNER JOIN unique_mapping m 
            ON TRIM(z.ID) = TRIM(m.mitma_ref)
        WHERE z.ID IS NOT NULL AND z.ID != 'ID'
        GROUP BY z.ID, z.name, m.ine_ref
    )
    SELECT
        -- 1. Codes
        ROW_NUMBER() OVER (ORDER BY mitma_code) AS zone_id,
        mitma_code,
        ine_code,
        zone_name,
        CURRENT_TIMESTAMP AS processed_at
        
    FROM raw_zones
    ORDER BY zone_id;
"""
con.execute(query_dim_zones)
print("‚úÖ Created: lakehouse.silver.dim_zones")

In [None]:
# Define the table we want to inspect
target_table = "lakehouse.silver.dim_zones"

print(f"\n--- üîç INSPECTING: {target_table} ---")

# 1. Content Preview
# Expect the columns zone_id, mitma_code, ine_code, zone_name and processed_at,
# with codes and names already trimmed.
print("\n[1] Content Preview (First 5 rows):")
con.execute(f"SELECT * FROM {target_table} LIMIT 5").df()


In [None]:
# Check coverage: how many zones carry an INE code.
# dim_zones is built with an INNER JOIN against a mapping that already
# excludes NULL INE codes, so 'mapped' is expected to equal 'total' here.
total = con.execute("SELECT COUNT(*) FROM lakehouse.silver.dim_zones").fetchone()[0]
mapped = con.execute("SELECT COUNT(*) FROM lakehouse.silver.dim_zones WHERE ine_code IS NOT NULL").fetchone()[0]
print(f"-> Statistics: {mapped}/{total} zones have been successfully mapped to INE codes.")

In [None]:
# 3. Quality Profile
# Check for 100% nulls or weird values. This might take a moment.
# Uses target_table from the inspection cell above (dim_zones when run in order).
print("\n[3] Data Quality Profile (Nulls & Unique Values):")
con.execute(f"SUMMARIZE {target_table}").df()

### Population table

In [None]:
print("\n--- Building Silver Table: metric_population ---")

# Population per zone. The Bronze table is addressed through the positional
# names column0 (zone code) and column1 (population).
# NOTE(review): presumably the CSV has no header row, so the reader generated
# these names - verify against the Bronze schema.
# The year is hard-coded to 2023 for every row.
# Rows are kept only when the code matches a zone in dim_zones (inner join)
# and column1 contains no ASCII letters.
query_pop = """
    CREATE OR REPLACE TABLE lakehouse.silver.metric_population AS
    SELECT 
        -- 1. Linking Key (Map column0 -> zone_id)
        z.zone_id,
        
        -- 2. The Metric (Map column1 -> population)
        -- Logic:
        --   a. Cast to Integer
        CAST(TRY_CAST(column1 AS DOUBLE) AS BIGINT) AS population,
        
        -- 3. Metadata
        2023 AS year,
        CURRENT_TIMESTAMP AS processed_at
        
    FROM lakehouse.bronze.population_municipalities p
        JOIN lakehouse.silver.dim_zones z ON TRIM(p.column0) = z.mitma_code
    
    WHERE 
        -- Filter out empty rows
        column0 IS NOT NULL 
        -- Filter out the header row (if the first row contains text like 'ID' or 'Poblacion')
        AND NOT regexp_matches(column1, '[a-zA-Z]') -- Exclude rows where population contains letters
"""
con.execute(query_pop)
print("‚úÖ Created: lakehouse.silver.metric_population")

In [None]:
# Define the table we want to inspect
target_table = "lakehouse.silver.metric_population"

print(f"\n--- üîç INSPECTING: {target_table} ---")

# 1. Content Preview
# Expect the columns zone_id, population, year and processed_at.
print("\n[1] Content Preview (First 5 rows):")
con.execute(f"SELECT * FROM {target_table} LIMIT 5").df()

In [None]:
# Integrity check: row count and summed population of the Silver table.
stats = con.execute("""
    SELECT 
        COUNT(*) as total_rows,
        SUM(population) as total_population_spain
    FROM lakehouse.silver.metric_population
""").fetchone()
# SUM() yields NULL (None in Python) when the table is empty or every
# population is NULL; fall back to 0 so the ',' format spec does not raise.
total_population = stats[1] if stats[1] is not None else 0
print(f"-> Integrity Check: {stats[0]} rows loaded. Total Population: {total_population:,}")

In [None]:
# 3. Quality Profile
# Check for 100% nulls or weird values. This might take a moment.
# Uses target_table from the inspection cell above (metric_population when run in order).
print("\n[3] Data Quality Profile (Nulls & Unique Values):")
con.execute(f"SUMMARIZE {target_table}").df()

### Coordinates table

In [None]:
print("\n--- Building Silver Table: dim_coordinates ---")

# Coordinates per zone. Decimal commas are replaced by dots before casting;
# TRY_CAST yields NULL for values that still cannot be parsed.
# The join key is the first 5 characters of COD_INE compared with ine_code.
# NOTE(review): if several coordinate rows share the same 5-character prefix,
# a zone gets more than one row here - confirm the granularity of the source.
query_coords = """
    CREATE OR REPLACE TABLE lakehouse.silver.dim_coordinates AS
    SELECT 
        z.zone_id,
        
        -- Coordinates
        TRY_CAST(REPLACE(c.LATITUD_ETRS89, ',', '.') AS DOUBLE) AS latitude,
        TRY_CAST(REPLACE(c.LONGITUD_ETRS89, ',', '.') AS DOUBLE) AS longitude,
        
        CURRENT_TIMESTAMP AS processed_at
        
    FROM lakehouse.bronze.municipal_coordinates c
    
    -- SIMPLE JOIN: Exact string match
    JOIN lakehouse.silver.dim_zones z 
        ON LEFT(c.COD_INE, 5) = z.ine_code 
        
    WHERE z.zone_id IS NOT NULL;
"""

con.execute(query_coords)
print("‚úÖ Created: lakehouse.silver.dim_coordinates")

In [None]:
# --- CRITICAL CHECK ---
# Let's see if the "Simple Join" worked or if we lost data due to "01001" vs "1001"
# Compares the row count of dim_coordinates with the number of zones that
# have an INE code; a warning is printed below a 50% ratio.
# NOTE(review): this counts rows, not distinct zone_ids, so duplicate matches
# per zone would inflate the figure - confirm.
total_zones_with_ine = con.execute("SELECT COUNT(*) FROM lakehouse.silver.dim_zones WHERE ine_code IS NOT NULL").fetchone()[0]
matched_coords = con.execute("SELECT COUNT(*) FROM lakehouse.silver.dim_coordinates").fetchone()[0]

print(f"\n[Match Statistics]")
print(f"Zones with INE Codes: {total_zones_with_ine}")
print(f"Zones with Coordinates: {matched_coords}")

if matched_coords < (total_zones_with_ine * 0.5):
    print("‚ö†Ô∏è WARNING: Very low match rate. It is highly likely one table has leading zeros ('01001') and the other does not ('1001').")

In [None]:
# Define the table we want to inspect
target_table = "lakehouse.silver.dim_coordinates"

print(f"\n--- üîç INSPECTING: {target_table} ---")

# 1. Content Preview
# Expect the columns zone_id, latitude, longitude and processed_at;
# NULL coordinates indicate values that could not be cast to DOUBLE.
print("\n[1] Content Preview (First 5 rows):")
con.execute(f"SELECT * FROM {target_table} LIMIT 5").df()

In [None]:
# 3. Quality Profile
# Check for 100% nulls or weird values. This might take a moment.
# Uses target_table from the inspection cell above (dim_coordinates when run in order).
print("\n[3] Data Quality Profile (Nulls & Unique Values):")
con.execute(f"SUMMARIZE {target_table}").df()

### Rent table

In [None]:
print("\n--- Building Silver Table: metric_ine_rent ---")

# Net average income per person and year, keyed by zone_id.
#   - The INE code is the first space-separated token of 'Municipios'.
#   - Only the indicator 'Renta neta media por persona' is kept, and only rows
#     where 'Distritos' and 'Secciones' are empty (municipality level).
#   - Dots are stripped from 'Total' before casting; rows whose value still
#     cannot be parsed are dropped.
#   - 'Periodo' is cast with a plain CAST, so a non-numeric value raises.
query_rent = """
    CREATE OR REPLACE TABLE lakehouse.silver.metric_ine_rent AS
    SELECT 
        -- 1. Master Key (Zone ID from our Dimension)
        z.zone_id,
        
        -- 2. The Metric (Cleaned)
        -- Format: "13.500" -> 13500. Handle "dirty" data (like ".") using TRY_CAST
        CAST(TRY_CAST(REPLACE(r.Total, '.', '') AS DOUBLE) AS BIGINT) AS income_per_capita,
        
        -- 3. Time Reference
        CAST(r.Periodo AS INTEGER) AS year,
        
        -- 4. Metadata
        CURRENT_TIMESTAMP AS processed_at
        
    FROM lakehouse.bronze.ine_rent_municipalities r
    
    -- JOIN Logic: Match Extracted INE Code to Zone INE Code
    -- We split "01001 Name" by space to get "01001"
    JOIN lakehouse.silver.dim_zones z 
        ON split_part(r.Municipios, ' ', 1) = z.ine_code
        
    WHERE 
        -- Filter 1: Only the specific indicator requested
        r."Indicadores de renta media" = 'Renta neta media por persona'
        
        -- Filter 2: Ensure we are at Municipality level (Districts/Sections must be empty/null)
        AND (r.Distritos IS NULL OR r.Distritos = '')
        AND (r.Secciones IS NULL OR r.Secciones = '')
        
        -- Filter 3: Valid data
        AND CAST(TRY_CAST(REPLACE(r.Total, '.', '') AS DOUBLE) AS BIGINT) IS NOT NULL
        AND z.zone_id IS NOT NULL;
"""
con.execute(query_rent)
print("‚úÖ Created: lakehouse.silver.metric_ine_rent")

In [None]:
# Define the table we want to inspect
target_table = "lakehouse.silver.metric_ine_rent"

print(f"\n--- üîç INSPECTING: {target_table} ---")

# 1. Available years
# Counts the rows per year to see which periods the rent table covers.
# The query names the table directly rather than using target_table.
print("\n[1] Available Years::")
con.execute(f"SELECT year, COUNT(*) as zones FROM lakehouse.silver.metric_ine_rent GROUP BY year").df()

In [None]:
# Define the table we want to inspect
target_table = "lakehouse.silver.metric_ine_rent"

print(f"\n--- üîç INSPECTING: {target_table} ---")

# 1. Content Preview
# Expect the columns zone_id, income_per_capita, year and processed_at;
# rows are ordered by zone_id so the preview is stable.
print("\n[1] Content Preview (First 5 rows):")
con.execute(f"SELECT * FROM {target_table} ORDER BY zone_id LIMIT 5").df()

In [None]:
# 3. Quality Profile
# Check for 100% nulls or weird values. This might take a moment.
# Uses target_table from the inspection cell above (metric_ine_rent when run in order).
print("\n[3] Data Quality Profile (Nulls & Unique Values):")
con.execute(f"SUMMARIZE {target_table}").df()

### Mobility data table

In [None]:
print("\n--- Building Silver Table: fact_mobility ---")

# 1. Create Empty Table Structure (Partitioned)
# CREATE OR REPLACE resets the table, so re-running this whole cell does not
# duplicate rows.
query_schema = """
    CREATE OR REPLACE TABLE lakehouse.silver.fact_mobility (
        date DATE,
        hour INTEGER,
        origin_zone_id BIGINT,
        destination_zone_id BIGINT,
        trips DOUBLE,
        processed_at TIMESTAMP
    );
"""
con.execute(query_schema)

# 2. Configure Partitioning (Crucial for Speed)
con.execute("ALTER TABLE lakehouse.silver.fact_mobility SET PARTITIONED BY (date);")

# 3. Insert Data (Transforming on the fly)
# Both zone joins are INNER JOINs, so trips whose origin or destination code
# is not in dim_zones are dropped. Rows with a NULL 'viajes' or an unparseable
# 'fecha' are dropped too.
# NOTE(review): the trips conversion removes every '.' before parsing, which
# assumes '.' is a thousands separator in 'viajes'. If the source uses '.' as
# the decimal point, values would be inflated - verify against the raw files.
query_insert = """--sql
    INSERT INTO lakehouse.silver.fact_mobility
    SELECT 
        -- 1. Time Dimensions
        try_strptime(fecha, '%Y%m%d') AS date,
        -- dayofweek(strptime(fecha, '%Y%m%d')) AS day_of_week, -- 0=Sunday, 1=Monday... (DuckDB specific, verify range)
        CAST(periodo AS INTEGER) AS hour,
        
        -- 2. Spatial Dimensions
        zo.zone_id AS origin_zone_id,
        zd.zone_id AS destination_zone_id,
        
        -- 3. Metrics (Spanish Format Handling: 1.200,50 -> 1200.50)
        -- Remove thousands separator (.) then replace decimal comma (,) with dot (.)
        TRY_CAST(REPLACE(REPLACE(viajes, '.', ''), ',', '.') AS DOUBLE) AS trips,
        
        -- 4. Audit
        CURRENT_TIMESTAMP AS processed_at
        
    FROM lakehouse.bronze.mobility_sample_week m
        INNER JOIN lakehouse.silver.dim_zones zo ON TRIM(m.origen) = zo.mitma_code
        INNER JOIN lakehouse.silver.dim_zones zd ON TRIM(m.destino) = zd.mitma_code

    WHERE viajes IS NOT NULL
        AND try_strptime(fecha, '%Y%m%d') IS NOT NULL;
"""

print("-> Processing and Inserting Data (this may take a moment)...")
con.execute(query_insert)
print("‚úÖ Created: lakehouse.silver.fact_mobility")

In [None]:
# Define the table we want to inspect
target_table = "lakehouse.silver.fact_mobility"

print(f"\n--- üîç INSPECTING: {target_table} ---")

# 1. Content Preview
# Expect the columns date, hour, origin_zone_id, destination_zone_id, trips
# and processed_at.
print("\n[1] Content Preview (First 5 rows):")
con.execute(f"SELECT * FROM {target_table} LIMIT 5").df()

In [None]:
# --- QA: Referential Integrity Check ---
# Are there zones in our trips that don't exist in our dictionary?
# Only origin_zone_id is checked. Because fact_mobility is loaded through
# INNER JOINs on dim_zones, orphans can only appear if dim_zones was rebuilt
# after the fact table was loaded.

print("--- üîç Checking for Orphan Keys ---")

query_orphans = """
    SELECT 
        m.origin_zone_id,
        COUNT(*) as trip_count
    FROM lakehouse.silver.fact_mobility m
    LEFT JOIN lakehouse.silver.dim_zones z 
        ON m.origin_zone_id = z.zone_id
    WHERE z.zone_id IS NULL
    GROUP BY m.origin_zone_id
    ORDER BY trip_count DESC
    LIMIT 10;
"""

orphans = con.execute(query_orphans).fetch_df()

# The query is capped at 10 rows, so len(orphans) below is at most 10.
if orphans.empty:
    print("‚úÖ PERFECT: All origin zones in mobility data exist in dim_zones.")
else:
    print(f"‚ö†Ô∏è WARNING: Found {len(orphans)} zone IDs in mobility data that are MISSING from dim_zones.")
    print("Top missing zones (by trip volume):")
    print(orphans)

In [None]:
# 3. Quality Profile
# Check for 100% nulls or weird values. This might take a moment.
# Uses target_table from the inspection cell above (fact_mobility when run in order).
print("\n[3] Data Quality Profile (Nulls & Unique Values):")
con.execute(f"SUMMARIZE {target_table}").df()

### Calendar tables
- Festive types

In [None]:
print("\n--- Building Silver Table: dim_festive_types ---")

# Festive-type dimension. The WHERE clause keeps only calendar rows whose type
# mentions 'festivo nacional' or 'fiesta nacional', so every surviving row is
# mapped to 'NationalFestive' and the ELSE branch of the CASE is not reached.
# NOTE(review): the variable name query_rent is reused from the rent cell and
# overwrites that query text - consider a dedicated name.
query_rent = """--sql
    CREATE OR REPLACE TABLE lakehouse.silver.dim_festive_types AS
    SELECT DISTINCT
        CASE 
            WHEN "Tipo de Festivo" ILIKE '%festivo nacional%'
                 OR "Tipo de Festivo" ILIKE '%fiesta nacional%'
            THEN 'NationalFestive'
            ELSE "Tipo de Festivo"
        END AS festive_type,
        CURRENT_TIMESTAMP AS processed_at
    FROM lakehouse.bronze.work_calendars
    WHERE "Tipo de Festivo" IS NOT NULL
        AND ("Tipo de Festivo" ILIKE '%festivo nacional%' OR
            "Tipo de Festivo" ILIKE '%fiesta nacional%');
"""
con.execute(query_rent)
print("‚úÖ Created: lakehouse.silver.dim_festive_types")

In [None]:
# Define the table we want to inspect
target_table = "lakehouse.silver.dim_festive_types"

print(f"\n--- üîç INSPECTING: {target_table} ---")

# 1. Content Preview
# Expect the columns festive_type and processed_at.
# NOTE(review): the printed label says "First 5 rows" but the query returns up to 50.
print("\n[1] Content Preview (First 5 rows):")
con.execute(f"SELECT * FROM {target_table}  LIMIT 50").df()

- Bridge table with zone

In [None]:
print("\n--- Building Silver Table: bridge_zones_festives ---")

# Bridge between zones and festive dates: every zone is CROSS JOINed with
# every calendar row, and the result is kept only when the normalised type
# exists in dim_festive_types. That dimension is built from national festives
# only, so each national festive date is assigned to every zone.
# 'Dia' is parsed with strptime using the %d/%m/%Y format.
# NOTE(review): strptime (unlike try_strptime) is expected to raise on a
# malformed date - confirm the calendar file is clean.
# NOTE(review): the variable name query_rent is reused from earlier cells.
query_rent = """--sql
    CREATE OR REPLACE TABLE lakehouse.silver.bridge_zones_festives AS
    SELECT z.zone_id,
        CAST(strptime(c."Dia", '%d/%m/%Y') AS DATE) AS festive_date,
        ft.festive_type,
        CURRENT_TIMESTAMP AS processed_at
    FROM lakehouse.silver.dim_zones AS z
    CROSS JOIN lakehouse.bronze.work_calendars AS c
    JOIN lakehouse.silver.dim_festive_types AS ft
    ON ft.festive_type =
        CASE WHEN c."Tipo de Festivo" ILIKE '%festivo nacional%'
                OR c."Tipo de Festivo" ILIKE '%fiesta nacional%'
            THEN 'NationalFestive'
            ELSE c."Tipo de Festivo"
        END;
"""
con.execute(query_rent)
print("‚úÖ Created: lakehouse.silver.bridge_zones_festives")

In [None]:
# Define the table we want to inspect
target_table = "lakehouse.silver.bridge_zones_festives"

print(f"\n--- üîç INSPECTING: {target_table} ---")

# 1. Content Preview
# Expect the columns zone_id, festive_date, festive_type and processed_at.
# NOTE(review): the printed label says "First 5 rows" but the query returns up to 10.
print("\n[1] Content Preview (First 5 rows):")
con.execute(f"SELECT * FROM {target_table} LIMIT 10").df()

In [None]:
# 3. Quality Profile
# Check for 100% nulls or weird values. This might take a moment.
# Uses target_table from the inspection cell above (bridge_zones_festives when run in order).
print("\n[3] Data Quality Profile (Nulls & Unique Values):")
con.execute(f"SUMMARIZE {target_table}").df()

#### Information test
- Zone information in 2023

In [None]:
# One row per zone with its 2023 population, 2023 income (NULL when the rent
# table has no 2023 row for the zone) and coordinates (NULL when unmatched).
# Zones without a 2023 population row are excluded by the inner join.
query_2023_strict = """
    SELECT 
        z.zone_id,
        z.ine_code,
        z.zone_name,
        p.year,
        p.population,
        r.income_per_capita AS rent,
        c.latitude, 
        c.longitude
        
    FROM lakehouse.silver.dim_zones z
    
    -- 1. Population: Strictly 2023 (Inner Join, as this is our baseline)
    JOIN lakehouse.silver.metric_population p 
        ON z.zone_id = p.zone_id 
        AND p.year = 2023
        
    -- 2. Rent: Strictly 2023 (Left Join)
    -- If the Rent table has year=2021, this condition fails, and you get NULL (clean exclusion)
    LEFT JOIN lakehouse.silver.metric_ine_rent r 
        ON z.zone_id = r.zone_id 
        AND r.year = 2023
        
    -- 3. Coordinates (Static)
    LEFT JOIN lakehouse.silver.dim_coordinates c 
        ON z.zone_id = c.zone_id
        
    ORDER BY z.zone_id;
"""

print("--- üìä Consolidated 2023 View ---")
con.execute(query_2023_strict).df()

- Mobility information

In [None]:
# --- Analytical Query: Mobility with Context (Names + Festivities) ---
# Resolves zone names for origin and destination and flags whether the trip
# date is a festive date for each of them, then keeps only trips where both
# flags are true (the WHERE clause refers to the SELECT aliases).
# fact_mobility only contains zone ids that matched dim_zones at load time,
# so the 'External/International' fallback is a safety net rather than an
# expected value.

query_mobility_enriched = """
    SELECT 
        -- 1. Date
        f.date,
        
        -- 2. Names (Resolved from IDs)
        -- If ID is NULL (International), we label it 'External'
        COALESCE(zo.zone_name, 'External/International') AS origin_name,
        COALESCE(zd.zone_name, 'External/International') AS destination_name,
        
        -- 3. Is Origin Festive? (True/False)
        CASE 
            WHEN bfo.festive_type IS NOT NULL THEN TRUE 
            ELSE FALSE 
        END AS is_origin_festive,
        
        -- 4. Is Destination Festive? (True/False)
        CASE 
            WHEN bfd.festive_type IS NOT NULL THEN TRUE 
            ELSE FALSE 
        END AS is_dest_festive
        
    FROM lakehouse.silver.fact_mobility f
    
    -- Join Dimensions to get Names
    LEFT JOIN lakehouse.silver.dim_zones zo 
        ON f.origin_zone_id = zo.zone_id
    LEFT JOIN lakehouse.silver.dim_zones zd 
        ON f.destination_zone_id = zd.zone_id
        
    -- Join Bridge for ORIGIN (Match Zone AND Date)
    LEFT JOIN lakehouse.silver.bridge_zones_festives bfo 
        ON f.origin_zone_id = bfo.zone_id 
            AND f.date = bfo.festive_date
        
    -- Join Bridge for DESTINATION (Match Zone AND Date)
    LEFT JOIN lakehouse.silver.bridge_zones_festives bfd 
        ON f.destination_zone_id = bfd.zone_id 
            AND f.date = bfd.festive_date
    
    WHERE is_origin_festive IS True
        AND is_dest_festive IS True

    ORDER BY destination_name desc
        
    -- Limit to avoid printing billions of rows
    LIMIT 10;
"""

print("--- üìä Mobility Enriched with Names & Holiday Status ---")
con.execute(query_mobility_enriched).df()

In [None]:
print("\n--- Current Lakehouse State (Silver Layer) ---")

# We use the internal system function 'duckdb_tables()'
# This function sees EVERYTHING connected to the current session, regardless of the extension used.
# The filter keeps only tables in the 'silver' schema of the attached 'lakehouse' catalog.
# This overwrites query_check and df_result from the Bronze final check.
query_check = """
    SELECT table_name, schema_name
    FROM duckdb_tables()
    WHERE database_name = 'lakehouse' 
      AND schema_name = 'silver';
"""
df_result = con.execute(query_check).df()
print(df_result)

---
# Gold Layer

#### Business Question 1: Typical Day Mobility
We need to identify distinct Mobility Patterns using Unsupervised Machine Learning. We apply the K-Means algorithm with K = 3. Based on domain knowledge, we expect these groups to represent:
- Working days: High peaks at 08:00, 15:00 and 21:00.
- Saturdays: Smoother curve, leisure activity.
- Sundays/Holidays: Flat curve, low activity.

In [None]:
import numpy as np
from sklearn.cluster import KMeans
import pandas as pd

# --- 1. DATA PREPARATION (Fetch & Pivot) ---
# Fetching aggregated hourly data from the Silver Layer (fact_mobility)
print("   -> Fetching data from Silver Layer...")

query_fetch = """
    SELECT 
        date,
        hour,
        SUM(trips) as total_trips
    FROM lakehouse.silver.fact_mobility
    WHERE trips IS NOT NULL
    GROUP BY date, hour
    ORDER BY date, hour;
"""

# Execute query and convert to Pandas DataFrame
df = con.execute(query_fetch).df()

if df.empty:
    print("   ‚ö†Ô∏è Warning: No data found in fact_mobility.")
else:
    # PIVOT: Transform from Long to Wide format (Rows=Days, Columns=Hours 0-23)
    df_pivot = df.pivot(index='date', columns='hour', values='total_trips').fillna(0)

    # Ensure all hour columns (0 to 23) exist
    for h in range(24):
        if h not in df_pivot.columns:
            df_pivot[h] = 0

    # Sort columns numerically to ensure correct vector order
    df_pivot = df_pivot.sort_index(axis=1)

    # --- 2. NORMALIZATION ---
    # Normalize row-wise (each day sums to 1).
    # This allows comparing the "shape" of the curve rather than the total volume.
    # Days whose total is 0 produce NaN after the division and are reset to 0.
    print("   -> Normalizing daily profiles...")
    row_sums = df_pivot.sum(axis=1)
    df_normalized = df_pivot.div(row_sums, axis=0).fillna(0)

    # --- 3. CLUSTERING (K-Means) ---
    # We use k=3 to capture basic patterns:
    # 1. Weekdays (Work), 2. Saturdays (Leisure), 3. Sundays/Holidays (Rest)
    n_clusters = 3
    print(f"   -> Running K-Means Clustering (k={n_clusters})...")

    # Fixed random_state so cluster ids are reproducible across runs.
    kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
    clusters = kmeans.fit_predict(df_normalized)

    # One row per day with its assigned cluster; also used by the validation cell.
    df_results = pd.DataFrame({
        'date': df_normalized.index,
        'cluster_id': clusters
    })

    # --- 4. BUILD GOLD TABLE: typical_day_by_cluster (Using CTE) ---
    # Materialize the model results into the Gold Layer
    print("\n--- üèóÔ∏è Building Gold Table: typical_day_by_cluster ---")

    # Register the clustering results DataFrame as a virtual view
    con.register('view_dim_clusters', df_results)

    # fact_mobility exposes 'trips' as its only metric column (it has no
    # trips_km column), so the typical-day table aggregates trips only.
    query_typical_cte = """
        CREATE OR REPLACE TABLE lakehouse.gold.typical_day_by_cluster AS
        
        WITH dim_mobility_patterns AS (
            -- We select directly from the registered Python DataFrame view
            SELECT 
                date, 
                cluster_id 
            FROM view_dim_clusters
        )
        
        SELECT 
            p.cluster_id,
            f.hour,
            
            -- Metrics
            ROUND(AVG(f.trips), 2) as avg_trips,
            SUM(f.trips) as total_trips_sample,
            
            CURRENT_TIMESTAMP as processed_at
            
        FROM lakehouse.silver.fact_mobility f
        JOIN dim_mobility_patterns p ON f.date = p.date
        GROUP BY p.cluster_id, f.hour
        ORDER BY p.cluster_id, f.hour;
    """

    con.execute(query_typical_cte)

    print("‚úÖ Created: lakehouse.gold.typical_day_by_cluster")

    # --- 5. CLUSTER INTERPRETATION ---
    print("\n--- üìä Cluster Interpretation ---")

    # We query 'view_dim_clusters' directly because no permanent table of
    # per-day cluster assignments is created in this notebook.
    analysis_df = con.execute("""
        SELECT 
            cluster_id, 
            COUNT(*) as days_in_cluster,
            -- Calculate day name on the fly since it's not in the dataframe
            MODE(dayname(date)) as typical_day
        FROM view_dim_clusters
        GROUP BY cluster_id
        ORDER BY days_in_cluster DESC
    """).df()

    # Explicitly print using to_string() to ensure full visibility
    if not analysis_df.empty:
        print(analysis_df.to_string(index=False))
    else:
        print("‚ö†Ô∏è The table is empty.")

    # Clean up view
    con.unregister('view_dim_clusters')

    print("\n‚úÖ Gold Layer Analysis Completed.")

To verify the accuracy of the Unsupervised Machine Learning model, we must validate its output against the actual calendar data stored in the Silver Layer. Success criteria:
- Sundays and National Holidays should fall into the same cluster (typically the low-mobility cluster).
- Weekdays should dominate a separate, high-mobility cluster.
- Saturdays typically form their own cluster.

In [None]:
import pandas as pd

print("\n--- üïµÔ∏è‚Äç‚ôÇÔ∏è VALIDATION: 3 Clusters vs. Real Calendar ---")

# Cross-tabulates every cluster against a calendar-derived label
# (national holiday / Sunday / Saturday / weekday) so the unsupervised
# clusters can be compared with known day types.
query_validation = """
WITH national_holidays AS (
    -- 1. Get only unique NATIONAL holidays from the bridge table
    -- Use DISTINCT to avoid duplicating rows per zone
    SELECT DISTINCT festive_date
    FROM lakehouse.silver.bridge_zones_festives
    WHERE festive_type = 'NationalFestive'
),
labeled_data AS (
    SELECT 
        p.cluster_id,
        p.date,
        dayname(p.date) as day_of_week,
        
        -- 2. Create "Ground Truth" label for comparison
        CASE 
            -- Priority 1: Is it a national holiday according to Silver data?
            WHEN h.festive_date IS NOT NULL THEN 'National Holiday'
            
            -- Priority 2: Natural weekend
            WHEN dayname(p.date) = 'Sunday' THEN 'Sunday'
            WHEN dayname(p.date) = 'Saturday' THEN 'Saturday'

            -- Priority 3: Remaining days
            ELSE 'Weekday (Mon-Fri)'
        END as real_category
        
    -- UPDATED: Select from the virtual view instead of the physical gold table
    FROM view_dim_clusters p
    LEFT JOIN national_holidays h ON p.date = h.festive_date
)
-- 3. Count how many days of each type fell into each cluster
SELECT 
    cluster_id,
    real_category,
    COUNT(*) as total_days
FROM labeled_data
GROUP BY cluster_id, real_category
ORDER BY cluster_id, total_days DESC;
"""

# The query reads `view_dim_clusters`, so expose `df_results` under that name
# just for the duration of the query. The `finally` guarantees the temporary
# view is removed even when the query raises (e.g. a missing Silver table),
# instead of leaving it registered on the shared connection.
con.register('view_dim_clusters', df_results)
try:
    df_val = con.execute(query_validation).df()
finally:
    con.unregister('view_dim_clusters')

# Print the complete cross-tab; to_string() avoids pandas truncating rows.
if df_val.empty:
    print("‚ö†Ô∏è Something went wrong, the table is empty.")
else:
    print(df_val.to_string(index=False))

In [None]:
import matplotlib.pyplot as plt

# --- Query the data for plotting ---
print("Querying typical day demand data...")

# Labels each cluster with its most frequent weekday (computed from the
# in-memory clustering result) and joins that label onto the hourly profile
# stored in the Gold table, yielding one row per (hour, cluster).
query = """
WITH cluster_labels AS (
    SELECT 
        cluster_id, 
        -- Calculate day name on the fly since we are using the view
        MODE(dayname(date)) as label
    FROM view_dim_clusters
    GROUP BY cluster_id
)
SELECT 
    t.hour,
    -- Create a readable label for the legend: "Cluster 0 (Sunday)"
    'Cluster ' || t.cluster_id || ' (' || l.label || ')' as pattern_name,
    t.avg_trips
FROM lakehouse.gold.typical_day_by_cluster t
JOIN cluster_labels l ON t.cluster_id = l.cluster_id
ORDER BY t.hour;
"""

# Only the query needs `view_dim_clusters`; the plotting below works purely on
# `demand_df`. Register the view around the query and always unregister it in
# `finally`, so a failing query cannot leave the temporary view behind.
con.register('view_dim_clusters', df_results)
try:
    demand_df = con.execute(query).df()
finally:
    con.unregister('view_dim_clusters')

if demand_df.empty:
    print("ERROR: 'gold.typical_day_by_cluster' table is empty. No data to plot.")
else:
    # --- Prepare data for plotting ---
    print("Pivoting data for plotting...")
    # Wide format: one row per hour, one column (= one line) per cluster label
    pivot_df = demand_df.pivot(index='hour', columns='pattern_name', values='avg_trips')

    # --- Create the plot ---
    print("Generating plot with matplotlib...")

    # Explicit figure/axes so every styling call below targets the same axes
    fig, ax = plt.subplots(figsize=(12, 7))

    pivot_df.plot(kind='line', ax=ax, marker='o', markersize=4)

    ax.set_title('Typical Daily Mobility Patterns (Clustered Profiles)', fontsize=16)
    ax.set_xlabel('Hour of Day', fontsize=12)
    ax.set_ylabel('Average Trips per Hour', fontsize=12)

    # One tick per hour of the day, rendered as HH:00
    ax.set_xticks(range(0, 24))
    ax.set_xticklabels([f'{h:02d}:00' for h in range(24)], rotation=45, ha='right')
    ax.grid(True, linestyle='--', alpha=0.6)

    ax.legend(title='Identified Pattern')

    # Keep the rotated tick labels from being clipped
    plt.tight_layout()

    # Display the plot in the notebook
    plt.show()

#### Business Question 2: Infrastructure Gaps

In [None]:
# Install and load DuckDB's spatial extension; the infrastructure-gap query in
# the next cell relies on its point and distance functions.
con.execute("INSTALL 'spatial';").execute("LOAD 'spatial';")

In [None]:
print("Creating Gold table 'gold_infrastructure_gaps'...")

# This query answers Business Question 2.
# For every origin/destination pair it compares the trips actually observed in
# Silver with the trips a gravity model would predict,
#     T_ij = k * (P_i * E_j) / d_ij^2,
# where P_i is the origin population, E_j the destination income per capita and
# d_ij the geodesic distance between the two zones in km.
#
# Two details matter for correctness:
#   * st_distance_spheroid expects its points in [latitude, longitude] order
#     (see the DuckDB spatial function reference), so the points are built as
#     st_point(latitude, longitude). Passing (longitude, latitude) silently
#     produces distances between the wrong locations.
#   * Both latitude AND longitude must be present for each zone, so a missing
#     coordinate cannot silently reach the distance calculation.
gold_bq2_query = """--sql
CREATE OR REPLACE TABLE lakehouse.gold.gold_infrastructure_gaps AS

WITH od_pairs AS (
    SELECT
        origin_zone_id,
        destination_zone_id,
        SUM(trips) AS total_actual_trips
    FROM lakehouse.silver.fact_mobility
    GROUP BY 1, 2
),

model_calculation AS (
    SELECT
        m.origin_zone_id AS org_zone_id,
        m.destination_zone_id AS dest_zone_id,
        p.population AS total_population,                     -- P_i
        r.income_per_capita AS rent,              -- E_j
        m.total_actual_trips AS total_trips,                    -- Viajes reales
        
        -- Calcular distancia (d_ij) en KM usando la extensi√≥n espacial
        -- Usamos GREATEST(0.5, ...) para evitar distancias de 0 (ej. viajes en la misma zona)
        -- y as√≠ prevenir errores de divisi√≥n por cero.
        GREATEST(
            0.5, -- Distancia m√≠nima de 0.5 km
            -- st_distance_spheroid takes points in [latitude, longitude] axis order
            st_distance_spheroid(
                st_point(c_org.latitude, c_org.longitude), 
                st_point(c_dest.latitude, c_dest.longitude)
            ) / 1000 -- Convertir metros (salida de st_distance) a KM
        ) AS geographic_distance_km             -- d_ij
            
    FROM od_pairs AS m
    JOIN lakehouse.silver.metric_population AS p ON m.origin_zone_id = p.zone_id
    JOIN lakehouse.silver.metric_ine_rent AS r ON m.destination_zone_id = r.zone_id
    JOIN lakehouse.silver.dim_coordinates as c_org ON m.origin_zone_id = c_org.zone_id
    JOIN lakehouse.silver.dim_coordinates as c_dest ON m.destination_zone_id = c_dest.zone_id

    
    -- Filtramos datos malos para evitar errores en el modelo
    WHERE p.population > 0 
      AND r.income_per_capita > 0
      AND c_org.latitude IS NOT NULL
      AND c_org.longitude IS NOT NULL
      AND c_dest.latitude IS NOT NULL
      AND c_dest.longitude IS NOT NULL
      AND m.origin_zone_id != m.destination_zone_id -- Evitar viajes dentro de la misma zona
)

-- Calcular el modelo final y el mismatch
SELECT
    org_zone_id,
    dest_zone_id,
    total_trips,
    total_population,
    rent,
    geographic_distance_km,
    
    -- Calcular Modelo de Gravedad T_ij = k * (P_i * E_j) / (d_ij^2)
    -- Asumimos k=1 para este PoC del Sprint 1
    (1.0 * (CAST(total_population AS DOUBLE) * CAST(rent AS DOUBLE))) / 
    (geographic_distance_km * geographic_distance_km) AS estimated_potential_trips, -- T_ij
        
    -- Calcular Mismatch Ratio 
    -- (Viajes Reales / Viajes Estimados)
    total_trips / NULLIF(estimated_potential_trips, 0) AS mismatch_ratio

FROM model_calculation;
"""

con.execute(gold_bq2_query)
print("‚úì Tabla 'gold.gold_infrastructure_gaps' creada.")

# --- Verification ---
# Show the ten pairs with the lowest actual/estimated ratio, ignoring pairs
# with 10 or fewer observed trips.
print("\n--- Verificaci√≥n BQ2: Top 10 Zonas con 'Mismatch' (potencialmente peor servidas) ---")
verification_bq2 = """--sql
    SELECT 
        org_zone_id,
        dest_zone_id,
        total_trips,
        estimated_potential_trips,
        geographic_distance_km,
        mismatch_ratio
    FROM lakehouse.gold.gold_infrastructure_gaps
    WHERE total_trips > 10 -- Filtrar pares con muy pocos viajes
    AND org_zone_id != dest_zone_id -- Evitar viajes dentro de la misma zona
    ORDER BY mismatch_ratio ASC -- Ordenamos por ratio m√°s bajo (peor servicio)
    LIMIT 10;
"""
display(con.execute(verification_bq2).df())

In [None]:
# Wider look at the worst-served pairs in the Gold table. The banner and the
# SQL LIMIT are both derived from one constant so the printed "Top N" can no
# longer disagree with the number of rows actually returned (the banner used
# to say "Top 10" while the query returned 15 rows).
BQ2_TOP_N = 15

print(f"\n--- Verificaci√≥n BQ2: Top {BQ2_TOP_N} Zonas con 'Mismatch' (potencialmente peor servidas) ---")
verification_bq2 = f"""--sql
    SELECT 
        org_zone_id,
        dest_zone_id,
        total_trips,
        estimated_potential_trips,
        geographic_distance_km,
        mismatch_ratio
    FROM lakehouse.gold.gold_infrastructure_gaps
    WHERE total_trips > 10 -- Filtrar pares con muy pocos viajes
    ORDER BY mismatch_ratio ASC -- Ordenamos por ratio m√°s bajo (peor servicio)
    LIMIT {BQ2_TOP_N};
"""
display(con.execute(verification_bq2).df())

# Close connection

In [None]:
# Close the DuckDB connection (`con`) opened in the first cell of the notebook.
# Run this last: cells above use `con` for every query.
con.close()