In [None]:
from textblob import TextBlob
from textblob.sentiments import NaiveBayesAnalyzer
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import scipy.stats as stats
import math
import re
import nltk
import html
from nltk import word_tokenize, pos_tag
from nltk.corpus import stopwords, wordnet
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.sentiment.vader import SentimentIntensityAnalyzer
import csv
import time
from pycorenlp import StanfordCoreNLP

# NLTK data required by the cells below. Fetching only the two lexicons leaves a
# fresh environment failing with LookupError as soon as the tokenizer, tagger,
# lemmatizer or TextBlob's NaiveBayesAnalyzer is first used.
# NOTE(review): recent NLTK releases look for the tokenizer/tagger data under
# the ids 'punkt_tab' and 'averaged_perceptron_tagger_eng' - add those if your
# NLTK version still raises LookupError after this cell.
NLTK_RESOURCES = (
    'vader_lexicon',               # SentimentIntensityAnalyzer
    'sentiwordnet',                # nltk.corpus.sentiwordnet
    'wordnet',                     # WordNetLemmatizer and lesk
    'punkt',                       # word_tokenize
    'averaged_perceptron_tagger',  # pos_tag
    'movie_reviews',               # training corpus of TextBlob's NaiveBayesAnalyzer
)
for resource in NLTK_RESOURCES:
    nltk.download(resource)

In [None]:
# Pre-labelled Twitter corpus; its 'Sentiment' column is the reference label
# every analyzer below is compared against.
CORPUS_PATH = './Data/twitter_corpus-master/full-corpus.csv'
raw = pd.read_csv(CORPUS_PATH, header=0)

In [None]:
# Preview the first rows of the corpus as loaded.
raw.head()

In [None]:
# Distinct labels present in the corpus (the next cell filters on 'irrelevant').
raw['Sentiment'].unique()

In [None]:
# Keep only tweets with a usable label ('irrelevant' rows are discarded) and
# drop the metadata columns the benchmark does not use.
#
# * No inplace drop on a filtered slice: `test` is built as an independent
#   frame, so the column assignments in later cells do not trigger
#   SettingWithCopyWarning.
# * reset_index gives a contiguous 0..n-1 index. Later cells address rows by
#   label (`test.loc[:50]`, `test['TweetText'][i]`) and assign freshly built
#   frames whose index starts at 0; with the gaps left by the filter those
#   operations hit the wrong rows or raise KeyError.
test = (
    raw[raw.Sentiment != 'irrelevant']
    .drop(['Topic', 'TweetId', 'TweetDate'], axis=1)
    .reset_index(drop=True)
    .copy()
)

In [None]:
# basic pre-processing done on Twitter data

# URLs (www. / http:// / https://), @mentions and $-prefixed tokens up to the next
# whitespace, plus digits, '%', backslash, '/', '-' and '_'.
_NOISE_RE = re.compile(r'(www\.|https?://).*?(\s|$)|@.*?(\s|$)|\$.*?(\s|$)|\d|\%|\\|/|-|_')
_SPACES_RE = re.compile(r'\s+')


def _clean_tweet(text):
    """Unescape HTML entities, blank out noise matched by _NOISE_RE, collapse whitespace."""
    unescaped = html.unescape(text)
    without_noise = _NOISE_RE.sub(' ', unescaped)
    return _SPACES_RE.sub(' ', without_noise)


test.loc[:, 'TweetText'] = test.loc[:, 'TweetText'].apply(_clean_tweet)


In [None]:
# Inspect the first rows after pre-processing.
test.head()

## Testing TextBlob

In [None]:
# TextBlob's NaiveBayesAnalyzer is only run on a 50-tweet sample.
#
# * The sample is selected by position (test.index[:50]). `.loc[:50]` is
#   label-based and inclusive, i.e. it selects every label <= 50 - 51 rows on a
#   contiguous index - while accuracy and per-tweet time are divided by 50.
# * One analyzer instance is shared by all tweets. A NaiveBayesAnalyzer trains
#   its classifier lazily on first use, so creating a new instance per tweet
#   retrains the model for every single row.
#   The single training run still happens inside the timed region.
nb_analyzer = NaiveBayesAnalyzer()
first_50 = test.index[:50]

