In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan, lit
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer, NGram
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


In [None]:
# Créer une session Spark
spark = SparkSession.builder \
    .appName("ReviewSentimentClassifier") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()



In [None]:
# Configurer le niveau de log pour réduire les sorties
spark.sparkContext.setLogLevel("WARN")

# Charger les données (prétraitées en pandas et sauvegardées en CSV)
df = spark.read.csv("../data/cleaned_reviews.csv", header=True, inferSchema=True)

# Vérifier le schéma et les données nulles
print("Schema:")
df.printSchema()



In [None]:
# Vérification des valeurs nulles
print("\nNombre de lignes total:", df.count())
null_counts = df.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in df.columns])
print("Valeurs nulles par colonne:")
null_counts.show()

# Assurer que label est en format numérique et éliminer toute valeur aberrante
df = df.filter((col("label") == 0) | (col("label") == 1) | (col("label") == 2))
df = df.withColumn("label", col("label").cast("double"))

# Remplacer les valeurs nulles dans la colonne "lemmatized_text" par une chaîne vide
df = df.fillna({'lemmatized_text': ''})

In [None]:
# Afficher des statistiques sur les classes
print("\nDistribution des classes:")
class_counts = df.groupBy("label").count().orderBy("label")
class_counts.show()

# Calculer les poids pour équilibrer les classes
total = df.count()
class_weights = class_counts.collect()
weights_dict = {row["label"]: total/row["count"] for row in class_weights}
print("\nPoids par classe:")
for label, weight in weights_dict.items():
    print(f"Classe {label}: {weight:.4f}")

In [None]:
# Ajouter une colonne de poids pour l'algorithme de classification
df = df.withColumn("weight", 
    when(col("label") == 0.0, lit(weights_dict[0.0]))
    .when(col("label") == 1.0, lit(weights_dict[1.0]))
    .when(col("label") == 2.0, lit(weights_dict[2.0]))
    .otherwise(lit(1.0))
)

In [None]:
# Séparer les données en 80/10/10 (train/validation/test)
# Premier split: 80% train, 20% temp
train_df, temp_df = df.randomSplit([0.8, 0.2], seed=42)
# Deuxième split: diviser les 20% restants en deux parts égales (validation/test)
validation_df, test_df = temp_df.randomSplit([0.5, 0.5], seed=42)

print(f"\nDonnées d'entraînement: {train_df.count()} lignes")
print(f"Données de validation: {validation_df.count()} lignes")
print(f"Données de test: {test_df.count()} lignes")

In [None]:
# Tokenisation
tokenizer = Tokenizer(inputCol="lemmatized_text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

In [None]:
# Ajouter extraction de bi-grammes pour capturer les phrases
bigram = NGram(n=2, inputCol="filtered_words", outputCol="bigrams")

In [None]:
# TF-IDF avec plus de features
hashingTF = HashingTF(inputCol="bigrams", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features")

In [None]:
# Indexation de la classe avec gestion des valeurs nulles
label_indexer = StringIndexer(
    inputCol="label", 
    outputCol="indexedLabel", 
    handleInvalid="skip"
)

# Classifieur avec paramètres optimisés et utilisation des poids
lr = LogisticRegression(
    featuresCol="features", 
    labelCol="indexedLabel", 
    weightCol="weight",
    maxIter=20,
    regParam=0.1,
    elasticNetParam=0.5
)

# Pipeline complète
pipeline = Pipeline(stages=[tokenizer, remover, bigram, hashingTF, idf, label_indexer, lr])


In [None]:
try:
    # Entraîner le modèle sur les données d'entraînement
    print("\nEntraînement du modèle en cours...")
    model = pipeline.fit(train_df)
    
    # Évaluer sur les données de validation
    print("Évaluation du modèle sur l'ensemble de validation...")
    val_predictions = model.transform(validation_df)
    
    # Calculer les métriques d'évaluation sur la validation
    evaluator = MulticlassClassificationEvaluator(
        labelCol="indexedLabel", 
        predictionCol="prediction", 
        metricName="f1"
    )
    val_f1 = evaluator.evaluate(val_predictions)
    
    evaluator.setMetricName("accuracy")
    val_accuracy = evaluator.evaluate(val_predictions)
    
    print(f"\nRésultats de validation:")
    print(f"F1-score (validation): {val_f1:.4f}")
    print(f"Précision (validation): {val_accuracy:.4f}")
    
    # Matrice de confusion sur la validation
    print("\nMatrice de confusion (validation):")
    val_predictions.groupBy("label", "prediction").count().orderBy("label", "prediction").show()
    
    # Évaluation finale sur l'ensemble de test
    print("\nÉvaluation finale sur l'ensemble de test...")
    test_predictions = model.transform(test_df)
    
    # Afficher quelques prédictions
    print("\nExemples de prédictions (test):")
    test_predictions.select("lemmatized_text", "label", "prediction", "probability").show(5, truncate=30)
    
    # Afficher la distribution des prédictions sur l'ensemble de test
    print("\nDistribution des prédictions (test):")
    test_predictions.groupBy("prediction").count().orderBy("prediction").show()
    
    # Matrice de confusion simplifiée sur le test
    print("\nMatrice de confusion (test):")
    test_predictions.groupBy("label", "prediction").count().orderBy("label", "prediction").show()
    
    # Calculer les métriques finales sur l'ensemble de test
    test_f1 = evaluator.setMetricName("f1").evaluate(test_predictions)
    test_accuracy = evaluator.setMetricName("accuracy").evaluate(test_predictions)
    test_recall = evaluator.setMetricName("weightedRecall").evaluate(test_predictions)
    
    print(f"\nRésultats d'évaluation finaux (test):")
    print(f"F1-score: {test_f1:.4f}")
    print(f"Précision: {test_accuracy:.4f}")
    print(f"Recall pondéré: {test_recall:.4f}")
    
    # Sauvegarder le modèle
    model_path = "../model/balanced_sentiment_model"
    model.write().overwrite().save(model_path)
    print(f"\nModèle sauvegardé avec succès à: {model_path}")
    
except Exception as e:
    print(f"\nErreur pendant l'entraînement: {e}")
    import traceback
    traceback.print_exc()
finally:
    spark.stop()