In [None]:
# --- Imports & session -------------------------------------------------------
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import math, time, os, shutil, random
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql.window import Window

# --- Session Spark -----------------------------------------------------------
spark = (
    SparkSession.builder
    .appName("NaiveBayes_MapReduce_like")
    .getOrCreate()
)

# Réduit le bruit des logs Spark pour ne garder que les avertissements et erreurs.
spark.sparkContext.setLogLevel("WARN")

# --- Hyperparam / config par défaut -----------------------------------------
ALPHA = 1.0       # Lissage de Laplace appliqué aux proba conditionnelles
seed = 7          # Graine pour la reproductibilité
train_ratio = 0.7 # Protocole : 70% train / 30% test
has_header = False
sep = ","

# --- Données & sorties intermédiaires ---------------------------------------
data_path = "poker-hand-training-true.data"  # Jeu UCI Poker-Hand
out_dir = "/tmp/nb_counts"                   # Dossier pour stocker les comptages « MapReduce-like »

# Remise à zéro des sorties intermédiaires
if os.path.exists(out_dir):
    shutil.rmtree(out_dir)
os.makedirs(out_dir, exist_ok=True)

In [None]:
def prepare_columns(df, has_header: bool):
    """
    Prépare un DataFrame pour l'entraînement :
    - la DERNIÈRE colonne est la colonne cible (label),
    - renomme proprement les colonnes si le fichier n'a pas d'en-tête,
      afin d'obtenir: features = f0, f1, ..., f{n-1} et label = "label".

    Paramètres
    ----------
    df : pyspark.sql.DataFrame
        Données d'entrée (colonnes ordonnées: features puis label en dernier).
    has_header : bool
        True si le fichier source contient un en-tête déjà exploitable.

    Retour
    ------
    (df, feature_cols, label_col) : tuple
        - df : DataFrame éventuellement renommé,
        - feature_cols : liste des noms de colonnes de caractéristiques,
        - label_col : nom de la colonne cible.
    """
    cols = df.columns  # ordre des colonnes tel que lu

    if has_header:
        # Cas "avec en-tête" : on garde les noms tels quels.
        # Hypothèse: la dernière colonne du fichier est bien le label.
        label_col = cols[-1]
        feature_cols = cols[:-1]
    else:
        # Cas "sans en-tête" : la dernière colonne = label, le reste = features
        label_col = cols[-1]
        feature_cols = cols[:-1]

        # Renommer les features en f0, f1, ..., et le label en "label"
        # -> rend le pipeline plus simple et déterministe
        ren = (
            [F.col(c).alias(f"f{i}") for i, c in enumerate(feature_cols)]
            + [F.col(label_col).alias("label")]
        )
        df = df.select(*ren)

        # Recrée les listes de noms après renommage
        feature_cols = [f"f{i}" for i in range(len(feature_cols))]
        label_col = "label"

    return df, feature_cols, label_col


In [None]:
# --- Lecture brute -----------------------------------------------------------
raw = (
    spark.read
    .option("header", has_header)   # True si le CSV a une ligne d’en-tête
    .option("inferSchema", True)    # Infère automatiquement les types
    .option("sep", sep)             # Séparateur de colonnes (",")
    .csv(data_path)                 # Fichier source
)

# --- Préparation des noms de colonnes (et label en dernière position) -------
df, feature_cols, label_col = prepare_columns(raw, has_header)

# --- Mise au bon type pour Spark ML -----------------------------------------
# Les algos/transformers Spark ML attendent des types numériques (DoubleType).
# On caste donc toutes les colonnes (features + label) en double.
for c in feature_cols + [label_col]:
    df = df.withColumn(c, F.col(c).cast("double"))

# --- Cache pour accélérer les étapes suivantes ------------------------------
# On met en cache car le DataFrame sera réutilisé plusieurs fois (splits, comptages, etc.).
df.cache()

# On vérifie visuellement le schéma/échantillon
df.show(5)

# Traçabilité : liste des features et nom du label
print("Features:", feature_cols)
print("Label   :", label_col)


