In [None]:
import pandas as pd
import numpy as np
from sklearn.utils import shuffle

# import word embedding packages
from nltk.tokenize import word_tokenize, RegexpTokenizer
import gensim
from gensim.models import Word2Vec
import gensim.downloader
import string
# Load the 'glove-wiki-gigaword-50' word vectors through gensim's downloader.
# Later cells use this object for membership tests (`word in glove_vectors`)
# and vector lookups (`glove_vectors[word]`).
glove_vectors = gensim.downloader.load('glove-wiki-gigaword-50')

In [None]:
def accuracy_ratio(y, predictions):
    """Score predictions with partial credit for near misses.

    Each (truth, prediction) pair earns 2 points for an exact match, 1 point
    when the two values differ by exactly one, and nothing otherwise. The sum
    is normalised by the best possible score, ``2 * len(y)``.

    Args:
        y: Sized sequence of true labels.
        predictions: Sequence of predicted labels, paired with ``y`` by position.

    Returns:
        float: Earned points divided by ``2 * len(y)``.

    Raises:
        ZeroDivisionError: If ``y`` is empty.
    """
    earned = 0
    for truth, guess in zip(y, predictions):
        if truth == guess:
            earned += 2
        elif truth == guess + 1 or truth == guess - 1:
            # off by exactly one level: half credit
            earned += 1

    best_possible = len(y) * 2
    return earned / best_possible

In [None]:
def view_predictions(X, y, pred):
    """Print the five rows with the smallest ``y - pred`` difference.

    Builds a frame with one row per sample (input, true label, prediction),
    adds a ``diff`` column holding ``y - pred`` and sorts ascending by it.

    Note: this sets the global pandas option ``display.max_rows`` to ``None``
    and leaves it that way after returning.

    Args:
        X: Iterable of inputs.
        y: True labels, aligned with ``X``.
        pred: Predicted labels, aligned with ``X``.

    Returns:
        None. The table is written to stdout.
    """
    rows = list(zip(list(X), y, list(pred)))
    ordered = (
        pd.DataFrame(rows)
        .assign(diff=list(np.subtract(y, pred)))
        .sort_values(by='diff')
    )
    pd.set_option("display.max_rows", None)
    print(ordered.head())

In [None]:
## data loading
# Labelled training scenarios, indexed by their scenario_id column.
df_train = pd.read_csv('data/Challenge1_Training_Scenarios.csv').set_index('scenario_id')

# Test scenarios keep the default index, so scenario_id stays a regular column.
df_test = pd.read_csv('data/Challenge1_Test_Scenarios.csv')

In [None]:
# development, split training set
# The labelled frame loaded earlier is `df_train`; the bare name `df` is never
# defined in this notebook, so using it raises NameError on a fresh kernel.
DEV_TRAIN_SIZE = 404  # shuffled rows used for fitting; the remainder is held out
SPLIT_SEED = 42       # fixed seed so the dev split is the same on every run

X = df_train['scenario']
y = df_train['danger_level']
X, y = shuffle(X, y, random_state=SPLIT_SEED)
train_X = list(X[:DEV_TRAIN_SIZE])
train_y = list(y[:DEV_TRAIN_SIZE])
test_X = list(X[DEV_TRAIN_SIZE:])
test_y = list(y[DEV_TRAIN_SIZE:])

# creating submission, full training set
#train_X = list(df_train['scenario'])
#train_y = list(df_train['danger_level'])

#test_X = list(df_test['scenario'])

In [None]:
# 1st approach: classifiers trained on TF-IDF n-gram features

In [None]:
# create bows
from sklearn.feature_extraction.text import TfidfVectorizer
from nltk.corpus import stopwords

# TF-IDF features over 1- to 3-grams, using NLTK's English stop-word list.
# The vectorizer is fitted on the training texts only and then applied to the
# held-out texts. Despite the *_counts names, the values are TF-IDF weights.
english_stop_words = stopwords.words('english')
vect = TfidfVectorizer(ngram_range=(1, 3), stop_words=english_stop_words)
train_counts = vect.fit_transform(train_X)
test_counts = vect.transform(test_X)

