In [1]:
import json
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from langdetect import detect
import nltk
from nltk.stem import WordNetLemmatizer 
import re
import emoji
from operator import add
import pandas as pd

In [2]:
lemmatizer = WordNetLemmatizer()
sentiment_dictionary = {}
emoji_sentiment_dictionary = {}
positive_words = []
negative_words = []

# load negative/positive words to python lists for tweet analysis

with open('positive_words.txt') as f:
    positive_words = f.read().splitlines()
    
with open('negative_words.txt') as f:
    negative_words = f.read().splitlines()
    
# load emoji sentiment
with open('emoji_sentiment.json') as j:
    emoji_sentiment_dictionary = json.load(j)
    
# with open('dict.tff','r') as f:
#     lines = f.readlines()

translate = {
    "weak":0.5,
    "strong":1,
    "positive":1,
    "neutral":0,
    "both":0,
    "negative":-1
}

# for line in lines:
#     word = re.sub(r'.*word1=([a-z]+)\spos.*\n', r'\1', line)
#     subjectivity = re.sub(r'.*type=([a-z]+)subj.*\n', r'\1', line)
#     polarity = re.sub(r'.*priorpolarity=([a-z]+)\n', r'\1', line)
#     sentiment_dictionary[word] = translate[polarity] * translate[subjectivity]

In [3]:
stopwords = [
    "i",
    "me",
    "my",
    "myself",
    "we",
    "our",
    "ours",
    "ourselves",
    "you",
    "your",
    "yours",
    "yourself",
    "yourselves",
    "he",
    "him",
    "his",
    "himself",
    "she",
    "her",
    "hers",
    "herself",
    "it",
    "its",
    "itself",
    "they",
    "them",
    "their",
    "theirs",
    "themselves",
    "what",
    "which",
    "who",
    "whom",
    "this",
    "that",
    "these",
    "those",
    "am",
    "is",
    "are",
    "was",
    "were",
    "be",
    "been",
    "being",
    "have",
    "has",
    "had",
    "having",
    "do",
    "does",
    "did",
    "doing",
    "a",
    "an",
    "the",
    "and",
    "but",
    "if",
    "or",
    "because",
    "as",
    "until",
    "while",
    "of",
    "at",
    "by",
    "for",
    "with",
    "about",
    "against",
    "between",
    "into",
    "through",
    "during",
    "before",
    "after",
    "above",
    "below",
    "to",
    "from",
    "up",
    "down",
    "in",
    "out",
    "on",
    "off",
    "over",
    "under",
    "again",
    "further",
    "then",
    "once",
    "here",
    "there",
    "when",
    "where",
    "why",
    "how",
    "all",
    "any",
    "both",
    "each",
    "few",
    "more",
    "most",
    "other",
    "some",
    "such",
    "no",
    "nor",
    "not",
    "only",
    "own",
    "same",
    "so",
    "than",
    "too",
    "very",
    "s",
    "t",
    "can",
    "will",
    "just",
    "don",
    "should",
    "now"
]

def remove_stopwords(text):
    to_remove = '|'.join(stopwords)
    regex = re.compile(r'\b('+to_remove+r')\b', flags=re.IGNORECASE)
    return regex.sub("", text)

def is_url(text):
    if re.search(r'(https?:\/\/[^\s]+)', text):
        return True
    return False

def lemmatize(text):
    return [lemmatizer.lemmatize(token.lower()) for token in text]

def tokenize(text):
    text = " ".join([word for word in text.split() if not is_url(word)])
    return re.findall(r'[a-zA-Z0-9-\']+', text)

# function returning unique emojis in a string
def get_emojis(text):
    return list(set([fix_emoji(c) for c in text if c in emoji.UNICODE_EMOJI]))

def fix_emoji(emoji):
    ret = re.sub(br".*(\\[^\\]*)$", br'\1' ,emoji.encode('unicode-escape')).decode('unicode-escape')
    return ret

