In [0]:
%pip install econdata

In [0]:
from econdata import BCRP

BCRP.search(
    consulta=['PBI'],
    grupo=['Producto', 'variaciones'],
    frecuencia='Trimestral'
)

In [0]:
df = BCRP.get_data(
    series={'PN02526AQ':'PBI'},
    fechaini='2000Q1',
    fechafin='2024Q4'
)

df

In [0]:
from econdata import WB

WB.search(
    consulta=['gdp', 'per', 'capita']
)

In [0]:
df = WB.get_data(
    countries = {
        'CO': 'Colombia',
        'CL': 'Chile',
        'PE': 'Perú'
    },
    indicators = {
        'NY.GDP.PCAP.PP.KD': 'Real GDP per capita'
    },
    fechaini = '2012',
    fechafin = '2024'
)

df

In [0]:
"""
Algoritmo para estimar TENDENCIAS de largo plazo y pronosticar PBI e Inflación
para países de Sudamérica. Diseñado para usarse con datos obtenidos vía `econdata`
(u otras fuentes), en Databricks o local.

Enfoque principal:
- Descomposición por modelo estructural (State Space) con tendencia local (local linear trend)
  usando `statsmodels.tsa.statespace.UnobservedComponents`.
- Opción alternativa: filtro de Hodrick–Prescott (HP) para estimar la tendencia suave.
- Forecast de la TENDENCIA (no del ciclo/ruido) para horizontes anuales o trimestrales.

Indicadores sugeridos (World Bank / práctica común):
- PBI (nivel real): NY.GDP.MKTP.KD (GDP, constant 2015 US$)  — tendencia sobre log.
- PBI crecimiento %: NY.GDP.MKTP.KD.ZG (GDP growth, annual %).
- Inflación %: FP.CPI.TOTL.ZG (Inflation, consumer prices, annual %).

Países (ISO3): ARG, BOL, BRA, CHL, COL, ECU, GUY, PER, PRY, SUR, URY, VEN.

NOTA IMPORTANTE:
- Este script es independiente de `econdata`. Incluye un bloque opcional para descargar
  con `econdata` (comentado), ya que la estructura exacta de funciones puede variar por versión.
  Si ya tienes tus DataFrames (p.ej. desde `econdata` o CSVs), basta con pasarlos a
  `run_pipeline(...)` en el formato especificado.

Requisitos:
  pip install pandas numpy statsmodels matplotlib
  (econdata es OPCIONAL si vas a descargar en vivo)
"""

from __future__ import annotations
import math
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.statespace.structural import UnobservedComponents
from statsmodels.tsa.filters.hp_filter import hpfilter

# =============================================================
# 1) Utilidades para ESTRUCTURA DE DATOS
# -------------------------------------------------------------
# Se espera un DataFrame con las columnas: ["date", "value", "country", "series"]
# - date: Periodo ("YYYY" para anual, "YYYY-Qn" para trimestral, o datetime64)
# - value: valor numérico de la métrica
# - country: código ISO3 del país
# - series: nombre/código del indicador (ej. "GDP_const2015USD", "Inflation_yoy")
# =============================================================


def ensure_period_index(df: pd.DataFrame, freq: str) -> pd.DataFrame:
    """Asegura que 'date' sea un PeriodIndex con frecuencia freq ('A' anual, 'Q' trimestral, 'M' mensual).
    Devuelve df ordenado por date.
    """
    out = df.copy()
    if np.issubdtype(out["date"].dtype, np.datetime64):
        out["date"] = out["date"].dt.to_period(freq)
    else:
        # Intentar parsear strings: "YYYY", "YYYY-Qn", etc.
        out["date"] = out["date"].astype(str)
        if freq.upper() == "A":
            out["date"] = pd.PeriodIndex(out["date"], freq="A")
        elif freq.upper() == "Q":
            # Acepta formatos tipo "2020Q1" o "2020-Q1" o "2020-Q1"
            out["date"] = out["date"].str.replace("-", "", regex=False).str.replace(" ", "", regex=False)
            out["date"] = out["date"].str.replace("Q", "Q", regex=False)
            # Normalizar a YYYYQn
            out["date"] = out["date"].str.replace(r"(\d{4})[\-\s]?Q(\d)", r"\1Q\2", regex=True)
            out["date"] = pd.PeriodIndex(out["date"], freq="Q")
        elif freq.upper() == "M":
            # Espera "YYYY-MM"
            out["date"] = pd.PeriodIndex(out["date"], freq="M")
        else:
            raise ValueError(f"Frecuencia no soportada: {freq}")
    return out.sort_values("date").reset_index(drop=True)


# =============================================================
# 2) MODELOS DE TENDENCIA: UCM (Local Linear Trend) y HP filter
# =============================================================

