In [1]:

# -*- coding: utf-8 -*-
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from statsmodels.tsa.seasonal import STL, seasonal_decompose
from statsmodels.tsa.stattools import adfuller, kpss
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

sns.set(style="whitegrid")
plt.rcParams["figure.figsize"] = (11, 6)
plt.rcParams["axes.titlesize"] = 13
plt.rcParams["axes.labelsize"] = 12

class TimeSeriesEDA:
    """
    EDA de series temporales con gráficos y pruebas clásicas.
    
    Parámetros:
        df (pd.DataFrame): datos con una columna de fecha y una columna de valor.
        date_col (str): nombre de la columna de fecha.
        value_col (str): nombre de la columna de valores.
        freq (str|None): frecuencia objetivo (p.ej., 'D', 'W', 'M'). Si None, se infiere.
        agg (str): agregación al resample (p.ej., 'sum', 'mean', 'median').
        tz (str|None): zona horaria si aplica.
    """
    def __init__(self, df, date_col, value_col, freq=None, agg='sum', tz=None):
        self.raw = df.copy()
        self.date_col = date_col
        self.value_col = value_col
        self.freq = freq
        self.agg = agg
        self.tz = tz
        
        # --- Preparación ---
        self.df = self._prepare()
        self._infer_freq_if_needed()
        self.ts = self._resample()
        
    # ---------- Utils ----------
    def _prepare(self):
        df = self.raw.copy()
        # Convertir fecha
        df[self.date_col] = pd.to_datetime(df[self.date_col], errors='coerce')
        if self.tz:
            df[self.date_col] = df[self.date_col].dt.tz_localize(self.tz, ambiguous='NaT', nonexistent='shift_forward')
        # Orden y limpieza
        df = df.dropna(subset=[self.date_col]).sort_values(self.date_col)
        # Quitar duplicados por timestamp (agregando por agg)
        if df[self.date_col].duplicated().any():
            df = df.groupby(self.date_col, as_index=False).agg({self.value_col: self.agg})
        df = df.set_index(self.date_col)
        return df[[self.value_col]].astype(float)
    
    def _infer_freq_if_needed(self):
        if self.freq is None:
            # Inferencia automática con pandas
            inferred = pd.infer_freq(self.df.index)
            self.freq = inferred if inferred is not None else 'D'  # fallback diario
        return self.freq
    
    def _resample(self):
        # Resample a frecuencia uniforme
        try:
            ts = self.df[self.value_col].resample(self.freq).agg(self.agg)
        except Exception:
            # Si la freq inferida falla, caer a 'D'
            ts = self.df[self.value_col].resample('D').agg(self.agg)
            self.freq = 'D'
        return ts
    
    # ---------- Calidad de datos ----------
    def data_quality_report(self):
        s = self.ts.copy()
        idx = s.index
        n = len(s)
        missing = s.isna().sum()
        span = (idx.min(), idx.max())
        expected_points = len(pd.date_range(idx.min(), idx.max(), freq=self.freq))
        gaps = expected_points - n
        duplicates_raw = self.raw[self.date_col].duplicated().sum()
        
        desc = s.describe()
        info = {
            "periodo_inicial": span[0],
            "periodo_final": span[1],
            "frecuencia": self.freq,
            "puntos_observados": n,
            "puntos_esperados": expected_points,
            "huecos_en_resample": gaps,
            "nulos": missing,
            "duplicados_en_raw": int(duplicates_raw),
            "min": float(desc['min']),
            "max": float(desc['max']),
            "media": float(desc['mean']),
            "std": float(desc['std'])
        }
        return pd.Series(info)
    
    def imputar_nulos(self, metodo='ffill'):
        """metodo: 'ffill' | 'bfill' | 'median' | 'zero'"""
        s = self.ts.copy()
        if metodo == 'ffill':
            s = s.ffill()
        elif metodo == 'bfill':
            s = s.bfill()
        elif metodo == 'median':
            s = s.fillna(s.median())
        elif metodo == 'zero':
            s = s.fillna(0)
        else:
            raise ValueError("Método de imputación no soportado.")
        self.ts = s
        return s
    
    # ---------- Gráficos básicos ----------
    def plot_serie(self, titulo="Serie temporal"):
        s = self.ts
        ax = s.plot(color="#2a7", lw=1.6)
        ax.set_title(titulo)
        ax.set_xlabel("Fecha")
        ax.set_ylabel(self.value_col)
        plt.show()
    
    def plot_rolling(self, ventana=None):
        """Rolling media y desviación estándar."""
        s = self.ts
        if ventana is None:
            # ventana ~ 30 días si diario, ~12 si mensual
            ventana = 30 if self.freq.upper().startswith('D') else 12
        roll_mean = s.rolling(ventana, min_periods=max(1, ventana//3)).mean()
        roll_std = s.rolling(ventana, min_periods=max(1, ventana//3)).std()
        ax = s.plot(color="#2a7", alpha=0.6, label="Serie")
        roll_mean.plot(ax=ax, color="#c33", lw=2, label=f"Media móvil ({ventana})")
        roll_std.plot(ax=ax, color="#555", lw=1.5, label=f"Desv. móvil ({ventana})")
        ax.set_title("Media y desviación móvil")
        ax.set_xlabel("Fecha"); ax.set_ylabel(self.value_col)
        ax.legend()
        plt.show()
    
    # ---------- Descomposición ----------
    def descomponer(self, modelo='additive', periodo=None, usar_stl=True):
        """
        modelo: 'additive' o 'multiplicative'
        periodo: estacionalidad (p.ej., 7 para D, 12 para M). Si None se intenta inferir.
        usar_stl: True usa STL (robusta), False usa seasonal_decompose clásico.
        """
        s = self.ts.dropna()
        if periodo is None:
            # heurística: diario->7, semanal->52, mensual->12
            f = self.freq.upper()
            if f.startswith('D'):
                periodo = 7
            elif f.startswith('W'):
                periodo = 52
            elif f.startswith('M'):
                periodo = 12
            else:
                periodo = max(2, int(np.clip(len(s)//12, 2, 365)))
        if usar_stl:
            stl = STL(s, period=periodo, robust=True)
            res = stl.fit()
            fig, axs = plt.subplots(4, 1, sharex=True, figsize=(11, 8))
            axs[0].plot(s, color="#2a7"); axs[0].set_title("Serie")
            axs[1].plot(res.trend, color="#c33"); axs[1].set_title("Tendencia (STL)")
            axs[2].plot(res.seasonal, color="#345"); axs[2].set_title("Estacionalidad (STL)")
            axs[3].plot(res.resid, color="#777"); axs[3].set_title("Residuo")
            for ax in axs: ax.set_xlabel("Fecha")
            plt.tight_layout(); plt.show()
            return {"trend": res.trend, "seasonal": res.seasonal, "resid": res.resid}
        else:
            res = seasonal_decompose(s, model=modelo, period=periodo)
            res.plot()
            plt.suptitle("Descomposición clásica", y=1.02)
            plt.show()
            return {"trend": res.trend, "seasonal": res.seasonal, "resid": res.resid}
    
    # ---------- Estacionariedad ----------
    def pruebas_estacionariedad(self):
        s = self.ts.dropna()
        # ADF (H0: no estacionaria, tiene raíz unitaria)
        adf_stat, adf_p, _, _, adf_crit, _ = adfuller(s, autolag='AIC')
        # KPSS (H0: estacionaria alrededor de tendencia)
        kpss_stat, kpss_p, _, kpss_crit = kpss(s, regression='c', nlags='auto')
        resumen = pd.DataFrame({
            "prueba": ["ADF", "KPSS"],
            "estadístico": [adf_stat, kpss_stat],
            "p_value": [adf_p, kpss_p],
            "crit_1%": [adf_crit.get('1%'), kpss_crit.get('1%')],
            "crit_5%": [adf_crit.get('5%'), kpss_crit.get('5%')],
            "crit_10%": [adf_crit.get('10%'), kpss_crit.get('10%')]
        })
        return resumen
    
    # ---------- Autocorrelación ----------
    def plot_acf_pacf(self, lags=40):
        s = self.ts.dropna()
        fig, axs = plt.subplots(1, 2, figsize=(12, 5))
        plot_acf(s, ax=axs[0], lags=lags, title="ACF (autocorrelación)")
        plot_pacf(s, ax=axs[1], lags=lags, title="PACF (autocorrelación parcial)", method='ywm')
        plt.show()
    
    # ---------- Estacionalidad por calendario ----------
    def estacionalidad_calendario(self):
        df = self.ts.to_frame(name=self.value_col)
        df["año"] = df.index.year
        df["mes"] = df.index.month
        df["día_mes"] = df.index.day
        df["día_semana"] = df.index.dayofweek
        df["semana_año"] = df.index.isocalendar().week.astype(int)
        
        fig, axs = plt.subplots(1, 3, figsize=(15, 4))
        sns.boxplot(data=df, x="mes", y=self.value_col, ax=axs[0], palette="viridis")
        axs[0].set_title("Distribución por mes"); axs[0].set_xlabel("Mes")
        sns.boxplot(data=df, x="día_semana", y=self.value_col, ax=axs[1], palette="magma")
        axs[1].set_title("Distribución por día de semana"); axs[1].set_xlabel("Día (0=Lun)")
        piv = df.pivot_table(index="mes", columns="día_semana", values=self.value_col, aggfunc="mean")
        sns.heatmap(piv, cmap="YlGnBu", ax=axs[2], annot=False)
        axs[2].set_title("Mapa de calor (mes × día semana)"); axs[2].set_xlabel("Día semana"); axs[2].set_ylabel("Mes")
        plt.tight_layout(); plt.show()
    
    # ---------- Outliers y cambios de nivel ----------
    def detectar_outliers(self, metodo='iqr', ventana=None, z_thresh=3.0):
        s = self.ts.copy()
        if metodo == 'iqr':
            q1 = s.quantile(0.25); q3 = s.quantile(0.75)
            iqr = q3 - q1
            low = q1 - 1.5 * iqr; high = q3 + 1.5 * iqr
            mask = (s < low) | (s > high)
        elif metodo == 'zscore':
            if ventana is None:
                ventana = 30 if self.freq.upper().startswith('D') else 12
            roll_mean = s.rolling(ventana, min_periods=max(1, ventana//3)).mean()
            roll_std = s.rolling(ventana, min_periods=max(1, ventana//3)).std()
            z = (s - roll_mean) / (roll_std.replace(0, np.nan))
            mask = z.abs() > z_thresh
        else:
            raise ValueError("Método de outliers no soportado.")
        outliers = s[mask]
        # Gráfico
        ax = s.plot(color="#2a7", alpha=0.5, label="Serie")
        s[mask].plot(ax=ax, style='o', color="#d22", label="Outliers")
        ax.set_title("Detección de outliers"); ax.set_xlabel("Fecha"); ax.set_ylabel(self.value_col)
        ax.legend()
        plt.show()
        return outliers.to_frame(name=self.value_col)
    
    def cambios_de_nivel(self, ventana=None):
        s = self.ts
        if ventana is None:
            ventana = 30 if self.freq.upper().startswith('D') else 12
        roll_mean = s.rolling(ventana, min_periods=max(1, ventana//3)).mean()
        roll_var  = s.rolling(ventana, min_periods=max(1, ventana//3)).var()
        fig, axs = plt.subplots(2, 1, figsize=(11, 7), sharex=True)
        s.plot(ax=axs[0], color="#2a7", alpha=0.6, label="Serie")
        roll_mean.plot(ax=axs[0], color="#c33", lw=2, label=f"Media móvil ({ventana})")
        axs[0].set_title("Cambios de nivel (media móvil)"); axs[0].legend()
        roll_var.plot(ax=axs[1], color="#555", lw=2, label=f"Varianza móvil ({ventana})")
        axs[1].set_title("Cambios de régimen (varianza móvil)"); axs[1].legend()
        axs[1].set_xlabel("Fecha"); axs[0].set_ylabel(self.value_col); axs[1].set_ylabel("Varianza")
        plt.tight_layout(); plt.show()
        return {"rolling_mean": roll_mean, "rolling_var": roll_var}
    
    # ---------- Informe rápido ----------
    def informe_rapido(self):
        print("=== Calidad de datos ===")
        print(self.data_quality_report(), "\n")
        print("=== Pruebas de estacionariedad (ADF & KPSS) ===")
        print(self.pruebas_estacionariedad(), "\n")
        print("=== Sugerencias ===")
        print("- Si ADF p<0.05 y KPSS p>0.05: la serie es estacionaria.")
        print("- Si ADF p>0.05 y KPSS p<0.05: aplicar diferenciación (differencing).")
        print("- Revisar estacionalidad y considerar SARIMA/Prophet/ETS según patrón.")
    
    # ---------- Pipeline completo ----------
    def ejecutar_pipeline(self, imputacion='ffill', periodo=None, lags=40):
        self.plot_serie("Serie temporal (original/resample)")
        self.imputar_nulos(imputacion)
        self.plot_rolling()
        self.descomponer(periodo=periodo, usar_stl=True)
        self.estacionalidad_calendario()
        self.plot_acf_pacf(lags=lags)
        self.detectar_outliers(metodo='zscore')
        self.cambios_de_nivel()
        self.informe_rapido()


# ------------------ EJEMPLO DE USO ------------------
if __name__ == "__main__":
    # Sustituye por tu fichero:
    # df = pd.read_csv("tu_archivo.csv")
    # Supongamos columnas: 'fecha' y 'valor'
    # df = pd.read_csv("serie.csv")
    # eda = TimeSeriesEDA(df, date_col="fecha", value_col="valor", freq=None, agg='sum', tz=None)
    # eda.ejecutar_pipeline(imputacion='ffill', periodo=None, lags=40)
    pass
