In [None]:
import csv
import os
import re

import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd

## Generate Clean Tweet Function

In [None]:
# Data clean
def data_clean(data:list):
  r"""Normalise raw tweet strings into lowercase, punctuation-free text.

  Each string is processed independently, in this order:
    1. a surrounding ``b'...'`` / ``b"..."`` wrapper (the text form of a
       bytes literal) is removed;
    2. URLs and ``@mentions`` are deleted;
    3. literal ``\xNN`` escape sequences (backslash, ``x``, two characters)
       are deleted;
    4. digits are deleted;
    5. literal ``\n`` sequences (backslash followed by ``n``) become a space;
    6. every character that is neither a word character nor whitespace is
       deleted;
    7. the result is lower-cased.

  Args:
    data: list of tweet strings.

  Returns:
    A new list of cleaned strings, same length and order as ``data``.
  """
  text = []
  for t in data:
    temp = t
    # Unwrap only a genuine bytes-literal wrapper. str.lstrip("b'") treats its
    # argument as a *set of characters*, so it would also eat the leading "b"
    # of ordinary words ("baby" -> "aby").
    if len(temp) >= 3 and temp[0] == 'b' and temp[1] in '\'"' and temp[-1] == temp[1]:
      temp = temp[2:-1]
    # Raw strings keep the patterns identical while avoiding invalid-escape warnings.
    cle = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+#]|[!*\(\),]|'
                 r'(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', temp)
    cle = re.sub(r'(@[A-Za-z0-9_]+)', '', cle)
    # Escape sequences must go before the digit pass, which would otherwise
    # break "\xc3" into "\xc" and leave residue behind.
    cle = re.sub(r'(\\x(.){2})', '', cle)
    cle = re.sub(r'[0-9]', '', cle)
    cle = re.sub(r'(\\n)', ' ', cle)
    cle = re.sub(r'[^\w\s]', '', cle)
    text.append(cle.lower())

  return text

## Lemmatize Function

In [None]:
# nltk.download('wordnet')
# nltk.download('averaged_perceptron_tagger')
from nltk import pos_tag
from nltk.stem.wordnet import WordNetLemmatizer

In [None]:
def tweets_lemmatized(tweet_tokens, stopword_list:list):
    """Lemmatize a list of tokens and drop stopwords.

    Tokens are POS-tagged with ``pos_tag``; the tag selects the part-of-speech
    hint handed to the WordNet lemmatizer: tags starting with ``NN`` use
    ``'n'``, tags starting with ``VB`` use ``'v'``, everything else ``'a'``.

    Args:
        tweet_tokens: list of token strings for one tweet (or a single word
            wrapped in a list).
        stopword_list: words to exclude; compared against the lower-cased lemma.

    Returns:
        List of lower-cased lemmas, in input order, with stopwords removed.
    """
    lemmatizer = WordNetLemmatizer()
    # Set membership is O(1); scanning the list re-reads every stopword per token.
    stopword_lookup = set(stopword_list)
    lemmas = []
    for word, tag in pos_tag(tweet_tokens):
        if tag.startswith('NN'):
            pos = 'n'
        elif tag.startswith('VB'):
            pos = 'v'
        else:
            pos = 'a'
        lemma = lemmatizer.lemmatize(word, pos).lower()
        if lemma not in stopword_lookup:
            lemmas.append(lemma)
    return lemmas


In [None]:
# h = "I am trying to workd on a bigram!"
# tweets_lemmatized(data_clean([h]),stopword_List)

# Train a classifier using nltk twitter sample

In [None]:
# import nltk
# Import and stopwords
# nltk.download('stopwords')
from nltk.corpus import stopwords
# English stopwords; passed to the lemmatizing helpers so these words never become features.
stopword_List = stopwords.words('english')
#nltk.download('twitter_samples')

In [None]:
def makeSingleWordToTrainDict(m_word:str,m_argument:str,m_stopword_list:list):
    """Turn one word into a ``(features, label)`` training pair.

    The word is lemmatized (and dropped if it is a stopword) via
    ``tweets_lemmatized``; every surviving lemma becomes a ``lemma: True``
    feature. ``m_argument`` is returned unchanged as the label.
    """
    lemmas = tweets_lemmatized([m_word], m_stopword_list)
    features = dict.fromkeys(lemmas, True)
    return (features, m_argument)

