
# EDA de Supuestos para Detección de Anomalías en Consumo de Gas

Este cuaderno evalúa **distribución**, **estacionalidad** y **correlación** para ayudarte a elegir el modelo adecuado de **detección de anomalías** en el consumo comercial e industrial de gas.

> Archivo por defecto: `/mnt/data/dataset_contugas.xlsx` (puedes cambiar la ruta en la celda de parámetros).


In [None]:

# ============================
# Parámetros del análisis
# ============================
FILE_PATH = "/mnt/data/dataset_contugas.xlsx"  # <-- Cambia esta ruta si es necesario
SHEET_NAME = None  # None = primera hoja; o pon el nombre/índice de la hoja

# ============================
# Importación de librerías
# ============================
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pathlib import Path
from typing import Optional, Tuple, List

from scipy import stats
from statsmodels.api import qqplot
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import adfuller, kpss
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.stats.stattools import jarque_bera

pd.set_option("display.max_columns", 100)
pd.set_option("display.width", 140)


In [None]:

# =========================================
# Funciones auxiliares (detección de columnas, etc.)
# =========================================

def _likely_date_names():
    return [
        "fecha", "date", "fecha_consumo", "periodo", "period", "time", "timestamp",
        "Fecha", "FECHA", "Date", "DATE"
    ]

def _likely_target_names():
    return [
        "consumo", "consumption", "gas", "m3", "m^3", "volumen", "volume", "kwh",
        "energia", "energy", "demanda", "usage", "usg", "consumo_m3", "consumo_kwh"
    ]

def find_date_column(df: pd.DataFrame) -> Optional[str]:
    # 1) Intento por nombre
    cols_lower = {c.lower(): c for c in df.columns}
    for name in _likely_date_names():
        if name.lower() in cols_lower:
            return cols_lower[name.lower()]
    # 2) Intento por tipo: buscar columna que parsea a datetime con baja tasa de error
    best_col = None
    best_ok = -1
    for c in df.columns:
        s = df[c]
        try:
            parsed = pd.to_datetime(s, errors="coerce", dayfirst=True, infer_datetime_format=True)
            ok = parsed.notna().sum()
            if ok > best_ok and ok >= max(3, int(0.4*len(s))):  # al menos 40% parseable
                best_ok = ok
                best_col = c
        except Exception:
            continue
    return best_col

def find_target_column(df: pd.DataFrame) -> Optional[str]:
    # 1) Por nombre
    cols_lower = {c.lower(): c for c in df.columns}
    for name in _likely_target_names():
        for c_lower, c in cols_lower.items():
            if name in c_lower:
                # Debe ser numérico
                if pd.api.types.is_numeric_dtype(df[c]):
                    return c
    # 2) Elegir la columna numérica con mayor varianza positiva
    num_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
    if not num_cols:
        return None
    var = df[num_cols].var(numeric_only=True).fillna(0.0)
    if var.empty:
        return None
    return var.sort_values(ascending=False).index[0]

def infer_seasonal_period(series: pd.Series, max_lag: int = 400) -> Optional[int]:
    # Usa autocorrelación simple para detectar el primer pico significativo
    # Normaliza la serie y calcula ACF manualmente para evitar dependencias extra
    s = series.dropna().values
    if len(s) < 20:
        return None
    s = (s - np.mean(s)) / (np.std(s) if np.std(s) != 0 else 1)
    acf_vals = [1.0]
    L = min(max_lag, len(s) - 1)
    for lag in range(1, L+1):
        v = np.corrcoef(s[:-lag], s[lag:])[0,1] if len(s) - lag > 1 else np.nan
        acf_vals.append(v)
    acf_vals = np.array(acf_vals, dtype=float)

    # Detectar picos simples sobre umbral
    # Umbral heurístico
    thr = 0.25
    peaks = [i for i in range(1, len(acf_vals)) if acf_vals[i] is not np.nan and acf_vals[i] > thr]
    if peaks:
        return peaks[0]
    return None

def safe_log_transform(x: pd.Series) -> pd.Series:
    # Log para positivos, de lo contrario devuelve original
    if (x > 0).all():
        return np.log(x)
    return x

