# 13 – ETL de Eurostat: HBS (estructura de gasto de los hogares, UE-27)

En este notebook se transforma el fichero tabular de Eurostat:

    data_raw/eurostat/hbs_str_t211_tabular.tsv

Objetivos:

- Pasar de formato ancho (una columna por año) a formato largo (una fila por año).
- Descomponer la columna compuesta `freq,coicop,unit,geo\TIME_PERIOD` en:
    - freq  (frecuencia, anual A),
    - coicop (categoría de consumo COICOP),
    - unit (unidad de medida, p. ej. PM = tanto por mil (‰) del gasto total de consumo del hogar),
    - geo  (país / agregado).
- Limpiar los valores:
    - ":" → NULL,
    - valores con banderas de calidad ("23.5 u", "12.1 p", etc.) → usar solo la parte numérica.
- Filtrar a:
    - frecuencia anual (freq = 'A'),
    - unidad PM,
    - países de la UE-27.
- Guardar el resultado en:

    `data_processed/eurostat/hbs_eu27_long.parquet`

In [14]:
from pathlib import Path
import duckdb
import pandas as pd

# Project root folder: the parent of the resolved ".." directory
ROOT_DIR = Path("..").resolve().parents[0]

# Eurostat data folders (raw input and processed output) under the root
DATA_RAW = ROOT_DIR.joinpath("data_raw", "eurostat")
DATA_PROCESSED = ROOT_DIR.joinpath("data_processed", "eurostat")

# Original HBS file from Eurostat (tab-separated)
HBS_PATH = DATA_RAW.joinpath("hbs_str_t211_tabular.tsv")

# Output file (long-format Parquet, EU-27)
HBS_PARQUET = DATA_PROCESSED.joinpath("hbs_eu27_long.parquet")

# Show the root, whether the input file exists, and its full path
ROOT_DIR, HBS_PATH.exists(), HBS_PATH

(WindowsPath('C:/Users/santi/OneDrive - UNIR/UNIR/MASTER ANÁLISIS Y VISUALIZACIÓN BIG DATA/TFM/dashboard-coherencia-ue-tfm'),
 True,
 WindowsPath('C:/Users/santi/OneDrive - UNIR/UNIR/MASTER ANÁLISIS Y VISUALIZACIÓN BIG DATA/TFM/dashboard-coherencia-ue-tfm/data_raw/eurostat/hbs_str_t211_tabular.tsv'))

In [15]:
# In-memory DuckDB connection, reused by the following cells
con = duckdb.connect(database=":memory:")

# Quick look: first 5 rows of the raw TSV as a DataFrame
preview_query = f"""
    SELECT *
    FROM read_csv_auto('{HBS_PATH}', delim='\t', header=TRUE)
    LIMIT 5
"""
hbs_preview = con.execute(preview_query).fetchdf()

hbs_preview

Unnamed: 0,"freq,coicop,unit,geo\TIME_PERIOD",1988,1994,1999,2005,2010,2015,2020
0,"A,CP01,PM,AT",:,154,134,130,121,118,120
1,"A,CP01,PM,BE",161,120,133,135,132,129,159
2,"A,CP01,PM,BG",:,:,482,315,293,266,246
3,"A,CP01,PM,CY",:,:,178,151,123,153,153 e
4,"A,CP01,PM,CZ",:,:,232,206,203,211,207


In [16]:
# Schema: column names and types DuckDB reports for the raw TSV
schema_query = f"""
    DESCRIBE
    SELECT *
    FROM read_csv_auto('{HBS_PATH}', delim='\t', header=TRUE)
"""
hbs_schema = con.execute(schema_query).fetchdf()

hbs_schema.head()

Unnamed: 0,column_name,column_type,null,key,default,extra
0,"freq,coicop,unit,geo\TIME_PERIOD",VARCHAR,YES,,,
1,1988,VARCHAR,YES,,,
2,1994,VARCHAR,YES,,,
3,1999,VARCHAR,YES,,,
4,2005,VARCHAR,YES,,,


In [17]:
# Actual name of the composite first column, taken from the preview frame
first_col_hbs = hbs_preview.columns.tolist()[0]
first_col_hbs

'freq,coicop,unit,geo\\TIME_PERIOD'

In [18]:
# Year columns: every header after the first one whose name is all digits
year_cols = list(
    filter(lambda name: str(name).isdigit(), hbs_preview.columns[1:])
)
# Same names as a comma-separated list, ready to interpolate into SQL
years_sql = ", ".join(year_cols)

year_cols, years_sql

(['1988', '1994', '1999', '2005', '2010', '2015', '2020'],
 '1988, 1994, 1999, 2005, 2010, 2015, 2020')

In [19]:
# EU-27 country codes (27 entries)
eu27_codes = [
    "AT", "BE", "BG", "HR", "CY", "CZ", "DK", "EE", "FI",
    "FR", "DE", "EL", "HU", "IE", "IT", "LV", "LT", "LU",
    "MT", "NL", "PL", "PT", "RO", "SK", "SI", "ES", "SE",
]
# Same codes as a comma-separated list of single-quoted SQL literals
eu27_list_sql = ", ".join(["'" + code + "'" for code in eu27_codes])
eu27_codes[:5]