In [None]:
# Import some positive & negative words
# The dictionary is retrieved from https://github.com/leelaylay/TweetSemEval/tree/master/dataset/dict
# Paths are built with os.path.join so the notebook also runs where "\\" is not the path separator.
posFile = os.path.join('.', 'dict', 'positive-words.txt')
negFile = os.path.join('.', 'dict', 'negative-words.txt')
sentimentDict = []
for _lexicon_path, _lexicon_label in ((posFile, 'Positive'), (negFile, 'Negative')):
    with open(_lexicon_path, 'r') as _lexicon_reader:
        for single_word in _lexicon_reader:
            # strip() also removes a stray "\r" left by CRLF files read on non-Windows systems.
            single_word = single_word.strip()
            # A blank line would otherwise be fed to the tagger as an empty token.
            if not single_word:
                continue
            sentimentDict.append(makeSingleWordToTrainDict(single_word, _lexicon_label, stopword_List))

In [None]:
# Import the NLTK twitter_samples corpus and load its positive / negative sample tweets.
# Each entry is later passed to data_clean, which treats it as a string.
from nltk.corpus import twitter_samples
sample_pos_tweets = twitter_samples.strings('positive_tweets.json')
sample_neg_tweets = twitter_samples.strings('negative_tweets.json')

In [None]:
def makeWordListToTrainDict(mwl_word:list,mwl_argument:str,mwl_stopword_list:list):
    """Turn a token list into a ``(features, label)`` training pair.

    Tokens are lemmatized and stopword-filtered by ``tweets_lemmatized``; each
    remaining lemma maps to ``True`` in the feature dict. ``mwl_argument`` is
    passed through as the label.
    """
    return dict.fromkeys(tweets_lemmatized(mwl_word, mwl_stopword_list), True), mwl_argument

In [None]:
# Accumulates (features, label) pairs built from the NLTK sample tweets.
# Re-run this cell before re-running the cell that fills it, otherwise pairs are appended twice.
nltkTweetData = []

In [None]:
from nltk.tokenize import TweetTokenizer
# Clean the samples, tokenize them, then turn each tweet into a (features, label) pair
sample_pos_tweets_clean = data_clean(sample_pos_tweets)
sample_neg_tweets_clean = data_clean(sample_neg_tweets)
# One tokenizer serves every tweet; constructing a new one per tweet is loop-invariant work.
_tweet_tokenizer = TweetTokenizer()
# Positive tweets are appended first, then negative ones (same order as before).
for _clean_tweets, _label in ((sample_pos_tweets_clean, 'Positive'),
                              (sample_neg_tweets_clean, 'Negative')):
    for _tweet in _clean_tweets:
        _tokens = _tweet_tokenizer.tokenize(_tweet)
        nltkTweetData.append(makeWordListToTrainDict(_tokens, _label, stopword_List))

In [None]:
import random
# Shuffle with a fixed seed so the train/test split and the reported accuracy are
# identical on every Restart & Run All. A private Random instance is used so the
# global random state is left untouched.
random.Random(42).shuffle(nltkTweetData)

In [None]:
# Training set: the whole sentiment lexicon plus the first 7000 shuffled sample tweets.
train_data = sentimentDict + nltkTweetData[:7000]
# Held-out set: the remaining shuffled sample tweets (lexicon entries are never tested on).
test_data = nltkTweetData[7000:]

In [None]:
from nltk import FreqDist, classify, NaiveBayesClassifier
# Fit a Naive Bayes model on the (features, label) pairs in train_data.
classifier = NaiveBayesClassifier.train(train_data)

In [None]:
# Accuracy on the held-out sample tweets only (test_data).
print("Accuracy is:", classify.accuracy(classifier, test_data))

# Apply the classifier to our data

In [None]:
def result(text: str):
  """Classify one raw tweet with the trained ``classifier``.

  The tweet is cleaned with ``data_clean``, tokenized with ``TweetTokenizer``
  and then lemmatized / stopword-filtered with ``tweets_lemmatized``.

  Args:
    text: raw tweet text.

  Returns:
    The label chosen by ``classifier.classify`` (the training labels are
    'Positive' and 'Negative').
  """
  cleaned_tweet = data_clean([text])[0]
  tokens = TweetTokenizer().tokenize(cleaned_tweet)
  # Training features were produced by tweets_lemmatized (lemmatized,
  # lower-cased, stopwords removed). Apply the same step here so inference
  # tokens line up with the features the model was trained on.
  features = dict.fromkeys(tweets_lemmatized(tokens, stopword_List), True)
  return classifier.classify(features)

In [None]:
# Define a month iterator
from datetime import timedelta, date
def monthrange(m_start_date, m_end_date):
    """Yield one date per calendar month from the start month to the end month.

    Every month touched by the interval is produced exactly once, both ends
    included. Each yielded value is ``m_start_date`` moved to day 1 of that
    month, so it has the same type as ``m_start_date``. Nothing is yielded when
    the end month lies before the start month.

    Stepping by a fixed 31 days (the previous approach) drifts through the
    calendar: starting on Jan 31 it jumps straight to March, and short ranges
    that cross a month boundary lose their last month.
    """
    year, month = m_start_date.year, m_start_date.month
    last = (m_end_date.year, m_end_date.month)
    while (year, month) <= last:
        # Day 1 exists in every month, so replace() can never raise here.
        yield m_start_date.replace(year=year, month=month, day=1)
        month += 1
        if month > 12:
            year, month = year + 1, 1

