In [None]:
import pandas as pd
import numpy as np
import gensim
from gensim.models import Word2Vec
import gensim.downloader
# Load the pretrained 'glove-wiki-gigaword-50' word vectors through gensim's
# downloader API.  The object is used below both for membership tests
# (`word in glove_vectors`) and for vector lookup (`glove_vectors[word]`).
# NOTE(review): the downloader presumably fetches the model over the network
# on first use -- confirm before running offline.
glove_vectors = gensim.downloader.load('glove-wiki-gigaword-50')

In [None]:
from sklearn.model_selection import train_test_split

# Load the labelled training scenarios and key them by scenario id.
# Chained set_index (instead of inplace=True) keeps this cell idempotent.
df_train = pd.read_csv('data/Challenge1_Training_Scenarios.csv').set_index('scenario_id')

# development: hold out 20% of the labelled rows for local evaluation
X = df_train['scenario'].to_numpy()
y = df_train['danger_level'].to_numpy()
# A fixed random_state makes the split -- and therefore every score printed
# further down -- reproducible across "Restart Kernel -> Run All".
train_X, test_X, train_y, test_y = train_test_split(X, y, test_size=0.2, random_state=42)

# submission
# train_X = list(df_train['scenario'])
# train_y = list(df_train['danger_level'])

# df_test = pd.read_csv('data/Challenge1_Test_Scenarios.csv')
# test_X = list(df_test['scenario'])

In [None]:
def accuracy_ratio(y, predictions):
    """Score predictions with partial credit for near misses.

    Every (label, prediction) pair earns 2 points for an exact match, 1 point
    when the two values differ by exactly one, and 0 points otherwise.  The
    sum is divided by ``2 * len(y)``, the best score attainable for ``y``.

    Pairs are formed with ``zip``, so extra items in the longer sequence are
    ignored while the denominator still uses ``len(y)``.
    """
    def pair_points(actual, predicted):
        # Exact hit.
        if actual == predicted:
            return 2
        # Off by one in either direction.
        if actual == predicted + 1 or actual == predicted - 1:
            return 1
        return 0

    earned = sum(pair_points(actual, predicted) for actual, predicted in zip(y, predictions))
    return earned / (len(y) * 2)

In [None]:
def view_predictions(X, y, pred):
    """Print the five rows with the smallest ``y - pred`` difference.

    Parameters
    ----------
    X : iterable
        The inputs (e.g. scenario texts) that were predicted on.
    y : sequence
        True labels, aligned with ``X``.
    pred : sequence
        Predicted labels, aligned with ``X``.

    Returns
    -------
    None
        The table is printed, not returned.

    Notes
    -----
    ``diff`` is ``y - pred``, so the most negative rows (largest
    over-predictions) are listed first.
    """
    comparison = pd.DataFrame(
        list(zip(list(X), y, list(pred))),
        columns=['scenario', 'actual', 'predicted'],
    )
    comparison['diff'] = list(np.subtract(y, pred))
    comparison = comparison.sort_values(by='diff')
    # Scope the display option to this print only; pd.set_option would
    # silently change row truncation for every later cell in the notebook.
    with pd.option_context("display.max_rows", None):
        print(comparison.head())

In [None]:
# 1st approach

In [None]:
import re
import string
# Character class matching every ASCII punctuation character.  re.escape is
# required: left unescaped, the backslash in string.punctuation combines with
# the "]" that follows it into an escaped "]", so a literal backslash in the
# text would never be matched.
_PUNCTUATION_RE = re.compile("[{}]".format(re.escape(string.punctuation)))


def remove_punctuation_and_lower(text):
    """Replace each ASCII punctuation character with a space, then lower-case.

    Parameters
    ----------
    text : str
        Raw document text.

    Returns
    -------
    str
        ``text`` with every character of ``string.punctuation`` turned into a
        single space (so word boundaries survive) and all letters lower-cased.
    """
    return _PUNCTUATION_RE.sub(" ", text).lower()

In [None]:
# create bows
from nltk.stem import WordNetLemmatizer
from sklearn.feature_extraction.text import TfidfVectorizer
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

class LemmaTokenizer:
    """Callable tokenizer: split a document with ``word_tokenize`` and
    lemmatize every resulting token with a ``WordNetLemmatizer``."""

    def __init__(self):
        # One lemmatizer instance is created up front and reused on every call.
        self.wnl = WordNetLemmatizer()

    def __call__(self, doc):
        """Return the list of lemmatized tokens for ``doc``."""
        lemmatize = self.wnl.lemmatize
        return list(map(lemmatize, word_tokenize(doc)))

