In [0]:
%python
       
       
# ============================================================
# 1. Load the training data with Spark
# ============================================================

# The first row supplies the column names; inferSchema asks Spark to type
# each column from the data instead of reading everything as strings.
train_df = spark.read.csv(
    "/Volumes/ngow_lakehouse/ml_sandbox/data/train.csv",
    header=True,
    inferSchema=True,
)

# ============================================================
# 2. Cleanup
# Drop PassengerId, Cabin and Name. The original note attributes this to
# data leakage ("very common with the Spaceship Titanic dataset").
# NOTE(review): these look like identifier / free-text columns; if kept as
# raw strings they would be one-hot encoded as categoricals in step 4.
# ============================================================

# Prediction target (multiclass).
target_col = "HomePlanet"

cols_to_remove = ["PassengerId", "Cabin", "Name"]
train_df = train_df.drop(*cols_to_remove)

# Rows whose target is missing carry no usable label. Left in, the label
# indexer in step 4 (handleInvalid="keep") would map them to an extra
# "unknown" class that the model is then trained and scored on.
train_df = train_df.na.drop(subset=[target_col])

# ============================================================
# 3. Identify numeric and categorical feature columns
# ============================================================

from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType, NumericType, StringType

# Boolean columns match neither the numeric nor the categorical selection
# below, so they would silently be left out of the features.
# NOTE(review): presumably CryoSleep / VIP / Transported when inferSchema
# types their "True"/"False" values as boolean; confirm with printSchema().
# Cast them to double (True -> 1.0, False -> 0.0, null stays null) so they
# follow the numeric path, including its null filling before training.
bool_cols = [f.name for f in train_df.schema.fields if isinstance(f.dataType, BooleanType)]
for bool_col in bool_cols:
    train_df = train_df.withColumn(bool_col, F.col(bool_col).cast("double"))

# Any numeric Spark type (int, bigint, double, float, decimal, ...) is a
# numeric feature; string columns are categorical. The target is excluded.
numeric_cols = [
    f.name
    for f in train_df.schema.fields
    if isinstance(f.dataType, NumericType) and f.name != target_col
]
categorical_cols = [
    f.name
    for f in train_df.schema.fields
    if isinstance(f.dataType, StringType) and f.name != target_col
]

print("Numériques :", numeric_cols)
print("Catégorielles :", categorical_cols)

# ============================================================
# 4. Build the Spark ML pipeline
# ============================================================

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Index the target column into a numeric "label" column.
# handleInvalid="keep" sends null / unseen target values to an extra index
# (numLabels) instead of raising, so such rows form an additional class
# unless they are filtered out before fitting.
label_indexer = StringIndexer(
    inputCol=target_col,
    outputCol="label",
    handleInvalid="keep",
)

# Output column names for the categorical features: <col>_idx, <col>_ohe.
idx_cols = [c + "_idx" for c in categorical_cols]
ohe_cols = [c + "_ohe" for c in categorical_cols]

# Index and one-hot encode all categorical columns with ONE multi-column
# StringIndexer and ONE OneHotEncoder rather than one estimator per column.
# Both lists stay empty when there are no categorical columns, so the
# pipeline still builds in that case.
indexers = []
encoders = []
if categorical_cols:
    indexers = [
        StringIndexer(
            inputCols=categorical_cols,
            outputCols=idx_cols,
            handleInvalid="keep",  # null / unseen categories get their own index
        )
    ]
    encoders = [OneHotEncoder(inputCols=idx_cols, outputCols=ohe_cols)]

# Concatenate numeric features and one-hot vectors into "features".
assembler = VectorAssembler(
    inputCols=numeric_cols + ohe_cols,
    outputCol="features",
)

# Random forest classifier; hyper-parameters are fixed here (not tuned).
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=200,
    maxDepth=15,
    maxBins=64,
    seed=42,
)

# Final pipeline: label indexing -> categorical indexing -> one-hot
# encoding -> feature assembly -> random forest.
pipeline = Pipeline(stages=[label_indexer] + indexers + encoders + [assembler, rf])

# ============================================================
# 5. Split & train
# ============================================================

# Replace missing numeric values with 0 before feature assembly
# (VectorAssembler's default handleInvalid="error" rejects nulls).
train_df = train_df.na.fill(0, subset=numeric_cols)

# Drop the model left over from a previous run of this cell so its
# reference is released before refitting. NameError (first run, no model
# yet) is the only expected failure; anything else should surface.
try:
    del model
except NameError:
    pass

# 80/20 train/test split with a fixed seed.
train_data, test_data = train_df.randomSplit([0.8, 0.2], seed=42)

model = pipeline.fit(train_data)

# ============================================================
# 6. Evaluate the Spark ML model
# ============================================================

# Accuracy of the fitted pipeline, comparing the indexed "label" column
# with the model's "prediction" column on the held-out split.
evaluator = (
    MulticlassClassificationEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("accuracy")
)

predictions = model.transform(test_data)
accuracy = evaluator.evaluate(predictions)

print("Nouvelle accuracy Spark ML :", accuracy)