In [None]:
# -*- coding: utf-8 -*-
"""
Script de depuracion completa de series horarias ambientales
Diseñado para su ejecucion local. Comentarios y cadenas en español.
Dependencias: pandas, numpy, scikit-learn, joblib, xgboost (opcional)
Ejecutar: python depuracion_pipeline.py --input ruta_fichero.csv [--stations_meta ruta_meta.csv]
"""

import os
import json
import math
import argparse
from datetime import timedelta

import numpy as np
import pandas as pd

from sklearn.ensemble import IsolationForest, RandomForestRegressor
from sklearn.impute import KNNImputer
from sklearn.experimental import enable_iterative_imputer  # noqa
from sklearn.impute import IterativeImputer
from sklearn.metrics import mean_absolute_error, mean_squared_error

try:
    import xgboost as xgb  # noqa
    XGBOOST_AVAILABLE = True
except Exception:
    XGBOOST_AVAILABLE = False

# Carpeta de salida
OUT_DIR = "depuracion_output"
os.makedirs(OUT_DIR, exist_ok=True)

# Variables esperadas en los datos
VARIABLES = [
    "O3", "NO", "NO2", "NOx",
    "wind_speed", "wind_dir", "temp", "pressure", "rad"
]

# Limites de deteccion orientativos
LOD = {
    "O3": 1.0,
    "NO": 0.1,
    "NO2": 0.1,
    "NOx": 0.1,
    "wind_speed": 0.01,
    "wind_dir": 0.0,
    "temp": -273.15,
    "pressure": 300.0,
    "rad": 0.0
}

# Rangos plausibles orientativos
RANGOS_PLAUSIBLES = {
    "O3": (0.0, 1000.0),
    "NO": (0.0, 500.0),
    "NO2": (0.0, 500.0),
    "NOx": (0.0, 1000.0),
    "wind_speed": (0.0, 80.0),
    "wind_dir": (0.0, 360.0),
    "temp": (-50.0, 60.0),
    "pressure": (250.0, 1100.0),
    "rad": (0.0, 2000.0)
}

# Parametros detectores e imputacion
HAMPEL_WINDOW_H = 12
HAMPEL_NSIGMA = 3.5
IFOREST_CONTAMINATION = 0.02
KNN_K = 5
ITERATIVE_IMPUTATIONS = 5
NOX_TOLERANCE = 20.0

# -----------------------
# Funciones utilitarias
# -----------------------

def ensure_datetime_index(df, time_col="timestamp"):
    """
    Asegura indice datetime con zona Europe Madrid y resample horario.
    Mantiene columnas no numericas rellenando por propagacion.
    """
    df2 = df.copy()
    if time_col in df2.columns:
        df2[time_col] = pd.to_datetime(df2[time_col], errors="coerce")
        df2 = df2.set_index(time_col)
    else:
        if not isinstance(df2.index, pd.DatetimeIndex):
            raise ValueError("Se requiere columna timestamp o indice datetime")
    # normalizar zona horaria
    try:
        if df2.index.tz is None:
            df2.index = df2.index.tz_localize("Europe/Madrid")
        else:
            df2.index = df2.index.tz_convert("Europe/Madrid")
    except Exception:
        df2.index = df2.index.tz_localize(None)
        df2.index = df2.index.tz_localize("Europe/Madrid")
    # separar columnas numericas y no numericas
    numeric_cols = df2.select_dtypes(include=[np.number]).columns.tolist()
    non_numeric_cols = [c for c in df2.columns if c not in numeric_cols]
    # resample numericas por media
    if len(numeric_cols) > 0:
        df_num = df2[numeric_cols].resample("H").mean()
    else:
        df_num = pd.DataFrame(index=pd.date_range(start=df2.index.min(), end=df2.index.max(), freq="H", tz="Europe/Madrid"))
    # resample no numericas por ffill
    df_non = pd.DataFrame(index=df_num.index)
    for c in non_numeric_cols:
        ser = df2[c].fillna(method="ffill").fillna(method="bfill")
        ser_r = ser.reindex(df_num.index, method="ffill")
        df_non[c] = ser_r
    df_out = pd.concat([df_num, df_non], axis=1)
    return df_out