def evaluate_by_emojis(text):
    negative = 0
    positive = 0
    for emoji in get_emojis(text):
        if emoji in emoji_sentiment_dictionary:
            if emoji_sentiment_dictionary[emoji]["positive-emotion"] > emoji_sentiment_dictionary[emoji]["negative-emotion"]:
                positive += 1
            else:
                negative += 1
            
    return 1 if positive>negative else -1

def evaluate_by_words(text):
    negative = 0
    positive = 0
    for token in tokenize(text):
        token = token.lower()
        token = lemmatizer.lemmatize(token)
        if token in positive_words:
            positive += 1
        if token in negative_words:
            negative += 1
            
    if positive == negative:
        return 0
    return 1 if positive>negative else -1

In [4]:
# filters for streaming pipeline

def language_filter(text, lang):
    try:
        if detect(text) == lang:
            return True
    except:
        return False
    return False

def no_retweet(text):
    if text.startswith('RT @'):
        return False
    return True

def split(tweet):
    text = remove_stopwords(tweet['text'])
    sentiment = tweet['sentiment']
    return [
        (lemmatizer.lemmatize(token.lower()), (sentiment, 1))
        for token in tokenize(text)
    ]

def split_emojis(tweet):
    emojis = get_emojis(tweet['text'])
    sentiment = tweet['sentiment']
    
    return [
        (emoji, (sentiment, 1))
        for emoji in emojis
    ]

def only_emojis(text):
    return True if len(get_emojis(text)) > 0 else False

def add_tuples(a,b):
    return a[0]+b[0], a[1]+b[1]

In [None]:
inputPath = "twitter_stream_2020_03_01/03/01/[0-1][0-9]"
spark = SparkSession.builder.appName("Sentiment App").getOrCreate()
pd.set_option('display.max_rows', 500)

data = (
    spark
        .read
        .json(inputPath)
)

In [6]:
evaluated_words = (
    data
        .rdd
        .filter(lambda tweet: tweet['text'] is not None)
        .filter(lambda tweet: no_retweet(tweet['text']))
        .filter(lambda tweet: only_emojis(tweet['text']))
        .filter(lambda tweet: language_filter(tweet['text'], 'en'))
        .map(lambda tweet: {
            'sentiment':evaluate_by_emojis(tweet['text']), 
            'text':tweet['text']
        })
        .flatMap(lambda tweet: split(tweet))
        .reduceByKey(add_tuples)
        .map(lambda row: (row[0], row[1][0], row[1][1]))
)

df = evaluated_words.toDF().toPandas()

In [7]:
# rename stuff, create ratio column 
df['ratio']=df.apply(lambda row: row['_2'] / row['_3'], axis=1)
df = df.rename(columns={'_1':'word', '_2':'score', '_3':'count'})

In [8]:
# most negative words
df.sort_values(by=['ratio', 'count'], ascending=[True, False]).head(20)

Unnamed: 0,word,score,count,ratio
53253,maternal,-28,28,-1.0
27459,downshifter,-16,16,-1.0
88127,paternal,-16,16,-1.0
56772,aq7481-600,-14,14,-1.0
25976,5h,-13,13,-1.0
67446,sony,-9,9,-1.0
7341,recep,-7,7,-1.0
2013,esan,-6,6,-1.0
5457,nowhiring,-6,6,-1.0
17512,blah,-6,6,-1.0


In [9]:
# most positive words
df.sort_values(by=['ratio', 'count'], ascending=[False, False]).head(20)

Unnamed: 0,word,score,count,ratio
24035,iherb,355,355,1.0
25719,aqh3836,181,181,1.0
30803,delighted,70,70,1.0
49880,slytherin,52,52,1.0
30879,heart-shaped,51,51,1.0
30936,sun3,41,41,1.0
44700,sun1,41,41,1.0
31021,nisnass,33,33,1.0
44829,vogacloset,32,32,1.0
10378,ps4live,31,31,1.0


In [10]:
df.sort_values(by='score').to_csv('result_word_sentiment.csv')

