# Set Seed and CUDA

In [None]:
import torch
import torchtext
import gensim.downloader
import numpy as np
import torch
import torchtext
import tensorflow as tf
import nltk
import matplotlib.pyplot as plt
import os
from sklearn.svm import SVC
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import SGDClassifier
from sklearn import metrics
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from nltk.stem import WordNetLemmatizer
from collections import Counter
from collections import OrderedDict



# Global configuration: one seed for every random source, the compute device,
# and the pre-trained word2vec vectors used throughout the notebook.
SEED = 1234
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
WORD2VEC_VECTORS = gensim.downloader.load("word2vec-google-news-300")
torch.manual_seed(SEED)
# scikit-learn and imbalanced-learn estimators created without an explicit
# random_state draw from NumPy's global RNG, so seed it as well; otherwise the
# SGD models and resampling steps below give different results on every run.
np.random.seed(SEED)
torch.backends.cudnn.deterministic = True

# NLTK resources needed for stop-word removal, tokenisation and lemmatisation.
nltk.download('stopwords')
nltk.download('punkt')
nltk.download('wordnet')

# From here on `stopwords` is the set of English stop words (it replaces the
# imported corpus reader of the same name).
stopwords = set(stopwords.words('english'))

print("PyTorch Version: ", torch.__version__)
print("torchtext Version: ", torchtext.__version__)
# Compare the device *type* so an indexed device such as "cuda:0" is still reported as GPU.
print(f"Using {'GPU' if DEVICE.type == 'cuda' else 'CPU'}.")

# Dataset load and prep

In [None]:
from datasets import load_dataset

def _flatten(nested):
    """Return the items of a list of lists as one flat list, in order."""
    return [item for inner in nested for item in inner]


# Fetch the corpus and report how many records each split contains.
dataset = load_dataset("surrey-nlp/PLOD-CW")

training_set = dataset["train"]
print(len(training_set))
validation_set = dataset["validation"]
print(len(validation_set))
testing_set = dataset["test"]
print(len(testing_set))

# Per-record token and tag lists for the training split.
X_raw = training_set["tokens"]
y_raw = training_set["ner_tags"]

# Every split is flattened to token level: one token paired with one tag.
X = _flatten(X_raw)
y = _flatten(y_raw)

X_validation_raw = _flatten(validation_set["tokens"])
y_validation_raw = _flatten(validation_set["ner_tags"])

X_test_raw = _flatten(testing_set["tokens"])
y_test_tags = _flatten(testing_set["ner_tags"])

# Dataset Analysis

