# Sistema de Recomendaci√≥n ALS - Entrenamiento en Kaggle
## MovieLens-20M Dataset

Este notebook implementa un sistema completo de entrenamiento, reentrenamiento incremental y predicci√≥n para el modelo ALS usando PySpark.

**Caracter√≠sticas:**
- Entrenamiento escalable en Kaggle con PySpark
- Reentrenamiento incremental autom√°tico
- Sistema de versionado de modelos
- Descarga y predicci√≥n local
- Monitoreo y validaci√≥n autom√°tica

## 1. Configuraci√≥n del Entorno y Dependencias

In [None]:
# Instalaci√≥n de dependencias
!pip install -q pyspark==3.5.0
!pip install -q kaggle
!pip install -q pandas numpy matplotlib seaborn
!pip install -q joblib

import sys
print(f"Python version: {sys.version}")

In [None]:
# Imports necesarios
import os
import json
import shutil
import hashlib
from datetime import datetime
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.mllib.evaluation import RankingMetrics

print("‚úì Imports completados")

In [None]:
# Configuraci√≥n de rutas y variables globales
class Config:
    """Configuraci√≥n centralizada del sistema"""
    
    # Rutas base
    BASE_PATH = Path("/kaggle/working")
    INPUT_PATH = Path("/kaggle/input/movielens-20m-dataset")
    MODEL_PATH = BASE_PATH / "models"
    METRICS_PATH = BASE_PATH / "metrics"
    LOGS_PATH = BASE_PATH / "logs"
    
    # Configuraci√≥n de Spark
    SPARK_MEMORY = "14g"  # Kaggle tiene ~16GB RAM
    SPARK_CORES = 4
    SPARK_PARTITIONS = 200
    
    # Par√°metros del modelo ALS
    DEFAULT_RANK = 10
    DEFAULT_MAX_ITER = 5
    DEFAULT_REG_PARAM = 0.1
    DEFAULT_ALPHA = 1.0
    
    # Configuraci√≥n de reentrenamiento
    INCREMENTAL_THRESHOLD = 0.1  # 10% de nuevos datos
    MODEL_VERSION_FORMAT = "als_model_v{version}_{date}"
    MAX_MODEL_VERSIONS = 5
    
    # M√©tricas
    EVALUATION_METRICS = ["rmse", "mae"]
    TOP_K = [5, 10, 20]
    
    @classmethod
    def setup_directories(cls):
        """Crear directorios necesarios"""
        for path in [cls.MODEL_PATH, cls.METRICS_PATH, cls.LOGS_PATH]:
            path.mkdir(parents=True, exist_ok=True)
        print(f"‚úì Directorios creados en {cls.BASE_PATH}")
    
    @classmethod
    def get_system_info(cls):
        """Obtener informaci√≥n del sistema"""
        return {
            "spark_memory": cls.SPARK_MEMORY,
            "spark_cores": cls.SPARK_CORES,
            "spark_partitions": cls.SPARK_PARTITIONS,
            "timestamp": datetime.now().isoformat()
        }

Config.setup_directories()
print(f"‚úì Configuraci√≥n inicializada")
print(f"  - Memoria Spark: {Config.SPARK_MEMORY}")
print(f"  - Cores: {Config.SPARK_CORES}")
print(f"  - Particiones: {Config.SPARK_PARTITIONS}")

In [None]:
# Inicializar SparkSession con configuraci√≥n optimizada
def create_spark_session(app_name="ALS-MovieLens-Training"):
    """
    Crea una sesi√≥n de Spark optimizada para Kaggle
    """
    spark = SparkSession.builder \
        .appName(app_name) \
        .config("spark.driver.memory", Config.SPARK_MEMORY) \
        .config("spark.executor.memory", Config.SPARK_MEMORY) \
        .config("spark.sql.shuffle.partitions", Config.SPARK_PARTITIONS) \
        .config("spark.default.parallelism", Config.SPARK_CORES * 2) \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.kryoserializer.buffer.max", "512m") \
        .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
        .getOrCreate()
    
    spark.sparkContext.setLogLevel("ERROR")
    
    print("‚úì Spark Session creada exitosamente")
    print(f"  - Spark Version: {spark.version}")
    print(f"  - Master: {spark.sparkContext.master}")
    print(f"  - App Name: {spark.sparkContext.appName}")
    
    return spark

spark = create_spark_session()

## 2. Carga y Preprocesamiento de Datos MovieLens

In [None]:
class DataLoader:
    """Maneja la carga y preprocesamiento de datos"""
    
    def __init__(self, spark, input_path=Config.INPUT_PATH):
        self.spark = spark
        self.input_path = input_path
        self.ratings_df = None
        self.movies_df = None
        self.stats = {}
    
    def load_ratings(self):
        """Carga el dataset de ratings"""
        print("Cargando ratings...")
        
        ratings_file = self.input_path / "rating.csv"
        
        if not ratings_file.exists():
            raise FileNotFoundError(f"No se encontr√≥ el archivo: {ratings_file}")
        
        # Leer CSV con inferencia de esquema
        self.ratings_df = self.spark.read.csv(
            str(ratings_file),
            header=True,
            inferSchema=True
        )
        
        # Convertir tipos expl√≠citamente para asegurar compatibilidad con ALS
        self.ratings_df = self.ratings_df.select(
            F.col("userId").cast(IntegerType()).alias("userId"),
            F.col("movieId").cast(IntegerType()).alias("movieId"),
            F.col("rating").cast(FloatType()).alias("rating"),
            F.col("timestamp").cast(LongType()).alias("timestamp")
        )
        
        # Verificar que no est√© vac√≠o
        initial_count = self.ratings_df.count()
        if initial_count == 0:
            raise ValueError("El dataset de ratings est√° vac√≠o")
        
        print(f"‚úì Ratings cargados: {initial_count:,} registros")
        
        # Reparticionar para mejor distribuci√≥n
        self.ratings_df = self.ratings_df.repartition(
            Config.SPARK_PARTITIONS, 
            "userId"
        )
        
        return self.ratings_df
    
    def load_movies(self):
        """Carga informaci√≥n de pel√≠culas"""
        print("Cargando informaci√≥n de pel√≠culas...")
        
        movies_file = self.input_path / "movie.csv"
        
        if movies_file.exists():
            self.movies_df = self.spark.read.csv(
                str(movies_file),
                header=True,
                inferSchema=True
            )
            
            count = self.movies_df.count()
            print(f"‚úì Pel√≠culas cargadas: {count:,} registros")
        else:
            print("‚ö† Archivo de pel√≠culas no encontrado")
        
        return self.movies_df
    
    def explore_data(self):
        """Explora y genera estad√≠sticas del dataset"""
        print("\n" + "="*60)
        print("EXPLORACI√ìN DE DATOS")
        print("="*60)
        
        if self.ratings_df is None:
            print("‚ö† No hay datos cargados")
            return
        
        # Estad√≠sticas b√°sicas
        self.stats['total_ratings'] = self.ratings_df.count()
        self.stats['unique_users'] = self.ratings_df.select("userId").distinct().count()
        self.stats['unique_movies'] = self.ratings_df.select("movieId").distinct().count()
        
        print(f"\nTotal de Ratings: {self.stats['total_ratings']:,}")
        print(f"Usuarios √∫nicos: {self.stats['unique_users']:,}")
        print(f"Pel√≠culas √∫nicas: {self.stats['unique_movies']:,}")
        
        # Sparsity
        sparsity = 1.0 - (
            self.stats['total_ratings'] / 
            (self.stats['unique_users'] * self.stats['unique_movies'])
        )
        self.stats['sparsity'] = sparsity
        print(f"Sparsity: {sparsity:.4%}")
        
        # Distribuci√≥n de ratings
        print("\nDistribuci√≥n de Ratings:")
        rating_dist = self.ratings_df.groupBy("rating").count() \
            .orderBy("rating") \
            .collect()
        
        for row in rating_dist:
            print(f"  Rating {row['rating']}: {row['count']:,}")
        
        # Estad√≠sticas de ratings
        stats = self.ratings_df.select(
            F.mean("rating").alias("mean"),
            F.stddev("rating").alias("std"),
            F.min("rating").alias("min"),
            F.max("rating").alias("max")
        ).collect()[0]
        
        print(f"\nEstad√≠sticas de Ratings:")
        print(f"  Media: {stats['mean']:.2f}")
        print(f"  Desv. Est√°ndar: {stats['std']:.2f}")
        print(f"  M√≠nimo: {stats['min']:.1f}")
        print(f"  M√°ximo: {stats['max']:.1f}")
        
        # Usuarios y pel√≠culas m√°s activos
        print("\nTop 5 Usuarios m√°s activos:")
        top_users = self.ratings_df.groupBy("userId").count() \
            .orderBy(F.desc("count")) \
            .limit(5) \
            .collect()
        
        for i, row in enumerate(top_users, 1):
            print(f"  {i}. Usuario {row['userId']}: {row['count']} ratings")
        
        print("\nTop 5 Pel√≠culas m√°s valoradas:")
        top_movies = self.ratings_df.groupBy("movieId").count() \
            .orderBy(F.desc("count")) \
            .limit(5) \
            .collect()
        
        for i, row in enumerate(top_movies, 1):
            print(f"  {i}. Pel√≠cula {row['movieId']}: {row['count']} ratings")
        
        print("="*60 + "\n")
        
        return self.stats
    
    def clean_data(self):
        """Limpia y valida los datos"""
        print("Limpiando datos...")
        
        initial_count = self.ratings_df.count()
        
        # Eliminar nulos
        self.ratings_df = self.ratings_df.na.drop()
        
        # Validar rango de ratings
        self.ratings_df = self.ratings_df.filter(
            (F.col("rating") >= 0.5) & (F.col("rating") <= 5.0)
        )
        
        # Eliminar duplicados
        self.ratings_df = self.ratings_df.dropDuplicates(["userId", "movieId"])
        
        # Validar que userId y movieId no sean nulos o negativos
        self.ratings_df = self.ratings_df.filter(
            (F.col("userId").isNotNull()) & 
            (F.col("movieId").isNotNull()) &
            (F.col("userId") > 0) &
            (F.col("movieId") > 0)
        )
        
        final_count = self.ratings_df.count()
        removed = initial_count - final_count
        
        if final_count == 0:
            raise ValueError("Todos los datos fueron removidos durante la limpieza. Verifica el dataset.")
        
        print(f"‚úì Limpieza completada")
        print(f"  - Registros removidos: {removed:,}")
        print(f"  - Registros finales: {final_count:,}")
        
        return self.ratings_df
    
    def prepare_train_test_split(self, train_ratio=0.8, seed=42):
        """Divide datos en entrenamiento y prueba"""
        print(f"\nDividiendo datos (train: {train_ratio:.0%}, test: {1-train_ratio:.0%})...")
        
        # Verificar que hay datos antes de dividir
        total_count = self.ratings_df.count()
        if total_count == 0:
            raise ValueError("No hay datos para dividir. El DataFrame est√° vac√≠o.")
        
        train_df, test_df = self.ratings_df.randomSplit(
            [train_ratio, 1-train_ratio], 
            seed=seed
        )
        
        # Persistir en memoria para evitar recomputaci√≥n
        train_df = train_df.persist()
        test_df = test_df.persist()
        
        train_count = train_df.count()
        test_count = test_df.count()
        
        if train_count == 0:
            raise ValueError("El conjunto de entrenamiento est√° vac√≠o despu√©s de la divisi√≥n")
        
        if test_count == 0:
            raise ValueError("El conjunto de prueba est√° vac√≠o despu√©s de la divisi√≥n")
        
        print(f"‚úì Divisi√≥n completada")
        print(f"  - Train: {train_count:,} registros ({train_count/total_count:.1%})")
        print(f"  - Test: {test_count:,} registros ({test_count/total_count:.1%})")
        
        return train_df, test_df

