In [None]:
import math
import os
from types import FunctionType

import pandas as pd
import torch
import torchtext
from gensim.models import KeyedVectors

# Directory containing this file; every data folder below is resolved relative to it.
BASE_PATH: str = os.path.dirname(os.path.abspath(__file__))
CLASSIFIERS_PATH: str = BASE_PATH + "/classifiers/"
DATASET_PATH: str = BASE_PATH + "/datasets/"
EMBEDDINGS_PATH: str = BASE_PATH + "/embeddings/"
# Number of emotion labels; main() validates emotions.txt against this count.
NUM_EMOTIONS: int = 28
# Index of the "<PAD>" row in the loaded embedding matrix. This is only a placeholder: get_vectors()
# overwrites it, and BDRNN / collate read it afterwards as the padding index.
EMBED_SIZE: int = 0

# Fail at import time if any of the expected data folders is missing.
if not os.path.exists(CLASSIFIERS_PATH):
    raise FileNotFoundError("Could not find folder for classifier models.")
if not os.path.exists(DATASET_PATH):
    raise FileNotFoundError("Could not find folder with GoEmotion dataset.")
if not os.path.exists(EMBEDDINGS_PATH):
    raise FileNotFoundError("Could not find folder with word embeddings sets.")

# Use CUDA when available, otherwise warn and fall back to the CPU.
if not torch.cuda.is_available():
    print("Warning: Using CPU for Pytorch.")
device: torch.device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


# First, let's define our basic BDRNN architecture
class BDRNN(torch.nn.Module):
    """Bidirectional multi-layer RNN that maps token-id sequences to emotion logits.

    Token ids are looked up in an embedding table built from ``vectors``, fed through a stacked
    bidirectional ``torch.nn.RNN``, and one slice of the final hidden state is projected to
    ``output_size`` values by a linear layer.

    The module-level ``EMBED_SIZE`` is read at construction time as the embedding padding index, so
    ``get_vectors`` must have been called before a model is built.
    """

    def __init__(self, vocab_word_count: int, vectors: torch.Tensor, output_size: int, num_layers: int, dropout: float,
                 *args: object,
                 **kwargs: object) -> None:
        """Build the embedding, recurrent and output layers.

        Args:
            vocab_word_count: Passed to the RNN as ``input_size``. Despite the name it must equal the
                width of one embedding vector (``main`` passes ``vectors.shape[1]``).
            vectors: Pretrained embedding matrix handed to ``Embedding.from_pretrained``.
            output_size: Number of values produced per input sequence.
            num_layers: Requested number of stacked RNN layers; values below 2 are raised to 2.
            dropout: Dropout probability forwarded to ``torch.nn.RNN``.
            *args: Forwarded to ``torch.nn.Module.__init__``.
            **kwargs: Forwarded to ``torch.nn.Module.__init__``.
        """
        super().__init__(*args, **kwargs)

        # Clamp once and use the clamped value everywhere. Previously the raw argument was used for
        # the hidden size (ZeroDivisionError for 0) and for the RNN itself, so the clamp had no effect.
        self.num_layers = num_layers if num_layers > 1 else 2
        self.hidden_size = NUM_EMOTIONS // self.num_layers

        self.embeddings = torch.nn.Embedding.from_pretrained(vectors, padding_idx=EMBED_SIZE)

        self.rnn_layers = torch.nn.RNN(input_size=vocab_word_count, hidden_size=self.hidden_size,
                                       num_layers=self.num_layers, bidirectional=True, dropout=dropout,
                                       batch_first=True)

        self.output_layer = torch.nn.Linear(self.hidden_size, output_size)

    def forward(self, input_data) -> torch.Tensor:
        """Return the output-layer projection of the last final-hidden-state slice for ``input_data``."""
        embedded: torch.Tensor = self.embeddings(input_data)

        # The per-step outputs are not needed, only the final hidden states.
        _, hidden = self.rnn_layers(embedded)

        # NOTE(review): for a bidirectional RNN the last slice of the final hidden state should be the
        # top layer's reverse direction only, so the forward direction's final state is unused here.
        # Confirm this is intended before changing it, since doing so alters the Linear input width.
        return self.output_layer(hidden[-1, :])


