# Details of solution 1 implementation

Traditional Supervised Machine Learning Method

In [127]:
import numpy as np
import pandas as pd
import nltk
import string
from collections import Counter
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import copy
import time

import torch.nn as nn
import torch
from torch.utils.data import Dataset, DataLoader
from torch import Tensor

import spellchecker.spellchecker as spellchecker
import textstat

torch.manual_seed(42)

<torch._C.Generator at 0x74355c17b790>

# Torch Classes

## Binary Classifier

In [128]:
# Define Binary Classifier
class BinaryClassifier(nn.Module):
    def __init__(self, input_size: int):
        super(BinaryClassifier, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

## Text Dataset

In [129]:
class FeatureDataset(Dataset):
    def __init__(self, data: pd.DataFrame, transform = None):
        self.fv_size = (len(data.columns) - 1) // 2
        self.data = self.convert_to_tensors(data)
        self.transform = transform

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
        return self.data[idx][0], self.data[idx][1]

    def convert_to_tensors(self, data: pd.DataFrame) -> list[list[Tensor]]:
        feature_vecs = torch.tensor(data.iloc[:, :-1].values, dtype=torch.float)
        labels = torch.tensor(data.iloc[:, -1].values, dtype=torch.float).unsqueeze(1)

        ret_list = []
        for i in range(len(data)):
            ret_list.append([feature_vecs[i], labels[i]])
        return ret_list

In [130]:
class TestFeatureDataset(Dataset):
    def __init__(self, data: pd.DataFrame, transform = None):
        self.fv_size = len(data.columns)
        self.data = self.convert_to_tensors(data)
        self.transform = transform

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, idx: int) -> torch.Tensor:
        return self.data[idx]

    def convert_to_tensors(self, data: pd.DataFrame) -> list[Tensor]:
        return torch.tensor(data.values, dtype=torch.float)
        

# Feature Vector Code

## Components

In [131]:
def get_punct_info(words: list[str], text: str, verbose: bool):
    all_puncts = string.punctuation
    simple_puncts = [",", ".", "!", "?", '"', "'"]

    punct_count = 0
    simple_punct_count = 0

    for word in words:
        if word in all_puncts:
            punct_count += 1
        if word in simple_puncts:
            simple_punct_count += 1

    complex_punct_count = punct_count - simple_punct_count

    punct_complexity = complex_punct_count / punct_count if punct_count != 0 else 0

    punct_dist = np.zeros(len(all_puncts))
    for char in all_puncts:
        punct_dist[all_puncts.index(char)] = text.count(char)

    punct_dist = np.zeros(len(all_puncts)) if sum(punct_dist) == 0 else punct_dist / sum(punct_dist)

    if verbose:
        print(f"Punctuation count: {punct_count}")
        print(f"Punctuation complexity: {punct_complexity}")
        print(f"Punctuation distribution: {punct_dist}")
    
    return punct_count, punct_complexity, punct_dist

In [132]:
def get_dt_nn(tagged, verbose: bool = False):
    det_count = 0
    nn_count = 0

    for word in tagged:
        if word[1] == "DET":
            det_count += 1
        if word[1] == "NOUN":
            nn_count += 1

    dt_nn_ratio = 0 if nn_count == 0 else det_count / nn_count

    if verbose:
        print(f"DT count: {det_count}")
        print(f"NN count: {nn_count}")
        print(f"DT:NN ratio: {dt_nn_ratio}")

    return dt_nn_ratio

In [133]:
def get_word_len_features(words: list[str], verbose: bool = False):
    word_lengths = [len(word) for word in words]
    
    word_len_range = max(word_lengths) - min(word_lengths)
    word_len_q3 = np.percentile(word_lengths, 75)

    if verbose:
        print(f"Word length range: {word_len_range}")
        print(f"Word length Q3: {word_len_q3}")

    return word_len_range, word_len_q3

In [134]:
def get_sent_len_features(sents: list[str], verbose: bool = False):
    sent_lengths = [len(nltk.word_tokenize(sent)) for sent in sents]
    
    sent_len_range = max(sent_lengths) - min(sent_lengths)
    sent_len_q3 = np.percentile(sent_lengths, 75)

    if verbose:
        print(f"Sentence length range: {sent_len_range}")
        print(f"Sentence length Q3: {sent_len_q3}")

    return sent_len_range, sent_len_q3