In [None]:
def analyse_dataset(dataset, folder, name):
    """Plot tag statistics for one dataset split and save them as PNG files.

    Three figures are written to ``folder``, each prefixed with ``name``:
    the NER tag distribution (pie), the POS tags of long-form tokens
    (``B-LF``/``I-LF``, pie, rare tags merged by ``filter_by_threshold``) and
    the POS tags of ``B-O`` tokens (bar chart, most frequent first).

    Args:
        dataset: Object indexable by ``"ner_tags"`` and ``"pos_tags"``, each
            yielding an iterable of per-record tag sequences.
        folder: Output directory; created if it does not exist.
        name: File-name prefix for the saved figures.
    """
    # savefig does not create missing directories, so make sure the target exists.
    os.makedirs(folder, exist_ok=True)

    # Count the number of instances of each tag
    ner_tags = [tag for record in dataset["ner_tags"] for tag in record]
    ner_tag_freq = Counter(ner_tags)

    # Create pie chart for ner tag frequency
    plt.figure(figsize=(5, 4))
    plt.pie(ner_tag_freq.values(), labels=ner_tag_freq.keys(), autopct="%1.1f%%")
    plt.title("NER Distribution")
    plt.savefig(f"{folder}/{name}-ner-tag-distribution.png")
    plt.close()

    # Analysis of which pos tag each ner tag belongs to
    pos_tags = [tag for record in dataset["pos_tags"] for tag in record]
    tags_freq = Counter(zip(ner_tags, pos_tags))

    lf_pos_freq = Counter()
    non_abbrv_pos_freq = Counter()
    for (ner_tag, pos_tag), count in tags_freq.items():
        if ner_tag in ("B-LF", "I-LF"):
            # Accumulate: the same POS tag occurs under both B-LF and I-LF, and
            # plain assignment would keep only whichever pair was seen last.
            lf_pos_freq[pos_tag] += count
        elif ner_tag == "B-O":
            non_abbrv_pos_freq[pos_tag] += count

    lf_pos_filtered = filter_by_threshold(lf_pos_freq)

    plt.figure(figsize=(5, 4))
    plt.pie(lf_pos_filtered.values(), labels=lf_pos_filtered.keys(), autopct="%1.1f%%")
    plt.title("POS Tag Distribution for LF NER")
    plt.savefig(f"{folder}/{name}-LF-POS.png")
    plt.close()

    # Most frequent POS tag first so the bar chart reads left to right.
    non_abbrv_sorted = OrderedDict(sorted(non_abbrv_pos_freq.items(), key=lambda x: x[1], reverse=True))
    plt.figure(figsize=(8, 6))
    plt.bar(non_abbrv_sorted.keys(), non_abbrv_sorted.values())
    plt.xticks(rotation=65)
    plt.title("Non-Abbreviation POS distribution")
    plt.savefig(f"{folder}/{name}-non-abbrv.png")
    plt.close()
    
def filter_by_threshold(dictionary, threshold_proportion=0.05):
    """Merge low-count entries of a frequency mapping into an "other" bucket.

    The cut-off is ``threshold_proportion`` of the sum of all counts, rounded
    to the nearest whole number, and is printed for reference.

    Args:
        dictionary: Mapping of item -> count.
        threshold_proportion: Fraction of the total below which an entry is
            folded into "other".

    Returns:
        A dict that always starts with the key "other" (possibly 0), followed
        by the entries whose count reached the cut-off, in input order.
    """
    total_count = sum(dictionary.values())
    threshold = round(total_count * threshold_proportion, 0)
    print(f"Threshold: {threshold}")

    kept = {"other": 0}
    for key, count in dictionary.items():
        if not count >= threshold:
            # Too rare to show on its own.
            kept["other"] = kept["other"] + count
            continue
        kept[key] = count

    return kept
        
    
# Produce the same set of plots for every split, each in its own "<split>-plots" folder.
for _split, _split_name in (
    (training_set, "training"),
    (validation_set, "validation"),
    (testing_set, "testing"),
):
    analyse_dataset(_split, f"{_split_name}-plots", _split_name)

# Labels prep

In [None]:
# Tag names in id order; a tag's position in this list is its integer id.
label_list = ["B-O", "B-AC", "B-LF", "I-LF"]
labels_vocab = {label: index for index, label in enumerate(label_list)}
print("Converting labels")
# Despite its name, integer_labels_2d is built from the flat, token-level label list y.
integer_labels_2d = np.array([labels_vocab[tag] for tag in y])
y_validation_integers = np.array([labels_vocab[tag] for tag in y_validation_raw])
y_test_integers = np.array([labels_vocab[tag] for tag in y_test_tags])

# Experiment 1: Preprocessing

In [None]:
def remove_stop_words(list, labels):
    """Drop stop words from a token sequence together with their labels.

    Membership is tested against the module-level ``stopwords`` collection
    using the token exactly as given (no case folding).

    Args:
        list: Sequence of tokens.
        labels: Sequence indexable in parallel with ``list``.

    Returns:
        A ``(tokens, labels)`` pair of lists containing only the positions
        whose token is not a stop word.
    """
    kept_words = []
    kept_labels = []

    for position, token in enumerate(list):
        if token in stopwords:
            continue
        kept_words.append(token)
        kept_labels.append(labels[position])

    return kept_words, kept_labels