# Inicializar y cargar datos
data_loader = DataLoader(spark)
ratings_df = data_loader.load_ratings()
movies_df = data_loader.load_movies()

In [None]:
# Explorar y limpiar datos
stats = data_loader.explore_data()
ratings_df = data_loader.clean_data()

In [None]:
# Preparar splits de datos
train_df, test_df = data_loader.prepare_train_test_split(train_ratio=0.8)

In [None]:
# Verificar datos antes del entrenamiento
print("="*60)
print("VERIFICACI√ìN PRE-ENTRENAMIENTO")
print("="*60)

print(f"\nDataFrame de entrenamiento:")
print(f"  - Registros: {train_df.count():,}")
print(f"  - Columnas: {train_df.columns}")
print(f"  - Schema:")
train_df.printSchema()

print(f"\nMuestra de datos de entrenamiento:")
train_df.show(10)

print(f"\nEstad√≠sticas b√°sicas:")
train_df.describe().show()

print("="*60)

## 3. Entrenamiento Inicial del Modelo ALS

## 3. Entrenamiento Inicial del Modelo ALS

### üìö Gu√≠a de Entrenamiento: Normal vs Optimizado vs Mejor Modelo

#### **üéØ Entrenamiento Normal (R√°pido - 5-8 minutos)**
```python
model = trainer.train_model(
    train_df,
    rank=10,          # Factores latentes (baja complejidad)
    maxIter=5,        # Pocas iteraciones (convergencia r√°pida)
    regParam=0.1      # Regularizaci√≥n moderada
)
```

**Caracter√≠sticas:**
- ‚ö° M√°s r√°pido (~5-8 minutos con 20M ratings)
- üéØ Bueno para desarrollo y pruebas
- üìä RMSE esperado: ~0.85-0.90
- üíæ Menos memoria (~2-3 GB)
- ‚úÖ **√ösalo para**: Validar pipeline, desarrollo inicial, entrenamiento diario

---

#### **üî¨ Entrenamiento con Optimizaci√≥n (Grid Search - 30-60 minutos)**
```python
param_grid = {
    'rank': [10, 20, 30],
    'regParam': [0.01, 0.1, 1.0]
}

model = trainer.train_with_optimization(
    train_df,
    validation_df=test_df,
    param_grid=param_grid
)
```

**Caracter√≠sticas:**
- üîç Busca la mejor combinaci√≥n de hiperpar√°metros
- üìà Prueba m√∫ltiples configuraciones (3√ó3 = 9 entrenamientos)
- üéØ Selecciona el modelo con mejor RMSE en validaci√≥n
- ‚è±Ô∏è M√°s lento (30-60 min dependiendo del grid)
- üìä RMSE esperado: ~0.82-0.87
- ‚úÖ **√ösalo para**: Primera vez, ajuste fino, cuando cambias el dataset

---

#### **üèÜ Mejor Modelo Posible (Producci√≥n - 20-40 minutos)**
```python
model = trainer.train_model(
    train_df,
    rank=50,          # M√°s factores latentes = mayor expresividad
    maxIter=15,       # M√°s iteraciones = mejor convergencia
    regParam=0.05,    # Regularizaci√≥n ajustada (no sobreajustar)
    alpha=1.0
)
```

**Configuraci√≥n √ìptima para MovieLens-20M:**
- **rank=50**: Captura m√°s patrones complejos de preferencias
- **maxIter=15**: Balance convergencia/tiempo
- **regParam=0.05**: Evita overfitting sin perder expresividad
- **checkpointInterval=5**: Previene StackOverflow en iteraciones largas

**Caracter√≠sticas:**
- üéØ M√°xima precisi√≥n posible
- üìä RMSE esperado: ~0.78-0.83
- üíæ Mayor consumo de memoria (~3.5-4 GB)
- ‚è±Ô∏è Tiempo moderado (20-40 min)
- ‚úÖ **√ösalo para**: Modelo final de producci√≥n, evaluaci√≥n benchmark

---

### üìä Comparativa de Par√°metros

| Par√°metro | Entrenamiento R√°pido | Optimizaci√≥n | Mejor Modelo | Efecto |
|-----------|---------------------|--------------|--------------|--------|
| **rank** | 10 | Grid: [10,20,30] | 50 | ‚Üë Mayor expresividad, ‚Üë memoria |
| **maxIter** | 5 | 5 (por modelo) | 15 | ‚Üë Mejor convergencia, ‚Üë tiempo |
| **regParam** | 0.1 | Grid: [0.01,0.1,1.0] | 0.05 | ‚Üì Menos overfitting |
| **RMSE** | ~0.88 | ~0.84 | ~0.80 | ‚Üì Mejor |
| **Tiempo** | 5-8 min | 30-60 min | 20-40 min | - |
| **Memoria** | ~2.5 GB | ~2.5 GB/modelo | ~4 GB | - |

---

### üéõÔ∏è Explicaci√≥n de Hiperpar√°metros

#### **rank** (Factores Latentes)
- **Qu√© es**: Dimensiones del espacio latente (factores ocultos)
- **Valores t√≠picos**: 10-100
- **Efecto**:
  - ‚Üë Mayor rank = M√°s expresividad, captura patrones complejos
  - ‚Üì Mayor rank = M√°s memoria, m√°s riesgo de overfitting
- **Recomendaci√≥n**: 
  - rank=10 para desarrollo
  - rank=20-30 para producci√≥n est√°ndar
  - rank=50 para m√°xima calidad

#### **maxIter** (Iteraciones)
- **Qu√© es**: N√∫mero de pasadas del algoritmo ALS
- **Valores t√≠picos**: 5-20
- **Efecto**:
  - ‚Üë M√°s iteraciones = Mejor convergencia
  - ‚Üì M√°s iteraciones = M√°s tiempo
  - Despu√©s de ~15 iter, mejora marginal
- **Recomendaci√≥n**:
  - maxIter=5 para desarrollo
  - maxIter=10 para producci√≥n
  - maxIter=15 para m√°xima calidad

#### **regParam** (Regularizaci√≥n L2)
- **Qu√© es**: Penalizaci√≥n para evitar overfitting
- **Valores t√≠picos**: 0.01-1.0
- **Efecto**:
  - ‚Üë Mayor valor = M√°s generalizaci√≥n, menos overfitting
  - ‚Üì Menor valor = M√°s ajuste a datos de entrenamiento
- **Recomendaci√≥n**:
  - regParam=0.1 para empezar
  - regParam=0.01-0.05 si tienes muchos datos
  - regParam=0.5-1.0 si ves overfitting

#### **alpha** (Confianza Impl√≠cita)
- **Qu√© es**: Peso para ratings impl√≠citos
- **Valores t√≠picos**: 1.0 (ratings expl√≠citos), 40 (impl√≠citos)
- **Efecto**: En MovieLens (expl√≠cito) usar alpha=1.0
- **Recomendaci√≥n**: Dejar en 1.0 para ratings expl√≠citos

---

### üöÄ Estrategia Recomendada

#### **Fase 1: Desarrollo (Primera vez)**
```python
# 1. Entrenamiento r√°pido para validar
model_dev = trainer.train_model(train_df, rank=10, maxIter=5)
# Tiempo: ~5 min | RMSE: ~0.88
```

#### **Fase 2: Optimizaci√≥n (Una vez)**
```python
# 2. Buscar mejores hiperpar√°metros
param_grid = {
    'rank': [10, 20, 30],
    'regParam': [0.05, 0.1, 0.2]
}
model_opt = trainer.train_with_optimization(train_df, test_df, param_grid)
# Tiempo: ~45 min | RMSE: ~0.84 | Mejores params guardados
```

#### **Fase 3: Producci√≥n (Modelo Final)**
```python
# 3. Entrenar con mejores par√°metros encontrados + m√°s recursos
model_prod = trainer.train_model(
    train_df,
    rank=30,      # Del grid search
    maxIter=15,   # M√°s iteraciones
    regParam=0.05 # Del grid search
)
# Tiempo: ~25 min | RMSE: ~0.80
```

#### **Fase 4: Mantenimiento (Diario/Semanal)**
```python
# 4. Reentrenamiento incremental con par√°metros √≥ptimos
model_updated = incremental_trainer.auto_retrain_pipeline(new_ratings_df)
# Usa autom√°ticamente los mejores par√°metros guardados
```

---

### üí° Consejos Pr√°cticos