def describe_normality(x: pd.Series, alpha: float = 0.05, sample_n: int = 5000) -> dict:
    x = x.dropna()
    n = len(x)
    if n == 0:
        return {"n": 0}

    # Muestras para Shapiro (máx 5000 recomendado)
    xs = x.sample(min(sample_n, n), random_state=42) if n > sample_n else x

    out = {"n": n}
    try:
        W, p = stats.shapiro(xs)
        out["shapiro_W"] = float(W)
        out["shapiro_p"] = float(p)
        out["shapiro_normal"] = bool(p > alpha)
    except Exception as e:
        out["shapiro_error"] = str(e)

    try:
        k2, p = stats.normaltest(x)  # D'Agostino
        out["dagostino_K2"] = float(k2)
        out["dagostino_p"] = float(p)
        out["dagostino_normal"] = bool(p > alpha)
    except Exception as e:
        out["dagostino_error"] = str(e)

    try:
        jb_stat, jb_p, _, _ = jarque_bera(x)
        out["jarque_bera"] = float(jb_stat)
        out["jarque_bera_p"] = float(jb_p)
        out["jarque_bera_normal"] = bool(jb_p > alpha)
    except Exception as e:
        out["jarque_bera_error"] = str(e)

    try:
        ad = stats.anderson(x, dist="norm")
        out["anderson_stat"] = float(ad.statistic)
        out["anderson_crit"] = [float(v) for v in ad.critical_values]
        out["anderson_sig"] = [float(v) for v in ad.significance_level]
        # No hay p-value directo en Anderson; interpretamos por niveles críticos
    except Exception as e:
        out["anderson_error"] = str(e)

    out["skew"] = float(stats.skew(x))
    out["kurtosis"] = float(stats.kurtosis(x, fisher=True))
    return out

def adf_kpss_tests(series: pd.Series, alpha: float = 0.05) -> dict:
    res = {}
    x = series.dropna().values
    if len(x) < 20:
        return {"error": "Serie demasiado corta para pruebas ADF/KPSS"}
    try:
        adf_stat, adf_p, _, _, adf_crit, _ = adfuller(series.dropna(), autolag="AIC")
        res["adf_stat"] = float(adf_stat)
        res["adf_p"] = float(adf_p)
        res["adf_stationary_at_alpha"] = bool(adf_p < alpha)
        res["adf_crit"] = {k: float(v) for k, v in adf_crit.items()}
    except Exception as e:
        res["adf_error"] = str(e)

    try:
        kpss_stat, kpss_p, _, kpss_crit = kpss(series.dropna(), regression="c", nlags="auto")
        res["kpss_stat"] = float(kpss_stat)
        res["kpss_p"] = float(kpss_p)
        res["kpss_stationary_at_alpha"] = bool(kpss_p > alpha)  # KPSS: H0 es estacionariedad
        res["kpss_crit"] = {k: float(v) for k, v in kpss_crit.items()}
    except Exception as e:
        res["kpss_error"] = str(e)

    return res


## 1) Carga y preparación de datos

In [None]:

# Carga del Excel
path = Path(FILE_PATH)
assert path.exists(), f"No se encontró el archivo: {path}"

df = pd.read_excel(path, sheet_name=SHEET_NAME)
print("Dimensiones:", df.shape)
df.head()


In [None]:

# Detección automática de columna de fecha y variable objetivo (consumo)
date_col = find_date_column(df)
target_col = find_target_column(df)

print("Columna de fecha detectada:", date_col)
print("Columna de consumo/objetivo detectada:", target_col)

# Conversión de fecha e indexado
if date_col is not None:
    df[date_col] = pd.to_datetime(df[date_col], errors="coerce", dayfirst=True, infer_datetime_format=True)
    df = df.sort_values(by=date_col)
    df = df.set_index(date_col)

# Limpieza básica de la columna objetivo
if target_col is not None:
    df[target_col] = pd.to_numeric(df[target_col], errors="coerce")

print("Valores nulos por columna:")
print(df.isna().sum())


In [None]:

# Vista rápida de la serie objetivo
if target_col is None:
    raise ValueError("No se detectó una columna objetivo numérica. Revisa los nombres o ajusta la heurística.")

series = df[target_col].dropna()
print("Cantidad de observaciones:", len(series))
display(series.head(10))


## 2) Distribución y supuestos de normalidad

In [None]:

# Histograma (una figura)
plt.figure()
series.plot(kind="hist", bins=30, edgecolor="black", alpha=0.7)
plt.title("Histograma del consumo")
plt.xlabel(target_col)
plt.ylabel("Frecuencia")
plt.show()


In [None]:

# Boxplot (una figura)
plt.figure()
plt.boxplot(series.dropna().values, vert=True, labels=[target_col])
plt.title("Boxplot del consumo")
plt.show()


