In [None]:
import torch
import torch.nn as nn
import numpy as np
from tokenizers import Tokenizer
from nltk.tokenize import regexp_tokenize
from preprocessing import cyrillize, pattern
from sklearn.decomposition import TruncatedSVD

from get_embeddings import get_noise_dampening_embedding, get_sub_word_tokenization_embedding

import json
import random
from math import floor

In [None]:
def classify_n_gram(n_gram_vector, examples_vectors) -> bool:
    """Tell whether any example is fully contained in the n-gram.

    Args:
        n_gram_vector: sequence of row vectors making up one n-gram.
        examples_vectors: iterable of examples, each a sequence of row vectors.

    Returns:
        True if, for at least one non-empty example, every row of the example
        is equal (``np.array_equal``) to some row of ``n_gram_vector``;
        False otherwise.
    """
    for example_vector in examples_vectors:
        # An example with no rows would satisfy all() vacuously and mark every
        # n-gram as positive, so it is skipped instead of counted as a match.
        if len(example_vector) == 0:
            continue
        # Positive when every row of the example appears somewhere in the n-gram.
        if all(any(np.array_equal(row, token) for token in n_gram_vector) for row in example_vector):
            return True
    return False

def split_n_gram(comment, embedding, pad_vector, window_size=5):
    """Cut an embedded comment into overlapping windows of ``window_size`` rows.

    The comment is embedded with ``embedding``. An empty embedding yields an
    empty list. An embedding shorter than ``window_size`` is extended by
    stacking ``pad_vector`` once per missing row, after which exactly one
    window (the first ``window_size`` rows) is produced. Otherwise the window
    slides one row at a time over the embedding.
    """
    vectors = embedding(comment)
    length = len(vectors)
    if length == 0:
        return []

    missing = window_size - length
    if missing > 0:
        # One stack call appending the pad vector `missing` times.
        vectors = np.vstack([vectors] + [pad_vector] * missing)
        length = window_size

    return [
        vectors[start:start + window_size]
        for start in range(length - window_size + 1)
    ]

def split_and_classify_n_grams(comment_record, embedding, pad_vector, window_size=5):
    """Split a labelled comment into n-grams and label each one.

    Args:
        comment_record: mapping with a ``'comment'`` entry and an optional
            ``'examples'`` entry (an iterable of example texts, or None).
        embedding: callable turning a text into a sequence of row vectors.
        pad_vector: padding passed through to ``split_n_gram``.
        window_size: number of rows per n-gram.

    Returns:
        A list of dicts ``{'n_gram': ..., 'label': 'p' | 'n'}``; the label is
        ``'p'`` when ``classify_n_gram`` finds an example inside the n-gram.
    """
    # .get() tolerates records without an 'examples' key, and the identity
    # check avoids the `!= None` comparison.
    examples = comment_record.get('examples')
    examples_vectors = [embedding(example) for example in examples] if examples is not None else []

    return [{
            'n_gram': n_gram,
            'label': 'p' if classify_n_gram(n_gram, examples_vectors) else 'n'
        }
        for n_gram in split_n_gram(comment_record['comment'], embedding, pad_vector, window_size)
    ]

def predict(model, embedding, comment, window_size=5):
    """Classify a whole comment as positive if any of its n-grams is positive.

    Args:
        model: fitted estimator whose ``predict`` returns a single-element
            array for a single flattened n-gram.
        embedding: callable turning a text into a sequence of row vectors;
            it is also used to embed the ``'[PAD]'`` token for padding.
        comment: text to classify.
        window_size: rows per n-gram. It should match the window size the
            model was trained with; defaults to 5, the value that was
            previously hard-wired through ``split_n_gram``'s default.

    Returns:
        True when the model predicts class 1 for at least one n-gram.
    """
    pad_vector = embedding('[PAD]')
    n_grams = split_n_gram(comment, embedding, pad_vector, window_size)
    # Each n-gram is flattened into one feature row before being scored.
    return any(
        model.predict(np.concatenate(n_gram).reshape(1, -1)).item() == 1
        for n_gram in n_grams
    )

def test_model(model, embedding, testing_set):
    """Count confusion-matrix cells for ``model`` over ``testing_set``.

    Each record's ``'comment'`` is classified with ``predict`` and compared
    with its ``'label'`` (``'p'`` means positive, anything else negative).

    Returns:
        The tuple ``(tp, fn, fp, tn)``.
    """
    tp = fn = fp = tn = 0
    for record in testing_set:
        predicted_positive = predict(model, embedding, record['comment'])
        actually_positive = record['label'] == 'p'
        if predicted_positive and actually_positive:
            tp += 1
        elif predicted_positive:
            fp += 1
        elif actually_positive:
            fn += 1
        else:
            tn += 1
    return tp, fn, fp, tn