def mark_non_detects(df, variables=VARIABLES, lod=LOD):
    for v in variables:
        flag = f"{v}_nd"
        if v in df.columns and v in lod:
            df[flag] = df[v].notna() & (df[v] < lod[v])
        else:
            df[flag] = False
    return df

def plausibility_flags(df, variables=VARIABLES, ranges=RANGOS_PLAUSIBLES):
    for v in variables:
        col = f"{v}_plausible"
        if v in df.columns and v in ranges:
            low, high = ranges[v]
            df[col] = df[v].between(low, high) | df[v].isna()
        else:
            df[col] = True
    return df

def hampel_filter(series, window_H=HAMPEL_WINDOW_H, n_sigma=HAMPEL_NSIGMA):
    k = int(window_H)
    if k < 1:
        k = 1
    rolled_med = series.rolling(window=2*k+1, center=True, min_periods=1).median()
    mad = series.rolling(window=2*k+1, center=True, min_periods=1).apply(lambda x: np.median(np.abs(x - np.median(x))), raw=True)
    threshold = n_sigma * 1.4826 * mad
    diff = (series - rolled_med).abs()
    outliers = diff > threshold
    return outliers.fillna(False)

def iqr_outliers(series, window_days=7, factor=1.5):
    win = int(window_days * 24)
    if win < 1:
        win = 24
    flags = pd.Series(False, index=series.index)
    for start in range(0, len(series), win):
        block = series.iloc[start:start+win]
        if len(block) == 0:
            continue
        q1 = np.nanpercentile(block, 25)
        q3 = np.nanpercentile(block, 75)
        iqr = q3 - q1
        low = q1 - factor * iqr
        high = q3 + factor * iqr
        fl = (block < low) | (block > high)
        flags.iloc[start:start+win] = fl.values
    return flags.fillna(False)

def robust_zscore(series, window_H=24, th=3.0):
    win = int(window_H)
    med = series.rolling(window=win, min_periods=1, center=True).median()
    mad = series.rolling(window=win, min_periods=1, center=True).apply(lambda x: np.median(np.abs(x - np.median(x))), raw=True)
    denom = mad.replace(0, np.nan) * 1.4826
    z = (series - med) / denom
    return (z.abs() > th).fillna(False)

def isolation_forest_flags(df_vars, contamination=IFOREST_CONTAMINATION, random_state=0):
    df_med = df_vars.fillna(df_vars.median())
    iso = IsolationForest(contamination=contamination, random_state=random_state)
    iso.fit(df_med.values)
    preds = iso.predict(df_med.values)
    flags = pd.Series(preds == -1, index=df_vars.index)
    return flags

def haversine(lat1, lon1, lat2, lon2):
    R = 6371000.0
    phi1 = np.radians(lat1)
    phi2 = np.radians(lat2)
    dphi = np.radians(lat2 - lat1)
    dlambda = np.radians(lon2 - lon1)
    a = np.sin(dphi / 2.0) ** 2 + np.cos(phi1) * np.cos(phi2) * np.sin(dlambda / 2.0) ** 2
    c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
    return R * c

# -----------------------
# Deteccion interestacional
# -----------------------

