In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import GBTClassifier, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 1. Initialisation de Spark
print("Initialisation de Spark...")
spark = SparkSession.builder \
    .appName("Prédiction Catégorie Véhicule avec GBT OVR") \
    .config("spark.hadoop.hive.metastore.uris", "thrift://hive-metastore:9083") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .enableHiveSupport() \
    .getOrCreate()

# 2. Charger les données d'entraînement
print("Chargement des données d'entraînement...")
df_train = spark.sql("SELECT * FROM concessionnaire.client_immat_with_cat")
print(f"Nombre de lignes initiales : {df_train.count()}")

# 3. Indexer la colonne cible
print("Indexation de la colonne cible...")
label_indexer = StringIndexer(inputCol="categorie", outputCol="indexed_categorie", handleInvalid="keep")
df_train = label_indexer.fit(df_train).transform(df_train)

# 4. Gestion des classes déséquilibrées (suréchantillonnage)
print("Gestion des classes déséquilibrées (suréchantillonnage)...")
class_counts = df_train.groupBy("indexed_categorie").count()
class_counts.show()
max_count = class_counts.agg({"count": "max"}).collect()[0][0]

balanced_df = df_train
for row in class_counts.collect():
    class_id, count = row["indexed_categorie"], row["count"]
    print(f"Classe {class_id} : {count} lignes (Max : {max_count})")
    if count < max_count:
        fraction = max_count / count
        print(f"Suréchantillonnage de la classe {class_id} avec une fraction de {fraction:.2f}")
        additional_df = df_train.filter(col("indexed_categorie") == class_id).sample(withReplacement=True, fraction=fraction, seed=42)
        balanced_df = balanced_df.union(additional_df)

print(f"Nombre de lignes après équilibrage : {balanced_df.count()}")

# 5. Indexer les colonnes catégoriques
print("Indexation des colonnes catégoriques...")
categorical_cols = ["sexe", "situationfamilliale", "deuxiemevoiture"]
indexers = [StringIndexer(inputCol=col, outputCol=col + "_indexed", handleInvalid="keep") for col in categorical_cols]

# 6. Assembler les features
print("Assemblage des features...")
assembler = VectorAssembler(
    inputCols=["age", "taux", "nbenfantacharge"] + [col + "_indexed" for col in categorical_cols],
    outputCol="featuress"
)

# 7. Modèle GBT avec One-vs-Rest
print("Configuration du modèle GBT avec One-vs-Rest...")
gbt = GBTClassifier(
    labelCol="indexed_categorie",
    featuresCol="featuress",
    maxIter=50,
    maxDepth=10
)

ovr = OneVsRest(classifier=gbt, labelCol="indexed_categorie", featuresCol="featuress", predictionCol="ovr_prediction")

# 8. Pipeline
print("Construction du pipeline...")
pipeline = Pipeline(stages=indexers + [assembler, ovr])

# 9. Séparer les données en entraînement et test
print("Séparation des données en entraînement et test...")
train_df, test_df = balanced_df.randomSplit([0.8, 0.2], seed=42)
print(f"Nombre de lignes dans l'ensemble d'entraînement : {train_df.count()}")
print(f"Nombre de lignes dans l'ensemble de test : {test_df.count()}")


Initialisation de Spark...
Chargement des données d'entraînement...
Nombre de lignes initiales : 86303
Indexation de la colonne cible...
Gestion des classes déséquilibrées (suréchantillonnage)...
+-----------------+-----+
|indexed_categorie|count|
+-----------------+-----+
|              0.0|27240|
|              1.0|15543|
|              4.0|10523|
|              3.0|12114|
|              2.0|12888|
|              6.0| 2858|
|              5.0| 5137|
+-----------------+-----+
Classe 0.0 : 27240 lignes (Max : 27240)
Classe 1.0 : 15543 lignes (Max : 27240)
Suréchantillonnage de la classe 1.0 avec une fraction de 1.75
Classe 4.0 : 10523 lignes (Max : 27240)
Suréchantillonnage de la classe 4.0 avec une fraction de 2.59
Classe 3.0 : 12114 lignes (Max : 27240)
Suréchantillonnage de la classe 3.0 avec une fraction de 2.25
Classe 2.0 : 12888 lignes (Max : 27240)
Suréchantillonnage de la classe 2.0 avec une fraction de 2.11
Classe 6.0 : 2858 lignes (Max : 27240)
Suréchantillonnage de la classe

In [None]:
# 10. Entraîner le modèle
print("Entraînement du modèle...")
model = pipeline.fit(train_df)
print("Entraînement terminé.")

Entraînement du modèle...


In [None]:
model.save("hdfs://namenode:9000/models/ovr_gbt_modelk9")
print("Modèle GBT avec OVR sauvegardé dans HDFS.")