We are working in a local development environment (not on a distributed Spark cluster), so we import findspark to locate the local Spark installation.

# Imports

In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover,StringIndexer
from pyspark.sql.functions import col, sum as spark_sum
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import CountVectorizer
from xgboost.spark import SparkXGBClassifier

# Variables de contexte

In [2]:
# Local SparkSession for the Twitter Naive Bayes notebook.
# The original key, `spark.storage.memoryFraction`, belongs to the legacy memory manager
# (deprecated since Spark 1.6) and is not honored by current Spark releases; the unified
# memory manager is tuned through `spark.memory.fraction` instead.
spark = (
    SparkSession.builder
    .appName("Twitter_NB")
    .config("spark.memory.fraction", "0.6")
    .getOrCreate()
)

24/04/30 23:57:28 WARN Utils: Your hostname, sasamg-HP-Laptop-15s-eq2xxx resolves to a loopback address: 127.0.1.1; using 192.168.100.236 instead (on interface wlo1)
24/04/30 23:57:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/30 23:57:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/30 23:57:29 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/04/30 23:57:29 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
# Location of the training CSV, relative to the notebook's working directory.
training_path = "twitter_training.csv"

### We define a StructType so that, when reading the CSV containing the data, Spark loads the data according to the schema defined below.

In [4]:
# Explicit schema for the training CSV: (column name, Spark type) pairs, in file order.
# Every column is declared nullable.
_schema_columns = [
    ("id", IntegerType()),
    ("game", StringType()),
    ("sentiment", StringType()),
    ("tweet", StringType()),
]
schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in _schema_columns]
)

In [5]:
# Load the training CSV using the explicit schema.
# `inferSchema=True` was removed: when a schema is supplied it takes precedence, so the flag
# was a misleading no-op. `header=False` spells out the default — the first line of the file
# is a data row, not column names.
dataset = spark.read.csv(training_path, schema=schema, header=False)

# Confirm that the columns were loaded with the declared names and types.
dataset.printSchema()

root
 |-- id: integer (nullable = true)
 |-- game: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- tweet: string (nullable = true)



In [6]:
# Preview the first rows (defaults spelled out: 20 rows, long strings truncated).
dataset.show(n=20, truncate=True)

                                                                                

+----+-----------+---------+--------------------+
|  id|       game|sentiment|               tweet|
+----+-----------+---------+--------------------+
|2401|Borderlands| Positive|im getting on bor...|
|2401|Borderlands| Positive|I am coming to th...|
|2401|Borderlands| Positive|im getting on bor...|
|2401|Borderlands| Positive|im coming on bord...|
|2401|Borderlands| Positive|im getting on bor...|
|2401|Borderlands| Positive|im getting into b...|
|2402|Borderlands| Positive|So I spent a few ...|
|2402|Borderlands| Positive|So I spent a coup...|
|2402|Borderlands| Positive|So I spent a few ...|
|2402|Borderlands| Positive|So I spent a few ...|
|2402|Borderlands| Positive|2010 So I spent a...|
|2402|Borderlands| Positive|                 was|
|2403|Borderlands|  Neutral|Rock-Hard La Varl...|
|2403|Borderlands|  Neutral|Rock-Hard La Varl...|
|2403|Borderlands|  Neutral|Rock-Hard La Varl...|
|2403|Borderlands|  Neutral|Rock-Hard La Vita...|
|2403|Borderlands|  Neutral|Live Rock - Hard ...|


In [7]:
# Summary statistics (count, mean, stddev, min, max) for every column.
summary_stats = dataset.describe()
summary_stats.show()

24/04/30 23:57:36 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+-----------------+---------------+----------+--------------------+
|summary|               id|           game| sentiment|               tweet|
+-------+-----------------+---------------+----------+--------------------+
|  count|            74682|          74682|     74682|               73996|
|   mean|6432.586165341046|           NULL|      NULL|                 3.2|
| stddev|3740.427870177445|           NULL|      NULL|   2.007130147392398|
|    min|                1|         Amazon|Irrelevant|                    |
|    max|            13200|johnson&johnson|  Positive|ðŸ§» at Home Depot ...|
+-------+-----------------+---------------+----------+--------------------+



                                                                                

