<a href="https://colab.research.google.com/github/ajaysaikiran2208/PySpark/blob/main/Sentiment_analysis_in_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Sentiment analysis in PySpark

In [3]:
pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/89/db/e18cfd78e408de957821ec5ca56de1250645b05f8523d169803d8df35a64/pyspark-3.1.2.tar.gz (212.4MB)
[K     |████████████████████████████████| 212.4MB 59kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 18.7MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=f609662af2d3bda4d05b2d4e6614e065e1e37919beb97470b3204e83a082089e
  Stored in directory: /root/.cache/pip/wheels/40/1b/2c/30f43be2627857ab80062bef1527c0128f7b4070b6b2d02139
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


#Import modules and create spark session

In [6]:
#import modules
from pyspark.sql.types import * #PySpark SQL Types class is a base class of all data types in PySpark which defined in a package pyspark. sql. types. DataType and they are used to create DataFrame with a specific type.
from pyspark.sql.functions import * #List of built-in functions available for DataFrame.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover
import pyspark
from pyspark.sql import SparkSession

#create Spark session
appName = "Sentiment Analysis in Spark"
spark = SparkSession \
    .builder \
    .appName(appName) \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Read data file into Spark dataFrame

In [7]:
#read csv file into dataFrame with automatically inferred schema
tweets_csv = spark.read.csv('/content/tweets.csv', inferSchema=True, header=True)
tweets_csv.show(truncate=False, n=3)

+------+---------+---------------+---------------------------------+
|ItemID|Sentiment|SentimentSource|SentimentText                    |
+------+---------+---------------+---------------------------------+
|1038  |1        |Sentiment140   |that film is fantastic #brilliant|
|1804  |1        |Sentiment140   |this music is really bad #myband |
|1693  |0        |Sentiment140   |winter is terrible #thumbs-down  |
+------+---------+---------------+---------------------------------+
only showing top 3 rows



#Select Relevant Data

In [9]:

#select only "SentimentText" and "Sentiment" column, 
#and cast "Sentiment" column data into integer
data = tweets_csv.select("SentimentText", col("Sentiment").cast("Int").alias("label"))
data.show(truncate = False,n=5)

+---------------------------------+-----+
|SentimentText                    |label|
+---------------------------------+-----+
|that film is fantastic #brilliant|1    |
|this music is really bad #myband |1    |
|winter is terrible #thumbs-down  |0    |
|this game is awful #nightmare    |0    |
|I love jam #loveit               |1    |
+---------------------------------+-----+
only showing top 5 rows



#Splitting data into train and test

In [10]:
#divide data, 70% for training, 30% for testing
dividedData = data.randomSplit([0.7, 0.3]) 
trainingData = dividedData[0] #index 0 = data training
testingData = dividedData[1] #index 1 = data testing
train_rows = trainingData.count()
test_rows = testingData.count()
print ("Training data rows:", train_rows, "; Testing data rows:", test_rows)

Training data rows: 1327 ; Testing data rows: 605


#data processing and labeling

In [11]:
tokenizer = Tokenizer(inputCol="SentimentText", outputCol="SentimentWords")
tokenizedTrain = tokenizer.transform(trainingData)
tokenizedTrain.show(truncate=False, n=5)

+-------------------------+-----+------------------------------+
|SentimentText            |label|SentimentWords                |
+-------------------------+-----+------------------------------+
|I adore cheese #bestever |1    |[i, adore, cheese, #bestever] |
|I adore cheese #brilliant|1    |[i, adore, cheese, #brilliant]|
|I adore cheese #favorite |1    |[i, adore, cheese, #favorite] |
|I adore cheese #loveit   |1    |[i, adore, cheese, #loveit]   |
|I adore cheese #thumbs-up|1    |[i, adore, cheese, #thumbs-up]|
+-------------------------+-----+------------------------------+
only showing top 5 rows



#Using Stopword command to remove unwanted words from the data

In [12]:
swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(), 
                       outputCol="MeaningfulWords")
SwRemovedTrain = swr.transform(tokenizedTrain)
SwRemovedTrain.show(truncate=False, n=5)

