<a href="https://colab.research.google.com/github/Malek-Ghorbel/TwitterSentimentAnalysis-BigData/blob/main/twitter_sentiment_analysis_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Installing packages**


In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=8367c2d714fc4391b3fb145f7325eb8cc913b806e73b807614aae3af16157f5f
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [None]:
!pip install sparknlp

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting sparknlp
  Downloading sparknlp-1.0.0-py3-none-any.whl (1.4 kB)
Collecting spark-nlp
  Downloading spark_nlp-4.3.2-py2.py3-none-any.whl (473 kB)
Installing collected packages: spark-nlp, sparknlp
Successfully installed spark-nlp-4.3.2 sparknlp-1.0.0


**Importing necessary libraries**

In [None]:
import pyspark
from pyspark.sql.functions import * 
from pyspark.sql.types import * 
from pyspark.sql import SparkSession 

import pandas as pd

import re 
from pyspark.ml.feature import HashingTF, IDF, StringIndexer, SQLTransformer,IndexToString,CountVectorizer 

from pyspark.ml.classification import LinearSVC
from pyspark.ml import Pipeline ,PipelineModel

from pyspark.ml.evaluation import MulticlassClassificationEvaluator 

import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp import DocumentAssembler


import os
import gc

**Initializing the Spark session**

In [None]:
from pyspark.sql import SparkSession  # entry point for DataFrame operations
from pyspark import SparkContext  # imported for completeness; not used directly below
from pyspark.sql import SQLContext  # legacy SQL entry point; not used directly below

import pyspark.sql.functions as F

spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[*]")\
    .config("spark.executor.memory", "12g").config("spark.driver.memory", "12g")\
    .config("spark.memory.offHeap.enabled",True).config("spark.memory.offHeap.size","16g")\
    .config('spark.executor.cores', '3').config('spark.cores.max', '3')\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:4.3.2").getOrCreate()  # jar version aligned with the spark-nlp 4.3.2 Python package installed above

**Loading the dataset (stored in Google Drive)**


In [None]:
from google.colab import drive
drive.mount('/gdrive')
%cd /gdrive

Mounted at /gdrive
/gdrive


In [None]:
training_data = spark.read.csv(os.getcwd()+"/MyDrive/training.csv", inferSchema = True, header = False) #Read in the data
training_data.show(10)

+---+----------+--------------------+--------+---------------+--------------------+
|_c0|       _c1|                 _c2|     _c3|            _c4|                 _c5|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|  0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|  0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  0|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4Hollywood|@Tatiana_K nop

**Data preprocessing**

In [None]:
columns = ["target", "id", "date", "flag", "user", "tweet"]  

training_data = training_data.select(col("_c0").alias(columns[0]), col("_c1").alias(columns[1]), col("_c2").alias(columns[2]),
                      col("_c3").alias(columns[3]), col("_c4").alias(columns[4]), col("_c5").alias(columns[5]))
training_data.show(10) 

+------+----------+--------------------+--------+---------------+--------------------+
|target|        id|                date|    flag|           user|               tweet|
+------+----------+--------------------+--------+---------------+--------------------+
|     0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|     0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|     0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|     0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|     0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|     0|1467811795|Mon Apr 06 22:20:...|NO_

Remove unnecessary columns

In [None]:
training_data = training_data.select('target' ,'tweet')
training_data.show(10) 

+------+--------------------+
|target|               tweet|
+------+--------------------+
|     0|@switchfoot http:...|
|     0|is upset that he ...|
|     0|@Kenichan I dived...|
|     0|my whole body fee...|
|     0|@nationwideclass ...|
|     0|@Kwesidei not the...|
|     0|         Need a hug |
|     0|@LOLTrish hey  lo...|
|     0|@Tatiana_K nope t...|
|     0|@twittera que me ...|
+------+--------------------+
only showing top 10 rows



Normalize the sentiment column values (map label 4 to 1)

In [None]:
training_data = training_data.withColumn("target", when(training_data["target"] == 4, 1).otherwise(training_data["target"]))
training_data.groupBy("target").count().orderBy("count").show()

+------+------+
|target| count|
+------+------+
|     1|800000|
|     0|800000|
+------+------+



Remove noise from tweets (mentions, links, ...)

In [None]:
# Strip links, mentions, the '#' symbol and the retweet marker
training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', r'http\S+', ''))
training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', r'@\w+', ''))
training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', '#', ''))
training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', 'RT', ''))

# Strip leftover HTML entities
training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', '&amp;', ''))
training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', '&quot;', ''))
training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', '&gt;', ''))
training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', '&lt;', ''))

# Strip hyphens
training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', '-', ''))

# Collapse runs of three, then two, spaces into a single space
training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', '   ', ' '))
training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', '  ', ' '))

# Drop tweets that are empty or whitespace-only after cleaning
training_data = training_data.filter((training_data.tweet != ' ') & (training_data.tweet != '') & (training_data.tweet != '   '))
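
The cleaning steps above can be tried on a single string without starting Spark. The sketch below replays the same patterns in the same order with Python's `re` module. This is an illustration only: Spark evaluates the patterns with Java regular expressions, which behave the same way for these simple ASCII examples. The names `CLEANING_STEPS` and `clean_tweet` and the sample strings are made up for this sketch.

```python
import re

# Same patterns, in the same order, as the regexp_replace calls above.
CLEANING_STEPS = [
    (r'http\S+', ''),  # links
    (r'@\w+', ''),     # mentions
    (r'#', ''),        # hashtag symbol (the word itself is kept)
    (r'RT', ''),       # retweet marker
    (r'&amp;', ''),    # leftover HTML entities
    (r'&quot;', ''),
    (r'&gt;', ''),
    (r'&lt;', ''),
    (r'-', ''),        # hyphens
    (r'   ', ' '),     # three spaces -> one
    (r'  ', ' '),      # two spaces -> one
]


def clean_tweet(text):
    """Apply every cleaning step in order and return the cleaned text."""
    for pattern, replacement in CLEANING_STEPS:
        text = re.sub(pattern, replacement, text)
    return text


sample = "RT @user: loving the #weather today &amp; tomorrow http://t.co/abc"
print(repr(clean_tweet(sample)))        # ' : loving the weather today tomorrow '
print(repr(clean_tweet("SMART move")))  # 'SMA move'
```

The second call shows a side effect worth knowing about: the pattern `RT` is not anchored to word boundaries, so it also removes those two letters from inside longer uppercase words.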

Splitting data into training and test sets

In [None]:
# Roughly 75% of the rows go to training and 25% to testing
train_set, test_set = training_data.randomSplit([0.75, 0.25])
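
The split above is random, so the two sets only approximate the 75/25 proportions and change from run to run unless a seed is fixed (`randomSplit` accepts an optional `seed` argument). The plain-Python sketch below illustrates the idea of a weighted random split with a per-row draw. It is not Spark's implementation, and the function name `random_split` and the toy rows are assumptions made for this example.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split by drawing one uniform number per row."""
    rng = random.Random(seed)
    total = float(sum(weights))

    # Cumulative upper bounds, e.g. [0.75, 1.0] for weights [0.75, 0.25]
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)

    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding at the top end
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


rows = list(range(1000))
train, test = random_split(rows, [0.75, 0.25], seed=42)
print(len(train), len(test))  # close to 750 and 250, but not exactly

# The same seed reproduces the same split
assert random_split(rows, [0.75, 0.25], seed=42) == [train, test]
```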

In [None]:
# document_assembler = DocumentAssembler() \
#     .setInputCol("tweet") \
#     .setOutputCol("document")
#document_assembler = DocumentAssembler(inputCol="tweet" , outputCol="document")
#model1 = document_assembler.fit(train_set)

In [None]:
# sentence_detector = SentenceDetector() \
#     .setInputCols(["document"]) \
#     .setOutputCol("sentence")

In [None]:
# tokenizer = Tokenizer() \
#   .setInputCols(["sentence"]) \
#   .setOutputCol("token")

In [None]:
# stopwords_cleaner = StopWordsCleaner()\
#       .setInputCols("token")\
#       .setOutputCol("cleanTokens")\
#       .setCaseSensitive(False)

In [None]:
# normalizer = Normalizer() \
#     .setInputCols(["cleanTokens"]) \
#     .setOutputCol("normalized")\
#     .setLowercase(True)

# finisher = Finisher() \
#     .setInputCols(["normalized"]) \
#     .setOutputCols(["token_features"]) \
#     .setOutputAsArray(True) \
#     .setCleanAnnotations(False)

In [None]:
# To generate term frequency
# hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures")

In [None]:
# To generate inverse document frequency
# idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)

In [None]:
# SVC = LinearSVC(labelCol = "target", featuresCol="features",maxIter=13, regParam=0.2)

Defining the stages of the Natural Language Processing (NLP) pipeline that will be applied to the data

In [None]:
#from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

# Turn tweets into documents
document_assembler = DocumentAssembler() \
    .setInputCol("tweet") \
    .setOutputCol("document")

# Split the documents into tokens
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

# Normalize the tokens (remove punctuation, etc.)
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized")

# Remove stop words from the tokens
stopwords_cleaner = StopWordsCleaner() \
    .setInputCols(["normalized"]) \
    .setOutputCol("cleanTokens") \
    .setCaseSensitive(False)

# Convert the annotated tokens into a plain array of strings
finisher = Finisher() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCols(["tokens"]) \
    .setOutputAsArray(True)

# Hash the tokens into term-frequency vectors, then weight them by inverse document frequency
hashingTF = HashingTF(inputCol="tokens", outputCol="tf", numFeatures=1000)
idf = IDF(inputCol="tf", outputCol="features")

# Classify the TF-IDF features with a linear Support Vector Machine
svm = LinearSVC(featuresCol="features", labelCol="target")
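
To make the `HashingTF` and `IDF` stages above more concrete, here is a minimal plain-Python sketch of the hashing trick followed by IDF weighting. Two assumptions are baked in: CRC32 stands in for the MurmurHash3 function Spark uses, so bucket indices will differ from Spark's, and the IDF uses the smoothed form `log((N + 1) / (df + 1))` described in the Spark ML documentation. All function names and the toy documents are made up for this sketch.

```python
import math
import zlib
from collections import Counter

NUM_FEATURES = 1000  # same bucket count as numFeatures in the HashingTF stage


def bucket(token, num_features=NUM_FEATURES):
    # Assumption: CRC32 replaces Spark's MurmurHash3, for illustration only
    return zlib.crc32(token.encode("utf-8")) % num_features


def hashing_tf(tokens, num_features=NUM_FEATURES):
    """Sparse term-frequency vector as {bucket index: count}."""
    return Counter(bucket(t, num_features) for t in tokens)


def fit_idf(tf_vectors):
    """IDF weight per bucket: log((N + 1) / (df + 1))."""
    n_docs = len(tf_vectors)
    doc_freq = Counter()
    for tf in tf_vectors:
        doc_freq.update(tf.keys())  # each document counts once per bucket
    return {idx: math.log((n_docs + 1) / (df + 1)) for idx, df in doc_freq.items()}


def tf_idf(tf, idf):
    """Scale each term frequency by the IDF weight of its bucket."""
    return {idx: count * idf[idx] for idx, count in tf.items()}


docs = [["happy", "day"], ["sad", "day"], ["good", "day"]]
tf_vectors = [hashing_tf(d) for d in docs]
idf = fit_idf(tf_vectors)
features = [tf_idf(tf, idf) for tf in tf_vectors]
print(idf[bucket("day")])  # 0.0, because "day" occurs in every document
```

With only 1000 buckets, different words can land in the same bucket. That keeps the feature vectors small at the cost of some collisions.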



In [None]:
#define the pipeline
nlp_pipeline = Pipeline(
    stages=[
        document_assembler,
        tokenizer, 
        normalizer, 
        stopwords_cleaner, 
        finisher, 
        hashingTF, 
        idf, 
        svm
    ]
)


# nlp_pipeline.setStages([
#     document_assembler,
#     tokenizer,
#     normalizer,
#     stopwords_cleaner,
#     finisher,
#     hashingTF,
#     idf,
#     svm
# ])

# Fit the pipeline on the training set to obtain the model
p = nlp_pipeline.fit(train_set)
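
A pipeline feeds the output of each stage into the next one. The plain-Python sketch below mimics the text-processing stages (tokenize, normalize, remove stop words) on one sentence to show that chaining. It is an approximation, not Spark NLP's behavior: the stop-word list is a tiny hypothetical one, and the normalization simply drops non-letter characters. All names are made up for this sketch.

```python
import re

# Hypothetical, tiny stop-word list; Spark NLP ships a much longer default list
STOP_WORDS = {"i", "am", "the", "a", "is", "of", "and", "to"}


def tokenize(text):
    """Split the text on whitespace."""
    return text.split()


def normalize(tokens):
    """Approximate the Normalizer stage: drop non-letter characters and empty tokens."""
    cleaned = (re.sub(r"[^A-Za-z]", "", t) for t in tokens)
    return [t for t in cleaned if t]


def remove_stop_words(tokens):
    """Case-insensitive stop-word removal, in the spirit of setCaseSensitive(False)."""
    return [t for t in tokens if t.lower() not in STOP_WORDS]


def run_stages(text, stages):
    """Feed the output of each stage into the next one."""
    value = text
    for stage in stages:
        value = stage(value)
    return value


tokens = run_stages("I am really happy, right now!", [tokenize, normalize, remove_stop_words])
print(tokens)  # ['really', 'happy', 'right', 'now']
```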

Evaluation

In [None]:
def evaluate(input_set):
    """Run the fitted pipeline on input_set and report accuracy and error rate."""
    results = p.transform(input_set)
    evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(results)
    print("Accuracy = %g" % (accuracy))
    print("Error = %g " % (1.0 - accuracy))
    return accuracy

In [None]:
evaluate(test_set)

Accuracy = 0.688256
Error = 0.311744 


0.6882559090670315

Saving the model

In [None]:
p.save("/pipeline")

Defining the predict function for new instances

In [None]:
pipeline_model=PipelineModel.load("/pipeline")

def predict(line):
    """Predict the sentiment of a single tweet or line; prints and returns "Happy" or "Sad"."""
    sample_df = spark.createDataFrame([[str(line)]]).toDF('tweet')

    # --- preprocessing (same cleaning steps as for the training data) ---
    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', r'http\S+', ''))
    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', r'@\w+', ''))
    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', '#', ''))
    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', 'RT', ''))

    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', '&amp;', ''))
    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', '&quot;', ''))
    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', '&gt;', ''))
    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', '&lt;', ''))

    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', '-', ''))

    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', '   ', ' '))
    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', '  ', ' '))

    # --- prediction ---
    result = pipeline_model.transform(sample_df)
    sentiment = result.select('prediction').first()[0]
    if sentiment == 1:
        sentiment = "Happy"
        print(str(line) + " =====> " + "HAPPY")
    else:
        sentiment = "Sad"
        print(str(line) + " =====> " + "Sad")

    return line, sentiment

In [None]:
predict("Iam really happy right now.") # =>1
predict("Easy Task! ")# =>1
predict("I will be sad if not accepted") #=>0
predict("I am alone")# =>0
predict("My day was full of good events but at the end , a car hit me and broke my leg")# =>0
predict("Death.") #=>0
predict("I failed in my last exam") #=>0
predict("my dad bought me a new car") #=>1
predict("the new car my dad bought me was crashed :(") #=>0
predict("I am nervous") #=>0
predict("I helped many people today") #=>1

Iam really happy right now. =====> HAPPY
Easy Task!  =====> HAPPY
I will be sad if not accepted =====> Sad
I am alone =====> Sad
My day was full of good events but at the end , a car hit me and broke my leg =====> Sad
Death. =====> HAPPY
I failed in my last exam =====> Sad
my dad bought me a new car =====> HAPPY
the new car my dad bought me was crashed :( =====> HAPPY
I am nervous =====> HAPPY
I helped many people today =====> HAPPY


('I helped many people today', 'Happy')
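
The expected labels in the inline comments (`# =>1`, `# =>0`) can be compared against the printed predictions. The short sketch below tallies the 11 sentences above by hand, with 1 standing for Happy and 0 for Sad. The variable names are chosen for this example only.

```python
# (expected label from the inline comment, label printed by predict), in call order
results = [
    (1, 1),  # Iam really happy right now.
    (1, 1),  # Easy Task!
    (0, 0),  # I will be sad if not accepted
    (0, 0),  # I am alone
    (0, 0),  # My day was full of good events but ...
    (0, 1),  # Death.
    (0, 0),  # I failed in my last exam
    (1, 1),  # my dad bought me a new car
    (0, 1),  # the new car my dad bought me was crashed :(
    (0, 1),  # I am nervous
    (1, 1),  # I helped many people today
]

correct = sum(1 for expected, predicted in results if expected == predicted)
misses = [(expected, predicted) for expected, predicted in results if expected != predicted]

print(f"{correct}/{len(results)} predictions match the expected label")  # 8/11
print(misses)  # [(0, 1), (0, 1), (0, 1)]
```

All three misses are sentences expected to be Sad that the model labeled Happy. Eleven hand-picked sentences are too few to draw conclusions from, but the pattern is consistent with the roughly 69% test accuracy reported earlier.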