In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.feature_selection import mutual_info_classif
import nltk
from nltk.corpus import stopwords  

# Data Preprocessing

In [2]:
class dataset:
    """Container for the tweet splits and their pre-computed feature tables.

    For every split (``train``, ``test``, ``dev``, ``unlabeled``) three pickled
    tables are loaded and exposed as attributes:

    * ``<split>``        from ``tweets-data/<split>.pkl``
    * ``<split>_tfidf``  from ``tfidf/<split>_tfidf.pkl``
    * ``<split>_emb``    from ``sentence-transformers/<split>_emb.pkl``

    Parameters
    ----------
    demographic : str, default 'all'
        ``'all'`` keeps every row.  Any other value keeps only the rows whose
        ``'Demographic'`` column equals it in the train/test/dev tables; the
        ``unlabeled`` tables are never filtered.

    After loading, the ``'text'`` column of ``train`` and ``dev`` is lower-cased,
    stripped of the ``_twitter-entity_`` placeholder and split into a list of
    words, and a ``'Length'`` column holding the number of words is added.
    The ``test`` and ``unlabeled`` text is left untouched.
    """

    _SPLITS = ('train', 'test', 'dev', 'unlabeled')

    # attribute suffix -> path template of the pickled table
    _TABLES = {
        '': 'tweets-data/{split}.pkl',
        '_tfidf': 'tfidf/{split}_tfidf.pkl',
        '_emb': 'sentence-transformers/{split}_emb.pkl',
    }

    _PLACEHOLDER = '_twitter-entity_'

    def __init__(self, demographic='all'):
        for suffix, template in self._TABLES.items():
            for split in self._SPLITS:
                # SECURITY: unpickling can execute arbitrary code -- only load
                # files that come from a trusted source.
                frame = pd.read_pickle(template.format(split=split))
                if demographic != 'all' and split != 'unlabeled':
                    # Read once, filter once; .copy() makes the subset an
                    # independent frame so the column assignments below do not
                    # trigger SettingWithCopyWarning.
                    frame = frame[frame['Demographic'] == demographic].copy()
                setattr(self, split + suffix, frame)

        # remove _twitter-entity_, set the text into lower case and a list of words
        for split in ('train', 'dev'):
            frame = getattr(self, split)
            frame['text'] = (
                frame['text']
                .str.lower()
                # regex=False: the placeholder is a literal, and the default of
                # `regex` differs between pandas versions.
                .str.replace(self._PLACEHOLDER, '', regex=False)
                .str.split()
            )
            frame['Length'] = frame['text'].str.len()

        

In [None]:
# One container per group: every tweet first, then the 'AAE' and 'SAE' subsets.
all_dg, aae, sae = dataset(), dataset('AAE'), dataset('SAE')

# Machine Learning Models

## KNN - Baseline

In [None]:
from sklearn.neighbors import KNeighborsClassifier
def knn(demographic, dataset, test = False):
    """Fit a 5-nearest-neighbour (Manhattan distance) baseline and predict labels.

    Parameters
    ----------
    demographic : dataset
        Container providing ``train``, ``train_<dataset>``, ``dev_<dataset>``
        and ``test_<dataset>`` tables.
    dataset : {'tfidf', 'emb'}
        Which pre-computed feature table to use.
    test : bool, default False
        Predict on the test split instead of the dev split.

    Returns
    -------
    Predicted ``'Sentiment'`` labels for the chosen evaluation split.

    Raises
    ------
    ValueError
        If ``dataset`` is not ``'tfidf'`` or ``'emb'`` (previously this surfaced
        as an UnboundLocalError or as a prediction with an unfitted model).
    """
    if dataset not in ('tfidf', 'emb'):
        raise ValueError("dataset must be 'tfidf' or 'emb', got %r" % (dataset,))

    # Both the tfidf and the emb tables keep their vectors in a column named
    # 'TFIDF' (that is what the original code reads for either feature set).
    train_x = getattr(demographic, 'train_' + dataset)['TFIDF'].tolist()
    eval_split = 'test_' if test else 'dev_'
    eval_x = getattr(demographic, eval_split + dataset)['TFIDF'].tolist()

    clf = KNeighborsClassifier(n_neighbors=5, metric='manhattan')
    clf.fit(train_x, demographic.train['Sentiment'])
    # Predict only the requested split; the dev prediction is no longer
    # computed and thrown away when test=True.
    return clf.predict(eval_x)

