# Module 7: Apache Spark + MLflow for Distributed ML

## Theory: Distributed ML with Spark

### Why Spark for ML?

**Apache Spark** is a distributed processing framework that lets you:
- Handle datasets too large for the memory of a single machine
- Process data in parallel across a cluster
- Scale horizontally by adding nodes
- Integrate with the Hadoop ecosystem (HDFS, YARN, Hive)

### Spark MLlib

Spark's ML library provides:
- **Distributed algorithms**: linear regression, Random Forest, K-Means, etc.
- **Pipelines**: reproducible ML workflows
- **Feature engineering**: distributed transformers
- **Evaluation**: metrics for distributed models

### Spark Architecture

```
┌─────────────────────────────────────────┐
│             Driver Program              │
│   ┌─────────────────────────────────┐   │
│   │          SparkContext           │   │
│   └────────────────┬────────────────┘   │
└────────────────────┼────────────────────┘
                     │
            ┌────────┴────────┐
            │ Cluster Manager │
            └────────┬────────┘
                     │
       ┌─────────────┼─────────────┐
       │             │             │
  ┌────▼─────┐  ┌────▼─────┐  ┌────▼─────┐
  │  Worker  │  │  Worker  │  │  Worker  │
  │ Executor │  │ Executor │  │ Executor │
  └──────────┘  └──────────┘  └──────────┘
```
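
The diagram corresponds to a simple pattern. The driver splits the data into partitions. The cluster manager allocates executors on the workers, and the driver sends them tasks. Each executor runs the same task on its own partition, and the driver merges the partial results. The sketch below imitates that flow on a single machine. It is an analogy in plain Python, not Spark's implementation: threads stand in for executors, whereas real executors are separate JVM processes, usually on other machines. `partition` and `run_job` are hypothetical helper names.

```python
from concurrent.futures import ThreadPoolExecutor

def partition(data, num_partitions):
    """Split data into contiguous, nearly equal chunks (one per partition)."""
    size, rem = divmod(len(data), num_partitions)
    parts, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < rem else 0)
        parts.append(list(data[start:end]))
        start = end
    return parts

def run_job(data, num_partitions, task, combine):
    """Driver side: partition the data, run `task` per partition, merge the results."""
    parts = partition(data, num_partitions)
    # Threads stand in for executors: each runs the same task on its own partition
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        partials = list(pool.map(task, parts))
    # Back on the driver: combine the partial results into one value
    result = partials[0]
    for partial in partials[1:]:
        result = combine(result, partial)
    return result

total = run_job(list(range(1, 101)), 4,
                task=lambda part: sum(x * x for x in part),
                combine=lambda a, b: a + b)
print(total)  # 338350
```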

### Spark + MLflow

**How they fit together**:
1. **Spark MLlib** trains distributed models
2. **MLflow** tracks experiments and logs models (including complete Spark pipelines)
3. **MLflow** can package Spark models for deployment

### Key Spark Concepts

#### 1. RDD (Resilient Distributed Dataset)
```python
sc = spark.sparkContext  # SparkContext of an existing SparkSession
rdd = sc.parallelize([1, 2, 3, 4])
rdd.map(lambda x: x * 2).collect()  # [2, 4, 6, 8]
```

#### 2. DataFrame
```python
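# inferSchema=True parses numeric columns; otherwise every column is read as a string
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df.filter(df.age > 21).show()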
```

#### 3. Pipeline
```python
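from pyspark.ml import Pipeline
# indexer, encoder, vectorizer and model are previously defined stages
# (Transformers or Estimators); the Pipeline runs them in this order
pipeline = Pipeline(stages=[indexer, encoder, vectorizer, model])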
```

## Objectives
- Configure Spark with MLflow
- Distributed ML with MLlib
- Spark pipelines
- Tracking Spark experiments
- Logging Spark models to MLflow

```python
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import mlflow
import mlflow.spark
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

# Memory settings only apply if this call launches the Spark JVM; they are ignored
# when a session is already running. In local mode the executors live inside the
# driver JVM, so spark.driver.memory is the setting that matters.
spark = SparkSession.builder \
    .appName("MLflow-Spark-Workshop") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

print(f"Spark version: {spark.version}")
print(f"MLflow version: {mlflow.__version__}")
```

```python
# Assumes an MLflow tracking server is running locally, e.g. `mlflow server --port 5000`
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("spark-mllib-distributed")
```

## 1. Create a Distributed Dataset

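```python
from sklearn.datasets import make_classification

# Synthetic binary classification data, generated on the driver with scikit-learn
X, y = make_classification(
    n_samples=100000,
    n_features=20,
    n_informative=15,
    n_redundant=5,
    random_state=42
)

pandas_df = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(20)])
pandas_df['label'] = y

# Arrow speeds up the pandas -> Spark conversion (Spark 3.x key; falls back if pyarrow is missing)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark_df = spark.createDataFrame(pandas_df)

print(f"Total rows: {spark_df.count():,}")
print(f"Partitions: {spark_df.rdd.getNumPartitions()}")
spark_df.printSchema()
spark_df.show(5)
```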

## 2. ML Pipeline with Spark

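```python
train_df, test_df = spark_df.randomSplit([0.8, 0.2], seed=42)

# Cache both splits: they are reused by several counts, fits and CV folds below
train_df.cache()
test_df.cache()

print(f"Train: {train_df.count():,} rows")
print(f"Test: {test_df.count():,} rows")
```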
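
`randomSplit` normalizes the weights and assigns each row to a split at random. The 80/20 proportions therefore hold only approximately, and the exact row counts depend on the seed and on how the data is partitioned. The pure-Python sketch below illustrates the idea with one uniform draw per row, using a hypothetical `random_split` helper. Spark's actual implementation samples within each partition, so its counts will differ from these.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds of each split in [0, 1), e.g. [0.8, 1.0]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding gap
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(100_000), [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to 80,000 / 20,000, but not exact
```

Because the weights are normalized, `[4, 1]` and `[0.8, 0.2]` describe the same split.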

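```python
feature_cols = [f'feature_{i}' for i in range(20)]

# Combine the 20 numeric columns into a single vector column
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_raw"
)

# Scale to unit standard deviation; withMean=False skips centering
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=False
)

lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=100
)

# Stages run in order: assemble -> scale -> fit the classifier
pipeline = Pipeline(stages=[assembler, scaler, lr])
```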
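
With `withStd=True, withMean=False`, `StandardScaler` divides each feature by its standard deviation (learned from the training data) and does not subtract the mean. Zeros therefore stay zeros and sparse vectors stay sparse, whereas centering would produce dense output. The minimal pure-Python sketch below shows that per-feature computation. It assumes the corrected sample standard deviation (n - 1 denominator), which Spark's documentation describes, and maps zero-variance features to 0.0. `scale_without_centering` is a hypothetical name.

```python
import statistics

def scale_without_centering(values):
    """Divide by the sample std (n - 1 denominator) without subtracting the mean."""
    std = statistics.stdev(values)
    # Assumption mirroring Spark: a zero-variance feature is mapped to 0.0
    return [x / std if std != 0 else 0.0 for x in values]

print(scale_without_centering([0.0, 2.0, 4.0]))  # [0.0, 1.0, 2.0]
```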

## 3. Training with MLflow Tracking

```python
with mlflow.start_run(run_name="spark_logistic_regression") as run:

    # Parameters that describe this run
    mlflow.log_param("model_type", "LogisticRegression")
    mlflow.log_param("max_iter", 100)
    mlflow.log_param("train_samples", train_df.count())
    mlflow.log_param("test_samples", test_df.count())
    mlflow.log_param("num_features", 20)
    mlflow.log_param("spark_version", spark.version)

    # fit() runs distributed Spark jobs and returns a fitted PipelineModel
    model = pipeline.fit(train_df)

    predictions = model.transform(test_df)

    # Accuracy is computed from predicted labels, AUC from the raw scores
    evaluator_acc = MulticlassClassificationEvaluator(
        labelCol="label",
        predictionCol="prediction",
        metricName="accuracy"
    )

    evaluator_auc = BinaryClassificationEvaluator(
        labelCol="label",
        rawPredictionCol="rawPrediction",
        metricName="areaUnderROC"
    )

    accuracy = evaluator_acc.evaluate(predictions)
    auc = evaluator_auc.evaluate(predictions)

    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("auc", auc)

    # Log the whole fitted pipeline (assembler + scaler + classifier) as one model
    mlflow.spark.log_model(model, "spark_model")

    mlflow.set_tag("framework", "Spark MLlib")
    mlflow.set_tag("distributed", "True")

    print(f"Accuracy: {accuracy:.4f}")
    print(f"AUC: {auc:.4f}")
    print(f"Run ID: {run.info.run_id}")
```
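
`BinaryClassificationEvaluator` with `metricName="areaUnderROC"` measures how well the model ranks the test rows. For binary models it uses the positive-class component of `rawPrediction` as the score. The area under the ROC curve equals the probability that a randomly chosen positive scores higher than a randomly chosen negative, with ties counting as one half. The quadratic pure-Python sketch below computes the metric this way on a tiny hypothetical example. Spark instead integrates the ROC curve and may down-sample thresholds on large data (see its `numBins` parameter), so its results can differ slightly.

```python
def auc_by_ranking(labels, scores):
    """Area under the ROC curve as P(score_pos > score_neg), counting ties as 1/2."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

print(auc_by_ranking([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
```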

## 4. Comparing Distributed Models

```python
models_config = [
    {
        "name": "RandomForest",
        "model": RandomForestClassifier(
            featuresCol="features",
            labelCol="label",
            numTrees=50,
            maxDepth=10
        ),
        "params": {"numTrees": 50, "maxDepth": 10}
    },
    {
        "name": "GradientBoostedTrees",
        "model": GBTClassifier(
            featuresCol="features",
            labelCol="label",
            maxIter=50,
            maxDepth=5
        ),
        "params": {"maxIter": 50, "maxDepth": 5}
    }
]

with mlflow.start_run(run_name="spark_model_comparison") as parent_run:

    mlflow.set_tag("experiment_type", "model_comparison")

    results = []

    for config in models_config:
        # Each model gets its own child run under the parent comparison run
        with mlflow.start_run(run_name=config["name"], nested=True):

            mlflow.log_param("model_type", config["name"])
            for param_name, param_value in config["params"].items():
                mlflow.log_param(param_name, param_value)

            # Reuse the same preprocessing stages; only the classifier changes
            pipeline_stages = [assembler, scaler, config["model"]]
            pipeline = Pipeline(stages=pipeline_stages)

            model = pipeline.fit(train_df)

            predictions = model.transform(test_df)

            accuracy = evaluator_acc.evaluate(predictions)
            auc = evaluator_auc.evaluate(predictions)

            mlflow.log_metric("accuracy", accuracy)
            mlflow.log_metric("auc", auc)

            # Same artifact path as the single-model run, so section 6 can load
            # "runs:/<run_id>/spark_model" whichever run has the best accuracy
            mlflow.spark.log_model(model, "spark_model")

            results.append({
                "model": config["name"],
                "accuracy": accuracy,
                "auc": auc
            })

            print(f"{config['name']}: Accuracy={accuracy:.4f}, AUC={auc:.4f}")

    results_df = pd.DataFrame(results)
    print("\nResults:")
    print(results_df)
```

## 5. Distributed Hyperparameter Tuning with CrossValidator

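```python
with mlflow.start_run(run_name="spark_crossvalidation_tuning"):

    rf = RandomForestClassifier(
        featuresCol="features",
        labelCol="label"
    )

    pipeline = Pipeline(stages=[assembler, scaler, rf])

    # 2 values x 2 values = 4 parameter combinations
    paramGrid = ParamGridBuilder() \
        .addGrid(rf.numTrees, [20, 50]) \
        .addGrid(rf.maxDepth, [5, 10]) \
        .build()

    # parallelism = number of models fit concurrently (default 1, i.e. sequential)
    crossval = CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=paramGrid,
        evaluator=evaluator_acc,
        numFolds=3,
        parallelism=2,
        seed=42
    )

    mlflow.log_param("cv_folds", 3)
    mlflow.log_param("param_grid_size", len(paramGrid))
    mlflow.log_param("cv_parallelism", 2)

    # 4 param maps x 3 folds, plus a final refit of the best map on all of train_df
    cvModel = crossval.fit(train_df)

    best_model = cvModel.bestModel

    # avgMetrics[i] is the mean CV accuracy of paramGrid[i] (higher is better)
    best_idx = int(np.argmax(cvModel.avgMetrics))
    mlflow.log_params({param.name: value for param, value in paramGrid[best_idx].items()})

    predictions = best_model.transform(test_df)

    accuracy = evaluator_acc.evaluate(predictions)
    auc = evaluator_auc.evaluate(predictions)

    mlflow.log_metric("best_accuracy", accuracy)
    mlflow.log_metric("best_auc", auc)

    mlflow.spark.log_model(best_model, "best_spark_model")

    print(f"Best model Accuracy: {accuracy:.4f}")
    print(f"Best model AUC: {auc:.4f}")
```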
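
For each parameter map and each fold, `CrossValidator` fits one model on the other k - 1 folds. It then averages the held-out metric for each parameter map (exposed as `avgMetrics`) and refits the best map on the whole training set. With a 2 x 2 grid and 3 folds, that comes to 4 x 3 + 1 = 13 pipeline fits. The pure-Python sketch below mirrors that loop. `toy_fit` and `toy_evaluate` are hypothetical stand-ins for training a Random Forest and computing accuracy. Folds are assigned round-robin for simplicity, whereas Spark assigns them randomly using `seed`.

```python
from itertools import product
from statistics import mean

def build_grid(**values):
    """Cartesian product of parameter values, like ParamGridBuilder().addGrid(...).build()."""
    keys = list(values)
    return [dict(zip(keys, combo)) for combo in product(*(values[k] for k in keys))]

def cross_validate(fit, evaluate, data, grid, num_folds):
    # Simplification: round-robin fold assignment (Spark assigns folds randomly)
    folds = [list(range(i, len(data), num_folds)) for i in range(num_folds)]
    avg_metrics, num_fits = [], 0
    for params in grid:
        scores = []
        for held_out in folds:
            held = set(held_out)
            train = [row for i, row in enumerate(data) if i not in held]
            valid = [data[i] for i in held_out]
            candidate = fit(train, params)
            num_fits += 1
            scores.append(evaluate(candidate, valid))
        avg_metrics.append(mean(scores))  # analogous to CrossValidatorModel.avgMetrics
    best_idx = max(range(len(grid)), key=avg_metrics.__getitem__)  # larger is better
    best_model = fit(data, grid[best_idx])  # refit the best params on all the data
    num_fits += 1
    return best_model, grid[best_idx], avg_metrics, num_fits

# Hypothetical stand-ins for "train a RandomForest" and "compute accuracy"
def toy_fit(train, params):
    return dict(params, n_train=len(train))

def toy_evaluate(model, valid):
    return 1 - 1 / (model["numTrees"] * model["maxDepth"])

grid = build_grid(numTrees=[20, 50], maxDepth=[5, 10])
best_model, best_params, avg_metrics, num_fits = cross_validate(
    toy_fit, toy_evaluate, list(range(30)), grid, num_folds=3)
print(best_params, num_fits)  # {'numTrees': 50, 'maxDepth': 10} 13
```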

## 6. Load and Use a Spark Model from MLflow

```python
experiment = mlflow.get_experiment_by_name("spark-mllib-distributed")
runs = mlflow.search_runs(experiment_ids=[experiment.experiment_id])

# Runs that did not log "accuracy" have NaN in that column and are skipped by idxmax
best_run = runs.loc[runs['metrics.accuracy'].idxmax()]
best_run_id = best_run['run_id']

print(f"Best run: {best_run['tags.mlflow.runName']}")
print(f"Accuracy: {best_run['metrics.accuracy']:.4f}")

# Load the logged PipelineModel back as a native Spark model
model_uri = f"runs:/{best_run_id}/spark_model"
loaded_model = mlflow.spark.load_model(model_uri)

sample_predictions = loaded_model.transform(test_df.limit(10))
sample_predictions.select("label", "prediction", "probability").show()
```

```python
spark.stop()
```

## Module 7 Summary

### Key Concepts:

1. **Spark for Distributed ML**
   - Big data processing
   - ML on a cluster
   - Horizontal scalability

2. **Spark Pipelines**
   - VectorAssembler: combines feature columns into a single vector
   - Transformers (e.g. VectorAssembler, fitted models): map one DataFrame to another with `transform()`
   - Estimators (e.g. StandardScaler, LogisticRegression): learn from data with `fit()` and return a Transformer

3. **MLflow + Spark**
   - `mlflow.spark.log_model()`: save Spark pipelines
   - `mlflow.spark.load_model()`: load models
   - Tracking of distributed experiments

4. **CrossValidator**
   - Distributed hyperparameter tuning
   - Parallel grid search (controlled by the `parallelism` parameter)
   - K-fold CV on the cluster

### Advantages of Spark + MLflow:
- Train on massive datasets
- Reproducibility in distributed environments
- Deployment of Spark models
- Integration with the Hadoop ecosystem

## End of the Workshop

You have completed the MLOps with MLflow workshop!

### Skills acquired:
✓ Experiment tracking with MLflow
✓ ML with scikit-learn, TensorFlow, PyTorch
✓ Deep Learning (CNN, RNN/LSTM)
✓ Reinforcement Learning
✓ Hyperparameter tuning
✓ Model Registry
✓ Orchestration with Airflow
✓ Distributed ML with Spark
✓ Model deployment

### Next steps:
1. Practice with your own datasets
2. Implement CI/CD for models
3. Explore MLflow in production
4. Contribute to open-source projects