1. **Primera vez**: Usa entrenamiento r√°pido (rank=10, iter=5) para validar
2. **Tienes tiempo**: Corre grid search una vez para encontrar mejores par√°metros
3. **Producci√≥n**: Usa rank=30-50, iter=15, regParam del grid search
4. **Actualizaciones**: El sistema autom√°tico usa par√°metros guardados
5. **Memoria limitada**: Mant√©n rank ‚â§ 20
6. **Tiempo limitado**: Usa maxIter=5-8

---

### üìà Expectativas de Rendimiento

| Dataset | Rank | Iter | RMSE | Precision@10 | Tiempo |
|---------|------|------|------|--------------|--------|
| MovieLens-20M | 10 | 5 | 0.88 | 0.13 | 5-8 min |
| MovieLens-20M | 20 | 10 | 0.84 | 0.16 | 12-18 min |
| MovieLens-20M | 50 | 15 | 0.80 | 0.19 | 25-35 min |

**RMSE < 0.85** = Excelente para sistemas de recomendaci√≥n  
**Precision@10 > 0.15** = Buen sistema de recomendaci√≥n

In [None]:
# ============================================================================
# ELIGE TU ESTRATEGIA DE ENTRENAMIENTO
# ============================================================================

# Descomenta UNA de las siguientes opciones:

# OPCI√ìN 1: ENTRENAMIENTO R√ÅPIDO (Recomendado para primera vez)
# ‚ö° Tiempo: ~5-8 min | üéØ RMSE esperado: ~0.88
print("üöÄ Estrategia: ENTRENAMIENTO R√ÅPIDO")
print("=" * 60)
model = trainer.train_model(
    train_df,
    rank=10,
    maxIter=5,
    regParam=0.1
)

# OPCI√ìN 2: ENTRENAMIENTO CON OPTIMIZACI√ìN (Ejecutar una vez para encontrar mejores params)
# üî¨ Tiempo: ~45-60 min | üéØ RMSE esperado: ~0.84
# print("üî¨ Estrategia: OPTIMIZACI√ìN DE HIPERPAR√ÅMETROS")
# print("=" * 60)
# param_grid = {
#     'rank': [10, 20, 30],
#     'regParam': [0.05, 0.1, 0.2]
# }
# model = trainer.train_with_optimization(
#     train_df,
#     validation_df=test_df,
#     param_grid=param_grid
# )

# OPCI√ìN 3: MEJOR MODELO POSIBLE (Para producci√≥n final)
# üèÜ Tiempo: ~25-35 min | üéØ RMSE esperado: ~0.80
# print("üèÜ Estrategia: MEJOR MODELO POSIBLE")
# print("=" * 60)
# model = trainer.train_model(
#     train_df,
#     rank=50,
#     maxIter=15,
#     regParam=0.05
# )

print(f"\n‚úì Modelo entrenado exitosamente")
print(f"  - Rank: {model.rank}")
print(f"  - User factors: {model.userFactors.count():,}")
print(f"  - Item factors: {model.itemFactors.count():,}")

In [None]:
# OPTIMIZACI√ìN AVANZADA (Opcional - Solo ejecutar si tienes tiempo)
# Tiempo estimado: 2-3 horas
# Prueba m√°s combinaciones para encontrar el √≥ptimo absoluto

# Descomenta para ejecutar:
# print("üî¨ OPTIMIZACI√ìN AVANZADA")
# print("=" * 60)
# print("‚ö†Ô∏è  Esto tomar√° 2-3 horas. Solo ejecuta si tienes tiempo.")
# print()
# 
# param_grid_advanced = {
#     'rank': [10, 20, 30, 50],
#     'regParam': [0.01, 0.05, 0.1, 0.2],
#     'maxIter': 15,
#     'alpha': [1.0, 5.0, 10.0],
# }
# 
# model_advanced = trainer.train_with_optimization(
#     train_df,
#     validation_df=test_df,
#     param_grid=param_grid_advanced
# )
# 
# print(f"\n‚úì Mejores par√°metros encontrados:")
# print(f"  {trainer.best_params}")
# print(f"  RMSE: {trainer.training_metrics.get('best_rmse', 'N/A'):.4f}")

print("üí° Tip: Para optimizaci√≥n avanzada, descomenta el c√≥digo de arriba")
print("   Recomendado solo si buscas el m√°ximo rendimiento absoluto")

### üìñ Ejemplo de Optimizaci√≥n Avanzada (Opcional)

Si quieres explorar m√°s combinaciones de par√°metros, aqu√≠ hay una configuraci√≥n avanzada:

