# <a href="https://colab.research.google.com/github/MocT117/Another-one-/blob/master/Untitled44.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# In [None]:

# -*- coding: utf-8 -*-
"""
Pronóstico semanal por material usando el plan del diagnóstico (barra de progreso)
con TODOS los parches:
- Ventana de entrenamiento reciente
- TSB con parámetros adaptativos por recencia
- ADIDA con k sesgado por recencia (y desagregación correcta)
- Penalización ADIDA para MTO
- Ajuste universal por recencia (down-weight y obsolescencia)

Salidas:
  outputs/forecast_weekly.csv
  outputs/forecast_summary.csv
  outputs/forecast_summary.xlsx (se intenta abrir)
"""

import os, sys, warnings, math, subprocess
from pathlib import Path
import numpy as np
import pandas as pd
from tqdm import tqdm

from statsmodels.tsa.holtwinters import ExponentialSmoothing
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tools.sm_exceptions import ConvergenceWarning

# Silence convergence and future-deprecation warnings globally for this script.
# NOTE(review): ignoring FutureWarning also hides pandas deprecation notices.
warnings.simplefilter("ignore", ConvergenceWarning)
warnings.simplefilter("ignore", FutureWarning)

# ===================== PARAMETERS =====================
OUTDIR = "outputs"

# Plan (diagnosis output): Excel file first, CSV used when the Excel is missing
PLAN_XLSX         = r"C:\Users\k1021104\outputs\diagnostico_demanda_final_completo.xlsx"
PLAN_CSV_FALLBACK = r"C:\Users\k1021104\outputs\diagnostico_demanda_final_completo.csv"
PLAN_SHEET        = 0

# Weekly series (pre-built parquet, used when the flag is set and the file exists)
USE_WEEKLY_PARQUET = True
WEEKLY_PARQUET     = os.path.join(OUTDIR, "weekly_demand.parquet")

# Raw Excel (fallback when the parquet is not used)
INPUT_XLSX = r"C:\Users\k1021104\OneDrive - Krones AG\Desktop\FollowupLayouts\MaterialesULT.XLSX"
SHEETS     = None

# Frequency and horizon
FREQ        = "W-SUN"   # pandas offset alias used for resampling and future dates
DEFAULT_M   = 52        # default seasonal period passed to the models
H           = 8         # number of future periods to forecast

# Compact SARIMA grid: every (p,d,q) is combined with every seasonal (P,D,Q)
ORDERS      = [(0,1,1), (1,1,0), (1,1,1)]
SEAS_ORDERS = [(0,1,1), (1,1,0)]

# Recency (universal adjustment)
RECENCY_HALFLIFE_ADIs = 1.0     # each extra +1xADI since the last demand halves the weight
OBSOLETE_RATIO        = 3.0     # ratio >= 3 => weight 0
TAG_RECENCY_SUFFIX    = "+REC"  # appended to model_used when the adjustment is applied

# Training window (most recent observations only)
TRAIN_WINDOW = 104  # weeks (adjust to 78/156 if desired)

# ADIDA penalty for MTO materials
MTO_MAX_ORDERS     = 9        # <=9 historical orders => MTO
MTO_ADIDA_GAMMA    = 0.80     # multiplicative factor (20% reduction)
MTO_COL_NAME       = "MTS/MTO"  # explicit flag column, if present in the plan

# ===================== UTILIDADES BÁSICAS =====================
def open_in_excel(path: Path):
    """Ask the operating system to open *path* with its default application.

    Uses ``os.startfile`` on Windows, ``open`` on macOS and ``xdg-open``
    elsewhere. This is best effort: any failure is reported on stdout and
    swallowed so the caller is never interrupted.
    """
    try:
        target = str(path)
        if sys.platform.startswith("win"):
            os.startfile(target)  # type: ignore[attr-defined]
        else:
            launcher = "open" if sys.platform == "darwin" else "xdg-open"
            subprocess.Popen([launcher, target])
        print(f"[OK] Abierto en Excel: {path.resolve()}")
    except Exception as e:
        print(f"[AVISO] No se pudo abrir automáticamente: {e}")

def safe_to_csv(df: pd.DataFrame, path: Path) -> Path:
    """Write *df* to *path* as UTF-8-with-BOM CSV, creating parent folders.

    If the target cannot be written because of a ``PermissionError`` (for
    example the file is open in another program), the frame is written next
    to it with an ``_ALT`` suffix on the stem instead.

    Returns:
        The path that was actually written.
    """
    csv_options = {"index": False, "encoding": "utf-8-sig"}
    path.parent.mkdir(parents=True, exist_ok=True)
    try:
        df.to_csv(path, **csv_options)
        print(f"[OK] CSV guardado en: {path.resolve()}")
        written = path
    except PermissionError:
        written = path.with_stem(path.stem + "_ALT")
        df.to_csv(written, **csv_options)
        print(f"[AVISO] Archivo abierto. Guardado como: {written.resolve()}")
    return written