# Build TF-IDF features over 1- to 3-grams.  Text is first passed through
# remove_punctuation_and_lower, then tokenized/lemmatized by LemmaTokenizer.
# The vocabulary is fitted on the training split only and reused, unchanged,
# to transform the held-out split.
# NOTE: despite the *_counts names, these matrices come from TfidfVectorizer,
# i.e. they hold TF-IDF weights rather than raw counts.
vect = TfidfVectorizer(preprocessor=remove_punctuation_and_lower, tokenizer=LemmaTokenizer(), ngram_range=(1,3))
train_counts = vect.fit_transform(train_X)
test_counts = vect.transform(test_X)

In [None]:
# hyperparameter tuning for models being tested below
from sklearn.model_selection import GridSearchCV

In [None]:
from sklearn.naive_bayes import MultinomialNB
# Multinomial naive Bayes on the TF-IDF features.
# NOTE(review): alpha=0.1 presumably came from the commented-out grid search
# below -- confirm before relying on it.
bow_mnb = MultinomialNB(alpha=0.1)
#parameters = {'alpha': (1,0.1,0.01,0.001,0.0001,0.00001)}
#bow_mnb = GridSearchCV(MultinomialNB(), parameters)

# Fit on the training split, predict the held-out split and print the
# partial-credit score from accuracy_ratio.
bow_mnb.fit(train_counts, train_y)
pred_bow_mnb = bow_mnb.predict(test_counts)
print(accuracy_ratio(test_y, pred_bow_mnb))
#bow_mnb.best_params_

In [None]:
from sklearn.linear_model import LinearRegression
# Linear regression on the TF-IDF features, treating the label as a number.
bow_lr = LinearRegression()
#parameters = {'fit_intercept':[True,False], 'normalize':[True,False], 'copy_X':[True,False]}
#bow_lr = GridSearchCV(LinearRegression(), parameters)

bow_lr.fit(train_counts, train_y)
# The regressor outputs real numbers; np.rint rounds each one to the nearest
# integer value so it can be compared against the labels.  The result is not
# clipped, so it may fall outside the range of labels seen in training.
pred_bow_lr = np.rint(bow_lr.predict(test_counts))
print(accuracy_ratio(test_y, pred_bow_lr))
#bow_lr.best_params_

In [None]:
from sklearn.neighbors import KNeighborsClassifier
# k-nearest-neighbours classifier on the TF-IDF features: 30 neighbours,
# distance-weighted votes, Euclidean metric.
# NOTE(review): these settings presumably came from the commented-out grid
# search below -- confirm.
bow_kn = KNeighborsClassifier(n_neighbors=30, weights='distance', metric='euclidean')
#parameters = {'n_neighbors':[3,5,11,15,30,50], 'weights':['uniform','distance'], 'metric':['euclidean','manhattan','minkowski']}
#bow_kn = GridSearchCV(KNeighborsClassifier(), parameters)

# Fit on the training split, predict the held-out split and print the score.
bow_kn.fit(train_counts, train_y)
pred_bow_kn = bow_kn.predict(test_counts)
print(accuracy_ratio(test_y, pred_bow_kn))
#bow_kn.best_params_

In [None]:
from sklearn.svm import SVC
# Support-vector classifier with a linear kernel and C=10 on the TF-IDF
# features.
# NOTE(review): these settings presumably came from the commented-out grid
# search below -- confirm.
bow_svc = SVC(C=10, kernel='linear')
#parameters = {'C':[0.1,1,10,50] , 'gamma':['scale','auto',0.1,1], 'kernel':['linear','rbf','sigmoid']}
#bow_svc = GridSearchCV(SVC(), parameters)

# Fit on the training split, predict the held-out split and print the score.
bow_svc.fit(train_counts, train_y)
pred_bow_svc = bow_svc.predict(test_counts)
print(accuracy_ratio(test_y, pred_bow_svc))
#bow_svc.best_params_

In [None]:
#view_predictions(test_X, test_y, pred_bow_svc)

In [None]:
# 2nd approach

In [None]:
from nltk.tokenize import word_tokenize
import string

