In [None]:
# Install the third-party packages used by this notebook into the active kernel environment.
%pip install duckdb pandas numpy pyspainmobility requests

<h1 align="center"><b>Building a 3-Tier Data Lakehouse for Mobility Analysis in Spain</b></h1>
<h3 align="center"><b style="color:gray">Silver Layer</b></h3>
<h4 align="right">Joan Fernández Navarro & Borja Albert Gramaje</h4>
<h3><b>Table of Contents</b></h3>
<ul style = "list-style-type: none; line-height: 0.5em;">
    <li><a href="#holidays"><h5>1. Spanish Holidays (Open Holidays API)</h5></a></li>
    <li><a href="#mitma"><h5>2. Spanish Ministry of Transport, Mobility and Urban Agenda (MITMA) Open Data</h5></a></li>
    <ul style = "list-style-type: none; line-height: 1em;">
        <li><a href="#od"><h5>1.1. Origin-destination (OD) trip matrices</h5></a></li>
        <li><a href="#people"><h5>1.2. People by day</h5></a></li>
        <li><a href="#overnight"><h5>1.3. Overnight stays</h5></a></li>
        <li><a href="#zones"><h5>1.4. Zones</h5></a></li>
        <li><a href="#zones"><h5>1.5. Centroids</h5></a></li>
    </ul>
    <li><a href="#ine"><h5>2. Spanish National Statistics Institute (INE)</h5></a></li>
    <ul style = "list-style-type: none; line-height: 1em;">
        <li><a href="#population"><h5>2.1. Population by municipio (Padrón)</h5></a></li>
        <li><a href="#income"><h5>2.2. Income by distrito</h5></a></li>
    </ul>
</ul>

In [None]:
import os
import duckdb
import requests
import pandas as pd
from pyspainmobility import Mobility, Zones

# Root folder of the raw landing files, resolved relative to the notebook's working directory.
BASE_PATH = f"{os.getcwd()}/../../raw"
# Prefix of every table this notebook writes. It must differ from the "bronze"
# prefix the loaders read from: with LAKE_LAYER = "bronze" the target and source
# table names coincide and the loaders overwrite (or empty) their own input.
LAKE_LAYER = "silver"

# Connect to the on-disk DuckDB database that holds the lakehouse tables (path relative to the working directory).
con = duckdb.connect("./../../mobility.db")

def SQL(q):
    """Execute the SQL text ``q`` on the shared connection and return the result as a DataFrame."""
    cursor = con.execute(q)
    return cursor.fetchdf()

# Sanity check: report the engine version through the open connection.
print("DuckDB version:", con.sql("SELECT version();").fetchone()[0])

<h2 id="holidays"><b>1. Spanish Holidays (Open Holidays API)</b></h2>

In [None]:
# Install and load DuckDB's httpfs extension so the next query can read JSON directly from an HTTPS URL.
SQL("""
    INSTALL httpfs;
    LOAD httpfs;
""")

# Build the session-scoped TEMP table `spanish_holidays`: one `date` row per
# nationwide public holiday returned by the Open Holidays API.
# NOTE(review): the request window is hard-coded to 2023-01-01..2023-12-31, so
# dates outside 2023 can never be flagged as holidays — confirm it covers the loaded data.
# NOTE(review): this assumes read_json exposes each array element as a single
# JSON value under the alias `holiday` — TODO confirm against the DuckDB version in use.
SQL("""
    -- La tabla temporal es visible solo para la sesión que la creó.
    CREATE OR REPLACE TEMP TABLE spanish_holidays AS
    WITH parsed_holidays AS (
        SELECT 
            json_extract(holiday, '$.startDate') AS date_str,
            CAST(json_extract(holiday, '$.nationwide') AS BOOLEAN) AS nationwide
        FROM read_json(
            'https://openholidaysapi.org/PublicHolidays?countryIsoCode=ES&languageIsoCode=ES&validFrom=2023-01-01&validTo=2023-12-31',
            format='array'
        ) AS t(holiday)
    )
    SELECT DISTINCT
        CAST(date_str AS DATE) AS date
    FROM parsed_holidays
    WHERE nationwide = TRUE;
""")