def norm_id(series, width):
    """Normalise material identifiers to a comparable string form.

    Values are cast to ``str``, stripped, and a trailing ``.0`` (typical of
    ids that were read as floats) is removed. When *width* is not ``None``,
    purely numeric ids are left-padded with zeros to that width; other ids
    are left as they are.
    """
    cleaned = series.astype(str).str.strip().str.replace(r"\.0$", "", regex=True)
    if width is None:
        return cleaned
    all_digits = cleaned.str.fullmatch(r"\d+")
    # Replace only the all-digit entries by their zero-padded version.
    return cleaned.mask(all_digits, cleaned.str.zfill(int(width)))

def infer_width_from_plan(plan: pd.DataFrame):
    """Return the length of the longest all-digit ``Material`` id in *plan*.

    Ids are stripped and a trailing ``.0`` is removed before measuring.
    Returns ``None`` when the plan has no purely numeric id.
    """
    codes = plan["Material"].astype(str).str.strip().str.replace(r"\.0$", "", regex=True)
    digit_lengths = codes[codes.str.fullmatch(r"\d+")].str.len()
    if digit_lengths.empty:
        return None
    return int(digit_lengths.max())

def _trim(y: pd.Series, w=TRAIN_WINDOW) -> pd.Series:
    """Keep only the last *w* observations of *y*.

    The series is returned unchanged when *w* is falsy (``None``/``0``) or
    when it is not longer than *w*.
    """
    if not w or len(y) <= w:
        return y
    return y.iloc[-w:]

# ===================== CARGA DE SERIES =====================
def build_weekly_from_excel(df, date_col, qty_col, material_col, freq=FREQ):
    """Build one regular demand series per material from raw order rows.

    Dates that cannot be parsed and rows without a material are dropped;
    quantities that cannot be parsed count as 0. For each material the
    quantities are summed per *freq* period and gaps are filled with 0.

    Returns:
        dict mapping ``str(material)`` to a float Series indexed by period.
    """
    frame = df[[date_col, qty_col, material_col]].copy()
    frame[date_col] = pd.to_datetime(frame[date_col], errors="coerce")
    frame[qty_col] = pd.to_numeric(frame[qty_col], errors="coerce").fillna(0)
    frame = frame.dropna(subset=[date_col, material_col])

    sku_count = frame[material_col].nunique()
    groups = tqdm(frame.groupby(material_col), total=sku_count,
                  desc="Construyendo series (Excel)", unit="SKU")

    series_by_material = {}
    for material, rows in groups:
        per_period = rows.set_index(date_col)[qty_col].resample(freq).sum()
        if per_period.empty:
            continue
        series_by_material[str(material)] = per_period.asfreq(freq, fill_value=0).astype(float)
    return series_by_material

def load_weekly_series():
    """Load the per-material weekly demand series.

    Reads ``WEEKLY_PARQUET`` (columns ``material``, ``week``, ``qty``) when
    ``USE_WEEKLY_PARQUET`` is set and the file exists; otherwise rebuilds the
    series from the raw Excel workbook ``INPUT_XLSX``.

    Returns:
        dict mapping material id (str) to a float Series at ``FREQ``.
    """
    if USE_WEEKLY_PARQUET and os.path.exists(WEEKLY_PARQUET):
        table = pd.read_parquet(WEEKLY_PARQUET)
        table["material"] = table["material"].astype(str)
        sku_count = table["material"].nunique()
        groups = tqdm(table.groupby("material"), total=sku_count,
                      desc="Construyendo series (parquet)", unit="SKU")
        weekly = {}
        for material, rows in groups:
            raw = pd.Series(rows["qty"].values, index=pd.DatetimeIndex(rows["week"]))
            weekly[str(material)] = raw.asfreq(FREQ, fill_value=0).astype(float)
        print(f"[INFO] Series desde parquet: {len(weekly):,} materiales.")
        return weekly

    print("[INFO] No hay weekly_demand.parquet; leyendo Excel crudo…")
    sheets = pd.read_excel(INPUT_XLSX, sheet_name=SHEETS, dtype=str)
    # sheet_name=None yields a dict of frames: stack them into one table.
    if isinstance(sheets, dict):
        df = pd.concat(sheets.values(), ignore_index=True)
    else:
        df = sheets
    if 'Material' not in df.columns:
        # Accept the first known alias of the material column, if any.
        alias = next((name for name in ['Materiales','MATNR','matnr','material']
                      if name in df.columns), None)
        if alias is not None:
            df = df.rename(columns={alias:'Material'})
    weekly = build_weekly_from_excel(df, "Fecha del documento", "Cantidad de pedido", "Material", FREQ)
    print(f"[INFO] Series construidas desde Excel: {len(weekly):,} materiales.")
    return weekly

