In [None]:
# 1. Crear la SparkSession
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("TitanicSparkMLlib")
    .getOrCreate()
)

spark

## 2. Cargar los datos del Titanic

Asumimos que el archivo `titanic.csv` está en la **misma carpeta** que este notebook (`M8/S4`).

Usaremos `spark.read.csv` con `header=True` e `inferSchema=True` para que Spark detecte automáticamente los tipos de datos.

In [None]:
# 2.1 Cargar el archivo titanic.csv
from pyspark.sql import functions as F

data_path = "titanic.csv"  # ruta relativa al notebook

df_raw = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data_path)
)

df_raw.count(), df_raw.printSchema()

In [None]:
# 2.2 Mostrar algunas filas de ejemplo
df_raw.show(10, truncate=False)

## 3. Análisis exploratorio básico (EDA)

Realizaremos un EDA sencillo para entender el dataset:

- Distribución de la variable objetivo de clasificación: `Survived`.
- Valores nulos por columna.
- Estadísticos básicos de variables numéricas.

In [None]:
# 3.1 Distribución de la variable objetivo de clasificación (Survived)
df_raw.groupBy("Survived").count().orderBy("Survived").show()

# 3.2 Conteo de filas totales
print("Total de filas:", df_raw.count())

# 3.3 Valores nulos por columna
null_counts = df_raw.select([F.count(F.when(F.col(c).isNull() | F.isnan(c), c)).alias(c) for c in df_raw.columns])
null_counts.show(truncate=False)

# 3.4 Estadísticos descriptivos de algunas columnas numéricas
num_cols = ["Age", "Fare", "SibSp", "Parch"]
df_raw.select(num_cols).describe().show()

## 4. Limpieza y preprocesamiento de datos

Para construir modelos en MLlib necesitamos:

- Seleccionar columnas relevantes.
- Manejar valores nulos (por ejemplo, imputar medias en numéricas).
- Convertir variables categóricas a numéricas mediante `StringIndexer` y `OneHotEncoder`.
- Unir todas las características en un único vector `features` con `VectorAssembler`.

En este ejemplo usaremos como variables de entrada:

- Numéricas: `Pclass`, `Age`, `SibSp`, `Parch`, `Fare`.
- Categóricas: `Sex`, `Embarked`.

Para clasificación, la etiqueta será `Survived`. Para regresión, usaremos `Fare` como variable objetivo.

In [None]:
# 4.1 Seleccionar columnas de interés y manejar nulos de forma simple
selected_cols = ["Survived", "Pclass", "Sex", "Age", "SibSp", "Parch", "Fare", "Embarked"]

df = df_raw.select([c for c in selected_cols])

# Imputamos Age y Fare con la media (simple para el ejemplo)
age_mean = df.select(F.avg(F.col("Age")).alias("mean")).collect()[0][0]
fare_mean = df.select(F.avg(F.col("Fare")).alias("mean")).collect()[0][0]

df = df.fillna({"Age": age_mean, "Fare": fare_mean})

# Para columnas categóricas, reemplazamos nulos con un valor fijo ("Unknown")
df = df.fillna({"Embarked": "Unknown"})

df.show(10, truncate=False)

## 5. Transformadores de características (feature engineering)

A continuación definimos los **transformadores** que convertirán nuestras columnas brutas en un vector numérico que los modelos puedan usar:

1. `StringIndexer` para convertir `Sex` y `Embarked` en índices numéricos.
2. `OneHotEncoder` para transformar esos índices en vectores *one-hot*.
3. `VectorAssembler` para unir columnas numéricas y vectores one-hot en una sola columna `features`.

In [None]:
# 5.1 Definir columnas numéricas y categóricas
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

label_col_cls = "Survived"
numeric_cols = ["Pclass", "Age", "SibSp", "Parch", "Fare"]
categorical_cols = ["Sex", "Embarked"]

# Indexers para variables categóricas
indexers = [
    StringIndexer(
        inputCol=c,
        outputCol=f"{c}_idx",
        handleInvalid="keep"
    )
    for c in categorical_cols
]

# OneHotEncoder para las columnas indexadas
encoder = OneHotEncoder(
    inputCols=[f"{c}_idx" for c in categorical_cols],
    outputCols=[f"{c}_ohe" for c in categorical_cols]
)

# VectorAssembler para unir todo en un único vector de características
assembler_inputs = numeric_cols + [f"{c}_ohe" for c in categorical_cols]
assembler = VectorAssembler(
    inputCols=assembler_inputs,
    outputCol="features"
)

# Guardamos todos los transformadores de características en una lista para reutilizarlos
feature_stages = indexers + [encoder, assembler]
feature_stages

## 6. Modelo de **clasificación**: predecir `Survived`

Usaremos `LogisticRegression` como modelo de clasificación binaria.

Pasos principales:

1. Dividir los datos en entrenamiento y prueba.
2. Construir un `Pipeline` que incluya:
   - `StringIndexer` para la etiqueta `Survived` (columna `label`).
   - Los transformadores de características (`feature_stages`).
   - El modelo `LogisticRegression`.
3. Ajustar el modelo y evaluar su desempeño.

In [None]:
# 6.1 Definir el pipeline de clasificación con LogisticRegression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Indexer para la etiqueta (Survived) -> label
label_indexer_cls = StringIndexer(
    inputCol=label_col_cls,
    outputCol="label",
    handleInvalid="keep"
)

# Modelo de regresión logística
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=50,
    regParam=0.0,
    elasticNetParam=0.0
)

# Pipeline completo de clasificación
pipeline_cls = Pipeline(
    stages=[label_indexer_cls] + feature_stages + [lr]
)

