# Inserción de Emisiones de CO₂ desde EDGARD 2024

Flujo equivalente al que usábamos con `tidy_format_co2_emission_dataset.csv`:
1) Leer Excel de **EDGARD 2024** (`GHG_totals_by_country`)
2) Transformar a **tidy** → `country`, `anio`, `valor_mt` (Gg → Mt)
3) Mapear países a `Paises.id` (ISO3/alias)
4) Asegurar `Unidades (mt)`
5) Volcado a `Hechos` con **UPSERT** (PK: `indicador_id, pais_id, anio`), indicador `emision_co2`


In [1]:
import os
import pandas as pd
import pymysql
from pymysql.constants import CLIENT
from dotenv import load_dotenv

load_dotenv()

DB_HOST = os.getenv('DB_HOST')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_NAME = os.getenv('DB_NAME')

# Conexión a MySQL
conexion = pymysql.connect(
    host=DB_HOST,
    user=DB_USER,
    password=DB_PASSWORD,
    database=DB_NAME,
    client_flag=CLIENT.MULTI_STATEMENTS
)
cursor = conexion.cursor()


## 1. Carga y Exploración Inicial

Leemos el XLSX y vemos su forma, columnas y primeros registros.


In [2]:
from pathlib import Path
import pandas as pd

# EDGARD (Excel)
EXCEL_PATH = Path("../../data/fuentes/climaticos/EDGAR_2024_GHG_booklet_2024.xlsx")
SHEET_NAME = "GHG_totals_by_country"
UNIT = "Gg CO2e"

# 1) Leer Excel
assert EXCEL_PATH.exists(), f"No se encuentra el Excel en {EXCEL_PATH}"
df_raw = pd.read_excel(EXCEL_PATH, sheet_name=SHEET_NAME, engine="openpyxl")

# 2) Normalizar cabeceras
df = df_raw.copy()
df.columns = [str(c).strip().replace(" ", "_").replace("-", "_") for c in df.columns]

# 3) Detectar columna de país y columnas de año
candidatos_pais = ["country", "country_name", "name", "entity"]
pais_col = next((c for c in df.columns if c.lower() in candidatos_pais), None)
if not pais_col:
    pais_col = next((c for c in df.columns if not str(c).isdigit()), None)
    assert pais_col, "No se pudo identificar la columna de país."

cols_anio = [c for c in df.columns if str(c).isdigit()]
assert cols_anio, "No se detectaron columnas de año."

if pais_col != "country":
    df = df.rename(columns={pais_col: "country"})

# 4) Ancho-largo (Gg CO2e tal cual)
id_cols = [c for c in df.columns if c not in cols_anio]
tidy = df.melt(id_vars=id_cols, value_vars=cols_anio, var_name="anio", value_name="valor_gg")
tidy["anio"] = tidy["anio"].astype(int)
tidy["valor_gg"] = pd.to_numeric(tidy["valor_gg"], errors="coerce")
tidy = tidy.dropna(subset=["valor_gg"]).copy()

# (Opcional) excluir agregados tipo World/EU/continentes/bunkers…
_drop_patterns = r"(world|europe|asia|africa|america|antarctica|bunker|international|total|global|eu\d+)"
mask_aggr = tidy["country"].astype(str).str.lower().str.contains(_drop_patterns, regex=True)
tidy = tidy.loc[~mask_aggr].copy()

# 5) Dejar df_co2 en Gg y añadir columna de unidad para que quede claro
df_co2 = (
    tidy[["country", "anio", "valor_gg"]]
    .assign(unit=UNIT)
    .sort_values(["country", "anio"])
    .reset_index(drop=True)
)

# Vista rápida
print("Unidad:", UNIT)
print("Shape:", df_co2.shape)
display(df_co2.head())

# Comprobar missing values
print("\nValores nulos por columna:")
print(df_co2.isnull().sum())


Unidad: Gg CO2e
Shape: (11124, 4)


  mask_aggr = tidy["country"].astype(str).str.lower().str.contains(_drop_patterns, regex=True)