# ===================== MODELOS =====================
def fc_naive_seasonal(y_train, h, m=DEFAULT_M):
    """Seasonal-naive forecast: repeat the last full cycle of length *m*.

    When fewer than *m* observations are available, the last observed value
    (or 0.0 for an empty series) is repeated instead.

    Returns:
        ``(yhat, meta)`` with ``yhat`` an array of length *h*.
    """
    info = {"model":"Naive","m":m}
    n_obs = len(y_train)
    if n_obs >= m:
        cycle = y_train.iloc[-m:].values
        n_copies = int(np.ceil(h / m))
        return np.tile(cycle, n_copies)[:h], info
    level = float(y_train.iloc[-1]) if n_obs else 0.0
    return np.repeat(level, h), info

def fc_ets(y_train, h, m=DEFAULT_M):
    """Fit a small family of exponential-smoothing models and forecast with the best AIC.

    Candidates are additive-trend models (damped and undamped) with additive
    seasonality of period ``m`` -- tried only when at least two full cycles
    are available and the series has some positive value -- followed by the
    same two trend models without seasonality and a level-only model.

    Args:
        y_train: Training series (pandas Series).
        h: Number of steps to forecast.
        m: Seasonal period.

    Returns:
        ``(yhat, meta)``. ``yhat`` is a 1-D float ``numpy.ndarray`` of length
        ``h`` (the statsmodels result is converted so that callers can index
        it by position, like the output of the other models). If no
        candidate can be fitted, the result of ``fc_naive_seasonal`` is
        returned with ``meta["fallback"] = "ETS->Naive"``.
    """
    y = y_train.values
    best_fit, best_aic, best_spec = None, None, None
    use_seasonal = (len(y_train) >= 2*m) and (np.any(y > 0))

    specs = []
    if use_seasonal:
        specs += [
            dict(trend="add", damped_trend=True,  seasonal="add", seasonal_periods=m),
            dict(trend="add", damped_trend=False, seasonal="add", seasonal_periods=m),
        ]
    specs += [
        dict(trend="add", damped_trend=True,  seasonal=None),
        dict(trend="add", damped_trend=False, seasonal=None),
        dict(trend=None,  damped_trend=False, seasonal=None),
    ]

    for sp in specs:
        try:
            fit = ExponentialSmoothing(y_train, **sp).fit(optimized=True, use_brute=True)
            aic = fit.aic
            if (best_aic is None) or (aic < best_aic):
                best_aic, best_fit, best_spec = aic, fit, sp
        except Exception:
            # Deliberate best effort: a candidate that cannot be fitted is
            # skipped and the remaining ones are still tried.
            continue

    if best_fit is None:
        yhat, meta = fc_naive_seasonal(y_train, h, m)
        meta["fallback"] = "ETS->Naive"
        return yhat, meta

    # forecast() returns a date-indexed Series for pandas input; integer
    # indexing on such a Series is deprecated, so hand back a plain array.
    yhat = np.asarray(best_fit.forecast(h), dtype=float)
    return yhat, {"model":"ETS","spec":best_spec,"m":m}

def _croston_update(y, alpha=0.1):
    """Run Croston's recursions over *y* and return ``(size, interval)``.

    ``size`` is the smoothed non-zero demand and ``interval`` the smoothed
    number of periods between demands. Both are initialised from the first
    positive observation and stay at 0.0 if there is none.
    """
    size, interval = 0.0, 0.0
    gap = 1            # periods elapsed since the previous demand (inclusive)
    started = False
    for demand in y:
        if not demand > 0:
            gap += 1
            continue
        if started:
            size = size + alpha*(demand - size)
            interval = interval + alpha*(gap - interval)
        else:
            size, interval = demand, gap
            started = True
        gap = 1
    return size, interval

def fc_croston_sba(y_train, h, alpha=0.1):
    """Flat Croston forecast with the ``(1 - alpha/2)`` SBA correction factor.

    Returns:
        ``(yhat, meta)``; ``yhat`` is all zeros when the smoothed interval is
        0 (no positive demand was seen), otherwise the corrected
        size/interval rate repeated *h* times.
    """
    size, interval = _croston_update(y_train.values, alpha=alpha)
    info = {"model":"CrostonSBA","alpha":alpha,"Q":size,"Z":interval}
    if interval == 0:
        return np.zeros(h), info
    rate = (1 - alpha/2.0) * (size / interval)
    return np.repeat(rate, h), info

def fc_tsb(y_train, h, alpha=0.1, beta=0.1):
    """Flat TSB-style forecast: demand probability times demand size.

    The probability is smoothed with *alpha* on every period; the size is
    smoothed with *beta* only on periods with positive demand and is
    initialised from the first such period.

    Returns:
        ``(yhat, meta)`` with ``yhat`` the product repeated *h* times.
    """
    prob, size = 0.0, 0.0
    size_started = False
    for obs in y_train.values:
        has_demand = obs > 0
        indicator = 1.0 if has_demand else 0.0
        prob = prob + alpha*(indicator - prob)
        if not has_demand:
            continue
        if size_started:
            size = size + beta*(obs - size)
        else:
            size = obs
            size_started = True
    info = {"model":"TSB","alpha":alpha,"beta":beta,"p_hat":prob,"q_hat":size}
    return np.repeat(prob*size, h), info