## Gaussian Naive Bayes

In [None]:
from sklearn.naive_bayes import GaussianNB
def gnb(demographic, dataset, prob = False, test=False):
    """Fit Gaussian Naive Bayes and predict labels or log-probabilities.

    Parameters
    ----------
    demographic : dataset
        Container providing ``train``, ``train_<dataset>``, ``dev_<dataset>``
        and ``test_<dataset>`` tables.
    dataset : {'tfidf', 'emb'}
        Which pre-computed feature table to use.
    prob : bool, default False
        Return ``predict_log_proba`` output instead of hard labels.
    test : bool, default False
        Predict on the test split instead of the dev split.  ``prob`` is
        honoured for the test split as well (it used to be silently ignored).

    Returns
    -------
    Predicted labels, or per-class log-probabilities when ``prob`` is true.

    Raises
    ------
    ValueError
        If ``dataset`` is not ``'tfidf'`` or ``'emb'`` (previously this surfaced
        as an UnboundLocalError or as a prediction with an unfitted model).
    """
    if dataset not in ('tfidf', 'emb'):
        raise ValueError("dataset must be 'tfidf' or 'emb', got %r" % (dataset,))

    # Both the tfidf and the emb tables keep their vectors in a column named
    # 'TFIDF' (that is what the original code reads for either feature set).
    train_x = getattr(demographic, 'train_' + dataset)['TFIDF'].tolist()
    eval_split = 'test_' if test else 'dev_'
    eval_x = getattr(demographic, eval_split + dataset)['TFIDF'].tolist()

    clf = GaussianNB()
    clf.fit(train_x, demographic.train['Sentiment'])
    if prob:
        return clf.predict_log_proba(eval_x)
    return clf.predict(eval_x)

## Logistic Regression

In [None]:
from sklearn.linear_model import LogisticRegression
def lr(demographic, dataset, prob = False, test = False):
    """Fit logistic regression (max_iter=200) and predict labels or probabilities.

    Parameters
    ----------
    demographic : dataset
        Container providing ``train``, ``train_<dataset>``, ``dev_<dataset>``
        and ``test_<dataset>`` tables.
    dataset : {'tfidf', 'emb'}
        Which pre-computed feature table to use.
    prob : bool, default False
        Return ``predict_proba`` output instead of hard labels.
    test : bool, default False
        Predict on the test split instead of the dev split.  ``prob`` is
        honoured for the test split as well (it used to be silently ignored).

    Returns
    -------
    Predicted labels, or per-class probabilities when ``prob`` is true.

    Raises
    ------
    ValueError
        If ``dataset`` is not ``'tfidf'`` or ``'emb'`` (previously this surfaced
        as an UnboundLocalError or as a prediction with an unfitted model).
    """
    if dataset not in ('tfidf', 'emb'):
        raise ValueError("dataset must be 'tfidf' or 'emb', got %r" % (dataset,))

    # Both the tfidf and the emb tables keep their vectors in a column named
    # 'TFIDF' (that is what the original code reads for either feature set).
    train_x = getattr(demographic, 'train_' + dataset)['TFIDF'].tolist()
    eval_split = 'test_' if test else 'dev_'
    eval_x = getattr(demographic, eval_split + dataset)['TFIDF'].tolist()

    clf = LogisticRegression(max_iter=200)
    clf.fit(train_x, demographic.train['Sentiment'])
    if prob:
        return clf.predict_proba(eval_x)
    return clf.predict(eval_x)
    

# Evaluation

In [None]:
from sklearn.metrics import classification_report
# Dev-set classification report for every model x demographic x feature set.
# The feature-set list is deliberately NOT called `dataset`: that name belongs
# to the data-container class, and rebinding it to a list makes the cell that
# instantiates the containers fail when it is re-run.
feature_sets = ['tfidf', 'emb']
demographic = [aae, sae]
dg_name = ['aae', 'sae']
models = [knn, gnb, lr]
model_names = ['KNN', 'GNB', 'LR']

for model_name, model in zip(model_names, models):
    print(model_name)
    for group_name, group in zip(dg_name, demographic):
        class_y = group.dev['Sentiment']
        print(group_name)
        for feature_set in feature_sets:
            print(feature_set)
            y_pred = model(group, feature_set)
            print(classification_report(class_y, y_pred, zero_division=0))
    print()