def print_test_model(tp, fn, fp, tn):
    """Print precision, recall, F1 and the confusion matrix; return the scores.

    Any ratio whose denominator is not positive is reported as 0.

    Returns:
        The tuple ``(Fscore, precision, recall)``.
    """
    predicted_positive = tp + fp
    actual_positive = tp + fn
    precision = tp/predicted_positive if predicted_positive > 0 else 0
    recall = tp/actual_positive if actual_positive > 0 else 0

    score_sum = precision + recall
    if score_sum > 0:
        Fscore = (2.0 * precision * recall) / score_sum
    else:
        Fscore = 0

    for caption, value in (('Precision: ', precision), ('Recall: ', recall), ('F1-score: ', Fscore)):
        print(caption + str(value))

    print('Confusion Matrix:')
    print('{:15} {:>8} {:>8}'.format('', 'Predicted p', 'Predicted n'))
    row_template = '{:15} {:>8.3f} {:>8.3f}'
    print(row_template.format('Actual p', tp, fn))
    print(row_template.format('Actual n', fp, tn))
    return Fscore, precision, recall

def train_model(model, train_records, embedding, window_size, balanced_classes, p_n_rate=1.0):
    """Fit ``model`` on flattened, labelled n-grams built from ``train_records``.

    Args:
        model: estimator exposing ``fit(X, y)``.
        train_records: iterable of comment records accepted by
            ``split_and_classify_n_grams``.
        embedding: callable turning a text into a sequence of row vectors;
            also used to embed the ``'[PAD]'`` token.
        window_size: rows per n-gram.
        balanced_classes: when true, positives are re-sampled with replacement
            so that their count is ``p_n_rate`` times the number of negatives.
        p_n_rate: target positive-to-negative ratio used when balancing.

    Labels are encoded as 1 for positive n-grams and 0 for negative ones.
    """
    # The pad embedding does not depend on the record, so compute it once.
    pad_vector = embedding('[PAD]')
    training_n_gram_set = [
        (np.concatenate(n['n_gram']), 0 if n['label'] == 'n' else 1)
        for comment in train_records
        for n in split_and_classify_n_grams(comment, embedding, pad_vector, window_size)
        if len(n['n_gram']) > 0
    ]
    positive_train = [a for a in training_n_gram_set if a[1] == 1]
    negative_train = [a for a in training_n_gram_set if a[1] == 0]

    if balanced_classes and positive_train:
        # Size the positive sample relative to the NEGATIVE class. Drawing
        # len(positive_train) items, as before, left the class ratio unchanged.
        sample_size = floor(p_n_rate * len(negative_train))
        # Build a new list so negative_train is not mutated through an alias.
        train_sampled_data = negative_train + random.choices(positive_train, k=sample_size)
    else:
        # Either no balancing was requested or there are no positives to draw from.
        train_sampled_data = positive_train + negative_train

    train_x, train_y = [a[0] for a in train_sampled_data], [a[1] for a in train_sampled_data]
    model.fit(train_x, train_y)