def fc_sarima(y_train, h, m=DEFAULT_M):
    """Pick the best-AIC SARIMA over the ``ORDERS`` x ``SEAS_ORDERS`` grid and forecast.

    Args:
        y_train: Training series (pandas Series).
        h: Number of steps to forecast.
        m: Seasonal period used in every seasonal order.

    Returns:
        ``(yhat, meta)``. ``yhat`` is a 1-D float ``numpy.ndarray`` of length
        ``h`` (the statsmodels result is converted so that callers can index
        it by position, like the output of the other models). With fewer
        than ``m + 24`` observations, or when no configuration can be
        fitted, the result of ``fc_naive_seasonal`` is returned with
        ``meta["fallback"] = "SARIMA->Naive"``.
    """
    if len(y_train) < (m + 24):
        yhat, meta = fc_naive_seasonal(y_train, h, m)
        meta["fallback"] = "SARIMA->Naive"
        return yhat, meta

    best_fit, best_aic, best_cfg = None, None, None
    for (p,d,q) in ORDERS:
        for (P,D,Q) in SEAS_ORDERS:
            try:
                fit = SARIMAX(y_train, order=(p,d,q),
                              seasonal_order=(P,D,Q,m),
                              enforce_stationarity=False,
                              enforce_invertibility=False).fit(disp=False)
                aic = fit.aic
                if (best_aic is None) or (aic < best_aic):
                    best_aic, best_fit, best_cfg = aic, fit, {"order":(p,d,q), "seasonal_order":(P,D,Q,m)}
            except Exception:
                # Deliberate best effort: skip configurations that fail to fit.
                continue

    if best_fit is None:
        yhat, meta = fc_naive_seasonal(y_train, h, m)
        meta["fallback"] = "SARIMA->Naive"
        return yhat, meta

    # forecast() returns a date-indexed Series for pandas input; integer
    # indexing on such a Series is deprecated, so hand back a plain array.
    yhat = np.asarray(best_fit.forecast(h), dtype=float)
    return yhat, {"model":"SARIMA","cfg":best_cfg}

# ====== ADIDA (con desagregación correcta) ======
def fc_adida(y_weekly, h, base_model="ETS", m=DEFAULT_M, ks=(4,8,13), zero_ratio_target=0.30):
    """ADIDA forecast: aggregate into k-week buckets, forecast, then disaggregate.

    The weekly series is summed into non-overlapping buckets of ``k`` weeks.
    The first ``k`` in *ks* (in the order given) whose aggregated series has
    a zero ratio of at most *zero_ratio_target*, or at least 6 positive
    buckets, is used; if none qualifies, the last ``k`` in *ks* is used. The
    aggregated series is forecast with *base_model* and each forecast bucket
    is spread evenly over its ``k`` weeks.

    Buckets are anchored at the END of the series, so the most recent bucket
    always holds ``k`` full weeks; when the length is not a multiple of
    ``k`` the oldest bucket is the partial one. Anchoring at the start would
    leave the most recent bucket incomplete and bias the level downward.

    Args:
        y_weekly: Weekly demand series (pandas Series).
        h: Number of weeks to forecast.
        base_model: ``"ETS"`` or ``"SARIMA"`` (case-insensitive); anything
            else uses the seasonal-naive model.
        m: Weekly seasonal period; scaled to ``max(1, int(m / k))`` buckets.
        ks: Candidate bucket sizes, in order of preference.
        zero_ratio_target: Maximum accepted share of non-positive buckets.

    Returns:
        ``(yhat_w, meta)``: ``yhat_w`` is a non-negative float array of
        length *h*; ``meta`` is the base model's metadata with ``model``,
        ``k``, ``zero_ratio_agg`` and ``pos_agg`` added.
    """
    y = y_weekly.astype(float).copy()

    def aggregate_by_k(series, k):
        # Sum the series into ceil(n / k) buckets aligned to the last observation.
        n = len(series)
        nb = int(math.ceil(n / k))
        oldest_size = n - (nb - 1) * k          # size of the (possibly partial) first bucket
        values = series.to_numpy(dtype=float)
        edges = [0] + [oldest_size + i * k for i in range(nb)]
        sums = [values[lo:hi].sum() for lo, hi in zip(edges[:-1], edges[1:])]
        idx = pd.date_range(series.index[0], periods=nb, freq=f"{k}W-SUN")
        return pd.Series(sums, index=idx)

    def bucket_stats(agg):
        # Share of non-positive buckets and number of positive buckets.
        zero_ratio = float((agg <= 0).sum() / len(agg)) if len(agg) > 0 else 1.0
        return zero_ratio, int((agg > 0).sum())

    # Choose the first acceptable k in the order received.
    best_k, best_sr, best_stats = None, None, None
    for k in ks:
        agg = aggregate_by_k(y, k)
        zero_ratio, pos_count = bucket_stats(agg)
        if zero_ratio <= zero_ratio_target or pos_count >= 6:
            best_k, best_sr, best_stats = k, agg, (zero_ratio, pos_count)
            break  # later candidates can no longer be selected
    if best_k is None:
        best_k = ks[-1]
        best_sr = aggregate_by_k(y, best_k)
        best_stats = bucket_stats(best_sr)

    # Forecast at the aggregated level.
    h_agg = math.ceil(h / best_k)
    m_agg = max(1, int(m / best_k))
    base = base_model.upper()
    if base == "ETS":
        yhat_agg, meta = fc_ets(best_sr, h_agg, m=m_agg)
    elif base == "SARIMA":
        yhat_agg, meta = fc_sarima(best_sr, h_agg, m=m_agg)
    else:
        yhat_agg, meta = fc_naive_seasonal(best_sr, h_agg, m=m_agg)

    # Disaggregate: spread each bucket evenly over its k weeks, no negatives.
    yhat_agg = np.asarray(yhat_agg, dtype=float)
    yhat_w = np.repeat(yhat_agg / best_k, best_k)[:h]
    yhat_w = np.clip(yhat_w, 0.0, None)

    meta.update({"model": f"ADIDA({base_model})", "k": best_k,
                 "zero_ratio_agg": best_stats[0], "pos_agg": best_stats[1]})
    return yhat_w, meta

