In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [2]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("pipeline_twitter4") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/11 12:20:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df_twitter = spark.read.csv("../twitter_training.csv", header=False, inferSchema=True)

                                                                                

In [4]:
# Provide column names manually (replace with actual column names)
columns = ["Tweet ID", "Entity", "Sentiment", "TweetContent"]
df_twitter = df_twitter.toDF(*columns)

In [5]:
df_twitter = df_twitter.drop("Tweet ID")
df_twitter = df_twitter.drop("Entity")

In [6]:
df_twitter = df_twitter.dropna(subset=["TweetContent"])
df_twitter.toPandas()

                                                                                

Unnamed: 0,Sentiment,TweetContent
0,Positive,im getting on borderlands and i will murder yo...
1,Positive,I am coming to the borders and I will kill you...
2,Positive,im getting on borderlands and i will kill you ...
3,Positive,im coming on borderlands and i will murder you...
4,Positive,im getting on borderlands 2 and i will murder ...
...,...,...
73991,Positive,Just realized that the Windows partition of my...
73992,Positive,Just realized that my Mac window partition is ...
73993,Positive,Just realized the windows partition of my Mac ...
73994,Positive,Just realized between the windows partition of...


In [7]:
# Import required modules
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import re

# Define the clean_and_lowercase function
def clean_and_lowercase(text):
    # Convert the text to lowercase
    text_lower = text.lower()
    # Remove special characters, punctuation, and unnecessary symbols
    cleaned_text = re.sub(r'[^a-zA-Z\s]', '', text_lower)
    # Return the cleaned text
    return cleaned_text

# Define the UDF
clean_and_lowercase_udf = udf(clean_and_lowercase, StringType())

# Apply the UDF to the 'Tweet content' column
df_twitter = df_twitter.withColumn("cleaned_tweet", clean_and_lowercase_udf("TweetContent"))

# Filter out rows where the cleaned tweet is empty
df_twitter = df_twitter.filter(df_twitter.cleaned_tweet != " ")

In [8]:
df_twitter.toPandas()

                                                                                

Unnamed: 0,Sentiment,TweetContent,cleaned_tweet
0,Positive,im getting on borderlands and i will murder yo...,im getting on borderlands and i will murder yo...
1,Positive,I am coming to the borders and I will kill you...,i am coming to the borders and i will kill you...
2,Positive,im getting on borderlands and i will kill you ...,im getting on borderlands and i will kill you all
3,Positive,im coming on borderlands and i will murder you...,im coming on borderlands and i will murder you...
4,Positive,im getting on borderlands 2 and i will murder ...,im getting on borderlands and i will murder y...
...,...,...,...
73798,Positive,Just realized that the Windows partition of my...,just realized that the windows partition of my...
73799,Positive,Just realized that my Mac window partition is ...,just realized that my mac window partition is ...
73800,Positive,Just realized the windows partition of my Mac ...,just realized the windows partition of my mac ...
73801,Positive,Just realized between the windows partition of...,just realized between the windows partition of...


In [9]:
# Création des étapes de prétraitement
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
indexer = StringIndexer(inputCol="Sentiment", outputCol="label")
tokenizer = Tokenizer(inputCol="cleaned_tweet", outputCol="tokens")
stop_words_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tweet")
cv = CountVectorizer(inputCol="filtered_tweet", outputCol="raw_features")
idf = IDF(inputCol="raw_features", outputCol="features")

In [10]:
from pyspark.ml import Pipeline
# Add indexer, lemmatization, and the rest of the pipeline stages
df_twitter = indexer.fit(df_twitter).transform(df_twitter)
df_twitter.toPandas()

                                                                                