def stem(list):
    """Return the Porter stem of every token in ``list``, in order."""
    stem_token = PorterStemmer().stem
    return [stem_token(token) for token in list]

def lower_case(list):
    """Return a new list with every token of ``list`` lower-cased."""
    lowered = []
    for token in list:
        lowered.append(token.lower())
    return lowered

def lemmatize(list):
    """Return the WordNet lemma of every token in ``list``, in order."""
    lemmatize_token = WordNetLemmatizer().lemmatize
    return [lemmatize_token(token) for token in list]
        
# Preprocessing variants of the training tokens, keyed by a short recipe name.
exp1 = {}

# Stop-word removal shortens the token list, so the matching labels are kept alongside it.
exp1["rsw"], y_labels_processed = remove_stop_words(X, integer_labels_2d)

# NOTE(review): NLTK's PorterStemmer appears to lower-case its input by default,
# so the "stem" and "stem-lc" variants may be identical — confirm.
exp1.update({
    # Stem
    "stem": stem(X),
    # Lower case, then stem
    "stem-lc": stem(lower_case(X)),
    # Remove stop word, stem, lower case
    "rsw-stem-lc": lower_case(stem(exp1["rsw"])),
    # Remove stop word, lemmatize
    "rsw-lemmatize": lemmatize(exp1["rsw"]),
    # Remove stop word, lower case, lemmatize
    "rsw-lc-lemmatize": lemmatize(lower_case(exp1["rsw"])),
})

## Under/Oversampling

In [None]:
from imblearn.over_sampling import RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler

def get_sample(samplerType, X_in, y_in):
    """Randomly over- or under-sample a token-level dataset to rebalance classes.

    Args:
        samplerType: ``"over"`` for RandomOverSampler, ``"under"`` for
            RandomUnderSampler.
        X_in: Flat sequence of tokens (one sample per token).
        y_in: Flat sequence of labels aligned with ``X_in``.

    Returns:
        ``(X_sampled, y_sampled)`` where ``X_sampled`` is a flat list of tokens
        and ``y_sampled`` is the resampled label array, with the same label
        values that were passed in. Returns ``None`` for any other
        ``samplerType``.
    """
    # Seed the samplers so repeated runs select the same rows.
    if samplerType == "over":
        sampler = RandomOverSampler(random_state=SEED)
    elif samplerType == "under":
        sampler = RandomUnderSampler(random_state=SEED)
    else:
        return None

    # The samplers expect a 2-D feature matrix, so each token becomes a one-column row.
    features = np.array(X_in).reshape(-1, 1)
    X_sampled_2d, y_sampled_labels = sampler.fit_resample(features, np.array(y_in))

    # Undo the reshape: back to a flat list of tokens.
    X_sampled = [item for row in X_sampled_2d for item in row]

    # Labels come back as a 1-D array and are returned as-is. The former
    # "convert via labels_vocab" branch never ran for 1-D labels, and would have
    # failed on already-integer labels if it had.
    return X_sampled, y_sampled_labels

# Resample the flat training tokens/labels both ways and print the resulting sizes.
X_undersampled, y_undersampled = get_sample("under", X, y)
print(np.shape(X_undersampled))
X_oversampled, y_oversampled = get_sample("over", X, y)
print(np.shape(X_oversampled))

# The validation split is resampled as well; the sampled models are later scored on these sets.
X_validation_undersampled, y_validation_undersampled = get_sample("under", X_validation_raw, y_validation_raw)
X_validation_oversampled, y_validation_oversampled = get_sample("over", X_validation_raw, y_validation_raw)

