# Segundo Trabajo Práctico: Machine Learning para Macroeconomía 2025-01
- Docente: Josué Cox (a20062953@pucp.edu.pe) 

- Fecha de Entrega: Hasta el miércoles 5 de febrero del 2025 a las 11:59 p.m. 

- Integrantes: Franco Vela, Paul Flores, Jhoany Rodríguez

## Ejercicio 3: Estimando un modelo de nivel local
Estime un modelo de nivel local para la inflación peruana para el 
periodo enero 2000 a diciembre 2019 (ver los datos en Inflacion_Peru.csv): 
1. Estime el modelo usando métodos frecuentistas (maximización de la 
verosimilitud) o bayesianos (simulación de la posterior mediante Gibbs 
sampling), escoja solo un método 
2. Muestre los gráficos de los estados estimados y las estimaciones de las 
varianzas.

In [None]:
CODIGO 1:

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import norm
import seaborn as sns

class LocalLevelModel:
    def __init__(self, y, dates, n_iterations=5000, burnin=1000):
        """
        Inicializa el modelo de nivel local
        y: datos observados (inflación)
        dates: fechas correspondientes a las observaciones
        n_iterations: número de iteraciones para Gibbs sampling
        burnin: número de iteraciones iniciales a descartar
        """
        self.y = y
        self.dates = dates
        self.n = len(y)
        self.n_iterations = n_iterations
        self.burnin = burnin
        
        # Almacenamiento para los estados y parámetros
        self.alpha = np.zeros((n_iterations, self.n))  # Estados
        self.sigma_e = np.zeros(n_iterations)  # Varianza del error de observación
        self.sigma_n = np.zeros(n_iterations)  # Varianza del error de estado
        
    def gibbs_sampling(self):
        """
        Implementa el algoritmo de Gibbs sampling
        """
        # Valores iniciales
        self.sigma_e[0] = 0.1
        self.sigma_n[0] = 0.1
        self.alpha[0] = np.zeros(self.n)
        
        # Iteraciones de Gibbs sampling
        for t in range(1, self.n_iterations):
            # 1. Muestrear estados (Forward filtering, backward sampling)
            self.alpha[t] = self.sample_states(self.sigma_e[t-1], self.sigma_n[t-1])
            
            # 2. Muestrear varianzas
            self.sigma_e[t] = self.sample_sigma_e(self.alpha[t])
            self.sigma_n[t] = self.sample_sigma_n(self.alpha[t])
            
            if t % 500 == 0:
                print(f"Iteración {t} completada")
    
    def sample_states(self, sigma_e, sigma_n):
        """
        Muestrea los estados usando FFBS (Forward Filtering Backward Sampling)
        """
        # Forward filtering
        filtered_means = np.zeros(self.n)
        filtered_vars = np.zeros(self.n)
        
        # Inicialización
        filtered_means[0] = self.y[0]
        filtered_vars[0] = sigma_e
        
        # Forward pass
        for t in range(1, self.n):
            # Predicción
             pred_mean = filtered_means[t-1]
            pred_var = filtered_vars[t-1] + sigma_n
            
            # Actualización
            K = pred_var / (pred_var + sigma_e)
            filtered_means[t] = pred_mean + K * (self.y[t] - pred_mean)
            filtered_vars[t] = pred_var * (1 - K)
        
        # Backward sampling
        states = np.zeros(self.n)
        states[-1] = norm.rvs(filtered_means[-1], np.sqrt(filtered_vars[-1]))
        
        for t in range(self.n-2, -1, -1):
            mean = filtered_means[t] + filtered_vars[t]/filtered_vars[t] * (states[t+1] - filtered_means[t])
            var = filtered_vars[t] * (1 - filtered_vars[t]/filtered_vars[t])
            states[t] = norm.rvs(mean, np.sqrt(var))
            
        return states
    
    def sample_sigma_e(self, alpha):
        """
        Muestrea la varianza del error de observación
        """
        e = self.y - alpha
        shape = (self.n + 1) / 2
        scale = np.sum(e**2) / 2
        return 1 / np.random.gamma(shape, 1/scale)
    
    def sample_sigma_n(self, alpha):
    """
        Muestrea la varianza del error de estado
        """
        n = alpha[1:] - alpha[:-1]
        shape = self.n / 2
        scale = np.sum(n**2) / 2
        return 1 / np.random.gamma(shape, 1/scale)
    
    def plot_results(self):
        """
        Genera gráficos de los resultados
        """
        # Usar muestras después del periodo de burnin
        post_burnin = slice(self.burnin, None)
        
        # 1. Estados estimados
        plt.figure(figsize=(15, 7))
        alpha_mean = self.alpha[post_burnin].mean(axis=0)
        alpha_std = self.alpha[post_burnin].std(axis=0)
        
        plt.plot(self.dates, self.y, 'o', label='Inflación observada', alpha=0.5)
        plt.plot(self.dates, alpha_mean, label='Nivel local estimado', linewidth=2)
        plt.fill_between(self.dates, 
                        alpha_mean - 2*alpha_std,
                        alpha_mean + 2*alpha_std,
                        alpha=0.2,
                        label='Intervalo de credibilidad 95%')
        plt.title('Inflación Peruana: Nivel Local Estimado (2000-2019)')
        plt.xlabel('Fecha')
        plt.ylabel('Inflación (%)')
        plt.legend()
        plt.grid(True)
        plt.xticks(rotation=45)
          plt.tight_layout()
        plt.show()
        
        # 2. Distribuciones posteriores de las varianzas
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
        
        sns.histplot(self.sigma_e[post_burnin], ax=ax1, bins=50)
        ax1.set_title('Distribución Posterior σ²_e\n(Varianza del Error de Observación)')
        ax1.set_xlabel('Valor')
        
        sns.histplot(self.sigma_n[post_burnin], ax=ax2, bins=50)
        ax2.set_title('Distribución Posterior σ²_η\n(Varianza del Error de Estado)')
        ax2.set_xlabel('Valor')
        
        plt.tight_layout()
        plt.show()
        
        # Imprimir estadísticas resumidas
        print("\nEstadísticas de las varianzas estimadas:")
        print(f"σ²_e (Varianza del Error de Observación):")
        print(f"  Media = {self.sigma_e[post_burnin].mean():.4f}")
        print(f"  Desviación estándar = {self.sigma_e[post_burnin].std():.4f}")
        print(f"\nσ²_η (Varianza del Error de Estado):")
        print(f"  Media = {self.sigma_n[post_burnin].mean():.4f}")
        print(f"  Desviación estándar = {self.sigma_n[post_burnin].std():.4f}")