In [None]:
class ALSTrainer:
    """Maneja el entrenamiento del modelo ALS"""
    
    def __init__(self, spark):
        self.spark = spark
        self.model = None
        self.training_metrics = {}
        self.best_params = {}
    
    def build_als_model(self, rank=None, maxIter=None, regParam=None, alpha=None):
        """
        Construye el modelo ALS con par√°metros especificados
        """
        rank = rank or Config.DEFAULT_RANK
        maxIter = maxIter or Config.DEFAULT_MAX_ITER
        regParam = regParam or Config.DEFAULT_REG_PARAM
        alpha = alpha or Config.DEFAULT_ALPHA
        
        als = ALS(
            rank=rank,
            maxIter=maxIter,
            regParam=regParam,
            userCol="userId",
            itemCol="movieId",
            ratingCol="rating",
            coldStartStrategy="drop",
            nonnegative=True,
            implicitPrefs=False,
            alpha=alpha,
            seed=42,
            checkpointInterval=10
        )
        
        print(f"‚úì Modelo ALS configurado:")
        print(f"  - Rank: {rank}")
        print(f"  - Max Iterations: {maxIter}")
        print(f"  - Reg Parameter: {regParam}")
        print(f"  - Alpha: {alpha}")
        
        return als
    
    def train_model(self, train_df, rank=None, maxIter=None, regParam=None, alpha=None):
        """
        Entrena el modelo ALS
        """
        print("\n" + "="*60)
        print("ENTRENAMIENTO DEL MODELO ALS")
        print("="*60)
        
        # Validar datos de entrada
        print("\nValidando datos de entrenamiento...")
        train_count = train_df.count()
        
        if train_count == 0:
            raise ValueError("El DataFrame de entrenamiento est√° vac√≠o")
        
        print(f"‚úì Datos de entrenamiento v√°lidos: {train_count:,} registros")
        
        # Verificar esquema
        required_cols = ["userId", "movieId", "rating"]
        actual_cols = train_df.columns
        
        for col in required_cols:
            if col not in actual_cols:
                raise ValueError(f"Columna requerida '{col}' no encontrada en el DataFrame")
        
        print(f"‚úì Esquema validado: {actual_cols}")
        
        # Mostrar muestra de datos
        print("\nMuestra de datos:")
        train_df.show(5)
        
        start_time = datetime.now()
        print(f"\nInicio: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
        
        # Construir modelo
        als = self.build_als_model(rank, maxIter, regParam, alpha)
        
        # Entrenar
        print("\nEntrenando modelo...")
        try:
            self.model = als.fit(train_df)
        except Exception as e:
            print(f"\n‚úó Error durante el entrenamiento: {str(e)}")
            print("\nDiagn√≥stico:")
            print(f"  - Registros: {train_count}")
            print(f"  - Columnas: {train_df.columns}")
            print(f"  - Tipos: {train_df.dtypes}")
            raise
        
        end_time = datetime.now()
        training_duration = (end_time - start_time).total_seconds()
        
        # Guardar m√©tricas de entrenamiento
        self.training_metrics = {
            'start_time': start_time.isoformat(),
            'end_time': end_time.isoformat(),
            'duration_seconds': training_duration,
            'rank': als.getRank(),
            'max_iter': als.getMaxIter(),
            'reg_param': als.getRegParam(),
            'alpha': als.getAlpha(),
            'train_size': train_count
        }
        
        print(f"\n‚úì Entrenamiento completado en {training_duration:.2f} segundos ({training_duration/60:.2f} minutos)")
        print("="*60 + "\n")
        
        return self.model
    
    def train_with_optimization(self, train_df, validation_df=None, param_grid=None):
        """
        Entrena con optimizaci√≥n de hiperpar√°metros (opcional)
        """
        print("\n" + "="*60)
        print("ENTRENAMIENTO CON OPTIMIZACI√ìN")
        print("="*60)
        
        if param_grid is None:
            # Grid simple para b√∫squeda r√°pida
            param_grid = {
                'rank': [10, 20],
                'regParam': [0.1, 0.01]
            }
        
        als = self.build_als_model()
        
        best_rmse = float('inf')
        best_params = {}
        best_model = None
        
        total_combinations = len(param_grid['rank']) * len(param_grid['regParam'])
        current = 0
        
        print(f"\nProbando {total_combinations} combinaciones de par√°metros...")
        
        for rank in param_grid['rank']:
            for regParam in param_grid['regParam']:
                current += 1
                print(f"\n[{current}/{total_combinations}] Entrenando con rank={rank}, regParam={regParam}")
                
                # Entrenar modelo
                als_temp = self.build_als_model(rank=rank, regParam=regParam, maxIter=5)
                model_temp = als_temp.fit(train_df)
                
                # Evaluar
                if validation_df is not None:
                    predictions = model_temp.transform(validation_df)
                    evaluator = RegressionEvaluator(
                        metricName="rmse",
                        labelCol="rating",
                        predictionCol="prediction"
                    )
                    rmse = evaluator.evaluate(predictions)
                    
                    print(f"  RMSE: {rmse:.4f}")
                    
                    if rmse < best_rmse:
                        best_rmse = rmse
                        best_params = {'rank': rank, 'regParam': regParam}
                        best_model = model_temp
                        print(f"  ‚úì Nuevo mejor modelo!")
        
        print(f"\n‚úì Optimizaci√≥n completada")
        print(f"  - Mejores par√°metros: {best_params}")
        print(f"  - Mejor RMSE: {best_rmse:.4f}")
        
        self.model = best_model
        self.best_params = best_params
        self.training_metrics['best_params'] = best_params
        self.training_metrics['best_rmse'] = best_rmse
        
        print("="*60 + "\n")
        
        return self.model

# Inicializar trainer
trainer = ALSTrainer(spark)

In [None]:
# Entrenar modelo con par√°metros por defecto
model = trainer.train_model(
    train_df,
    rank=Config.DEFAULT_RANK,
    maxIter=Config.DEFAULT_MAX_ITER,
    regParam=Config.DEFAULT_REG_PARAM
)

## 4. Evaluaci√≥n y M√©tricas del Modelo

In [None]:
class ModelEvaluator:
    """Eval√∫a el rendimiento del modelo"""
    
    def __init__(self, model, spark):
        self.model = model
        self.spark = spark
        self.metrics = {}
    
    def evaluate_regression_metrics(self, test_df):
        """
        Calcula m√©tricas de regresi√≥n (RMSE, MAE, MSE)
        """
        print("\n" + "="*60)
        print("EVALUACI√ìN DE M√âTRICAS DE REGRESI√ìN")
        print("="*60)
        
        # Generar predicciones
        predictions = self.model.transform(test_df)
        predictions = predictions.na.drop()  # Eliminar cold start
        
        # RMSE
        evaluator_rmse = RegressionEvaluator(
            metricName="rmse",
            labelCol="rating",
            predictionCol="prediction"
        )
        rmse = evaluator_rmse.evaluate(predictions)
        
        # MAE
        evaluator_mae = RegressionEvaluator(
            metricName="mae",
            labelCol="rating",
            predictionCol="prediction"
        )
        mae = evaluator_mae.evaluate(predictions)
        
        # MSE
        evaluator_mse = RegressionEvaluator(
            metricName="mse",
            labelCol="rating",
            predictionCol="prediction"
        )
        mse = evaluator_mse.evaluate(predictions)
        
        # R2
        evaluator_r2 = RegressionEvaluator(
            metricName="r2",
            labelCol="rating",
            predictionCol="prediction"
        )
        r2 = evaluator_r2.evaluate(predictions)
        
        self.metrics['rmse'] = rmse
        self.metrics['mae'] = mae
        self.metrics['mse'] = mse
        self.metrics['r2'] = r2
        self.metrics['test_size'] = test_df.count()
        self.metrics['predictions_count'] = predictions.count()
        
        print(f"\nM√©tricas de Regresi√≥n:")
        print(f"  - RMSE: {rmse:.4f}")
        print(f"  - MAE: {mae:.4f}")
        print(f"  - MSE: {mse:.4f}")
        print(f"  - R¬≤: {r2:.4f}")
        print(f"\nRegistros de test: {test_df.count():,}")
        print(f"Predicciones v√°lidas: {predictions.count():,}")
        
        print("="*60 + "\n")
        
        return self.metrics
    
    def evaluate_ranking_metrics(self, test_df, k_values=None):
        """
        Calcula m√©tricas de ranking (Precision@K, Recall@K)
        """
        if k_values is None:
            k_values = Config.TOP_K
        
        print("\n" + "="*60)
        print("EVALUACI√ìN DE M√âTRICAS DE RANKING")
        print("="*60)
        
        # Generar recomendaciones para todos los usuarios
        print("\nGenerando recomendaciones...")
        user_recs = self.model.recommendForAllUsers(max(k_values))
        
        # Preparar datos de test (pel√≠culas reales que gustaron)
        # Consideramos rating >= 4.0 como positivo
        actual = test_df.filter(F.col("rating") >= 4.0) \
            .groupBy("userId") \
            .agg(F.collect_list("movieId").alias("actual_movies"))
        
        # Extraer IDs de pel√≠culas recomendadas
        user_recs = user_recs.withColumn(
            "recommended_movies",
            F.col("recommendations.movieId")
        ).select("userId", "recommended_movies")
        
        # Join con datos reales
        eval_data = user_recs.join(actual, "userId", "inner")
        
        ranking_metrics = {}
        
        for k in k_values:
            print(f"\nCalculando m√©tricas para K={k}...")
            
            # Truncar recomendaciones a K
            eval_k = eval_data.withColumn(
                "recommended_k",
                F.slice("recommended_movies", 1, k)
            )
            
            # Calcular Precision@K y Recall@K
            eval_k = eval_k.withColumn(
                "hits",
                F.size(F.array_intersect("recommended_k", "actual_movies"))
            )
            
            eval_k = eval_k.withColumn(
                "precision",
                F.col("hits") / k
            )
            
            eval_k = eval_k.withColumn(
                "recall",
                F.col("hits") / F.size("actual_movies")
            )
            
            # Promedios
            avg_metrics = eval_k.agg(
                F.avg("precision").alias("avg_precision"),
                F.avg("recall").alias("avg_recall")
            ).collect()[0]
            
            precision_k = avg_metrics['avg_precision']
            recall_k = avg_metrics['avg_recall']
            f1_k = 2 * (precision_k * recall_k) / (precision_k + recall_k) if (precision_k + recall_k) > 0 else 0
            
            ranking_metrics[f'precision@{k}'] = precision_k
            ranking_metrics[f'recall@{k}'] = recall_k
            ranking_metrics[f'f1@{k}'] = f1_k
            
            print(f"  - Precision@{k}: {precision_k:.4f}")
            print(f"  - Recall@{k}: {recall_k:.4f}")
            print(f"  - F1@{k}: {f1_k:.4f}")
        
        self.metrics.update(ranking_metrics)
        
        print("="*60 + "\n")
        
        return ranking_metrics
    
    def evaluate_coverage(self, train_df, test_df, k=10):
        """
        Calcula la cobertura del modelo (% de items recomendados)
        """
        print("\nCalculando cobertura del modelo...")
        
        # Total de pel√≠culas √∫nicas
        total_movies = train_df.select("movieId").distinct().count()
        
        # Generar recomendaciones
        user_recs = self.model.recommendForAllUsers(k)
        
        # Pel√≠culas √∫nicas recomendadas
        recommended_movies = user_recs.select(
            F.explode("recommendations.movieId").alias("movieId")
        ).distinct().count()
        
        coverage = recommended_movies / total_movies
        
        self.metrics['coverage'] = coverage
        self.metrics['total_movies'] = total_movies
        self.metrics['recommended_movies'] = recommended_movies
        
        print(f"‚úì Cobertura: {coverage:.2%}")
        print(f"  - Pel√≠culas √∫nicas: {total_movies:,}")
        print(f"  - Pel√≠culas recomendadas: {recommended_movies:,}")
        
        return coverage
    
    def generate_evaluation_report(self):
        """
        Genera un reporte completo de evaluaci√≥n
        """
        print("\n" + "="*60)
        print("REPORTE DE EVALUACI√ìN COMPLETO")
        print("="*60)
        
        print(f"\n{'M√©trica':<25} {'Valor':<15}")
        print("-" * 40)
        
        for metric, value in sorted(self.metrics.items()):
            if isinstance(value, float):
                print(f"{metric:<25} {value:<15.4f}")
            else:
                print(f"{metric:<25} {value:<15,}")
        
        print("="*60 + "\n")
        
        return self.metrics
    
    def save_metrics(self, filepath=None):
        """
        Guarda las m√©tricas en un archivo JSON
        """
        if filepath is None:
            filepath = Config.METRICS_PATH / f"metrics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        
        metrics_with_metadata = {
            'timestamp': datetime.now().isoformat(),
            'metrics': self.metrics
        }
        
        with open(filepath, 'w') as f:
            json.dump(metrics_with_metadata, f, indent=2)
        
        print(f"‚úì M√©tricas guardadas en: {filepath}")
        
        return filepath

# Inicializar evaluador
evaluator = ModelEvaluator(model, spark)

In [None]:
# Evaluar m√©tricas de regresi√≥n
regression_metrics = evaluator.evaluate_regression_metrics(test_df)

In [None]:
# Evaluar m√©tricas de ranking
ranking_metrics = evaluator.evaluate_ranking_metrics(test_df, k_values=[5, 10, 20])

In [None]:
# Evaluar cobertura
coverage = evaluator.evaluate_coverage(train_df, test_df, k=10)

In [None]:
# Generar reporte completo
all_metrics = evaluator.generate_evaluation_report()

## 5. Guardado y Exportaci√≥n del Modelo

In [None]:
class ModelManager:
    """Gestiona el guardado, carga y versionado de modelos"""
    
    def __init__(self, base_path=Config.MODEL_PATH):
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)
        self.metadata = {}
    
    def get_next_version(self):
        """
        Obtiene el siguiente n√∫mero de versi√≥n
        """
        existing_versions = []
        
        for folder in self.base_path.iterdir():
            if folder.is_dir() and folder.name.startswith("als_model_v"):
                try:
                    version = int(folder.name.split("_v")[1].split("_")[0])
                    existing_versions.append(version)
                except:
                    continue
        
        return max(existing_versions, default=0) + 1
    
    def save_model(self, model, metrics=None, training_params=None, version=None):
        """
        Guarda el modelo con metadatos
        """
        print("\n" + "="*60)
        print("GUARDANDO MODELO")
        print("="*60)
        
        # Determinar versi√≥n
        if version is None:
            version = self.get_next_version()
        
        # Crear nombre del modelo
        date_str = datetime.now().strftime('%Y%m%d_%H%M%S')
        model_name = f"als_model_v{version}_{date_str}"
        model_path = self.base_path / model_name
        
        print(f"\nGuardando modelo: {model_name}")
        print(f"Ruta: {model_path}")
        
        # Guardar modelo de Spark
        model_spark_path = model_path / "spark_model"
        model.write().overwrite().save(str(model_spark_path))
        print(f"‚úì Modelo Spark guardado")
        
        # Preparar metadatos
        self.metadata = {
            'version': version,
            'model_name': model_name,
            'timestamp': datetime.now().isoformat(),
            'spark_version': spark.version,
            'model_type': 'ALS',
            'model_path': str(model_path),
            'metrics': metrics or {},
            'training_params': training_params or {},
            'system_info': Config.get_system_info()
        }
        
        # Guardar metadatos
        metadata_path = model_path / "metadata.json"
        with open(metadata_path, 'w') as f:
            json.dump(self.metadata, f, indent=2)
        print(f"‚úì Metadatos guardados")
        
        # Guardar informaci√≥n del modelo
        model_info = {
            'rank': model.rank,
            'user_factors': model.userFactors.count(),
            'item_factors': model.itemFactors.count()
        }
        
        info_path = model_path / "model_info.json"
        with open(info_path, 'w') as f:
            json.dump(model_info, f, indent=2)
        print(f"‚úì Informaci√≥n del modelo guardada")
        
        # Calcular checksum
        checksum = self._calculate_directory_checksum(model_path)
        checksum_path = model_path / "checksum.txt"
        with open(checksum_path, 'w') as f:
            f.write(checksum)
        print(f"‚úì Checksum calculado: {checksum[:16]}...")
        
        print(f"\n‚úì Modelo guardado exitosamente en: {model_path}")
        print("="*60 + "\n")
        
        return model_path, self.metadata
    
    def _calculate_directory_checksum(self, directory):
        """
        Calcula checksum de un directorio
        """
        hash_md5 = hashlib.md5()
        
        for filepath in sorted(Path(directory).rglob('*')):
            if filepath.is_file():
                hash_md5.update(str(filepath).encode())
        
        return hash_md5.hexdigest()
    
    def load_model(self, model_path):
        """
        Carga un modelo guardado
        """
        print(f"\nCargando modelo desde: {model_path}")
        
        model_path = Path(model_path)
        spark_model_path = model_path / "spark_model"
        
        if not spark_model_path.exists():
            raise FileNotFoundError(f"No se encontr√≥ el modelo en: {spark_model_path}")
        
        # Cargar modelo
        model = ALSModel.load(str(spark_model_path))
        print(f"‚úì Modelo cargado")
        
        # Cargar metadatos
        metadata_path = model_path / "metadata.json"
        if metadata_path.exists():
            with open(metadata_path, 'r') as f:
                self.metadata = json.load(f)
            print(f"‚úì Metadatos cargados (versi√≥n {self.metadata.get('version', 'N/A')})")
        
        return model, self.metadata
    
    def list_models(self):
        """
        Lista todos los modelos guardados
        """
        models = []
        
        for folder in sorted(self.base_path.iterdir()):
            if folder.is_dir() and folder.name.startswith("als_model_v"):
                metadata_path = folder / "metadata.json"
                
                if metadata_path.exists():
                    with open(metadata_path, 'r') as f:
                        metadata = json.load(f)
                    models.append({
                        'name': folder.name,
                        'path': str(folder),
                        'version': metadata.get('version', 'N/A'),
                        'timestamp': metadata.get('timestamp', 'N/A'),
                        'metrics': metadata.get('metrics', {})
                    })
        
        return models
    
    def cleanup_old_versions(self, keep_versions=None):
        """
        Elimina versiones antiguas del modelo
        """
        if keep_versions is None:
            keep_versions = Config.MAX_MODEL_VERSIONS
        
        models = self.list_models()
        
        if len(models) <= keep_versions:
            print(f"‚úì Solo hay {len(models)} versiones, no se requiere limpieza")
            return
        
        # Ordenar por timestamp (m√°s recientes primero)
        models.sort(key=lambda x: x['timestamp'], reverse=True)
        
        # Eliminar versiones antiguas
        to_delete = models[keep_versions:]
        
        print(f"\nEliminando {len(to_delete)} versiones antiguas...")
        
        for model_info in to_delete:
            model_path = Path(model_info['path'])
            if model_path.exists():
                shutil.rmtree(model_path)
                print(f"  ‚úì Eliminado: {model_info['name']}")
        
        print(f"‚úì Limpieza completada, manteniendo {keep_versions} versiones")
    
    def create_download_package(self, model_path, output_name=None):
        """
        Crea un paquete comprimido para descarga
        """
        model_path = Path(model_path)
        
        if output_name is None:
            output_name = f"{model_path.name}_package"
        
        output_path = self.base_path / output_name
        
        print(f"\nCreando paquete de descarga...")
        print(f"Origen: {model_path}")
        print(f"Destino: {output_path}.tar.gz")
        
        # Crear archivo tar.gz
        import tarfile
        
        with tarfile.open(f"{output_path}.tar.gz", "w:gz") as tar:
            tar.add(model_path, arcname=model_path.name)
        
        package_size = os.path.getsize(f"{output_path}.tar.gz") / (1024 * 1024)
        
        print(f"‚úì Paquete creado: {output_path}.tar.gz")
        print(f"  Tama√±o: {package_size:.2f} MB")
        
        return f"{output_path}.tar.gz"