def analyse_sampled_set(set, name):
    """Save a pie chart of the label distribution of a resampled set.

    The figure is written to ``sampled/<name>-ner-tag-distribution.png`` and
    its title shows the total number of labels.

    Args:
        set: Iterable of hashable labels.
        name: File-name prefix for the saved figure.
    """
    # Create pie chart for ner tag frequency
    ner_tag_freq = Counter(set)
    # sum() instead of Counter.total(), which only exists from Python 3.10.
    label_count = sum(ner_tag_freq.values())

    # savefig does not create missing directories.
    os.makedirs("sampled", exist_ok=True)

    plt.figure(figsize=(5, 4))
    plt.pie(ner_tag_freq.values(), labels=ner_tag_freq.keys(), autopct="%1.1f%%")
    plt.title(f"NER Distribution ({label_count})")
    plt.savefig(f"sampled/{name}-ner-tag-distribution.png")
    plt.close()

# Plot the label balance of each resampled label set.
for _sampled_labels, _plot_name in (
    (y_undersampled, "training_undersampled"),
    (y_oversampled, "training_oversampled"),
    (y_validation_undersampled, "validation_undersampled"),
    (y_validation_oversampled, "validation_oversampled"),
):
    analyse_sampled_set(_sampled_labels, _plot_name)

# Word2Vec

In [None]:
def text_to_word2vec(text):
    """Look up a word2vec embedding for every token in ``text``.

    Args:
        text: Iterable of tokens.

    Returns:
        A list with one vector per token, in order. Tokens missing from
        ``WORD2VEC_VECTORS`` are represented by a 300-dimensional zero vector.
    """
    vectors = []
    for word in text:
        try:
            vectors.append(WORD2VEC_VECTORS[word])
        except KeyError:
            # Out-of-vocabulary token. Catch only the lookup failure so real
            # errors (and Ctrl-C) are not silently turned into zero vectors.
            vectors.append(np.zeros(300,))
    return vectors

# Feature sets per split, keyed by embedding name.
X_training = {"word2vec": text_to_word2vec(X)}

X_validation = {"word2vec": text_to_word2vec(X_validation_raw)}

# Flat copy of the test tokens (same content as X_test_raw).
X_test_word2vec = [word for sublist in testing_set["tokens"] for word in sublist]
X_testing = {"word2vec": text_to_word2vec(X_test_raw)}

## Pre-processing Training and Fitting

In [None]:
# Embed every preprocessing variant, keeping the variant names as keys.
exp1_vec = {variant: text_to_word2vec(tokens) for variant, tokens in exp1.items()}

# Train one class-weighted linear model per variant and score each on the
# unprocessed validation embeddings.
models = []
for test, features in exp1_vec.items():
    # Variants that dropped stop words need the label array filtered alongside them.
    targets = y_labels_processed if "rsw" in test else integer_labels_2d

    classifier = SGDClassifier(class_weight="balanced")
    models.append(classifier)
    classifier.fit(features, targets)

    prediction = classifier.predict(X_validation["word2vec"])
    print(f"============ {test} ============\n{metrics.classification_report(y_validation_integers, prediction)}")
    

## Under/Oversampling Model

In [None]:
# Embed the resampled training tokens.
X_undersampled_vec = text_to_word2vec(X_undersampled)
X_oversampled_vec = text_to_word2vec(X_oversampled)

# Embed the resampled validation tokens used to score the sampled models.
X_validation_undersampled_vec = text_to_word2vec(X_validation_undersampled) 
X_validation_oversampled_vec = text_to_word2vec(X_validation_oversampled)

In [None]:

# Three ways of handling class imbalance, each scored on its matching validation set.

# 1) Undersampled training data, evaluated on the undersampled validation data.
model_undersampled = SGDClassifier()
model_undersampled.fit(X_undersampled_vec, y_undersampled)
model_undersampled_predictions = model_undersampled.predict(X_validation_undersampled_vec)
print(f"============ Undersampled ============\n{metrics.classification_report(y_validation_undersampled, model_undersampled_predictions)}")

