In [1]:
# Importer les bibliothèques nécessaires
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col, regexp_replace, lower, concat_ws
from pyspark.sql.functions import  expr, when
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.types import StringType
import re
from pyspark.ml.feature import  StringIndexer, CountVectorizer, IDF
from pyspark.ml.classification import LogisticRegression,LinearSVC,OneVsRest, DecisionTreeClassifier, RandomForestClassifier, NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import joblib

In [2]:
spark = SparkSession.builder \
    .appName("Twitter2") \
    .getOrCreate()

In [3]:
spark

In [5]:
# Définir le schéma (noms et types de colonnes)
schema = StructType([
    StructField("Tweet ID", IntegerType(), True),
    StructField("Entity", StringType(), True),
    StructField("Sentiment", StringType(), True),
    StructField("Tweet content", StringType(), True),
])
# Charger les données à partir du fichier CSV sans en-tête, avec le schéma défini
sentiment_data = spark.read.csv('twitter_training.csv', header=False, schema=schema)

In [6]:
sentiment_data.printSchema()

root
 |-- Tweet ID: integer (nullable = true)
 |-- Entity: string (nullable = true)
 |-- Sentiment: string (nullable = true)
 |-- Tweet content: string (nullable = true)



In [329]:
# Afficher les premières lignes de votre DataFrame
sentiment_data.show()


+--------+-----------+---------+--------------------+
|Tweet ID|     Entity|Sentiment|       Tweet content|
+--------+-----------+---------+--------------------+
|    2401|Borderlands| Positive|im getting on bor...|
|    2401|Borderlands| Positive|I am coming to th...|
|    2401|Borderlands| Positive|im getting on bor...|
|    2401|Borderlands| Positive|im coming on bord...|
|    2401|Borderlands| Positive|im getting on bor...|
|    2401|Borderlands| Positive|im getting into b...|
|    2402|Borderlands| Positive|So I spent a few ...|
|    2402|Borderlands| Positive|So I spent a coup...|
|    2402|Borderlands| Positive|So I spent a few ...|
|    2402|Borderlands| Positive|So I spent a few ...|
|    2402|Borderlands| Positive|2010 So I spent a...|
|    2402|Borderlands| Positive|                 was|
|    2403|Borderlands|  Neutral|Rock-Hard La Varl...|
|    2403|Borderlands|  Neutral|Rock-Hard La Varl...|
|    2403|Borderlands|  Neutral|Rock-Hard La Varl...|
|    2403|Borderlands|  Neut

In [7]:
sentiment_data = sentiment_data.drop("Tweet ID")
sentiment_data = sentiment_data.drop("Entity")

In [331]:
sentiment_data.show()

+---------+--------------------+
|Sentiment|       Tweet content|
+---------+--------------------+
| Positive|im getting on bor...|
| Positive|I am coming to th...|
| Positive|im getting on bor...|
| Positive|im coming on bord...|
| Positive|im getting on bor...|
| Positive|im getting into b...|
| Positive|So I spent a few ...|
| Positive|So I spent a coup...|
| Positive|So I spent a few ...|
| Positive|So I spent a few ...|
| Positive|2010 So I spent a...|
| Positive|                 was|
|  Neutral|Rock-Hard La Varl...|
|  Neutral|Rock-Hard La Varl...|
|  Neutral|Rock-Hard La Varl...|
|  Neutral|Rock-Hard La Vita...|
|  Neutral|Live Rock - Hard ...|
|  Neutral|I-Hard like me, R...|
| Positive|that was the firs...|
| Positive|this was the firs...|
+---------+--------------------+
only showing top 20 rows



In [332]:
sentiment_data.count()


74682

In [8]:
sentiment_data = sentiment_data.dropna(subset=["Tweet content"])

In [334]:
sentiment_data.count()

73824

In [9]:
# Nettoyer le texte pour supprimer les caractères non alphabétiques et 
sentiment_data = sentiment_data.withColumn('Cleaned Tweet content', regexp_replace(col('Tweet content'), r'[^a-zA-Z\s]', ''))
sentiment_data = sentiment_data.withColumn('Cleaned Tweet content', lower(col('Cleaned Tweet content')))

In [10]:
#  filtrer les lignes où le tweet nettoyé est vide
sentiment_data = sentiment_data.filter(sentiment_data['Cleaned Tweet content'] != " ")

In [338]:
sentiment_data.count()

73802

In [339]:
sentiment_data.show()