In [135]:
def check_caps(tagged, sents: list[str], verbose: bool = False):
    proper_nouns = [word for word in tagged if word[1] == "NNP"]
    correct_caps_prop_nouns = 0

    for word in proper_nouns:
        if word[0][0].isupper():
            correct_caps_prop_nouns += 1

    prop_noun_cap_ratio = 0 if len(proper_nouns) == 0 else correct_caps_prop_nouns / len(proper_nouns)

    correct_caps_sos = 0
    for sent in sents:
        if sent[0][0].isupper():
            correct_caps_sos = 0

    sos_cap_ratio = correct_caps_sos / len(sents)

    if verbose:
        print(f"Proper noun caps ratio: {prop_noun_cap_ratio}")
        print(f"SOS capitalisation ratio: {sos_cap_ratio}")

    return prop_noun_cap_ratio, sos_cap_ratio

In [136]:
def get_typo_stats(words: list[str], verbose: bool = False):
    spell = spellchecker.SpellChecker("en")
    misspelled = spell.unknown(words)
    typo_ratio = len(misspelled) / len(words)

    if verbose:
        print(f"Typo ratio: {typo_ratio}")

    return typo_ratio

In [137]:
def get_type_token(words: list[str], verbose: bool = False):
    type_token_ratio = len(set(words)) / len(words)

    if verbose:
        print(f"Type-Token Ratio: {type_token_ratio}")

    return type_token_ratio

In [138]:
def get_readability(text: str, verbose: bool = False):
    readability = textstat.textstat.flesch_kincaid_grade(text)

    if verbose:
        print(f"Reading grade: {reability}")

    return readability

## Main Feature Vector Code

In [139]:
def get_feature_vector(text: str, verbose: bool = False) -> np.ndarray:
    words = text.split()
    sents = nltk.sent_tokenize(text)
    tagged_uni = nltk.pos_tag(words, tagset="universal")
    tagged_reg = nltk.pos_tag(words)

    feat_vec = np.ndarray(0, dtype=np.float32)

    punc_count, punct_complexity, punct_dist = get_punct_info(words, text, verbose)

    dt_nn_ratio = get_dt_nn(tagged_uni, verbose)

    word_len_range, word_len_q3 = get_word_len_features(words, verbose)

    sent_len_range, sent_len_q3 = get_sent_len_features(sents, verbose)

    prop_noun_cap_ratio, sos_cap_ratio = check_caps(tagged_reg, sents, verbose)

    typo_ratio = get_typo_stats(words, verbose)

    type_token_ratio = get_type_token(words, verbose)

    readability = get_readability(text, verbose)
    
    feat_vec = np.append(feat_vec, punct_dist)
    feat_vec = np.append(feat_vec, punct_complexity)
    feat_vec = np.append(feat_vec, dt_nn_ratio)
    feat_vec = np.append(feat_vec, word_len_range)
    feat_vec = np.append(feat_vec, word_len_q3)
    feat_vec = np.append(feat_vec, sent_len_range)
    feat_vec = np.append(feat_vec, sent_len_q3)
    feat_vec = np.append(feat_vec, prop_noun_cap_ratio)
    feat_vec = np.append(feat_vec, sos_cap_ratio)
    feat_vec = np.append(feat_vec, typo_ratio)
    feat_vec = np.append(feat_vec, type_token_ratio)
    feat_vec = np.append(feat_vec, readability)

    return feat_vec.reshape(1, -1)

# Implementation

In [162]:
def create_feature_vecs(file_path, name):
    feat_vec_size = 43

    df = pd.read_csv(file_path)

    col_names = [f"fv_1_{i}" for i in range(feat_vec_size)] + [f"fv_2_{i}" for i in range(feat_vec_size)] + [f"fv_diff_{i}" for i in range(feat_vec_size)] + ["label"]

    feat_vec_df = pd.DataFrame(columns=col_names)

    for i in range(len(df)):
        if i % 500 == 0:
            print(f"\rProcessing pair {i}/{len(df)}", end="")

        label = df["label"]

        s1_fv = get_feature_vector(df["text_1"][i]).flatten()
        s2_fv = get_feature_vector(df["text_2"][i]).flatten()
        fv_diff = np.abs(s1_fv - s2_fv)

        row = np.concatenate((s1_fv, s2_fv, fv_diff, [label[i]]), axis=0)
        
        feat_vec_df = feat_vec_df._append(pd.DataFrame([row], columns=col_names), ignore_index=True)

    feat_vec_df.to_csv(f"{name}_feature_vectors.csv", index=False)