# 2) Oversampled training data, evaluated on the oversampled validation data.
model_oversampled = SGDClassifier()
model_oversampled.fit(X_oversampled_vec, y_oversampled)
model_oversampled_predictions = model_oversampled.predict(X_validation_oversampled_vec)
print(f"============ Oversampled ============\n{metrics.classification_report(y_validation_oversampled, model_oversampled_predictions)}")

# 3) Original data with class_weight="balanced", evaluated on the original validation data.
model_balanced = SGDClassifier(class_weight="balanced")
model_balanced.fit(X_training["word2vec"], integer_labels_2d)
model_balanced_predictions = model_balanced.predict(X_validation["word2vec"])
print(f"============ \"balanced\" ============\n{metrics.classification_report(y_validation_integers, model_balanced_predictions)}")

In [None]:
# Training tokens: stop words removed, then lemmatised.
X_processed = exp1["rsw-lemmatize"]

# Apply the same steps in the same order to the validation tokens. Lemmatising
# first (as before) can alter a stop word so it no longer matches the stop-word
# set, leaving validation data that is filtered differently from training data.
X_validation_preprocessed_raw, y_validation_processed = remove_stop_words(X_validation_raw, y_validation_raw)
X_validation_preprocessed_raw = lemmatize(X_validation_preprocessed_raw)
# Validation labels are still tag strings at this point; map them to integer ids.
y_validation_processed = [labels_vocab[label] for label in y_validation_processed]

X_training_preprocessed = {}
X_training_preprocessed["word2vec"] = text_to_word2vec(X_processed)

X_validation_preprocessed = {}
X_validation_preprocessed["word2vec"] = text_to_word2vec(X_validation_preprocessed_raw)

# Word Embeddings Experiment

# tf-idf

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

# TF-IDF features: the vectoriser is fitted on the preprocessed training tokens
# only, then reused unchanged for validation and test. Each token is passed as
# its own "document".
# NOTE(review): TfidfVectorizer's default analyser presumably lower-cases and
# drops single-character tokens — confirm this is intended for token-level input.
tfidf = TfidfVectorizer()

X_training_preprocessed["tf-idf"] = tfidf.fit_transform(X_processed)
X_validation_preprocessed["tf-idf"] = tfidf.transform(X_validation_preprocessed_raw)
X_testing["tf-idf"] = tfidf.transform(X_test_raw)

# Class-weighted linear model on the TF-IDF features.
svm_tf_idf = SGDClassifier(class_weight="balanced")
svm_tf_idf.fit(X_training_preprocessed["tf-idf"], y_labels_processed)

svm_tf_idf_predictions = svm_tf_idf.predict(X_validation_preprocessed["tf-idf"])
print(metrics.classification_report(y_validation_processed, svm_tf_idf_predictions))

## GloVe

In [None]:
import gensim.downloader as api

# Pre-trained GloVe vectors, loaded through the gensim downloader.
glove_model = api.load("glove-wiki-gigaword-300")

In [None]:
def text_to_glove(text):
    """Look up a GloVe embedding for every token in ``text``.

    Args:
        text: Iterable of tokens.

    Returns:
        A list with one vector per token, in order. Tokens missing from
        ``glove_model`` are represented by a 300-dimensional zero vector.
    """
    vectors = []

    for word in text:
        try:
            vectors.append(glove_model[word])
        except KeyError:
            # Out-of-vocabulary token. Catch only the lookup failure so real
            # errors (and Ctrl-C) are not silently turned into zero vectors.
            vectors.append(np.zeros(300,))

    return vectors

# GloVe features for the preprocessed training/validation tokens and the raw test tokens.
X_training_preprocessed["GloVe"] = text_to_glove(X_processed)
X_validation_preprocessed["GloVe"] = text_to_glove(X_validation_preprocessed_raw)
X_testing["GloVe"] = text_to_glove(X_test_raw)

# With preprocessing
# Recomputes integer_labels_2d from y; the model below is fitted on
# y_labels_processed, not on this array.
print("Converting labels")
integer_labels_2d = np.array([labels_vocab[label] for label in y])

