In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [2]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("pipeline_twitter4") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/03 12:42:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df_twitter = spark.read.csv("../twitter_training.csv", header=False, inferSchema=True)

In [4]:
# Provide column names manually (replace with actual column names)
columns = ["Tweet ID", "Entity", "Sentiment", "Tweet content"]
df_twitter = df_twitter.toDF(*columns)

In [5]:
df_twitter = df_twitter.drop("Tweet ID")
df_twitter = df_twitter.drop("Entity")

In [6]:
df_twitter = df_twitter.dropna(subset=["Tweet content"])
df_twitter.toPandas()

Unnamed: 0,Sentiment,Tweet content
0,Positive,im getting on borderlands and i will murder yo...
1,Positive,I am coming to the borders and I will kill you...
2,Positive,im getting on borderlands and i will kill you ...
3,Positive,im coming on borderlands and i will murder you...
4,Positive,im getting on borderlands 2 and i will murder ...
...,...,...
73991,Positive,Just realized that the Windows partition of my...
73992,Positive,Just realized that my Mac window partition is ...
73993,Positive,Just realized the windows partition of my Mac ...
73994,Positive,Just realized between the windows partition of...


In [7]:
# Import required modules
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import re

# Define the clean_and_lowercase function
def clean_and_lowercase(text):
    # Convert the text to lowercase
    text_lower = text.lower()
    # Remove special characters, punctuation, and unnecessary symbols
    cleaned_text = re.sub(r'[^a-zA-Z\s]', '', text_lower)
    # Return the cleaned text
    return cleaned_text

# Define the UDF
clean_and_lowercase_udf = udf(clean_and_lowercase, StringType())

# Apply the UDF to the 'Tweet content' column
df_twitter = df_twitter.withColumn("cleaned_tweet", clean_and_lowercase_udf("Tweet content"))

# Filter out rows where the cleaned tweet is empty
df_twitter = df_twitter.filter(df_twitter.cleaned_tweet != " ")

In [8]:
df_twitter.toPandas()

                                                                                

Unnamed: 0,Sentiment,Tweet content,cleaned_tweet
0,Positive,im getting on borderlands and i will murder yo...,im getting on borderlands and i will murder yo...
1,Positive,I am coming to the borders and I will kill you...,i am coming to the borders and i will kill you...
2,Positive,im getting on borderlands and i will kill you ...,im getting on borderlands and i will kill you all
3,Positive,im coming on borderlands and i will murder you...,im coming on borderlands and i will murder you...
4,Positive,im getting on borderlands 2 and i will murder ...,im getting on borderlands and i will murder y...
...,...,...,...
73798,Positive,Just realized that the Windows partition of my...,just realized that the windows partition of my...
73799,Positive,Just realized that my Mac window partition is ...,just realized that my mac window partition is ...
73800,Positive,Just realized the windows partition of my Mac ...,just realized the windows partition of my mac ...
73801,Positive,Just realized between the windows partition of...,just realized between the windows partition of...


In [9]:
# Création des étapes de prétraitement
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
indexer = StringIndexer(inputCol="Sentiment", outputCol="label")
tokenizer = Tokenizer(inputCol="cleaned_tweet", outputCol="tokens")
stop_words_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tweet")
cv = CountVectorizer(inputCol="filtered_tweet", outputCol="raw_features")
idf = IDF(inputCol="raw_features", outputCol="features")



In [10]:
from pyspark.ml import Pipeline
# Add indexer, lemmatization, and the rest of the pipeline stages
df_twitter = indexer.fit(df_twitter).transform(df_twitter)
df_twitter.toPandas()

                                                                                

Unnamed: 0,Sentiment,Tweet content,cleaned_tweet,label
0,Positive,im getting on borderlands and i will murder yo...,im getting on borderlands and i will murder yo...,1.0
1,Positive,I am coming to the borders and I will kill you...,i am coming to the borders and i will kill you...,1.0
2,Positive,im getting on borderlands and i will kill you ...,im getting on borderlands and i will kill you all,1.0
3,Positive,im coming on borderlands and i will murder you...,im coming on borderlands and i will murder you...,1.0
4,Positive,im getting on borderlands 2 and i will murder ...,im getting on borderlands and i will murder y...,1.0
...,...,...,...,...
73798,Positive,Just realized that the Windows partition of my...,just realized that the windows partition of my...,1.0
73799,Positive,Just realized that my Mac window partition is ...,just realized that my mac window partition is ...,1.0
73800,Positive,Just realized the windows partition of my Mac ...,just realized the windows partition of my mac ...,1.0
73801,Positive,Just realized between the windows partition of...,just realized between the windows partition of...,1.0


In [11]:
data_preprocessing_pipeline = Pipeline(stages=[tokenizer, stop_words_remover, cv, idf])
preprocessing_model = data_preprocessing_pipeline.fit(df_twitter)

                                                                                