In [163]:
def create_feature_vecs_test(file_path, name):
    feat_vec_size = 43
    df = pd.read_csv(file_path)
    col_names = [f"fv_1_{i}" for i in range(feat_vec_size)] + [f"fv_2_{i}" for i in range(feat_vec_size)] + [f"fv_diff_{i}" for i in range(feat_vec_size)]

    feat_vec_df = pd.DataFrame(columns=col_names)

    for i in range(len(df)):
        if i % 500 == 0:
            print(f"\rProcessing pair {i}/{len(df)} (as test data)", end="")

        s1_fv = get_feature_vector(df["text_1"][i]).flatten()
        s2_fv = get_feature_vector(df["text_2"][i]).flatten()
        fv_diff = np.abs(s1_fv - s2_fv)
        
        row = np.concatenate((s1_fv, s2_fv, fv_diff), axis=0)

        feat_vec_df = feat_vec_df._append(pd.DataFrame([row], columns=col_names), ignore_index=True)

    feat_vec_df.to_csv(f"{name}_feature_vectors.csv", index=False)

In [142]:
def get_feat_dataset(df: pd.DataFrame, test: bool = False) -> FeatureDataset:
    if not test:
        return FeatureDataset(df)
    else:
        return TestFeatureDataset(df)

In [157]:
def train_classifier():
    print("Beginning Training")
    feat_vec_train_df = pd.read_csv("training_feature_vectors.csv")
    feat_vec_eval_df = pd.read_csv("eval_feature_vectors.csv")
    feat_vec_train_ds = get_feat_dataset(feat_vec_train_df)
    feat_vec_eval_ds = get_feat_dataset(feat_vec_eval_df)
    feat_vec_train_dl = DataLoader(feat_vec_train_ds, batch_size=16, shuffle=True)
    feat_vec_test_dl = DataLoader(feat_vec_eval_ds, batch_size=1, shuffle=False)
    
    feat_vec_size = feat_vec_train_df.shape[1] - 1
    model = BinaryClassifier(feat_vec_size)
    criterion = nn.BCEWithLogitsLoss()
    optimiser = torch.optim.Adam(model.parameters(), lr=0.00005)

    epochs = 200
    threshold = 0

    current_best = (None, 0.0, -1)
    
    for i in range(epochs):
        time_per_epoch = time.time()
        loss_list = []
        model.train()
        for fv, label in feat_vec_train_dl:
            optimiser.zero_grad()
            output = model(fv)
            loss = criterion(output, label)
            loss_list.append(loss.detach().numpy())
            loss.backward()
            optimiser.step()

        print(f"Mean Training Loss for Epoch {i}: {np.mean(loss_list)}")

        loss_list = []
        preds = []
        true_labels = []

        model.eval()
        with torch.no_grad():
            for fv, label in feat_vec_test_dl:
                output = model(fv)
                loss = criterion(output, label)
                loss_list.append(loss.detach().numpy())
                preds.append(1 if output[0] > threshold else 0)
                true_labels.append(label.detach().numpy())

        correct = 0
        TPs, FPs, FNs, TNs = 0, 0, 0, 0
        for j in range(len(true_labels)):
            if true_labels[j] == preds[j]:
                correct += 1
                if true_labels[j] == 1:
                    TPs += 1
                else:
                    TNs += 1
            else:
                if true_labels[j] == 1:
                    FNs += 1
                else:
                    FPs += 1

        accuracy = correct / len(true_labels)
        recall = 0 if TPs + FNs == 0 else TPs / (TPs + FNs)
        precision = 0 if TPs + FPs == 0 else TPs / (TPs + FPs)
        f1 = 0 if precision + recall == 0 else 2 * (precision * recall) / (precision + recall)
        
        print(f"Testing Stats for Epoch {i}")
        print(f"    Mean Loss: {np.mean(loss_list)}")
        print(f"    Accuracy: {accuracy}")
        print(f"    _____________________________")
        print(f"    |True\Pred|Positive|Negative|")
        print(f"    |---------|--------|--------|")
        print(f"    |Positive |TPs: {TPs}|FNs: {FNs}|")
        print(f"    |---------|--------|--------|")
        print(f"    |Negative |FPs: {FPs}|TNs: {TNs}|")
        print(f"    |---------|--------|--------|")
        print(f"    ")
        print(f"    Precision: {precision}")
        print(f"    Recall: {recall}")
        print(f"    F1 Score: {f1}")

        if accuracy > current_best[1]:
            current_best = (copy.deepcopy(model), accuracy, i)

        print(f"Time per epoch: {time.time() - time_per_epoch}")

    print(f"Got best performance at epoch {current_best[2]} with accuracy {current_best[1]}")
    
    return current_best[0]