In [None]:
# Since we have multiple test data folders, the classify process is packaged as a function
def classifyTestData(testNum:int):
    """Classify every monthly CSV of one test folder.

    For each month yielded by ``monthrange`` between 2020-08-01 and 2021-03-23
    the file ``./test_data_<testNum>/<YYYY-MM>_hydrated.csv`` is read and each
    entry of its ``text`` column is classified with ``result``.

    Args:
        testNum: number of the ``test_data_<testNum>`` folder.

    Returns:
        A list with one ``[posCnt, negCnt, total]`` entry per month, where
        every tweet not labelled "Positive" is counted as negative.
    """
    _start_date = date(2020, 8, 1)
    _end_date = date(2021, 3, 23)
    # os.path.join keeps the path valid on systems where "\\" is not the separator.
    fileFolder = os.path.join(".", "test_data_{}".format(testNum))
    classifyRes = []
    for single_month in monthrange(_start_date, _end_date):
        month = single_month.strftime("%Y-%m")
        monthCSV = os.path.join(fileFolder, "{}_hydrated.csv".format(month))
        data = pd.read_csv(monthCSV)
        total = len(data["text"])
        posCnt = sum(1 for text in data["text"] if result(text) == "Positive")
        negCnt = total - posCnt
        classifyRes.append([posCnt, negCnt, total])
    return classifyRes

In [None]:
# Number of test folders to classify; they are addressed as test_data_1 .. test_data_<folderNum>.
folderNum = 5

In [None]:
# Classify folders test_data_1 .. test_data_<folderNum>.
# Resulting axes: (folder, month, [positive count, negative count, total]).
allClassified = np.array(
    [classifyTestData(folder_id) for folder_id in range(1, folderNum + 1)]
)

In [None]:
# Number of months classified per folder (length of the first folder's result list).
month = len(allClassified[0])

In [None]:
# For readability, every rate is rounded to 4 decimals.
# pnRate is filled month by month and, within a month, folder by folder;
# avgPNRate holds one [positive, negative] average per month.
pnRate, avgPNRate = [], []
for j in range(month):
    posRate = negRate = 0
    for i in range(folderNum):
        positives, negatives, total = allClassified[i][j]
        currentPosRate = positives / total
        currentNegRate = negatives / total
        posRate += currentPosRate
        negRate += currentNegRate
        pnRate.append([np.round(currentPosRate, 4), np.round(currentNegRate, 4)])
    avgPNRate.append([np.round(posRate / folderNum, 4), np.round(negRate / folderNum, 4)])

In [None]:
# "YYYY-MM" labels for the analysed period, used as the row index of the result tables
start_date = date(2020, 8, 1)
end_date = date(2021, 3, 23)
months = [single_month.strftime("%Y-%m") for single_month in monthrange(start_date, end_date)]

In [None]:
# Sanity check: the month labels that will index the tables below.
print(months)

In [None]:
# Average rates as a table: one row per month, one column per sentiment
avgPNRateDT = pd.DataFrame(
    avgPNRate,
    index=months,
    columns=['Positive', 'Negative'],
)

In [None]:
# Show the per-month average positive / negative rates.
avgPNRateDT

In [None]:
# Stacked bars: positive and negative share per month, averaged over all folders
avgPNRateDT.plot.bar(stacked=True, alpha=0.5).set_title(
    "Average Positive and Negative Rate towards Covid-19"
)
plt.show()

In [None]:
# Per-folder tables and plots.
# pnRate was filled month by month and, within each month, folder by folder, so
# row (month_index * folderNum + folder_index) holds [positive, negative] for
# that pair. Reshaping to (month, folder, 2) makes the folder selection explicit.
pnRate = np.array(pnRate)
_ratesByMonthAndFolder = pnRate.reshape(month, folderNum, 2)
for f in range(folderNum):
    # Use this folder's own rates (not the cross-folder average).
    subPNRateDT = pd.DataFrame(data=_ratesByMonthAndFolder[:, f, :],
                               columns=['Positive', 'Negative'],
                               index=months)
    print(subPNRateDT)
    subPNRateDT.plot.bar(stacked=True, alpha=0.5)
    title = "Folder test_data_{} Positive and Negative Rate towards Covid-19".format(f + 1)
    plt.title(title)
    plt.show()
    print()