class pandas_dataset(torch.utils.data.Dataset):
    """Map-style dataset exposing the ``text`` and ``emotion_ids`` columns of a DataFrame."""

    def __init__(self, df: pd.DataFrame) -> None:
        """Wrap ``df`` without copying it; it must contain ``text`` and ``emotion_ids`` columns."""
        self.df = df

    def __len__(self) -> int:
        """Return the number of rows in the wrapped frame."""
        return len(self.df)

    def __getitem__(self, index: int) -> tuple[object, object]:
        """Return ``(text, emotion_ids)`` for the row at positional ``index``.

        The element types are whatever the frame stores: ``main`` replaces the raw text with a list
        of token ids before wrapping the frame, so the first element is not necessarily a ``str``.
        """
        text = self.df["text"].iloc[index]
        emotion_ids = self.df["emotion_ids"].iloc[index]
        return text, emotion_ids


def get_vectors(embedding: str) -> tuple[dict[str, int], torch.Tensor]:
    skip_first_line: bool = False
    global EMBED_SIZE  # Sorry
    match embedding:
        case "glove":
            embedding_path: str = EMBEDDINGS_PATH + "glove.840B.300d.txt"
            EMBED_SIZE = 2196018
            embedding_components: int = 300
        case "word2vec":
            embedding_path: str = EMBEDDINGS_PATH + "GoogleNews-vectors-negative300.bin"
            gn_model = KeyedVectors.load_word2vec_format(embedding_path, binary=True)
            # Ignoring this for now, too lazy to deserialize
        case "numberbatch":
            embedding_path: str = EMBEDDINGS_PATH + "numberbatch-19.08-en.txt"
            EMBED_SIZE = 516782
            embedding_components: int = 300
            skip_first_line = True
        case default:
            raise RuntimeError("Invalid embedding chosen.")

    if not os.path.exists(embedding_path):
        raise FileNotFoundError("Could not find embedding file: {}".format(embedding_path))
    with (open(embedding_path, encoding="utf_8") as embeddings_file):
        word_labels: dict[str, int] = {}
        tensor: torch.Tensor = torch.empty((EMBED_SIZE + 1, embedding_components), dtype=torch.float32, device=device)
        if skip_first_line:
            _ = embeddings_file.readline()
        for index, embedding in enumerate(embeddings_file):
            embedding_split: list[str] = embedding.rstrip().split(" ")
            word_labels[embedding_split[0]] = index
            tensor[index] = torch.tensor([float(val) for val in embedding_split[1:]], dtype=torch.float32,
                                         device=device)
            if (index + 1) % 100000 == 0:
                print("Processed {}/{}".format(index + 1, EMBED_SIZE))
        tensor[-1] = torch.zeros(embedding_components, dtype=torch.float32, device=device)
        word_labels["<PAD>"] = EMBED_SIZE
        tensor.to(device)  # Unneeded?
        return word_labels, tensor


def tokenize(text: str, labels: dict, tokenizer: FunctionType, unknown_token: str = "something") -> list[int]:
    """Convert ``text`` into a list of vocabulary ids.

    Args:
        text: Raw input string.
        labels: Mapping from word to id.
        tokenizer: Callable that splits ``text`` into words.
        unknown_token: Word whose id stands in for words missing from ``labels``. Defaults to
            ``"something"``, the fallback this function has always used.

    Returns:
        One id per token produced by ``tokenizer``, in order.

    Raises:
        KeyError: If an out-of-vocabulary word is met and ``unknown_token`` is not in ``labels``.
    """
    # The fallback is only looked up when needed, so a missing fallback entry does not matter for
    # texts made up entirely of known words.
    return [labels[word if word in labels else unknown_token] for word in tokenizer(text)]


def resolve_emotions(id: str) -> str:
    return [emotions[int(emotion)] for emotion in id.split(",")]