In [158]:
def eval_classifier(model):
    feat_vec_df = pd.read_csv("eval_feature_vectors.csv")
    feat_vec_ds = get_feat_dataset(feat_vec_df)
    feat_vec_dl = DataLoader(feat_vec_ds, batch_size=1, shuffle=False)
    
    criterion = nn.BCEWithLogitsLoss()

    loss_list = []
    preds = []
    true_labels = []

    threshold = 0

    model.eval()

    with torch.no_grad():
        for fv, label in feat_vec_dl:
            output = model(fv)
            loss = criterion(output, label)
            loss_list.append(loss.detach().numpy())
            preds.append(1 if output[0] > threshold else 0)
            true_labels.append(label.detach().numpy())
    
    correct = 0
    TPs = 0
    FPs = 0
    TNs = 0
    FNs = 0

    for i in range(len(true_labels)):
        if true_labels[i] == preds[i]:
            correct += 1
            if true_labels[i] == 1:
                TPs += 1
            else:
                TNs += 1
        else:
            if true_labels[i] == 1:
                FNs += 1
            else:
                FPs += 1

    accuracy = correct / len(true_labels)
    recall = 0 if TPs + FNs == 0 else TPs / (TPs + FNs)
    precision = 0 if TPs + FPs == 0 else TPs / (TPs + FPs)
    f1 = 0 if precision + recall == 0 else 2 * (precision * recall) / (precision + recall)

    print(f"Testing Stats for Classifier")
    print(f"    Mean Loss: {np.mean(loss_list)}")
    print(f"    Accuracy: {accuracy}")
    print(f"    _____________________________")
    print(f"    |True\Pred|Positive|Negative|")
    print(f"    |---------|--------|--------|")
    print(f"    |Positive |TPs: {TPs}|FNs: {FNs}|")
    print(f"    |---------|--------|--------|")
    print(f"    |Negative |FPs: {FPs}|TNs: {TNs}|")
    print(f"    |---------|--------|--------|")
    print(f"    ")
    print(f"    Precision: {precision}")
    print(f"    Recall: {recall}")
    print(f"    F1 Score: {f1}")

    return (accuracy, recall, precision, f1)

In [159]:
def test_classifier(model, data_path):
    feat_vec_df = pd.read_csv(data_path)
    test_ds = get_feat_dataset(feat_vec_df, True)
    test_dl = DataLoader(test_ds, batch_size=1, shuffle=False)

    model.eval()
    threshold = 0

    preds = []
    
    with torch.no_grad():
        for X in test_dl:
            output = model(X)
            preds.append(1 if output[0] > threshold else 0)

    return preds

## Notebook "Engine"

In [169]:
# Inference Mode Switch
generate_new_feat_vecs = False
inference_mode = True
run_next = False

In [170]:
torch.manual_seed(42)
if run_next:
    if generate_new_feat_vecs:
        create_feature_vecs("Data/train.csv", "training")
        create_feature_vecs("Data/dev.csv", "eval")
        create_feature_vecs_test("Data/test.csv", "testing")
    if not inference_mode:
        train_time_start = time.time()
        classifier = train_classifier()
        print(f"Training time: {time.time() - train_time_start}")
        torch.save(classifier, "solution_1_classifier.pth")
        results = eval_classifier(classifier)
    else:
        classifier = torch.load("solution_1_classifier.pth", weights_only=False)
        eval_classifier(classifier)
        predictions = test_classifier(classifier, "testing_feature_vectors.csv")
        preds_df = pd.DataFrame(predictions, columns=["prediction"])
        preds_df.to_csv("Group_17_A.csv", index=False)

In [174]:
# Run for new data
new_path = "Data/AV_trial_test.csv"
create_feature_vecs_test(new_path, "new_data")
classifier = torch.load("solution_1_classifier.pth", weights_only=False)
predictions = test_classifier(classifier, "new_data_feature_vectors.csv")
preds_df = pd.DataFrame(predictions, columns=["prediction"])
preds_df

Processing pair 0/50 (as test data)

  feat_vec_df = feat_vec_df._append(pd.DataFrame([row], columns=col_names), ignore_index=True)


Unnamed: 0,prediction
0,1
1,1
2,1
3,0
4,0
5,1
6,0
7,0
8,0
9,0