In [None]:
from sklearn.naive_bayes import MultinomialNB

# Multinomial naive Bayes on the TF-IDF features (fit() returns the estimator).
bow_mnb = MultinomialNB().fit(train_counts, train_y)
pred_bow_mnb = bow_mnb.predict(test_counts)
#print(accuracy_ratio(test_y, pred_bow_mnb))

In [None]:
from sklearn.linear_model import LogisticRegression

# Logistic regression on the TF-IDF features (fit() returns the estimator).
bow_lr = LogisticRegression().fit(train_counts, train_y)

# NOTE(review): predict() presumably already yields class labels, so np.rint
# would only turn the array into floats - confirm before removing it.
raw_bow_lr = bow_lr.predict(test_counts)
pred_bow_lr = np.rint(raw_bow_lr)
#print(accuracy_ratio(test_y, pred_bow_lr))

In [None]:
from sklearn.neighbors import KNeighborsClassifier

# k-nearest-neighbours classifier (library defaults) on the TF-IDF features.
bow_kn = KNeighborsClassifier().fit(train_counts, train_y)
pred_bow_kn = bow_kn.predict(test_counts)
#print(accuracy_ratio(test_y, pred_bow_kn))

In [None]:
from sklearn.svm import SVC

# Support-vector classifier (library defaults) on the TF-IDF features.
bow_svc = SVC().fit(train_counts, train_y)
pred_bow_svc = bow_svc.predict(test_counts)
#print(accuracy_ratio(test_y, pred_bow_svc))

In [None]:
#view_predictions(test_X, test_y, pred_bow_svc)

In [None]:
# 2nd approach: classifiers trained on GloVe-embedding distances to hand-picked keywords

In [None]:
def tokenize_remove_punctuation(input):
    """Lower-case ``input``, tokenize it with NLTK and drop punctuation-only tokens.

    A token is discarded when every one of its characters is in
    ``string.punctuation``. The earlier ``token not in string.punctuation``
    test was a substring check against the punctuation string, so
    multi-character punctuation tokens such as ``...`` or ``--`` slipped
    through and were later treated as words.

    Args:
        input: Raw text to tokenize. The parameter name is kept for keyword
            callers; it shadows the ``input`` builtin inside this function only.

    Returns:
        list[str]: The remaining lower-cased tokens in their original order.
    """
    punctuation_chars = set(string.punctuation)
    tokens = word_tokenize(input.lower())
    return [
        token for token in tokens
        # all() is True for an empty token too, which the substring test also dropped
        if not all(char in punctuation_chars for char in token)
    ]

In [None]:
# grab embeddings for valued words and format
# The keyword strings live in their own list so this cell can be re-run safely:
# overwriting the strings in place meant a second run would try to look up
# vectors (not words) in glove_vectors.
VALUED_NEGATIVE_TERMS = ['elderly', 'mask', 'social', 'coronavirus', 'home', 'work', 'outside']

# Later cells expect `valued_negative_words` to hold one embedding vector per keyword.
valued_negative_words = [glove_vectors[term] for term in VALUED_NEGATIVE_TERMS]

In [None]:
# transform to embeddings for each entry
# Tokens rewritten to 'coronavirus' before the embedding lookup.
COVID_ALIASES = {'covid-19', 'covid'}


def _embed_scenario(text):
    """Turn one scenario string into a list of GloVe vectors.

    The text is tokenized with ``tokenize_remove_punctuation``, every token in
    ``COVID_ALIASES`` is replaced by 'coronavirus' (all occurrences, not just
    the first one), and each token is then mapped to its vector in
    ``glove_vectors`` or to ``None`` when it is not in the vocabulary.
    """
    tokens = [
        'coronavirus' if token in COVID_ALIASES else token
        for token in tokenize_remove_punctuation(text)
    ]
    return [glove_vectors[token] if token in glove_vectors else None for token in tokens]


# Built from train_X / test_X without modifying them, so the cell is re-runnable.
train_embedded_X = [_embed_scenario(text) for text in train_X]
test_embedded_X = [_embed_scenario(text) for text in test_X]

