In [5]:
%pip install duckdb pandas python-dotenv requests boto3

Note: you may need to restart the kernel to use updated packages.


In [6]:
import duckdb
import pandas as pd
import os
import requests
import urllib.request
import re
from dotenv import load_dotenv
from datetime import datetime, timedelta

LAKE_LAYER = 'bronze'

# Load environment variables from .env file
load_dotenv('../../.env', override=True) 

True

In [7]:
RUSTFS_HOST = os.getenv('RUSTFS_HOST', 'localhost')
RUSTFS_PORT = os.getenv('RUSTFS_PORT', '8080')
RUSTFS_USER = os.getenv('RUSTFS_USER', 'admin')
RUSTFS_PASSWORD = os.getenv('RUSTFS_PASSWORD', 'password')
RUSTFS_BUCKET = os.getenv('RUSTFS_BUCKET', 'mitma')
RUSTFS_SSL = os.getenv('RUSTFS_SSL', 'false')

# Postgres Configuration
POSTGRES_USER = os.getenv('POSTGRES_USER', 'postgres')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD', 'password')
POSTGRES_HOST = os.getenv('POSTGRES_HOST', 'localhost')
POSTGRES_PORT = os.getenv('POSTGRES_PORT', '5432')
POSTGRES_DB = os.getenv('POSTGRES_DB', 'muceim')

# Construct S3 Endpoint with protocol
S3_ENDPOINT = f"{RUSTFS_HOST}:{RUSTFS_PORT}"


In [8]:
print(f"Connecting to RustFS at {S3_ENDPOINT}...")

# Initialize DuckDB Connection
con = duckdb.connect()

# Install and Load extensions
con.execute("INSTALL httpfs;")
con.execute("LOAD httpfs;")
con.execute("INSTALL postgres;")
con.execute("LOAD postgres;")
con.execute("INSTALL ducklake;")
con.execute("LOAD ducklake;")

# Configure S3 Secrets for RustFS
con.execute(f"SET s3_endpoint='{S3_ENDPOINT}';")
con.execute(f"SET s3_access_key_id='{RUSTFS_USER}';")
con.execute(f"SET s3_secret_access_key='{RUSTFS_PASSWORD}';")
con.execute(f"SET s3_use_ssl={RUSTFS_SSL};")
con.execute("SET s3_url_style='path';")
con.execute("SET preserve_insertion_order=false;")
con.execute("SET max_temp_directory_size='40GiB';")


# Attach DuckLake with Postgres Catalog
postgres_connection_string = f"dbname={POSTGRES_DB} host={POSTGRES_HOST} user={POSTGRES_USER} password={POSTGRES_PASSWORD} port={POSTGRES_PORT}"
attach_query = f"ATTACH 'ducklake:postgres:{postgres_connection_string}' AS ducklake (DATA_PATH 's3://{RUSTFS_BUCKET}/');"

print(f"Attaching DuckLake with query: {attach_query}")
con.execute(attach_query)
con.execute("USE ducklake;")

print("DuckLake configured with Postgres catalog and RustFS storage.")

Connecting to RustFS at localhost:9000...
Attaching DuckLake with query: ATTACH 'ducklake:postgres:dbname=mitma host=localhost user=admin password=muceim-duckduck.2025! port=30432' AS ducklake (DATA_PATH 's3://mitma/');
DuckLake configured with Postgres catalog and RustFS storage.


In [9]:
import boto3
from botocore.client import Config

s3 = boto3.resource('s3',
    endpoint_url=f'http://{RUSTFS_HOST}:{RUSTFS_PORT}',
    aws_access_key_id=RUSTFS_USER,
    aws_secret_access_key=RUSTFS_PASSWORD,
    config=Config(signature_version='s3v4'),
    verify=False
)

if (not s3.Bucket(RUSTFS_BUCKET).creation_date):
    try:
        s3.create_bucket(Bucket=RUSTFS_BUCKET)
        print(f"Bucket '{RUSTFS_BUCKET}' created.")
    except Exception as e:
        print(f"Error creating bucket: {e}")
    

In [10]:
import boto3
from botocore.client import Config