In [8]:
# Per-column null tally: turn each isNull flag into 0/1 and sum it, keeping the column's name.
null_flag_sums = [
    spark_sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in dataset.columns
]
null_counts = dataset.select(*null_flag_sums)
null_counts.show()

+---+----+---------+-----+
| id|game|sentiment|tweet|
+---+----+---------+-----+
|  0|   0|        0|  686|
+---+----+---------+-----+



# Text Preprocessing

In [9]:
# Report the number of rows before any cleaning is applied.
n_samples = dataset.count()
print("The training dataset contains {} samples.".format(n_samples))

The training dataset contains 74682 samples.


In [10]:
# Discard rows whose tweet text is missing; they cannot be tokenized.
dataset = dataset.na.drop(subset=["tweet"])

#### The 'tweet' column in our dataset is in string format, so we cannot use it directly for training. First, we tokenize it with a Tokenizer. Then, we convert the resulting words into term-frequency vectors. HashingTF and CountVectorizer are two completely separate methods for this step, and either can be used; this notebook uses CountVectorizer. We then apply IDF to the term frequencies to obtain the 'features' column used for training. Finally, we label the target column with StringIndexer, which converts it to a double.

    1. Tokenizer : tweet -> words
    2. CountVectorizer : words -> TF
    3. IDF : TF -> features
    


In [11]:
# Stage 1: split each tweet into word tokens.
tokenizer = Tokenizer(
    inputCol="tweet",
    outputCol="words",
)

# Stage 2: turn the token lists into term-frequency vectors.
cv = CountVectorizer(
    inputCol="words",
    outputCol="TF",
)

# Stage 3: re-weight the term frequencies with IDF to produce the model's feature column.
idf = IDF(
    inputCol="TF",
    outputCol="features",
)

#### Now we convert the categorical target (column = sentiment) into numerical indices. We use StringIndexer for this purpose.
    4. StringIndexer : sentiment -> label

In [12]:
# Stage 4: index the sentiment strings into a numeric "label" column.
# handleInvalid="skip" is passed so rows with an invalid sentiment value are skipped.
label = StringIndexer(
    inputCol="sentiment",
    outputCol="label",
    handleInvalid="skip",
)

# Stage 5: Naive Bayes classifier reading the feature vectors and the indexed label.
nb = NaiveBayes(
    featuresCol="features",
    labelCol="label",
)

### We put all these pre-processing steps, followed by the Naive Bayes classifier, in a single pipeline to make them easier to run.

In [14]:
# Chain the stages in order: tokenize -> count -> IDF -> index label -> classify.
pipeline_stages = [tokenizer, cv, idf, label, nb]
pipeline = Pipeline(stages=pipeline_stages)

## Chargement du dataset et séparation train/test

In [15]:
# 80/20 train/test split with a fixed seed.
train_set, test_set = dataset.randomSplit(weights=[0.8, 0.2], seed=42)

# Modeling
### Naive Bayes model

In [17]:
# Fit the full pipeline on the training split ...
pipeline_model = pipeline.fit(dataset=train_set)

# ... and score the held-out split with the fitted pipeline.
predictions = pipeline_model.transform(dataset=test_set)

24/04/30 23:59:24 WARN DAGScheduler: Broadcasting large task binary with size 1938.7 KiB
24/04/30 23:59:25 WARN DAGScheduler: Broadcasting large task binary with size 1923.3 KiB
                                                                                

In [18]:
# One evaluator per metric; all of them compare the "label" column with "prediction".
(
    evaluator_accuracy,
    evaluator_f1,
    evaluator_weighted_precision,
    evaluator_weighted_recall,
) = (
    MulticlassClassificationEvaluator(
        labelCol="label",
        predictionCol="prediction",
        metricName=metric_name,
    )
    for metric_name in ("accuracy", "f1", "weightedPrecision", "weightedRecall")
)