In [None]:
# vector of closest distances to each valued word approach
def _closest_distances(embedded_entry, anchors):
    """Return, per anchor vector, the smallest squared distance to any word.

    ``embedded_entry`` is a sequence of word vectors in which ``None`` marks a
    word without an embedding; those are skipped. A slot stays at ``inf`` when
    the entry holds no embedded word at all.
    """
    closest = np.full(len(anchors), float('inf'))
    for vector in embedded_entry:
        if vector is None:
            continue
        for slot, anchor in enumerate(anchors):
            # squared Euclidean distance between the anchor and this word
            distance = np.sum(np.square(anchor - vector))
            if distance < closest[slot]:
                closest[slot] = distance
    return closest


# Each entry (list of word vectors / None) is replaced by its fixed-length
# distance vector. The names are reused, so this cell expects the output of
# the embedding cell as its input.
train_embedded_X = [
    _closest_distances(entry, valued_negative_words) for entry in train_embedded_X
]
test_embedded_X = [
    _closest_distances(entry, valued_negative_words) for entry in test_embedded_X
]

In [None]:
# Multinomial naive Bayes on the keyword-distance features.
embedded_mnb = MultinomialNB().fit(train_embedded_X, train_y)
pred_embedded_mnb = embedded_mnb.predict(test_embedded_X)
#print(accuracy_ratio(test_y, pred_embedded_mnb))

In [None]:
from sklearn.linear_model import LinearRegression

# Linear regression on the keyword-distance features. Its output is continuous,
# so it is rounded to the nearest integer before being used as a label.
# NOTE(review): rounded values are not clamped to the label range seen in
# train_y - confirm that is acceptable for the ensemble vote.
embedded_lr = LinearRegression().fit(train_embedded_X, train_y)
raw_embedded_lr = embedded_lr.predict(test_embedded_X)
pred_embedded_lr = np.rint(raw_embedded_lr)
#print(accuracy_ratio(test_y, pred_embedded_lr))

In [None]:
# Support-vector classifier (library defaults) on the keyword-distance features.
embedded_svc = SVC().fit(train_embedded_X, train_y)
pred_embedded_svc = embedded_svc.predict(test_embedded_X)
#print(accuracy_ratio(test_y, pred_embedded_svc))

In [None]:
# ensembling predictions: majority vote across all models above

In [None]:
from collections import Counter
def get_majority(prediction_list, index):
    """Return the most frequent prediction for one sample across all models.

    Args:
        prediction_list: Sequence of per-model prediction sequences.
        index: Position of the sample inside each per-model sequence.

    Returns:
        The value predicted most often at ``index``. On a tie, the value that
        appears first in ``prediction_list`` order wins.

    Raises:
        IndexError: If ``prediction_list`` is empty or ``index`` is out of range.
    """
    votes = Counter(model_preds[index] for model_preds in prediction_list)
    winner, _count = votes.most_common(1)[0]
    return winner

def predict_ensemble(prediction_list):
    """Combine several models' predictions by per-sample majority vote.

    Args:
        prediction_list: Non-empty sequence of per-model prediction sequences.
            The first one determines how many samples are voted on.

    Returns:
        list: One ``get_majority`` result per sample.

    Raises:
        IndexError: If ``prediction_list`` is empty.
    """
    reference = prediction_list[0]
    return [get_majority(prediction_list, row) for row, _ in enumerate(reference)]

In [None]:
# Predictions of every model for the same samples: TF-IDF models first, then
# the keyword-distance models. List order matters because ties in the vote go
# to the value seen first.
test_predictions = [
    pred_bow_mnb,
    pred_bow_lr,
    pred_bow_kn,
    pred_bow_svc,
    pred_embedded_mnb,
    pred_embedded_lr,
    pred_embedded_svc,
]
ensemble_predictions = predict_ensemble(test_predictions)

In [None]:
# output code if creating submission
# output = pd.DataFrame(zip(list(df_test['scenario_id']), ensemble_predictions), columns=['scenario_id','danger_level'])
# output.to_csv('submission/Challenge1_submission.csv', index=False)