@dataclass
class TrendResult:
    country: str
    series: str
    freq: str
    fitted_trend: pd.Series
    forecast_trend: pd.Series
    model_summary: Optional[str] = None


def fit_ucm_trend(y: pd.Series, freq: str, seasonal: bool = False, seasonal_periods: Optional[int] = None,
                  future_periods: int = 5) -> Tuple[pd.Series, pd.Series, str]:
    """
    Ajusta un modelo estructural con tendencia local (local linear trend).
    - y: Serie indexada por PeriodIndex.
    - seasonal: True si se desea componente estacional (para mensual/trimestral).
    - seasonal_periods: p.ej. 12 (mensual), 4 (trimestral). Si None, se infiere de 'freq'.
    - future_periods: horizonte de pronóstico en cantidad de períodos de la frecuencia.
    Return: (trend_in_sample, trend_forecast, summary_str)
    """
    if seasonal and seasonal_periods is None:
        seasonal_periods = 12 if freq.upper() == "M" else 4 if freq.upper() == "Q" else None

    # El componente de tendencia local ya captura tendencia suave.
    mod = UnobservedComponents(endog=y.astype(float), level="llevel", trend=True,
                               seasonal=seasonal_periods if seasonal and seasonal_periods else None,
                               freq_seasonal=None)
    res = mod.fit(disp=False)

    # Tendencia filtrada in-sample
    trend_fitted = res.level.smoothed
    # Forecast de la serie (incluye tendencia + otros compon.), pero usamos la tendencia esperada
    # Para extraer la componente de nivel futuro, generamos el pronóstico y tomamos el nivel pronosticado
    y_fore = res.get_forecast(steps=future_periods)
    # El resultado de nivel futuro se aproxima por la media del pronóstico cuando el resto es bajo;
    # alternativamente, podemos prolongar el estado de nivel directamente.
    # Usaremos la media pronosticada como proxy de tendencia de largo plazo.
    trend_fore = y_fore.predicted_mean

    return trend_fitted, trend_fore, res.summary().as_text()


def hp_trend(y: pd.Series, lamb: Optional[float] = None, freq: str = "A") -> pd.Series:
    """Estimación de tendencia con filtro HP. El lambda sugerido por frecuencia:
    - Anual: 6.25
    - Trimestral: 1600
    - Mensual: 129600
    """
    if lamb is None:
        lamb = 6.25 if freq.upper() == "A" else 1600 if freq.upper() == "Q" else 129600
    cycle, trend = hpfilter(y.astype(float), lamb=lamb)
    return trend


# =============================================================
# 3) PIPELINE MULTIPAÍS / MULTISERIE
# =============================================================

SOUTH_AMERICA_ISO3 = ["ARG","BOL","BRA","CHL","COL","ECU","GUY","PER","PRY","SUR","URY","VEN"]


def prepare_panel(df: pd.DataFrame, freq: str) -> pd.DataFrame:
    df = ensure_period_index(df, freq)
    # Chequeo básico de duplicados
    df = df.drop_duplicates(subset=["country","series","date"]).copy()
    # Ordenamos
    df = df.sort_values(["series","country","date"]).reset_index(drop=True)
    return df


def run_trend_for_group(df: pd.DataFrame, freq: str, method: str = "ucm", horizon: int = 5,
                        seasonal: bool = False, seasonal_periods: Optional[int] = None) -> List[TrendResult]:
    results: List[TrendResult] = []
    df = prepare_panel(df, freq)

    for (cc, ss), g in df.groupby(["country","series"], sort=False):
        y = g.set_index("date")["value"].astype(float)
        if method == "ucm":
            trend_in, trend_fc, summ = fit_ucm_trend(y, freq=freq, seasonal=seasonal,
                                                     seasonal_periods=seasonal_periods, future_periods=horizon)
        elif method == "hp":
            trend_in = hp_trend(y, freq=freq)
            # Forecast naive sobre tendencia final (random walk sobre nivel):
            last = trend_in.iloc[-1]
            idx_fc = pd.period_range(start=y.index[-1]+1, periods=horizon, freq=y.index.freq)
            trend_fc = pd.Series([last]*horizon, index=idx_fc)
            summ = f"HP filter lambda auto, freq={freq}"
        else:
            raise ValueError("method debe ser 'ucm' o 'hp'")

        results.append(TrendResult(country=cc, series=ss, freq=freq,
                                   fitted_trend=trend_in, forecast_trend=trend_fc,
                                   model_summary=summ))
    return results


# =============================================================
# 4) PLOTEO Y EXPORTACIÓN
# =============================================================