def cleanup_mitma_system():
    """
    Cleans up the MITMA system by resetting the metadata 
    in PostgreSQL and clearing the storage in RustFS.
    """
    print("\n[1/2] Resetting Metadata in PostgreSQL...")
    try:
        SQL("DROP SCHEMA public CASCADE;")
        SQL("CREATE SCHEMA public;")
        SQL("GRANT ALL ON SCHEMA public TO postgres;")
        SQL("GRANT ALL ON SCHEMA public TO public;")
        print("  ‚úì Schema 'public' reseted.")
    except Exception as e:
        print(f"  ‚ùå Error resetting Postgres: {e}")
        return # Paramos si falla la DB

    print("\n[2/2] Resetting Storage in RustFS...")
    try:
        bucket = s3.Bucket(RUSTFS_BUCKET)

        if not bucket.creation_date:
            return
        
        # Borramos todo el contenido
        bucket.objects.all().delete()
        # Borramos el bucket
        bucket.delete()
        #recreamos el bucket
        s3.create_bucket(Bucket=RUSTFS_BUCKET)
        print(f"  ‚úì Bucket '{RUSTFS_BUCKET}' recreated.")
    except Exception as e:
        print(f"  ‚ùå Error cleaning RustFS: {e}")

    print("\n‚úÖ MITMA SYSTEM RESETED.")


# Uncomment to run cleanup:
# cleanup_mitma_system()

In [11]:
def SQL(query):
    """Execute a SQL query and return the result as a Pandas DataFrame."""
    try:
        return con.execute(query).fetchdf()
    except Exception as e:
        print(f"Error executing query: {e}")
        return None

In [12]:
def get_mitma_urls(dataset, zone_type, start_date, end_date):
    """
    Fetches MITMA URLs from RSS feed and filters by dataset, zone type, and date range.
    """
    rss_url = "https://movilidad-opendata.mitma.es/RSS.xml"
    
    # Simple mapping: dataset -> (url_path, file_prefix)
    dataset_map = {
        "od": ("viajes", "Viajes"),
        "people_day": ("personas", "Personas_dia"),
        "overnight_stay": ("pernoctaciones", "Pernoctaciones")
    }
    
    if zone_type not in ["distritos", "municipios", "gau"]:
        raise ValueError(f"Invalid zone_type: {zone_type}. Must be 'distritos', 'municipios', or 'gau'.")
    if dataset not in dataset_map:
        raise ValueError(f"Invalid dataset: {dataset}. Must be one of {list(dataset_map.keys())}.")
    
    dataset_path, file_prefix = dataset_map[dataset]
    
    # Construct file pattern: {Prefix}_{zone} (GAU is uppercase in files)
    zone_suffix = "GAU" if zone_type == "gau" else zone_type
    file_pattern = f"{file_prefix}_{zone_suffix}"
    
    # Build dynamic regex pattern
    # Pattern: https://.../por-{zone}/viajes/ficheros-diarios/YYYY-MM/YYYYMMDD_{FilePattern}.csv.gz
    pattern = rf'(https?://[^\s"<>]*/estudios_basicos/por-{zone_type}/{dataset_path}/ficheros-diarios/\d{{4}}-\d{{2}}/(\d{{8}})_{file_pattern}\.csv\.gz)'
        
    # Fetch RSS with User-Agent to avoid 403
    req = urllib.request.Request(rss_url, headers={"User-Agent": "MITMA-DuckLake-Loader"})
    txt = urllib.request.urlopen(req).read().decode("utf-8", "ignore")
    
    # Find all matches (case-insensitive for por-gau vs por-GAU)
    matches = re.findall(pattern, txt, re.I)
    
    # Remove duplicates using set (RSS often has duplicate entries)
    unique_matches = list(set(matches))
    
    # Convert date range to comparable format
    start_dt = datetime.strptime(start_date, "%Y-%m-%d")
    end_dt = datetime.strptime(end_date, "%Y-%m-%d")
    
    # Filter by date range and sort
    filtered_urls = []
    for url, date_str in unique_matches:
        file_date = datetime.strptime(date_str, "%Y%m%d")
        if start_dt <= file_date <= end_dt:
            filtered_urls.append((url, date_str))
    
    # Sort by date ascending
    filtered_urls.sort(key=lambda x: x[1])
    
    # Extract just the URLs
    urls = [url for url, _ in filtered_urls]
    
    print(f"Found {len(urls)} URLs for {dataset} {zone_type} from {start_date} to {end_date}")
    
    if not urls:
        print(f"WARNING: No URLs found. Check if data exists for the requested date range.")
    
    return urls