+---------+--------------------+---------------------+
|Sentiment|       Tweet content|Cleaned Tweet content|
+---------+--------------------+---------------------+
| Positive|im getting on bor...| im getting on bor...|
| Positive|I am coming to th...| i am coming to th...|
| Positive|im getting on bor...| im getting on bor...|
| Positive|im coming on bord...| im coming on bord...|
| Positive|im getting on bor...| im getting on bor...|
| Positive|im getting into b...| im getting into b...|
| Positive|So I spent a few ...| so i spent a few ...|
| Positive|So I spent a coup...| so i spent a coup...|
| Positive|So I spent a few ...| so i spent a few ...|
| Positive|So I spent a few ...| so i spent a few ...|
| Positive|2010 So I spent a...|  so i spent a few...|
| Positive|                 was|                  was|
|  Neutral|Rock-Hard La Varl...| rockhard la varlo...|
|  Neutral|Rock-Hard La Varl...| rockhard la varlo...|
|  Neutral|Rock-Hard La Varl...| rockhard la varlo...|
|  Neutral

In [296]:
sentiment_data.show()

+---------+--------------------+---------------------+
|Sentiment|       Tweet content|Cleaned Tweet content|
+---------+--------------------+---------------------+
| Positive|im getting on bor...| im getting on bor...|
| Positive|I am coming to th...| i am coming to th...|
| Positive|im getting on bor...| im getting on bor...|
| Positive|im coming on bord...| im coming on bord...|
| Positive|im getting on bor...| im getting on bor...|
| Positive|im getting into b...| im getting into b...|
| Positive|So I spent a few ...| so i spent a few ...|
| Positive|So I spent a coup...| so i spent a coup...|
| Positive|So I spent a few ...| so i spent a few ...|
| Positive|So I spent a few ...| so i spent a few ...|
| Positive|2010 So I spent a...|  so i spent a few...|
| Positive|                 was|                  was|
|  Neutral|Rock-Hard La Varl...| rockhard la varlo...|
|  Neutral|Rock-Hard La Varl...| rockhard la varlo...|
|  Neutral|Rock-Hard La Varl...| rockhard la varlo...|
|  Neutral

In [12]:
# Convertir les étiquettes de sentiment (sentiment labels) en indices numériques
sentiment_data = sentiment_data.withColumn('label', when(col('Sentiment')=='Positive', 1)
                .when(col('Sentiment')=='Negative',2)
              .when(col('Sentiment')=='Neutral',0).when(col('Sentiment')=='Irrelevant',3))

In [298]:
sentiment_data.show()

+---------+--------------------+---------------------+-----+
|Sentiment|       Tweet content|Cleaned Tweet content|label|
+---------+--------------------+---------------------+-----+
| Positive|im getting on bor...| im getting on bor...|    1|
| Positive|I am coming to th...| i am coming to th...|    1|
| Positive|im getting on bor...| im getting on bor...|    1|
| Positive|im coming on bord...| im coming on bord...|    1|
| Positive|im getting on bor...| im getting on bor...|    1|
| Positive|im getting into b...| im getting into b...|    1|
| Positive|So I spent a few ...| so i spent a few ...|    1|
| Positive|So I spent a coup...| so i spent a coup...|    1|
| Positive|So I spent a few ...| so i spent a few ...|    1|
| Positive|So I spent a few ...| so i spent a few ...|    1|
| Positive|2010 So I spent a...|  so i spent a few...|    1|
| Positive|                 was|                  was|    1|
|  Neutral|Rock-Hard La Varl...| rockhard la varlo...|    0|
|  Neutral|Rock-Hard La 

In [13]:
# Tokenization: Diviser le texte en tokens (mots)
tokenizer = Tokenizer(inputCol='Cleaned Tweet content', outputCol='words')
# Suppression des stop words (mots vides)
remover = StopWordsRemover(inputCol='words', outputCol='filtered_words')
# Créer un CountVectorizer
count_vectorizer = CountVectorizer(inputCol='filtered_words', outputCol='raw_features')
# Créer un IDF
idf = IDF(inputCol='raw_features', outputCol='features')

In [300]:
sentiment_data.show()

+---------+--------------------+---------------------+-----+
|Sentiment|       Tweet content|Cleaned Tweet content|label|
+---------+--------------------+---------------------+-----+
| Positive|im getting on bor...| im getting on bor...|    1|
| Positive|I am coming to th...| i am coming to th...|    1|
| Positive|im getting on bor...| im getting on bor...|    1|
| Positive|im coming on bord...| im coming on bord...|    1|
| Positive|im getting on bor...| im getting on bor...|    1|
| Positive|im getting into b...| im getting into b...|    1|
| Positive|So I spent a few ...| so i spent a few ...|    1|
| Positive|So I spent a coup...| so i spent a coup...|    1|
| Positive|So I spent a few ...| so i spent a few ...|    1|
| Positive|So I spent a few ...| so i spent a few ...|    1|
| Positive|2010 So I spent a...|  so i spent a few...|    1|
| Positive|                 was|                  was|    1|
|  Neutral|Rock-Hard La Varl...| rockhard la varlo...|    0|
|  Neutral|Rock-Hard La 

In [14]:
# Diviser les données en ensembles d'entraînement et de test (80% pour l'entraînement et 20% pour le test)
train_data, test_data = sentiment_data.randomSplit([0.8, 0.2], seed=42)

