In [80]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Imputer, VectorAssembler, StandardScaler
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import functions as F

spark = (SparkSession.builder 
    .appName("titanic_reto") 
    .config("spark.sql.ansi.enabled", "false")
    .getOrCreate()
)

## Pipeline 

1. Obtener las filas categóricas y numéricas
2. Empleamos indexer y one hot encoding
3. Estandarizamos los datos
4. Entrenamos el modelo

In [81]:
data_titanic = spark.read.csv("train.csv", header = True, inferSchema = True)

In [82]:
data_titanic.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|  male|NULL|    0|    0|      

In [83]:
data_titanic.printSchema() 

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [84]:
data_titanic.dtypes

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

In [85]:
cat_col = [column for column in ["Sex", "Embarked"] if column in data_titanic.columns]
numeric_columns = [column for column, type in data_titanic.dtypes if type in ["int", "double"] and column not in ["PassengerId", "Survived"]]

In [86]:
indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
    for c in cat_col
]

encoders = [
    OneHotEncoder(inputCols=[f"{c}_idx"], outputCols=[f"{c}_oh"], handleInvalid="keep")
    for c in cat_col
]

imputer = Imputer(inputCols=numeric_columns, outputCols=[f"{c}_imp" for c in numeric_columns])
feature_cols = [f"{c}_imp" for c in numeric_columns] + [f"{c}_oh" for c in cat_col]

In [87]:
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="keep")

In [88]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withMean=False, withStd=True)

svc = LinearSVC(
    featuresCol="scaledFeatures",
    labelCol="label",
    maxIter=100,
    regParam=0.05,
    standardization=False 
)

pipeline = Pipeline(stages=[*indexers, *encoders, imputer, assembler, scaler, svc])

In [89]:
numeric_data = data_titanic[numeric_columns]

In [90]:
df = data_titanic.withColumn("label", F.col("Survived").cast("double"))
train, test = df.randomSplit([0.8, 0.2], seed=42)

model = pipeline.fit(train)
pred  = model.transform(test)

In [91]:
pred.select("PassengerId","label","prediction","rawPrediction").show(10, truncate=False)

+-----------+-----+----------+----------------------------------------+
|PassengerId|label|prediction|rawPrediction                           |
+-----------+-----+----------+----------------------------------------+
|3          |1.0  |1.0       |[-0.9992630453943617,0.9992630453943617]|
|7          |0.0  |0.0       |[0.9918137785619028,-0.9918137785619028]|
|9          |1.0  |1.0       |[-0.9783133639971993,0.9783133639971993]|
|14         |0.0  |0.0       |[1.0841668235403317,-1.0841668235403317]|
|20         |1.0  |1.0       |[-1.004509508527744,1.004509508527744]  |
|24         |1.0  |0.0       |[0.9844591532800553,-0.9844591532800553]|
|30         |0.0  |0.0       |[1.0079574967408202,-1.0079574967408202]|
|36         |0.0  |0.0       |[1.0116708582634706,-1.0116708582634706]|
|46         |0.0  |0.0       |[1.0078956410651276,-1.0078956410651276]|
|47         |0.0  |0.0       |[1.029179001647377,-1.029179001647377]  |
+-----------+-----+----------+----------------------------------

In [92]:
e_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
e_f1  = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

acc = e_acc.evaluate(pred)
f1  = e_f1.evaluate(pred)

print(f"Accuracy: {acc:.4f} | F1: {f1:.4f}")

Accuracy: 0.7793 | F1: 0.7771


In [98]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Imputer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

spark = SparkSession.builder \
    .appName("Titanic-RF-SparkML") \
    .getOrCreate()

TRAIN_PATH = "train.csv"
TEST_PATH  = "test.csv"

train = spark.read.csv(TRAIN_PATH, header=True, inferSchema=True)
test  = spark.read.csv(TEST_PATH,  header=True, inferSchema=True)

train = train.withColumn("Survived", F.col("Survived").cast("double"))
test  = test.withColumn("Survived", F.lit(None).cast("double"))

full_df = train.unionByName(test, allowMissingColumns=True)

title_regex = F.regexp_extract(F.col("Name"), r",\s*([^\.]+)\.", 1)
full_df = full_df.withColumn("TitleRaw", F.trim(title_regex))

rare_titles = ["Lady","Countess","Capt","Col","Don","Dr","Major","Rev","Sir","Jonkheer","Dona"]
full_df = full_df.withColumn(
    "Title",
    F.when(F.col("TitleRaw").isin("Mlle","Ms"), "Miss")
     .when(F.col("TitleRaw") == "Mme", "Mrs")
     .when(F.col("TitleRaw").isin(rare_titles), "Rare")
     .otherwise(F.col("TitleRaw"))
)

full_df = full_df.withColumn("FamilySize", (F.col("SibSp") + F.col("Parch") + F.lit(1)).cast("double"))
full_df = full_df.withColumn("IsAlone", F.when(F.col("FamilySize") == 1, 1.0).otherwise(0.0))

numeric_cols = ["Age", "Fare", "Pclass", "SibSp", "Parch", "FamilySize", "IsAlone"]
full_df = full_df.withColumn("Pclass", F.col("Pclass").cast("double"))

