<h1 style="color: blue;">Imports</h1>

In [1]:
import findspark
findspark.init()
import pyspark
import time
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, FloatType
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StringIndexer, CountVectorizer, NGram, VectorAssembler, ChiSqSelector
from pyspark.sql.functions import col, sum as spark_sum, split, concat_ws, udf
from pyspark.sql.functions import lower
from pyspark.sql.functions import regexp_replace
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from datetime import datetime
from pyspark.sql.functions import col, sum as spark_sum, split, concat_ws, udf
import string
from pyspark.ml.feature import StopWordsRemover, StringIndexer, CountVectorizer, IDF, Tokenizer, HashingTF
import re
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.mllib.evaluation import MulticlassMetrics
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt

<h1 style="color: blue;">Variables de contexte</h1>

In [2]:
spark1 = SparkSession.builder\
            .master("local[16]")\
            .appName("LR_Twitter")\
            .getOrCreate()

path = "twitter_training.csv"

schema = StructType([
    StructField("Tweet ID", IntegerType(), True),
    StructField("Entity", StringType(), True),
    StructField("Sentiment", StringType(), True),
    StructField("Tweet content", StringType(), True)])

In [3]:
df = spark1.read.csv(path,
                     inferSchema=True,
                     header=False,
                     schema=schema)

In [4]:
df.show()

+--------+-----------+---------+--------------------+
|Tweet ID|     Entity|Sentiment|       Tweet content|
+--------+-----------+---------+--------------------+
|    2401|Borderlands| Positive|im getting on bor...|
|    2401|Borderlands| Positive|I am coming to th...|
|    2401|Borderlands| Positive|im getting on bor...|
|    2401|Borderlands| Positive|im coming on bord...|
|    2401|Borderlands| Positive|im getting on bor...|
|    2401|Borderlands| Positive|im getting into b...|
|    2402|Borderlands| Positive|So I spent a few ...|
|    2402|Borderlands| Positive|So I spent a coup...|
|    2402|Borderlands| Positive|So I spent a few ...|
|    2402|Borderlands| Positive|So I spent a few ...|
|    2402|Borderlands| Positive|2010 So I spent a...|
|    2402|Borderlands| Positive|                 was|
|    2403|Borderlands|  Neutral|Rock-Hard La Varl...|
|    2403|Borderlands|  Neutral|Rock-Hard La Varl...|
|    2403|Borderlands|  Neutral|Rock-Hard La Varl...|
|    2403|Borderlands|  Neut

<h1 style="color: blue;">Data cleaning</h1>

In [5]:
# Counting null values in each column
null_counts = df.select(*(spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns))

# Displaying the count of null values in each column
null_counts.show()

+--------+------+---------+-------------+
|Tweet ID|Entity|Sentiment|Tweet content|
+--------+------+---------+-------------+
|       0|     0|        0|          686|
+--------+------+---------+-------------+



In [6]:
# Dropping rows with null values
df = df.na.drop()

# Checking for null values after dropping
null_counts = df.select(*(spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns))
null_counts.show()

+--------+------+---------+-------------+
|Tweet ID|Entity|Sentiment|Tweet content|
+--------+------+---------+-------------+
|       0|     0|        0|            0|
+--------+------+---------+-------------+



In [7]:
# Print out rows with null values in the "Tweet content" column
print("Rows with null values in 'Tweet content' column:")
df.filter(col("Tweet content").isNull()).show(truncate=False)

Rows with null values in 'Tweet content' column:
+--------+------+---------+-------------+
|Tweet ID|Entity|Sentiment|Tweet content|
+--------+------+---------+-------------+
+--------+------+---------+-------------+



In [8]:
# Counting duplicated rows
duplicated_count = df.groupBy(df.columns).count().where(col("count") > 1).count()

print("Number of duplicated rows:", duplicated_count)

Number of duplicated rows: 1989


In [9]:
# Dropping duplicates
df = df.dropDuplicates()

# Checking for duplicated rows after dropping
duplicated_count = df.groupBy(df.columns).count().where(col("count") > 1).count()