In [302]:
# Modèle de régression logistique
lr = LogisticRegression(featuresCol='features', labelCol='label')
lr_pipeline = Pipeline(stages=[tokenizer, remover, count_vectorizer, idf, lr])
# Entraîner le modèle de régression logistique
lr_model = lr_pipeline.fit(train_data)

# Prédictions et évaluation sur l'ensemble de test
lr_predictions = lr_model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
lr_accuracy = evaluator.evaluate(lr_predictions)
print(f"Logistic Regression Accuracy: {lr_accuracy}")

Logistic Regression Accuracy: 0.8182005070924416


In [None]:
lr_model.tosave("lr_model")

In [21]:
# Define the LinearSVC model
svm = LinearSVC(featuresCol='features', labelCol='label')

# Define the OneVsRest classifier
one_vs_rest = OneVsRest(classifier=svm, featuresCol='features', labelCol='label', predictionCol='svm_prediction')

# Create the pipeline with necessary stages
svm_pipeline = Pipeline(stages=[
    tokenizer,  # Tokenizer stage
    remover,  # StopWordsRemover stage
    count_vectorizer,  # CountVectorizer stage
    idf,  # IDF stage
    one_vs_rest  # OneVsRest stage with LinearSVC
])

# Train the model
svm_model = svm_pipeline.fit(train_data)

# Make predictions on test data
svm_predictions = svm_model.transform(test_data)

Py4JJavaError: An error occurred while calling o3878.evaluate.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1824.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1824.0 (TID 5409) (192.168.56.1 executor driver): java.net.SocketException: Software caused connection abort: socket write error
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(Unknown Source)
	at java.net.SocketOutputStream.write(Unknown Source)
	at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
	at java.io.BufferedOutputStream.flush(Unknown Source)
	at java.io.DataOutputStream.flush(Unknown Source)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:454)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:738)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:737)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.confusions$lzycompute(MulticlassMetrics.scala:61)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.confusions(MulticlassMetrics.scala:52)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass$lzycompute(MulticlassMetrics.scala:78)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass(MulticlassMetrics.scala:76)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.accuracy$lzycompute(MulticlassMetrics.scala:188)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.accuracy(MulticlassMetrics.scala:188)
	at org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator.evaluate(MulticlassClassificationEvaluator.scala:153)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Software caused connection abort: socket write error
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(Unknown Source)
	at java.net.SocketOutputStream.write(Unknown Source)
	at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
	at java.io.BufferedOutputStream.flush(Unknown Source)
	at java.io.DataOutputStream.flush(Unknown Source)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:454)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)


In [None]:
# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='svm_prediction', metricName='accuracy')
svm_accuracy = evaluator.evaluate(svm_predictions)
print(f"Model accuracy: {svm_accuracy}")

In [17]:
# Pipeline pour Decision Tree
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')
dt_pipeline = Pipeline(stages=[tokenizer, remover, count_vectorizer, idf, dt])

# Entraîner le modèle d'arbre de décision
dt_model = dt_pipeline.fit(train_data)

# Prédictions et évaluation sur l'ensemble de test
dt_predictions = dt_model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
dt_accuracy = evaluator.evaluate(dt_predictions)
print(f"Decision Tree Accuracy: {dt_accuracy}")

Decision Tree Accuracy: 0.3482044956140351


In [18]:
# Configurer RandomForestClassifier
rf = RandomForestClassifier(featuresCol='features', labelCol='label', maxDepth=10)

# Construire le pipeline RandomForest
rf_pipeline = Pipeline(stages=[tokenizer, remover, count_vectorizer, idf, rf])

# Entraîner le modèle sur les données d'entraînement
rf_model = rf_pipeline.fit(train_data)

# Faire des prédictions sur les données de test
rf_predictions = rf_model.transform(test_data)

# Évaluer le modèle sur les données de test
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
rf_accuracy = evaluator.evaluate(rf_predictions)

print(f'Accuracy of Random Forest model: {rf_accuracy}')

Accuracy of Random Forest model: 0.41248629385964913


In [19]:
# Configurer NaiveBayes
nb = NaiveBayes(featuresCol='features', labelCol='label', smoothing=1.0, modelType='multinomial')

# Construire le pipeline NaiveBayes
nb_pipeline = Pipeline(stages=[tokenizer, remover, count_vectorizer, idf, nb])

# Entraîner le modèle sur les données d'entraînement
nb_model = nb_pipeline.fit(train_data)

# Faire des prédictions sur les données de test
nb_predictions = nb_model.transform(test_data)

# Évaluer le modèle sur les données de test
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
nb_accuracy = evaluator.evaluate(nb_predictions)

print(f'Accuracy of Naive Bayes model: {nb_accuracy}')

Accuracy of Naive Bayes model: 0.7972176535087719


In [351]:
spark.stop()