In [12]:
df_transformed = preprocessing_model.transform(df_twitter)
df_transformed.toPandas()

24/05/03 12:42:57 WARN DAGScheduler: Broadcasting large task binary with size 1129.1 KiB
                                                                                

Unnamed: 0,Sentiment,Tweet content,cleaned_tweet,label,tokens,filtered_tweet,raw_features,features
0,Positive,im getting on borderlands and i will murder yo...,im getting on borderlands and i will murder yo...,1.0,"[im, getting, on, borderlands, and, i, will, m...","[im, getting, borderlands, murder]","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 2.8109838048277376, 0.0, 0.0, 0.0, ..."
1,Positive,I am coming to the borders and I will kill you...,i am coming to the borders and i will kill you...,1.0,"[i, am, coming, to, the, borders, and, i, will...","[coming, borders, kill]","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,Positive,im getting on borderlands and i will kill you ...,im getting on borderlands and i will kill you all,1.0,"[im, getting, on, borderlands, and, i, will, k...","[im, getting, borderlands, kill]","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 2.8109838048277376, 0.0, 0.0, 0.0, ..."
3,Positive,im coming on borderlands and i will murder you...,im coming on borderlands and i will murder you...,1.0,"[im, coming, on, borderlands, and, i, will, mu...","[im, coming, borderlands, murder]","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 2.8109838048277376, 0.0, 0.0, 0.0, ..."
4,Positive,im getting on borderlands 2 and i will murder ...,im getting on borderlands and i will murder y...,1.0,"[im, getting, on, borderlands, , and, i, will,...","[im, getting, borderlands, , murder]","(1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.769450479137928, 0.0, 2.8109838048277376, 0..."
...,...,...,...,...,...,...,...,...
73798,Positive,Just realized that the Windows partition of my...,just realized that the windows partition of my...,1.0,"[just, realized, that, the, windows, partition...","[realized, windows, partition, mac, like, , ye...","(1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.769450479137928, 0.0, 0.0, 2.81256198123465..."
73799,Positive,Just realized that my Mac window partition is ...,just realized that my mac window partition is ...,1.0,"[just, realized, that, my, mac, window, partit...","[realized, mac, window, partition, , years, be...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.769450479137928, 0.0, 0.0, 0.0, 0.0, 0.0, 0..."
73800,Positive,Just realized the windows partition of my Mac ...,just realized the windows partition of my mac ...,1.0,"[just, realized, the, windows, partition, of, ...","[realized, windows, partition, mac, , years, b...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.769450479137928, 0.0, 0.0, 0.0, 0.0, 0.0, 0..."
73801,Positive,Just realized between the windows partition of...,just realized between the windows partition of...,1.0,"[just, realized, between, the, windows, partit...","[realized, windows, partition, mac, like, , ye...","(2.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.538900958275856, 0.0, 0.0, 2.81256198123465..."


In [None]:
# Specify the path where you want to save the model
model_path = "preprocessing_pipeline1"
# Save the preprocessing model
preprocessing_model.save(model_path)

In [13]:
# Split the data into train and test sets
train_data, test_data = df_transformed.randomSplit([0.8, 0.2], seed=42)

In [14]:
train_data.toPandas()

24/05/03 12:43:09 WARN DAGScheduler: Broadcasting large task binary with size 1147.2 KiB
                                                                                

Unnamed: 0,Sentiment,Tweet content,cleaned_tweet,label,tokens,filtered_tweet,raw_features,features
0,Irrelevant,. . . . . . Go MSC,go msc,3.0,"[, , , , , , , , , go, msc]","[, , , , , , , , , go, msc]","(9.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(6.925054312241352, 0.0, 0.0, 0.0, 0.0, 0.0, 0..."
1,Irrelevant,. . Amazing,amazing,3.0,"[, , , , amazing]","[, , , , amazing]","(4.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(3.077801916551712, 0.0, 0.0, 0.0, 0.0, 0.0, 0..."
2,Irrelevant,. I need this in my life so badly,i need this in my life so badly,3.0,"[, , i, need, this, in, my, life, so, badly]","[, , need, life, badly]","(2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.538900958275856, 0.0, 0.0, 0.0, 0.0, 0.0, 0..."
3,Irrelevant,. The special thanks go to . @HansrajMeena. ...,the special thanks go to hansrajmeena for ...,3.0,"[, , the, special, thanks, go, to, , hansrajme...","[, , special, thanks, go, , hansrajmeena, , , ...","(8.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(6.155603833103424, 0.0, 0.0, 0.0, 0.0, 0.0, 0..."
4,Irrelevant,. Why people say this challenge so hard.. Did...,why people say this challenge so hard did it...,3.0,"[, , why, people, say, this, challenge, so, ha...","[, , people, say, challenge, hard, realizingtw...","(2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.538900958275856, 0.0, 0.0, 0.0, 0.0, 0.0, 0..."
...,...,...,...,...,...,...,...,...
59206,Positive,• Me who didnt bought Death Stranding Rhandler...,me who didnt bought death stranding rhandlerr...,1.0,"[, me, who, didnt, bought, death, stranding, r...","[, didnt, bought, death, stranding, rhandlerr,...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.769450479137928, 0.0, 0.0, 0.0, 0.0, 0.0, 0..."
59207,Positive,″ Wow,wow,1.0,"[, wow]","[, wow]","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.769450479137928, 0.0, 0.0, 0.0, 0.0, 0.0, 0..."
59208,Positive,🤯Night City wire was absolutely breath taking....,night city wire was absolutely breath taking ...,1.0,"[night, city, wire, was, absolutely, breath, t...","[night, city, wire, absolutely, breath, taking...","(3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(2.308351437413784, 0.0, 0.0, 0.0, 0.0, 0.0, 0..."
59209,Positive,🤯Night in wire was absolutely breath taking.. ...,night in wire was absolutely breath taking hy...,1.0,"[night, in, wire, was, absolutely, breath, tak...","[night, wire, absolutely, breath, taking, , hy...","(2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.538900958275856, 0.0, 0.0, 0.0, 0.0, 0.0, 0..."


In [16]:
from pyspark.ml.classification import OneVsRest
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create a LinearSVC object
svm = LinearSVC(maxIter=10, regParam=0.1, featuresCol="features", labelCol="label")

# Create an OneVsRest object
ovr = OneVsRest(classifier=svm)

# Train the OneVsRest model
ovr_model = ovr.fit(train_data)

# Make predictions on the test data
predictions = ovr_model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
#ovr_model.save("model")
print("Accuracy SVM:", accuracy)

24/05/03 12:44:22 WARN DAGScheduler: Broadcasting large task binary with size 1145.0 KiB
24/05/03 12:44:24 WARN DAGScheduler: Broadcasting large task binary with size 1145.9 KiB
24/05/03 12:44:25 WARN DAGScheduler: Broadcasting large task binary with size 1180.2 KiB
24/05/03 12:44:25 WARN DAGScheduler: Broadcasting large task binary with size 1180.9 KiB
24/05/03 12:44:25 WARN DAGScheduler: Broadcasting large task binary with size 1180.9 KiB
24/05/03 12:44:25 WARN DAGScheduler: Broadcasting large task binary with size 1180.9 KiB
24/05/03 12:44:25 WARN DAGScheduler: Broadcasting large task binary with size 1180.9 KiB
24/05/03 12:44:26 WARN DAGScheduler: Broadcasting large task binary with size 1180.9 KiB
24/05/03 12:44:26 WARN DAGScheduler: Broadcasting large task binary with size 1180.9 KiB
24/05/03 12:44:26 WARN DAGScheduler: Broadcasting large task binary with size 1180.9 KiB
24/05/03 12:44:26 WARN DAGScheduler: Broadcasting large task binary with size 1180.9 KiB
24/05/03 12:44:26 WAR

Accuracy SVM: 0.8451891447368421


                                                                                

# TEST

In [None]:
from pyspark.ml import PipelineModel
from pyspark.ml.classification import OneVsRestModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import re

# Create a SparkSession
spark = SparkSession.builder \
    .appName("pipeline_twitter4") \
    .getOrCreate()

# Load the pipeline model from disk
model = OneVsRestModel.load("model")
input_string = "Hey , i am gone murder you all"
pipeline_model = PipelineModel.load("preprocessing_pipeline1/")# Create a DataFrame with a single column named "Tweet_content"

In [None]:
# Create a DataFrame with a single column named "Tweet_content"
df = spark.createDataFrame([(input_string,)], ["Tweet_content"])
df.toPandas()

In [None]:
def clean_and_lowercase(text):
    # Convert the text to lowercase
    text_lower = text.lower()
    # Remove special characters, punctuation, and unnecessary symbols
    cleaned_text = re.sub(r'[^a-zA-Z\s]', '', text_lower)
    # Return the cleaned text
    return cleaned_text

# Define the UDF
clean_and_lowercase_udf = udf(clean_and_lowercase, StringType())

In [None]:
df_cleaned = df.withColumn("cleaned_tweet", clean_and_lowercase_udf("Tweet_content"))

In [None]:
df_cleaned.toPandas()

In [None]:
out_pipeline = pipeline_model.transform(df_cleaned)

In [None]:
out_pipeline.toPandas()

In [None]:
new_prediction = model.transform(out_pipeline)

In [None]:
new_prediction.toPandas()