<h2 id="mitma"><b>1. Spanish Ministry of Transport, Mobility and Urban Agenda (MITMA) Open Data</b></h2>

<h2 id="od"><b>1.1. Origin-destination (OD) trip matrices</b></h2>

```sql
-- Trip Matrices - distritos
CREATE TABLE  silver_mitma_od_distritos (
    fecha DATE,
    hora SMALLINT,
    origen TEXT,
    destino TEXT,
    distancia TEXT,
    actividad_origen TEXT,
    actividad_destino TEXT,
    residencia TEXT,
    renta TEXT,
    edad TEXT,
    sexo TEXT,
    viajes DOUBLE,
    viajes_km DOUBLE,
    estudio_destino_posible BOOLEAN,
    estudio_origen_posible BOOLEAN,
    is_weekend BOOLEAN,
    is_holiday BOOLEAN
);


-- Trip Matrices - municipios
CREATE TABLE  silver_mitma_od_municipios (
    fecha DATE,
    hora SMALLINT,
    origen TEXT,
    destino TEXT,
    distancia TEXT,
    actividad_origen TEXT,
    actividad_destino TEXT,
    residencia TEXT,
    renta TEXT,
    edad TEXT,
    sexo TEXT,
    viajes DOUBLE,
    viajes_km DOUBLE,
    estudio_destino_posible BOOLEAN,
    estudio_origen_posible BOOLEAN,
    is_weekend BOOLEAN,
    is_holiday BOOLEAN
);

-- Trip Matrices - GAU
CREATE TABLE  silver_mitma_od_gau (
    fecha DATE,
    hora SMALLINT,
    origen TEXT,
    destino TEXT,
    distancia TEXT,
    actividad_origen TEXT,
    actividad_destino TEXT,
    residencia TEXT,
    renta TEXT,
    edad TEXT,
    sexo TEXT,
    viajes DOUBLE,
    viajes_km DOUBLE,
    estudio_destino_posible BOOLEAN,
    estudio_origen_posible BOOLEAN,
    is_weekend BOOLEAN,
    is_holiday BOOLEAN
);
```

![Descripción de la imagen](./schemas/silver_od.png)

In [None]:
def load_od_matrices(zone_type="districts"):
    """
    Transform the MITMA OD matrices of one zoning level from the bronze layer
    into a typed table of the current lake layer, adding the derived
    ``is_weekend`` and ``is_holiday`` flags.

    The target table is rebuilt from scratch on every call (CREATE OR REPLACE).
    The query reads the ``spanish_holidays`` table, so that table must exist
    when this function runs.

    Parameters
    ----------
    zone_type : str
        Zone-level suffix shared by the source and target table names
        (the notebook calls this with "distritos", "municipios" and "gau").
    """

    dataset = "od"
    source_table = f"bronze_mitma_{dataset}_{zone_type}"
    target_table = f"{LAKE_LAYER}_mitma_{dataset}_{zone_type}"

    # -------------------------------------------------------
    # 1. Create the transformed table
    # -------------------------------------------------------
    SQL(f"""
        CREATE OR REPLACE TABLE {target_table} AS
        WITH parsed_data AS (
            SELECT
                -- Convert fecha (YYYYMMDD) to DATE
                strptime(CAST(fecha AS VARCHAR), '%Y%m%d')::DATE AS fecha,

                -- Convert period to hour
                CAST(periodo AS SMALLINT) AS hora,

                -- Direct fields
                origen,
                destino,
                distancia,
                actividad_origen,
                actividad_destino,
                residencia,
                renta,
                edad,
                sexo,

                -- Numeric conversions
                CAST(viajes AS DOUBLE)     AS viajes,
                CAST(viajes_km AS DOUBLE)  AS viajes_km,

                -- Boolean conversions (anything other than yes/no becomes NULL)
                CASE
                    WHEN estudio_destino_posible IN ('si', 'SI', 'sí') THEN TRUE
                    WHEN estudio_destino_posible IN ('no', 'NO') THEN FALSE
                    ELSE NULL
                END AS estudio_destino_posible,

                CASE
                    WHEN estudio_origen_posible IN ('si', 'SI', 'sí') THEN TRUE
                    WHEN estudio_origen_posible IN ('no', 'NO') THEN FALSE
                    ELSE NULL
                END AS estudio_origen_posible
            FROM {source_table}
        ),
        enriched_data AS (
            SELECT
                *,
                -- Weekend: ISO weekday numbering, Saturday = 6 and Sunday = 7.
                -- dayofweek() numbers Sunday as 0, so it would miss Sundays here.
                CASE
                    WHEN isodow(fecha) IN (6, 7) THEN TRUE
                    ELSE FALSE
                END AS is_weekend,

                -- Holiday as per Spanish holidays table
                CASE
                    WHEN fecha IN (SELECT date FROM spanish_holidays) THEN TRUE
                    ELSE FALSE
                END AS is_holiday
            FROM parsed_data
        )
        SELECT * FROM enriched_data;
    """)

    print(f"OD matrices successfully transformed into table: {target_table}")