# ===================== RECENCIA & ADAPTATIVOS =====================
def compute_recency_from_series(y: pd.Series, adi_fallback=None):
    """Measure how stale the last demand is relative to the usual interval.

    ``ratio`` is the number of weeks since the last positive observation
    divided by the ADI (series length / number of positive periods, or
    *adi_fallback* when there is no positive period). ``weight`` is 1.0 for
    ``ratio <= 1``, halves for every ``RECENCY_HALFLIFE_ADIs`` above 1, and
    is 0.0 once ``ratio >= OBSOLETE_RATIO``.

    Returns:
        ``(ratio, weight)``; ``ratio`` is ``inf`` when it cannot be computed.
    """
    positive = y > 0
    last_seen = y[positive].index.max() if positive.any() else None
    series_end = y.index.max()
    if last_seen is None:
        weeks_idle = float('inf')
    else:
        weeks_idle = float(((series_end - last_seen).days / 7))

    n_periods = max(1, len(y))
    n_positive = int(positive.sum())
    if n_positive > 0:
        adi = n_periods / n_positive
    elif adi_fallback:
        adi = adi_fallback
    else:
        adi = float('inf')

    usable_adi = adi and adi > 0 and np.isfinite(adi)
    ratio = weeks_idle / adi if usable_adi else float('inf')

    if ratio >= OBSOLETE_RATIO:
        weight = 0.0
    elif ratio <= 1.0:
        weight = 1.0
    else:
        half_lives = (ratio - 1.0) / max(RECENCY_HALFLIFE_ADIs, 1e-9)
        weight = 0.5 ** half_lives
    return ratio, weight

def adaptive_tsb_params(recency_ratio: float):
    """Return the ``(alpha, beta)`` smoothing pair for TSB given the recency ratio.

    Lower ratios (more recent demand) get larger smoothing constants;
    a non-finite ratio gets the smallest pair.
    """
    if not np.isfinite(recency_ratio):
        return 0.05, 0.05
    # (upper bound of the ratio band, smoothing constant used for both parameters)
    bands = ((0.5, 0.30), (1.0, 0.20), (1.5, 0.15))
    for upper, constant in bands:
        if recency_ratio <= upper:
            return constant, constant
    return 0.08, 0.08

def choose_adida_ks(recency_ratio: float):
    """Return the ADIDA bucket sizes in order of preference for a recency ratio.

    Recent demand prefers short buckets first; stale or unknown recency
    prefers long buckets first.
    """
    long_first = (13, 8, 4)
    if not np.isfinite(recency_ratio) or recency_ratio > 1.5:
        return long_first
    return (4, 8, 13) if recency_ratio <= 0.75 else (8, 4, 13)