+-------------------------+-----+------------------------------+---------------------------+
|SentimentText            |label|SentimentWords                |MeaningfulWords            |
+-------------------------+-----+------------------------------+---------------------------+
|I adore cheese #bestever |1    |[i, adore, cheese, #bestever] |[adore, cheese, #bestever] |
|I adore cheese #brilliant|1    |[i, adore, cheese, #brilliant]|[adore, cheese, #brilliant]|
|I adore cheese #favorite |1    |[i, adore, cheese, #favorite] |[adore, cheese, #favorite] |
|I adore cheese #loveit   |1    |[i, adore, cheese, #loveit]   |[adore, cheese, #loveit]   |
|I adore cheese #thumbs-up|1    |[i, adore, cheese, #thumbs-up]|[adore, cheese, #thumbs-up]|
+-------------------------+-----+------------------------------+---------------------------+
only showing top 5 rows



#HashingTF
Maps a sequence of terms to their term frequencies using the hashing trick. Currently we use Austin Appleby’s MurmurHash 3 algorithm (MurmurHash3_x86_32) to calculate the hash code value for the term object. 

In [13]:

hashTF = HashingTF(inputCol=swr.getOutputCol(), outputCol="features")
numericTrainData = hashTF.transform(SwRemovedTrain).select(
    'label', 'MeaningfulWords', 'features')
numericTrainData.show(truncate=False, n=3)

+-----+---------------------------+-------------------------------------------+
|label|MeaningfulWords            |features                                   |
+-----+---------------------------+-------------------------------------------+
|1    |[adore, cheese, #bestever] |(262144,[1689,91011,100089],[1.0,1.0,1.0]) |
|1    |[adore, cheese, #brilliant]|(262144,[1689,45361,100089],[1.0,1.0,1.0]) |
|1    |[adore, cheese, #favorite] |(262144,[1689,100089,108624],[1.0,1.0,1.0])|
+-----+---------------------------+-------------------------------------------+
only showing top 3 rows



In [14]:
lr = LogisticRegression(labelCol="label", featuresCol="features", 
                        maxIter=10, regParam=0.01)
model = lr.fit(numericTrainData)
print ("Training is done!")

Training is done!


#Scaling, converting, or modifying features

In [15]:
tokenizedTest = tokenizer.transform(testingData)
SwRemovedTest = swr.transform(tokenizedTest)
numericTest = hashTF.transform(SwRemovedTest).select(
    'Label', 'MeaningfulWords', 'features')
numericTest.show(truncate=False, n=2)

+-----+-------------------------------------+--------------------------------------------------------+
|Label|MeaningfulWords                      |features                                                |
+-----+-------------------------------------+--------------------------------------------------------+
|1    |[adore, classical, music, #brilliant]|(262144,[45361,100089,102383,131250],[1.0,1.0,1.0,1.0]) |
|1    |[adore, classical, music, #favorite] |(262144,[100089,102383,108624,131250],[1.0,1.0,1.0,1.0])|
+-----+-------------------------------------+--------------------------------------------------------+
only showing top 2 rows



#Predict testing data and calculate the accuracy model

In [16]:
prediction = model.transform(numericTest)
predictionFinal = prediction.select(
    "MeaningfulWords", "prediction", "Label")
predictionFinal.show(n=4, truncate = False)
correctPrediction = predictionFinal.filter(
    predictionFinal['prediction'] == predictionFinal['Label']).count()
totalData = predictionFinal.count()
print("correct prediction:", correctPrediction, ", total data:", totalData, 
      ", accuracy:", correctPrediction/totalData)

+-------------------------------------+----------+-----+
|MeaningfulWords                      |prediction|Label|
+-------------------------------------+----------+-----+
|[adore, classical, music, #brilliant]|1.0       |1    |
|[adore, classical, music, #favorite] |1.0       |1    |
|[adore, coffee, #favorite]           |1.0       |1    |
|[adore, coffee, #loveit]             |1.0       |1    |
+-------------------------------------+----------+-----+
only showing top 4 rows

correct prediction: 594 , total data: 605 , accuracy: 0.9818181818181818
