
In [None]:
r'''
INSTRUCTIONS (Windows):
1) Download Spark manually and extract it to any directory you like.
2) Download Java JDK 8 (JDK 11 might also work). Later versions do not work.
3) For Hadoop to work, download winutils. Choose the build that matches the Hadoop version of your Spark package
   (e.g. Hadoop 3.2 for spark-3.2.0-bin-hadoop3.2).

link: https://github.com/steveloughran/winutils

For Hadoop 3.1 and later, winutils is distributed from another user's GitHub repository;
a link to it is provided in the repository above.

Download both winutils.exe and hadoop.dll and place them in a folder named hadoop\bin.
After that, also copy hadoop.dll into the C:\Windows\System32 folder.

4) Create environment variables with the paths to Spark, the Java JDK, and Hadoop, according to
where you placed each. My paths are shown below; replace them with yours. If working in a notebook,
set them in a cell. If using PyCharm, set them in the run configuration.

Example:
import os
os.environ['SPARK_HOME'] = r'C:\spark\spark-3.2.0-bin-hadoop3.2'
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_311'
os.environ['HADOOP_HOME'] = r'C:\hadoop'

5) Install findspark and run the following code before importing any pyspark modules:
import findspark
findspark.init()

6) If it is not already installed, install Microsoft Visual C++ 2010 (Redistributable). It has to be the 2010 edition.

7) If working in a notebook, after running the code, open a command prompt and type telnet <host> <port>
(e.g. telnet 127.0.0.1 5555). You may first have to enable the Telnet Client under Windows Features.
If using PyCharm, create and run an HTTP client request (available under Tools).
'''
import os

# Raw strings (r'...') keep the Windows backslashes from being read as escape sequences
os.environ['SPARK_HOME'] = r'C:\spark\spark-3.2.0-bin-hadoop3.2'
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_311'
os.environ['HADOOP_HOME'] = r'C:\hadoop'
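A wrong or missing folder is an easy mistake in steps 1 to 4. The helper below is a minimal sketch (the name `check_spark_env` is hypothetical, not part of findspark or Spark) that reports which of the three variables are unset or point to missing folders, and whether `winutils.exe` and `hadoop.dll` are present in the `bin` folder under `HADOOP_HOME`. It only checks that files and folders exist; it does not verify versions.

```python
import os
from pathlib import Path


def check_spark_env(environ=None):
    """Return a list of setup problems; an empty list means the paths look OK.

    Hypothetical helper: checks existence only, not Spark/Java/Hadoop versions.
    """
    environ = os.environ if environ is None else environ
    problems = []
    for var in ("SPARK_HOME", "JAVA_HOME", "HADOOP_HOME"):
        value = environ.get(var)
        if not value:
            problems.append(f"{var} is not set")
        elif not Path(value).is_dir():
            problems.append(f"{var} points to a missing folder: {value}")

    # Step 3: winutils.exe and hadoop.dll must sit in HADOOP_HOME\bin
    hadoop_home = environ.get("HADOOP_HOME")
    if hadoop_home and Path(hadoop_home).is_dir():
        bin_dir = Path(hadoop_home) / "bin"
        for name in ("winutils.exe", "hadoop.dll"):
            if not (bin_dir / name).is_file():
                problems.append(f"{name} not found in {bin_dir}")
    return problems


# Print any problems found in the current process environment
for problem in check_spark_env():
    print("WARNING:", problem)
```

Running it right after setting the variables, and before `findspark.init()`, surfaces path typos before Spark produces a less readable error.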

In [None]:
# ! pip install textblob
# ! pip install findspark

import findspark
findspark.init()  # must run before any pyspark import

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import explode, split, udf
from pyspark.sql.types import DoubleType, StringType
from textblob import TextBlob


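def preprocessing(lines):
    # One row per tweet: the producer terminates each tweet with "t_end"
    words = lines.select(explode(split(lines.value, "t_end")).alias("word"))
    # Remove URLs, @mentions, the '#' of hashtags, the standalone retweet marker, and colons
    words = words.withColumn('word', F.regexp_replace('word', r'http\S+', ''))
    words = words.withColumn('word', F.regexp_replace('word', r'@\w+', ''))
    words = words.withColumn('word', F.regexp_replace('word', '#', ''))
    # \b word boundaries keep "RT" inside words such as "REPORT" intact
    words = words.withColumn('word', F.regexp_replace('word', r'\bRT\b', ''))
    words = words.withColumn('word', F.regexp_replace('word', ':', ''))
    # Drop rows that are null or empty after cleaning
    words = words.withColumn('word', F.trim('word'))
    words = words.filter(F.col('word') != '')
    return words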
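To check what the cleaning does without starting Spark, the sketch below applies the same regular expressions, in the same order, to plain Python strings, using `\bRT\b` for the retweet marker so that words such as "REPORT" are left intact. It drops pieces that are empty after cleaning. The function names `clean_tweet` and `split_and_clean` are hypothetical. One caveat: Python's `re` treats `\w` as Unicode, while Spark's Java regex treats it as ASCII by default, so mentions containing non-ASCII letters can be cleaned differently.

```python
import re

# Same patterns as preprocessing(), applied in the same order
CLEANING_STEPS = [
    (r"http\S+", ""),  # URLs
    (r"@\w+", ""),     # @mentions
    (r"#", ""),        # hashtag symbol (the tag word itself is kept)
    (r"\bRT\b", ""),   # standalone retweet marker
    (r":", ""),        # colons
]


def clean_tweet(text):
    """Apply each cleaning step to one tweet, then trim surrounding whitespace."""
    for pattern, replacement in CLEANING_STEPS:
        text = re.sub(pattern, replacement, text)
    return text.strip()


def split_and_clean(raw, delimiter="t_end"):
    """Split a raw line on the tweet delimiter, clean each piece, drop empty results."""
    cleaned = (clean_tweet(piece) for piece in raw.split(delimiter))
    return [tweet for tweet in cleaned if tweet]


print(split_and_clean("RT @user: Stay safe #COVID19 https://t.co/abc t_endNew REPORT out t_end"))
```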

In [None]:
# Sentiment scoring with TextBlob: polarity is in [-1, 1], subjectivity in [0, 1]
def polarity_detection(text):
    return float(TextBlob(text).sentiment.polarity)

def subjectivity_detection(text):
    return float(TextBlob(text).sentiment.subjectivity)

def text_classification(words):
    # Add a "polarity" column; DoubleType stores the score as a number rather than a string
    polarity_detection_udf = udf(polarity_detection, DoubleType())
    words = words.withColumn("polarity", polarity_detection_udf("word"))

    # Add a "subjectivity" column
    subjectivity_detection_udf = udf(subjectivity_detection, DoubleType())
    words = words.withColumn("subjectivity", subjectivity_detection_udf("word"))
    return words

In [None]:
if __name__ == "__main__":
    # Create (or reuse) the Spark session
    spark = SparkSession.builder\
            .appName("big_data_project_covid19_sentiment")\
            .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")\
            .getOrCreate()

    # Read the tweet stream from a TCP socket; the producer must already be listening on this host and port
    lines = spark.readStream.format("socket")\
            .option("host", "127.0.0.1")\
            .option("port", 5555)\
            .load()

    # Clean the raw tweet text
    words = preprocessing(lines)

    # Add polarity and subjectivity columns
    words = text_classification(words)
    # A single partition, so each micro-batch is written as one Parquet file
    words = words.repartition(1)

    # Append each 60-second micro-batch to Parquet; the checkpoint lets the query resume after a restart
    query = words.writeStream\
        .queryName("all_tweets")\
        .outputMode("append").format("parquet")\
        .option("path", "./parquet")\
        .option("checkpointLocation", "./check")\
        .trigger(processingTime='60 seconds')\
        .start()
    # Wait at most 900 seconds (15 minutes), then stop the query explicitly
    query.awaitTermination(900)
    query.stop()
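The tweet producer that feeds port 5555 is not part of this notebook. Spark's socket source connects as a client, so for a local test something has to be listening first. The sketch below is a hypothetical stand-in (the names `make_server` and `send_tweets` are illustrative): it assumes each tweet is sent as one newline-terminated line ending in the `t_end` delimiter that `preprocessing` splits on, and it replaces newlines inside a tweet with spaces because the socket source reads its input line by line.

```python
import socket


def make_server(host="127.0.0.1", port=5555):
    """Open a listening TCP socket for Spark's socket source to connect to."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    return server


def send_tweets(server, tweets, delimiter="t_end"):
    """Accept one client and send each tweet as '<text> t_end' plus a newline.

    Newlines inside a tweet are replaced by spaces so each tweet stays on one line.
    """
    conn, _ = server.accept()
    with conn:
        for tweet in tweets:
            line = tweet.replace("\n", " ") + " " + delimiter + "\n"
            conn.sendall(line.encode("utf-8"))
```

Run it in a separate Python process before starting the query, for example `srv = make_server()` followed by `send_tweets(srv, ["Stay safe #COVID19"])`; if nothing is listening on the port when the query starts, the socket source's connection is refused.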

In [None]:
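#! pip install pyarrow
import pandas as pd

# Folder written by the streaming query ("./parquet"); replace with the full path on your machine
parquet_file = r'C:\Data Science\MSc Data Science\Big Data Management and Processing\Project\parquet'
df = pd.read_parquet(parquet_file, engine='auto')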

In [None]:
df

,word,polarity,subjectivity
0,,0.0,0.0
1,One of the weird things over the last few ye...,-0.09,0.30333333333333334
2,Petition Do not make COVID vaccination a requi...,0.03333333333333333,0.06666666666666667
3,If you go to the South they have been over C...,0.35,0.55
4,If you go to the South they have been over C...,0.35,0.55
...,...,...,...
43161,Almost 2000 people a week are dying with cov...,0.0,0.0
43162,Spain has some of the most beautiful horse's ...,0.675,0.75
43163,- Claim pandemic is over,0.0,0.0
43164,⚽️ In game fatigue,-0.4,0.4
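Polarity scores can be summarised as coarse labels. The sketch below is one possible convention, not the notebook's method: the hypothetical helper `polarity_label` treats scores within ±0.05 of zero as neutral (the threshold is an assumption), and accepts either floats or the string values shown above.

```python
from collections import Counter


def polarity_label(polarity, threshold=0.05):
    """Map a TextBlob polarity score (float or its string form) to a coarse label.

    The +/-0.05 neutral band is an assumed convention, not part of the original notebook.
    """
    score = float(polarity)
    if score > threshold:
        return "positive"
    if score < -threshold:
        return "negative"
    return "neutral"


def label_counts(polarities, threshold=0.05):
    """Count how many tweets fall into each label."""
    return Counter(polarity_label(p, threshold) for p in polarities)


# Polarity values copied from the rows displayed above
sample = ["0.0", "-0.09", "0.03333333333333333", "0.35", "0.35", "0.0", "0.675", "0.0", "-0.4"]
print(label_counts(sample))
```

On the full DataFrame the same helper can be applied with `df["polarity"].map(polarity_label).value_counts()`. Rows 3 and 4 above are identical (likely retweets), so `df.drop_duplicates("word")` beforehand avoids counting the same text twice.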