# Inicializar manager
model_manager = ModelManager()

In [None]:
# Guardar modelo con m√©tricas
model_path, metadata = model_manager.save_model(
    model=model,
    metrics=evaluator.metrics,
    training_params=trainer.training_metrics
)

In [None]:
# Guardar m√©tricas separadamente
metrics_file = evaluator.save_metrics()

In [None]:
# Listar modelos guardados
print("\nModelos guardados:")
print("="*80)
saved_models = model_manager.list_models()

for i, model_info in enumerate(saved_models, 1):
    print(f"\n{i}. {model_info['name']}")
    print(f"   Versi√≥n: {model_info['version']}")
    print(f"   Fecha: {model_info['timestamp']}")
    if 'rmse' in model_info['metrics']:
        print(f"   RMSE: {model_info['metrics']['rmse']:.4f}")

## 6. Sistema de Descarga del Modelo Entrenado

In [None]:
# Crear paquete de descarga
if saved_models:
    latest_model = saved_models[-1]
    package_path = model_manager.create_download_package(latest_model['path'])
    
    print(f"\nüì¶ Paquete listo para descarga:")
    print(f"   {package_path}")
    print(f"\nüí° Para descargar desde Kaggle:")
    print(f"   1. Ve a 'Output' en el panel derecho")
    print(f"   2. Descarga el archivo .tar.gz")
else:
    print("‚ö† No hay modelos guardados para empaquetar")

In [None]:
class LocalModelLoader:
    """
    Carga y usa el modelo en el sistema local (fuera de Kaggle)
    """
    
    def __init__(self, spark_session=None):
        self.spark = spark_session
        self.model = None
        self.metadata = None
    
    def extract_model_package(self, package_path, extract_to="./models"):
        """
        Extrae el paquete del modelo descargado de Kaggle
        """
        import tarfile
        
        extract_path = Path(extract_to)
        extract_path.mkdir(parents=True, exist_ok=True)
        
        print(f"Extrayendo modelo desde: {package_path}")
        print(f"Destino: {extract_path}")
        
        with tarfile.open(package_path, "r:gz") as tar:
            tar.extractall(path=extract_path)
        
        # Buscar el directorio del modelo
        extracted_folders = list(extract_path.iterdir())
        if extracted_folders:
            model_dir = extracted_folders[0]
            print(f"‚úì Modelo extra√≠do en: {model_dir}")
            return model_dir
        else:
            raise FileNotFoundError("No se encontr√≥ el modelo extra√≠do")
    
    def load_from_path(self, model_path):
        """
        Carga el modelo desde una ruta local
        """
        if self.spark is None:
            raise ValueError("Se requiere una SparkSession activa")
        
        model_path = Path(model_path)
        spark_model_path = model_path / "spark_model"
        
        print(f"\nCargando modelo local desde: {model_path}")
        
        if not spark_model_path.exists():
            raise FileNotFoundError(f"No se encontr√≥ spark_model en: {spark_model_path}")
        
        # Cargar modelo
        self.model = ALSModel.load(str(spark_model_path))
        print(f"‚úì Modelo ALS cargado")
        
        # Cargar metadatos
        metadata_path = model_path / "metadata.json"
        if metadata_path.exists():
            with open(metadata_path, 'r') as f:
                self.metadata = json.load(f)
            print(f"‚úì Metadatos cargados")
            print(f"   - Versi√≥n: {self.metadata.get('version', 'N/A')}")
            print(f"   - Entrenado: {self.metadata.get('timestamp', 'N/A')}")
            if 'metrics' in self.metadata and 'rmse' in self.metadata['metrics']:
                print(f"   - RMSE: {self.metadata['metrics']['rmse']:.4f}")
        
        return self.model
    
    def recommend_for_user(self, user_id, num_recommendations=10):
        """
        Genera recomendaciones para un usuario espec√≠fico
        """
        if self.model is None:
            raise ValueError("Primero debes cargar un modelo")
        
        # Crear DataFrame con el usuario
        user_df = self.spark.createDataFrame([(user_id,)], ["userId"])
        
        # Generar recomendaciones
        recommendations = self.model.recommendForUserSubset(user_df, num_recommendations)
        
        # Convertir a formato legible
        recs_collected = recommendations.collect()
        
        if not recs_collected:
            return []
        
        recommendations_list = []
        for row in recs_collected:
            for rec in row['recommendations']:
                recommendations_list.append({
                    'movieId': rec['movieId'],
                    'score': float(rec['rating'])
                })
        
        return recommendations_list
    
    def recommend_for_users(self, user_ids, num_recommendations=10):
        """
        Genera recomendaciones para m√∫ltiples usuarios
        """
        if self.model is None:
            raise ValueError("Primero debes cargar un modelo")
        
        # Crear DataFrame con los usuarios
        users_df = self.spark.createDataFrame([(uid,) for uid in user_ids], ["userId"])
        
        # Generar recomendaciones
        recommendations = self.model.recommendForUserSubset(users_df, num_recommendations)
        
        return recommendations
    
    def predict_rating(self, user_id, movie_id):
        """
        Predice el rating de un usuario para una pel√≠cula espec√≠fica
        """
        if self.model is None:
            raise ValueError("Primero debes cargar un modelo")
        
        # Crear DataFrame con el par usuario-pel√≠cula
        test_df = self.spark.createDataFrame(
            [(user_id, movie_id)], 
            ["userId", "movieId"]
        )
        
        # Predecir
        prediction = self.model.transform(test_df)
        
        # Obtener resultado
        result = prediction.select("prediction").collect()
        
        if result and result[0]['prediction'] is not None:
            return float(result[0]['prediction'])
        else:
            return None  # Cold start
    
    def batch_predict(self, user_movie_pairs):
        """
        Predice ratings para m√∫ltiples pares usuario-pel√≠cula
        
        Args:
            user_movie_pairs: Lista de tuplas (userId, movieId)
        """
        if self.model is None:
            raise ValueError("Primero debes cargar un modelo")
        
        # Crear DataFrame
        test_df = self.spark.createDataFrame(
            user_movie_pairs, 
            ["userId", "movieId"]
        )
        
        # Predecir
        predictions = self.model.transform(test_df)
        
        return predictions