def k_cross_validation(model, supervised_comments, embedding, window_size, k, balanced_classes: bool = False, p_n_rate = 1.0):
    """Cross-validate ``model`` and return the per-fold average confusion matrix.

    The records are cut into consecutive folds of ``len(supervised_comments) // k``
    items (at least one); a shorter trailing fold is produced when the length
    is not divisible by ``k``. For each fold the model is re-trained on the
    remaining records and tested on the fold.

    NOTE(review): ``test_model`` is called without ``window_size``, so the
    evaluation relies on the default window used further down the call chain;
    use the same value for training until that is threaded through.

    Args:
        model: estimator passed to ``train_model`` / ``test_model``.
        supervised_comments: sliceable sequence (list) of comment records.
        embedding: callable turning a text into a sequence of row vectors.
        window_size: rows per n-gram used for training.
        k: requested number of folds (must be >= 1).
        balanced_classes: forwarded to ``train_model``.
        p_n_rate: forwarded to ``train_model``.

    Returns:
        ``(tp, fn, fp, tn)`` summed over the folds and divided by the number
        of folds actually evaluated.

    Raises:
        ValueError: if ``k < 1`` or ``supervised_comments`` is empty.
    """
    if k < 1:
        raise ValueError('k must be a positive integer, got {!r}'.format(k))
    n = len(supervised_comments)
    if n == 0:
        raise ValueError('supervised_comments must not be empty')

    # Fewer records than folds would give a fold size of 0 (division by zero
    # and a zero range step); fall back to one record per fold.
    m = max(1, n // k)
    fold_starts = range(0, n, m)
    # Average over the folds that are really evaluated; the former
    # n//m + n%m over-counted whenever n was not a multiple of the fold size.
    t = len(fold_starts)

    tps, fns, fps, tns = 0, 0, 0, 0

    for i in fold_starts:
        test_records = supervised_comments[i:i+m]
        train_records = supervised_comments[0:i] + supervised_comments[i+m:n]

        train_model(model, train_records, embedding, window_size, balanced_classes, p_n_rate)

        tp, fn, fp, tn = test_model(model, embedding, test_records)
        tps += tp
        fns += fn
        fps += fp
        tns += tn

    return tps/t, fns/t, fps/t, tns/t

Loading data

In [None]:
# Load the annotated comments, keeping only records that carry a 'label'.
# The comment text goes through cyrillize(); 'examples' is None when absent.
with open('data/blitz_comments.json', 'r', encoding="utf-8") as f:
    supervised_comments = [
        {'comment': cyrillize(d['comment']), 'label': d['label'], 'examples': d.get('examples')}
        for d in json.load(f)
        if 'label' in d
    ]

In [None]:
def normalize(arr):
    """Scale ``arr`` by its norm; return it untouched when the norm is zero."""
    magnitude = np.linalg.norm(arr)
    if magnitude == 0:
        return arr
    return arr/magnitude

In [None]:
# Build the two 100-dimensional embeddings compared throughout the notebook.
sub_word_embedding = get_sub_word_tokenization_embedding(dim=100)
# Use the GPU when one is available and fall back to the CPU otherwise, so the
# notebook also runs on machines without CUDA.
# NOTE(review): assumes get_noise_dampening_embedding accepts device='cpu' — confirm.
noise_dampening_embedding = get_noise_dampening_embedding(
    dim=100,
    device='cuda' if torch.cuda.is_available() else 'cpu',
)

In [None]:
# Accumulates one dict of metrics per experiment; the cells below append to it
# and the final cell turns it into a table.
results = []

Gradient Boosting Classifier

In [None]:
from sklearn.ensemble import GradientBoostingClassifier
# Boosted decision stumps (depth-1 trees) with a fixed seed.
clf = GradientBoostingClassifier(
    n_estimators=100,
    learning_rate=1.0,
    max_depth=1,
    random_state=0,
)

In [None]:
# Cross-validate the boosted stumps on the sub-word embedding (k=10) and
# record the resulting scores.
print("Gradient boosted stumps with sub word embedding")
f, p, r = print_test_model(
    *k_cross_validation(
        clf,
        supervised_comments,
        sub_word_embedding,
        window_size=5,
        k=10,
        balanced_classes=False,
    )
)
results.append({
    'Model': 'Gradient Boosting Stumps',
    'Embedding': 'Sub word',
    'Balanced classes': 'No',
    'Precision': p, 'Recall': r, 'F1 score': f,
})

In [None]:
# Cross-validate the boosted stumps on the noise-dampening embedding (k=10)
# and record the resulting scores.
print("Gradient boosted stumps with ND embedding")
f, p, r = print_test_model(
    *k_cross_validation(
        clf,
        supervised_comments,
        noise_dampening_embedding,
        window_size=5,
        k=10,
        balanced_classes=False,
    )
)
results.append({
    'Model': 'Gradient Boosting Stumps',
    'Embedding': 'Noise dampening',
    'Balanced classes': 'No',
    'Precision': p, 'Recall': r, 'F1 score': f,
})

K-NN

In [None]:
from sklearn.neighbors import KNeighborsClassifier

In [None]:
# Choose the neighbour count (1, 3, 5 or 7) with the best F1 on a single
# hold-out split: the first tenth of the records is the test part.
# The split does not depend on the candidate, so it is computed once.
j = len(supervised_comments)//10
test_records = supervised_comments[0:j]
train_records = supervised_comments[j:]

best_i, best_f = 1, 0
for i in range(1, 8, 2):
    model = KNeighborsClassifier(n_neighbors=i)
    train_model(model, train_records, sub_word_embedding, window_size=5, balanced_classes=False)

    f, _, _ = print_test_model(*test_model(model, sub_word_embedding, test_records))
    if f > best_f:
        best_i, best_f = i, f

In [None]:
# Cross-validate k-NN with the selected neighbour count on the sub-word
# embedding (k=10 folds requested) and record the resulting scores.
print("K-NN with sub word embedding")
f, p, r = print_test_model(
    *k_cross_validation(
        KNeighborsClassifier(n_neighbors=best_i),
        supervised_comments,
        sub_word_embedding,
        window_size=5,
        k=10,
        balanced_classes=False,
    )
)
results.append({
    'Model': 'k-NN k=' + str(best_i),
    'Embedding': 'Sub word',
    'Balanced classes': 'No',
    'Precision': p, 'Recall': r, 'F1 score': f,
})

In [None]:
# Choose the neighbour count (1, 3, 5 or 7) with the best F1 on a single
# hold-out split: the first tenth of the records is the test part.
# The split does not depend on the candidate, so it is computed once.
j = len(supervised_comments)//10
test_records = supervised_comments[0:j]
train_records = supervised_comments[j:]

best_i, best_f = 1, 0.0
for i in range(1, 8, 2):
    model = KNeighborsClassifier(n_neighbors=i)
    train_model(model, train_records, noise_dampening_embedding, window_size=5, balanced_classes=False)

    f, _, _ = print_test_model(*test_model(model, noise_dampening_embedding, test_records))
    if f > best_f:
        best_i, best_f = i, f

In [None]:
# Cross-validate k-NN with the selected neighbour count on the noise-dampening
# embedding (k=10 folds requested) and record the resulting scores.
print("K-NN with ND embedding")
f, p, r = print_test_model(
    *k_cross_validation(
        KNeighborsClassifier(n_neighbors=best_i),
        supervised_comments,
        noise_dampening_embedding,
        window_size=5,
        k=10,
        balanced_classes=False,
    )
)
results.append({
    'Model': 'k-NN k=' + str(best_i),
    'Embedding': 'Noise dampening',
    'Balanced classes': 'No',
    'Precision': p, 'Recall': r, 'F1 score': f,
})

Multi-layer perceptron

In [None]:
from sklearn.neural_network import MLPClassifier

In [None]:
# Cross-validate the MLP (hidden layers 500, 500, 2) on the sub-word embedding
# with class balancing switched on, and record the resulting scores.
print("MLP with sub word embedding")
f, p, r = print_test_model(
    *k_cross_validation(
        MLPClassifier(hidden_layer_sizes=(500, 500, 2), activation='relu'),
        supervised_comments,
        sub_word_embedding,
        window_size=5,
        k=10,
        balanced_classes=True,
        p_n_rate=1,
    )
)
results.append({
    'Model': 'MLP(500, 500, 2)',
    'Embedding': 'Sub word',
    'Balanced classes': 'Yes',
    'Precision': p, 'Recall': r, 'F1 score': f,
})

In [None]:
# Cross-validate the MLP (hidden layers 500, 500, 2) on the noise-dampening
# embedding with class balancing switched on, and record the resulting scores.
print("MLP with ND embedding")
f, p, r = print_test_model(
    *k_cross_validation(
        MLPClassifier(hidden_layer_sizes=(500, 500, 2), activation='relu'),
        supervised_comments,
        noise_dampening_embedding,
        window_size=5,
        k=10,
        balanced_classes=True,
        p_n_rate=1,
    )
)
results.append({
    'Model': 'MLP(500, 500, 2)',
    'Embedding': 'Noise dampening',
    'Balanced classes': 'Yes',
    'Precision': p, 'Recall': r, 'F1 score': f,
})

SVM

In [None]:
from sklearn import svm

In [None]:
# Cross-validate a default SVC on the sub-word embedding with class balancing
# switched on, and record the resulting scores.
print("SVC with sub word embedding")
f, p, r = print_test_model(
    *k_cross_validation(
        svm.SVC(),
        supervised_comments,
        sub_word_embedding,
        window_size=5,
        k=10,
        balanced_classes=True,
        p_n_rate=1,
    )
)
results.append({
    'Model': 'SVM',
    'Embedding': 'Sub word',
    'Balanced classes': 'Yes',
    'Precision': p, 'Recall': r, 'F1 score': f,
})

In [None]:
# Cross-validate a default SVC on the noise-dampening embedding with class
# balancing switched on, and record the resulting scores.
print("SVC with ND embedding")
f, p, r = print_test_model(
    *k_cross_validation(
        svm.SVC(),
        supervised_comments,
        noise_dampening_embedding,
        window_size=5,
        k=10,
        balanced_classes=True,
        p_n_rate=1,
    )
)
results.append({
    'Model': 'SVM',
    'Embedding': 'Noise dampening',
    'Balanced classes': 'Yes',
    'Precision': p, 'Recall': r, 'F1 score': f,
})

# Results

In [None]:
import pandas as pd

# Turn the collected per-experiment metric dicts into a table; the bare `df`
# on the last line makes the notebook render it.
df = pd.DataFrame.from_records(results)
df