In [None]:
import os
import glob
import logging
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import IterableDataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error

# Logging configuration: timestamped INFO-level messages
logging.basicConfig(
    format="%(asctime)s — %(levelname)s — %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Use the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
logger.info(f"Usando dispositivo: {device}")

# IterableDataset that reads Parquet shards in a streaming fashion
class ParquetShardsDataset(IterableDataset):
    """Stream ``(features, target)`` mini-batches from Parquet shards.

    Each shard is loaded with ``pd.read_parquet``, converted to ``(X, y)`` by
    ``prepare_features_target`` (defined outside this class) and emitted in
    slices of ``batch_size`` rows. Only one shard is loaded at a time per
    iterator. Batches are already formed here, so wrap the dataset in a
    ``DataLoader`` with ``batch_size=None``.
    """

    def __init__(self, shards, batch_size, scaler=None):
        """
        Args:
            shards: list of paths to Parquet files.
            batch_size: number of rows per emitted batch (must be >= 1).
            scaler: optional fitted scaler; when given, its ``transform`` is
                applied to the features of every shard before batching.

        Raises:
            ValueError: if ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            # A zero step would fail later inside range(); a negative one
            # would silently produce no batches at all.
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        self.shards = shards
        self.batch_size = batch_size
        self.scaler = scaler

    def _shards_for_worker(self):
        """Return the shards the current DataLoader worker should read.

        In the main process (no workers) every shard is returned. With
        ``num_workers > 0`` each worker gets a disjoint strided subset, so
        the same rows are not emitted once per worker.
        """
        all_shards = list(self.shards)
        info = torch.utils.data.get_worker_info()
        if info is None:
            return all_shards
        return all_shards[info.id::info.num_workers]

    def __iter__(self):
        """Yield ``(X_batch, y_batch)`` float tensors, shard by shard.

        ``y_batch`` is reshaped to a single column. ``torch.from_numpy``
        requires NumPy arrays, so ``prepare_features_target`` is expected to
        return ndarrays (or the scaler to produce one for ``X``).
        """
        for shard_path in self._shards_for_worker():
            df = pd.read_parquet(shard_path)
            X, y = prepare_features_target(df)
            if self.scaler is not None:
                X = self.scaler.transform(X)
            # Emit the shard in consecutive batches; the last one may be short
            for i in range(0, len(y), self.batch_size):
                Xb = X[i : i + self.batch_size]
                yb = y[i : i + self.batch_size]
                # Convert to float32 tensors
                Xb_t = torch.from_numpy(Xb).float()
                yb_t = torch.from_numpy(yb.reshape(-1, 1)).float()
                yield Xb_t, yb_t

# Network definition
class PriceNet(nn.Module):
    """Fully connected regressor: n_features -> 64 -> 32 -> 1.

    A ReLU follows each hidden linear layer; the final layer emits one
    unbounded value per input row.
    """

    def __init__(self, n_features):
        """Build the layer stack for inputs with ``n_features`` columns."""
        super().__init__()
        hidden_widths = (64, 32)
        layers = []
        in_dim = n_features
        for width in hidden_widths:
            layers.append(nn.Linear(in_dim, width))
            layers.append(nn.ReLU())
            in_dim = width
        # Single-output head
        layers.append(nn.Linear(in_dim, 1))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the stack and return the prediction tensor."""
        out = self.net(x)
        return out

def train_with_shards(
    train_dir, val_dir,
    batch_size=1024, lr=1e-3, n_epochs=10
):
    """Train a ``PriceNet`` on Parquet shards streamed from disk.

    Args:
        train_dir: directory containing the training ``*.parquet`` shards.
        val_dir: directory containing the validation ``*.parquet`` shards.
        batch_size: rows per mini-batch emitted by the shard datasets.
        lr: learning rate for the Adam optimizer.
        n_epochs: number of full passes over the training shards.

    Returns:
        ``(model, scaler)``: the trained network and the ``StandardScaler``
        that was applied to both the training and the validation features.
        The same scaler must be applied to inputs at inference time.

    Raises:
        ValueError: if ``train_dir`` contains no ``*.parquet`` files.
    """
    # List shards (sorted so the order is reproducible between runs)
    train_shards = sorted(glob.glob(os.path.join(train_dir, "*.parquet")))
    val_shards = sorted(glob.glob(os.path.join(val_dir, "*.parquet")))
    if not train_shards:
        # Fail with a clear message instead of an IndexError further down
        raise ValueError(f"No se encontraron shards .parquet en {train_dir!r}")
    logger.info(f"{len(train_shards)} shards de entrenamiento, "
                f"{len(val_shards)} shards de validación")

    # The first shard is used to fit the scaler and to size the network.
    # NOTE(review): the scaler statistics come from this shard only; if the
    # shards are not identically distributed, consider partial_fit over all.
    df0 = pd.read_parquet(train_shards[0])
    X0, _ = prepare_features_target(df0)
    scaler = StandardScaler().fit(X0)
    n_features = X0.shape[1]

    # Datasets and loaders. The SAME scaler is applied to both splits so the
    # model is validated on the feature distribution it was trained on.
    train_ds = ParquetShardsDataset(train_shards, batch_size, scaler=scaler)
    val_ds = ParquetShardsDataset(val_shards, batch_size, scaler=scaler)
    # batch_size=None: the datasets already yield complete batches
    train_loader = DataLoader(train_ds, batch_size=None)
    val_loader = DataLoader(val_ds, batch_size=None)

    # Model, optimizer and loss
    model = PriceNet(n_features).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()

    for epoch in range(1, n_epochs + 1):
        # --- Training ---
        model.train()
        train_loss_sum = 0.0
        train_rows = 0
        for Xb, yb in train_loader:
            Xb, yb = Xb.to(device), yb.to(device)
            optimizer.zero_grad()
            preds = model(Xb)
            loss = loss_fn(preds, yb)
            loss.backward()
            optimizer.step()
            # Weight each batch by its row count: the last batch of every
            # shard can be shorter than batch_size.
            n_rows = yb.shape[0]
            train_loss_sum += loss.item() * n_rows
            train_rows += n_rows
        train_mse = train_loss_sum / train_rows if train_rows else float("nan")
        logger.info(
            f"Epoch {epoch}/{n_epochs} — Train MSE: {train_mse:.4f}"
        )

        # --- Validation ---
        model.eval()
        abs_err_sum = 0.0
        val_rows = 0
        with torch.no_grad():
            for Xb, yb in val_loader:
                Xb, yb = Xb.to(device), yb.to(device)
                preds = model(Xb)
                # Absolute error is accumulated on the model's device
                abs_err_sum += torch.sum(torch.abs(preds - yb)).item()
                val_rows += yb.shape[0]
        # NaN when there are no validation rows (e.g. empty val_dir)
        val_mae = abs_err_sum / val_rows if val_rows else float("nan")
        logger.info(
            f"Epoch {epoch}/{n_epochs} — Val MAE: {val_mae:.4f}"
        )

    return model, scaler

# Ejecución
# model, scaler = train_with_shards(
#     train_dir="data/train",
#     val_dir="data/val",
#     batch_size=2048,
#     lr=1e-3,
#     n_epochs=5
# )

In [None]:
import os
import math
import pandas as pd
from sqlalchemy import create_engine
from sklearn.model_selection import StratifiedShuffleSplit

# 1) Parameters
ENGINE_URL = "postgresql://user:pass@host:port/dbname"
SCHEMA = "schema_name"
TABLE = "table_name"
ID_COL = "primary_key_col"
STRAT_COL = "column_to_stratify"
OUTPUT_DIR = "data"           # data/train and data/val are written under this directory
BATCH_SIZE = 100_000          # rows per chunk when reading from the database
SHARD_SIZE = 200_000          # rows per Parquet file

# 2) Connect to the database
engine = create_engine(ENGINE_URL)

# 3) Load only the id and the stratification column
query = f"""
SELECT {ID_COL}, {STRAT_COL}
FROM {SCHEMA}.{TABLE}
"""
df_index = pd.read_sql(query, engine)

# 4) Stratified 80/20 split (fixed seed for reproducibility)
sss = StratifiedShuffleSplit(test_size=0.2, n_splits=1, random_state=42)
train_idx, val_idx = next(sss.split(df_index, df_index[STRAT_COL]))
train_ids = set(df_index[ID_COL].iloc[train_idx])
val_ids = set(df_index[ID_COL].iloc[val_idx])
del df_index  # free memory; only the id sets are needed from here on

# 5) Function that dumps Parquet shards
def dump_shards(id_set, out_subdir):
    """Write the table rows whose id is in ``id_set`` as Parquet shards.

    The table ``SCHEMA.TABLE`` is read through ``engine`` in pages of
    ``BATCH_SIZE`` rows, filtered by ``ID_COL`` membership in ``id_set`` and
    buffered until ``SHARD_SIZE`` rows are available, so every shard except
    possibly the last holds exactly ``SHARD_SIZE`` rows. Files are named
    ``shard_NNN.parquet`` inside ``out_subdir``.

    Args:
        id_set: collection of ``ID_COL`` values to keep.
        out_subdir: output directory; created if it does not exist.

    Note:
        The whole table is scanned on every call.
    """
    os.makedirs(out_subdir, exist_ok=True)
    pending = []        # filtered frames not yet written to disk
    pending_rows = 0    # total rows held in ``pending``
    shard_num = 0
    offset = 0

    def write_shard(frame):
        """Write one shard to disk and advance the shard counter."""
        nonlocal shard_num
        path = os.path.join(out_subdir, f"shard_{shard_num:03d}.parquet")
        frame.to_parquet(path, index=False)
        print(f"Escrito {path} ({len(frame)} filas)")
        shard_num += 1

    # Read in pages and filter by id_set
    while True:
        # ORDER BY is required: without it the row order is unspecified and
        # LIMIT/OFFSET pages may repeat or skip rows between queries.
        chunk = pd.read_sql(
            f"SELECT * FROM {SCHEMA}.{TABLE} "
            f"ORDER BY {ID_COL} "
            f"LIMIT {BATCH_SIZE} OFFSET {offset}",
            engine
        )
        if chunk.empty:
            break
        offset += len(chunk)

        # Keep only the rows that belong to this split
        filtered = chunk[chunk[ID_COL].isin(id_set)]
        if not filtered.empty:
            pending.append(filtered)
            pending_rows += len(filtered)

        # Flush full shards as soon as enough rows have accumulated
        while pending_rows >= SHARD_SIZE:
            merged = pd.concat(pending, ignore_index=True)
            write_shard(merged.iloc[:SHARD_SIZE])
            rest = merged.iloc[SHARD_SIZE:]
            pending = [rest] if len(rest) else []
            pending_rows = len(rest)

        if len(chunk) < BATCH_SIZE:
            # Short page: this was the last one, skip the extra empty query
            break

    # Write whatever is left as a final, possibly smaller, shard
    if pending_rows:
        write_shard(pd.concat(pending, ignore_index=True))

# 6) Dump the train and validation splits into their own subdirectories
for _subdir, _ids in (("train", train_ids), ("val", val_ids)):
    dump_shards(_ids, os.path.join(OUTPUT_DIR, _subdir))