# Rebuild the OD table for each zoning level handled by the notebook.
for od_zone_level in ("distritos", "municipios", "gau"):
    load_od_matrices(zone_type=od_zone_level)

# Drop the holiday lookup table once the OD tables are built.
SQL(""" DROP TABLE IF EXISTS spanish_holidays; """)

In [None]:
# Row count of each OD table (one result row per zoning level), largest first.
SQL(f"""
    (SELECT '{LAKE_LAYER}_mitma_od_distritos' as name, count(*) 
    FROM {LAKE_LAYER}_mitma_od_distritos)
        UNION
    (SELECT '{LAKE_LAYER}_mitma_od_municipios' as name, count(*) 
    FROM {LAKE_LAYER}_mitma_od_municipios)
        UNION
    (SELECT '{LAKE_LAYER}_mitma_od_gau' as name, count(*) 
    FROM {LAKE_LAYER}_mitma_od_gau)
    ORDER BY count(*) DESC;
""")

In [None]:
# Preview the first rows of the district-level OD table.
SQL(f"""
    SELECT *
    FROM {LAKE_LAYER}_mitma_od_distritos 
    LIMIT 10;
""")

In [None]:
# Number of district-level OD rows per value of the derived is_weekend flag.
SQL(f"""
    SELECT is_weekend, count(*) as total
    FROM {LAKE_LAYER}_mitma_od_distritos 
    GROUP BY is_weekend;
""")

<h2 id="people"><b>1.2. People by day</b></h2>

```sql
-- Distritos
CREATE TABLE silver_mitma_people_day_distritos (
    fecha DATE,
    zona_pernoctacion TEXT,
    edad TEXT,
    sexo TEXT,
    numero_viajes TEXT,
    personas DOUBLE
);

-- Municipios
CREATE TABLE silver_mitma_people_day_municipios (
    fecha DATE,
    zona_pernoctacion TEXT,
    edad TEXT,
    sexo TEXT,
    numero_viajes TEXT,
    personas DOUBLE
);

-- GAU
CREATE TABLE silver_mitma_people_day_gau (
    fecha DATE,
    zona_pernoctacion TEXT,
    edad TEXT,
    sexo TEXT,
    numero_viajes TEXT,
    personas DOUBLE
);
```

![Descripción de la imagen](./schemas/silver_people_day.png)