imputer = Imputer(
    strategy="median",
    inputCols=["Age", "Fare"],
    outputCols=["Age_imp", "Fare_imp"]
)

cat_cols = ["Sex", "Embarked", "Title"]

indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
    for c in cat_cols
]

encoder = OneHotEncoder(
    inputCols=[f"{c}_idx" for c in cat_cols],
    outputCols=[f"{c}_oh" for c in cat_cols],
    handleInvalid="keep"
)

feature_cols = [
    "Pclass",
    "Age_imp",
    "SibSp",
    "Parch",
    "Fare_imp",
    "FamilySize",
    "IsAlone",
    "Sex_oh",
    "Embarked_oh",
    "Title_oh"
]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

rf = RandomForestClassifier(
    labelCol="Survived",
    featuresCol="features",
    numTrees=200,
    maxDepth=8,
    minInstancesPerNode=2,
    seed=42
)

stages = [imputer] + indexers + [encoder, assembler, rf]
pipeline = Pipeline(stages=stages)

train_df = full_df.filter(F.col("Survived").isNotNull())
test_df  = full_df.filter(F.col("Survived").isNull())

train_split, valid_split = train_df.randomSplit([0.8, 0.2], seed=42)

model = pipeline.fit(train_split)

valid_pred = model.transform(valid_split)

bin_eval = BinaryClassificationEvaluator(
    labelCol="Survived",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)
auc = bin_eval.evaluate(valid_pred)

acc_eval = MulticlassClassificationEvaluator(
    labelCol="Survived",
    predictionCol="prediction",
    metricName="accuracy"
)
acc = acc_eval.evaluate(valid_pred)

print(f"AUC (valid): {auc:.4f}")
print(f"Accuracy (valid): {acc:.4f}")

test_pred = model.transform(test_df)

submission = (
    test_pred
    .select("PassengerId", F.col("prediction").cast(T.IntegerType()).alias("Survived"))
    .orderBy("PassengerId")
)

output_path = "submission_rf.csv"
(
    submission
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("submission_rf_tmp")
)

# Renombrar el único part file a submission_rf.csv desde Python "normal"
# si estás en un entorno donde puedas usar utilidades del sistema:
import os, shutil
for f in os.listdir("submission_rf_tmp"):
    if f.startswith("part-") and f.endswith(".csv"):
        shutil.move(os.path.join("submission_rf_tmp", f), output_path)
        break
shutil.rmtree("submission_rf_tmp")

print("Archivo de envío guardado como:", output_path)

rf_model = model.stages[-1]  # RandomForestClassifierModel
importances = rf_model.featureImportances

# Para mapear importancias a columnas, necesitamos expandir las OH:
# Calculamos el tamaño de cada OHE para construir nombres expandibles
# (esto es aproximado; si quieres exactitud, inspecciona los attrs del encoder).
# Aquí mostramos un enfoque simple: reporta importancias sumando por grupo.
base_cols = ["Pclass","Age_imp","SibSp","Parch","Fare_imp","FamilySize","IsAlone"]
oh_groups = ["Sex_oh", "Embarked_oh", "Title_oh"]

# Como resumen: imprimimos importancias de las columnas base (primeras posiciones)
# y dejamos una nota de que OHE se divide en varias columnas internas.
print("\nImportancias (resumen):")
for i, col in enumerate(base_cols):
    # orden aproximado según VectorAssembler: base_cols primero
    print(f" - {col}: importancia aproximada disponible en el vector (ver OHE aparte)")
print(" - Variables OHE (Sex, Embarked, Title) se distribuyen en múltiples posiciones del vector.\n")

spark.stop()

25/09/05 19:11:54 WARN DAGScheduler: Broadcasting large task binary with size 1295.8 KiB
25/09/05 19:11:54 WARN DAGScheduler: Broadcasting large task binary with size 1770.0 KiB
25/09/05 19:11:55 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/09/05 19:11:58 WARN DAGScheduler: Broadcasting large task binary with size 1501.5 KiB
25/09/05 19:11:58 WARN DAGScheduler: Broadcasting large task binary with size 1513.3 KiB


AUC (valid): 0.9187
Accuracy (valid): 0.8345


25/09/05 19:11:59 WARN DAGScheduler: Broadcasting large task binary with size 1482.9 KiB
25/09/05 19:11:59 WARN DAGScheduler: Broadcasting large task binary with size 1490.7 KiB
25/09/05 19:11:59 WARN DAGScheduler: Broadcasting large task binary with size 1706.2 KiB


Archivo de envío guardado como: submission_rf.csv

Importancias (resumen):
 - Pclass: importancia aproximada disponible en el vector (ver OHE aparte)
 - Age_imp: importancia aproximada disponible en el vector (ver OHE aparte)
 - SibSp: importancia aproximada disponible en el vector (ver OHE aparte)
 - Parch: importancia aproximada disponible en el vector (ver OHE aparte)
 - Fare_imp: importancia aproximada disponible en el vector (ver OHE aparte)
 - FamilySize: importancia aproximada disponible en el vector (ver OHE aparte)
 - IsAlone: importancia aproximada disponible en el vector (ver OHE aparte)
 - Variables OHE (Sex, Embarked, Title) se distribuyen en múltiples posiciones del vector.