def plot_trend(result: TrendResult, original: pd.Series, title_prefix: str = ""):
    def to_ts(idx):
        # Convertir PeriodIndex a DatetimeIndex si aplica
        if hasattr(idx, "to_timestamp"):
            try:
                return idx.to_timestamp()
            except Exception:
                return idx
        return idx
    
    # Serie original asegurada
    orig_ts = pd.Series(original.values, index=to_ts(original.index))

    # Fitted trend
    if isinstance(result.fitted_trend, np.ndarray):
        trend_in_ts = pd.Series(result.fitted_trend, index=orig_ts.index)
    else:
        trend_in_ts = result.fitted_trend
        trend_in_ts.index = to_ts(trend_in_ts.index)

    # Forecast trend
    if isinstance(result.forecast_trend, np.ndarray):
        # Construir índice de forecast extendiendo el último del original
        last_date = orig_ts.index[-1]
        horizon = len(result.forecast_trend)
        if isinstance(last_date, pd.Period):
            fc_index = pd.period_range(last_date+1, periods=horizon, freq=last_date.freq).to_timestamp()
        else:
            fc_index = pd.date_range(start=last_date + pd.offsets.YearEnd(), periods=horizon, freq="A")
        trend_fc_ts = pd.Series(result.forecast_trend, index=fc_index)
    else:
        trend_fc_ts = result.forecast_trend
        trend_fc_ts.index = to_ts(trend_fc_ts.index)

    # Plot
    plt.figure(figsize=(10,5))
    plt.plot(orig_ts.index, orig_ts.values, label="Serie original", linewidth=1.2, alpha=0.6)
    plt.plot(trend_in_ts.index, trend_in_ts.values, label="Tendencia (in-sample)", linewidth=2)
    plt.plot(trend_fc_ts.index, trend_fc_ts.values, label="Tendencia (forecast)", linestyle="--")
    plt.title(f"{title_prefix}{result.series} - {result.country}")
    plt.legend()
    plt.grid(True, alpha=0.25)
    plt.tight_layout()
    plt.show()




# =============================================================
# 5) EJEMPLO DE USO
# =============================================================

"""
EJEMPLO 1: Suponiendo que ya tienes data ANUAL en un DataFrame con columnas
[date, value, country, series] para Sudamérica (p.ej. PBI real e Inflación).

# df = pd.read_csv("panel_sudamerica.csv")
# df = prepare_panel(df, freq="A")
# results = run_trend_for_group(df, freq="A", method="ucm", horizon=5)
# -- Visualizar un país/serie --
# g = df.query("country=='PER' and series=='GDP_const2015USD'").set_index("date")["value"]
# per_gdp_res = [r for r in results if r.country=="PER" and r.series=="GDP_const2015USD"][0]
# plot_trend(per_gdp_res, g, title_prefix="Tendencia de largo plazo: ")

EJEMPLO 2 (OPCIONAL): Descarga con econdata (verifica firma exacta de tu versión).
Descomenta y ajusta según tu versión de econdata. La idea es obtener un DataFrame con el formato requerido.
"""

# from econdata import WB
# 
# # Indicadores WB sugeridos
# WB_GDP_LEVEL = "NY.GDP.MKTP.KD"      # GDP, constant 2015 US$
# WB_INFLATION = "FP.CPI.TOTL.ZG"      # Inflation, consumer prices (annual %)
# 
# COUNTRIES = SOUTH_AMERICA_ISO3
# START, END = "2000", "2024"
# 
# # --- PSEUDOCÓDIGO (ajustar a tu versión de econdata): ---
# gdp_df = WB.get_data(series={WB_GDP_LEVEL: "GDP_const2015USD"},
#                      countries=COUNTRIES, fechaini=START, fechafin=END,
#                      periodicidad="anual")
# infl_df = WB.get_data(series={WB_INFLATION: "Inflation_yoy"},
#                       countries=COUNTRIES, fechaini=START, fechafin=END,
#                       periodicidad="anual")
# 
# # Normaliza a formato estándar
# def normalize_wb(df_raw: pd.DataFrame, value_col: str, series_name: str) -> pd.DataFrame:
#     # Adapta estos nombres de columnas a lo que retorne tu econdata.
#     out = df_raw.rename(columns={"anio": "date", value_col: "value", "pais": "country"})
#     out["series"] = series_name
#     return out[["date","value","country","series"]]
# 
# gdp_panel = normalize_wb(gdp_df, value_col="valor", series_name="GDP_const2015USD")
# infl_panel = normalize_wb(infl_df, value_col="valor", series_name="Inflation_yoy")
# panel = pd.concat([gdp_panel, infl_panel], ignore_index=True)
# 
# # Si el PBI está en nivel, suele ser útil trabajar en log para tendencia (opcional):
# panel.loc[panel["series"]=="GDP_const2015USD", "value"] = (
#     np.log(panel.loc[panel["series"]=="GDP_const2015USD", "value"].astype(float))
# )
# 
# # Ejecutar pipeline anual y pronosticar 5 años
# results = run_trend_for_group(panel, freq="A", method="ucm", horizon=5)
# 
# # Ejemplo de plot para Perú - PBI
# per_gdp = panel.query("country=='PER' and series=='GDP_const2015USD'").set_index("date")["value"].astype(float)
# per_res = [r for r in results if r.country=="PER" and r.series=="GDP_const2015USD"][0]
# plot_trend(per_res, per_gdp, title_prefix="Tendencia de largo plazo: ")