In [None]:
def load_people_day(zone_type="districts", start_date='2022-03-01', end_date='2022-03-03'):
    """
    Rebuild the typed "people by day" table of one zoning level from its bronze table.

    ``fecha`` is parsed from its YYYYMMDD text form into a DATE and ``personas``
    is cast to DOUBLE; the remaining columns are stored as text.

    The table is rebuilt with a single CREATE OR REPLACE ... AS SELECT so the
    source rows are read before the target is replaced. A separate
    DROP / CREATE / INSERT sequence would empty the source first whenever the
    target and source names coincide, silently producing an empty table.

    Parameters
    ----------
    zone_type : str
        Zone-level suffix shared by the source and target table names
        (the notebook calls this with "distritos", "municipios" and "gau").
    start_date, end_date : str
        Not used by the transformation; kept so existing callers keep working.
    """
    dataset = 'people_day'
    source_table = f'bronze_mitma_{dataset}_{zone_type}'
    table_name = f'{LAKE_LAYER}_mitma_{dataset}_{zone_type}'

    # Explicit casts pin the column types: DATE, four TEXT columns, DOUBLE.
    SQL(f"""
        CREATE OR REPLACE TABLE {table_name} AS
        SELECT
            strptime(CAST(fecha AS VARCHAR), '%Y%m%d')::DATE AS fecha,
            CAST(zona_pernoctacion AS TEXT) AS zona_pernoctacion,
            CAST(edad AS TEXT) AS edad,
            CAST(sexo AS TEXT) AS sexo,
            CAST(numero_viajes AS TEXT) AS numero_viajes,
            CAST(personas AS DOUBLE) AS personas
        FROM {source_table};
    """)


# Rebuild the people-by-day table for each zoning level handled by the notebook.
for people_zone_level in ("distritos", "municipios", "gau"):
    load_people_day(zone_type=people_zone_level)

In [None]:
# Row count of each people-by-day table (one result row per zoning level).
SQL(f"""
    (SELECT '{LAKE_LAYER}_mitma_people_day_distritos' as name, count(*) 
    FROM {LAKE_LAYER}_mitma_people_day_distritos)
        UNION
    (SELECT '{LAKE_LAYER}_mitma_people_day_municipios' as name, count(*) 
    FROM {LAKE_LAYER}_mitma_people_day_municipios)
        UNION
    (SELECT '{LAKE_LAYER}_mitma_people_day_gau' as name, count(*) 
    FROM {LAKE_LAYER}_mitma_people_day_gau);
""")

In [None]:
# Preview the first rows of the district-level people-by-day table.
SQL(f"""
    SELECT * 
    FROM {LAKE_LAYER}_mitma_people_day_distritos 
    LIMIT 5;
""")

<h2 id="overnight"><b>1.3. Overnight stays</b></h2>

```sql
-- Distritos
CREATE TABLE IF NOT EXISTS bronze_mitma_overnight_stay_districts (
  fecha TEXT,
  zona_residencia TEXT,
  zona_pernoctacion TEXT,
  personas TEXT,
  -- Columnas extras añadidas para auditoria. 
  loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  source_file TEXT
);

-- Municipios
CREATE TABLE IF NOT EXISTS bronze_mitma_overnight_stay_municipalities (
  fecha TEXT,
  zona_residencia TEXT,
  zona_pernoctacion TEXT,
  personas TEXT,
  -- Columnas extras añadidas para auditoria. 
  loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  source_file TEXT
);

-- GAU
CREATE TABLE IF NOT EXISTS bronze_mitma_overnight_stay_gau (
  fecha TEXT,
  zona_residencia TEXT,
  zona_pernoctacion TEXT,
  personas TEXT,
  -- Columnas extras añadidas para auditoria. 
  loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  source_file TEXT
);
```

![Descripción de la imagen](./schemas/bronze_overnight.png)

