
!["process"](etl_process_styled.gif)
Le script charge le dataset, nettoie/encode diagnosis en label (1/-1), assemble les features en listes float. 
Puis entraîne un perceptron distribué implémenté en Estimator/Model Spark ML :
à chaque époque il prédit (UDF), sélectionne les erreurs, agrège via RDD map/reduce les mises à jour et applique : 
``` bash
w += lr * sum(y*x) / b += lr * sum(y).

In [5]:
# ========================================================================
# 1. Initialisation Spark
# ========================================================================
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

spark = SparkSession.builder.appName("BCWPerceptron").getOrCreate()

# ========================================================================
# 2. Chargement et nettoyage des données
# ========================================================================
# Charger le fichier CSV avec les en-têtes et inférence des types
df = spark.read.csv("bcw_data.csv", header=True, inferSchema=True)

# Supprimer colonnes inutiles : "id" et la colonne vide "Unnamed: 32"
df = df.drop("id", "_c32")

# Créer la colonne label : "M" = 1, "B" = -1
df = df.withColumn("label", when(col("diagnosis") == "M", 1).otherwise(-1))

# Supprimer la colonne diagnosis (inutile après transformation)
df = df.drop("diagnosis")

# Vérifier si certaines colonnes ne sont pas numériques
bad_cols = [c for c, t in df.dtypes if t not in ("double", "int")]
print("Colonnes non numériques détectées:", bad_cols)

# Supprimer toute colonne non numérique restante
df = df.drop(*bad_cols)

# ========================================================================
# 3. Assemblage des features
# ========================================================================
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

# Sélectionner toutes les colonnes sauf "label" comme features
feature_cols = [c for c in df.columns if c != "label"]

# Transformer en vecteur Spark ML
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_vec")

# UDF pour convertir DenseVector Spark -> liste Python (numpy friendly)
def to_array(v):
    return [float(x) for x in v]

to_array_udf = udf(to_array, ArrayType(DoubleType()))

# Ajouter une colonne "features" au format tableau (list de floats)
df = assembler.transform(df) \
              .withColumn("features", to_array_udf(col("features_vec"))) \
              .select("label", "features")

# ========================================================================
# 4. Implémentation du Perceptron
# ========================================================================
import numpy as np
from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType
from pyspark.ml import Estimator, Model, Pipeline
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.param.shared import Param, Params, TypeConverters

# ---- Modèle final (Transformer) ----
class PerceptronModel(Model, DefaultParamsReadable, DefaultParamsWritable):
    def __init__(self, weights, bias):
        super(PerceptronModel, self).__init__()
        self.weights = weights
        self.bias = bias

    def _predict_udf(self):
        w = self.weights
        b = self.bias
        def predict(features):
            return 1 if np.dot(w, features) + b >= 0 else -1
        return udf(predict, IntegerType())

    def _transform(self, df):
        predict_udf = self._predict_udf()
        return df.withColumn("prediction", predict_udf(col("features")))

# ---- Estimateur (entraîneur) ----
class PerceptronClassifier(Estimator, DefaultParamsReadable, DefaultParamsWritable):
    lr = Param(Params._dummy(), "lr", "learning rate", typeConverter=TypeConverters.toFloat)
    epochs = Param(Params._dummy(), "epochs", "number of epochs", typeConverter=TypeConverters.toInt)

    def __init__(self, lr=0.1, epochs=10):
        super(PerceptronClassifier, self).__init__()
        self._setDefault(lr=0.1, epochs=10)
        self._set(lr=lr, epochs=epochs)

    def _fit(self, df):
        lr = self.getOrDefault(self.lr)
        epochs = self.getOrDefault(self.epochs)

        # Taille du vecteur de features
        dim = len(df.first()["features"])
        w = np.zeros(dim)
        b = 0.0

        for epoch in range(epochs):
            # Calcul des prédictions locales
            def predict_local(features):
                return 1 if np.dot(w, features) + b >= 0 else -1
            predict_udf = udf(predict_local, IntegerType())
            df_pred = df.withColumn("prediction", predict_udf(col("features")))

            # Identifier les erreurs de classification
            df_err = df_pred.withColumn(
                "error", when(col("label") * col("prediction") <= 0, lit(1)).otherwise(lit(0))
            )

            # Calcul des mises à jour via RDD
            updates = df_err.rdd.map(lambda row: (
                row["error"] * row["label"],  
                np.array(row["features"]) if row["error"] == 1 else np.zeros(dim)
            ))

            # Somme des corrections
            total_update = updates.reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))

            # Mise à jour des poids et du biais
            b += lr * total_update[0]
            w += lr * total_update[1]

            print(f"Epoch {epoch+1}/{epochs} -> erreurs={total_update[0]}")

        return PerceptronModel(weights=w, bias=b)

# ========================================================================
# 5. Pipeline et entraînement
# ========================================================================
perceptron = PerceptronClassifier(lr=0.01, epochs=20)
pipeline = Pipeline(stages=[perceptron])

# Entraîner le modèle
model = pipeline.fit(df)

# Prédictions sur le dataset
df_pred = model.transform(df)
df_pred.show(10)

# ========================================================================
# 6. Évaluation : Accuracy
# ========================================================================
from pyspark.sql.functions import avg

accuracy = df_pred.withColumn(
    "correct", when(col("label") == col("prediction"), 1).otherwise(0)
).agg(avg("correct")).collect()[0][0]

print(f"✅ Accuracy = {accuracy*100:.2f}%")


Colonnes non numériques détectées: ['_c32']
Epoch 1/20 -> erreurs=-357
Epoch 2/20 -> erreurs=-357
Epoch 3/20 -> erreurs=-357
Epoch 4/20 -> erreurs=-357
Epoch 5/20 -> erreurs=-357
Epoch 6/20 -> erreurs=-357
Epoch 7/20 -> erreurs=-357
Epoch 8/20 -> erreurs=-357
Epoch 9/20 -> erreurs=-357
Epoch 10/20 -> erreurs=-357
Epoch 11/20 -> erreurs=-357
Epoch 12/20 -> erreurs=-357
Epoch 13/20 -> erreurs=-357
Epoch 14/20 -> erreurs=-357
Epoch 15/20 -> erreurs=-357
Epoch 16/20 -> erreurs=-357
Epoch 17/20 -> erreurs=-357
Epoch 18/20 -> erreurs=-357
Epoch 19/20 -> erreurs=-357
Epoch 20/20 -> erreurs=-357
+-----+--------------------+----------+
|label|            features|prediction|
+-----+--------------------+----------+
|    1|[17.99, 10.38, 12...|         1|
|    1|[20.57, 17.77, 13...|         1|
|    1|[19.69, 21.25, 13...|         1|
|    1|[11.42, 20.38, 77...|         1|
|    1|[20.29, 14.34, 13...|         1|
|    1|[12.45, 15.7, 82....|         1|
|    1|[18.25, 19.98, 11...|         1|
|    