In [13]:
def create_and_merge_table(table_name, urls):
    """
    Generic function to create table and merge data for any MITMA dataset.
    Uses ALL columns from the CSV as merge keys (bronze layer pattern).

    Parameters:
    - dataset: 'od', 'people_day', 'overnight_stay'
    - zone_type: 'distritos', 'municipios', 'gau'
    - urls: list of URLs to load
    """
    
    table_name = f'{LAKE_LAYER}_{table_name}'
    
    # Convert list of URLs to a string representation for DuckDB list
    url_list_str = "[" + ", ".join([f"'{u}'" for u in urls]) + "]"

    # Step 1: Create table if not exists (using first file for schema inference)
    SQL(f"""
        CREATE TABLE IF NOT EXISTS {table_name} AS
        SELECT 
            * EXCLUDE (filename),
            CURRENT_TIMESTAMP AS loaded_at,
            filename AS source_file
        FROM read_csv(
            {url_list_str},
            filename = true,
            all_varchar = true
        )
        LIMIT 0;
    """)
    
    # Get column names from the table (excluding audit columns)
    columns_df = SQL(f"""
        SELECT column_name 
        FROM information_schema.columns 
        WHERE table_name = '{table_name}'
        AND column_name NOT IN ('loaded_at', 'source_file')
        ORDER BY ordinal_position;
    """)
    
    merge_keys = columns_df['column_name'].tolist()
    
    # Build ON clause from all CSV columns
    on_clause = " AND ".join([f"target.{key} = source.{key}" for key in merge_keys])
    
    # Step 3: MERGE for idempotent incremental loads
    SQL(f"""
        MERGE INTO {table_name} AS target
        USING (
            SELECT 
                * EXCLUDE (filename),
                CURRENT_TIMESTAMP AS loaded_at,
                filename AS source_file
            FROM read_csv(
                {url_list_str},
                filename = true,
                all_varchar = true
            )
        ) AS source
        ON {on_clause}
        WHEN MATCHED THEN
            UPDATE SET *
        WHEN NOT MATCHED THEN
            INSERT *;
    """)
    
    print(f"Table {table_name} merged successfully with {len(merge_keys)} key columns.")

In [14]:
def create_and_merge_table_from_json(table_name, url, key_columns=None):
    """
    Generic function to create table and merge data from JSON API endpoint using DuckDB's read_json.
    
    Parameters:
    - table_name: Name of the table to create/merge into
    - url: URL that returns JSON data (array of objects)
    - key_columns: List of column names to use as merge keys. If None, uses all columns.
    """
    
    table_name = f'{LAKE_LAYER}_{table_name}'
    
    print(f"Fetching JSON data from {url}...")
    
    # Step 1: Create table if not exists using DuckDB's read_json
    SQL(f"""
        CREATE TABLE IF NOT EXISTS {table_name} AS
        SELECT 
            *,
            CURRENT_TIMESTAMP AS loaded_at,
            '{url}' AS source_url
        FROM read_json('{url}', format='array')
        LIMIT 0;
    """)
    
    # Step 2: Get column names from the table (excluding audit columns)
    columns_df = SQL(f"""
        SELECT column_name 
        FROM information_schema.columns 
        WHERE table_name = '{table_name}'
        AND column_name NOT IN ('loaded_at', 'source_url')
        ORDER BY ordinal_position;
    """)
    
    data_columns = columns_df['column_name'].tolist()
    
    # Step 3: Determine merge keys
    if key_columns is None:
        merge_keys = data_columns
    else:
        merge_keys = key_columns
        # Validate that key columns exist
        missing_keys = [k for k in merge_keys if k not in data_columns]
        if missing_keys:
            raise ValueError(f"Key columns {missing_keys} not found in data. Available columns: {data_columns}")
    
    print(f"Using merge keys: {merge_keys}")
    
    # Step 4: Build ON clause
    on_clause = " AND ".join([f'target."{key}" = source."{key}"' for key in merge_keys])
    
    # Step 5: MERGE for idempotent incremental loads
    merge_query = f"""
        MERGE INTO {table_name} AS target
        USING (
            SELECT 
                *,
                CURRENT_TIMESTAMP AS loaded_at,
                '{url}' AS source_url
            FROM read_json('{url}', format='array')
        ) AS source
        ON {on_clause}
        WHEN MATCHED THEN
            UPDATE SET *
        WHEN NOT MATCHED THEN
            INSERT *;
    """
    
    SQL(merge_query)
    
    # Get row count
    count_result = SQL(f"SELECT COUNT(*) as count FROM {table_name}")
    row_count = count_result.iloc[0, 0]
    
    print(f"Table {table_name} merged successfully with {len(merge_keys)} key columns. Total rows: {row_count}")

In [15]:
def load_od_matrices(type="districts", start_date='2022-03-01', end_date='2022-03-03'):
    """
    Load OD matrices for the specified type and date range.
    """
    table_name = 'mitma_od'
    urls = get_mitma_urls(dataset, type, start_date, end_date)
    create_and_merge_table(table_name, urls)

In [16]:
def load_people_day(type="districts", start_date='2022-03-01', end_date='2022-03-03'):
    """
    Load people_day data for a specific type and date range.
    """
    table_name = 'mitma_people_day'
    urls = get_mitma_urls(dataset, type, start_date, end_date)
    create_and_merge_table(table_name, urls)