In [None]:
def load_overnight_stay(zone_type="districts", start_date="2022-03-01", end_date="2022-03-03"):
    """
    Fetch the MITMA overnight-stay files for one zoning level (unless the local
    folder already has content) and reload them into a DuckDB table.

    Parameters
    ----------
    zone_type : str
        Zoning level, used for the download and for the folder/table names.
    start_date : str
        Start date (YYYY-MM-DD).
    end_date : str
        End date (YYYY-MM-DD).
    """

    dataset = "overnight_stay"
    dataset_path = f"{BASE_PATH}/MITMA/{dataset}_{zone_type}"
    table_name = f"{LAKE_LAYER}_mitma_{dataset}_{zone_type}"

    # The landing folder must exist before it can be inspected.
    os.makedirs(dataset_path, exist_ok=True)

    # Any content in the folder counts as "already downloaded".
    if os.listdir(dataset_path):
        print("Files already exist. Skipping download.")
    else:
        print(f"Downloading MITMA overnight-stay data for zone type: {zone_type}...")
        Mobility(
            version=2,
            zones=zone_type,
            start_date=start_date,
            end_date=end_date,
            output_directory=dataset_path,
        ).get_overnight_stays_data()

    # Start from a clean table on every call.
    SQL(f"DROP TABLE IF EXISTS {table_name};")

    # All payload columns stay as text; the last two columns record load metadata.
    SQL(f"""
        CREATE TABLE {table_name}(
            fecha TEXT,
            zona_residencia TEXT,
            zona_pernoctacion TEXT,
            personas TEXT,
            loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            source_file TEXT
        );
    """)

    # Bulk-load every compressed CSV in the folder, tagging each row with its file name.
    SQL(f"""
        INSERT INTO {table_name}
        SELECT
            fecha,
            zona_residencia,
            zona_pernoctacion,
            personas,
            CURRENT_TIMESTAMP AS loaded_at,
            filename AS source_file
        FROM read_csv(
            '{dataset_path}/*.csv.gz',
            filename = true,
            all_varchar = true
        );
    """)

    print(f"Overnight-stay data successfully loaded into table: {table_name}")


# Load the overnight-stay table for each zoning level handled by the notebook.
for overnight_zone_level in ("distritos", "municipios", "gau"):
    load_overnight_stay(zone_type=overnight_zone_level)

In [None]:
# Row count of each overnight-stay table (one result row per zoning level).
SQL(f"""
    (SELECT '{LAKE_LAYER}_mitma_overnight_stay_distritos' as name, count(*) 
    FROM {LAKE_LAYER}_mitma_overnight_stay_distritos)
        UNION
    (SELECT '{LAKE_LAYER}_mitma_overnight_stay_municipios' as name, count(*) 
    FROM {LAKE_LAYER}_mitma_overnight_stay_municipios)
        UNION
    (SELECT '{LAKE_LAYER}_mitma_overnight_stay_gau' as name, count(*) 
    FROM {LAKE_LAYER}_mitma_overnight_stay_gau);
""")

In [None]:
# Preview the first rows of the district-level overnight-stay table.
SQL(f"""
    SELECT * 
    FROM {LAKE_LAYER}_mitma_overnight_stay_distritos 
    LIMIT 5;
""")

<h2 id="zones"><b>1.4. Zones</b></h2>

```sql
-- Distritos
CREATE TABLE IF NOT EXISTS bronze_mitma_districts (
  id TEXT,
  name TEXT,
  population TEXT,
  geometry TEXT,
  -- Columnas extras añadidas para auditoria. 
  loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  source_file TEXT
);

-- Municipios
CREATE TABLE IF NOT EXISTS bronze_mitma_municipalities (
  id TEXT,
  name TEXT,
  population TEXT,
  geometry TEXT,
  -- Columnas extras añadidas para auditoria. 
  loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  source_file TEXT
);

-- GAU
CREATE TABLE IF NOT EXISTS bronze_mitma_gau (
  id TEXT,
  name TEXT,
  population TEXT,
  geometry TEXT,
  -- Columnas extras añadidas para auditoria. 
  loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  source_file TEXT
);

```