+---+----+---+----+---+----+---+----+---+----+-----+
| f0|  f1| f2|  f3| f4|  f5| f6|  f7| f8|  f9|label|
+---+----+---+----+---+----+---+----+---+----+-----+
|1.0|10.0|1.0|11.0|1.0|13.0|1.0|12.0|1.0| 1.0|  9.0|
|2.0|11.0|2.0|13.0|2.0|10.0|2.0|12.0|2.0| 1.0|  9.0|
|3.0|12.0|3.0|11.0|3.0|13.0|3.0|10.0|3.0| 1.0|  9.0|
|4.0|10.0|4.0|11.0|4.0| 1.0|4.0|13.0|4.0|12.0|  9.0|
|4.0| 1.0|4.0|13.0|4.0|12.0|4.0|11.0|4.0|10.0|  9.0|
+---+----+---+----+---+----+---+----+---+----+-----+
only showing top 5 rows

Features: ['f0', 'f1', 'f2', 'f3', 'f4', 'f5', 'f6', 'f7', 'f8', 'f9']
Label   : label


In [None]:
# --- Split entraînement / test ----------------------------------------------
# randomSplit sépare aléatoirement selon des poids (approx.)
train_df, test_df = df.randomSplit([train_ratio, 1 - train_ratio], seed=seed)

# Ces DataFrames seront réutilisés (comptages, entraînement, évaluation) :
# on les met en cache pour éviter de relire/calculer plusieurs fois.
train_df.cache()
test_df.cache()

# count() force l’évaluation paresseuse (lazy) de Spark, matérialise le cache
# et affiche les tailles effectives des splits.
print("Train =", train_df.count(), " / Test =", test_df.count())


Train = 17483  / Test = 7527


In [None]:
# 1) Détection des colonnes de features f0..fN
feature_cols = sorted(
    [c for c in train_df.columns if c.startswith("f")],
    key=lambda x: int(x[1:]) if x[1:].isdigit() else 10**9
)
assert feature_cols, "Aucune colonne 'f*' détectée dans train_df."

# 2) Normalisation (Naïve Bayes discret)
def normalize_df(df):
    """Cast features/label en string; renomme label -> 'y'."""
    cols = [F.col(c).cast("string").alias(c) for c in feature_cols]
    return df.select(*cols, F.col("label").cast("string").alias("y"))

# 3) Passage wide -> long
def to_long(df, keep_cols):
    """Wide → long: une ligne par (pos, value); conserve keep_cols."""
    arr = F.array(*[F.col(c) for c in feature_cols])
    return df.select(*keep_cols, F.posexplode(arr).alias("pos", "value"))

# 4) Redimensionnement du jeu d’entraînement (pour la scalabilité)
def scale_dataset(df, factor):
    """Resize: <1 sous-échant.; =1 inchangé; >1 sur-échant. avec remise."""
    if factor == 1.0:
        return df
    return df.sample(withReplacement=(factor > 1.0), fraction=factor, seed=42)

# 5) Datasets normalisés
train_norm = normalize_df(train_df).cache(); train_norm.count()
test_norm  = normalize_df(test_df ).cache(); test_norm.count()


7527

## RDD

In [None]:
# === Naive Bayes RDD (discret) avec lissage de Laplace ALPHA=1.0 =============