def is_mto(plan_row: pd.Series) -> bool:
    """Tell whether a plan row describes a make-to-order (MTO) material.

    An explicit ``"MTO"`` in the ``MTO_COL_NAME`` column wins (compared
    case-insensitively and ignoring surrounding whitespace, which is common
    in cells read from Excel). Otherwise the first order-count column that
    holds a usable number decides: MTO when the count is at most
    ``MTO_MAX_ORDERS``. Missing (NaN) or non-numeric counts are skipped so
    that the next candidate column can still be consulted.

    Returns:
        ``True`` for MTO, ``False`` otherwise (including when nothing in the
        row allows a decision).
    """
    # Explicit flag, when present.
    if MTO_COL_NAME in plan_row.index:
        if str(plan_row[MTO_COL_NAME]).strip().upper() == "MTO":
            return True

    # Otherwise fall back to the historical order count, if available.
    for col in ["n_orders_orig", "n_pedidos", "orders_count"]:
        if col not in plan_row.index:
            continue
        try:
            n_orders = float(plan_row[col])
        except (TypeError, ValueError):
            continue
        if np.isnan(n_orders):
            # An empty cell carries no information; try the next column.
            continue
        return bool(n_orders <= MTO_MAX_ORDERS)
    return False

# ===================== PLAN I/O =====================
def load_plan_any(path_xlsx, sheet=0, csv_fallback=None):
    """Load the forecasting plan from Excel, or from a CSV when the Excel is missing.

    Every column is read as text.

    Returns:
        ``(plan, src)`` where ``src`` is ``"xlsx"`` or ``"csv"``.

    Raises:
        FileNotFoundError: neither file exists.
        KeyError: the loaded plan has no ``Material`` column.
    """
    if os.path.exists(path_xlsx):
        loaded = (pd.read_excel(path_xlsx, sheet_name=sheet, dtype=str), "xlsx")
    elif csv_fallback and os.path.exists(csv_fallback):
        loaded = (pd.read_csv(csv_fallback, dtype=str, encoding="utf-8-sig"), "csv")
    else:
        raise FileNotFoundError(f"No se encontró el plan en:\n- {path_xlsx}\n- {csv_fallback}")

    plan, src = loaded
    if "Material" in plan.columns:
        return plan, src
    raise KeyError("El plan no contiene la columna 'Material'.")

def normalize_models_list(primary, candidates):
    """Build the ordered, de-duplicated list of model names to try.

    *primary* comes first, followed by the ``;``-separated *candidates*.
    Known aliases are mapped to their canonical name; unknown names are kept
    as given. Non-string or blank inputs are ignored, and a default list is
    returned when nothing usable remains.
    """
    aliases = {
        "ETS":"ETS", "Holt-Winters":"ETS",
        "SARIMA":"SARIMA",
        "TSB":"TSB",
        "CrostonSBA":"CrostonSBA","Croston":"CrostonSBA",
        "NaiveEstacional":"Naive","Naive":"Naive",
        "ADIDA":"ADIDA"
    }

    tokens = []
    if isinstance(primary, str) and primary.strip():
        tokens.append(primary.strip())
    if isinstance(candidates, str) and candidates.strip():
        tokens.extend(piece.strip() for piece in candidates.split(";"))

    canonical = [aliases.get(tok, tok) for tok in tokens if tok]
    # dict keys keep insertion order, so this drops repeats but keeps priority.
    unique = list(dict.fromkeys(canonical))
    return unique or ["ETS","SARIMA","TSB","CrostonSBA","Naive"]