start = time.process_time()
# .sentiment is (classification, p_pos, p_neg); the score kept is p_pos.
test.loc[first_50, 'TextBlob Sentiment Score'] = test.loc[first_50, 'TweetText'].apply(
    lambda text: TextBlob(text, analyzer=nb_analyzer).sentiment.p_pos
)
time_TextBlob = time.process_time() - start

In [None]:
# CPU time in seconds (time.process_time) consumed by the TextBlob scoring cell.
print(time_TextBlob)

In [None]:
# Inspect the first rows with the new 'TextBlob Sentiment Score' column.
test.head()

In [None]:
def get_class_TextBlob(x):
    """Map a TextBlob score to a label.

    Returns "positive" for x >= 0.6, "negative" for x <= 0.4 and "neutral"
    for everything else (including NaN, which fails both comparisons).
    """
    if x >= 0.6:
        return "positive"
    return "negative" if x <= 0.4 else "neutral"

# Label every row from its TextBlob score; rows without a score (NaN) come out as "neutral".
test['TextBlob Sentiment'] = test['TextBlob Sentiment Score'].apply(get_class_TextBlob)

In [None]:
# 'Yes' where TextBlob's label equals the corpus label, 'No' otherwise.
textblob_agrees = test['TextBlob Sentiment'].eq(test['Sentiment'])
test['TextBlob Match'] = np.where(textblob_agrees, 'Yes', 'No')

In [None]:
# TextBlob was only run on a 50-tweet sample, so accuracy is taken over the first 50 rows.
textblob_hits = test['TextBlob Match'].iloc[:50].eq('Yes')
TextBlob_accuracy = textblob_hits.sum() / 50
print(TextBlob_accuracy)

## Testing VADER

In [None]:
# One VADER analyzer instance, reused for every tweet scored below.
sia = SentimentIntensityAnalyzer()

In [None]:
# Score every tweet with VADER, keeping the 'compound' value, and time the pass.
start = time.process_time()
test['VADER Sentiment Score'] = test['TweetText'].apply(
    lambda tweet: sia.polarity_scores(tweet)['compound']
)
time_VADAR = time.process_time() - start

In [None]:
def get_class_VADER(x):
    """Map a VADER compound score to a label.

    Returns "positive" for x >= 0.3, "negative" for x <= -0.3 and "neutral"
    for everything in between (and for NaN, which fails both comparisons).
    """
    if x >= 0.3:
        return "positive"
    return "negative" if x <= -0.3 else "neutral"

# Turn each compound score into a positive / negative / neutral label.
test['VADER Sentiment'] = test['VADER Sentiment Score'].apply(get_class_VADER)

In [None]:
# 'Yes' where VADER's label equals the corpus label, 'No' otherwise.
vader_agrees = test['VADER Sentiment'].eq(test['Sentiment'])
test['VADER Match'] = np.where(vader_agrees, 'Yes', 'No')

In [None]:
# Share of all rows whose VADER label matches the corpus label.
vader_hits = test['VADER Match'].eq('Yes').sum()
VADER_accuracy = vader_hits / len(test)
print(VADER_accuracy)

## Testing Stanford CoreNLP

In [None]:
# need to connect to their server: a CoreNLP server has to be reachable at this URL
CORENLP_URL = 'http://localhost:9000'
nlp = StanfordCoreNLP(CORENLP_URL)

In [None]:
CoreNLP_scores = []

# Score the first 50 tweets by position (.iloc). Indexing test['TweetText'][i]
# is label-based and raises KeyError when label i was removed by the
# 'irrelevant' filter.
for text in test['TweetText'].iloc[:50]:
    result = nlp.annotate(text,
                   properties={
                       'annotators': 'sentiment, ner, pos',
                       'outputFormat': 'json',
                       'timeout': 15000,
                   })

    # pycorenlp returns the raw response text when the server reply is not
    # valid JSON (e.g. a timeout message); fail with a readable error instead
    # of "string indices must be integers".
    if not isinstance(result, dict):
        raise RuntimeError('CoreNLP did not return JSON for tweet %r: %r' % (text, result))

    # Accumulators are reset for every tweet. Initialising them once before
    # the loop made each tweet's score the running average over all tweets
    # processed so far.
    CoreNLP_ss = 0
    length_s = 0
    for s in result['sentences']:
        CoreNLP_ss += int(s['sentimentValue'])
        length_s += 1

    if length_s > 0:
        # mean sentence-level sentiment value of this tweet
        score = CoreNLP_ss / length_s
    else:
        # No sentence was returned (e.g. empty text): use the neutral value so
        # `score` is always defined and never carried over from the last tweet.
        score = 2

    # 2 is treated as neutral; anything below is negative, anything above positive.
    if score < 2:
        score_text = 'negative'
    elif score == 2:
        score_text = 'neutral'
    else:
        score_text = 'positive'

    CoreNLP_scores.append(score_text)