In [11]:
evaluated_emojis = (
    data
        .rdd
        .filter(lambda tweet: tweet['text'] is not None)
        .filter(lambda tweet: no_retweet(tweet['text']))
        .filter(lambda tweet: only_emojis(tweet['text']))
        .filter(lambda tweet: language_filter(tweet['text'], 'en'))
        .map(lambda tweet: {
            'sentiment':evaluate_by_words(tweet['text']), 
            'text':tweet['text']
        })
        .flatMap(lambda tweet: split_emojis(tweet))
        .reduceByKey(add_tuples)
        .map(lambda row: (row[0], row[1][0], row[1][1]))
)

df2 = evaluated_emojis.toDF().toPandas()

In [12]:
# rename stuff, create ratio column 
df2['ratio']=df2.apply(lambda row: row['_2'] / row['_3'], axis=1)
df2 = df2.rename(columns={'_1':'word', '_2':'score', '_3':'count'})

In [13]:
# most negative emojis
df2.sort_values(by=['ratio', 'count'], ascending=[True, False]).head(20)

Unnamed: 0,word,score,count,ratio
1120,🕎,-2,2,-1.0
8,🛃,-1,1,-1.0
11,🚉,-1,1,-1.0
127,⏸,-1,1,-1.0
220,🈵,-1,1,-1.0
225,🐡,-1,1,-1.0
251,🚃,-1,1,-1.0
340,🚸,-1,1,-1.0
350,🛳,-1,1,-1.0
423,🍶,-1,1,-1.0


In [14]:
# most positive emojis
df2.sort_values(by=['ratio', 'count'], ascending=[False, False]).head(20)

Unnamed: 0,word,score,count,ratio
122,🦌,5,5,1.0
664,🥐,4,4,1.0
387,🌘,3,3,1.0
524,👪,3,3,1.0
54,🍚,2,2,1.0
75,🦙,2,2,1.0
115,🥨,2,2,1.0
148,🦞,2,2,1.0
213,🚙,2,2,1.0
271,📡,2,2,1.0


In [15]:
df2.sort_values(by='score').to_csv('result_emoji_sentiment.csv')

In [None]:
def evaluate(text):
    senteval = 0
    words = 0
    emojis = 0

    text = remove_stopwords(text)

    # lemmatize each token
    for token in tokenize(text):
        words += 1
        token = token.lower()
        token = lemmatizer.lemmatize(token)

        if token in positive_words:
            senteval += 1
        if token in negative_words:
            senteval -= 1

    for emoji in get_emojis(text):
        emojis+=1
        if emoji in df2.values:
             senteval += df2.loc[df2['word'] == emoji].values[0][3]
                
    count = words+emojis
    if count == 0:
        return 0

    return senteval / count

In [None]:
evaluated_tweets = (
    data
        .rdd
        .filter(lambda tweet: tweet['text'] is not None)
        .filter(lambda tweet: no_retweet(tweet['text']))
        .filter(lambda tweet: language_filter(tweet['text'], 'en'))
        .map(lambda tweet: 
             (
                 evaluate(tweet['text']), tweet['text']
             )
        )
)

df3 = evaluated_tweets.toDF().toPandas()

In [17]:
df3.nlargest(30, '_1')

Unnamed: 0,_1,_2
735,1.0,Just praising https://t.co/pTfHVTBPLW
750,1.0,that was incredible!
1390,1.0,Correct
1994,1.0,Perfectly balanced.
2299,1.0,Pride of Love https://t.co/Obl4hcL7A1
2899,1.0,UNDERSTANDABLE????
3179,1.0,"If you support me, I will support you."
3631,1.0,right so https://t.co/dGZlviuHX6
4098,1.0,Enough https://t.co/ADESDBN5E1
4984,1.0,Creative!


In [18]:
df3.nsmallest(30, '_1')

Unnamed: 0,_1,_2
272,-1.0,This is how and when I will die https://t.co/V...
1128,-1.0,HE’S INSANE https://t.co/Xl5E71wd7q
4729,-1.0,Negative
5240,-1.0,This is bad
5387,-1.0,you bitches so hateful it’s disgusting.
5620,-1.0,Racism is over
5868,-1.0,headache
5903,-1.0,Just so wrong.
6086,-1.0,What are your symptoms?
6232,-1.0,why is he my bias again