['AT', 'BE', 'BG', 'HR', 'CY']

In [20]:
# Make sure the output folder exists before DuckDB writes into it
HBS_PARQUET.parent.mkdir(parents=True, exist_ok=True)

# Reshape the raw TSV to long format and write it to Parquet in a single COPY:
#   base  -> splits the composite first column into freq, coicop, unit, geo
#   long  -> UNPIVOTs the year columns into (year, value) rows
#   final -> cleans the value, casts year/value, and keeps only
#            freq = 'A', unit = 'PM' and the EU-27 geo codes
#
# Value cleaning: the raw cell is trimmed BEFORE taking its first
# space-separated token. Trimming after the split is a no-op (that token can
# never contain a space), so a cell with leading whitespace would split into
# an empty first token and be written as NULL instead of its number.
# The cast is a strict ::DOUBLE on purpose: an unexpected non-numeric token
# raises instead of silently becoming NULL.
con.execute(f"""
    COPY (
        WITH base AS (
            -- Leemos el TSV y separamos la columna compuesta en 4 campos:
            -- freq, coicop, unit, geo
            SELECT
                split_part("{first_col_hbs}", ',', 1) AS freq,
                split_part("{first_col_hbs}", ',', 2) AS coicop,
                split_part("{first_col_hbs}", ',', 3) AS unit,
                split_part("{first_col_hbs}", ',', 4) AS geo,
                *
            FROM read_csv_auto('{HBS_PATH}', delim='\t', header=TRUE)
        ),
        long AS (
            -- Pasamos de columnas por año a filas (UNPIVOT)
            SELECT
                freq,
                coicop,
                unit,
                geo,
                year,
                value
            FROM base
            UNPIVOT (
                value FOR year IN ({years_sql})
            )
        )
        SELECT
            unit,
            coicop,
            geo,
            CAST(year AS INTEGER) AS year,
            -- Limpieza del valor:
            --   - nos quedamos con el primer token antes del espacio ('23.5 u' -> '23.5')
            --   - ':' o cadena vacía → NULL
            --   - casteamos a DOUBLE
            NULLIF(
                NULLIF(split_part(TRIM(value), ' ', 1), ''),
                ':'
            )::DOUBLE AS hbs_share
        FROM long
        WHERE
            freq = 'A'          -- frecuencia anual
            AND unit = 'PM'     -- porcentaje del gasto medio del hogar
            AND geo IN ({eu27_list_sql})
    )
    TO '{HBS_PARQUET}'
    (FORMAT PARQUET)
""")

# Confirm the Parquet file was written and show its path
HBS_PARQUET.exists(), HBS_PARQUET

(True,
 WindowsPath('C:/Users/santi/OneDrive - UNIR/UNIR/MASTER ANÁLISIS Y VISUALIZACIÓN BIG DATA/TFM/dashboard-coherencia-ue-tfm/data_processed/eurostat/hbs_eu27_long.parquet'))

In [21]:
# First 10 rows of the written Parquet, ordered by country, category and year
sample_query = f"""
    SELECT *
    FROM read_parquet('{HBS_PARQUET}')
    ORDER BY geo, coicop, year
    LIMIT 10
"""
hbs_sample = con.execute(sample_query).fetchdf()

hbs_sample

Unnamed: 0,unit,coicop,geo,year,hbs_share
0,PM,CP01,AT,1988,
1,PM,CP01,AT,1994,154.0
2,PM,CP01,AT,1999,134.0
3,PM,CP01,AT,2005,130.0
4,PM,CP01,AT,2010,121.0
5,PM,CP01,AT,2015,118.0
6,PM,CP01,AT,2020,120.0
7,PM,CP011,AT,1988,
8,PM,CP011,AT,1994,139.0
9,PM,CP011,AT,1999,120.0


In [22]:
# Quick summary of the Parquet: year range, row count, and the number of
# distinct countries, COICOP categories and units
stats_query = f"""
    SELECT
        MIN(year)              AS min_year,
        MAX(year)              AS max_year,
        COUNT(*)               AS n_filas,
        COUNT(DISTINCT geo)    AS n_paises,
        COUNT(DISTINCT coicop) AS n_coicop,
        COUNT(DISTINCT unit)   AS n_units
    FROM read_parquet('{HBS_PARQUET}')
"""
hbs_stats = con.execute(stats_query).fetchdf()

hbs_stats

Unnamed: 0,min_year,max_year,n_filas,n_paises,n_coicop,n_units
0,1988,2020,42980,27,236,1


In [23]:
# Column names and data types of the resulting Parquet file
schema_out_query = f"""
    DESCRIBE
    SELECT *
    FROM read_parquet('{HBS_PARQUET}')
"""
hbs_schema_out = con.execute(schema_out_query).fetchdf()

hbs_schema_out

Unnamed: 0,column_name,column_type,null,key,default,extra
0,unit,VARCHAR,YES,,,
1,coicop,VARCHAR,YES,,,
2,geo,VARCHAR,YES,,,
3,year,INTEGER,YES,,,
4,hbs_share,DOUBLE,YES,,,