# Cargar y preparar datos
df = pd.read_csv('C:/Users/HP/Documents/2) Clases PUCP/VERANO/ML/Inflacion_Peru.csv')
df['fecha'] = pd.to_datetime(df.iloc[:, 0])  # Convertir primera columna a datetime
inflacion = df['PN01273PM'].values  # Usar la columna correcta de inflación
fechas = df['fecha']
# Filtrar datos para el período 2000-2019
mask = (fechas >= '2000-01-01') & (fechas <= '2019-12-31')
inflacion = inflacion[mask]
fechas = fechas[mask]

print("Período de análisis:", fechas.min().strftime('%Y-%m-%d'), "a", fechas.max().strftime('%Y-%m-%d'))
print("Número de observaciones:", len(inflacion))

# Crear y estimar el modelo
modelo = LocalLevelModel(inflacion, fechas, n_iterations=5000, burnin=1000)
modelo.gibbs_sampling()

# Visualizar resultados
modelo.plot_results()

CODIGO 2

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import invgamma
import seaborn as sns

class LocalLevelModel:
    def __init__(self, y, dates, n_iterations=5000, burnin=1000):
        """
        Inicializa el modelo de nivel local usando muestreo de Gibbs
        y: datos observados (inflación)
        dates: fechas correspondientes a las observaciones
        n_iterations: número de iteraciones para Gibbs sampling
        burnin: número de iteraciones iniciales a descartar
        """
        self.y = y
        self.dates = dates
        self.n = len(y)
        self.n_iterations = n_iterations
        self.burnin = burnin
        
        # Almacenamiento para los estados y parámetros
        self.alpha = np.zeros((n_iterations, self.n))  # Estados
        self.sigma_e = np.zeros(n_iterations)  # Varianza del error de observación
        self.sigma_n = np.zeros(n_iterations)  # Varianza del error de estado
        
    def gibbs_sampling(self):
        """
        Implementa el algoritmo de Gibbs sampling con mejora en la eficiencia
        """
        # Valores iniciales
        self.sigma_e[0] = max(np.var(self.y) * 0.1, 1e-6)
        self.sigma_n[0] = max(np.var(self.y) * 0.1, 1e-6)
        self.alpha[0] = np.zeros(self.n)
        
        # Iteraciones de Gibbs sampling
        for t in range(1, self.n_iterations):
            self.alpha[t] = self.sample_states(self.sigma_e[t-1], self.sigma_n[t-1])
            self.sigma_e[t] = self.sample_sigma_e(self.alpha[t])
            self.sigma_n[t] = self.sample_sigma_n(self.alpha[t])
            
            if t % 500 == 0:
                print(f"Iteración {t} completada")
    
    def sample_states(self, sigma_e, sigma_n):
        """
        Muestrea los estados usando Forward Filtering Backward Sampling (FFBS)
        """
        states = np.zeros(self.n)
        mean = np.zeros(self.n)
        variance = np.zeros(self.n)
        
        # Forward Filtering
        mean[0] = self.y[0]
        variance[0] = sigma_e
        for t in range(1, self.n):
            pred_mean = mean[t-1]
            pred_var = variance[t-1] + sigma_n
            K = pred_var / (pred_var + sigma_e)
            mean[t] = pred_mean + K * (self.y[t] - pred_mean)
            variance[t] = pred_var * (1 - K)
            # Backward Sampling
        states[-1] = np.random.normal(mean[-1], np.sqrt(variance[-1]))
        for t in range(self.n - 2, -1, -1):
            mean_t = mean[t] + variance[t] / variance[t+1] * (states[t+1] - mean[t])
            var_t = max(variance[t] * (1 - variance[t] / variance[t+1]), 1e-6)
            states[t] = np.random.normal(mean_t, np.sqrt(var_t))
        
        return states
    
    def sample_sigma_e(self, alpha):
        """
        Muestrea la varianza del error de observación
        """
        e = self.y - alpha
        shape = (self.n - 1) / 2
        scale = max(np.sum(e**2) / 2, 1e-6)
        return invgamma.rvs(shape, scale=scale)
    
    def sample_sigma_n(self, alpha):
        """
        Muestrea la varianza del error de estado
        """
        n = np.diff(alpha)
        shape = (self.n - 1) / 2
        scale = max(np.sum(n**2) / 2, 1e-6)
        return invgamma.rvs(shape, scale=scale)
    
    def plot_results(self):
        """
        Genera gráficos de los resultados
        """
        post_burnin = slice(self.burnin, None)
        alpha_mean = self.alpha[post_burnin].mean(axis=0)
        alpha_std = self.alpha[post_burnin].std(axis=0)
        
        plt.figure(figsize=(15, 7))
        plt.plot(self.dates, self.y, 'o', label='Inflación observada', alpha=0.5)
        plt.plot(self.dates, alpha_mean, label='Nivel local estimado', linewidth=2)
        plt.fill_between(self.dates, 
                         alpha_mean - 2*alpha_std, 
                         alpha_mean + 2*alpha_std, 
                         alpha=0.2, label='Intervalo de credibilidad 95%')
        plt.title('Inflación Peruana: Nivel Local Estimado (2000-2019)')
        plt.xlabel('Fecha')
        plt.ylabel('Inflación (%)')
        plt.legend()
        plt.grid(True)
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()
        
        # Gráficos de distribuciones de varianzas
        fig, axes = plt.subplots(1, 2, figsize=(15, 5))
        sns.histplot(self.sigma_e[post_burnin], bins=50, kde=True, ax=axes[0])
        axes[0].set_title("Distribución Posterior de σ²_e")
        axes[0].set_xlabel("Valor")
        sns.histplot(self.sigma_n[post_burnin], bins=50, kde=True, ax=axes[1])
        axes[1].set_title("Distribución Posterior de σ²_η")
        axes[1].set_xlabel("Valor")
        
        plt.tight_layout()
        plt.show()
        
        # Imprimir estadísticas resumidas
        print("\nEstadísticas de las varianzas estimadas:")
        print(f"σ²_e (Varianza del Error de Observación):")
        print(f"  Media = {self.sigma_e[post_burnin].mean():.4f}")
        print(f"  Desviación estándar = {self.sigma_e[post_burnin].std():.4f}")
        print(f"\nσ²_η (Varianza del Error de Estado):")
        print(f"  Media = {self.sigma_n[post_burnin].mean():.4f}")
        print(f"  Desviación estándar = {self.sigma_n[post_burnin].std():.4f}")