# Ejemplo de uso en sistema local
print("="*60)
print("EJEMPLO DE USO EN SISTEMA LOCAL")
print("="*60)
print("""
# 1. Crear SparkSession en tu sistema local
from pyspark.sql import SparkSession

spark_local = SparkSession.builder \\
    .appName("ALS-Predictions") \\
    .config("spark.driver.memory", "4g") \\
    .getOrCreate()

# 2. Cargar el modelo descargado
loader = LocalModelLoader(spark_local)
model = loader.load_from_path("./models/als_model_v1_20251208_143022")

# 3. Generar recomendaciones
recommendations = loader.recommend_for_user(user_id=123, num_recommendations=10)
print(recommendations)

# 4. Predecir rating espec√≠fico
rating = loader.predict_rating(user_id=123, movie_id=456)
print(f"Rating predicho: {rating:.2f}")
""")

## 8. Reentrenamiento Incremental

In [None]:
class IncrementalTrainer:
    """
    Maneja el reentrenamiento incremental del modelo
    """
    
    def __init__(self, spark, model_manager):
        self.spark = spark
        self.model_manager = model_manager
        self.current_model = None
        self.training_history = []
    
    def check_retraining_needed(self, new_ratings_df, last_training_size):
        """
        Determina si se necesita reentrenamiento basado en nuevos datos
        """
        new_count = new_ratings_df.count()
        
        if last_training_size == 0:
            return True, "No hay modelo entrenado"
        
        growth_ratio = new_count / last_training_size
        
        if growth_ratio >= Config.INCREMENTAL_THRESHOLD:
            return True, f"Crecimiento de datos: {growth_ratio:.1%}"
        
        return False, f"Crecimiento insuficiente: {growth_ratio:.1%}"
    
    def incremental_retrain(self, existing_model, new_ratings_df, full_ratings_df=None):
        """
        Realiza reentrenamiento incremental
        
        Estrategia:
        1. Si hay pocos datos nuevos (< 10%): solo actualizar factores
        2. Si hay muchos datos nuevos (>= 10%): reentrenar completo
        """
        print("\n" + "="*60)
        print("REENTRENAMIENTO INCREMENTAL")
        print("="*60)
        
        new_count = new_ratings_df.count()
        
        if full_ratings_df is None:
            full_ratings_df = new_ratings_df
        
        total_count = full_ratings_df.count()
        
        print(f"\nNuevos ratings: {new_count:,}")
        print(f"Total ratings: {total_count:,}")
        print(f"Proporci√≥n nueva: {new_count/total_count:.1%}")
        
        # Decidir estrategia
        if new_count / total_count < Config.INCREMENTAL_THRESHOLD:
            print("\n‚Üí Estrategia: Actualizaci√≥n r√°pida (pocos datos nuevos)")
            return self._quick_update(existing_model, new_ratings_df, full_ratings_df)
        else:
            print("\n‚Üí Estrategia: Reentrenamiento completo (muchos datos nuevos)")
            return self._full_retrain(full_ratings_df)
    
    def _quick_update(self, existing_model, new_ratings_df, full_ratings_df):
        """
        Actualizaci√≥n r√°pida para pocos datos nuevos
        
        Nota: Spark ALS no soporta warm-start nativo,
        pero podemos re-entrenar con menos iteraciones
        """
        print("\nReentrenando con par√°metros reducidos...")
        
        # Obtener par√°metros del modelo existente
        rank = existing_model.rank
        
        # Re-entrenar con menos iteraciones
        als = ALS(
            rank=rank,
            maxIter=3,  # Pocas iteraciones para actualizaci√≥n r√°pida
            regParam=Config.DEFAULT_REG_PARAM,
            userCol="userId",
            itemCol="movieId",
            ratingCol="rating",
            coldStartStrategy="drop",
            nonnegative=True,
            seed=42
        )
        
        start_time = datetime.now()
        new_model = als.fit(full_ratings_df)
        duration = (datetime.now() - start_time).total_seconds()
        
        print(f"‚úì Actualizaci√≥n completada en {duration:.2f} segundos")
        
        return new_model, {
            'strategy': 'quick_update',
            'duration': duration,
            'iterations': 3
        }
    
    def _full_retrain(self, full_ratings_df):
        """
        Reentrenamiento completo con todos los datos
        """
        print("\nReentrenando modelo completo...")
        
        als = ALS(
            rank=Config.DEFAULT_RANK,
            maxIter=Config.DEFAULT_MAX_ITER,
            regParam=Config.DEFAULT_REG_PARAM,
            userCol="userId",
            itemCol="movieId",
            ratingCol="rating",
            coldStartStrategy="drop",
            nonnegative=True,
            seed=42,
            checkpointInterval=10
        )
        
        start_time = datetime.now()
        new_model = als.fit(full_ratings_df)
        duration = (datetime.now() - start_time).total_seconds()
        
        print(f"‚úì Reentrenamiento completo en {duration:.2f} segundos")
        
        return new_model, {
            'strategy': 'full_retrain',
            'duration': duration,
            'iterations': Config.DEFAULT_MAX_ITER
        }
    
    def auto_retrain_pipeline(self, ratings_df, force_retrain=False):
        """
        Pipeline autom√°tico de reentrenamiento
        
        Flujo:
        1. Buscar modelo existente
        2. Si no existe, entrenar nuevo modelo (quick)
        3. Si existe, evaluar si reentrenar
        4. Guardar nuevo modelo si mejora
        """
        print("\n" + "="*60)
        print("PIPELINE AUTOM√ÅTICO DE REENTRENAMIENTO")
        print("="*60)
        
        # Buscar modelo existente
        existing_models = self.model_manager.list_models()
        
        if not existing_models and not force_retrain:
            print("\n‚Üí No se encontr√≥ modelo existente")
            print("‚Üí Entrenando modelo inicial con par√°metros optimizados...")
            
            # Entrenar modelo inicial r√°pido
            als = ALS(
                rank=Config.DEFAULT_RANK,
                maxIter=Config.DEFAULT_MAX_ITER,
                regParam=Config.DEFAULT_REG_PARAM,
                userCol="userId",
                itemCol="movieId",
                ratingCol="rating",
                coldStartStrategy="drop",
                nonnegative=True,
                seed=42
            )
            
            start_time = datetime.now()
            new_model = als.fit(ratings_df)
            duration = (datetime.now() - start_time).total_seconds()
            
            print(f"‚úì Modelo inicial entrenado en {duration:.2f} segundos")
            
            # Evaluar
            train_temp, test_temp = ratings_df.randomSplit([0.8, 0.2], seed=42)
            evaluator_temp = ModelEvaluator(new_model, self.spark)
            metrics_temp = evaluator_temp.evaluate_regression_metrics(test_temp)
            
            # Guardar
            self.model_manager.save_model(
                model=new_model,
                metrics=metrics_temp,
                training_params={
                    'strategy': 'initial_training',
                    'duration': duration,
                    'rank': Config.DEFAULT_RANK,
                    'max_iter': Config.DEFAULT_MAX_ITER
                }
            )
            
            self.current_model = new_model
            
            return new_model, metrics_temp, 'initial_training'
        
        elif not existing_models and force_retrain:
            print("\n‚Üí Forzando entrenamiento inicial...")
            # Mismo flujo que arriba
            return self.auto_retrain_pipeline(ratings_df, force_retrain=False)
        
        else:
            print(f"\n‚Üí Encontrados {len(existing_models)} modelos existentes")
            
            # Cargar modelo m√°s reciente
            latest_model_info = existing_models[-1]
            print(f"‚Üí Cargando modelo: {latest_model_info['name']}")
            
            existing_model, metadata = self.model_manager.load_model(
                latest_model_info['path']
            )
            
            last_training_size = metadata.get('training_params', {}).get('train_size', 0)
            
            # Verificar si se necesita reentrenamiento
            current_size = ratings_df.count()
            needs_retrain, reason = self.check_retraining_needed(
                ratings_df, 
                last_training_size
            )
            
            print(f"\n‚Üí Evaluaci√≥n de reentrenamiento:")
            print(f"   Raz√≥n: {reason}")
            print(f"   ¬øReentrenar?: {'S√≠' if needs_retrain else 'No'}")
            
            if not needs_retrain and not force_retrain:
                print("\n‚úì No se requiere reentrenamiento")
                self.current_model = existing_model
                return existing_model, metadata.get('metrics', {}), 'no_retrain'
            
            # Realizar reentrenamiento
            print("\n‚Üí Iniciando reentrenamiento...")
            
            new_model, retrain_info = self._full_retrain(ratings_df)
            
            # Evaluar nuevo modelo
            train_temp, test_temp = ratings_df.randomSplit([0.8, 0.2], seed=42)
            evaluator_temp = ModelEvaluator(new_model, self.spark)
            new_metrics = evaluator_temp.evaluate_regression_metrics(test_temp)
            
            # Comparar con modelo anterior
            old_rmse = metadata.get('metrics', {}).get('rmse', float('inf'))
            new_rmse = new_metrics.get('rmse', float('inf'))
            
            print(f"\n‚Üí Comparaci√≥n de m√©tricas:")
            print(f"   RMSE anterior: {old_rmse:.4f}")
            print(f"   RMSE nuevo: {new_rmse:.4f}")
            print(f"   Mejora: {((old_rmse - new_rmse) / old_rmse * 100):.2f}%")
            
            if new_rmse <= old_rmse or force_retrain:
                print("\n‚úì Guardando nuevo modelo (mejor o forzado)...")
                
                self.model_manager.save_model(
                    model=new_model,
                    metrics=new_metrics,
                    training_params={
                        **retrain_info,
                        'train_size': current_size,
                        'previous_rmse': old_rmse
                    }
                )
                
                # Limpiar versiones antiguas
                self.model_manager.cleanup_old_versions()
                
                self.current_model = new_model
                
                return new_model, new_metrics, 'retrained'
            else:
                print("\n‚ö† Modelo nuevo no mejora, manteniendo modelo anterior")
                self.current_model = existing_model
                return existing_model, metadata.get('metrics', {}), 'kept_old'