def rdd_train(train_df_norm):
    """Construit le modèle: logprior, logcond, valeur par défaut, classes, |V_i|."""
    # RDD de (y, [(pos, value), ...]) pour compter facilement par (i,v,c)
    def row_to_pair(row):
        feats = [(i, getattr(row, feature_cols[i])) for i in range(len(feature_cols))]
        return (row['y'], feats)

    rdd = train_df_norm.rdd.map(row_to_pair).cache()

    # Priors N_c puis P(c) lissé
    N_c = rdd.map(lambda yc: (yc[0], 1)).reduceByKey(lambda a, b: a + b)
    N = N_c.map(lambda kv: kv[1]).sum()
    classes = N_c.map(lambda kv: kv[0]).collect()
    C = len(classes)
    logprior = N_c.mapValues(lambda nc: math.log((nc + ALPHA) / (N + ALPHA * C))).collectAsMap()

    # Comptes N_{i,v,c}
    ivc = (
        rdd.flatMap(lambda yc: [((i, v, yc[0]), 1) for (i, v) in yc[1]])
           .reduceByKey(lambda a, b: a + b)
    )

    # Dénominateurs N_{i,*,c} pour chaque (i,c)
    ic = (
        ivc.map(lambda kv: ((kv[0][0], kv[0][2]), kv[1]))
           .reduceByKey(lambda a, b: a + b)
           .collectAsMap()
    )

    # Tailles de vocabulaires |V_i|
    Vi = (
        rdd.flatMap(lambda yc: [((i, v), 1) for (i, v) in yc[1]])
           .distinct()
           .map(lambda kv: (kv[0][0], 1))
           .reduceByKey(lambda a, b: a + b)
           .collectAsMap()
    )

    # log P(v|c,i) lissé
    def to_logcond(rec):
        (i, v, c), n = rec
        denom = ic[(i, c)]
        V = Vi[i]
        return ((i, v, c), math.log((n + ALPHA) / (denom + ALPHA * V)))

    logcond = ivc.map(to_logcond).collectAsMap()

    # Valeur par défaut pour v non vu: log(α / (N_{i,*,c} + α|V_i|))
    default_logcond = {
        (i, c): math.log(ALPHA / (ic[(i, c)] + ALPHA * Vi[i]))
        for (i, c) in ic.keys()
    }

    return {
        "logprior": logprior,
        "logcond": logcond,
        "default": default_logcond,
        "classes": classes,
        "Vi": Vi,
    }

def rdd_predict(test_df_norm, model):
    """Retourne un RDD de (pred, y_true) en scorant en log-espace."""
    logprior = model["logprior"]; logcond = model["logcond"]; default = model["default"]
    classes = model["classes"]

    def score_row(row):
        feats = [(i, getattr(row, feature_cols[i])) for i in range(len(feature_cols))]
        best_c, best_s = None, -1e100
        for c in classes:
            s = logprior[c]
            for (i, v) in feats:
                s += logcond.get((i, v, c), default[(i, c)])
            if s > best_s:
                best_s, best_c = s, c
        return (best_c, row['y'])

    return test_df_norm.rdd.map(score_row)

def rdd_run(train_df_norm, test_df_norm):
    """Entraîne, infère, renvoie (accuracy, train_time, infer_time)."""
    t0 = time.time()
    model = rdd_train(train_df_norm)
    train_time = time.time() - t0

    t1 = time.time()
    preds = rdd_predict(test_df_norm, model).cache()
    _ = preds.count()  # force l'exécution pour mesurer l'inférence
    infer_time = time.time() - t1

    accuracy = preds.map(lambda pr: 1 if pr[0] == pr[1] else 0).mean()
    return float(accuracy), train_time, infer_time


In [None]:
# Exécute le pipeline RDD NB et affiche les métriques principales
acc, ttrain, tinfer = rdd_run(train_norm, test_norm)
print(f"[RDD NB] accuracy={acc:.4f} | train_time={ttrain:.3f}s | infer_time={tinfer:.3f}s")

[RDD NB] accuracy=0.4925 | train_time=4.057s | infer_time=0.811s


## DATAFRAME

In [None]:
# === Naive Bayes DataFrame (discret), Laplace ALPHA=1.0 ======================

def df_train(train_df_norm):
    """Calcule prior/logprior, logcond et valeurs par défaut en format DF."""
    # Format long: une ligne par (pos, value) observé
    train_long = to_long(train_df_norm, keep_cols=["y"]).cache(); train_long.count()

    # Priors P(c) lissés
    prior = train_long.groupBy("y").count().cache(); prior.count()
    N = prior.agg(F.sum("count").alias("N")).first()["N"]
    C = prior.count()
    prior_log = (
        prior.withColumn(
            "logprior",
            F.log((F.col("count") + F.lit(ALPHA)) / (F.lit(N) + F.lit(ALPHA * C)))
        ).select("y", "logprior")
    )

    # Comptes conditionnels
    ivc = (
        train_long.groupBy("pos", "value", "y").count()
                  .withColumnRenamed("count", "n_ivc")
                  .cache()
    ); ivc.count()

    ic = ivc.groupBy("pos", "y").agg(F.sum("n_ivc").alias("n_i_star_c")).cache(); ic.count()
    Vi = train_long.groupBy("pos").agg(F.countDistinct("value").alias("V")).cache(); Vi.count()

    # log P(v|c,i) lissé
    cond = (
        ivc.join(ic, ["pos", "y"])
           .join(Vi, ["pos"])
           .withColumn(
               "logcond",
               F.log((F.col("n_ivc") + F.lit(ALPHA)) /
                     (F.col("n_i_star_c") + F.lit(ALPHA) * F.col("V")))
           )
           .select("pos", "value", "y", "logcond")
           .cache()
    ); cond.count()

    # Valeur par défaut pour v non vu
    default = (
        ic.join(Vi, ["pos"])
          .withColumn(
              "default_logcond",
              F.log(F.lit(ALPHA) / (F.col("n_i_star_c") + F.lit(ALPHA) * F.col("V")))
          )
          .select("pos", "y", "default_logcond")
          .cache()
    ); default.count()

    return {"prior_log": prior_log, "cond": cond, "default": default}