In [None]:
# Only the first len(CoreNLP_scores) tweets were sent to CoreNLP. Attach their
# labels to exactly those rows, by position. Assigning the 0..n-1-indexed frame
# directly aligns on index *labels*, which puts labels on the wrong tweets
# whenever `test` still carries the gaps left by the 'irrelevant' filter.
CoreNLP_sentiments_df = pd.DataFrame({'CoreNLP Sentiment': CoreNLP_scores})
scored_rows = test.index[:len(CoreNLP_scores)]
# rows outside `scored_rows` are left as NaN
test['CoreNLP Sentiment'] = pd.Series(CoreNLP_scores, index=scored_rows, dtype=object)

In [None]:
# 'Yes' where CoreNLP's label equals the corpus label, 'No' otherwise (including unscored rows).
corenlp_agrees = test['CoreNLP Sentiment'].eq(test['Sentiment'])
test['CoreNLP Match'] = np.where(corenlp_agrees, 'Yes', 'No')

In [None]:
# Matches divided by the number of tweets that were actually scored by CoreNLP.
corenlp_hits = test['CoreNLP Match'].eq('Yes').sum()
CoreNLP_accuracy = corenlp_hits / len(CoreNLP_scores)
print(CoreNLP_accuracy)

## Testing using SentiWordNet

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import os
import numpy as np
from nltk import word_tokenize, pos_tag
from nltk.corpus import sentiwordnet as swn
from nltk.wsd import lesk
from nltk.stem.wordnet import WordNetLemmatizer

def get_sentiword_score(message):
    """
    Score a message against the SentiWordNet lexicon.

    Steps:
        1) tokenize
        2) POS tagging
        3) reduce text to adjectives, verbs, nouns and adverbs
        4) lemmatize the kept words
        5) look up the SentiWordNet senses of each lemma; when more than one
           sense exists, disambiguate with the lesk algorithm (falling back
           to the average over all senses when lesk returns nothing)
        6) sum the positive and negative scores, swapping the two while a
           negation is active

    Args:
        message: the text to score.

    Returns:
        A two-element list ``[pos_score, neg_score]``.

    Raises:
        OSError: if the negation lexicon file cannot be opened.
    """
    tokens = word_tokenize(message)
    content_words = _select_content_words(pos_tag(tokens))
    sense_scores = _score_senses(tokens, content_words)
    return _aggregate_with_negation(sense_scores, _load_negation_prefixes())


# collected from word stat financial dictionary
_NEGATION_LEXICON_PATH = 'Lexicon/lexicon_negation_words.txt'

# path -> tuple of lexicon entries; filled on first use so the file is read
# once instead of once per scored message
_negation_prefix_cache = {}

# first letter of the Penn Treebank tag -> WordNet POS letter
_PENN_TO_WORDNET = {'J': 'a', 'V': 'v', 'N': 'n', 'R': 'r'}


def _load_negation_prefixes(path=_NEGATION_LEXICON_PATH):
    """Return the whitespace-separated entries of the negation lexicon as a tuple (cached per path)."""
    if path not in _negation_prefix_cache:
        # `with` closes the handle; a bare open(...).read() leaks it
        with open(path) as lexicon_file:
            _negation_prefix_cache[path] = tuple(lexicon_file.read().split())
    return _negation_prefix_cache[path]


def _select_content_words(tagged_tokens):
    """Keep adjectives, verbs, nouns and adverbs as (lemma, wordnet_pos) pairs."""
    lemmatizer = WordNetLemmatizer()
    selected = []
    for word, tag in tagged_tokens:
        wn_pos = _PENN_TO_WORDNET.get(tag[:1])
        if wn_pos is not None:
            selected.append((lemmatizer.lemmatize(word, wn_pos), wn_pos))
    return selected