# Inicializar trainer incremental
incremental_trainer = IncrementalTrainer(spark, model_manager)

## 9. Sistema de Actualizaci√≥n Autom√°tica

In [None]:
class AutoUpdateSystem:
    """
    Sistema de actualizaci√≥n autom√°tica para producci√≥n
    """
    
    def __init__(self, spark, model_manager, incremental_trainer):
        self.spark = spark
        self.model_manager = model_manager
        self.incremental_trainer = incremental_trainer
        self.update_log = []
    
    def schedule_info(self):
        """
        Informaci√≥n sobre c√≥mo programar actualizaciones autom√°ticas
        """
        print("="*80)
        print("PROGRAMACI√ìN DE ACTUALIZACIONES AUTOM√ÅTICAS")
        print("="*80)
        
        print("""
### Opci√≥n 1: Cron Job (Linux/Mac)

Crear script: /home/user/als_training/update_model.sh
‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
#!/bin/bash
cd /home/user/als_training
source venv/bin/activate
python run_update.py >> logs/update_$(date +%Y%m%d).log 2>&1
‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ

Agregar a crontab:
# Actualizar modelo cada d√≠a a las 2 AM
0 2 * * * /home/user/als_training/update_model.sh

# Actualizar modelo cada semana (domingo 3 AM)
0 3 * * 0 /home/user/als_training/update_model.sh


### Opci√≥n 2: Kaggle Notebooks Schedule

1. En tu notebook de Kaggle, habilita "Schedule Run"
2. Configura frecuencia: Daily, Weekly, etc.
3. El notebook se ejecutar√° autom√°ticamente


### Opci√≥n 3: Apache Airflow DAG

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

dag = DAG(
    'als_model_update',
    default_args={
        'owner': 'data-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
    },
    schedule_interval='0 2 * * *',  # Diario 2 AM
    start_date=datetime(2025, 1, 1),
)

update_task = BashOperator(
    task_id='update_als_model',
    bash_command='python /path/to/run_update.py',
    dag=dag,
)


### Opci√≥n 4: Cloud Functions (AWS Lambda / GCP)

Configurar trigger schedule con CloudWatch Events o Cloud Scheduler
        """)
    
    def create_update_script(self, output_path="./run_update.py"):
        """
        Crea script Python para actualizaciones autom√°ticas
        """
        script_content = '''#!/usr/bin/env python3
"""
Script de actualizaci√≥n autom√°tica del modelo ALS
Ejecutar con: python run_update.py
"""

import sys
from datetime import datetime
from pyspark.sql import SparkSession
from pathlib import Path

# Importar tus clases (ajustar rutas seg√∫n tu proyecto)
# from model_manager import ModelManager, IncrementalTrainer, DataLoader

def main():
    print(f"Inicio de actualizaci√≥n: {datetime.now()}")
    
    try:
        # 1. Crear SparkSession
        spark = SparkSession.builder \\
            .appName("ALS-Auto-Update") \\
            .config("spark.driver.memory", "4g") \\
            .config("spark.sql.shuffle.partitions", "200") \\
            .getOrCreate()
        
        print("‚úì Spark iniciado")
        
        # 2. Cargar datos nuevos
        # Ajustar seg√∫n tu fuente de datos
        ratings_path = "/path/to/ratings/data"
        ratings_df = spark.read.parquet(ratings_path)
        
        print(f"‚úì Datos cargados: {ratings_df.count():,} registros")
        
        # 3. Inicializar sistema
        model_manager = ModelManager()
        incremental_trainer = IncrementalTrainer(spark, model_manager)
        
        # 4. Ejecutar pipeline de reentrenamiento
        new_model, metrics, action = incremental_trainer.auto_retrain_pipeline(
            ratings_df=ratings_df,
            force_retrain=False
        )
        
        print(f"‚úì Pipeline completado: {action}")
        print(f"  RMSE: {metrics.get('rmse', 'N/A')}")
        
        # 5. Crear paquete de descarga si hay nuevo modelo
        if action in ['initial_training', 'retrained']:
            models = model_manager.list_models()
            if models:
                latest = models[-1]
                package = model_manager.create_download_package(latest['path'])
                print(f"‚úì Paquete creado: {package}")
        
        spark.stop()
        
        print(f"‚úì Actualizaci√≥n completada: {datetime.now()}")
        return 0
        
    except Exception as e:
        print(f"‚úó Error durante actualizaci√≥n: {e}")
        import traceback
        traceback.print_exc()
        return 1

if __name__ == "__main__":
    sys.exit(main())
'''
        
        output_path = Path(output_path)
        with open(output_path, 'w') as f:
            f.write(script_content)
        
        # Hacer ejecutable
        import os
        os.chmod(output_path, 0o755)
        
        print(f"‚úì Script de actualizaci√≥n creado: {output_path}")
        print(f"  Ejecutar con: python {output_path}")
        
        return output_path
    
    def simulate_streaming_update(self, base_ratings_df, new_data_ratio=0.05):
        """
        Simula actualizaci√≥n con nuevos datos (como streaming)
        """
        print("\n" + "="*60)
        print("SIMULACI√ìN DE ACTUALIZACI√ìN CON NUEVOS DATOS")
        print("="*60)
        
        total_count = base_ratings_df.count()
        
        # Simular "nuevos datos" tomando una muestra
        new_sample_size = int(total_count * new_data_ratio)
        
        print(f"\nSimulando {new_sample_size:,} nuevos ratings ({new_data_ratio:.1%})")
        
        # En producci√≥n, esto ser√≠an datos reales nuevos
        new_ratings = base_ratings_df.sample(fraction=new_data_ratio, seed=123)
        
        # Combinar con datos existentes
        combined_df = base_ratings_df.union(new_ratings).distinct()
        
        print(f"Total despu√©s de nuevos datos: {combined_df.count():,}")
        
        # Ejecutar pipeline de actualizaci√≥n
        updated_model, metrics, action = self.incremental_trainer.auto_retrain_pipeline(
            ratings_df=combined_df,
            force_retrain=False
        )
        
        # Registrar actualizaci√≥n
        self.update_log.append({
            'timestamp': datetime.now().isoformat(),
            'action': action,
            'new_data_ratio': new_data_ratio,
            'metrics': metrics
        })
        
        print(f"\n‚úì Simulaci√≥n completada")
        print(f"  Acci√≥n tomada: {action}")
        
        return updated_model, metrics
    
    def get_update_history(self):
        """
        Obtiene historial de actualizaciones
        """
        if not self.update_log:
            print("No hay historial de actualizaciones")
            return []
        
        print("\n" + "="*60)
        print("HISTORIAL DE ACTUALIZACIONES")
        print("="*60)
        
        for i, entry in enumerate(self.update_log, 1):
            print(f"\n{i}. {entry['timestamp']}")
            print(f"   Acci√≥n: {entry['action']}")
            print(f"   Datos nuevos: {entry['new_data_ratio']:.1%}")
            if 'rmse' in entry['metrics']:
                print(f"   RMSE: {entry['metrics']['rmse']:.4f}")
        
        return self.update_log
    
    def health_check(self):
        """
        Verifica el estado del sistema
        """
        print("\n" + "="*60)
        print("HEALTH CHECK DEL SISTEMA")
        print("="*60)
        
        checks = []
        
        # 1. Verificar modelos guardados
        models = self.model_manager.list_models()
        if models:
            checks.append(("‚úì", f"Modelos guardados: {len(models)}"))
            latest = models[-1]
            checks.append(("‚úì", f"√öltimo modelo: {latest['name']}"))
        else:
            checks.append(("‚úó", "No hay modelos guardados"))
        
        # 2. Verificar Spark
        try:
            spark_version = self.spark.version
            checks.append(("‚úì", f"Spark activo: v{spark_version}"))
        except:
            checks.append(("‚úó", "Spark no disponible"))
        
        # 3. Verificar directorios
        for path_name, path in [
            ("Modelos", Config.MODEL_PATH),
            ("M√©tricas", Config.METRICS_PATH),
            ("Logs", Config.LOGS_PATH)
        ]:
            if path.exists():
                checks.append(("‚úì", f"Directorio {path_name}: OK"))
            else:
                checks.append(("‚úó", f"Directorio {path_name}: No existe"))
        
        # Imprimir resultados
        for status, message in checks:
            print(f"  {status} {message}")
        
        # Resumen
        passed = sum(1 for s, _ in checks if s == "‚úì")
        total = len(checks)
        
        print(f"\n{'='*60}")
        print(f"RESULTADO: {passed}/{total} checks pasados")
        
        if passed == total:
            print("‚úì Sistema saludable")
        elif passed >= total * 0.7:
            print("‚ö† Sistema funcional con advertencias")
        else:
            print("‚úó Sistema requiere atenci√≥n")
        
        print("="*60)
        
        return passed == total

# Inicializar sistema de actualizaci√≥n
auto_update = AutoUpdateSystem(spark, model_manager, incremental_trainer)

## 10. Validaci√≥n y Monitoreo del Sistema

---

## üéì Gu√≠a de Uso Completa

### üìù Flujo de Trabajo Recomendado

#### **En Kaggle (Entrenamiento):**

1. **Primera Ejecuci√≥n:**
   - Ejecuta todas las celdas secuencialmente
   - El sistema detecta que no hay modelo y entrena uno nuevo (rank=10, iter=5)
   - Tiempo estimado: ~5-8 minutos
   
2. **Ejecuciones Posteriores:**
   - El sistema detecta modelo existente
   - Eval√∫a si se necesita reentrenamiento
   - Solo reentrena si hay suficientes datos nuevos

3. **Descarga:**
   - Ve a Output ‚Üí descarga el `.tar.gz`

---

#### **En Sistema Local (Predicci√≥n):**