def tokenize_remove_punctuation(input):
    """Lower-case a text, tokenize it and drop punctuation-only tokens.

    Parameters
    ----------
    input : str
        Raw text.  (The name shadows the ``input`` builtin; it is kept so the
        function's signature stays unchanged.)

    Returns
    -------
    list of str
        Tokens produced by ``word_tokenize`` on the lower-cased text, minus
        every token made up solely of ``string.punctuation`` characters.
    """
    punctuation_chars = set(string.punctuation)
    tokens = word_tokenize(input.lower())
    # Test each character against the punctuation set.  A plain
    # `token not in string.punctuation` is a *substring* test and lets
    # multi-character tokens such as "...", "--", "``" and "''" through.
    # all(...) is also True for an empty token, so empty strings are dropped.
    return [
        token for token in tokens
        if not all(char in punctuation_chars for char in token)
    ]

def create_embedded(input):
    """Turn each text into a list of word vectors (or ``None`` placeholders).

    Parameters
    ----------
    input : iterable of str
        The texts to embed.  The argument itself is never modified.

    Returns
    -------
    list of list
        One inner list per text, holding for each token either the vector
        looked up in ``glove_vectors`` or ``None`` when the token is not in
        its vocabulary.  Token order and count are preserved.
    """
    embedded = []
    for text in input:
        row = []
        for token in tokenize_remove_punctuation(text):
            # Map every occurrence of 'covid' onto 'coronavirus' (a
            # list.index-based replacement would only touch the first one).
            # NOTE(review): presumably 'covid' is absent from the GloVe
            # vocabulary -- confirm.
            if token == 'covid':
                token = 'coronavirus'
            row.append(glove_vectors[token] if token in glove_vectors else None)
        embedded.append(row)
    return embedded

In [None]:
# grab embeddings for valued words and format
# Hand-picked anchor words whose embeddings are used as reference points.
valued_words = ['mask', 'coronavirus', 'travel', 'home', 'outside', 'asthma']

# Overwrite each word, in place, with its vector from glove_vectors: after
# this loop `valued_words` holds vectors, not strings.  The lookup raises if
# a word is missing from the vocabulary.
for i, word in enumerate(valued_words):
    valued_words[i] = glove_vectors[word]

In [None]:
# transform to embeddings for each entry
# Per-text lists of word vectors (None for out-of-vocabulary tokens); these
# feed both the min-distance features and the per-row embedding aggregate.
train_embedded_X = create_embedded(train_X) 
test_embedded_X = create_embedded(test_X)

In [None]:
def calculate_min_distances(embedding, valued_words):
    """For every row, find how close its words get to each reference vector.

    Parameters
    ----------
    embedding : sequence of sequences
        One entry per text; each entry holds word vectors, with ``None``
        marking words that have no vector.
    valued_words : sequence of arrays
        Reference vectors to measure against.

    Returns
    -------
    list of numpy.ndarray
        One array per row, of length ``len(valued_words)``.  Element ``k`` is
        the smallest squared Euclidean distance between any known word of the
        row and ``valued_words[k]``; it stays ``inf`` when the row has no
        known words.
    """
    n_targets = len(valued_words)
    results = []
    for row in embedding:
        # Skip placeholders for words without a vector.
        known_vectors = [vector for vector in row if vector is not None]
        row_minima = np.full(n_targets, float('inf'))
        for target_idx, target in enumerate(valued_words):
            for vector in known_vectors:
                squared_distance = np.sum(np.square(target - vector))
                if squared_distance < row_minima[target_idx]:
                    row_minima[target_idx] = squared_distance
        results.append(row_minima)
    return results

In [None]:
# 2nd-approach features: for each text, the smallest squared distance from
# any of its known words to each of the reference vectors in valued_words.
# Entries stay inf for texts in which no word has a vector.
train_min_distances = calculate_min_distances(train_embedded_X, valued_words)
test_min_distances = calculate_min_distances(test_embedded_X, valued_words)

In [None]:
# Multinomial naive Bayes (library defaults) on the min-distance features.
# NOTE(review): a row left at inf (no known words) will presumably be
# rejected by the estimator -- confirm that no such rows exist.
distances_mnb = MultinomialNB()
distances_mnb.fit(train_min_distances, train_y)
pred_distances_mnb = distances_mnb.predict(test_min_distances)
print(accuracy_ratio(test_y, pred_distances_mnb))

In [None]:
# Linear regression on the min-distance features; np.rint rounds the
# real-valued outputs to the nearest integer value before scoring.
distances_lr = LinearRegression()
distances_lr.fit(train_min_distances, train_y)
pred_distances_lr = np.rint(distances_lr.predict(test_min_distances))
print(accuracy_ratio(test_y, pred_distances_lr))