Unnamed: 0,Sentiment,TweetContent,cleaned_tweet,label
0,Positive,im getting on borderlands and i will murder yo...,im getting on borderlands and i will murder yo...,1.0
1,Positive,I am coming to the borders and I will kill you...,i am coming to the borders and i will kill you...,1.0
2,Positive,im getting on borderlands and i will kill you ...,im getting on borderlands and i will kill you all,1.0
3,Positive,im coming on borderlands and i will murder you...,im coming on borderlands and i will murder you...,1.0
4,Positive,im getting on borderlands 2 and i will murder ...,im getting on borderlands and i will murder y...,1.0
...,...,...,...,...
73798,Positive,Just realized that the Windows partition of my...,just realized that the windows partition of my...,1.0
73799,Positive,Just realized that my Mac window partition is ...,just realized that my mac window partition is ...,1.0
73800,Positive,Just realized the windows partition of my Mac ...,just realized the windows partition of my mac ...,1.0
73801,Positive,Just realized between the windows partition of...,just realized between the windows partition of...,1.0


In [11]:
# Split the data into train and test sets
train_data, test_data = df_twitter.randomSplit([0.8, 0.2], seed=42)

In [12]:
train_data.toPandas()

                                                                                

Unnamed: 0,Sentiment,TweetContent,cleaned_tweet,label
0,Irrelevant,. . . . . . Go MSC,go msc,3.0
1,Irrelevant,. . Amazing,amazing,3.0
2,Irrelevant,. I need this in my life so badly,i need this in my life so badly,3.0
3,Irrelevant,. The special thanks go to . @HansrajMeena. ...,the special thanks go to hansrajmeena for ...,3.0
4,Irrelevant,. Why people say this challenge so hard.. Did...,why people say this challenge so hard did it...,3.0
...,...,...,...,...
59206,Positive,• Me who didnt bought Death Stranding Rhandler...,me who didnt bought death stranding rhandlerr...,1.0
59207,Positive,″ Wow,wow,1.0
59208,Positive,🤯Night City wire was absolutely breath taking....,night city wire was absolutely breath taking ...,1.0
59209,Positive,🤯Night in wire was absolutely breath taking.. ...,night in wire was absolutely breath taking hy...,1.0


In [14]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import OneVsRest, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.mllib.evaluation import MulticlassMetrics

# Assuming you have defined tokenizer, stop_words_remover, cv, idf, train_data, and test_data earlier in your code

# Create a LinearSVC object
svm = LinearSVC(maxIter=10, regParam=0.1, featuresCol="features", labelCol="label")

# Create an OneVsRest object
ovr = OneVsRest(classifier=svm)

# Create a Pipeline for data preprocessing and classification
data_preprocessing_pipeline = Pipeline(stages=[tokenizer, stop_words_remover, cv, idf, ovr])

# Fit the pipeline to the training data
pipeline_model = data_preprocessing_pipeline.fit(train_data)

# Make predictions on the test data
predictions = pipeline_model.transform(test_data)

# Initialize a SparkSession
spark = SparkSession.builder.appName("ROC_curve").getOrCreate()

# Evaluate the model using MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

# Convert predictions and labels to RDD
prediction_and_label = predictions.select("prediction", "label").rdd.map(lambda row: (float(row["prediction"]), float(row["label"])))

# Get metrics for multiclass classification
metrics = MulticlassMetrics(prediction_and_label)

# Get ROC curve
roc = metrics.roc()

# Plot ROC curve
roc_df = spark.createDataFrame(roc, ["FPR", "TPR"])
roc_df.show()

# You can use a plotting library like matplotlib to visualize the ROC curve
import matplotlib.pyplot as plt

plt.figure(figsize=(8, 6))
plt.plot(roc_df.select("FPR").collect(), roc_df.select("TPR").collect(), label="ROC Curve")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.legend()
plt.show()