Unnamed: 0,country,anio,valor_gg,unit
0,Afghanistan,1970,15.43743,Gg CO2e
1,Afghanistan,1971,15.364077,Gg CO2e
2,Afghanistan,1972,13.316628,Gg CO2e
3,Afghanistan,1973,13.934875,Gg CO2e
4,Afghanistan,1974,15.177862,Gg CO2e



Valores nulos por columna:
country     0
anio        0
valor_gg    0
unit        0
dtype: int64


## 2. Limpieza y Normalización de Países

Creamos `country_norm`, aplicamos excepciones para casos irregulares, y mapeamos contra la dimensión `Paises`.

In [3]:
# 1) Normalizar country y valor (Gg CO2e)
print("UNIDAD actual de df_co2: Gg CO2e")
df_co2['country_norm'] = (
    df_co2['country']
      .astype(str)
      .str.strip()
      .str.lower()
)

# Quitar separadores de miles y convertir a numérico
df_co2['valor_gg'] = (
    df_co2['valor_gg']
      .astype(str)
      .str.replace(',', '', regex=False)
)
df_co2['valor_gg'] = pd.to_numeric(df_co2['valor_gg'], errors='coerce')

# Informar y descartar filas con NaN en valor_gg
missing = df_co2['valor_gg'].isna().sum()
print(f"⚠️ Se encontraron {missing} filas con valores no numéricos en 'valor_gg' y se descartarán.")
df_co2 = df_co2[df_co2['valor_gg'].notna()]

# 2) Cargar dimensión Paises (codigo, nombre_en)
cursor.execute("SELECT codigo, nombre_en FROM Paises;")
dim_paises = { ne.strip().lower(): code for code, ne in cursor.fetchall() }

# 3) Excepciones/alias (EDGARD → nombre_en)
exceptions_co2 = {
    'bolivia': 'bolivia (plurinational state of)',
    'british virgin islands': 'virgin islands (british)',
    'brunei': 'brunei darussalam',
    'cape verde': 'cabo verde',
    'czech republic': 'czechia',
    'democratic republic of the congo': 'congo (the democratic republic of)',
    'falkland islands': 'falkland islands (the) [malvinas]',
    'iran': 'iran (islamic republic of)',
    "ivory coast": "côte d'ivoire",
    "cote d'ivoire": "côte d'ivoire",
    'laos': "lao people's democratic republic",
    'macau': 'macao',
    'moldova': 'moldova (the republic of)',
    'namibia': None,
    'netherlands': 'netherlands (kingdom of the)',
    'north korea': "korea (the democratic people's republic of)",
    'russia': 'russian federation',
    'são tomé and príncipe': 'sao tome and principe',
    'south korea': 'korea (the republic of)',
    'syria': 'syrian arab republic',
    'taiwan': 'taiwan (province of china)',
    'tanzania': 'tanzania, the united republic of',
    'turkey': 'türkiye',
    'united kingdom': 'united kingdom of great britain and northern ireland',
    'united states': 'united states of america',
    'venezuela': 'venezuela (bolivarian republic of)',
    'vietnam': 'viet nam',
    'spain and andorra': 'spain',
    "côte d’ivoire" : "côte d'ivoire",
    'democratic republic of the congo' : 'congo (the democratic republic of the)',
    'faroes': 'faroe islands',
    'france and monaco': 'france',
    'italy, san marino and the holy see': 'italy',
    'the gambia': 'gambia',
    'israel and palestine, state of': 'israel',
    'myanmar/burma': 'myanmar',
    'serbia and montenegro': 'serbia',
    'sudan and south sudan': 'sudan',
    'switzerland and liechtenstein': 'switzerland',
    'western sahara': 'western sahara*',
}

# 4) Aplicar excepciones/alias
df_co2['country_db_name'] = df_co2['country_norm'].map(
    lambda x: exceptions_co2[x] if x in exceptions_co2 else x
)

# 5) Explode por si alguna excepción devuelve lista (y permitir descartar con None)
df_co2['country_list'] = df_co2['country_db_name'].apply(
    lambda v: [] if v is None else (v if isinstance(v, list) else [v])
)
df_co2 = df_co2.explode('country_list')