# Cargar datos
df = pd.read_csv('C:/Users/HP/Documents/2) Clases PUCP/VERANO/ML/Inflacion_Peru.csv')
df['Fecha'] = pd.to_datetime(df.iloc[:, 0])
inflacion = df['PN01273PM'].values 
fechas = df['Fecha']

mask = (fechas >= '2000-01-01') & (fechas <= '2019-12-31')
inflacion = inflacion[mask]
fechas = fechas[mask]

modelo = LocalLevelModel(inflacion, fechas, n_iterations=5000, burnin=1000)
modelo.gibbs_sampling()
modelo.plot_results()

## Ejercicio 4: Replicando resultado de Stock and Watson (2021)
Usaremos los datos de desempleo, inflación y la tasa de 
interés para la economía norteamericana para el periodo 1960:Q1 – 2000:Q4. 

1. Estima el modelo VAR con 4 rezagos 
2. Utilice restricciones cero de corto plazo recursivas (Choleski) asumiendo 
que la inflación y el desempleo reaccionan con rezagos a la política 
monetaria. En particular asumimos que
$$
\begin{bmatrix} 
\pi_t \\ u_t \\ r_t 
\end{bmatrix} 
= \sum_{p=1}^{4} \phi_p x_{t-p} + 
\begin{bmatrix} 
b_{11} & 0 & 0 \\ 
b_{21} & b_{22} & 0 \\ 
b_{31} & b_{32} & b_{33} 
\end{bmatrix} 
\begin{bmatrix} 
\epsilon_t^1 \\ \epsilon_t^2 \\ \epsilon_t^{MonPol} 
\end{bmatrix}
$$

Donde 𝜋 es la tasa de inflación, 𝑢 es la tasa de desempleo y 𝑟 es la tasa 
de política monetaria. Produzca las funciones de impulso respuesta (FIR) 
para el shock de política monetaria sobre las tres variables endógenas. 
Muestre las FIR con los intervalos de confianza al 95% 

3. Produzca gráficos mostrando la descomposición del error de predicción para cada variable endógena 
4. Produzca gráficos mostrando la descomposición histórica para cada 
variable endógena