24/05/11 12:24:42 WARN DAGScheduler: Broadcasting large task binary with size 1063.7 KiB
24/05/11 12:24:43 WARN DAGScheduler: Broadcasting large task binary with size 1098.1 KiB
24/05/11 12:24:43 WARN DAGScheduler: Broadcasting large task binary with size 1098.8 KiB
24/05/11 12:24:43 WARN DAGScheduler: Broadcasting large task binary with size 1098.8 KiB
24/05/11 12:24:43 WARN DAGScheduler: Broadcasting large task binary with size 1098.8 KiB
24/05/11 12:24:43 WARN DAGScheduler: Broadcasting large task binary with size 1098.8 KiB
24/05/11 12:24:43 WARN DAGScheduler: Broadcasting large task binary with size 1098.8 KiB
24/05/11 12:24:44 WARN DAGScheduler: Broadcasting large task binary with size 1098.8 KiB
24/05/11 12:24:44 WARN DAGScheduler: Broadcasting large task binary with size 1098.8 KiB
24/05/11 12:24:44 WARN DAGScheduler: Broadcasting large task binary with size 1098.8 KiB
24/05/11 12:24:44 WARN DAGScheduler: Broadcasting large task binary with size 1098.8 KiB
24/05/11 12:24:44 WAR

AttributeError: 'MulticlassMetrics' object has no attribute 'roc'

In [42]:

a = pipeline_model.transform(test_data)

In [45]:
a.select("rawPrediction").show()

24/05/09 22:06:07 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
[Stage 176:>                                                        (0 + 1) / 1]

+--------------------+
|       rawPrediction|
+--------------------+
|[-0.8582150101231...|
|[-0.7789983981667...|
|[0.72136592460470...|
|[-0.9049090401708...|
|[-1.5711702229821...|
|[-1.1903670775705...|
|[-0.7169869752199...|
|[-1.3876947281138...|
|[-1.2797648509277...|
|[-1.9836388524386...|
|[-1.5775210347265...|
|[-1.6222531154213...|
|[-1.1398413287592...|
|[-2.3561179739890...|
|[-1.2640578565771...|
|[-2.3121655558559...|
|[-0.9177198317846...|
|[-1.1899152747580...|
|[-0.7488317962122...|
|[-1.4739553338714...|
+--------------------+
only showing top 20 rows



                                                                                

# TEST

In [3]:
from pyspark.ml import PipelineModel
from pyspark.ml.classification import OneVsRestModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.ml import PipelineModel

import re

# Create a SparkSession
spark = SparkSession.builder \
    .appName("pipeline_twitter4") \
    .getOrCreate()

# Model and pipeline PATH
model_path = "./model"

# Import pipeline 
pipeline = PipelineModel.load(model_path)
input_string = "Hey , i am gone murder you all"
pipeline_model = PipelineModel.load("preprocessing_pipeline1/")# Create a DataFrame with a single column named "Tweet_content"

In [4]:
# Create a DataFrame with a single column named "Tweet_content"
df = spark.createDataFrame([(input_string,)], ["Tweet_content"])
df.toPandas()

Unnamed: 0,Tweet_content
0,"Hey , i am gone murder you all"


In [5]:
def clean_and_lowercase(text):
    # Convert the text to lowercase
    text_lower = text.lower()
    # Remove special characters, punctuation, and unnecessary symbols
    cleaned_text = re.sub(r'[^a-zA-Z\s]', '', text_lower)
    # Return the cleaned text
    return cleaned_text

# Define the UDF
clean_and_lowercase_udf = udf(clean_and_lowercase, StringType())

In [6]:
df_cleaned = df.withColumn("cleaned_tweet", clean_and_lowercase_udf("Tweet_content"))

In [7]:
df_cleaned.toPandas()

                                                                                

Unnamed: 0,Tweet_content,cleaned_tweet
0,"Hey , i am gone murder you all",hey i am gone murder you all


In [8]:
out_pipeline = pipeline.transform(df_cleaned)

In [11]:
out_pipeline.toPandas()

24/05/07 15:12:15 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
                                                                                

Unnamed: 0,Tweet_content,cleaned_tweet,tokens,filtered_tweet,raw_features,features,rawPrediction,prediction
0,"Hey , i am gone murder you all",hey i am gone murder you all,"[hey, , i, am, gone, murder, you, all]","[hey, , gone, murder]","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.7674838227386219, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[-0.3844291148589706, -0.5572593847430178, -1....",0.0


In [12]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(out_pipeline)
print("Accuracy:", accuracy)

IllegalArgumentException: label does not exist. Available: Tweet_content, cleaned_tweet, tokens, filtered_tweet, raw_features, features, rawPrediction, prediction