# 6) Mapear a código (clave: nombre_en en minúsculas)
df_co2['pais_id'] = (
    df_co2['country_list']
      .astype(str)
      .str.strip()
      .str.lower()
      .map(dim_paises)
)

# 6.1) Mostrar países NO mapeados (lista única) y guardarlos en CSV
mask_no = df_co2['pais_id'].isna()
no_mapeados_df = (
    df_co2.loc[mask_no, ['country', 'country_norm', 'country_db_name', 'country_list']]
        .drop_duplicates()
        .sort_values('country')
)
if not no_mapeados_df.empty:
    print(f"⚠️ Países no encontrados en la dimensión Paises (únicos: {len(no_mapeados_df)}). Revisa alias/dimensiones:")
else:
    print("✅ Todos los países fueron mapeados correctamente.")

# 7) Filtrar filas sin mapeo e informar
antes = len(df_co2)
df_co2 = df_co2[~mask_no].copy()
print(f"ℹ️ Filas totales descartadas por no mapeadas: {antes - len(df_co2)}")


UNIDAD actual de df_co2: Gg CO2e
⚠️ Se encontraron 0 filas con valores no numéricos en 'valor_gg' y se descartarán.
⚠️ Países no encontrados en la dimensión Paises (únicos: 1). Revisa alias/dimensiones:
ℹ️ Filas totales descartadas por no mapeadas: 54


## 3. Insertar los datos de emisiones de CO₂

Cada fila de `df_co2` ya tiene su `pais_id` mapeado y el año. Dado que los datos son anuales, usaremos siempre `periodo_id = 17`. A continuación preparamos los *batch inserts* y volcamos los registros en la base de datos.


In [4]:
conexion.ping(reconnect=True)
cursor = conexion.cursor()

# 1) Obtener el indicador_id para emisiones de CO₂
cursor.execute("SELECT id FROM Indicadores WHERE codigo = %s LIMIT 1;", ("emision_co2",))
row = cursor.fetchone()
assert row is not None, "El indicador 'emision_co2' no existe. Créalo antes de insertar."
indic_co2_id = row[0]

# 2) Preparar la lista de tuplas: (pais_id, periodo_id, anio, indicador_id, valor)
fact_hechos = [
    (
        r.pais_id,
        int(r.anio),
        int(indic_co2_id),
        float(r.valor_gg) # Gg CO2e
    )
    for r in df_co2.itertuples(index=False)
]

# 3) SQL de inserción en Hechos (cinco columnas)
sql_hechos = """
INSERT INTO Hechos
  (pais_id, anio, indicador_id, valor)
VALUES (%s, %s, %s, %s);
"""

# 4) Batch‐insert para no saturar el servidor
batch_size = 1000
total = len(fact_hechos)
print(f"UNIDAD insertada: Gg CO2e")
print(f"Total a insertar en Hechos: {total}")

for start in range(0, total, batch_size):
    end = min(start + batch_size, total)
    batch = fact_hechos[start:end]
    try:
        cursor.executemany(sql_hechos, batch)
        conexion.commit()
        print(f"  ✔ Filas {start+1}–{end} insertadas")
    except pymysql.err.OperationalError as e:
        print(f"  ❌ Error en lote {start+1}–{end}:", e)
        conexion.rollback()
        break

# 5) Cerrar
cursor.close()
conexion.close()


UNIDAD insertada: Gg CO2e
Total a insertar en Hechos: 11070
  ✔ Filas 1–1000 insertadas
  ✔ Filas 1001–2000 insertadas
  ✔ Filas 2001–3000 insertadas
  ✔ Filas 3001–4000 insertadas
  ✔ Filas 4001–5000 insertadas
  ✔ Filas 5001–6000 insertadas
  ✔ Filas 6001–7000 insertadas
  ✔ Filas 7001–8000 insertadas
  ✔ Filas 8001–9000 insertadas
  ✔ Filas 9001–10000 insertadas
  ✔ Filas 10001–11000 insertadas
  ✔ Filas 11001–11070 insertadas