In [17]:
def load_overnight_stay(type="districts", start_date='2022-03-01', end_date='2022-03-03'):
    """
    Load overnight stay data for a specific type and date range.
    """
    table_name = 'mitma_overnight_stay'
    urls = get_mitma_urls(dataset, type, start_date, end_date)
    create_and_merge_table(table_name, urls)

In [18]:
# Verify connection
print("DuckDB Version:", SQL("SELECT version();").iloc[0,0])

# Check tables
print("Tables in DuckLake:")
print(SQL("SHOW TABLES;"))

DuckDB Version: v1.4.2
Tables in DuckLake:
                                      name
0                      bronze_ine_empresas
1            bronze_ine_empresas_municipio
2                    bronze_ine_municipios
3                   bronze_mitma_distritos
4                         bronze_mitma_gau
5               bronze_mitma_ine_relations
6                  bronze_mitma_municipios
7                bronze_mitma_od_distritos
8                      bronze_mitma_od_gau
9               bronze_mitma_od_municipios
10   bronze_mitma_overnight_stay_distritos
11         bronze_mitma_overnight_stay_gau
12  bronze_mitma_overnight_stay_municipios
13       bronze_mitma_people_day_distritos
14             bronze_mitma_people_day_gau
15      bronze_mitma_people_day_municipios
16                          ine_municipios


In [19]:
SQL("""
    SELECT * 
    FROM bronze_mitma_od_distritos 
    USING SAMPLE 0.01 % (BERNOULLI)
    LIMIT 10
""")

Unnamed: 0,fecha,periodo,origen,destino,distancia,actividad_origen,actividad_destino,estudio_origen_posible,estudio_destino_posible,residencia,renta,edad,sexo,viajes,viajes_km,loaded_at,source_file
0,20230301,18,200306,0200304,0.5-2,frecuente,casa,no,no,2,10-15,0-25,hombre,7.314,12.617,2025-12-02 18:31:02.395660+01:00,https://movilidad-opendata.mitma.es/estudios_b...
1,20230301,2,200308,0200307,2-10,frecuente,casa,no,no,2,10-15,0-25,mujer,9.675,21.203,2025-12-02 18:31:02.395660+01:00,https://movilidad-opendata.mitma.es/estudios_b...
2,20230301,17,305905,03005_AM,2-10,frecuente,casa,no,no,3,<10,0-25,mujer,10.296,72.003,2025-12-02 18:31:02.395660+01:00,https://movilidad-opendata.mitma.es/estudios_b...
3,20230301,15,311902,0301404,0.5-2,frecuente,casa,no,no,3,10-15,25-45,mujer,3.996,7.662,2025-12-02 18:31:02.395660+01:00,https://movilidad-opendata.mitma.es/estudios_b...
4,20230301,11,306301,03026_AM,2-10,frecuente,casa,no,no,3,<10,,,5.224,41.202,2025-12-02 18:31:02.395660+01:00,https://movilidad-opendata.mitma.es/estudios_b...
5,20230301,7,3011,0303102,0.5-2,frecuente,casa,no,no,3,10-15,45-65,mujer,6.172,7.305,2025-12-02 18:31:02.395660+01:00,https://movilidad-opendata.mitma.es/estudios_b...
6,20230301,21,308202,03042,2-10,frecuente,casa,no,no,3,<10,,,2.027,9.913,2025-12-02 18:31:02.395660+01:00,https://movilidad-opendata.mitma.es/estudios_b...
7,20230301,5,304902,03055,2-10,frecuente,casa,no,no,3,<10,,,2.992,28.929,2025-12-02 18:31:02.395660+01:00,https://movilidad-opendata.mitma.es/estudios_b...
8,20230301,11,306501,0306502,0.5-2,frecuente,casa,no,no,3,10-15,45-65,mujer,22.668,24.994,2025-12-02 18:31:02.395660+01:00,https://movilidad-opendata.mitma.es/estudios_b...
9,20230301,21,820501,0820502,0.5-2,frecuente,casa,no,no,8,>15,25-45,hombre,36.323,49.042,2025-12-02 18:31:02.395660+01:00,https://movilidad-opendata.mitma.es/estudios_b...


In [20]:
%pip install requests geopandas

Note: you may need to restart the kernel to use updated packages.


In [21]:
import urllib.request
import re