## Logistic Regression Evaluation

In [None]:
def _lr_confidence_table(demographic, dataset):
    """Return dev-split probabilities, predicted labels and true labels as one frame."""
    probabilities = pd.DataFrame(lr(demographic, dataset, True))
    predictions = pd.DataFrame(lr(demographic, dataset))
    labels = pd.DataFrame(np.array(demographic.dev['Sentiment']))
    table = pd.concat([probabilities, predictions, labels], axis=1)
    # Two probability columns are expected (binary task).
    # NOTE(review): the neg/pos naming assumes the first probability column
    # belongs to the negative class -- confirm against the label values.
    table.columns = ['prob_neg', 'prob_pos', 'predictions', 'labels']
    return table


def _binned_accuracy(table, column, edges):
    """Return (accuracy per bin, row count per bin) for bins ``(low, high]`` of ``column``."""
    is_correct = table['predictions'] == table['labels']
    accuracies = []
    counts = []
    for low, high in zip(edges[:-1], edges[1:]):
        in_bin = (table[column] > low) & (table[column] <= high)
        total = int(in_bin.sum())
        hits = int((in_bin & is_correct).sum())
        # An empty bin has no defined accuracy; NaN leaves a gap in the plot
        # instead of raising ZeroDivisionError.
        accuracies.append(hits / total if total else np.nan)
        counts.append(total)
    return accuracies, counts


def _plot_confidence_curves(curves, title, ylabel):
    """Plot the four per-bin curves (aae_pos, aae_neg, sae_pos, sae_neg) on one figure."""
    for values in curves:
        plt.plot(values)
    plt.legend(['aae_pos', 'aae_neg', 'sae_pos', 'sae_neg'])
    plt.xticks(range(len(curves[0])))
    plt.title(title)
    plt.xlabel('Ranges\n(0=0.5-0.6, 1=0.6-0.7, 2=0.7-0.8, 3=0.8-0.9, 4=0.9-1)')
    plt.ylabel(ylabel)
    plt.show()


def lr_evaluate(dataset):
    """Plot logistic-regression accuracy and sample count per confidence bin.

    For the AAE and SAE dev splits, predictions are grouped by the predicted
    probability of each class into the bins (0.5, 0.6], (0.6, 0.7], (0.7, 0.8],
    (0.8, 0.9] and (0.9, 1].  Two figures are shown: the share of correct
    predictions per bin, and the number of predictions per bin.

    Parameters
    ----------
    dataset : {'tfidf', 'emb'}
        Feature set forwarded to ``lr``.

    Returns
    -------
    None.  The figures are displayed with ``plt.show()``.
    """
    edges = [0.5, 0.6, 0.7, 0.8, 0.9, 1]
    accuracy_curves = []
    count_curves = []
    # Order matters: it must match the legend aae_pos, aae_neg, sae_pos, sae_neg.
    for group in (aae, sae):
        table = _lr_confidence_table(group, dataset)
        for column in ('prob_pos', 'prob_neg'):
            accuracies, counts = _binned_accuracy(table, column, edges)
            accuracy_curves.append(accuracies)
            count_curves.append(counts)

    _plot_confidence_curves(accuracy_curves, 'Range of Prediction Accuracies', 'Accuracy Proportion')
    _plot_confidence_curves(count_curves, 'Number of the Range of Prediction Confidence', 'Numbers')

In [None]:
# Heading, then the per-confidence-bin accuracy and count plots for the 'tfidf' features.
print('Logistic Regression tfidf')
lr_evaluate('tfidf')

In [None]:
# Heading, then the per-confidence-bin accuracy and count plots for the 'emb' features.
print('Logistic Regression emb')
lr_evaluate('emb')

# POS Tagging

In [None]:
# The twelve tags of NLTK's 'universal' tagset, used as a fixed column schema.
_UNIVERSAL_TAGS = ['ADJ', 'ADP', 'ADV', 'CONJ', 'DET', 'NOUN',
                   'NUM', 'PRON', 'PRT', 'VERB', '.', 'X']


