# Daily sales forecast by store

* **Dataset**: `ventas_por_tienda_dia.parquet`. Each row is already the aggregated sales of one store (`Tienda`) on one date.
* **Models**:
  * Multi-series **LightGBM** with `skforecast` (version 0.15.x)
  * **Linear regression** (baseline)

The notebook:

1. Reads the parquet file with PySpark (no aggregation needed).
2. Fills gaps in the calendar and reports which days were filled.
3. Builds calendar features.
4. Trains and validates both models and computes MAE/RMSE for each.
5. Handles indices, dtypes, and duplicated columns to avoid alignment errors.

In [1]:
# Install dependencies if needed
# %pip install pyspark pandas numpy lightgbm scikit-learn skforecast==0.15.0

In [2]:
import pandas as pd
import numpy as np

In [3]:
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName('ForecastTiendaDiario').getOrCreate()

In [4]:
PARQUET_PATH = 'ventas_por_tienda_dia.parquet'  # local file
START_DATE   = '2022-01-02'
END_DATE     = '2025-02-28'

In [5]:

# 1 · Read the already-aggregated parquet (store-day level)
df = (spark.read.parquet(PARQUET_PATH)
         .filter((F.col('FechaVenta') >= F.lit(START_DATE)) & (F.col('FechaVenta') <= F.lit(END_DATE)))
         .select('FechaVenta', 'Tienda', 'PrecioVta'))

print("Filas leídas:", df.count())
df.show(3)


Filas leídas: 194168
+----------+------+---------+
|FechaVenta|Tienda|PrecioVta|
+----------+------+---------+
|2022-01-02|   203|  1055990|
|2022-01-03|   203|  1233800|
|2022-01-04|   203|  1512980|
+----------+------+---------+
only showing top 3 rows



In [6]:

# 2 · Pivot: one column per store (Tienda)
pivot = (df.groupBy('FechaVenta')
           .pivot('Tienda')
           .agg(F.first('PrecioVta'))
           .fillna(0)
           .orderBy('FechaVenta'))

pivot_pd = pivot.toPandas()
pivot_pd['FechaVenta'] = pd.to_datetime(pivot_pd['FechaVenta'])
pivot_pd = pivot_pd.sort_values('FechaVenta')
y_wide = pivot_pd.set_index('FechaVenta').astype('float32')
del pivot_pd
print("Shape matriz ancha:", y_wide.shape)


Shape matriz ancha: (1150, 191)


In [7]:

# 3 · Complete the daily calendar and report the gaps
idx_full = pd.date_range(start=y_wide.index.min(), end=y_wide.index.max(), freq='D')
missing = idx_full.difference(y_wide.index)
print(f"Se rellenaron {len(missing)} días faltantes.")
if len(missing):
    print('Primeros huecos:', missing[:10].date.tolist())
y_wide = y_wide.reindex(idx_full, fill_value=0)


Se rellenaron 4 días faltantes.
Primeros huecos: [datetime.date(2023, 1, 1), datetime.date(2023, 12, 25), datetime.date(2024, 1, 1), datetime.date(2025, 1, 1)]
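
The gap-filling step can be seen in isolation on a toy frame. This is a minimal sketch, not the notebook's data: the store IDs, dates, and values are made up for illustration. Note that filling with 0 treats a missing day as a day with no sales, which looks reasonable here because the reported gaps fall on 1 January and 25 December.

```python
import pandas as pd

# Toy wide matrix (hypothetical values): two stores, 2023-01-02 is missing.
toy = pd.DataFrame(
    {"203": [10.0, 30.0], "204": [5.0, 7.0]},
    index=pd.to_datetime(["2023-01-01", "2023-01-03"]),
)

# Full daily calendar between the first and last observed dates.
full_idx = pd.date_range(toy.index.min(), toy.index.max(), freq="D")

# Days that exist in the calendar but not in the data.
gaps = full_idx.difference(toy.index)

# Reindex onto the full calendar; missing days become all-zero rows.
filled = toy.reindex(full_idx, fill_value=0)

print(gaps.date.tolist())
print(filled)
```

If a missing day should instead be treated as unknown, `fill_value` can be left out so the rows stay as `NaN`, at the cost of handling them before training.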


In [8]:

# 4 · Split and exogenous features
train_end = pd.Timestamp('2023-12-31')
valid_end = pd.Timestamp('2024-09-30')

y_train = y_wide.loc[:train_end]
y_valid = y_wide.loc[train_end + pd.Timedelta(days=1): valid_end]

def calendar(idx):
    cal = pd.DataFrame(index=idx)
    cal['dow'] = idx.dayofweek.astype('int8')
    cal['weekend'] = (cal['dow']>=5).astype('int8')
    cal['day'] = idx.day.astype('int8')
    cal['month'] = idx.month.astype('int8')
    cal['year'] = idx.year.astype('int16')
    cal['weekofyear'] = idx.isocalendar().week.astype('int8')
    return cal

X_train = calendar(y_train.index)
X_valid = calendar(y_valid.index)
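
The calendar features above carry no holiday information. A minimal sketch of how a holiday flag could be added is shown below. The helper `add_holiday_flag` and the `HOLIDAYS` list are hypothetical; the dates are simply the gap dates reported in step 3, and a real list would come from an official calendar.

```python
import pandas as pd

# Hypothetical holiday list, taken from the gap dates reported in step 3.
HOLIDAYS = pd.to_datetime(
    ["2023-01-01", "2023-12-25", "2024-01-01", "2025-01-01"]
)

def add_holiday_flag(cal: pd.DataFrame, holidays: pd.DatetimeIndex) -> pd.DataFrame:
    """Return a copy of `cal` with a 0/1 `holiday` column based on its index."""
    out = cal.copy()
    out["holiday"] = out.index.isin(holidays).astype("int8")
    return out

# Small stand-in for the output of calendar(): three days around Christmas 2023.
idx = pd.date_range("2023-12-24", "2023-12-26", freq="D")
cal = pd.DataFrame({"dow": idx.dayofweek.astype("int8")}, index=idx)

cal_h = add_holiday_flag(cal, HOLIDAYS)
print(cal_h)
```

The same flag would have to be built for every window (train, valid, test) so that the exogenous frames keep identical columns.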


In [9]:
# 5 · Multi-series LightGBM
from lightgbm import LGBMRegressor
from skforecast.recursive import ForecasterRecursiveMultiSeries
from sklearn.metrics import mean_absolute_error, mean_squared_error
import numpy as np

lgb = LGBMRegressor(objective='regression', n_estimators=600, learning_rate=0.05,
                    subsample=0.8, random_state=42, n_jobs=-1)
fc  = ForecasterRecursiveMultiSeries(regressor=lgb, lags=[1,7,14])
fc.fit(series=y_train, exog=X_train)        # series names come from the y_train column labels

# ── VALID prediction (no levels=… argument) ─────────────
pred_long = fc.predict(
    steps = len(y_valid),
    exog  = X_valid        # ← levels omitted, so every series is predicted
)

# 1) If the output is long-format ['level', 'pred'], pivot it to wide
if list(pred_long.columns) == ['level', 'pred']:
    pred = (pred_long
             .pivot(columns='level', values='pred')
             .astype('float32'))
    pred.index = y_valid.index
else:
    if isinstance(pred_long.columns, pd.MultiIndex):
        # groupby(..., axis=1) is deprecated in recent pandas; transpose instead
        pred = pred_long.T.groupby(level=0).first().T
    else:
        pred = pred_long

# 2) Normalize column names and align
pred.columns    = pred.columns.astype(str)
y_valid.columns = y_valid.columns.astype(str)
common = y_valid.columns.intersection(pred.columns)
print("Columnas comunes:", len(common))   # should be > 0 once names are strings

pred   = pred[common]
y_eval = y_valid[common]

# 3) Metrics
mae = mean_absolute_error(y_eval.values.flatten(),
                          pred.values.flatten())

mse = mean_squared_error(y_eval.values.flatten(),
                         pred.values.flatten())
rmse = np.sqrt(mse)              # √MSE  →  RMSE

print(f"LightGBM VALID  MAE : {mae :.2f} | RMSE : {rmse :.2f}")


[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.002551 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 1066
[LightGBM] [Info] Number of data points in the train set: 136565, number of used features: 10
[LightGBM] [Info] Start training from score 2640374.094036
Columnas comunes: 191
LightGBM VALID  MAE : 1172365.12 | RMSE : 1665904.65
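
The MAE and RMSE above pool every store-day cell into one number, which can hide stores that are forecast much worse than the rest. A minimal sketch of a per-store breakdown follows. The two frames are toy data standing in for `y_eval` and `pred`; their values are invented for illustration.

```python
import numpy as np
import pandas as pd

# Toy stand-ins for y_eval and pred (hypothetical values, wide format).
idx = pd.date_range("2024-01-01", periods=3, freq="D")
y_true = pd.DataFrame({"203": [100.0, 120.0, 90.0], "204": [10.0, 0.0, 20.0]}, index=idx)
y_hat  = pd.DataFrame({"203": [110.0, 100.0, 90.0], "204": [10.0, 5.0, 14.0]}, index=idx)

# Column-wise errors: one MAE and one RMSE per store.
err = y_true - y_hat
per_store = pd.DataFrame({
    "mae":  err.abs().mean(),
    "rmse": np.sqrt((err ** 2).mean()),
}).sort_values("mae", ascending=False)

print(per_store)
```

Sorting by `mae` puts the worst-forecast stores first, which is a natural starting point for checking whether a few large stores dominate the pooled score.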



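## 📊 Interpreting the results

| Model | MAE (VALID) | RMSE (VALID) |
|--------|-------------|--------------|
| **LightGBM** | **1 172 365** | **1 665 905** |
| Linear regression (baseline) | 1 327 552 | 1 856 836 |

**LightGBM beats the baseline** by ~12 % in MAE and ~10 % in RMSE on the
validation window, which suggests that the non-linear relationships and the
interactions between lags and calendar variables captured by the trees add
value. The two scores are not computed on exactly the same cells: LightGBM is
scored on the zero-filled wide matrix, while the baseline is scored only on
the rows present in the parquet file, so the gap should be read as indicative.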
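
The relative gaps quoted for LightGBM versus the baseline follow directly from the table values. The short check below recomputes them; the helper name `rel_improvement` is only for illustration.

```python
# Validation scores copied from the results table.
lgbm     = {"mae": 1_172_365, "rmse": 1_665_905}
baseline = {"mae": 1_327_552, "rmse": 1_856_836}

def rel_improvement(base: float, model: float) -> float:
    """Fraction by which `model` lowers the error relative to `base`."""
    return (base - model) / base

for metric in ("mae", "rmse"):
    gain = rel_improvement(baseline[metric], lgbm[metric])
    print(f"{metric.upper():4s}: {gain:.1%}")
# MAE : 11.7%
# RMSE: 10.3%
```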

### Immediate recommendations

1. **Save the models**  
   ```python
   import pickle, joblib
   # best_fc and pipe are created in later cells of this notebook
   with open("lightgbm_forecaster.pkl", "wb") as f:
       pickle.dump(best_fc, f)
   joblib.dump(pipe, "baseline_linear.pkl")
   ```

2. **Hyperparameters**  
   *Try `min_child_samples`, `lambda_l1/l2`, higher `num_leaves`,
   and additional lags `[2,3,21,30]`.*

3. **Exogenous variables**  
   Adding regional holidays, promotional campaigns, and weather for each
   store may reduce the error on peaks.

4. **Rolling validation**  
   Back-test with sliding windows (e.g. 3 × 90 days)
   to check stability across 2024-2025.

5. **Deployment**  
   Set up a daily job (Airflow or similar) that:
   1. Updates the calendar.
   2. Loads the model.
   3. Generates the forecast for the day + 14.
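
The rolling-validation recommendation (3 × 90 days) can be laid out with plain pandas before any model is involved. The sketch below only builds the fold boundaries; `rolling_folds` is a hypothetical helper, and it assumes an expanding training window with the last fold ending on the final date of the data. skforecast also ships its own backtesting utilities, which are not used here.

```python
import pandas as pd

def rolling_folds(index: pd.DatetimeIndex, n_folds: int = 3, horizon: int = 90):
    """Yield (train_index, test_index) pairs, oldest fold first.

    Each test window has `horizon` days; the training window is everything
    before it (expanding window). The last fold ends at the end of `index`.
    """
    n = len(index)
    for k in range(n_folds, 0, -1):
        test_end = n - (k - 1) * horizon
        test_start = test_end - horizon
        if test_start <= 0:
            raise ValueError("not enough history for the requested folds")
        yield index[:test_start], index[test_start:test_end]

# Same date range as START_DATE / END_DATE in the notebook.
idx = pd.date_range("2022-01-02", "2025-02-28", freq="D")

for train_idx, test_idx in rolling_folds(idx):
    print(f"train until {train_idx[-1].date()} | "
          f"test {test_idx[0].date()} to {test_idx[-1].date()}")
```

Inside the loop, the forecaster would be refit on the training slice and scored on the test slice, giving three MAE/RMSE pairs whose spread indicates how stable the model is over time.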


In [11]:
# ============================================================
# 6 · Manual grid search and TEST prediction
# ============================================================

from itertools import product
from lightgbm import LGBMRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error
import numpy as np, pandas as pd, time

# ---------- Configuration -----------------------------------
param_grid = {
    'n_estimators'  : [300, 600, 900],
    'learning_rate' : [0.05, 0.03],
    'num_leaves'    : [31, 63],
    'max_depth'     : [-1, 8],
    'subsample'     : [0.8],
    'colsample_bytree':[1.0, 0.8]
}
lags_used = [1, 7, 14, 28]        # fixed lags
valid_steps = len(y_valid)        # length of the VALID window

def param_combos(grid):
    keys, vals = zip(*grid.items())
    for combo in product(*vals):
        yield dict(zip(keys, combo))

results = []
t0 = time.time()

print("⏳ Buscando mejores hiperparámetros…")
for i, params in enumerate(param_combos(param_grid), 1):
    lgb = LGBMRegressor(objective='regression', random_state=42,
                        n_jobs=-1, **params)

    fore = ForecasterRecursiveMultiSeries(regressor=lgb, lags=lags_used)
    fore.fit(series=y_train, exog=X_train)

    # --- VALID prediction ---
    pred_long = fore.predict(steps=valid_steps, exog=X_valid)

    # Long format → wide
    if list(pred_long.columns) == ['level', 'pred']:
        pred = (pred_long.pivot(columns='level', values='pred')
                          .astype('float32'))
        pred.index = y_valid.index
    else:
        # transpose instead of the deprecated groupby(..., axis=1)
        pred = (pred_long.T.groupby(level=0).first().T
                if isinstance(pred_long.columns, pd.MultiIndex)
                else pred_long)

    # Align column names
    pred.columns    = pred.columns.astype(str)
    y_valid.columns = y_valid.columns.astype(str)
    common = y_valid.columns.intersection(pred.columns)
    rmse   = np.sqrt(mean_squared_error(
                y_valid[common].values.flatten(),
                pred[common].values.flatten()))
    results.append({'params': params, 'rmse': rmse})
    print(f"[{i:02d}] RMSE VALID: {rmse:,.0f}  |  {params}")

best = min(results, key=lambda d: d['rmse'])
best_params = best['params']
print(f"\n✅ Mejores hiperparámetros: {best_params}")
print(f"RMSE VALID: {best['rmse']:,.0f}")
print(f"Tiempo búsqueda: {(time.time()-t0)/60:.1f} min")

# ============================================================
# 7 · Retrain on train+valid and predict TEST
# ============================================================

y_train_val = pd.concat([y_train, y_valid])
X_train_val = calendar(y_train_val.index)

best_lgb = LGBMRegressor(objective='regression', random_state=42,
                         n_jobs=-1, **best_params)

best_fc = ForecasterRecursiveMultiSeries(regressor=best_lgb, lags=lags_used)
best_fc.fit(series=y_train_val, exog=X_train_val)

⏳ Buscando mejores hiperparámetros…
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.011926 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 1321
[LightGBM] [Info] Number of data points in the train set: 133891, number of used features: 11
[LightGBM] [Info] Start training from score 2671624.256858
[01] RMSE VALID: 1,688,084  |  {'n_estimators': 300, 'learning_rate': 0.05, 'num_leaves': 31, 'max_depth': -1, 'subsample': 0.8, 'colsample_bytree': 1.0}
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.002437 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 1321
[LightGBM] [Info] Number of data points in the train set: 133891, number of used features: 11
[LightGBM] [Info] Start training from score 2671624.256858
[02] RMSE VALID: 1,712,474  |  {'n_estimators': 
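
Each grid point triggers a full refit of the multi-series forecaster, so the size of the grid drives the search time. The sketch below counts the combinations for the `param_grid` defined in this cell and lists them in the order `itertools.product` produces them, which matches the first logged line.

```python
from itertools import product
from math import prod

# Same grid as in the search cell above.
param_grid = {
    'n_estimators'    : [300, 600, 900],
    'learning_rate'   : [0.05, 0.03],
    'num_leaves'      : [31, 63],
    'max_depth'       : [-1, 8],
    'subsample'       : [0.8],
    'colsample_bytree': [1.0, 0.8],
}

# Number of fits = product of the number of candidate values per parameter.
n_combos = prod(len(values) for values in param_grid.values())

# Materialize the combinations in iteration order.
combos = [dict(zip(param_grid, combo)) for combo in product(*param_grid.values())]

print(n_combos)     # 48
print(combos[0])
```

With 48 fits, trimming a parameter with little effect or sampling the grid at random are simple ways to shorten the search.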

In [12]:
# --- prepare TEST ---
test_start = valid_end + pd.Timedelta(days=1)
test_end   = y_wide.index.max()
y_test     = y_wide.loc[test_start:test_end]
X_test     = calendar(y_test.index)

# --- TEST prediction ---
pred_long = best_fc.predict(steps=len(y_test), exog=X_test)

if list(pred_long.columns) == ['level', 'pred']:
    pred_test = (pred_long.pivot(columns='level', values='pred')
                          .astype('float32'))
    pred_test.index = y_test.index
else:
    # transpose instead of the deprecated groupby(..., axis=1)
    pred_test = (pred_long.T.groupby(level=0).first().T
                 if isinstance(pred_long.columns, pd.MultiIndex)
                 else pred_long)

pred_test.columns = pred_test.columns.astype(str)
y_test.columns    = y_test.columns.astype(str)
common = y_test.columns.intersection(pred_test.columns)

mae_test  = mean_absolute_error(
                y_test[common].values.flatten(),
                pred_test[common].values.flatten())
rmse_test = np.sqrt(
                mean_squared_error(
                    y_test[common].values.flatten(),
                    pred_test[common].values.flatten()))

print(f"\n📊 TEST MAE  : {mae_test:,.0f}")
print(f"📊 TEST RMSE : {rmse_test:,.0f}")

# --- save predictions ---
pred_test.to_csv("predicciones_test_tienda.csv")
print("📁 Archivo guardado: predicciones_test_tienda.csv")



📊 TEST MAE  : 1,770,046
📊 TEST RMSE : 2,991,042
📁 Archivo guardado: predicciones_test_tienda.csv


In [14]:
# 8 · Linear regression baseline
long_pdf = df.filter(F.col('FechaVenta') <= F.lit(valid_end.strftime('%Y-%m-%d'))).toPandas()
long_pdf['FechaVenta'] = pd.to_datetime(long_pdf['FechaVenta'])
for col, func in {'dow':'dayofweek','day':'day','month':'month','year':'year'}.items():
    long_pdf[col] = getattr(long_pdf['FechaVenta'].dt, func).astype('int16')
long_pdf['weekend'] = (long_pdf['dow']>=5).astype('int8')
long_pdf['weekofyear'] = long_pdf['FechaVenta'].dt.isocalendar().week.astype('int8')

mask_tr = long_pdf['FechaVenta'] <= train_end
mask_val= (long_pdf['FechaVenta'] > train_end) & (long_pdf['FechaVenta'] <= valid_end)

from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LinearRegression

X_cols = ['Tienda','dow','weekend','day','month','year','weekofyear']
y_col  = 'PrecioVta'

ohe = OneHotEncoder(handle_unknown='ignore', sparse_output=True)  # sparse_output requires scikit-learn >= 1.2
prep = ColumnTransformer(
        transformers=[('cat', ohe, ['Tienda'])],
        remainder='passthrough'
)

pipe = Pipeline([
    ('prep', prep),
    ('reg',  LinearRegression())
])

pipe.fit(long_pdf.loc[mask_tr, X_cols], long_pdf.loc[mask_tr, y_col])
pred_lin = pipe.predict(long_pdf.loc[mask_val, X_cols])

from sklearn.metrics import mean_absolute_error, mean_squared_error
import numpy as np
mae_lin = mean_absolute_error(long_pdf.loc[mask_val, y_col], pred_lin)
rmse_lin = np.sqrt(mean_squared_error(long_pdf.loc[mask_val, y_col], pred_lin))
print(f"LinearReg VALID  MAE : {mae_lin:,.0f} | RMSE : {rmse_lin:,.0f}")



LinearReg VALID  MAE : 1,327,552 | RMSE : 1,856,836


In [15]:
spark.stop()