# Spark + PyTorch + MLflow: Distributed Deep Learning

## Objectives
- Train neural networks with PyTorch on data managed by Spark
- Use Spark to distribute data processing and model inference
- Track experiments with MLflow
- Implement transfer learning with pre-trained models

## Key Concepts
- **PyTorch**: Deep learning framework
- **Spark**: Distributed data processing and inference
- **MLflow**: Experiment and model management
- **Petastorm**: Library for feeding distributed datasets (e.g. Parquet) to deep learning frameworks

## Use Cases
- Large-scale image classification
- Distributed NLP processing
- Time series forecasting
- Recommender systems

## 1. Setup and Imports

In [None]:
# Imports
import os
import sys
import warnings
warnings.filterwarnings('ignore')

# Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, when
from pyspark.sql.types import ArrayType, FloatType, StructType, StructField, IntegerType

# PyTorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset
import torch.nn.functional as F

# MLflow
import mlflow
import mlflow.pytorch
from mlflow.tracking import MlflowClient

# Utilities
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_classification
import matplotlib.pyplot as plt
from tqdm import tqdm

print(f"✓ PyTorch version: {torch.__version__}")
print(f"✓ CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"✓ CUDA version: {torch.version.cuda}")
    print(f"✓ Device: {torch.cuda.get_device_name(0)}")

## 2. Spark and MLflow Configuration

In [None]:
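# Create the SparkSession (in local mode the executors run inside the driver JVM, so spark.executor.memory has no effect)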
spark = SparkSession.builder \
    .appName("PyTorch-Spark-MLflow") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

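# Configure MLflow (assumes a tracking server is running at this address, e.g. one started with `mlflow server`, which listens on port 5000 by default)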
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("spark-pytorch-deep-learning")

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(f"✓ Spark version: {spark.version}")
print(f"✓ MLflow tracking URI: {mlflow.get_tracking_uri()}")
print(f"✓ PyTorch device: {device}")

## 3. Generate Data with Spark

In [None]:
def generate_data():
    """Generate a synthetic dataset for binary classification"""
    print("Generating synthetic data...")
    
    # Generate classification data
    X, y = make_classification(
        n_samples=50000,
        n_features=20,
        n_informative=15,
        n_redundant=5,
        n_classes=2,
        random_state=42,
        flip_y=0.1  # Randomly reassign the class of 10% of the samples (label noise)
    )
    
    # Create a pandas DataFrame
    feature_cols = [f'feature_{i}' for i in range(X.shape[1])]
    df_pandas = pd.DataFrame(X, columns=feature_cols)
    df_pandas['label'] = y
    
    # Convert to a Spark DataFrame
    df_spark = spark.createDataFrame(df_pandas)
    
    print(f"✓ Dataset created: {df_spark.count()} records")
    print(f"✓ Features: {len(feature_cols)}")
    
    # Class distribution
    class_dist = df_spark.groupBy('label').count().toPandas()
    print("\n📊 Class distribution:")
    print(class_dist)
    
    return df_spark, feature_cols

# Generate data
df_spark, feature_cols = generate_data()
df_spark.show(5)
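`flip_y=0.1` does not flip 10% of the labels outright: per the scikit-learn documentation it is the fraction of samples whose class is *assigned randomly*, so in the binary case about half of those samples keep their original label. Roughly 5% of the labels therefore disagree with the features, which puts an upper bound of about 95% on the test accuracy one can expect (lower if the clean classes themselves overlap). The numpy sketch below reproduces that mechanism in simplified form; `simulate_label_flips` is a hypothetical helper, not scikit-learn's code.

```python
import numpy as np

def simulate_label_flips(y, flip_y, n_classes, seed=0):
    """Hypothetical helper mimicking make_classification's flip_y step:
    a random fraction flip_y of the samples get a class drawn uniformly
    at random (which may coincide with the original class)."""
    rng = np.random.default_rng(seed)
    y_noisy = y.copy()
    flip_mask = rng.uniform(size=len(y)) < flip_y
    y_noisy[flip_mask] = rng.integers(0, n_classes, size=flip_mask.sum())
    return y_noisy

# Balanced binary labels, same size as the notebook's dataset
y_clean = np.tile([0, 1], 25_000)
y_noisy = simulate_label_flips(y_clean, flip_y=0.1, n_classes=2)

changed = np.mean(y_clean != y_noisy)
expected = 0.1 * (1 - 1 / 2)  # flip_y * (1 - 1/n_classes)
print(f"Labels actually changed: {changed:.3%} (expected ~ {expected:.1%})")
```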

## 4. Define the Neural Network Architecture

In [None]:
class DeepClassifier(nn.Module):
    """Deep neural network for binary classification"""
    
    def __init__(self, input_size, hidden_sizes=(128, 64, 32), dropout=0.3):
        super().__init__()
        
        # Layers
        layers = []
        
        # Input layer
        layers.append(nn.Linear(input_size, hidden_sizes[0]))
        layers.append(nn.BatchNorm1d(hidden_sizes[0]))
        layers.append(nn.ReLU())
        layers.append(nn.Dropout(dropout))
        
        # Hidden layers
        for i in range(len(hidden_sizes) - 1):
            layers.append(nn.Linear(hidden_sizes[i], hidden_sizes[i+1]))
            layers.append(nn.BatchNorm1d(hidden_sizes[i+1]))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout))
        
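        # Output layer: one sigmoid probability per sample (paired with nn.BCELoss;
        # nn.BCEWithLogitsLoss on raw logits is the more numerically stable alternative)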
        layers.append(nn.Linear(hidden_sizes[-1], 1))
        layers.append(nn.Sigmoid())
        
        self.network = nn.Sequential(*layers)
        
    def forward(self, x):
        return self.network(x)
    
    def num_parameters(self):
        return sum(p.numel() for p in self.parameters() if p.requires_grad)

# Create the model
input_size = len(feature_cols)
model = DeepClassifier(input_size=input_size, hidden_sizes=[128, 64, 32], dropout=0.3)
model = model.to(device)

print("🧠 Model Architecture:")
print(model)
print(f"\n✓ Total parameters: {model.num_parameters():,}")
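`num_parameters()` counts only tensors with `requires_grad`, so BatchNorm's running mean and variance (buffers, not parameters) are excluded. The total can be checked by hand: each `Linear` layer contributes in × out weights plus out biases, each `BatchNorm1d` contributes one scale and one shift per feature, and ReLU, Dropout and Sigmoid add nothing. A pure-Python sketch for the notebook's 20 inputs and `[128, 64, 32]` hidden sizes (`count_deep_classifier_params` is a hypothetical helper):

```python
def count_deep_classifier_params(input_size, hidden_sizes=(128, 64, 32)):
    """Count trainable parameters of the DeepClassifier layout:
    Linear + BatchNorm1d per hidden layer, then Linear(hidden[-1], 1)."""
    total = 0
    prev = input_size
    for h in hidden_sizes:
        total += prev * h + h   # Linear: weight matrix + bias vector
        total += 2 * h          # BatchNorm1d: weight (gamma) + bias (beta)
        prev = h
    total += prev * 1 + 1       # output Linear to a single unit
    return total

n = count_deep_classifier_params(20)
print(f"{n:,}")  # 13,505
```

That is 2,688 + 256 + 8,256 + 128 + 2,080 + 64 + 33 = 13,505, which should match what the cell above reports for `input_size=20`.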

## 5. Prepare Data for PyTorch

In [None]:
def prepare_pytorch_data(df_spark, feature_cols):
    """Convert a Spark DataFrame into PyTorch DataLoaders"""
    print("Preparing data for PyTorch...")
    
    # Collect to pandas on the driver (in production, use Petastorm for large datasets)
    df_pandas = df_spark.toPandas()
    
    # Separate features and labels
    X = df_pandas[feature_cols].values
    y = df_pandas['label'].values.reshape(-1, 1)
    
    # Split train/val/test (64% / 16% / 20%)
    X_temp, X_test, y_temp, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    X_train, X_val, y_train, y_val = train_test_split(X_temp, y_temp, test_size=0.2, random_state=42)
    
    # Normalize features: fit the scaler on the training split only so that
    # validation/test statistics do not leak into training
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_val = scaler.transform(X_val)
    X_test = scaler.transform(X_test)
    
    # Convert to tensors
    X_train_t = torch.FloatTensor(X_train).to(device)
    y_train_t = torch.FloatTensor(y_train).to(device)
    
    X_val_t = torch.FloatTensor(X_val).to(device)
    y_val_t = torch.FloatTensor(y_val).to(device)
    
    X_test_t = torch.FloatTensor(X_test).to(device)
    y_test_t = torch.FloatTensor(y_test).to(device)
    
    # Create datasets
    train_dataset = TensorDataset(X_train_t, y_train_t)
    val_dataset = TensorDataset(X_val_t, y_val_t)
    test_dataset = TensorDataset(X_test_t, y_test_t)
    
    # Create DataLoaders
    batch_size = 256
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)
    
    print(f"✓ Train: {len(train_dataset)} samples")
    print(f"✓ Val: {len(val_dataset)} samples")
    print(f"✓ Test: {len(test_dataset)} samples")
    print(f"✓ Batch size: {batch_size}")
    
    return train_loader, val_loader, test_loader, scaler

# Prepare data
train_loader, val_loader, test_loader, scaler = prepare_pytorch_data(df_spark, feature_cols)
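`StandardScaler` standardizes each feature as z = (x - μ) / σ. Estimating μ and σ on the training split only, and reusing those same statistics for validation, test and later inference (as the Spark UDF in section 10 does with `scaler.transform`), keeps held-out data from influencing training. A numpy sketch of that fit/transform split on synthetic data (not the notebook's dataset):

```python
import numpy as np

rng = np.random.default_rng(42)
# Synthetic stand-ins for the training and test feature matrices
X_train = rng.normal(loc=5.0, scale=2.0, size=(800, 3))
X_test = rng.normal(loc=5.0, scale=2.0, size=(200, 3))

# "fit": per-feature statistics computed from the training split only
mu = X_train.mean(axis=0)
sigma = X_train.std(axis=0)  # population std (ddof=0), as StandardScaler uses

# "transform": apply the same statistics to every split
X_train_scaled = (X_train - mu) / sigma
X_test_scaled = (X_test - mu) / sigma

print(X_train_scaled.mean(axis=0).round(6), X_train_scaled.std(axis=0).round(6))
print(X_test_scaled.mean(axis=0).round(3))  # close to 0, but not exactly
```

The training split ends up with mean 0 and standard deviation 1 exactly; the test split only approximately, which is the expected behaviour when the model later sees genuinely new data.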

## 6. Training Functions

In [None]:
def train_epoch(model, train_loader, criterion, optimizer, device):
    """Train for one epoch"""
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    
    for batch_idx, (data, target) in enumerate(train_loader):
        optimizer.zero_grad()
        
        # Forward pass
        output = model(data)
        loss = criterion(output, target)
        
        # Backward pass
        loss.backward()
        optimizer.step()
        
        # Metrics
        total_loss += loss.item()
        predicted = (output > 0.5).float()
        correct += (predicted == target).sum().item()
        total += target.size(0)
    
    avg_loss = total_loss / len(train_loader)
    accuracy = 100. * correct / total
    
    return avg_loss, accuracy

def validate(model, val_loader, criterion, device):
    """Validate the model"""
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for data, target in val_loader:
            output = model(data)
            loss = criterion(output, target)
            
            total_loss += loss.item()
            predicted = (output > 0.5).float()
            correct += (predicted == target).sum().item()
            total += target.size(0)
    
    avg_loss = total_loss / len(val_loader)
    accuracy = 100. * correct / total
    
    return avg_loss, accuracy

print("✓ Training functions defined")
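`train_epoch` and `validate` average the per-batch mean losses (`total_loss / len(loader)`), which gives a short final batch the same weight as a full one. The training split divides evenly (32,000 / 256 = 125 batches), but the 8,000-sample validation split ends with a 64-sample batch. Accumulating `loss.item() * target.size(0)` and dividing by `total` gives the exact per-sample mean instead. A numpy sketch of the two averages, using synthetic per-sample losses (illustrative values, not the notebook's):

```python
import numpy as np

rng = np.random.default_rng(0)
# Synthetic per-sample losses for an 8,000-sample validation set
per_sample_loss = rng.exponential(scale=0.4, size=8_000)
batch_size = 256
batches = [per_sample_loss[i:i + batch_size]
           for i in range(0, len(per_sample_loss), batch_size)]

# What the notebook's loops compute: the mean of the per-batch means
mean_of_batch_means = np.mean([b.mean() for b in batches])

# Sample-weighted alternative: sum(batch_mean * batch_size) / number of samples
weighted = sum(b.mean() * len(b) for b in batches) / sum(len(b) for b in batches)

print(len(batches), len(batches[-1]))  # 32 batches, the last one with 64 samples
print(mean_of_batch_means, weighted, per_sample_loss.mean())
```

The difference is small here, so the notebook's curves remain usable; the weighted form simply removes the dependence on how the last batch happens to be sized.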

## 7. Training with MLflow Tracking

In [None]:
def train_with_mlflow(model, train_loader, val_loader, epochs=20):
    """Train the model with MLflow tracking"""
    
    # Hyperparameters
    learning_rate = 0.001
    weight_decay = 1e-5
    
    # Loss criterion and optimizer
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)
    
    # Start an MLflow run
    with mlflow.start_run(run_name="deep-classifier-pytorch") as run:
        
        # Log parameters
        params = {
            'model_type': 'DeepClassifier',
            'input_size': input_size,
            'hidden_sizes': str([128, 64, 32]),
            'dropout': 0.3,
            'learning_rate': learning_rate,
            'weight_decay': weight_decay,
            'batch_size': train_loader.batch_size,
            'optimizer': 'Adam',
            'epochs': epochs,
            'device': str(device)
        }
        mlflow.log_params(params)
        
        # Per-epoch metric history
        train_losses = []
        val_losses = []
        train_accs = []
        val_accs = []
        
        best_val_loss = float('inf')
        
        print("\n🚀 Starting training...\n")
        
        # Training loop
        for epoch in range(epochs):
            # Train
            train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
            
            # Validate
            val_loss, val_acc = validate(model, val_loader, criterion, device)
            
            # Scheduler
            scheduler.step(val_loss)
            
            # Store metrics
            train_losses.append(train_loss)
            val_losses.append(val_loss)
            train_accs.append(train_acc)
            val_accs.append(val_acc)
            
            # Log a MLflow
            mlflow.log_metrics({
                'train_loss': train_loss,
                'train_accuracy': train_acc,
                'val_loss': val_loss,
                'val_accuracy': val_acc,
                'learning_rate': optimizer.param_groups[0]['lr']
            }, step=epoch)
            
            # Print progress
            print(f"Epoch {epoch+1}/{epochs}:")
            print(f"  Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
            print(f"  Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")
            print(f"  LR: {optimizer.param_groups[0]['lr']:.6f}")
            print()
            
            # Save the best model (lowest validation loss so far)
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                torch.save(model.state_dict(), 'best_model.pth')
                print(f"  ✓ Best model saved (val_loss: {val_loss:.4f})\n")
        
        # Log the final (last-epoch) model
        mlflow.pytorch.log_model(model, "model")
        
        # Log the best checkpoint as an artifact
        mlflow.log_artifact('best_model.pth')
        
        # Final metrics
        mlflow.log_metrics({
            'final_train_loss': train_losses[-1],
            'final_val_loss': val_losses[-1],
            'best_val_loss': best_val_loss,
            'final_train_acc': train_accs[-1],
            'final_val_acc': val_accs[-1]
        })
        
        # Tags
        mlflow.set_tags({
            'framework': 'pytorch',
            'task': 'binary_classification',
            'distributed': 'spark'
        })
        
        print(f"\n✓ Training complete")
        print(f"✓ Run ID: {run.info.run_id}")
        print(f"✓ Best val_loss: {best_val_loss:.4f}")
        
        return train_losses, val_losses, train_accs, val_accs, run.info.run_id

# Train the model
train_losses, val_losses, train_accs, val_accs, run_id = train_with_mlflow(
    model, train_loader, val_loader, epochs=20
)
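With `patience=3`, `ReduceLROnPlateau` halves the learning rate only after the fourth consecutive epoch without a relative improvement of at least `threshold` (default `1e-4`) in validation loss; that step change is what the `learning_rate` metric logged to MLflow will show. A simplified pure-Python sketch of the rule (`plateau_lrs` is a hypothetical helper; it omits the real scheduler's cooldown, `min_lr` and `eps` handling):

```python
def plateau_lrs(val_losses, lr=1e-3, factor=0.5, patience=3, threshold=1e-4):
    """Simplified ReduceLROnPlateau(mode='min', threshold_mode='rel',
    cooldown=0, min_lr=0), for illustration only."""
    best = float("inf")
    num_bad_epochs = 0
    history = []
    for loss in val_losses:
        if loss < best * (1 - threshold):  # relative improvement in 'min' mode
            best = loss
            num_bad_epochs = 0
        else:
            num_bad_epochs += 1
        if num_bad_epochs > patience:      # strictly more than `patience`
            lr *= factor
            num_bad_epochs = 0
        history.append(lr)
    return history

lrs = plateau_lrs([1.0, 0.9, 0.9, 0.9, 0.9, 0.9])
print(lrs)
```

The loss stops improving at the third epoch, but the learning rate only drops at the sixth, after four non-improving epochs.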

## 8. Visualizing Results

In [None]:
# Create plots
fig, axes = plt.subplots(1, 2, figsize=(15, 5))

# Loss
axes[0].plot(train_losses, label='Train Loss', linewidth=2)
axes[0].plot(val_losses, label='Val Loss', linewidth=2)
axes[0].set_xlabel('Epoch', fontsize=12)
axes[0].set_ylabel('Loss', fontsize=12)
axes[0].set_title('Training and Validation Loss', fontsize=14, fontweight='bold')
axes[0].legend(fontsize=10)
axes[0].grid(True, alpha=0.3)

# Accuracy
axes[1].plot(train_accs, label='Train Accuracy', linewidth=2)
axes[1].plot(val_accs, label='Val Accuracy', linewidth=2)
axes[1].set_xlabel('Epoch', fontsize=12)
axes[1].set_ylabel('Accuracy (%)', fontsize=12)
axes[1].set_title('Training and Validation Accuracy', fontsize=14, fontweight='bold')
axes[1].legend(fontsize=10)
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('training_curves.png', dpi=150, bbox_inches='tight')
plt.show()

# Log the figure to MLflow (reopens the finished training run)
with mlflow.start_run(run_id=run_id):
    mlflow.log_artifact('training_curves.png')

print("✓ Training curves saved")

## 9. Test Set Evaluation

In [None]:
def evaluate_test_set(model, test_loader, device):
    """Full evaluation on the test set"""
    model.eval()
    
    all_preds = []
    all_targets = []
    all_probs = []
    
    with torch.no_grad():
        for data, target in test_loader:
            output = model(data)
            predicted = (output > 0.5).float()
            
            all_preds.extend(predicted.cpu().numpy())
            all_targets.extend(target.cpu().numpy())
            all_probs.extend(output.cpu().numpy())
    
    # Convert to flat arrays
    all_preds = np.array(all_preds).flatten()
    all_targets = np.array(all_targets).flatten()
    all_probs = np.array(all_probs).flatten()
    
    # Compute metrics
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
    
    metrics = {
        'test_accuracy': accuracy_score(all_targets, all_preds),
        'test_precision': precision_score(all_targets, all_preds),
        'test_recall': recall_score(all_targets, all_preds),
        'test_f1': f1_score(all_targets, all_preds),
        'test_auc_roc': roc_auc_score(all_targets, all_probs)
    }
    
    # Confusion matrix
    cm = confusion_matrix(all_targets, all_preds)
    
    # Log to MLflow (reopens the training run through the global run_id)
    with mlflow.start_run(run_id=run_id):
        mlflow.log_metrics(metrics)
    
    # Print results
    print("\n" + "="*50)
    print("TEST SET RESULTS")
    print("="*50)
    
    for metric, value in metrics.items():
        print(f"{metric}: {value:.4f}")
    
    print("\n📊 Confusion Matrix:")
    print(cm)
    print()
    
    return metrics, cm

# Load the best checkpoint (lowest validation loss)
model.load_state_dict(torch.load('best_model.pth', map_location=device))

# Evaluate
test_metrics, confusion_mat = evaluate_test_set(model, test_loader, device)
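For binary labels {0, 1}, `sklearn.metrics.confusion_matrix` lays out the counts as `[[TN, FP], [FN, TP]]`, so accuracy, precision, recall and F1 can be read straight off the printed matrix. ROC AUC cannot: it needs the predicted probabilities (`all_probs`), not just the thresholded predictions. A pure-Python sketch with hypothetical counts (not the notebook's results; `metrics_from_confusion` is an illustrative helper):

```python
def metrics_from_confusion(cm):
    """Derive binary metrics from a 2x2 matrix laid out like
    sklearn.metrics.confusion_matrix: [[TN, FP], [FN, TP]]."""
    (tn, fp), (fn, tp) = cm
    accuracy = (tp + tn) / (tn + fp + fn + tp)
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1 = 2 * precision * recall / (precision + recall)
    return {"accuracy": accuracy, "precision": precision,
            "recall": recall, "f1": f1}

# Hypothetical counts for a 10,000-sample test set
example_cm = [[4600, 400], [500, 4500]]
print(metrics_from_confusion(example_cm))
```

With these counts accuracy is 9,100 / 10,000 = 0.91, recall is 4,500 / 5,000 = 0.90 and F1 equals 2·TP / (2·TP + FP + FN) = 9,000 / 9,900 ≈ 0.909.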

## 10. Inference with a Spark UDF

Apply the trained model to the rows of a Spark DataFrame with a pandas UDF, so scoring runs in parallel across partitions.

In [None]:
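# Build the prediction function
def create_prediction_udf(model, scaler, feature_cols, device):
    """Create a pandas UDF that scores Spark rows with the PyTorch model"""
    
    @pandas_udf(FloatType())
    def predict_udf(*features):
        # Stack the feature columns (one pd.Series each) into a 2-D array
        X = np.column_stack(features)
        
        # Normalize with the scaler fitted during data preparation
        X_scaled = scaler.transform(X)
        
        # Convert to a tensor
        X_tensor = torch.FloatTensor(X_scaled).to(device)
        
        # Predict (eval mode: BatchNorm uses running statistics, Dropout is off)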
        model.eval()
        with torch.no_grad():
            predictions = model(X_tensor).cpu().numpy().flatten()
        
        return pd.Series(predictions)
    
    return predict_udf

# Create the UDF
predict_udf = create_prediction_udf(model, scaler, feature_cols, device)

# Apply predictions
df_with_predictions = df_spark.withColumn(
    'prediction_prob',
    predict_udf(*feature_cols)
).withColumn(
    'prediction',
    when(col('prediction_prob') > 0.5, 1).otherwise(0)
)

print("\n📊 Predictions with Spark:")
df_with_predictions.select('label', 'prediction', 'prediction_prob').show(10)

# Accuracy over the whole dataset (includes the training rows, so it is optimistic)
accuracy = df_with_predictions.filter(col('label') == col('prediction')).count() / df_with_predictions.count()
print(f"\n✓ Accuracy on the full dataset (train + val + test): {accuracy:.4f}")
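The pandas UDF receives the data in Arrow batches (sized by `spark.sql.execution.arrow.maxRecordsPerBatch`, 10,000 rows by default), so which rows are scored together depends on partitioning. Calling `model.eval()` inside the UDF is what makes predictions independent of that: in eval mode `BatchNorm1d` normalizes with the running statistics learned during training, whereas in train mode it uses the current batch's statistics (and Dropout is active). The numpy sketch below uses a simplified BatchNorm without the learnable scale and shift (not PyTorch's implementation) and hypothetical running statistics of 0 and 1 to show the effect:

```python
import numpy as np

def batch_norm(x, running_mean, running_var, training, eps=1e-5):
    """Simplified BatchNorm1d without the learnable scale/shift."""
    if training:
        mean, var = x.mean(axis=0), x.var(axis=0)  # statistics of this batch
    else:
        mean, var = running_mean, running_var      # statistics from training
    return (x - mean) / np.sqrt(var + eps)

rng = np.random.default_rng(1)
x = rng.normal(size=(1_000, 4))
running_mean, running_var = np.zeros(4), np.ones(4)  # hypothetical values

def predict_in_chunks(x, chunk, training):
    # Mimics Spark handing the UDF the rows in several Arrow batches
    parts = [batch_norm(x[i:i + chunk], running_mean, running_var, training)
             for i in range(0, len(x), chunk)]
    return np.vstack(parts)

whole_eval = batch_norm(x, running_mean, running_var, training=False)
chunked_eval = predict_in_chunks(x, 300, training=False)
whole_train = batch_norm(x, running_mean, running_var, training=True)
chunked_train = predict_in_chunks(x, 300, training=True)

print(np.allclose(whole_eval, chunked_eval))    # eval mode: independent of batching
print(np.allclose(whole_train, chunked_train))  # train mode: depends on batching
```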

## 11. Save the Model for Production

In [None]:
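# Register the model in the MLflow Model Registry
model_name = "pytorch-deep-classifier"

with mlflow.start_run(run_id=run_id):
    # Infer a signature from sample inputs/outputs (the inputs are already standardized,
    # so callers must apply the same scaler before invoking the model)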
    from mlflow.models.signature import infer_signature
    
    sample_input = next(iter(test_loader))[0][:5].cpu().numpy()
    model.eval()
    with torch.no_grad():
        sample_output = model(torch.FloatTensor(sample_input).to(device)).cpu().numpy()
    
    signature = infer_signature(sample_input, sample_output)
    
    # Log the model with its signature and register it
    mlflow.pytorch.log_model(
        model,
        "production_model",
        signature=signature,
        registered_model_name=model_name
    )

print(f"\n✓ Model registered: {model_name}")
print(f"✓ Run ID: {run_id}")
print(f"✓ To load the model: mlflow.pytorch.load_model('runs:/{run_id}/production_model')")

## 12. Summary and Conclusions

In [None]:
print("="*70)
print("RESUMEN: SPARK + PYTORCH + MLFLOW")
print("SUMMARY: SPARK + PYTORCH + MLFLOW")
print("="*70)

print("\n🎯 Achievements:")
print("  ✓ Deep neural network with PyTorch")
print(f"  ✓ Model with {model.num_parameters():,} parameters")
print("  ✓ Training with best-checkpoint selection and learning-rate scheduling")
print("  ✓ Full experiment tracking with MLflow")
print("  ✓ Distributed inference with Spark UDFs")

print("\n📊 Final Metrics:")
for metric, value in test_metrics.items():
    print(f"  • {metric}: {value:.4f}")

print("\n🔗 MLflow:")
print(f"  • Experiment: spark-pytorch-deep-learning")
print(f"  • Run ID: {run_id}")
print(f"  • Model: {model_name}")

print("\n💡 Use Cases:")
print("  • Large-scale image classification")
print("  • NLP processing with transformers")
print("  • Real-time anomaly detection")
print("  • Recommender systems")

print("\n🚀 Next Steps:")
print("  1. Implement more complex architectures (ResNet, Transformers)")
print("  2. Use Petastorm for very large datasets")
print("  3. Implement distributed training with Horovod")
print("  4. Deploy with MLflow Model Serving")
print("  5. A/B test models")

print("\n" + "="*70)

## Cleanup

In [None]:
# Remove temporary files
import os
if os.path.exists('best_model.pth'):
    os.remove('best_model.pth')
if os.path.exists('training_curves.png'):
    os.remove('training_curves.png')

# spark.stop()
print("✓ Cleanup complete")
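An alternative to writing `best_model.pth` and `training_curves.png` into the working directory and deleting them afterwards is to write them into a temporary directory that Python removes automatically. `mlflow.log_artifact` copies the file into the artifact store, so it only needs to be called before the directory is cleaned up. A standard-library sketch of that pattern; the placeholder bytes stand in for the notebook's `torch.save(...)` and `plt.savefig(...)` calls, which are assumptions of this sketch rather than part of it:

```python
import os
import tempfile

# Temporary artifacts live in a directory that is deleted when the block exits
with tempfile.TemporaryDirectory() as tmpdir:
    checkpoint_path = os.path.join(tmpdir, "best_model.pth")
    curves_path = os.path.join(tmpdir, "training_curves.png")
    # In the notebook these would be torch.save(...) and plt.savefig(...);
    # placeholder bytes keep the sketch self-contained
    for path in (checkpoint_path, curves_path):
        with open(path, "wb") as f:
            f.write(b"placeholder")
    # mlflow.log_artifact(checkpoint_path) would go here, before the exit
    existed = os.path.exists(checkpoint_path) and os.path.exists(curves_path)

print(existed, os.path.exists(tmpdir))  # True False
```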

## Additional Resources

### Documentation
- [PyTorch Documentation](https://pytorch.org/docs/stable/index.html)
- [MLflow PyTorch](https://mlflow.org/docs/latest/python_api/mlflow.pytorch.html)
- [Petastorm](https://github.com/uber/petastorm)
- [Horovod](https://github.com/horovod/horovod)

### Exercises
1. Implement a CNN architecture for images
2. Use transfer learning with pre-trained models
3. Implement attention mechanisms
4. Build an NLP model with embeddings
5. Implement hyperparameter tuning with Optuna + MLflow