def df_predict(test_df_norm, model):
    """Score en log-espace + argmax par id; retourne (accuracy, preds DF)."""
    # Id unique + label vrai
    test_rows = (
        test_df_norm.withColumn("id", F.monotonically_increasing_id())
                    .withColumnRenamed("y", "y_true")
                    .cache()
    ); test_rows.count()

    # Format long
    test_long = to_long(test_rows, keep_cols=["id", "y_true"]).cache(); test_long.count()

    # Espace des classes
    classes = model["prior_log"].select(F.col("y").alias("y")).cache(); classes.count()

    # Étendre (id,pos,value) à toutes les classes
    base = (
        test_long.select("id", "pos", "value").dropDuplicates(["id", "pos", "value"])
                 .crossJoin(classes)
                 .cache()
    ); base.count()

    # Joins: logcond si observé, sinon default_logcond -> somme + prior
    joined = (
        base.join(model["cond"], ["pos", "value", "y"], "left")
            .join(model["default"], ["pos", "y"], "left")
            .withColumn("logc", F.coalesce(F.col("logcond"), F.col("default_logcond")))
            .groupBy("id", "y").agg(F.sum("logc").alias("sum_logc"))
            .join(model["prior_log"], ["y"])
            .withColumn("score", F.col("sum_logc") + F.col("logprior"))
            .cache()
    ); joined.count()

    # Argmax(y) par id
    w = Window.partitionBy("id").orderBy(F.col("score").desc())
    pred = (
        joined.withColumn("rn", F.row_number().over(w))
              .where("rn = 1")
              .select("id", F.col("y").alias("pred"))
              .cache()
    ); pred.count()

    # Accuracy
    acc = (
        pred.join(test_rows.select("id", "y_true"), "id")
            .withColumn("ok", (F.col("pred") == F.col("y_true")).cast("int"))
            .agg(F.avg("ok")).first()[0]
    )

    return float(acc), pred


def df_run(train_df_norm, test_df_norm):
    """Entraîne DF NB puis infère; renvoie (acc, train_time, infer_time)."""
    t0 = time.time()
    model = df_train(train_df_norm)
    # Matérialiser le "modèle" DF
    model["cond"].count(); model["default"].count(); model["prior_log"].count()
    train_time = time.time() - t0

    t1 = time.time()
    acc, pred = df_predict(test_df_norm, model)
    pred.count()  # force l’exécution pour mesurer l’inférence
    infer_time = time.time() - t1

    return acc, train_time, infer_time


In [None]:
# Exécute le pipeline DF NB et affiche les métriques principales
acc, ttrain, tinfer = df_run(train_norm, test_norm)
print(f"[DataFrame NB] accuracy={acc:.4f} | train_time={ttrain:.3f}s | infer_time={tinfer:.3f}s")

[DataFrame NB] accuracy=0.4924 | train_time=9.464s | infer_time=16.036s


## Spark ML

In [None]:
# 1) Vectoriser les f* en une colonne "features" --------------------------------
# Récupère toutes les colonnes de features (f0, f1, ..., fN), triées par index numérique.
feature_cols = sorted(
    [c for c in train_df.columns if c.startswith("f")],
    key=lambda x: int(x[1:]) if x[1:].isdigit() else 10**9
)