def interstation_anomaly(df, station_col, var, stations_meta, radius_km=50):
    """
    df debe tener columna station y timestamp en el indice
    stations_meta DataFrame con columnas station lat lon
    """
    df_reset = df.reset_index().rename(columns={"index": "timestamp"})
    pivot = df_reset.pivot(index="timestamp", columns=station_col, values=var)
    neighbors = {}
    for st in stations_meta["station"].unique():
        lat = stations_meta.loc[stations_meta["station"] == st, "lat"].values[0]
        lon = stations_meta.loc[stations_meta["station"] == st, "lon"].values[0]
        nbrs = []
        for st2 in stations_meta["station"].unique():
            if st2 == st:
                continue
            lat2 = stations_meta.loc[stations_meta["station"] == st2, "lat"].values[0]
            lon2 = stations_meta.loc[stations_meta["station"] == st2, "lon"].values[0]
            d = haversine(lat, lon, lat2, lon2) / 1000.0
            if d <= radius_km:
                nbrs.append(st2)
        neighbors[st] = nbrs

    flags = pd.Series(False, index=pivot.index)
    for st in pivot.columns:
        nbrs = neighbors.get(st, [])
        if len(nbrs) == 0:
            continue
        median_neighbors = pivot[nbrs].median(axis=1)
        diff = (pivot[st] - median_neighbors).abs()
        neigh_std = pivot[nbrs].std(axis=1).replace(0, np.nan)
        flag_series = diff > 3 * neigh_std
        flags |= flag_series.fillna(False)
    return flags

# -----------------------
# Imputacion
# -----------------------

def classify_gaps(series):
    mask = series.isna()
    gaps = []
    if mask.sum() == 0:
        return gaps
    in_gap = False
    start = None
    prev_t = None
    for t, m in mask.iteritems():
        if m and not in_gap:
            in_gap = True
            start = t
        elif not m and in_gap:
            end = prev_t
            length = int((end - start) / np.timedelta64(1, "h")) + 1
            gaps.append((start, end, length))
            in_gap = False
        prev_t = t
    if in_gap:
        end = prev_t
        length = int((end - start) / np.timedelta64(1, "h")) + 1
        gaps.append((start, end, length))
    return gaps

def impute_short(series):
    return series.interpolate(method="time", limit_direction="both")

def impute_medium(block_df, variables=VARIABLES, k=KNN_K):
    imputer = KNNImputer(n_neighbors=k)
    arr = imputer.fit_transform(block_df[variables])
    block_imp = block_df.copy()
    block_imp[variables] = arr
    return block_imp, imputer

def impute_long_model(df_block, target_var, features, model_type="rf", random_state=0):
    df_feat = df_block[features + [target_var]].copy()
    train = df_feat[df_feat[target_var].notna()].dropna()
    predict_idx = df_feat[df_feat[target_var].isna()].index
    if len(train) < 50:
        return df_block, None
    X = train[features].values
    y = train[target_var].values
    if model_type == "xgb" and XGBOOST_AVAILABLE:
        dtrain = xgb.DMatrix(X, label=y)
        params = {"objective": "reg:squarederror", "verbosity": 0}
        bst = xgb.train(params, dtrain, num_boost_round=100)
        if len(predict_idx) > 0:
            Xpred = df_feat.loc[predict_idx, features].fillna(df_feat[features].median()).values
            ypred = bst.predict(xgb.DMatrix(Xpred))
            df_block.loc[predict_idx, target_var] = ypred
        return df_block, bst
    else:
        rf = RandomForestRegressor(n_estimators=200, random_state=random_state, n_jobs=None)
        rf.fit(X, y)
        if len(predict_idx) > 0:
            Xpred = df_feat.loc[predict_idx, features].fillna(df_feat[features].median()).values
            ypred = rf.predict(Xpred)
            df_block.loc[predict_idx, target_var] = ypred
        return df_block, rf

def multiple_imputation_iterative(df_block, variables=VARIABLES, M=ITERATIVE_IMPUTATIONS, random_state=0):
    imputations = []
    for m in range(M):
        imp = IterativeImputer(sample_posterior=True, random_state=random_state + m, max_iter=10)
        arr = imp.fit_transform(df_block[variables])
        df_imp = df_block.copy()
        df_imp[variables] = arr
        imputations.append(df_imp)
    return imputations