In [None]:

# QQ-plot (una figura)
plt.figure()
qqplot(series.dropna(), line='s')
plt.title("QQ-plot vs Normal")
plt.show()


In [None]:

# Pruebas de normalidad y momentos
norm_info = describe_normality(series)
print("Pruebas de normalidad y momentos:")
for k, v in norm_info.items():
    print(f"- {k}: {v}")


## 3) Estacionalidad, tendencia y estacionariedad

In [None]:

# Serie temporal
if isinstance(df.index, pd.DatetimeIndex):
    plt.figure()
    series.plot()
    plt.title("Serie temporal del consumo")
    plt.xlabel("Fecha")
    plt.ylabel(target_col)
    plt.show()

    # Pruebas de estacionariedad
    print("\nResultados ADF y KPSS:")
    tests = adf_kpss_tests(series)
    for k, v in tests.items():
        print(f"- {k}: {v}")

    # ACF y PACF
    plt.figure()
    plot_acf(series.dropna(), lags=40)
    plt.title("ACF - Autocorrelación")
    plt.show()

    plt.figure()
    plot_pacf(series.dropna(), lags=40, method="ywm")
    plt.title("PACF - Autocorrelación parcial")
    plt.show()

    # Descomposición (periodo inferido)
    inferred_period = infer_seasonal_period(series)
    print("\nPeriodo estacional inferido (heurístico):", inferred_period)

    # Intentar log si aplica
    series_for_decomp = safe_log_transform(series)

    if inferred_period is not None and inferred_period >= 2:
        try:
            result = seasonal_decompose(series_for_decomp.dropna(), period=inferred_period, model="additive", extrapolate_trend="freq")
            # Observada
            plt.figure()
            result.observed.plot()
            plt.title("Descomposición - Observado")
            plt.show()
            # Tendencia
            plt.figure()
            result.trend.plot()
            plt.title("Descomposición - Tendencia")
            plt.show()
            # Estacional
            plt.figure()
            result.seasonal.plot()
            plt.title("Descomposición - Estacionalidad")
            plt.show()
            # Residual
            plt.figure()
            result.resid.plot()
            plt.title("Descomposición - Residual")
            plt.show()
        except Exception as e:
            print("Error en descomposición estacional:", e)
    else:
        print("No se pudo inferir un periodo confiable para descomposición.")
else:
    print("El índice no es de tipo fecha. Convierte una columna a fecha y vuelve a ejecutar.")


## 4) Correlación entre variables

In [None]:

# Matriz de correlación entre variables numéricas
num_df = df.select_dtypes(include=[np.number]).dropna(how="all")
if num_df.shape[1] >= 2:
    corr = num_df.corr(numeric_only=True)
    print("Matriz de correlación (numérica):")
    display(corr)

    # Mapa de calor simple (una figura, sin seaborn)
    plt.figure()
    plt.imshow(corr.values, aspect="auto")
    plt.xticks(range(corr.shape[1]), corr.columns, rotation=45, ha="right")
    plt.yticks(range(corr.shape[0]), corr.index)
    plt.title("Mapa de calor de correlación")
    plt.colorbar()
    plt.tight_layout()
    plt.show()
else:
    print("No hay suficientes columnas numéricas para una matriz de correlación.")


## 5) Sugerencias de modelado (guía)

In [None]:

print("""
Guía rápida (según supuestos observados):

1) Si la distribución es ~normal y sin estacionalidad marcada:
   - Z-score, modelos paramétricos, regresión lineal/gaussiana con residuos.

2) Si NO hay normalidad (sesgo fuerte) pero la dinámica temporal es importante:
   - Modelos robustos/no paramétricos: Isolation Forest, LOF, DBSCAN (con features de tiempo).
   - Modelos por residuos: Ajusta ARIMA/SARIMA/Prophet/LSTM y detecta anomalías en el residual.

3) Si hay estacionalidad clara (picos en ACF o descomposición):
   - SARIMA/Prophet. Detecta anomalías sobre el residual (valores que exceden umbrales en residuales).

4) Si existen múltiples variables explicativas (clima, categoría, ubicación, etc.):
   - Modelos multivariados: regresión robusta, random forest, XGBoost o autoencoders multivariados.
   - Detección de anomalías en el espacio de error de predicción.

5) Si la serie no es estacionaria (ADF > 0.05 y/o KPSS < 0.05):
   - Diferenciación (d=1) o transformaciones (log) antes de modelar.
""")