```python
# 1. Instalar dependencias
pip install pyspark==3.5.0

# 2. Extraer modelo
tar -xzf als_model_v1_*.tar.gz -C ./models/

# 3. Usar modelo
from pyspark.sql import SparkSession
from local_model_loader import LocalModelLoader

spark = SparkSession.builder \
    .appName("ALS-Predictions") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

loader = LocalModelLoader(spark)
model = loader.load_from_path("./models/als_model_v1_...")

# Recomendar para usuario
recs = loader.recommend_for_user(user_id=123, num_recommendations=10)
print(recs)
```

---

### üîÑ Automatizaci√≥n en Producci√≥n

#### **Opci√≥n 1: Script Python Standalone**

```bash
# Ejecutar manualmente
python run_update.py

# O programar con cron
0 2 * * * /path/to/run_update.py >> logs/update.log 2>&1
```

#### **Opci√≥n 2: Kaggle Schedule**
- Configura "Schedule Run" en Kaggle
- El notebook se ejecuta autom√°ticamente
- Descarga modelos v√≠a API de Kaggle

#### **Opci√≥n 3: Integraci√≥n con tu Sistema**

```python
# En tu sistema Spark existente
from incremental_trainer import IncrementalTrainer
from model_manager import ModelManager

# Cargar ratings desde HDFS
ratings_df = spark.read.parquet("hdfs://namenode:9000/streams/ratings/raw")

# Ejecutar pipeline
manager = ModelManager("hdfs://namenode:9000/models/als")
trainer = IncrementalTrainer(spark, manager)

model, metrics, action = trainer.auto_retrain_pipeline(ratings_df)

# Servir recomendaciones
recommendations = model.recommendForAllUsers(10)
recommendations.write.parquet("hdfs://namenode:9000/recommendations/latest")
```

---

### üìä Monitoreo de M√©tricas

**M√©tricas Clave:**
- **RMSE < 0.90**: Excelente
- **RMSE 0.90-1.00**: Bueno
- **RMSE > 1.00**: Requiere optimizaci√≥n

**Precision@10 > 0.15**: Buen rendimiento para MovieLens

---

### üîß Troubleshooting

| Problema | Soluci√≥n |
|----------|----------|
| `OutOfMemoryError` | Reducir `rank` a 5-10 o `spark.driver.memory` |
| Entrenamiento muy lento | Reducir `maxIter` a 3-5 |
| Cold start en predicciones | Usar `coldStartStrategy="drop"` |
| Modelo no mejora | Aumentar datos de entrenamiento o ajustar `regParam` |

---

### üì¶ Estructura de Archivos Generados

```
/kaggle/working/
‚îú‚îÄ‚îÄ models/
‚îÇ   ‚îî‚îÄ‚îÄ als_model_v1_20251208_143022/
‚îÇ       ‚îú‚îÄ‚îÄ spark_model/           # Modelo Spark
‚îÇ       ‚îú‚îÄ‚îÄ metadata.json           # Metadatos y m√©tricas
‚îÇ       ‚îú‚îÄ‚îÄ model_info.json         # Info de factores
‚îÇ       ‚îî‚îÄ‚îÄ checksum.txt            # Validaci√≥n
‚îú‚îÄ‚îÄ metrics/
‚îÇ   ‚îî‚îÄ‚îÄ metrics_20251208_143022.json
‚îú‚îÄ‚îÄ logs/
‚îî‚îÄ‚îÄ run_update.py                   # Script de actualizaci√≥n
```

---

### üöÄ Escalabilidad y Mantenimiento

**Dise√±o Escalable:**
- ‚úÖ Versionado autom√°tico de modelos
- ‚úÖ Limpieza de versiones antiguas
- ‚úÖ Checkpoints para fault tolerance
- ‚úÖ Actualizaci√≥n incremental inteligente
- ‚úÖ M√©tricas hist√≥ricas

**Mantenimiento:**
- Revisa logs de actualizaci√≥n regularmente
- Monitorea degradaci√≥n de m√©tricas
- Ajusta `INCREMENTAL_THRESHOLD` seg√∫n necesidad
- Mant√©n m√°ximo 5 versiones (configurable)

---

**üéâ Sistema Listo para Producci√≥n!**

In [None]:
# Resumen final del sistema
print("\n" + "="*80)
print("RESUMEN FINAL DEL SISTEMA ALS")
print("="*80)

print("\nüìä Estad√≠sticas de Datos:")
print(f"  - Total de ratings: {stats.get('total_ratings', 'N/A'):,}")
print(f"  - Usuarios √∫nicos: {stats.get('unique_users', 'N/A'):,}")
print(f"  - Pel√≠culas √∫nicas: {stats.get('unique_movies', 'N/A'):,}")
print(f"  - Sparsity: {stats.get('sparsity', 0):.2%}")

print("\nüéØ Modelo Activo:")
models = model_manager.list_models()
if models:
    latest = models[-1]
    print(f"  - Versi√≥n: {latest['version']}")
    print(f"  - Nombre: {latest['name']}")
    print(f"  - Fecha: {latest['timestamp']}")
    if 'rmse' in latest['metrics']:
        print(f"  - RMSE: {latest['metrics']['rmse']:.4f}")
    if 'mae' in latest['metrics']:
        print(f"  - MAE: {latest['metrics']['mae']:.4f}")
else:
    print("  ‚ö† No hay modelos guardados")

print("\nüì¶ Archivos Generados:")
print(f"  - Directorio de modelos: {Config.MODEL_PATH}")
print(f"  - Directorio de m√©tricas: {Config.METRICS_PATH}")
print(f"  - Directorio de logs: {Config.LOGS_PATH}")

print("\nüöÄ Pr√≥ximos Pasos:")
print("""
1. DESCARGA DEL MODELO:
   - Ve a 'Output' en el panel derecho de Kaggle
   - Descarga el archivo .tar.gz del modelo
   
2. USO EN SISTEMA LOCAL:
   - Extrae el modelo: tar -xzf als_model_*.tar.gz
   - Carga con LocalModelLoader
   - Genera recomendaciones
   
3. PRODUCCI√ìN:
   - Configura actualizaci√≥n autom√°tica (cron/Airflow)
   - Implementa API REST para servir recomendaciones
   - Integra con sistema de cache (Redis)
   - Monitoreo continuo de m√©tricas
   
4. INTEGRACI√ìN CON TU SISTEMA:
   - Copia el modelo a: /home/abraham/Escritorio/PGVD/.../movies/models/
   - Crea servicio de recomendaciones
   - Conecta con Kafka para actualizaciones en tiempo real
""")

print("="*80)

In [None]:
# Simular actualizaci√≥n con nuevos datos (opcional)
# Descomenta para probar el flujo de actualizaci√≥n incremental

# print("\n" + "="*80)
# print("SIMULACI√ìN DE ACTUALIZACI√ìN INCREMENTAL")
# print("="*80)
# 
# updated_model, updated_metrics = auto_update.simulate_streaming_update(
#     base_ratings_df=ratings_df,
#     new_data_ratio=0.05  # 5% de nuevos datos
# )

In [None]:
# Realizar health check del sistema
system_healthy = auto_update.health_check()

In [None]:
# Mostrar informaci√≥n de programaci√≥n
auto_update.schedule_info()

In [None]:
# Crear script de actualizaci√≥n autom√°tica
update_script = auto_update.create_update_script(output_path=Config.BASE_PATH / "run_update.py")

In [None]:
# Probar pipeline autom√°tico de reentrenamiento
# Este es el punto de entrada principal del sistema

print("="*80)
print("EJECUTANDO PIPELINE AUTOM√ÅTICO")
print("="*80)
print("""
Este pipeline:
1. Verifica si existe un modelo entrenado
2. Si NO existe: entrena uno nuevo con rank=10, iter=5 (r√°pido)
3. Si existe: eval√∫a si necesita reentrenamiento
4. Guarda el mejor modelo autom√°ticamente
""")

# Ejecutar pipeline
final_model, final_metrics, action_taken = incremental_trainer.auto_retrain_pipeline(
    ratings_df=ratings_df,
    force_retrain=False  # Cambiar a True para forzar reentrenamiento
)

print(f"\n{'='*80}")
print(f"RESULTADO DEL PIPELINE: {action_taken.upper()}")
print(f"{'='*80}")

if final_metrics:
    print(f"\nM√©tricas del modelo activo:")
    if 'rmse' in final_metrics:
        print(f"  - RMSE: {final_metrics['rmse']:.4f}")
    if 'mae' in final_metrics:
        print(f"  - MAE: {final_metrics['mae']:.4f}")
    if 'r2' in final_metrics:
        print(f"  - R¬≤: {final_metrics['r2']:.4f}")

In [None]:
# Demostraci√≥n de predicci√≥n con el modelo actual
print("\n" + "="*60)
print("DEMOSTRACI√ìN DE PREDICCIONES")
print("="*60)

# Seleccionar usuarios de ejemplo
sample_users = train_df.select("userId").distinct().limit(3).collect()
user_ids = [row['userId'] for row in sample_users]

print(f"\nUsuarios de ejemplo: {user_ids}")

# Generar recomendaciones para cada usuario
for user_id in user_ids:
    print(f"\n{'‚îÄ'*60}")
    print(f"Recomendaciones para Usuario {user_id}:")
    print(f"{'‚îÄ'*60}")
    
    # Crear DataFrame con el usuario
    user_df = spark.createDataFrame([(user_id,)], ["userId"])
    
    # Generar top-5 recomendaciones
    recs = model.recommendForUserSubset(user_df, 5)
    recs_list = recs.collect()
    
    if recs_list:
        for i, rec in enumerate(recs_list[0]['recommendations'], 1):
            movie_id = rec['movieId']
            score = rec['rating']
            
            # Buscar t√≠tulo de la pel√≠cula si est√° disponible
            if movies_df is not None:
                movie_info = movies_df.filter(F.col("movieId") == movie_id).select("title").collect()
                title = movie_info[0]['title'] if movie_info else f"Movie {movie_id}"
            else:
                title = f"Movie {movie_id}"
            
            print(f"  {i}. {title} (Score: {score:.2f})")
    else:
        print("  No hay recomendaciones disponibles")

print("\n" + "="*60)

## 7. Carga del Modelo para Predicciones Locales