def _score_senses(tokens, content_words):
    """Build the score list [(sense name, pos score, neg score)] for the kept words."""
    scores = []
    for lemma, wn_pos in content_words:
        senses = list(swn.senti_synsets(lemma, wn_pos))
        if not senses:
            # word unknown to SentiWordNet: contributes nothing
            continue
        if len(senses) == 1:
            only = senses[0]
            scores.append((only.synset.name(), only.pos_score(), only.neg_score()))
            continue

        sense = lesk(tokens, lemma, wn_pos)
        if sense is None:
            # take average score of all original senses
            mean_pos = sum(s.pos_score() for s in senses) / len(senses)
            mean_neg = sum(s.neg_score() for s in senses) / len(senses)
            scores.append((senses[0].synset.name(), mean_pos, mean_neg))
        else:
            chosen = swn.senti_synset(sense.name())
            scores.append((chosen.synset.name(), chosen.pos_score(), chosen.neg_score()))
    return scores


def _aggregate_with_negation(scores, negation_prefixes):
    """Sum positive/negative scores, swapping them while a negation is active.

    A sense whose name starts with one of ``negation_prefixes`` toggles the
    negation state and contributes no score itself.
    """
    # NOTE(review): this is a prefix test on the synset name, as in the original
    # code. If the lexicon holds short entries (e.g. 'no'), unrelated senses
    # such as 'note.n.01' would also toggle negation - confirm against the
    # lexicon file whether an exact match on the lemma was intended.
    negated = False
    pos_total = 0
    neg_total = 0
    for name, pos_score, neg_score in scores:
        if name.startswith(negation_prefixes):
            negated = not negated
        elif negated:
            pos_total += neg_score
            neg_total += pos_score
        else:
            pos_total += pos_score
            neg_total += neg_score
    return [pos_total, neg_total]

In [None]:
# Score every tweet with SentiWordNet and time the whole pass.
start = time.process_time()
SWN_scores = [get_sentiword_score(tweet) for tweet in test['TweetText']]
time_SWN = time.process_time() - start

In [None]:
def _swn_label(pos_total, neg_total):
    """Label one [pos, neg] score pair.

    'neutral' when both totals sum to 0; otherwise 'positive' / 'negative' when
    that side holds more than 60% of the combined score, else 'neutral'.
    """
    combined = pos_total + neg_total
    if combined == 0:
        return 'neutral'
    if pos_total / combined > 0.6:
        return 'positive'
    if neg_total / combined > 0.6:
        return 'negative'
    return 'neutral'


ss = [_swn_label(pair[0], pair[1]) for pair in SWN_scores]

In [None]:
# `ss` holds one label per row of `test`, in row order, so it is assigned
# positionally. Assigning the 0..n-1-indexed `ss_df` frame aligns on index
# *labels*: when `test` still carries the gaps left by the 'irrelevant' filter,
# labels land on the wrong tweets and the tail rows become NaN.
ss_df = pd.DataFrame({'SWN Sentiment': ss})
test['SWN Sentiment'] = ss

In [None]:
# 'Yes' where the SentiWordNet label equals the corpus label, 'No' otherwise.
swn_agrees = test['SWN Sentiment'].eq(test['Sentiment'])
test['SWN Match'] = np.where(swn_agrees, 'Yes', 'No')

In [None]:
# Share of all rows whose SentiWordNet label matches the corpus label.
swn_hits = test['SWN Match'].eq('Yes').sum()
SWN_accuracy = swn_hits / len(test)
print(SWN_accuracy)

## Comparison

In [None]:
n_tweets = len(test)

# TextBlob was timed on a 50-tweet sample; VADER and SWN on the full frame.
time_per_text_TextBlob = time_TextBlob / 50
time_per_text_VADAR = time_VADAR / n_tweets
# time per text for Stanford CoreNLP is manually measured and calculated
time_per_text_SWN = time_SWN / n_tweets

In [None]:
# Per-tweet CPU time of each analyzer that was timed in this notebook.
for label, seconds in (
    ("Average time taken per tweet (TextBlob): ", time_per_text_TextBlob),
    ("Average time taken per tweet (VADER): ", time_per_text_VADAR),
    ("Average time taken per tweet (SWN): ", time_per_text_SWN),
):
    print(label, seconds)

In [None]:
# Accuracy of each analyzer against the corpus labels.
for label, accuracy in (
    ("TextBlob accuracy: ", TextBlob_accuracy),
    ("VADER accuracy: ", VADER_accuracy),
    ("CoreNLP accuracy: ", CoreNLP_accuracy),
    ("SWN accuracy: ", SWN_accuracy),
):
    print(label, accuracy)