In [2]:
# %pip installs into the environment of the running kernel; boto3 and requests are imported below, so they are installed here too.
%pip install pandas numpy sqlalchemy psycopg2-binary scikit-learn lightgbm joblib boto3 requests


Collecting lightgbm
  Downloading lightgbm-4.6.0-py3-none-win_amd64.whl.metadata (17 kB)
Downloading lightgbm-4.6.0-py3-none-win_amd64.whl (1.5 MB)
   ---------------------------------------- 0.0/1.5 MB ? eta -:--:--
   ---------------------------------------- 1.5/1.5 MB 9.5 MB/s eta 0:00:00
Installing collected packages: lightgbm
Successfully installed lightgbm-4.6.0


In [1]:
import os
import json
import logging
import pandas as pd
import numpy as np
import joblib
from datetime import timedelta, datetime, date

from sqlalchemy import create_engine, text

from sklearn.metrics import mean_absolute_error, mean_squared_error
import lightgbm as lgb

import boto3
import requests

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# =========
# Config
# =========
# Database connection settings, read from environment variables.
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT", "5432")
DB_NAME = os.getenv("DB_NAME", "postgres")
DB_USER = os.getenv("DB_USER", "postgres")
DB_PASS = os.getenv("DB_PASS")  # supply via a secrets manager or an env var
TABLE_NAME = os.getenv("TABLE_NAME", "demanda_ree")

# Local paths where the trained artifacts are written before upload.
MODEL_PATH = "/tmp/lgb_demand_model.joblib"   # /tmp on Lambda
FEATURES_PATH = "/tmp/features_cols.txt"

# S3 destination of the artifacts.
S3_BUCKET = os.getenv("S3_BUCKET")
S3_MODEL_KEY = os.getenv("S3_MODEL_KEY", "models/demanda/lgbm/model.joblib")
S3_FEATURES_KEY = os.getenv("S3_FEATURES_KEY", "models/demanda/lgbm/features_cols.txt")

# Optional endpoint notified after training; an empty URL disables the call.
API_URL_RELOAD = os.getenv("API_URL_RELOAD", "")
API_TOKEN = os.getenv("API_TOKEN", "")

# Uses psycopg2-binary (simple); switch to psycopg v3 if you prefer.
# The user name and password are percent-encoded so that characters such as
# "@", ":" or "/" cannot corrupt the URL; an unset DB_PASS becomes an empty
# password instead of the literal string "None".
from urllib.parse import quote_plus

CONN_STR = (
    f"postgresql+psycopg2://{quote_plus(DB_USER)}:{quote_plus(DB_PASS or '')}"
    f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)
engine = create_engine(CONN_STR, pool_pre_ping=True)


# ============================
# 1) EXTRAER DATOS DE AURORA
# ============================
def load_data_from_rds(limit_rows=None):
    """Load the demand history from the configured table, oldest rows first.

    Args:
        limit_rows: Optional maximum number of rows to fetch. Falsy values
            (``None`` or ``0``) mean no limit is applied.

    Returns:
        pandas.DataFrame with the columns ``id``, ``fecha`` and ``demanda_mw``.
    """
    # TABLE_NAME is an identifier, so it cannot be sent as a bound parameter;
    # it comes from configuration. The limit is forced through int().
    query = f'SELECT id, fecha, demanda_mw FROM {TABLE_NAME} ORDER BY fecha ASC'
    if limit_rows:
        query += f" LIMIT {int(limit_rows)}"
    with engine.connect() as conn:
        # Wrap in text(): a SQLAlchemy 2.x Connection rejects plain strings,
        # which older pandas versions forward unchanged.
        df = pd.read_sql(text(query), conn)
    return df


In [2]:

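# Summary of the job defined in this notebook:
#   - reads the demand history from Aurora,
#   - trains a LightGBM model,
#   - uploads model.joblib + features_cols.txt to S3,
#   - generates a 7-day forecast and stores it in Aurora,
#   - (optional) calls the /reload-model endpoint of your API.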

import os
import json
import logging
import pandas as pd
import numpy as np
import joblib
from datetime import timedelta, datetime, date

from sqlalchemy import create_engine, text

from sklearn.metrics import mean_absolute_error, mean_squared_error
import lightgbm as lgb

import boto3
import requests

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# =========
# Config
# =========
# Database connection settings, read from environment variables.
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT", "5432")
DB_NAME = os.getenv("DB_NAME", "postgres")
DB_USER = os.getenv("DB_USER", "postgres")
DB_PASS = os.getenv("DB_PASS")  # supply via a secrets manager or an env var
TABLE_NAME = os.getenv("TABLE_NAME", "demanda_ree")

# Local paths where the trained artifacts are written before upload.
MODEL_PATH = "/tmp/lgb_demand_model.joblib"   # /tmp on Lambda
FEATURES_PATH = "/tmp/features_cols.txt"

# S3 destination of the artifacts.
S3_BUCKET = os.getenv("S3_BUCKET")
S3_MODEL_KEY = os.getenv("S3_MODEL_KEY", "models/demanda/lgbm/model.joblib")
S3_FEATURES_KEY = os.getenv("S3_FEATURES_KEY", "models/demanda/lgbm/features_cols.txt")

# Optional endpoint notified after training; an empty URL disables the call.
API_URL_RELOAD = os.getenv("API_URL_RELOAD", "")
API_TOKEN = os.getenv("API_TOKEN", "")

# Uses psycopg2-binary (simple); switch to psycopg v3 if you prefer.
# The user name and password are percent-encoded so that characters such as
# "@", ":" or "/" cannot corrupt the URL; an unset DB_PASS becomes an empty
# password instead of the literal string "None".
from urllib.parse import quote_plus

CONN_STR = (
    f"postgresql+psycopg2://{quote_plus(DB_USER)}:{quote_plus(DB_PASS or '')}"
    f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)
engine = create_engine(CONN_STR, pool_pre_ping=True)


# ============================
# 1) EXTRAER DATOS DE AURORA
# ============================
def load_data_from_rds(limit_rows=None):
    """Load the demand history from the configured table, oldest rows first.

    Args:
        limit_rows: Optional maximum number of rows to fetch. Falsy values
            (``None`` or ``0``) mean no limit is applied.

    Returns:
        pandas.DataFrame with the columns ``id``, ``fecha`` and ``demanda_mw``.
    """
    # TABLE_NAME is an identifier, so it cannot be sent as a bound parameter;
    # it comes from configuration. The limit is forced through int().
    query = f'SELECT id, fecha, demanda_mw FROM {TABLE_NAME} ORDER BY fecha ASC'
    if limit_rows:
        query += f" LIMIT {int(limit_rows)}"
    with engine.connect() as conn:
        # Wrap in text(): a SQLAlchemy 2.x Connection rejects plain strings,
        # which older pandas versions forward unchanged.
        df = pd.read_sql(text(query), conn)
    return df


# =========================================
# 2) PREPROCESS / FEATURE ENGINEERING
# =========================================
def preprocess_and_features(df):
    """Build the model's feature table from the raw demand history.

    The input is copied, ``fecha`` is truncated to the calendar day, rows are
    sorted by date and only the first row of each date is kept. Calendar,
    lag and rolling-window columns are then added, and every row that still
    contains a missing value (such as the leading rows without a 30-step lag)
    is dropped.

    Args:
        df: Frame with at least the columns ``fecha`` and ``demanda_mw``.

    Returns:
        A new frame with the added feature columns and a fresh 0..n-1 index.
    """
    out = df.copy()
    # Drop the time-of-day part, then go back to a datetime64 column.
    out['fecha'] = pd.to_datetime(pd.to_datetime(out['fecha']).dt.date)
    out = out.sort_values('fecha')
    out = out.drop_duplicates(subset='fecha')
    out = out.reset_index(drop=True)

    # Calendar features
    weekday = out['fecha'].dt.dayofweek
    out['dayofweek'] = weekday
    out['month'] = out['fecha'].dt.month
    out['is_weekend'] = weekday.isin([5, 6]).astype(int)

    # Lagged demand
    demand = out['demanda_mw']
    for lag in (1, 7, 14, 30):
        out[f'lag_{lag}'] = demand.shift(lag)

    # Rolling statistics over the values strictly before each row
    previous = demand.shift(1)
    for window in (7, 14, 30):
        trailing = previous.rolling(window=window, min_periods=1)
        out[f'roll_mean_{window}'] = trailing.mean()
        out[f'roll_std_{window}'] = trailing.std().fillna(0)

    return out.dropna().reset_index(drop=True)



In [3]:

# ====================================
# 3) TRAIN/SPLIT temporal
# ====================================
def temporal_train_test_split(df, val_size_days=500, test_size_days=500):
    """Split a frame by position into train, validation and test parts.

    The last ``test_size_days`` rows form the test part, the
    ``val_size_days`` rows before them the validation part, and everything
    earlier the training part. Rows are taken in their existing order.

    Args:
        df: Frame to split.
        val_size_days: Number of rows in the validation part.
        test_size_days: Number of rows in the test part.

    Returns:
        Tuple ``(train, val, test)`` of independent copies.

    Raises:
        ValueError: If ``df`` has fewer than
            ``val_size_days + test_size_days + 10`` rows.
    """
    total_rows = len(df)
    holdout_rows = val_size_days + test_size_days
    if total_rows < holdout_rows + 10:
        raise ValueError("Muy pocos datos para dividir en train/val/test.")

    val_start = total_rows - holdout_rows
    test_start = total_rows - test_size_days
    return (
        df.iloc[:val_start].copy(),
        df.iloc[val_start:test_start].copy(),
        df.iloc[test_start:].copy(),
    )

In [4]:

# ======================
# 4) ENTRENAR LGBM
# ======================
def train_lightgbm(train_df, val_df, features, target='demanda_mw', params=None):
    """Train a LightGBM regression booster with early stopping.

    Args:
        train_df: Frame holding the training rows.
        val_df: Frame holding the validation rows watched by early stopping.
        features: Names of the columns used as model inputs.
        target: Name of the column to predict.
        params: Optional overrides merged on top of the built-in parameters.

    Returns:
        The booster returned by ``lgb.train``.
    """
    def _as_dataset(frame):
        # Same feature/label selection for both splits.
        return lgb.Dataset(frame[features], label=frame[target])

    fit_set = _as_dataset(train_df)
    watch_set = _as_dataset(val_df)

    booster_params = dict(
        objective="regression",
        metric="rmse",
        verbosity=-1,
        boosting_type="gbdt",
        learning_rate=0.005,
        num_leaves=31,
        n_estimators=1000,
    )
    # Caller-supplied values win over the built-in ones.
    booster_params.update(params or {})

    training_callbacks = [
        lgb.early_stopping(stopping_rounds=50),  # stop after 50 rounds without improvement
        lgb.log_evaluation(period=50),           # log the metric every 50 rounds
    ]
    return lgb.train(
        booster_params,
        fit_set,
        valid_sets=[watch_set],
        callbacks=training_callbacks,
    )



In [11]:

# ======================
# 5) EVALUAR
# ======================
def evaluate_model(model, df, features, target='demanda_mw'):
    """Score a fitted model on one data split.

    Args:
        model: Object exposing ``predict(X)``.
        df: Frame containing the feature columns and the target column.
        features: Names of the columns passed to ``model.predict``.
        target: Name of the column holding the true values.

    Returns:
        Dict with the keys ``"MAE"``, ``"RMSE"``, ``"MAPE(%)"`` and
        ``"ACC(%)"`` (the latter being ``100 - MAPE``).
    """
    y_true = df[target]
    preds = model.predict(df[features])
    mae = mean_absolute_error(y_true, preds)
    # The `squared=False` keyword was deprecated in scikit-learn 1.4 and
    # removed in 1.6; taking the square root explicitly works on every version.
    rmse = np.sqrt(mean_squared_error(y_true, preds))
    # The small epsilon avoids a division by zero when a true value is exactly 0.
    mape = np.mean(np.abs((y_true - preds) / (y_true + 1e-9))) * 100
    accuracy = 100 - mape
    return {"MAE": mae, "RMSE": rmse, "MAPE(%)": mape, "ACC(%)": accuracy}


In [12]:

# =====================================================
# 6) PREDECIR 7 DÍAS
# =====================================================
def predict_future(model, last_df, features, horizon=7):
    """Forecast ``horizon`` consecutive days after the last row of ``last_df``.

    The forecast is recursive: each predicted value is appended to the working
    history and feeds the lag and rolling features of the following day.

    Args:
        model: Object exposing ``predict(X)``.
        last_df: Recent history containing ``fecha``, ``demanda_mw`` and the
            columns listed in ``features``.
        features: Feature column names, in the order the model expects.
        horizon: Number of days to forecast.

    Returns:
        pandas.DataFrame with one row per forecast day and the columns
        ``fecha`` and ``demanda_mw``.
    """
    df_work = last_df.copy().sort_values('fecha').reset_index(drop=True)
    preds = []
    for _ in range(horizon):
        history = df_work['demanda_mw']
        next_time = pd.Timestamp(df_work['fecha'].iloc[-1]) + pd.Timedelta(days=1)
        row = {
            "fecha": next_time,
            "dayofweek": next_time.dayofweek,
            "month": next_time.month,
            "is_weekend": int(next_time.dayofweek in (5, 6)),
        }
        # Lags and windows must mirror the ones built by preprocess_and_features.
        for lag in (1, 7, 14, 30):
            row[f'lag_{lag}'] = history.iloc[-lag] if lag <= len(history) else np.nan
        for window in (7, 14, 30):
            # The row being built is not part of df_work yet, so the trailing
            # `window` observations are exactly the values that precede it.
            # Shifting them again would drop the latest observation and make
            # the feature disagree with the one used in training.
            vals = history.iloc[-window:]
            row[f'roll_mean_{window}'] = float(vals.mean())
            std = vals.std()
            # Training replaces an undefined std with 0, so do the same here.
            row[f'roll_std_{window}'] = 0.0 if pd.isna(std) else float(std)
        row_df = pd.DataFrame([row])
        for c in features:
            if c not in row_df.columns:
                row_df[c] = np.nan
        # Anything still missing falls back to the column mean of the history.
        row_df = row_df[features].fillna(df_work[features].mean(numeric_only=True))
        pred = float(model.predict(row_df)[0])
        preds.append({"fecha": next_time, "demanda_mw": pred})
        df_work = pd.concat(
            [df_work, pd.DataFrame([{"fecha": next_time, "demanda_mw": pred}])],
            ignore_index=True,
        )
    return pd.DataFrame(preds)



In [None]:
# =======================================================
# 7) GUARDAR FORECAST EN AURORA (UPSERT)
# =======================================================
def save_forecast_to_db(preds_df, model_version="v1", ambito="ES"):
    """Upsert a forecast into the ``demanda_ree_forecast`` table.

    Each row of ``preds_df`` becomes one record stamped with today's date as
    ``forecast_date``. A record that collides on
    ``(forecast_date, target_date, ambito)`` has its ``y_hat`` and
    ``model_version`` overwritten. An empty frame only logs a warning.

    Args:
        preds_df: Frame with the columns ``fecha`` and ``demanda_mw``.
        model_version: Label stored with every record.
        ambito: Scope label stored with every record.
    """
    if preds_df.empty:
        logger.warning("No hay predicciones para guardar.")
        return

    fdate = date.today()
    upsert_stmt = text("""
        INSERT INTO demanda_ree_forecast (forecast_date, target_date, ambito, y_hat, model_version)
        VALUES (:forecast_date, :target_date, :ambito, :y_hat, :model_version)
        ON CONFLICT (forecast_date, target_date, ambito)
        DO UPDATE SET y_hat = EXCLUDED.y_hat, model_version = EXCLUDED.model_version
    """)

    rows = []
    for _, pred in preds_df.iterrows():
        rows.append(
            {
                "forecast_date": fdate,
                "target_date": pd.to_datetime(pred["fecha"]).date(),
                "ambito": ambito,
                "y_hat": float(pred["demanda_mw"]),
                "model_version": model_version,
            }
        )

    # engine.begin() commits on success and rolls back on error.
    with engine.begin() as conn:
        conn.execute(upsert_stmt, rows)
    logger.info(f"Guardadas {len(rows)} predicciones en demanda_ree_forecast (forecast_date={fdate}).")

In [None]:
# ==========================================
# 8) SUBIR ARTEFACTOS A S3
# ==========================================
def upload_artifacts_to_s3(model_path, features_path):
    """Upload the model file and the feature-list file to S3.

    The bucket and the two object keys come from the module-level settings
    ``S3_BUCKET``, ``S3_MODEL_KEY`` and ``S3_FEATURES_KEY``.

    Args:
        model_path: Local path of the serialized model.
        features_path: Local path of the feature-names file.
    """
    client = boto3.client("s3")
    uploads = (
        (model_path, S3_MODEL_KEY),
        (features_path, S3_FEATURES_KEY),
    )
    # Model first, then the feature list.
    for local_path, object_key in uploads:
        with open(local_path, "rb") as handle:
            client.upload_fileobj(handle, S3_BUCKET, object_key)
    logger.info(f"Subidos a s3://{S3_BUCKET}/{S3_MODEL_KEY} y s3://{S3_BUCKET}/{S3_FEATURES_KEY}")



In [None]:
# ===================================================
# 9) (Opcional) NOTIFICAR A LA API /reload-model
# ===================================================
def notify_api_reload():
    """Ask the API to reload the model; failures are logged, never raised.

    Does nothing except log when ``API_URL_RELOAD`` is empty. When
    ``API_TOKEN`` is set it is sent as a bearer token.
    """
    if not API_URL_RELOAD:
        logger.info("API_URL_RELOAD vacío: no se notifica a la API.")
        return
    headers = {"Content-Type": "application/json"}
    if API_TOKEN:
        headers["Authorization"] = f"Bearer {API_TOKEN}"
    try:
        r = requests.post(API_URL_RELOAD, headers=headers, timeout=10)
        r.raise_for_status()
        logger.info("API /reload-model notificada OK: %s", r.text)
    except Exception as e:
        # Best-effort notification: a failing API must not abort the job.
        logger.warning("No se pudo notificar a /reload-model: %s", e)

In [None]:
# =========================
# 10) HANDLER LAMBDA
# =========================
def lambda_handler(event, context):
    """Run the whole training and forecast job.

    Steps: load the history, build features, split it chronologically, train
    the model, save the model and the feature list locally, log metrics for
    each split, upload the artifacts to S3, forecast 7 days from the last 180
    processed rows, store the forecast in the database and finally notify the
    API.

    Args:
        event: Invocation payload (not used).
        context: Invocation context (not used).

    Returns:
        Dict with ``statusCode`` 200 and a JSON ``body`` carrying the number
        of forecast rows produced.
    """
    # Local import: the module header only imports datetime, date and timedelta.
    from datetime import timezone

    logger.info("Inicio job entrenamiento+forecast")
    df_raw = load_data_from_rds()
    logger.info("Registros extraídos: %s", len(df_raw))

    df_proc = preprocess_and_features(df_raw)
    logger.info("Tras preprocess: %s", len(df_proc))

    train, val, test = temporal_train_test_split(df_proc)
    # Everything except the identifier, the date and the target is a feature.
    excluded = ['id', 'fecha', 'demanda_mw']
    features = [c for c in df_proc.columns if c not in excluded]
    with open(FEATURES_PATH, 'w') as f:
        f.write("\n".join(features))
    logger.info("Num features: %s", len(features))

    model = train_lightgbm(train, val, features)
    joblib.dump(model, MODEL_PATH)
    logger.info("Modelo guardado en /tmp")

    # Metrics (logged only)
    logger.info("Eval TRAIN: %s", evaluate_model(model, train, features))
    logger.info("Eval VAL  : %s", evaluate_model(model, val, features))
    logger.info("Eval TEST : %s", evaluate_model(model, test, features))

    upload_artifacts_to_s3(MODEL_PATH, FEATURES_PATH)

    # 7-day forecast and persistence
    last_block = df_proc.iloc[-180:].copy()
    preds_future = predict_future(model, last_block, features, horizon=7)
    logger.info("Predicciones generadas: %s", len(preds_future))
    # datetime.utcnow() is deprecated since Python 3.12; this yields the same
    # UTC date string from a timezone-aware value.
    model_version = datetime.now(timezone.utc).strftime("%Y%m%d")
    save_forecast_to_db(preds_future, model_version=model_version, ambito="ES")

    # Notify the API
    notify_api_reload()

    return {"statusCode": 200, "body": json.dumps({"msg": "OK", "preds": len(preds_future)})}