print("Number of duplicated rows after dropping:", duplicated_count)

Number of duplicated rows after dropping: 0


In [10]:
# Filter out rows with null or empty values in the "Tweet content" column
df = df.filter((col("Tweet content").isNotNull()) & (col("Tweet content") != ""))

In [11]:
def filter_non_string(df, column):
    """
    Filter out rows with non-string values in the specified column.
    Convert non-string values to strings.
    """
    # Drop rows with null values in the specified column
    df = df.dropna(subset=[column])
    
    # Convert non-string values to strings in the specified column
    df = df.withColumn(column, col(column).cast("string"))
    
    return df

In [12]:
def normalize_text(df, column):
    """Convert text in the specified column to lowercase."""
    df = df.withColumn(column, lower(col(column)))
    return df

In [13]:
def remove_html_tags(df, column):
    """Remove HTML tags from text in the specified column."""
    df = df.withColumn(column, regexp_replace(col(column), '<.*?>', ''))
    return df

In [14]:
def remove_urls(df, column):
    """Remove URLs or hyperlinks from text in the specified column."""
    df = df.withColumn(column, regexp_replace(col(column), 'http\\S+|www\\S+', ''))
    return df

In [15]:
def remove_numbers(df, column):
    """Exclude numerical digits from text in the specified column."""
    df = df.withColumn(column, regexp_replace(col(column), '\\d+', ''))
    return df

In [16]:
def remove_punctuation(df, column):
    """Remove punctuation marks from text in the specified column."""
    # Define regular expression pattern to match punctuation marks
    punctuation_pattern = "[" + re.escape(string.punctuation) + "]"
    # Remove punctuation marks using regexp_replace
    df = df.withColumn(column, regexp_replace(col(column), punctuation_pattern, ''))
    return df

In [17]:
def remove_stopwords(df, column):
    """Eliminate common stopwords from the tokenized text in the specified column."""
    remover = StopWordsRemover(inputCol=column, outputCol=column+"_filtered")
    df = remover.transform(df).drop(column)
    df = df.withColumnRenamed(column+"_filtered", column)
    return df

In [18]:
def remove_emojis(df, column):
    """Remove emojis from the text in the specified column."""
    emoji_pattern = "[" + u"\U0001F600-\U0001F64F" + u"\U0001F300-\U0001F5FF" + u"\U0001F680-\U0001F6FF" + u"\U0001F1E0-\U0001F1FF" + u"\U00002500-\U00002BEF" + u"\U00002702-\U000027B0" + u"\U000024C2-\U0001F251" + u"\U0001f926-\U0001f937" + u"\U00010000-\U0010ffff" + u"\u2640-\u2642" + u"\u2600-\u2B55" + u"\u200d" + u"\u23cf" + u"\u23e9" + u"\u231a" + u"\ufe0f" + u"\u3030" + "]+"
    # Join array of strings into a single string
    df = df.withColumn(column, concat_ws(" ", col(column)))
    # Remove emojis from the text
    df = df.withColumn(column, regexp_replace(col(column), emoji_pattern, ''))
    return df

In [19]:
def preprocess_text(df):
    df = filter_non_string(df, 'Tweet content')
    df = normalize_text(df, 'Tweet content')
    df = remove_html_tags(df, 'Tweet content')
    df = remove_urls(df, 'Tweet content')
    df = remove_numbers(df, 'Tweet content')
    df = remove_punctuation(df, 'Tweet content')
    return df

# Usage:
data_processed = preprocess_text(df)
data_processed.show()

+--------+--------------------+----------+--------------------+
|Tweet ID|              Entity| Sentiment|       Tweet content|
+--------+--------------------+----------+--------------------+
|    2504|         Borderlands|  Positive|    im so fucking in|
|    2600|         Borderlands|  Positive|i want to say tha...|
|    2716|         Borderlands|   Neutral|this would be an ...|
|    2729|         Borderlands|   Neutral|back on my dry bo...|
|    2741|         Borderlands|  Positive|been mad inactive...|
|    2763|         Borderlands|  Negative|not to say that t...|
|    1614|CallOfDutyBlackop...|  Positive|this sounds like ...|
|    1638|CallOfDutyBlackop...|  Negative|gonna fucking be ass|
|    1700|CallOfDutyBlackop...|Irrelevant|you can’t say thi...|
|    1715|CallOfDutyBlackop...|  Negative|i tried the new a...|
|    1765|CallOfDutyBlackop...|  Positive|sooo hyped for wh...|
|    1889|CallOfDutyBlackop...|Irrelevant|i give up too tir...|
|    1913|CallOfDutyBlackop...|  Positiv