# Class-weighted linear model on the GloVe features.
svm_glove = SGDClassifier(class_weight="balanced")
svm_glove.fit(X_training_preprocessed["GloVe"], y_labels_processed)

svm_glove_predictions = svm_glove.predict(X_validation_preprocessed["GloVe"])
print(metrics.classification_report(y_validation_processed, svm_glove_predictions))


## Word2Vec

In [None]:
# Class-weighted linear model (default SGDClassifier loss) on the word2vec
# features of the preprocessed tokens. svm_word2vec is reused later as the
# "Hinge loss (default)" entry of the loss-function comparison.
print("Fitting SVM model")
svm_word2vec = SGDClassifier(class_weight="balanced")
svm_word2vec.fit(X_training_preprocessed["word2vec"], y_labels_processed)

svm_word2vec_predictions = svm_word2vec.predict(X_validation_preprocessed["word2vec"])
print(metrics.classification_report(y_validation_processed, svm_word2vec_predictions))

# Loss Functions

In [None]:
# Shared training data for the loss-function comparison: word2vec features of
# the preprocessed tokens and their stop-word-filtered labels.
TRAINING_SET_X = X_training_preprocessed["word2vec"]
TRAINING_SET_y = y_labels_processed

In [None]:
# Same class-weighted model, trained with the squared hinge loss.
svm_squared_hinge = SGDClassifier(loss="squared_hinge", class_weight="balanced")
svm_squared_hinge.fit(TRAINING_SET_X, TRAINING_SET_y)

In [None]:
# Same class-weighted model, trained with the perceptron loss.
svm_perceptron = SGDClassifier(loss="perceptron", class_weight="balanced")
svm_perceptron.fit(TRAINING_SET_X, TRAINING_SET_y)

In [None]:
# Same class-weighted model, trained with the modified Huber loss.
svm_modified_huber = SGDClassifier(loss="modified_huber", class_weight="balanced")
svm_modified_huber.fit(TRAINING_SET_X, TRAINING_SET_y)

In [None]:
# Same class-weighted model, trained with the epsilon-insensitive loss.
# NOTE(review): scikit-learn describes this as a regression loss — confirm it is
# intended for this classification comparison.
svm_epsilon_insensitive = SGDClassifier(loss="epsilon_insensitive", class_weight="balanced")
svm_epsilon_insensitive.fit(TRAINING_SET_X, TRAINING_SET_y)

In [None]:
# Same class-weighted model, trained with the squared epsilon-insensitive loss.
# NOTE(review): scikit-learn describes this as a regression loss — confirm it is
# intended for this classification comparison.
svm_squared_epsilon_insensitive = SGDClassifier(loss="squared_epsilon_insensitive", class_weight="balanced")
svm_squared_epsilon_insensitive.fit(TRAINING_SET_X, TRAINING_SET_y)

In [None]:
# Score every loss-function variant on the same preprocessed validation set.
VALIDATION_SET_X = X_validation_preprocessed["word2vec"]
VALIDATION_SET_y = y_validation_processed

# Baseline: svm_word2vec was created without a loss argument, i.e. the SGDClassifier default.
y_pred = svm_word2vec.predict(VALIDATION_SET_X)
print(f"\n====== Hinge loss (default) ======\n{metrics.classification_report(VALIDATION_SET_y, y_pred)}")

svm_squared_hinge_predictions = svm_squared_hinge.predict(VALIDATION_SET_X)
print(f"\n====== Squared Hinge loss ======\n{metrics.classification_report(VALIDATION_SET_y, svm_squared_hinge_predictions)}")

svm_perceptron_predictions = svm_perceptron.predict(VALIDATION_SET_X)
print(f"\n====== Perceptron ======\n{metrics.classification_report(VALIDATION_SET_y, svm_perceptron_predictions)}")