![Descripción de la imagen](./schemas/bronze_zones.png)

In [None]:
def load_zones(zone_type="districts"):
    """
    Download the MITMA zone definitions for one zoning level (only when the
    local ``zones.csv.gz`` is missing), store them as a gzip-compressed CSV and
    reload them into a DuckDB table.

    Parameters
    ----------
    zone_type : str
        Zoning level, used for the download and for the folder/table names
        (the notebook calls this with "distritos", "municipios" and "gau").

    Raises
    ------
    ValueError
        If ``Zones.get_zone_geodataframe()`` returns ``None``.
    """

    dataset_path = f"{BASE_PATH}/MITMA/{zone_type}"
    table_name = f"{LAKE_LAYER}_mitma_{zone_type}"

    # -------------------------------------------------------
    # 1. Ensure directory exists
    # -------------------------------------------------------
    os.makedirs(dataset_path, exist_ok=True)

    csv_path = dataset_path + "/zones.csv.gz"

    # -------------------------------------------------------
    # 2. Download and save zones CSV if not present
    # -------------------------------------------------------
    # The check must look at the CSV file itself: dataset_path is the directory
    # created above, so os.path.isfile() on it is always False and the download
    # would run on every call.
    if not os.path.isfile(csv_path):
        print(f"Downloading MITMA zone definitions for zone type: {zone_type}...")

        zones = Zones(
            version=2,
            zones=zone_type,
            output_directory=str(dataset_path),
        )

        df = zones.get_zone_geodataframe()

        if df is None:
            raise ValueError("Zones.get_zone_geodataframe() returned None")

        # Save geodataframe as compressed CSV (the index is written as the first column)
        df.to_csv(csv_path, index=True, compression="gzip")
        print("Zones saved:", csv_path)

    else:
        print("Zone definition file already exists. Skipping download.")

    # -------------------------------------------------------
    # 3. Create DuckDB table
    # -------------------------------------------------------
    SQL(f"DROP TABLE IF EXISTS {table_name};")

    SQL(f"""
        CREATE TABLE {table_name}(
            id TEXT,
            name TEXT,
            population TEXT,
            geometry TEXT,   -- stored as plain text in BRONZE layer
            loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            source_file TEXT
        );
    """)

    # -------------------------------------------------------
    # 4. Load CSV into DuckDB
    # -------------------------------------------------------
    SQL(f"""
        INSERT INTO {table_name}
        SELECT
            id,
            name,
            population,
            geometry,
            CURRENT_TIMESTAMP AS loaded_at,
            filename AS source_file
        FROM read_csv(
            '{dataset_path}/*.csv.gz',
            filename = true,
            all_varchar = true
        );
    """)

    print(f"Zone data successfully loaded into table: {table_name}")



# Load the zone-definition table for each zoning level handled by the notebook.
for zones_level in ("distritos", "municipios", "gau"):
    load_zones(zone_type=zones_level)

In [None]:
# Row count of each zone-definition table (one result row per zoning level).
SQL(f"""
    (SELECT '{LAKE_LAYER}_mitma_distritos' as name, count(*) 
    FROM {LAKE_LAYER}_mitma_distritos)
        UNION
    (SELECT '{LAKE_LAYER}_mitma_municipios' as name, count(*) 
    FROM {LAKE_LAYER}_mitma_municipios)
        UNION
    (SELECT '{LAKE_LAYER}_mitma_gau' as name, count(*)
    FROM {LAKE_LAYER}_mitma_gau);
""")

In [None]:
# Preview the first rows of the district zones table loaded in this section
# (the cell previously queried the OD table, which belongs to section 1.1).
SQL(f"""
    SELECT * 
    FROM {LAKE_LAYER}_mitma_distritos 
    LIMIT 5;
""")

<h2 id="ine"><b>2. Spanish National Statistics Institute (INE)</b></h2>