In [20]:
# Tokenize text
tokenizer = Tokenizer(inputCol="Tweet content", outputCol="Tweet content_token")
data_processed = tokenizer.transform(data_processed)

In [21]:
def preprocess_tokens(df):
    df = remove_stopwords(df, 'Tweet content_token')
    df = remove_emojis(df, 'Tweet content_token')
    return df

data_processed = preprocess_tokens(data_processed)
data_processed.show()

+--------+--------------------+----------+--------------------+--------------------+
|Tweet ID|              Entity| Sentiment|       Tweet content| Tweet content_token|
+--------+--------------------+----------+--------------------+--------------------+
|    2504|         Borderlands|  Positive|    im so fucking in|          im fucking|
|    2600|         Borderlands|  Positive|i want to say tha...|      want say thank|
|    2716|         Borderlands|   Neutral|this would be an ...|amazing casting y...|
|    2729|         Borderlands|   Neutral|back on my dry bo...|back dry borderla...|
|    2741|         Borderlands|  Positive|been mad inactive...|mad inactive toda...|
|    2763|         Borderlands|  Negative|not to say that t...|say older games f...|
|    1614|CallOfDutyBlackop...|  Positive|this sounds like ...|sounds like reall...|
|    1638|CallOfDutyBlackop...|  Negative|gonna fucking be ass|   gonna fucking ass|
|    1700|CallOfDutyBlackop...|Irrelevant|you can’t say thi...|ca

In [22]:
# Tokenize text
tokenizer = Tokenizer(inputCol="Tweet content_token", outputCol="Tweet tokens")
data_processed = tokenizer.transform(data_processed)

# Drop the original column "Tweet content_token"
data_processed = data_processed.drop("Tweet content_token")
data_processed = data_processed.drop("Tweet content")

data_processed.show()