svm_modified_huber_predictions = svm_modified_huber.predict(VALIDATION_SET_X)
print(f"\n====== Modified Huber ======\n{metrics.classification_report(VALIDATION_SET_y, svm_modified_huber_predictions)}")

svm_epsilon_insensitive_predictions = svm_epsilon_insensitive.predict(VALIDATION_SET_X)
print(f"\n====== Epsilon Insensitive ======\n{metrics.classification_report(VALIDATION_SET_y, svm_epsilon_insensitive_predictions)}")

svm_squared_epsilon_insensitive_predictions = svm_squared_epsilon_insensitive.predict(VALIDATION_SET_X)
print(f"\n====== Squared Epsilon Insensitive ======\n{metrics.classification_report(VALIDATION_SET_y, svm_squared_epsilon_insensitive_predictions)}")


# Hyperparameter tuning

In [None]:
import optuna
def objective(trial):
    """Optuna objective: macro-F1 on validation for one (alpha, eta0) candidate.

    Samples ``alpha`` and ``eta0`` on a log scale, fits a log-loss
    SGDClassifier with an adaptive learning rate on the preprocessed word2vec
    training features, and scores it on the preprocessed validation features.

    Args:
        trial: Optuna trial used to draw the hyperparameters.

    Returns:
        The macro-averaged F1 score on the validation set.
    """
    sampled = {
        "alpha": trial.suggest_float('alpha', 1e-5, 1e5, log=True),
        "eta0": trial.suggest_float('eta0', 1e-4, 1e4, log=True),
    }

    candidate = SGDClassifier(loss="log_loss", learning_rate="adaptive", **sampled)
    candidate.fit(X_training_preprocessed["word2vec"], y_labels_processed)

    predicted = candidate.predict(X_validation_preprocessed["word2vec"])
    return metrics.f1_score(y_validation_processed, predicted, average="macro")

# Search for the (alpha, eta0) pair that maximises validation macro-F1 over 100 trials.
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100)

best_params = study.best_params

# Refit a model with the best hyperparameters, using the same configuration as
# the objective, and report its validation performance.
svm_optimised = SGDClassifier(loss="log_loss", alpha=best_params["alpha"], learning_rate="adaptive", eta0=best_params["eta0"])
svm_optimised.fit(X_training_preprocessed["word2vec"], y_labels_processed)

svm_optimized_predictions = svm_optimised.predict(X_validation_preprocessed["word2vec"])
print(metrics.classification_report(y_validation_processed, svm_optimized_predictions))


In [None]:
# Re-print the tuned model's validation report without rerunning the search.
print(metrics.classification_report(y_validation_processed, svm_optimized_predictions))

In [None]:
# Inspect the first recorded trial.
trials = study.get_trials()
# `report` is a method on the trial object, so printing it only showed a
# bound-method repr; print the trial itself to see its value and parameters.
print(trials[0])

# Final Model

In [None]:
# Final model: squared epsilon-insensitive loss with the tuned alpha/eta0.
# NOTE(review): best_params came from a study whose objective used
# loss="log_loss" — confirm the tuned values are meant to carry over to this loss.
model_final = SGDClassifier(loss="squared_epsilon_insensitive", alpha=best_params["alpha"], learning_rate="adaptive", eta0=best_params["eta0"])

# Oversample the stop-word-filtered, lemmatised training tokens, then embed them.
X_final, y_final = get_sample("over", exp1["rsw-lemmatize"], y_labels_processed)
X_final_vec = text_to_word2vec(X_final)

print(np.shape(X_final_vec))
model_final.fit(X_final_vec, y_final)

In [None]:
# Evaluate the final model on the test split. The test tokens are embedded raw
# (no stop-word removal or lemmatisation), so every test token is scored.
X_test_vec = text_to_word2vec(X_test_raw)
model_final_predictions = model_final.predict(X_test_vec)
print(metrics.classification_report(y_test_integers, model_final_predictions))