# -----------------------
# Consistencia y reportes
# -----------------------

def check_nox_consistency(df, tolerance=NOX_TOLERANCE):
    if not all(v in df.columns for v in ["NO", "NO2", "NOx"]):
        df["nox_consistent"] = np.nan
        return df
    diff = np.abs(df["NOx"] - (df["NO"].fillna(0) + df["NO2"].fillna(0)))
    df["nox_consistent"] = diff <= tolerance
    df["nox_diff"] = diff
    return df

def quality_report(df, variables=VARIABLES):
    report = {}
    total = len(df)
    for v in variables:
        n_missing = int(df[v].isna().sum()) if v in df.columns else 0
        n_nd = int(df.get(f"{v}_nd", pd.Series(False, index=df.index)).sum())
        n_implausible = int((~df.get(f"{v}_plausible", pd.Series(True, index=df.index))).sum())
        report[v] = {
            "total": int(total),
            "missing": n_missing,
            "missing_frac": float(n_missing / total) if total > 0 else 0.0,
            "nd_count": n_nd,
            "implausible_count": n_implausible
        }
    out_path = os.path.join(OUT_DIR, "quality_report.json")
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)
    return report

# -----------------------
# Pipeline principal
# -----------------------

def depuration_pipeline(df, stations_meta=None, station_col="station"):
    meta = {}
    # sincronizar y resample horario
    if "timestamp" not in df.columns and not isinstance(df.index, pd.DatetimeIndex):
        raise ValueError("El dataframe debe contener columna timestamp o indice datetime")
    if "timestamp" in df.columns:
        df = df.copy()
    df = df.reset_index().rename(columns={"index": "timestamp"}) if isinstance(df.index, pd.DatetimeIndex) and "timestamp" not in df.columns else df
    df = ensure_datetime_index(df, time_col="timestamp")

    # marcar no detectables y plausibilidad
    df = mark_non_detects(df)
    df = plausibility_flags(df)

    # deteccion de anomalias univariantes
    anomaly_flags = pd.DataFrame(index=df.index)
    for v in VARIABLES:
        if v not in df.columns:
            continue
        s = df[v]
        ham = hampel_filter(s)
        iqr = iqr_outliers(s)
        rz = robust_zscore(s)
        flags = ham | iqr | rz
        anomaly_flags[f"{v}_anomaly"] = flags

    # deteccion multivariante
    block = df[VARIABLES].copy()
    iso_flags = isolation_forest_flags(block)
    anomaly_flags["multivar_anomaly"] = iso_flags

    # comparacion interestacional si hay metadatos
    if stations_meta is not None and "station" in df.columns:
        for v in VARIABLES:
            try:
                interest_flags = interstation_anomaly(df, station_col, v, stations_meta)
                anomaly_flags[f"{v}_interstation"] = interest_flags.reindex(df.index, fill_value=False)
            except Exception:
                anomaly_flags[f"{v}_interstation"] = False

    # volcar flags en df
    for col in anomaly_flags.columns:
        df[col] = anomaly_flags[col]

    # imputacion jerarquizada por estacion
    df_imputed = df.copy()
    provenance = {}
    stations = df_imputed["station"].unique() if "station" in df_imputed.columns else [None]
    for st in stations:
        if st is None:
            sub_idx = df_imputed.index
        else:
            sub_idx = df_imputed[df_imputed["station"] == st].index
        sub = df_imputed.loc[sub_idx].copy()
        for v in VARIABLES:
            if v not in sub.columns:
                continue
            gaps = classify_gaps(sub[v])
            provenance.setdefault(v, []).append({"station": st, "n_gaps": len(gaps), "gaps": [(str(a), str(b), int(c)) for a, b, c in gaps]})
            for start, end, length in gaps:
                if length <= 6:
                    sub.loc[start:end, v] = impute_short(sub[v].loc[start:end])
                    for t in pd.date_range(start, end, freq="H"):
                        sub.loc[t, f"{v}_imputed_method"] = "interpolacion_corta"
                elif length <= 72:
                    window_start = max(sub.index.min(), start - pd.Timedelta(hours=72))
                    window_end = min(sub.index.max(), end + pd.Timedelta(hours=72))
                    block = sub.loc[window_start:window_end]
                    try:
                        block_imp, knn = impute_medium(block, variables=VARIABLES, k=KNN_K)
                        sub.loc[window_start:window_end, VARIABLES] = block_imp[VARIABLES]
                        for t in pd.date_range(start, end, freq="H"):
                            sub.loc[t, f"{v}_imputed_method"] = "knn_multivariante"
                    except Exception:
                        sub.loc[start:end, v] = impute_short(sub[v].loc[start:end])
                        for t in pd.date_range(start, end, freq="H"):
                            sub.loc[t, f"{v}_imputed_method"] = "interpolacion_fallback"
                else:
                    features = [c for c in VARIABLES if c != v]
                    try:
                        sub_filled, model = impute_long_model(sub, v, features, model_type="xgb" if XGBOOST_AVAILABLE else "rf")
                        sub = sub_filled
                        method_name = "model_xgb" if (model is not None and XGBOOST_AVAILABLE) else ("model_rf" if model is not None else "imputacion_fallback")
                        for t in pd.date_range(start, end, freq="H"):
                            sub.loc[t, f"{v}_imputed_method"] = method_name
                    except Exception:
                        sub.loc[start:end, v] = impute_short(sub[v].loc[start:end])
                        for t in pd.date_range(start, end, freq="H"):
                            sub.loc[t, f"{v}_imputed_method"] = "interpolacion_fallback"
        df_imputed.loc[sub_idx] = sub

    # imputacion multiple para estimar incertidumbre si procede
    imputed_fraction = {v: float(df_imputed[v].isna().mean()) if v in df_imputed.columns else 0.0 for v in VARIABLES}
    multiple_results = {}
    for v in VARIABLES:
        if imputed_fraction.get(v, 0.0) > 0.05:
            try:
                imputations = multiple_imputation_iterative(df_imputed, variables=VARIABLES)
                multiple_results[v] = imputations
            except Exception:
                multiple_results[v] = None

    # comprobaciones finales de consistencia
    df_final = check_nox_consistency(df_imputed)

    # informe de calidad
    report = quality_report(df_final)

    meta["provenance"] = provenance
    meta["imputed_fraction"] = imputed_fraction
    meta["report"] = report
    meta_path = os.path.join(OUT_DIR, "depuration_meta.json")
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump(meta, f, indent=2, ensure_ascii=False)

    out_csv = os.path.join(OUT_DIR, "df_depured.csv")
    df_final.reset_index().rename(columns={"index": "timestamp"}).to_csv(out_csv, index=False, encoding="utf-8")

    return df_final, meta, multiple_results

# -----------------------
# Interfaz linea de comandos
# -----------------------

def main():
    parser = argparse.ArgumentParser(description="Pipeline de depuracion de series ambientales")
    parser.add_argument("--input", required=True, help="Ruta al CSV de entrada con columna timestamp")
    parser.add_argument("--stations_meta", required=False, help="CSV con metadatos de estaciones columnas station lat lon")
    args = parser.parse_args()

    df = pd.read_csv(args.input)
    stations_meta = None
    if args.stations_meta:
        stations_meta = pd.read_csv(args.stations_meta)

    df_out, meta, multiple = depuration_pipeline(df, stations_meta=stations_meta)
    print("Depuracion completada. Ficheros guardados en", OUT_DIR)
    print("Resumen calidad por variable:")
    for k, v in meta["report"].items():
        print(k, " missing_frac:", round(v["missing_frac"], 3), " missing:", v["missing"])

if __name__ == "__main__":
    main()