+--------+--------------------+----------+--------------------+
|Tweet ID|              Entity| Sentiment|        Tweet tokens|
+--------+--------------------+----------+--------------------+
|    2504|         Borderlands|  Positive|       [im, fucking]|
|    2600|         Borderlands|  Positive|  [want, say, thank]|
|    2716|         Borderlands|   Neutral|[amazing, casting...|
|    2729|         Borderlands|   Neutral|[back, dry, borde...|
|    2741|         Borderlands|  Positive|[mad, inactive, t...|
|    2763|         Borderlands|  Negative|[say, older, game...|
|    1614|CallOfDutyBlackop...|  Positive|[sounds, like, re...|
|    1638|CallOfDutyBlackop...|  Negative|[gonna, fucking, ...|
|    1700|CallOfDutyBlackop...|Irrelevant|[can’t, say, shit...|
|    1715|CallOfDutyBlackop...|  Negative|[tried, new, auto...|
|    1765|CallOfDutyBlackop...|  Positive|[sooo, hyped, lov...|
|    1889|CallOfDutyBlackop...|Irrelevant|[give, tired, rng...|
|    1913|CallOfDutyBlackop...|  Positiv

In [23]:
# Initialize TF vectorizer
hashingTF = HashingTF(inputCol="Tweet tokens", outputCol="rawFeatures")

# Transform the data
featurizedData = hashingTF.transform(data_processed)

# Initialize IDF vectorizer
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Fit and transform the data
idfModel = idf.fit(featurizedData)
tfidf_matrix = idfModel.transform(featurizedData)

# Convert the sparse matrix to a dense array
to_array = udf(lambda v: v.toArray().tolist(), ArrayType(FloatType()))
tfidf_array = tfidf_matrix.withColumn('features', to_array('features'))

In [24]:
data_processed.show()

+--------+--------------------+----------+--------------------+
|Tweet ID|              Entity| Sentiment|        Tweet tokens|
+--------+--------------------+----------+--------------------+
|    2504|         Borderlands|  Positive|       [im, fucking]|
|    2600|         Borderlands|  Positive|  [want, say, thank]|
|    2716|         Borderlands|   Neutral|[amazing, casting...|
|    2729|         Borderlands|   Neutral|[back, dry, borde...|
|    2741|         Borderlands|  Positive|[mad, inactive, t...|
|    2763|         Borderlands|  Negative|[say, older, game...|
|    1614|CallOfDutyBlackop...|  Positive|[sounds, like, re...|
|    1638|CallOfDutyBlackop...|  Negative|[gonna, fucking, ...|
|    1700|CallOfDutyBlackop...|Irrelevant|[can’t, say, shit...|
|    1715|CallOfDutyBlackop...|  Negative|[tried, new, auto...|
|    1765|CallOfDutyBlackop...|  Positive|[sooo, hyped, lov...|
|    1889|CallOfDutyBlackop...|Irrelevant|[give, tired, rng...|
|    1913|CallOfDutyBlackop...|  Positiv

In [25]:
data_processed = data_processed.withColumnRenamed("Tweet tokens", "Tweet content") \
                               .withColumnRenamed("EntityIndex", "Entity") \
                               .withColumnRenamed("SentimentIndex", "Sentiment")

# Show the renamed DataFrame
data_processed.show()

+--------+--------------------+----------+--------------------+
|Tweet ID|              Entity| Sentiment|       Tweet content|
+--------+--------------------+----------+--------------------+
|    2504|         Borderlands|  Positive|       [im, fucking]|
|    2600|         Borderlands|  Positive|  [want, say, thank]|
|    2716|         Borderlands|   Neutral|[amazing, casting...|
|    2729|         Borderlands|   Neutral|[back, dry, borde...|
|    2741|         Borderlands|  Positive|[mad, inactive, t...|
|    2763|         Borderlands|  Negative|[say, older, game...|
|    1614|CallOfDutyBlackop...|  Positive|[sounds, like, re...|
|    1638|CallOfDutyBlackop...|  Negative|[gonna, fucking, ...|
|    1700|CallOfDutyBlackop...|Irrelevant|[can’t, say, shit...|
|    1715|CallOfDutyBlackop...|  Negative|[tried, new, auto...|
|    1765|CallOfDutyBlackop...|  Positive|[sooo, hyped, lov...|
|    1889|CallOfDutyBlackop...|Irrelevant|[give, tired, rng...|
|    1913|CallOfDutyBlackop...|  Positiv

<h1 style="color: blue;">Chargement du dataset et séparation train/test</h1>

In [None]:
# Split the data
train, test = tfidf_matrix.randomSplit([0.8, 0.2], seed=42)

# Fit the model
lr = LogisticRegression(featuresCol='features', labelCol='SentimentIndex')
model = lr.fit(train)

<h1 style="color: blue;">HashingTF - IDF (paramètres par défaut)</h1>

In [12]:
hashtf = HashingTF(inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features")

label_stringIdx = StringIndexer(inputCol = "Sentiment", outputCol = "label")

lr = LogisticRegression()
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

pipeline = Pipeline(stages=[hashtf, idf, label_stringIdx, lr])

In [13]:
%%time

st = datetime.utcnow()
pipelineFit = pipeline.fit(train_set)
print('Training time:', datetime.utcnow() - st)

predictions = pipelineFit.transform(test_set)

accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

# Print the results
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)



Training time: 0:03:01.070135
Accuracy: 0.8295262267343486
Precision: 0.8295932053518089
Recall: 0.8295262267343486
CPU times: total: 234 ms
Wall time: 3min 22s


In [14]:
from pyspark.ml import PipelineModel

# Assuming 'pipelineFit' is your trained PipelineModel
model_path = "model" 
pipelineFit.save(model_path)

print("Model saved successfully at:", model_path)

Model saved successfully at: model