In [19]:
# Hyper-parameter grid: candidate values for the Naive Bayes smoothing term.
paramGrid = (
    ParamGridBuilder()
    .addGrid(nb.smoothing, [0.0, 0.5, 1.0])
    .build()
)

# 3-fold cross-validation over the whole pipeline, selecting the candidate with the best accuracy.
# The seed pins the fold assignment so the search is reproducible, in line with the seeded
# train/test split.
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_accuracy,
    numFolds=3,
    seed=42,
)

# Run the search on the training split, then score the held-out split with the tuned model.
cvModel = crossval.fit(train_set)
predictions = cvModel.transform(test_set)

24/04/30 23:59:28 WARN DAGScheduler: Broadcasting large task binary with size 1651.5 KiB
24/04/30 23:59:29 WARN DAGScheduler: Broadcasting large task binary with size 1639.8 KiB
24/04/30 23:59:30 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
24/04/30 23:59:32 WARN DAGScheduler: Broadcasting large task binary with size 1651.5 KiB
24/04/30 23:59:32 WARN DAGScheduler: Broadcasting large task binary with size 1639.8 KiB
24/04/30 23:59:33 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
24/04/30 23:59:35 WARN DAGScheduler: Broadcasting large task binary with size 1651.5 KiB
24/04/30 23:59:35 WARN DAGScheduler: Broadcasting large task binary with size 1639.8 KiB
24/04/30 23:59:36 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
24/04/30 23:59:38 WARN DAGScheduler: Broadcasting large task binary with size 1643.6 KiB
24/04/30 23:59:38 WARN DAGScheduler: Broadcasting large task binary with size 1631.9 KiB
24/04/30 23:59:39 WARN DAGSche

In [20]:
# Evaluate the current predictions with each metric, in the same order as before:
# accuracy, F1, weighted precision, weighted recall.
accuracy_lr, f1_score_lr, weighted_precision_lr, weighted_recall_lr = (
    metric_evaluator.evaluate(predictions)
    for metric_evaluator in (
        evaluator_accuracy,
        evaluator_f1,
        evaluator_weighted_precision,
        evaluator_weighted_recall,
    )
)

24/04/30 23:59:57 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB
24/04/30 23:59:57 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB
24/04/30 23:59:58 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB
24/04/30 23:59:58 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB


In [21]:
# Print each metric as "<caption> <value>", one per line.
metric_report = (
    ("Accuracy:", accuracy_lr),
    ("F1 Score:", f1_score_lr),
    ("Weighted Precision:", weighted_precision_lr),
    ("Weighted Recall:", weighted_recall_lr),
)
for caption, value in metric_report:
    print(caption, value)

Accuracy: 0.8411732531108984
F1 Score: 0.8410991709322988
Weighted Precision: 0.8429710973919331
Weighted Recall: 0.8411732531108984


In [22]:
# Persist the cross-validated best pipeline, i.e. the model that produced the reported metrics.
# The original saved `pipeline_model`, the untuned fit made before the parameter search.
# The output path is kept unchanged for existing consumers.
# NOTE(review): despite the ".pkl" suffix this is a Spark ML writer, not pickle — confirm the
# on-disk layout expected by whoever loads it.
cvModel.bestModel.write().overwrite().save("logistique_pipeline.pkl")

24/05/01 00:00:46 WARN TaskSetManager: Stage 166 contains a task of very large size (1370 KiB). The maximum recommended task size is 1000 KiB.
24/05/01 00:00:47 WARN TaskSetManager: Stage 170 contains a task of very large size (1051 KiB). The maximum recommended task size is 1000 KiB.
24/05/01 00:00:48 WARN TaskSetManager: Stage 178 contains a task of very large size (2095 KiB). The maximum recommended task size is 1000 KiB.