def pos(demographic):
    """Count universal POS tags per tweet and append the tweet length.

    Parameters
    ----------
    demographic : pandas.DataFrame
        Frame with a ``'text'`` column holding a list of words per row and a
        ``'Length'`` column.

    Returns
    -------
    pandas.DataFrame
        One row per tweet on a fresh 0..n-1 index, with one count column per
        tag in ``_UNIVERSAL_TAGS`` followed by ``'Length'``.  The columns and
        their order are always the same, so frames built from different splits
        (train / dev) line up feature-by-feature.  A tag outside that schema,
        should the tagger ever emit one, is not counted.
    """
    # pos_tag_sents tags every tweet in one call instead of one
    # nltk.pos_tag call per row.
    tagged = nltk.pos_tag_sents(list(demographic['text']), tagset='universal')

    rows = []
    for sentence in tagged:
        tag_counts = {}
        for _word, tag in sentence:
            tag_counts[tag] = tag_counts.get(tag, 0) + 1
        rows.append(tag_counts)

    # reindex pins the schema: tags never observed become all-zero columns
    # rather than being absent.
    addition_set = (
        pd.DataFrame(rows)
        .reindex(columns=_UNIVERSAL_TAGS)
        .fillna(0)
        .astype(int)
    )
    # np.asarray drops the source index so lengths are attached by position.
    addition_set['Length'] = np.asarray(demographic['Length'])
    return addition_set

## Mutual Information Classification

In [None]:
# aae: mutual information between each POS-count / length feature and the sentiment label
y = aae.train['Sentiment']
pos_aae = pos(aae.train)
mi = mutual_info_classif(pos_aae, y,discrete_features=True)
# Iterate over the columns that actually exist instead of a hard-coded 13,
# which raises IndexError (or hides features) when the column count differs.
for feature, score in zip(pos_aae.columns, mi):
    print(feature,':', score)

In [None]:
# sae: mutual information between each POS-count / length feature and the sentiment label
y = sae.train['Sentiment']
pos_sae = pos(sae.train)
mi = mutual_info_classif(pos_sae, y,discrete_features=True)
# Iterate over the columns that actually exist instead of a hard-coded 13,
# which raises IndexError (or hides features) when the column count differs.
for feature, score in zip(pos_sae.columns, mi):
    print(feature,':', score)

In [None]:
# POS features judged most informative, plus the tweet length.
# NOTE(review): key_pos is defined here but never applied -- the frames below
# keep every column returned by pos(). Confirm whether a selection was intended.
key_pos = ['NOUN', 'VERB', 'ADV', 'PRT', 'Length']


def _indexed_pos(split):
    """Build the POS feature frame for `split` and give it the split's own index."""
    features = pos(split)
    features.index = split.index
    return features


pos_aae_train = _indexed_pos(aae.train)
pos_sae_train = _indexed_pos(sae.train)

pos_aae_dev = _indexed_pos(aae.dev)
pos_sae_dev = _indexed_pos(sae.dev)

In [None]:
# the model using pos as features
def pos_model(model, dg):
    """Train a classifier on the POS-count features and print its dev-set report.

    Parameters
    ----------
    model : str
        ``'knn'`` or ``'gnb'``; any other value selects logistic regression.
    dg : dataset
        The ``aae`` container selects the AAE feature frames; anything else
        uses the SAE feature frames.

    Prints the model name, the demographic name and the classification report.
    Returns None.
    """
    if model == 'knn':
        clf = KNeighborsClassifier(n_neighbors=5, metric='manhattan')
    elif model == 'gnb':
        clf = GaussianNB()
    else:
        clf = LogisticRegression(max_iter=200)

    # Identity check: the containers define no __eq__, so `is` states the intent.
    if dg is aae:
        group, train_x, dev_x = 'aae', pos_aae_train, pos_aae_dev
    else:
        group, train_x, dev_x = 'sae', pos_sae_train, pos_sae_dev

    print(model)
    print(group)
    clf.fit(train_x, dg.train['Sentiment'])
    prediction = clf.predict(dev_x)
    # classification_report expects (y_true, y_pred); the previous call passed
    # them the other way round, which swaps precision and recall.
    score = classification_report(dg.dev['Sentiment'], prediction, zero_division=0)
    print(score)
    print()

In [None]:
# Dev-set scores of every POS-feature model, AAE first and then SAE for each model.
for model_name in ('knn', 'gnb', 'lr'):
    for group in (aae, sae):
        pos_model(model_name, group)