In [None]:
# Support-vector classifier (library defaults) on the min-distance features.
distances_svc = SVC()
distances_svc.fit(train_min_distances, train_y)
pred_distances_svc = distances_svc.predict(test_min_distances)
print(accuracy_ratio(test_y, pred_distances_svc))

In [None]:
# 3rd approach

In [None]:
# for each row
def calculate_avg_embedding(embedding):
    """Average the known word vectors of every row.

    Parameters
    ----------
    embedding : sequence of sequences
        One entry per text; each entry holds word vectors, with ``None``
        marking words that have no vector.

    Returns
    -------
    list of numpy.ndarray
        One float64 vector per row: the element-wise mean of the row's known
        word vectors, or an all-zero vector when the row has none.

    Raises
    ------
    ValueError
        If there are rows but not a single word vector anywhere, so the
        vector shape cannot be determined.
    """
    # Take the vector shape from the first vector found anywhere in the input
    # rather than from row[0], which may be None (unknown first word) or may
    # not exist at all (empty row).
    vector_shape = next(
        (np.shape(word) for row in embedding for word in row if word is not None),
        None,
    )

    output = []
    for row in embedding:
        known_vectors = [word for word in row if word is not None]
        if known_vectors:
            # Divide by the number of known words so long and short texts
            # land on the same scale (a plain sum grows with text length).
            output.append(np.mean(known_vectors, axis=0, dtype=np.float64))
        elif vector_shape is not None:
            output.append(np.zeros(vector_shape))
        else:
            raise ValueError(
                "cannot infer the embedding size: no word vectors in the input"
            )
    return output

In [None]:
# 3rd-approach features: one fixed-size vector per text, produced by
# calculate_avg_embedding from that text's word vectors.
train_avg_embed_X = calculate_avg_embedding(train_embedded_X)
test_avg_embed_X = calculate_avg_embedding(test_embedded_X)

In [None]:
# Linear regression on the per-text embedding vectors; np.rint rounds the
# real-valued outputs to the nearest integer value before scoring.
avg_lr = LinearRegression()
avg_lr.fit(train_avg_embed_X, train_y)
pred_avg_lr = np.rint(avg_lr.predict(test_avg_embed_X))
print(accuracy_ratio(test_y, pred_avg_lr))

In [None]:
# Support-vector classifier (library defaults) on the per-text embedding
# vectors.
avg_svc = SVC()
avg_svc.fit(train_avg_embed_X, train_y)
pred_avg_svc = avg_svc.predict(test_avg_embed_X)
print(accuracy_ratio(test_y, pred_avg_svc))

In [None]:
# ensembling predictions

In [None]:
from collections import Counter
def get_majority(prediction_list, index):
    """Return the most frequent prediction for one sample.

    ``prediction_list`` holds one prediction sequence per model; the value at
    position ``index`` is taken from each and the most common one is
    returned.  On a tie, the value that was encountered first wins.
    """
    votes = Counter(model_predictions[index] for model_predictions in prediction_list)
    top_value, _top_count = votes.most_common(1)[0]
    return top_value

def predict_ensemble(prediction_list):
    """Combine several models' predictions by per-sample majority vote.

    ``prediction_list`` holds one prediction sequence per model.  The number
    of samples is taken from the first sequence, and ``get_majority`` picks
    the winning value for each sample position.  Returns a list with one
    value per sample.
    """
    return [
        get_majority(prediction_list, position)
        for position, _ in enumerate(prediction_list[0])
    ]

In [None]:
# Majority vote across the nine models from the three approaches above.
test_predictions = [pred_bow_mnb, pred_bow_lr, pred_bow_kn, pred_bow_svc, pred_distances_mnb, pred_distances_lr, pred_distances_svc, pred_avg_lr, pred_avg_svc]
ensemble_predictions = predict_ensemble(test_predictions)
# The regression predictions went through np.rint and are floats, so a
# winning vote may be a float; round() normalises every entry to an integer.
ensemble_predictions = [round(x) for x in ensemble_predictions]
print(accuracy_ratio(test_y, ensemble_predictions))

In [None]:
# output code if creating submission
# output = pd.DataFrame(zip(list(df_test['scenario_id']), ensemble_predictions), columns=['scenario_id','danger_level'])
# output.to_csv('submission/Challenge1_submission.csv', index=False)