# ===================== ORQUESTADOR =====================
def forecast_one(y: pd.Series, plan_row: pd.Series, models: list, m: int,
                 low_data_flag: bool, forced_intermittent: bool,
                 apply_uplift: bool, uplift_percent: float):
    """Forecast ``H`` periods for one material with the first model that works.

    Models are tried in the order of *models*, with two adjustments: when
    *low_data_flag* is set and ADIDA is not listed, ADIDA is tried first;
    when *forced_intermittent* is set, TSB and CrostonSBA (if listed) are
    moved to the front. Unknown model names are skipped, and a model that
    raises is recorded and the next one is tried. If every model fails, the
    seasonal-naive forecast is returned.

    Applied along the way:
      - training window (only the last ``TRAIN_WINDOW`` observations are fitted)
      - TSB smoothing parameters chosen from the recency ratio
      - ADIDA bucket sizes chosen from the recency ratio
      - ADIDA forecast scaled by ``MTO_ADIDA_GAMMA`` for MTO materials
      - optional uplift ``yhat * (1 + uplift_percent)``

    Args:
        y: Full demand series of the material.
        plan_row: The material's row in the plan (used for the MTO check).
        models: Candidate model names in order of preference.
        m: Seasonal period.
        low_data_flag: Put ADIDA first when it is not already listed.
        forced_intermittent: Put the intermittent-demand models first.
        apply_uplift: Whether the uplift is applied.
        uplift_percent: Uplift as a fraction; ignored unless positive.

    Returns:
        ``(yhat, meta)``: ``yhat`` is a 1-D float ``numpy.ndarray``; ``meta``
        holds ``model_used``, the model's own metadata, ``recency_ratio``,
        ``recency_weight`` and ``tried``.
    """
    # Recency is computed on the full series, not on the training window.
    rec_ratio, rec_weight = compute_recency_from_series(y)

    tried = []
    models_ord = models[:]
    if low_data_flag and "ADIDA" not in models_ord:
        models_ord = ["ADIDA"] + models_ord
    if forced_intermittent:
        front = [mm for mm in ["TSB","CrostonSBA"] if mm in models_ord]
        rest  = [mm for mm in models_ord if mm not in front]
        models_ord = front + rest

    # The training window does not depend on the model: compute it once.
    y_in = _trim(y, TRAIN_WINDOW)

    for mod in models_ord:
        try:
            if mod == "ETS":
                yhat, meta = fc_ets(y_in, H, m)
            elif mod == "SARIMA":
                yhat, meta = fc_sarima(y_in, H, m)
            elif mod == "TSB":
                a, b = adaptive_tsb_params(rec_ratio)
                yhat, meta = fc_tsb(y_in, H, alpha=a, beta=b)
                meta["alpha"], meta["beta"] = a, b
            elif mod == "CrostonSBA":
                yhat, meta = fc_croston_sba(y_in, H, alpha=0.1)
            elif mod == "Naive":
                yhat, meta = fc_naive_seasonal(y_in, H, m)
            elif mod == "ADIDA":
                ks = choose_adida_ks(rec_ratio)
                yhat, meta = fc_adida(y_in, H, base_model="ETS", m=m, ks=ks)

                # ADIDA penalty for MTO materials.
                if is_mto(plan_row):
                    yhat = yhat * MTO_ADIDA_GAMMA
                    meta["mto_penalty"] = MTO_ADIDA_GAMMA
            else:
                tried.append({"model":mod,"status":"skip"})
                continue

            # Some models may hand back a date-indexed Series; callers index
            # the forecast by position, so always return a plain array.
            yhat = np.asarray(yhat, dtype=float)

            if apply_uplift and uplift_percent and float(uplift_percent) > 0:
                yhat = yhat * (1.0 + float(uplift_percent))

            meta_out = {"model_used": mod, **meta,
                        "recency_ratio": rec_ratio, "recency_weight": rec_weight,
                        "tried": [mod] + [t["model"] for t in tried]}
            return yhat, meta_out

        except Exception as e:
            # Deliberate best effort: remember the failure and try the next model.
            tried.append({"model":mod,"status":f"fail:{e}"})
            continue

    # Every candidate was skipped or failed: last-resort seasonal naive.
    yhat, meta = fc_naive_seasonal(y_in, H, m)
    meta["fallback"] = "all_failed->Naive"
    return np.asarray(yhat, dtype=float), {"model_used":"Naive", **meta,
                  "recency_ratio": rec_ratio, "recency_weight": rec_weight,
                  "tried":[t['model'] for t in tried]}