# 6.2 Dividir en train y test
train_df_cls, test_df_cls = df.randomSplit([0.8, 0.2], seed=42)
print("Filas train (cls):", train_df_cls.count())
print("Filas test (cls):", test_df_cls.count())

# 6.3 Entrenar el modelo de clasificación
model_cls = pipeline_cls.fit(train_df_cls)

# 6.4 Obtener predicciones sobre el conjunto de prueba
predictions_cls = model_cls.transform(test_df_cls)
predictions_cls.select("Survived", "probability", "prediction").show(10, truncate=False)

## 7. Evaluación del modelo de clasificación

Usaremos varias métricas para evaluar el modelo de clasificación:

- `BinaryClassificationEvaluator` con métrica AUC (área bajo la curva ROC).
- `MulticlassClassificationEvaluator` para calcular *accuracy* y *F1-score*.

In [None]:
# 7.1 Métricas para clasificación
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Evaluador binario (AUC)
evaluator_auc = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)
auc = evaluator_auc.evaluate(predictions_cls)
print(f"AUC (ROC): {auc:.4f}")

# Evaluador multiclase para accuracy
evaluator_acc = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = evaluator_acc.evaluate(predictions_cls)
print(f"Accuracy: {accuracy:.4f}")

# F1-score
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1"
)
f1 = evaluator_f1.evaluate(predictions_cls)
print(f"F1-score: {f1:.4f}")

## 8. Modelo de **regresión**: predecir `Fare`

Ahora construiremos un modelo de **regresión** para predecir la tarifa `Fare` en función de las mismas variables explicativas.

Para ello:

1. Usaremos el mismo conjunto de transformadores de características (`feature_stages`).
2. Definiremos la etiqueta de regresión a partir de la columna `Fare`.
3. Entrenaremos un `RandomForestRegressor`.
4. Evaluaremos el modelo con métricas como RMSE y R².

In [None]:
# 8.1 Preparar datos para regresión (predicción de Fare)
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Creamos un nuevo DataFrame con la etiqueta de regresión llamada 'label'
df_reg = df.withColumnRenamed("Fare", "label")

# 8.2 Definir el modelo de regresión
rf_reg = RandomForestRegressor(
    featuresCol="features",
    labelCol="label",
    numTrees=100,
    maxDepth=8,
    seed=42
)

# 8.3 Pipeline de regresión: solo transformadores de características + modelo
pipeline_reg = Pipeline(
    stages=feature_stages + [rf_reg]
)

# 8.4 Dividir en train y test para regresión
train_df_reg, test_df_reg = df_reg.randomSplit([0.8, 0.2], seed=42)
print("Filas train (reg):", train_df_reg.count())
print("Filas test (reg):", test_df_reg.count())

# 8.5 Entrenar modelo de regresión
model_reg = pipeline_reg.fit(train_df_reg)

# 8.6 Predicciones en test para regresión
predictions_reg = model_reg.transform(test_df_reg)
predictions_reg.select("label", "prediction").show(10, truncate=False)

# 8.7 Evaluar con RMSE y R2
evaluator_rmse = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="rmse"
)
rmse = evaluator_rmse.evaluate(predictions_reg)
print(f"RMSE: {rmse:.4f}")

evaluator_r2 = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="r2"
)
r2 = evaluator_r2.evaluate(predictions_reg)
print(f"R2: {r2:.4f}")

## 9. Guardar y reutilizar modelos entrenados (opcional)

En proyectos reales es habitual **guardar** los modelos entrenados para reutilizarlos sin tener que reentrenar cada vez.

Spark MLlib permite guardar modelos y pipelines con el método `.write().save(path)` y luego cargarlos con `.load(path)`.

In [None]:
# 9.1 Guardar el modelo de clasificación (pipeline completo)
import shutil
from pathlib import Path

save_path_cls = Path("models/titanic_logreg_pipeline")

# Borramos el directorio si ya existe (para no tener errores al sobrescribir)
if save_path_cls.exists():
    shutil.rmtree(save_path_cls)

model_cls.write().save(str(save_path_cls))
print(f"Modelo de clasificación guardado en: {save_path_cls}")

# 9.2 Ejemplo de cómo se cargaría de nuevo (comentado para no ejecutarlo por defecto)
# from pyspark.ml.pipeline import PipelineModel
# loaded_model_cls = PipelineModel.load(str(save_path_cls))
# loaded_model_cls.transform(test_df_cls).select("Survived", "prediction").show(5)

## 10. Conclusiones y próximos pasos

En este notebook hemos visto un flujo relativamente completo de trabajo con **Spark MLlib** usando el dataset Titanic:

- Carga y EDA básico con DataFrames.
- Limpieza e imputación sencilla de valores nulos.
- Preparación de datos con `StringIndexer`, `OneHotEncoder`, `VectorAssembler`.
- Construcción de *pipelines* reutilizables.
- Entrenamiento y evaluación de un modelo de **clasificación** (LogisticRegression).
- Entrenamiento y evaluación de un modelo de **regresión** (RandomForestRegressor).
- Ejemplo de guardado de modelos entrenados.

Como **ejercicios adicionales** podrías:

- Probar otros algoritmos de clasificación (p.ej. `RandomForestClassifier`, `GBTClassifier`).
- Probar otros algoritmos de regresión (p.ej. `LinearRegression`, `GBTRegressor`).
- Añadir **validación cruzada** (`CrossValidator`) y **búsqueda de hiperparámetros** (`ParamGridBuilder`).
- Incluir más variables (por ejemplo, procesar el campo `Name` o `Cabin`).
- Implementar un flujo de *ML Ops* donde este modelo se despliegue como servicio.

Este cuaderno puede servirte como **plantilla base** para futuros proyectos de Machine Learning con Spark.