# Assemble les colonnes f* en un unique vecteur dense "features".
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Applique l’assembleur aux splits et ne garde que (features, label).
train_ml = assembler.transform(train_df).select("features", "label")
test_ml  = assembler.transform(test_df ).select("features", "label")

# 2) Entraîner et évaluer le NaiveBayes Spark ML
t0 = time.time()
nb = NaiveBayes(
    modelType="multinomial",  # Adapté aux features non négatives.
    smoothing=1.0,            # Lissage de Laplace (α = 1).
    featuresCol="features",
    labelCol="label",
    predictionCol="prediction"
)
model = nb.fit(train_ml)             # Entraînement
train_time = time.time() - t0        # Temps d'entraînement (s)

# Inférence sur le set de test
t1 = time.time()
pred = model.transform(test_ml).select("prediction", "label").cache()
pred.count()
infer_time = time.time() - t1        # Temps d'inférence (s)

# Accuracy
acc = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
).evaluate(pred)

print(f"[Spark ML NB] accuracy={acc:.4f} | train_time={train_time:.3f}s | infer_time={infer_time:.3f}s")


[Spark ML NB] accuracy=0.5025 | train_time=1.956s | infer_time=0.928s


## Scalabilité

In [None]:
# === Grille d'expériences =====================================================

fractions = [0.25, 0.5, 1.0, 2.0]   # redimensionne l'ensemble d'entraînement : 25%, 50%, 100%, 200% (200% = échantillonnage avec remise)
base_parts = max(train_df.rdd.getNumPartitions(), 2)
parts_grid = [base_parts, base_parts * 2, base_parts * 4]   # différents niveaux de parallélisme (nb de partitions / shuffles)

rows = []
for fr in fractions:
    tr = scale_dataset(train_norm, fr)  # redimensionne l'entraînement (<=1 sous-échant., >1 sur-échant.)
    for p in parts_grid:
        # Régler le parallélisme (nombre de partitions dans les shuffles)
        spark.conf.set("spark.sql.shuffle.partitions", p)

        # Harmoniser le partitionnement des datasets et matérialiser
        tr_p = tr.repartition(p).cache();  tr_p.count()
        te_p = test_norm.repartition(p).cache(); te_p.count()

        # --- Variante RDD ----
        acc, ttrain, tinfer = rdd_run(tr_p, te_p)
        rows.append(("RDD", fr, p, tr_p.count(), te_p.count(), acc, ttrain, tinfer))

        # --- Variante DataFrame ----
        acc, ttrain, tinfer = df_run(tr_p, te_p)
        rows.append(("DataFrame", fr, p, tr_p.count(), te_p.count(), acc, ttrain, tinfer))

        # Libérer le cache pour l'itération suivante
        tr_p.unpersist(); te_p.unpersist()

# Résultats agrégés (un DF pour affichage)
schema = ["approach","train_fraction","partitions","n_train","n_test",
          "accuracy","train_time_s","infer_time_s"]
res = spark.createDataFrame(rows, schema).orderBy("approach","train_fraction","partitions")
n = res.count()
res.show(n, truncate=False)


+---------+--------------+----------+-------+------+-------------------+------------------+------------------+
|approach |train_fraction|partitions|n_train|n_test|accuracy           |train_time_s      |infer_time_s      |
+---------+--------------+----------+-------+------+-------------------+------------------+------------------+
|DataFrame|0.25          |2         |4474   |7527  |0.4810681546432842 |16.66138195991516 |26.374945402145386|
|DataFrame|0.25          |4         |4474   |7527  |0.4810681546432842 |11.070682764053345|26.290027618408203|
|DataFrame|0.25          |8         |4474   |7527  |0.4810681546432842 |10.7231125831604  |20.868717908859253|
|DataFrame|0.5           |2         |8813   |7527  |0.48505380629732964|8.716960906982422 |15.216153860092163|
|DataFrame|0.5           |4         |8813   |7527  |0.48505380629732964|5.920252084732056 |14.192898750305176|
|DataFrame|0.5           |8         |8813   |7527  |0.48505380629732964|7.961805105209351 |14.933992862701416|
|