def get_mitma_zoning_urls(zone_type):
    """
    Fetches MITMA Zoning URLs (Shapefiles + CSVs) from RSS feed using Regex.
    Matches the style of 'get_mitma_urls' but for static zoning files.
    """
    rss_url = "https://movilidad-opendata.mitma.es/RSS.xml"
    
    # Normalizaci√≥n de input
    if zone_type not in ["distritos", "municipios", "gau"]:
        raise ValueError(f"Invalid zone_type: {zone_type}. Must be 'distritos', 'municipios', or 'gau'.")

    # L√≥gica de sufijos para construir el Regex
    # Carpeta en URL: zonificacion_municipios | zonificacion_distritos | zonificacion_GAU
    folder_suffix = "GAU" if zone_type == "gau" else zone_type
    
    # Sufijo en ficheros CSV: nombres_municipios | nombres_distritos | nombres_gaus
    file_suffix = "gaus" if zone_type == "gau" else zone_type
    
    # --- REGEX PATTERNS ---
    # 1. Pattern para componentes del Shapefile (.shp, .shx, .dbf, .prj)
    # Busca URLs que contengan /zonificacion_{Suffix}/ y terminen en extensi√≥n de shapefile
    shp_pattern = rf'(https?://[^\s"<>]*/zonificacion/zonificacion_{folder_suffix}/[^"<>]+\.(?:shp|shx|dbf|prj))'
    
    # 2. Pattern para CSVs auxiliares (nombres_*.csv, poblacion_*.csv)
    # Busca URLs que contengan /zonificacion_{Suffix}/ y sean nombres_X.csv o poblacion_X.csv
    csv_pattern = rf'(https?://[^\s"<>]*/zonificacion/zonificacion_{folder_suffix}/(?:nombres|poblacion)_{file_suffix}\.csv)'

    print(f"üì° Scanning RSS for {zone_type} zoning files...")

    try:
        # Fetch RSS with User-Agent
        req = urllib.request.Request(rss_url, headers={"User-Agent": "MITMA-DuckLake-Loader"})
        with urllib.request.urlopen(req) as response:
            txt = response.read().decode("utf-8", "ignore")
        
        # Find matches
        shp_matches = re.findall(shp_pattern, txt, re.IGNORECASE)
        csv_matches = re.findall(csv_pattern, txt, re.IGNORECASE)
        
        # Deduplicate
        unique_shp = sorted(list(set(shp_matches)))
        unique_csv = sorted(list(set(csv_matches)))
        
        # Organizar resultados
        url_nombres = next((u for u in unique_csv if 'nombres' in u.lower()), None)
        url_poblacion = next((u for u in unique_csv if 'poblacion' in u.lower()), None)
        
        if not unique_shp and not unique_csv:
            print("WARNING: No zoning URLs found in RSS. The feed might have rotated them out.")
            # Opcional: Aqu√≠ podr√≠as lanzar error o devolver fallback. 
            # Si quieres mantener el estilo estricto del otro script, devolvemos vacio.
            return {}

        print(f"Found {len(unique_shp)} shapefile components and {len(unique_csv)} CSVs.")
        
        return {
            "shp_components": unique_shp,
            "nombres": url_nombres,
            "poblacion": url_poblacion
        }

    except Exception as e:
        print(f"ERROR fetching RSS: {e}")
        return {}


In [22]:
def clean_id(series):
    """Normaliza ID a string limpio (sin .0, sin espacios)."""
    return series.astype(str).str.strip().str.replace(r'\.0$', '', regex=True)

def clean_poblacion(series):
    """Limpia enteros de poblaci√≥n (quita puntos y decimales)."""
    return (series.astype(str)
            .str.replace('.', '', regex=False)
            .str.replace(r'\.0$', '', regex=True)
            .apply(pd.to_numeric, errors='coerce')
            .fillna(0).astype(int))