def train(model: torch.nn.Module, batches, num_epochs: int, learning_rate: float = 1e-5) -> list[float]:
    """Train ``model`` with Adam, taking one optimizer step per (sentence, target) pair.

    Args:
        model: Module mapping one sentence tensor to a vector of logits.
        batches: Re-iterable of batches, each an iterable of ``(sentence, emotions)`` pairs where
            ``emotions`` is a multi-hot float tensor shaped like the model output.
        num_epochs: Number of passes over ``batches``.
        learning_rate: Adam learning rate.

    Returns:
        The mean loss of each epoch, in order (``nan`` for an epoch that saw no samples).
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    # Targets are multi-hot and predictions are thresholded with a sigmoid below, so each label is
    # an independent binary decision. BCE-with-logits matches that; softmax cross-entropy does not.
    criterion = torch.nn.BCEWithLogitsLoss()
    epoch_losses: list[float] = []

    model.train()
    for epoch_num in range(1, num_epochs + 1):
        # Reset per epoch so the printed loss describes this epoch only.
        losses: list[float] = []
        correct: int = 0
        total: int = 0
        for batch in batches:
            for sentence, emotions in batch:
                optimizer.zero_grad()

                predictions = model(sentence)
                loss = criterion(predictions, emotions)
                loss.backward()
                optimizer.step()
                losses.append(loss.item())

                # Rounding is naive, we should base this off a confidence threshold
                with torch.no_grad():
                    guesses = torch.round(torch.sigmoid(predictions))
                # A sample only counts as correct when every label matches.
                if torch.equal(guesses, emotions):
                    correct += 1
                total += 1

        mean_loss = sum(losses) / len(losses) if losses else float("nan")
        accuracy = (correct / total) * 100 if total else 0.0
        print("Epoch: {} | Loss: {} | Accuracy: {}%".format(epoch_num, mean_loss, accuracy))
        epoch_losses.append(mean_loss)
    return epoch_losses


def collate(batch: list[tuple[list[int], list[str]]]) -> list[tuple[torch.IntTensor, torch.Tensor]]:
    final_batch = []
    max_tokens = len(max(batch, key=lambda tuple: len(tuple[0]))[0])
    for sentence, emotions in batch:
        sentence.extend([EMBED_SIZE] * (max_tokens - len(sentence)))
        sentence = torch.IntTensor([int(value) for value in sentence]).to(device)
        # There's definitely a way to do a list comprehension here but I'm too stupid to figure it out
        _emotions = torch.zeros(NUM_EMOTIONS, dtype=torch.float32, device=device)
        emotions = emotions.split(",")
        for emotion in emotions:
            _emotions[int(emotion)] = 1.0
        final_batch.append((sentence, _emotions))
    return final_batch  # Can we modify in-place instead?

def main():
    """Load the dataset, tokenize it against the Numberbatch vocabulary and train a BDRNN.

    Raises:
        RuntimeError: If ``emotions.txt`` does not contain ``NUM_EMOTIONS`` entries or its fifth
            entry is not ``"approval"``.
    """
    # Now we need to handle our dataset
    with open(DATASET_PATH + "emotions.txt", encoding="utf-8") as emotions_file:
        emotions = [emotion.strip() for emotion in emotions_file]
    if len(emotions) != NUM_EMOTIONS or emotions[4] != "approval":
        raise RuntimeError("Failed to load emotion mappings.")

    # Both splits are read with the same explicit column names. Without `names`, pandas uses the
    # first data row as the header, and the later `testing_set["text"]` lookups raise KeyError.
    columns = ["text", "emotion_ids"]
    training_set = pd.read_csv(DATASET_PATH + "train.tsv", delimiter="\t", names=columns, usecols=[0, 1])
    testing_set = pd.read_csv(DATASET_PATH + "test.tsv", delimiter="\t", names=columns, usecols=[0, 1])
    print(training_set.head())
    print(testing_set.head())

    # Time to do some training!
    labels, vectors = get_vectors("numberbatch")
    tokenizer = torchtext.data.utils.get_tokenizer("basic_english")
    training_set["text"] = training_set["text"].apply(tokenize, labels=labels, tokenizer=tokenizer)
    testing_set["text"] = testing_set["text"].apply(tokenize, labels=labels, tokenizer=tokenizer)
    print(training_set.head())
    print(testing_set.head())
    # The first argument is the RNN input width, i.e. the size of one embedding vector.
    numberbatch_model = BDRNN(vectors.shape[1], vectors, NUM_EMOTIONS, 4, 0.5).to(device)
    train_dataset = pandas_dataset(training_set)
    test_dataset = pandas_dataset(testing_set)
    train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True, collate_fn=collate)
    test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=64, shuffle=True, collate_fn=collate)
    print('Created `training dataloader` with %d batches!' % len(train_dataloader))
    print('Created `testing dataloader` with %d batches!' % len(test_dataloader))
    train(numberbatch_model, train_dataloader, 10)

if __name__ == '__main__':
    main()

In [None]:
# Download link for word2vec: https://drive.google.com/file/d/0B7XkCwpI5KDYNlNUTTlSS21pQmM/edit?usp=sharing
# Download link for Glove: https://huggingface.co/stanfordnlp/glove/resolve/main/glove.840B.300d.zip

import matplotlib.pyplot as plt
import numpy as np
from numpy import ndarray
from sklearn.manifold import TSNE
from torch import Tensor, cat
from torch.cuda import is_available as cuda_is_available
from train import get_vectors


# EDITABLE VARIABLES
# Names of the embedding sets to load; each name is passed to get_vectors() by get_embeddings().
embeddings = ["numberbatch", "glove"] # Embeddings definition. Add word2vec once deserialization is done.

# Word comparison groups. Format: [base_word, similar_word_1, similar_word_2]
# Every word is looked up in each embedding's vocabulary by get_comparison_embeddings().
word_comparison_groups = [
    ["tire", "tired", "tyre"],
]

# Add a new distance function here if you want.
def calculate_distances(base_word: ndarray[float], similar_word_1: ndarray[float], similar_word_2: ndarray[float]) -> dict[str, list[float]]:
    """Measure both similar words against the base word with every registered metric.

    Returns a dict keyed by metric name; each value is ``[metric(similar_word_1, base_word),
    metric(similar_word_2, base_word)]``. Note that the ``"cosine"`` entry holds a similarity
    (from ``cosine_similarity``), not a distance.
    """
    metrics = (
        ("euclidean", euclidean_distance),
        ("cosine", cosine_similarity),
        ("manhattan", manhattan_distance),
    )
    measurements = {}
    for metric_name, metric in metrics:
        measurements[metric_name] = [metric(similar_word_1, base_word), metric(similar_word_2, base_word)]
    return measurements


# We should look at comparing vectors in different embeddings and see how well ambiguous words center around common
# synonyms for each meaning. We could probably do some sort of visualization for this as well.

def euclidean_distance(vector1: ndarray, vector2: ndarray) -> float:
    """Return the Euclidean (L2) distance between two NumPy vectors as a Python float."""
    return float(np.linalg.norm(vector1 - vector2))

def cosine_similarity(vector1: ndarray, vector2: ndarray) -> float:
    """Return the cosine similarity of two NumPy vectors as a Python float.

    If either vector has zero length the similarity is undefined; ``0.0`` is returned in that case
    instead of dividing by zero and producing ``nan``.
    """
    norm_product = np.linalg.norm(vector1) * np.linalg.norm(vector2)
    if norm_product == 0:
        return 0.0
    return float(np.dot(vector1, vector2) / norm_product)

def manhattan_distance(v1: ndarray, v2: ndarray) -> float:
    """Return the Manhattan (L1) distance between two NumPy vectors as a Python float."""
    return float(np.sum(np.abs(v1 - v2)))

def convert_tensors_to_numpy(embeddings_list: list[Tensor]) -> ndarray[float]:
    """Stack a list of tensors into one NumPy array, one row per tensor.

    Each tensor is detached and moved to host memory first. ``.cpu()`` is a no-op for tensors that
    already live on the CPU, so no CUDA-availability branch is needed, and ``.detach()`` lets
    tensors that require grad be converted as well.
    """
    return np.array([vector.detach().cpu().numpy() for vector in embeddings_list])

def reduce_embeddings(numpy_vectors: ndarray[float]) -> ndarray[float]:
    """Project the given vectors down to three dimensions with t-SNE and return the result.

    The settings are fixed: PCA initialisation, ``random_state=0``, perplexity 2 and 6000 iterations.
    """
    # NOTE(review): newer scikit-learn releases rename `n_iter` to `max_iter`; confirm against the
    # installed version before upgrading.
    settings = dict(n_components=3, random_state=0, perplexity=2, init='pca', n_iter=6000)
    reducer = TSNE(**settings)
    return reducer.fit_transform(numpy_vectors)

def get_word_vectors_to_compare(numpy_vectors: ndarray[float]) -> tuple[ndarray[float], ndarray[float], ndarray[float]]:
    """Return the first three rows of ``numpy_vectors`` as a ``(base, similar_1, similar_2)`` tuple."""
    return tuple(numpy_vectors[position] for position in range(3))

def show_plot_comparison(reduced_vectors: ndarray[float], word_list: list[str], norm_distances: dict[str, list[float]]):
    """Plot the three reduced word vectors and print their distances, once per distance type.

    Args:
        reduced_vectors: At least three rows of 3-D coordinates, ordered like ``word_list``.
        word_list: ``[base_word, similar_word_1, similar_word_2]``; used for the legend and messages.
        norm_distances: Distance type -> ``[distance to similar_word_1, distance to similar_word_2]``.
    """
    colours = ('b', 'g', 'r')
    for distance_type, calculated_distances in norm_distances.items():
        fig = plt.figure(figsize=(10, 10))
        ax = fig.add_subplot(projection='3d')
        for position in range(3):
            point = reduced_vectors[position]
            ax.scatter(point[0], point[1], point[2], c=colours[position], label=word_list[position])
        ax.legend()
        plt.show()
        # Report the words actually being compared instead of the hard-coded 'tire' group.
        print("{} distance between '{}' and '{}' is {}\nDistance between '{}' and '{}' is {}".format(
            distance_type, word_list[0], word_list[1], calculated_distances[0],
            word_list[0], word_list[2], calculated_distances[1]))

def get_distances(embeddings_list: list[Tensor]) -> dict[str, list[float]]:
    """Convert one word group's tensors to NumPy and compute every distance against the base word.

    The first tensor is treated as the base word and the next two as the words compared to it.
    """
    word_vectors = get_word_vectors_to_compare(convert_tensors_to_numpy(embeddings_list))
    return calculate_distances(*word_vectors)

def get_reduced_vectors(embeddings_list: list[Tensor]) -> ndarray[float, float]:
    """Convert the tensors to a NumPy array and return its reduced (t-SNE) projection."""
    return reduce_embeddings(convert_tensors_to_numpy(embeddings_list))

def visualize_embedding(embeddings_list: list[Tensor], norm_distances: dict[str, list[float]], word_list: list[str]) -> None:
    """Reduce one word group's vectors and show the comparison plots for it.

    Args:
        embeddings_list: Tensors for ``[base_word, similar_word_1, similar_word_2]``.
        norm_distances: Distance type -> the two distances already computed for this group.
        word_list: The words the tensors belong to, in the same order.
    """
    reduced_vectors: ndarray[float, float] = get_reduced_vectors(embeddings_list)
    show_plot_comparison(reduced_vectors, word_list, norm_distances)

def get_embeddings(embeddings: list[str]) -> dict[str, tuple[dict[str, int], Tensor]]:
    """Load every named embedding set.

    Args:
        embeddings: Embedding names understood by ``get_vectors``.

    Returns:
        Embedding name -> ``(vocab, vectors)`` as returned by ``get_vectors``.

    Raises:
        ValueError: If ``embeddings`` is empty.
    """
    if not embeddings:
        raise ValueError("No embeddings were selected to load.")

    result = {}
    for embedding in embeddings:
        vocab, vectors = get_vectors(embedding)
        result[embedding] = (vocab, vectors)
    return result

# Because comparison groups are added in sequence, a group's position in word_comparison_groups is also its index in
# the result (result[embedding_name][0] is the first word group).
def get_comparison_embeddings(embeddings: dict[str, (dict[str, int], Tensor)]) -> dict[str, list[list[Tensor]]]:
    # Dictionary of embedding name to list of comparison groups
    result: dict[str, list[list[Tensor]]] = {}
    for embed_name, (vocab, vectors) in embeddings.items():
        comparisons = []
        if embed_name not in result:
            result[embed_name] = []
        for idx, group in enumerate(word_comparison_groups):
            comparisons = []
            for word in group:
                comparisons.append(vectors[vocab[word]])
            result[embed_name].append(comparisons)
    return result

"""Returns a dictionary of embedding names to a list of dictionaries of distance types to their calculated distances"""
def compare_embeddings(comparison_embeddings: dict[str, list[list[Tensor]]]) -> dict[str, list[dict[str, list[float]]]]:
    """Compute the distance table of every word group in every embedding.

    Returns a dict from embedding name to a list with one ``get_distances`` result per word group,
    in group order. An embedding that has no word groups does not appear in the result.
    """
    distances_by_embedding = {}
    for embedding_name, groups in comparison_embeddings.items():
        for vectors_of_group in groups:
            # The entry is created lazily, so embeddings without groups are left out.
            distances_by_embedding.setdefault(embedding_name, []).append(get_distances(vectors_of_group))
    return distances_by_embedding

def compare_distances(embedding_distances: dict[str, list[dict[str, list[float]]]], *,
                      show: bool = True) -> dict[str, dict[str, list[float]]]:
    """Compute larger/smaller distance ratios per embedding and distance type, and box-plot them.

    Args:
        embedding_distances: Embedding name -> one dict per word group, mapping a distance type to
            the two values measured for that group.
        show: When true (the default) one box-plot figure per distance type is displayed. Pass
            ``False`` to compute the ratios only.

    Returns:
        Embedding name -> distance type -> list of ratios, one per word group. Example::

            {
                "glove": {"euclidean": [1.0, 2.55], "cosine": [1.0, 4.44]},
                "numberbatch": {"euclidean": [1.38, 4.4], "cosine": [2.55, 1.0]},
            }
    """
    def calculate_distance_ratios() -> dict[str, dict[str, list[float]]]:
        """Return, per embedding and distance type, max(value pair) / min(value pair) for each group."""
        ratios: dict[str, dict[str, list[float]]] = {}
        for embedding_name, distances in embedding_distances.items():
            per_type = ratios.setdefault(embedding_name, {})
            for distance in distances:
                for distance_type, values in distance.items():
                    larger = max(values[0], values[1])
                    smaller = min(values[0], values[1])
                    if smaller == 0:
                        # Avoid dividing by zero: two zero values are equally distant (ratio 1),
                        # otherwise the ratio is unbounded.
                        ratio = 1.0 if larger == 0 else float("inf")
                    else:
                        ratio = larger / smaller
                    per_type.setdefault(distance_type, []).append(ratio)
        return ratios

    def show_boxplots(ratios: dict[str, dict[str, list[float]]]) -> None:
        """Show one figure per distance type, with one box per embedding."""
        # Regroup by distance type so each figure compares the embeddings side by side. A single pass
        # is enough; repeating it only re-printed and re-assigned the same values.
        to_plot: dict[str, dict[str, list[float]]] = {}
        for embedding_name, per_type in ratios.items():
            for distance_type, values in per_type.items():
                print(embedding_name, distance_type, values)
                to_plot.setdefault(distance_type, {})[embedding_name] = values

        for distance_type, embedding_data in to_plot.items():
            fig, axs = plt.subplots(figsize=(10, 8))
            axs.boxplot(list(embedding_data.values()))
            axs.set_xticklabels(list(embedding_data.keys()))
            axs.set_title(distance_type)
            plt.tight_layout()
            plt.show()

    # Dict of embedding name to distance type and its calculated distance ratio
    ratios = calculate_distance_ratios()
    if show:
        show_boxplots(ratios)
    return ratios

# Pipeline, executed when this cell/module runs:
# 1. load every embedding set named in `embeddings`,
# 2. pick out the vectors of the words in `word_comparison_groups`,
# 3. compute the distances inside each word group,
# 4. turn them into ratios and box-plot them.
orig_embeddings = get_embeddings(embeddings)
comp_embeddings = get_comparison_embeddings(orig_embeddings)
comp_distances = compare_embeddings(comp_embeddings)
compare_distances(comp_distances)
