In [7]:
# Import Spark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pandas as pd
import string, re, json

# Import NLTK
import nltk
import sys
from nltk.sentiment.vader import SentimentIntensityAnalyzer
nltk.download("vader_lexicon")

# Import NumPy
import numpy as np

# Connector settings: tweets are read from SentimentAnalysisSpark.Covid19 and
# results are written to SentimentAnalysisSpark.LabeledTweetsUTC.
_mongo_settings = {
    "spark.mongodb.input.uri": "mongodb://192.168.1.27/SentimentAnalysisSpark.Covid19?retryWrites=true",
    "spark.mongodb.output.uri": "mongodb://192.168.1.27/SentimentAnalysisSpark.LabeledTweetsUTC?retryWrites=true",
}

# Apply every setting to the builder, then create (or reuse) the session.
_session_builder = SparkSession.builder
for _setting_name, _setting_value in _mongo_settings.items():
    _session_builder = _session_builder.config(_setting_name, _setting_value)

spark = _session_builder.getOrCreate()


[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/emanuele/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


In [8]:
# The aggregation pipelines are described as plain Python structures and
# serialised with json.dumps, so separators and quoting are always well-formed
# (Python None is emitted as JSON null).

# English tweets whose `retweeted_status` is null: keep id, date and text.
pipeline_noRetweet = json.dumps([
    {
        "$match": {
            "lang": "en",
            "retweeted_status": None,
        }
    },
    {
        "$project": {
            "id_str": 1,
            "created_at": 1,
            "full_text": 1,
        }
    },
])

# English retweets of English tweets: keep id, date and the text of the
# retweeted status.
pipeline_Retweet = json.dumps([
    {
        "$match": {
            "lang": "en",
            "retweeted_status": {"$ne": None},
            "retweeted_status.lang": "en",
        }
    },
    {
        "$project": {
            "id_str": 1,
            "created_at": 1,
            "retweeted_status.full_text": 1,
        }
    },
])

# Each DataFrame is loaded through the MongoDB connector with its pipeline
# passed as the "pipeline" read option.
df_ENGNoRetweet = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("pipeline", pipeline_noRetweet).load()
df_ENGRetweet = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("pipeline", pipeline_Retweet).load()

#df_ENGNoRetweet.printSchema()
#df_ENGRetweet.printSchema()

In [9]:
# Retweets carry their text under retweeted_status.full_text; rename it so both
# sources expose the same (id_str, full_text, created_at) column order, which
# the positional union below relies on.
retweet_rows = df_ENGRetweet.select(
    F.col("id_str"),
    F.col("retweeted_status.full_text").alias("full_text"),
    F.col("created_at"),
)
plain_rows = df_ENGNoRetweet.select("id_str", "full_text", "created_at")
df_Tweets = retweet_rows.union(plain_rows)

# Keep a single row per distinct tweet text, then report how many remain.
df_Tweets_noDup = df_Tweets.dropDuplicates(subset=["full_text"])
df_Tweets_noDup.count()

50210

In [10]:
# Print the column names and types of the de-duplicated tweet DataFrame.
df_Tweets_noDup.printSchema()

root
 |-- id_str: string (nullable = true)
 |-- full_text: string (nullable = true)
 |-- created_at: string (nullable = true)



In [11]:
# Minimum neutral share for a text to be labelled neutral outright.
_NEUTRAL_THRESHOLD = 0.6

# Analyzer shared by all calls that do not supply their own; created on first
# use so that a new SentimentIntensityAnalyzer is not constructed for every row.
_VADER_ANALYZER = None


def _label_from_scores(scores):
    """Turn a mapping with 'neg', 'neu' and 'pos' scores into a 0/1/2 label."""
    neg, neu, pos = scores['neg'], scores['neu'], scores['pos']
    # Neutral wins outright only when it is the dominant score and is large enough.
    if neu > neg and neu >= pos and neu >= _NEUTRAL_THRESHOLD:
        return 0
    if pos > neg:
        return 1
    if neg > pos:
        return 2
    # pos == neg (including the all-zero scores of an empty text): no polarity
    # dominates, so the text is treated as neutral.
    return 0


def vaderSentimentAnalysis(data_str, analyzer=None):
    """Label a text with VADER polarity scores.

    Parameters
    ----------
    data_str : str or None
        Text to score.
    analyzer : object, optional
        Any object exposing ``polarity_scores(text)`` that returns a mapping
        with 'neg', 'neu' and 'pos' entries. When omitted, a shared
        ``SentimentIntensityAnalyzer`` is created once and reused.

    Returns
    -------
    int or None
        0 for neutral, 1 for positive, 2 for negative; ``None`` when
        ``data_str`` is ``None`` (so a null text yields a null label instead
        of an exception).
    """
    global _VADER_ANALYZER

    if data_str is None:
        return None

    if analyzer is None:
        if _VADER_ANALYZER is None:
            _VADER_ANALYZER = SentimentIntensityAnalyzer()
        analyzer = _VADER_ANALYZER

    return _label_from_scores(analyzer.polarity_scores(data_str))
    
# Wrap the labelling function as a Spark UDF that yields an integer column.
vaderSentimentAnalysis_udf = udf(f=vaderSentimentAnalysis, returnType=IntegerType())

In [12]:
# Add a "label" column by running the sentiment UDF over each tweet's text.
df_Tweets_noDup_Labeled = df_Tweets_noDup.withColumn(
    "label",
    vaderSentimentAnalysis_udf(F.col("full_text")),
)

In [13]:
# Preview the rows whose label is 0.
df_Tweets_noDup_Labeled.filter(F.col("label") == 0).show()

+-------------------+--------------------+--------------------+-----+
|             id_str|           full_text|          created_at|label|
+-------------------+--------------------+--------------------+-----+
|1234294581631442944|"First positive c...|Mon Mar 02 01:48:...|    0|
|1254892996904071170|"I feel like I do...|Mon Apr 27 21:59:...|    0|
|1254692729319235584|"Most applicants ...|Mon Apr 27 08:43:...|    0|
|1239564950198259712|"The United State...|Mon Mar 16 14:51:...|    0|
|1239569884100669440|"To love purely i...|Mon Mar 16 15:11:...|    0|
|1244744395817172998|#29. Mrs &amp; Mr...|Mon Mar 30 21:52:...|    0|
|1254584662602723328|#BS 
#plainandsim...|Mon Apr 27 01:34:...|    0|
|1242123497310208010|#COVID19 HMG have...|Mon Mar 23 16:18:...|    0|
|1242131756918149123|#Coronavirus: #Fl...|Mon Mar 23 16:51:...|    0|
|1252294117494849536|#Health #KY - Fli...|Mon Apr 20 17:52:...|    0|
|1252284339204886532|#HopkinsEngineer ...|Mon Apr 20 17:13:...|    0|
|1242130472672485376

In [14]:
from datetime import datetime
import pytz
from pyspark.sql.functions import to_date, to_utc_timestamp

## Converting date string format
def getDate(x):
    """Reformat a 'Mon Mar 02 01:48:05 +0000 2020'-style date as a UTC string.

    Parameters
    ----------
    x : str or None
        Date text laid out as weekday, month, day, time, UTC offset, year.

    Returns
    -------
    str or None
        The same moment expressed in UTC as 'YYYY-MM-DD HH:MM:SS', or ``None``
        when ``x`` is ``None``.

    Raises
    ------
    ValueError
        If ``x`` does not match the expected layout.
    """
    if x is None:
        return None
    # %z reads the offset from the text instead of requiring a literal +0000,
    # so a value carrying another offset is converted rather than rejected.
    parsed = datetime.strptime(x, '%a %b %d %H:%M:%S %z %Y')
    # Subtracting the parsed offset from the wall-clock time gives UTC.
    utc_wall_clock = parsed.replace(tzinfo=None) - parsed.utcoffset()
    return utc_wall_clock.strftime("%Y-%m-%d %H:%M:%S")

## UDF declaration
date_fn = udf(getDate, StringType())


def _utc_string_to_timestamp(utc_text):
    """Parse a 'YYYY-MM-DD HH:MM:SS' UTC string into a timezone-aware datetime.

    Returns ``None`` for ``None`` input. The result is tagged as UTC explicitly
    so that the value's meaning does not depend on the Spark session or
    operating-system time zone.
    """
    if utc_text is None:
        return None
    return datetime.strptime(utc_text, "%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.UTC)


utc_timestamp_fn = udf(_utc_string_to_timestamp, TimestampType())

## Converting datatype in spark dataframe
# The timestamp is built from a UTC-aware datetime rather than by casting the
# string inside Spark: that cast reads the text in the session time zone, so
# the stored instant would be shifted whenever the session is not in UTC.
# NOTE: show() renders timestamps in the session time zone, so the displayed
# wall-clock value can differ from the created_at text outside UTC.
df_Tweets_noDup_LabeledUTC = df_Tweets_noDup_Labeled.withColumn("created_at_UTC", utc_timestamp_fn(date_fn("created_at")))

In [17]:
# Save the labelled tweets through the MongoDB connector. The write uses
# append mode, so rows are added to whatever the target already holds.
(
    df_Tweets_noDup_LabeledUTC.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .save()
)

In [16]:
# Print the schema of the labelled DataFrame, including the created_at_UTC column.
df_Tweets_noDup_LabeledUTC.printSchema()

root
 |-- id_str: string (nullable = true)
 |-- full_text: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- created_at_UTC: timestamp (nullable = true)