def get_mitma_zoning_dataset(zone_type='municipios'):
    """
    Orquesta la descarga, limpieza y fusi√≥n de datos maestros.
    Retorna un GeoDataFrame listo para ingesta.
    """
    urls = get_mitma_zoning_urls(zone_type)
    
    print(f"üöÄ Generando dataset maestro para: {zone_type.upper()}")
    
    with tempfile.TemporaryDirectory() as tmp_dir:
        print("   ‚¨áÔ∏è  Descargando geometr√≠as...")
        shp_local_path = None
        
        for url in urls['shp_components']:
            filename = url.split('/')[-1]
            try:
                r = requests.get(url, timeout=15)
                if r.status_code == 200:
                    local_p = os.path.join(tmp_dir, filename)
                    with open(local_p, 'wb') as f:
                        f.write(r.content)
                    if filename.endswith('.shp'):
                        shp_local_path = local_p
            except Exception as e:
                print(f"      ‚ö†Ô∏è Error bajando {filename}: {e}")

        if not shp_local_path:
            print("‚ùå Error: No se pudo descargar el archivo .shp principal.")
            return None

        gdf = gpd.read_file(shp_local_path)
        
        id_col = next((c for c in gdf.columns if c.upper() in ['ID', 'CODIGO', 'ZONA', 'COD_GAU']), 'ID')
        gdf['ID'] = clean_id(gdf[id_col])
        
        gdf['geometry'] = gdf['geometry'].apply(make_valid)
        if gdf.crs and gdf.crs.to_string() != "EPSG:4326":
            gdf = gdf.to_crs("EPSG:4326")

        print("   üîó Integrando metadatos (Nombres y Poblaci√≥n)...")
        df_aux = pd.DataFrame(columns=['ID'])
        
        aux_config = [
            {
                'type': 'nombres', 
                'url': urls['nombres'], 
                'header': 0, 
                'cols': ['ID', 'Nombre']
            },
            {
                'type': 'poblacion', 
                'url': urls['poblacion'], 
                'header': None, 
                'cols': ['ID', 'Poblacion']
            }
        ]

        for cfg in aux_config:
            try:
                r = requests.get(cfg['url'], timeout=10)
                if r.status_code == 200:
                    # Leer CSV crudo
                    df_t = pd.read_csv(
                        io.BytesIO(r.content), 
                        sep='|', 
                        header=cfg['header'], 
                        dtype=str, 
                        engine='python'
                    )
                    

                    if len(df_t.columns) >= 3:
                        df_t = df_t.iloc[:, [1, 2]]
                    elif len(df_t.columns) == 2:
                        df_t = df_t.iloc[:, [0, 1]]
                    
                    df_t.columns = cfg['cols']
                    
                    df_t['ID'] = clean_id(df_t['ID'])
                    df_t = df_t.drop_duplicates(subset=['ID'])
                    
                    if cfg['type'] == 'poblacion':
                        df_t['Poblacion'] = clean_poblacion(df_t['Poblacion'])

                    if df_aux.empty:
                        df_aux = df_t
                    else:
                        df_aux = df_aux.merge(df_t, on='ID', how='outer')
                        
                    print(f"      ‚úì {cfg['type'].capitalize()} OK")
            except Exception as e:
                print(f"      ‚ö†Ô∏è Fallo procesando {cfg['type']}: {e}")

        # --- C. Merge Final ---
        if not df_aux.empty:
            gdf = gdf.merge(df_aux, on='ID', how='left')
            
            if 'Nombre' in gdf.columns: 
                gdf['Nombre'] = gdf['Nombre'].fillna(gdf['ID'])
            if 'Poblacion' in gdf.columns: 
                gdf['Poblacion'] = gdf['Poblacion'].fillna(0).astype(int)

        cols = ['ID', 'Nombre', 'Poblacion', 'geometry']
        final_cols = [c for c in cols if c in gdf.columns] + [c for c in gdf.columns if c not in cols]
        gdf = gdf[final_cols]

        print(f"‚úÖ Dataset generado: {len(gdf)} registros.")
        return gdf

In [23]:
def load_zonificacion(type):
    """
    Load zonification data into DuckDB for the specified type.
    """
    df = get_mitma_zoning_dataset(type)
    
    if df is None or df.empty:
        print(f"No data to load for {type}")
        return
    
    # Convert all columns to string (including geometry)
    for col in df.columns:
        df[col] = df[col].astype(str)
    
    table_name = f'{LAKE_LAYER}_mitma_{type}'
    
    con.register('temp_zonificacion', df)
    
    SQL(f"""
        CREATE TABLE IF NOT EXISTS {table_name} AS
        SELECT
            *,
            CURRENT_TIMESTAMP AS loaded_at,
        FROM temp_zonificacion
        LIMIT 0;
    """)
    
    merge_key = 'ID'
    
    SQL(f"""
        MERGE INTO {table_name} AS target
        USING (
            SELECT
                *,
                CURRENT_TIMESTAMP AS loaded_at,
            FROM temp_zonificacion
        ) AS source
        ON target.{merge_key} = source.{merge_key}
        WHEN MATCHED THEN
            UPDATE SET *
        WHEN NOT MATCHED THEN
            INSERT *;
    """)
    
    con.unregister('temp_zonificacion')
    
    print(f"Table {table_name} merged successfully with {len(df)} records.")
    

In [24]:
SQL("""
    SELECT *
    FROM bronze_mitma_gau
    LIMIT 10
""")