"""
Notas y RECOMENDACIONES:
- Para PBI en nivel: use log antes de modelar tendencia. Interprete el forecast en log y re-exponencie si necesita el nivel.
- Para Inflación (ya está en %): modele la serie directamente, o aplique suavizado previo si hay outliers.
- Falta de datos/NA: considere imputación lineal por país antes de ajustar el modelo.
- Trimestral/mensual: active `seasonal=True` y defina `seasonal_periods` (4 o 12) en `run_trend_for_group`.
- Validación: reserve las últimas k observaciones para evaluar MAE/MAPE del trend-forecast.
- Comparabilidad: la TENDENCIA es un concepto suave; no es un pronóstico de corto plazo. Úselo para planes estratégicos.
"""


In [0]:
from econdata import WB
import pandas as pd
import numpy as np

# Indicadores del World Bank
WB_GDP_LEVEL = "NY.GDP.MKTP.KD"   # GDP (constant 2015 US$)
WB_INFLATION = "FP.CPI.TOTL.ZG"  # Inflation, consumer prices (annual %)

# Países
countries = {"PE": "Perú", "CL": "Chile", "CO": "Colombia"}

start, end = "2000", "2025"

# Descarga datos
gdp_df = WB.get_data(countries=countries,
                     indicators={WB_GDP_LEVEL: "GDP_const2015USD"},
                     fechaini=start, fechafin=end)

infl_df = WB.get_data(countries=countries,
                      indicators={WB_INFLATION: "Inflation_yoy"},
                      fechaini=start, fechafin=end)

print("PBI:\n", gdp_df.head())
print("Inflación:\n", infl_df.head())

In [0]:
import pandas as pd

def normalize_pivoted(df_raw: pd.DataFrame, series_name: str) -> pd.DataFrame:
    # Resetear el índice (time -> columna)
    df = df_raw.reset_index().rename(columns={"time": "date"})
    
    # Derretir columnas de países a filas
    df_long = df.melt(id_vars="date", var_name="country", value_name="value")
    
    # Agregar nombre de la serie
    df_long["series"] = series_name
    return df_long[["date", "value", "country", "series"]]

# Ejemplo para tu gdp_df
gdp_panel = normalize_pivoted(gdp_df, "GDP_const2015USD")
print(gdp_panel.head())


In [0]:
infl_panel = normalize_pivoted(infl_df, "Inflation_yoy")
print(gdp_panel.head())

In [0]:
panel = pd.concat([gdp_panel, infl_panel], ignore_index=True)

# Log-transformar el PBI
import numpy as np
panel.loc[panel["series"]=="GDP_const2015USD","value"] = np.log(
    panel.loc[panel["series"]=="GDP_const2015USD","value"].astype(float)
)

print(panel.head())

In [0]:
# Filtrar PBI
pbi_df = panel[panel["series"] == "GDP_const2015USD"].copy()

# Filtrar Inflación
inflacion_df = panel[panel["series"] == "Inflation_yoy"].copy()

pbi_sdf = spark.createDataFrame(pbi_df)
inflacion_sdf = spark.createDataFrame(inflacion_df)

# Guardar PBI
pbi_sdf.write.format("delta").mode("overwrite").saveAsTable("esan_202502.economia.pbi")

# Guardar Inflación
inflacion_sdf.write.format("delta").mode("overwrite").saveAsTable("esan_202502.economia.inflacion")


In [0]:
results = run_trend_for_group(panel, freq="A", method="ucm", horizon=5)

In [0]:
# Convertir columna date a datetime (ej. años -> 2000-01-01)
panel["date"] = pd.to_datetime(panel["date"], format="%Y")
panel["date"] = panel["date"].dt.to_period("A")

In [0]:
# Serie original corregida
per_gdp = panel.query("country == 'Perú' and series == 'GDP_const2015USD'")
per_gdp = per_gdp.set_index("date")["value"].astype(float)

# Resultado del modelo
per_res = [r for r in results if r.country == "Perú" and r.series == "GDP_const2015USD"][0]

# Ahora sí graficar
plot_trend(per_res, per_gdp, title_prefix="Tendencia de largo plazo: ")