def main():
    """Run the whole pipeline: read plan, load series, forecast, save outputs.

    Steps:
      1. Load the plan and normalise its ``Material`` ids and control columns.
      2. Load the weekly series and normalise their keys like the plan ids.
      3. Forecast every plan material that has a series of at least 4 points,
         then down-weight the summary totals by recency.
      4. Write ``forecast_weekly.csv``, ``forecast_summary.csv`` and
         ``forecast_summary.xlsx`` under ``OUTDIR`` and try to open the XLSX.
    """
    outdir = Path(OUTDIR).resolve()
    outdir.mkdir(parents=True, exist_ok=True)

    # 1) Plan
    print("[1/4] Leyendo plan…")
    plan, _ = load_plan_any(PLAN_XLSX, PLAN_SHEET, PLAN_CSV_FALLBACK)
    if "Material_excel" in plan.columns:
        plan = plan.drop(columns=["Material_excel"])
    width = infer_width_from_plan(plan)
    plan["Material"] = norm_id(plan["Material"], width)
    print(f"[INFO] Ancho observado en plan: {width}")

    # Control columns: add any that the plan does not bring.
    defaults = {"candidates":"", "seasonal_periods": DEFAULT_M, "low_data_flag": False,
                "forced_intermittent": False, "apply_uplift": False, "uplift_percent": 0.0}
    for k,v in defaults.items():
        if k not in plan.columns:
            plan[k] = v
    def to_int_m(x):
        try: return int(float(x))
        except Exception: return DEFAULT_M
    plan["seasonal_periods"] = plan["seasonal_periods"].apply(to_int_m)

    # 2) Series
    print("[2/4] Cargando/Construyendo series…")
    weekly = load_weekly_series()
    # Normalise the keys to the same width as the plan ids, in one pass
    # instead of building a one-element Series per key.
    if weekly:
        raw_keys = list(weekly)
        norm_keys = norm_id(pd.Series(raw_keys, dtype=object), width).tolist()
        weekly = {nk: weekly[rk] for nk, rk in zip(norm_keys, raw_keys)}
    print(f"[INFO] Materiales en series (normalizados): {len(weekly):,}")

    mats_plan   = plan["Material"].astype(str).tolist()
    mats_series = set(weekly.keys())
    mats = [m for m in mats_plan if m in mats_series]
    missing = [m for m in mats_plan if m not in mats_series]
    print(f"[INFO] A pronosticar: {len(mats):,} | Faltan en series: {len(missing):,}")
    if missing:
        print(f"  Ejemplos faltantes: {missing[:10]}")

    # 3) Forecast
    print("[3/4] Pronosticando…")
    truthy = {"true","1","yes"}
    rows, wrows = [], []
    with tqdm(total=len(mats), desc="Pronosticando SKUs", unit="SKU") as pbar:
        for _, r in plan[plan["Material"].isin(mats)].iterrows():
            mat = str(r["Material"])
            y = weekly[mat]
            if len(y) < 4:
                pbar.update(1)
                pbar.set_postfix({"material": mat, "status": "serie_corta"})
                continue

            models = normalize_models_list(r.get("primary_model",""), r.get("candidates",""))
            m = int(r.get("seasonal_periods", DEFAULT_M) or DEFAULT_M)

            low     = str(r.get("low_data_flag", False)).lower() in truthy
            finterm = str(r.get("forced_intermittent", False)).lower() in truthy
            ap_upl  = str(r.get("apply_uplift", False)).lower() in truthy
            upc     = float(r.get("uplift_percent", 0.0) or 0.0)

            yhat, meta = forecast_one(y, r, models, m, low, finterm, ap_upl, upc)
            # A model may return a date-indexed Series; positional access
            # below needs a plain array.
            yhat = np.asarray(yhat, dtype=float).ravel()

            # Base SUM/AVG
            yhat_sum = float(np.sum(yhat))
            yhat_avg = float(np.mean(yhat))

            # Universal recency adjustment (down-weight)
            ratio, weight = meta.get("recency_ratio", np.nan), meta.get("recency_weight", np.nan)
            applied_recency = False
            if np.isfinite(ratio) and ratio > 1.0:
                yhat_sum *= weight
                yhat_avg *= weight
                applied_recency = True
            if np.isfinite(ratio) and ratio >= OBSOLETE_RATIO:
                yhat_sum = 0.0
                yhat_avg = 0.0
                applied_recency = True

            model_used = meta.get("model_used","NA")
            if applied_recency:
                model_used = f"{model_used}{TAG_RECENCY_SUFFIX}"

            # Weekly detail
            # NOTE(review): the weekly rows carry the un-weighted forecast, so
            # they do not add up to yhat_sum_H when the recency adjustment is
            # applied -- confirm whether that is intended.
            idx_fut = pd.date_range(y.index.max() + pd.tseries.frequencies.to_offset(FREQ), periods=H, freq=FREQ)
            for i, ts in enumerate(idx_fut):
                value = float(yhat[i]) if i < len(yhat) else 0.0
                wrows.append({"Material": mat, "week": ts, "yhat": value})

            # Summary
            rows.append({
                "Material": mat,
                "yhat_sum_H": yhat_sum,
                "yhat_avg_H": yhat_avg,
                "model_used": model_used,
                "recency_ratio": ratio,
                "recency_weight": weight,
                "seasonal_periods_used": m
            })

            pbar.update(1)
            pbar.set_postfix({"material": mat, "modelo": model_used})

    # Explicit columns keep the headers (and the XLSX step) working even when
    # nothing could be forecast.
    df_weekly  = pd.DataFrame(wrows, columns=["Material", "week", "yhat"])
    df_summary = pd.DataFrame(rows, columns=[
        "Material", "yhat_sum_H", "yhat_avg_H", "model_used",
        "recency_ratio", "recency_weight", "seasonal_periods_used"])

    # 4) Save
    print("[4/4] Guardando…")
    safe_to_csv(df_weekly,  Path(OUTDIR) / "forecast_weekly.csv")
    safe_to_csv(df_summary, Path(OUTDIR) / "forecast_summary.csv")

    try:
        xlsx = Path(OUTDIR) / "forecast_summary.xlsx"
        with pd.ExcelWriter(xlsx, engine="xlsxwriter") as writer:
            df_summary.to_excel(writer, index=False, sheet_name="Resumen")
            wb, ws = writer.book, writer.sheets["Resumen"]
            # Text format on the id column so Excel keeps leading zeros.
            text_fmt = wb.add_format({'num_format':'@'})
            col_idx = list(df_summary.columns).index('Material')
            ws.set_column(col_idx, col_idx, 22, text_fmt)
        print(f"[OK] XLSX guardado: {xlsx.resolve()}")
        open_in_excel(xlsx)
    except Exception as e:
        # Best effort: the CSV outputs are already on disk at this point.
        print(f"[AVISO] No se pudo escribir/abrir XLSX: {e}")

# Run the pipeline only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()