Unnamed: 0,ID,Nombre,Poblacion,geometry,loaded_at
0,01001,Alegr√≠a-Dulantzi,29250,POINT (-2.511272 42.829065),2025-12-02 18:32:19.196526+01:00
1,01002,Amurrio,103070,POINT (-2.971689 43.025464),2025-12-02 18:32:19.196526+01:00
2,01004_AM,Artziniega agregacion de municipios,30050,POINT (-3.076283 43.150032),2025-12-02 18:32:19.196526+01:00
3,01009_AM,Asparrena agregacion de municipios,45990,POINT (-2.430988 42.883162),2025-12-02 18:32:19.196526+01:00
4,01010,Ayala/Aiara,29510,POINT (-3.078215 43.075551),2025-12-02 18:32:19.196526+01:00
5,01017_AM,Campezo/Kanpezu agregacion de municipios,43140,POINT (-2.435706 42.700123),2025-12-02 18:32:19.196526+01:00
6,01028_AM,Labastida/Bastida agregacion de municipios,75150,POINT (-2.687343 42.60083),2025-12-02 18:32:19.196526+01:00
7,01036,Laudio/Llodio,180090,POINT (-2.977515 43.13818),2025-12-02 18:32:19.196526+01:00
8,01043,Oy√≥n-Oion,34180,POINT (-2.432799 42.544579),2025-12-02 18:32:19.196526+01:00
9,01047_AM,Erriberabeitia agregacion de municipios,37710,POINT (-3.074379 42.826092),2025-12-02 18:32:19.196526+01:00


In [25]:
table_name = "mitma_ine_relations"
# create_and_merge_table(table_name, ["https://movilidad-opendata.mitma.es/zonificacion/relacion_ine_zonificacionMitma.csv"])

In [26]:
SQL("""
    SELECT *
    FROM bronze_mitma_ine_relations
    LIMIT 10
""")

Unnamed: 0,seccion_ine,distrito_ine,municipio_ine,distrito_mitma,municipio_mitma,gau_mitma,loaded_at,source_file
0,100101001,100101,1001,01001,01001,01001,2025-12-02 20:53:25.217176+01:00,https://movilidad-opendata.mitma.es/zonificaci...
1,100101002,100101,1001,01001,01001,01001,2025-12-02 20:53:25.217176+01:00,https://movilidad-opendata.mitma.es/zonificaci...
2,100201001,100201,1002,01002,01002,01002,2025-12-02 20:53:25.217176+01:00,https://movilidad-opendata.mitma.es/zonificaci...
3,100201002,100201,1002,01002,01002,01002,2025-12-02 20:53:25.217176+01:00,https://movilidad-opendata.mitma.es/zonificaci...
4,100201003,100201,1002,01002,01002,01002,2025-12-02 20:53:25.217176+01:00,https://movilidad-opendata.mitma.es/zonificaci...
5,100201004,100201,1002,01002,01002,01002,2025-12-02 20:53:25.217176+01:00,https://movilidad-opendata.mitma.es/zonificaci...
6,100201005,100201,1002,01002,01002,01002,2025-12-02 20:53:25.217176+01:00,https://movilidad-opendata.mitma.es/zonificaci...
7,100201006,100201,1002,01002,01002,01002,2025-12-02 20:53:25.217176+01:00,https://movilidad-opendata.mitma.es/zonificaci...
8,100201007,100201,1002,01002,01002,01002,2025-12-02 20:53:25.217176+01:00,https://movilidad-opendata.mitma.es/zonificaci...
9,100301001,100301,1003,01058_AM,01058_AM,01058_AM,2025-12-02 20:53:25.217176+01:00,https://movilidad-opendata.mitma.es/zonificaci...


In [27]:
def load_municipios_ine():
    """
    Load municipios from INE datasource
    """
    table_name = 'ine_municipios'
    url = 'https://servicios.ine.es/wstempus/js/ES/VALORES_VARIABLE/19'
    
    # Use 'Id' as the primary key for municipios
    create_and_merge_table_from_json(
        table_name, 
        url,
        ['Id']  # Assuming 'Id' is the unique identifier
    )

In [28]:
def load_empresas_municipio_ine(year = 2023):
    """
    Load empresas from INE datasource
    """
    table_name = 'ine_empresas_municipio'
    url = f'https://servicios.ine.es/wstempus/js/ES/DATOS_TABLA/4721?date={year}0101:{year}1231&Tv=40621:248341&Tv=selCri_2:on'
    
    # Use 'Id' as the primary key for municipios
    create_and_merge_table_from_json(
        table_name, 
        url,
        ['COD']  # Assuming 'Id' is the unique identifier
    )

In [29]:
def load_poblacion_municipio_ine(year = 2023):
    """
    Load empresas from INE datasource
    """
    table_name = 'ine_poblacion_municipio'
    url = f'https://servicios.ine.es/wstempus/js/ES/DATOS_TABLA/29005?date={year}0101:{year}1231&nult=1&det=2'
    
    # Use 'Id' as the primary key for municipios
    create_and_merge_table_from_json(
        table_name, 
        url,
        ['COD']  # Assuming 'Id' is the unique identifier
    ) 


In [33]:
def load_renta_municipio_ine(year = 2023):
    """
    Load empresas from INE datasource
    """
    table_name = 'ine_renta_municipio'
    url = f'https://servicios.ine.es/wstempus/js/ES/DATOS_TABLA/30896?date={year}0101'
    
    # Use 'Id' as the primary key for municipios
    create_and_merge_table_from_json(
        table_name, 
        url,
        ['COD']  # Assuming 'Id' is the unique identifier
    ) 


In [None]:
# load_municipios_ine()
# load_empresas_municipio_ine()
# load_poblacion_municipio_ine()
# load_renta_municipio_ine()

Fetching JSON data from https://servicios.ine.es/wstempus/js/ES/DATOS_TABLA/30896?date=20230101...
Using merge keys: ['COD']
Table bronze_ine_renta_municipio merged successfully with 1 key columns. Total rows: 31444


In [35]:
SQL("""
    SELECT *
    FROM bronze_ine_renta_municipio
    LIMIT 10
""")

Unnamed: 0,COD,Nombre,FK_Unidad,FK_Escala,Data,loaded_at,source_url
0,ADRH102301,Abrera. Dato base. Renta neta media por persona.,7,1,"[{'Fecha': 1672527600000, 'FK_TipoDato': 1, 'F...",2025-12-02 22:59:14.396410+01:00,https://servicios.ine.es/wstempus/js/ES/DATOS_...
1,ADRH102300,Abrera. Dato base. Renta neta media por hogar.,7,1,"[{'Fecha': 1672527600000, 'FK_TipoDato': 1, 'F...",2025-12-02 22:59:14.396410+01:00,https://servicios.ine.es/wstempus/js/ES/DATOS_...
2,ADRH9821769,Abrera. Dato base. Media de la renta por unida...,7,1,"[{'Fecha': 1672527600000, 'FK_TipoDato': 1, 'F...",2025-12-02 22:59:14.396410+01:00,https://servicios.ine.es/wstempus/js/ES/DATOS_...
3,ADRH9612349,Abrera. Dato base. Mediana de la renta por uni...,7,1,"[{'Fecha': 1672527600000, 'FK_TipoDato': 1, 'F...",2025-12-02 22:59:14.396410+01:00,https://servicios.ine.es/wstempus/js/ES/DATOS_...
4,ADRH9612348,Abrera. Dato base. Renta mediana por hogar.,7,1,[],2025-12-02 22:59:14.396410+01:00,https://servicios.ine.es/wstempus/js/ES/DATOS_...
5,ADRH9612347,Abrera. Dato base. Renta bruta media por perso...,7,1,"[{'Fecha': 1672527600000, 'FK_TipoDato': 1, 'F...",2025-12-02 22:59:14.396410+01:00,https://servicios.ine.es/wstempus/js/ES/DATOS_...
6,ADRH9612346,Abrera. Dato base. Renta bruta media por hogar.,7,1,"[{'Fecha': 1672527600000, 'FK_TipoDato': 1, 'F...",2025-12-02 22:59:14.396410+01:00,https://servicios.ine.es/wstempus/js/ES/DATOS_...
7,ADRH75855,Abrera distrito 01. Dato base. Renta neta medi...,7,1,"[{'Fecha': 1672527600000, 'FK_TipoDato': 1, 'F...",2025-12-02 22:59:14.396410+01:00,https://servicios.ine.es/wstempus/js/ES/DATOS_...
8,ADRH75854,Abrera distrito 01. Dato base. Renta neta medi...,7,1,"[{'Fecha': 1672527600000, 'FK_TipoDato': 1, 'F...",2025-12-02 22:59:14.396410+01:00,https://servicios.ine.es/wstempus/js/ES/DATOS_...
9,ADRH9807923,Abrera distrito 01. Dato base. Media de la ren...,7,1,"[{'Fecha': 1672527600000, 'FK_TipoDato': 1, 'F...",2025-12-02 22:59:14.396410+01:00,https://servicios.ine.es/wstempus/js/ES/DATOS_...


In [None]:
# MITMA data insertion

# import gc

# start_date = '2022-03-01'
# end_date = '2022-03-07'
# types = ['distritos', 'municipios', 'gau']

# for type in types:
#     """
#     Load OD matrices, people day, and overnight stay data for a specific type and date range.
#     """
#     load_od_matrices(type=type, start_date=start_date, end_date=end_date)
#     load_people_day(type=type, start_date=start_date, end_date=end_date)
#     load_overnight_stay(type=type, start_date=start_date, end_date=end_date)
#     load_zonificacion(type=type)
#     print(f"--- Liberando memoria tras {type} ---")
#     gc.collect()  # Fuerza al recolector de basura de Python a limpiar objetos no usados