In [3]:
import os
import torch
import re
import pandas as pd
import numpy as np
import typing
from torch import nn;
from torch.utils.data import DataLoader, TensorDataset
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator, Vocab

# Root folder for the input texts and for every cached artefact
# (tokens, vocabulary, datasets, trained models).
DATA_DIR = './data/'

# Minimum number of occurrences a word needs to be kept in the vocabulary.
MIN_WORD_FREQUENCY = 100

# When True, cached datasets and trained models are rebuilt instead of reused.
FORCE_RETRAIN = False

# Prefer the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f'Using device: {device}')

Using device: cuda


# Tokenize datasets

In [4]:
# Shared torchtext 'basic_english' tokenizer, used both for creating the token
# lists and for building the vocabulary.
TOKENIZER = get_tokenizer('basic_english')

def read_lines(
    dataset: str
) -> list[str]:
    """
    Reads all the lines from all the texts in the given `dataset`.

    Datasets are `train`, `val` and `test`; the files are read from
    `<DATA_DIR>/input/<dataset>/`.

    Returns the lines of every file (line endings included), concatenated in
    sorted file-name order.
    """

    inDirectoryName = os.path.join(DATA_DIR, 'input', dataset)

    # `os.listdir` returns entries in arbitrary order. Sort them so the token
    # stream (which is cached later on) is reproducible between runs and machines.
    inFileNames = [os.path.join(inDirectoryName, f) for f in sorted(os.listdir(inDirectoryName))]

    # Ignore sub-directories and other non-file entries; `open` cannot read them.
    inFileNames = [f for f in inFileNames if os.path.isfile(f)]

    # Read all the lines from all the files
    lines = []
    for inFileName in inFileNames:
        with open(inFileName, 'r') as file:
            lines.extend(file.readlines())

    print(f"Read {len(lines)} lines from {dataset}")
    return lines

def create_tokens(
    dataset: str
) -> list[str]:
    """
    Tokenizes every line of the given `dataset` into one flat list of tokens.

    Datasets are `train`, `val` and `test`.

    The result is cached in `<DATA_DIR>/words.<dataset>.pt`. When that file
    already exists it is loaded instead of tokenizing the texts again.
    """

    cachePath = os.path.join(DATA_DIR, f'words.{dataset}.pt')

    if not os.path.isfile(cachePath):
        # No cache yet: tokenize line by line and flatten into a single list.
        tokens = [token for line in read_lines(dataset) for token in TOKENIZER(line)]

        # Persist the tokens so the next run can skip the tokenization.
        torch.save(tokens, cachePath)

        return tokens

    print(f"Loaded tokenized words for {dataset} ({cachePath})")
    return torch.load(cachePath)

def create_vocabulary(
    dataset: str
) -> Vocab:
    """
    Creates a vocabulary for the given `dataset`.

    Datasets are `train`, `val` and `test`.

    Lines are sanitized before tokenization: every word containing a digit or
    an uppercase letter is dropped. Words occurring fewer than
    `MIN_WORD_FREQUENCY` times are left out; unknown words map to `<unk>`.

    The vocabulary is cached in `<DATA_DIR>/vocabulary.pt`. Note that the cache
    file name does not depend on `dataset`: once it exists, it is returned for
    any `dataset` argument.
    """

    outFileName = os.path.join(DATA_DIR, 'vocabulary.pt')

    # If the file exists, don't create it again.
    if os.path.isfile(outFileName):
        print(f"Loaded vocabulary for {dataset} ({outFileName})")
        return torch.load(outFileName)

    # Compile the patterns once instead of once per line.
    words_with_digits = re.compile(r'\w*[0-9]+\w*')
    words_with_uppercase = re.compile(r'\w*[A-Z]+\w*')
    whitespace_runs = re.compile(r'\s+')

    def read_sanitize_tokenize():
        for line in read_lines(dataset):
            line = words_with_digits.sub(' ', line)     # Remove numbers
            line = words_with_uppercase.sub(' ', line)  # Remove uppercase names
            line = whitespace_runs.sub(' ', line)       # Remove double spaces

            yield TOKENIZER(line)

    vocabulary = build_vocab_from_iterator(read_sanitize_tokenize(), min_freq=MIN_WORD_FREQUENCY, specials=['<unk>'])

    vocabulary.set_default_index(vocabulary['<unk>'])

    # We removed all uppercase names, this includes 'I'.
    # `append_token` raises when the token already exists, so only add 'i' when
    # it did not make it into the vocabulary on its own.
    if 'i' not in vocabulary:
        vocabulary.append_token('i')

    # Save vocabulary so we dont have to do this again
    torch.save(vocabulary, outFileName)

    return vocabulary
    


In [5]:
# Token lists for every split (read from the cache when it exists).
words_train = create_tokens('train')
words_val = create_tokens('val')
words_test = create_tokens('test')

# The vocabulary is requested for the training split; its size fixes the
# input/output dimensions of the models defined further down.
vocabulary = create_vocabulary('train')
VOCABULARY_SIZE = len(vocabulary)

Loaded tokenized words for train (./data/words.train.pt)
Loaded tokenized words for val (./data/words.val.pt)
Loaded tokenized words for test (./data/words.test.pt)
Loaded vocabulary for train (./data/vocabulary.pt)


In [6]:

# Corpus statistics: token counts per split, distinct training tokens, and the
# size of the (frequency-filtered) vocabulary.
print("Words in 'train' dataset ........:", len(words_train))
print("Words in 'val' dataset ..........:", len(words_val))
print("Words in 'test' dataset .........:", len(words_test))
print("Distinct words in 'train' dataset:", len(set(words_train)))
print("Words in vocabulary .............:", VOCABULARY_SIZE)

Words in 'train' dataset ........: 2684706
Words in 'val' dataset ..........: 49526
Words in 'test' dataset .........: 124152
Distinct words in 'train' dataset: 52105
Words in vocabulary .............: 1880


# Utilities
This section contains some utilities which come in handy for all the following assignments.

In [32]:
def model_nameof(
    model: nn.Module,
    criterion: object,
    optimizer: torch.optim.Optimizer
) -> str:
    """
    Creates a descriptive, file-name friendly name for the model.

    The name is built from the class names of `model`, `criterion` and
    `optimizer`, followed by the learning rate and - when non-zero - the
    momentum and weight decay of the optimizer's first parameter group.

    Values that are exactly representable with three decimals keep the
    historic `0.020` style. Anything smaller or more precise (e.g. `1e-4`) is
    written out in full so that different settings never share a name.
    """

    def format_option(value) -> str:
        fixed = f'{value:.3f}'
        # `.3f` would turn 0.0001 into "0.000" and make distinct models collide
        # on the same cache file; fall back to the exact representation then.
        return fixed if float(fixed) == value else repr(float(value))

    name = f'{model.__class__.__name__}_{criterion.__class__.__name__}_{optimizer.__class__.__name__}'
    options = optimizer.param_groups[0]

    if 'lr' in options:
        name += f'-lr{format_option(options["lr"])}'

    if 'momentum' in options and options['momentum'] != 0.0:
        name += f'-m{format_option(options["momentum"])}'

    if 'weight_decay' in options and options['weight_decay'] != 0.0:
        name += f'-wd{format_option(options["weight_decay"])}'

    return name

def model_save(model: nn.Module, folder: str | None = None):
    """
    Save the state dict of the given model to a file.

    model: The model to save; its `name` attribute is used as the file name.
    folder: Optional sub-folder of `DATA_DIR` to save into. It is created
        when it does not exist yet.
    """

    folder = '' if folder is None else folder + '/'
    filename = DATA_DIR + f'{folder}{model.name}.pt'

    # `torch.save` does not create missing parent directories.
    directory = os.path.dirname(filename)
    if directory:
        os.makedirs(directory, exist_ok=True)

    torch.save(model.state_dict(), filename)
    print(f'Saved {model.name} ({filename})')

def model_load(model: nn.Module, folder: str | None = None) -> bool:
    """
    Load the state dict of the given model from a file.

    model: The model to load into; its `name` attribute is used as the file name.
    folder: Optional sub-folder of `DATA_DIR` to load from.

    Returns `True` if the model was loaded, `False` if no saved file exists.
    """

    folder = '' if folder is None else folder + '/'
    filename = DATA_DIR + f'{folder}{model.name}.pt'

    if not os.path.exists(filename):
        return False

    # Remap the stored tensors onto the current device, so weights saved on a
    # GPU machine can still be loaded when only a CPU is available.
    model.load_state_dict(torch.load(filename, map_location=device))
    print(f'Loaded {model.name} ({filename})')
    return True

def dataset_create(
    words: list[str],
    context_size: int,
    vocabulary_index_to_target: dict[int, int] | None = None,
    dataset_name: str | None = None
) -> TensorDataset:
    """
    Creates a dataset of (context, target) pairs from the given words.

    Every window of `context_size` consecutive words is a context; the word
    that directly follows it is the target.

    words: The tokens to build the dataset from.
    context_size: Number of words in every context (at least 1).
    vocabulary_index_to_target: Optional remapping of target vocabulary
        indices. Indices missing from the mapping are kept as they are.
    dataset_name: When given, the dataset is cached in
        `<DATA_DIR>/dataset/<dataset_name>.pt` and reused on later calls
        (unless `FORCE_RETRAIN` is set). Without a name nothing is cached.

    Raises `ValueError` when `words` is too short to form a single sample.
    """

    if vocabulary_index_to_target is None:
        vocabulary_index_to_target = {}

    filename = None if dataset_name is None else DATA_DIR + f'dataset/{dataset_name}.pt'
    if filename is not None and os.path.exists(filename) and not FORCE_RETRAIN:
        # NOTE(review): newer PyTorch releases default `torch.load` to
        # `weights_only=True`, which may refuse a pickled TensorDataset -
        # confirm against the installed version.
        return torch.load(filename, map_location=device)

    word_idx = [vocabulary[word] for word in words]

    sample_count = len(word_idx) - context_size
    if context_size < 1 or sample_count < 1:
        raise ValueError(
            f'Need a context_size >= 1 and more than context_size words '
            f'(got {len(word_idx)} words, context_size {context_size})'
        )

    # All sliding windows at once instead of one tiny tensor per sample.
    # `unfold` yields len - context_size + 1 windows; the last one has no
    # following word to predict and is dropped.
    indices = torch.tensor(word_idx)
    contexts = indices.unfold(0, context_size, 1)[:sample_count].contiguous().to(device)

    # The target of window i is the word at position i + context_size.
    targets = torch.tensor([
        vocabulary_index_to_target.get(index, index)
        for index in word_idx[context_size:]
    ]).to(device)

    dataset = TensorDataset(contexts, targets)

    if filename is not None:
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        torch.save(dataset, filename)

    return dataset


def model_train(
    model: nn.Module,
    dataset: TensorDataset,
    criterion: typing.Callable[[torch.Tensor, torch.Tensor], torch.Tensor],
    optimizer: torch.optim.Optimizer,
    batch_size: int,
    epochs: int,
    tranform_targets: typing.Callable[[torch.Tensor], torch.Tensor] = lambda x: x,
    tranform_contexts: typing.Callable[[torch.Tensor], torch.Tensor] = lambda x: x,
    model_category: str | None = None
):
    """
    Trains the given `model` with the given `dataset`.

    The model gets a `name` attribute (see `model_nameof`). When a model with
    that name was saved before and the global `FORCE_RETRAIN` is `False`, the
    saved weights are loaded and no training takes place. Otherwise the model
    is trained from its current weights and saved afterwards.

    model: The model to train.
    dataset: The dataset to train the model with.
    criterion: The loss function to use.
    optimizer: The optimizer to use.
    batch_size: The batch size to use.
    epochs: The number of epochs to train.
    tranform_targets: A function to transform the targets before they are passed to `criterion` along with the `model` output.
    tranform_contexts: A function to transform the contexts before they are passed to `model`.
    model_category: The category of the model. If `None`, the model's class name will be used.
    """
    # Loss modules are moved to the device; plain callables (e.g. a lambda
    # wrapping a loss) have no `.to` and are used as they are.
    if isinstance(criterion, nn.Module):
        criterion.to(device)
    model.to(device)
    model.train()

    # Use the model's class name as the category if none is given
    if model_category is None:
        model_category = model.__class__.__name__

    # Name the model for easier referencing
    model.name = model_nameof(model, criterion, optimizer)

    # Only look at the cache when we are not forcing a retrain, so a forced
    # retrain never starts from previously saved weights.
    if not FORCE_RETRAIN and model_load(model, model_category):
        return

    # Prepare a data loader for the given dataset.
    # Ensure the data is shuffled.
    data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    print(f'Training {model.name}...')

    for epoch in range(epochs):
        total_loss = torch.zeros(1, device=device)
        total_size = 0

        for contexts, targets in data_loader:

            # Perform transformations
            contexts = tranform_contexts(contexts).to(device)
            targets = tranform_targets(targets).to(device)

            # Perform a training step
            optimizer.zero_grad()
            outputs = model(contexts).to(device)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            # Detach before accumulating, otherwise the running total keeps
            # the autograd graph of every batch alive for the whole epoch.
            total_loss += loss.detach()
            total_size += len(targets)

        # NOTE(review): this divides the sum of per-batch losses by the number
        # of samples; if `criterion` already averages per batch the printed
        # value is scaled down by roughly the batch size - confirm the intent.
        epoch_loss = total_loss.item() / total_size if total_size > 0 else float('nan')
        print(f'Training | {model.name} | Epoch {epoch} | Loss {epoch_loss}')

    # Save the model so we can skip training every time.
    model_save(model, model_category)


def model_accuracy(
    model: nn.Module,
    dataset: TensorDataset,
    dataset_name: str = 'Validation',
    transform_contexts: typing.Callable[[torch.Tensor], torch.Tensor] = lambda x: x,
    transform_outputs: typing.Callable[[torch.Tensor], torch.Tensor] = lambda x: x,
    batch_size: int = 512
):
    """
    Evaluate the given model on the given dataset.

    model: The model to evaluate (switched to evaluation mode).
    dataset: The (context, target) dataset to evaluate on.
    dataset_name: Label used in the printed report.
    transform_contexts: A function applied to the contexts before they are passed to `model`.
    transform_outputs: A function turning the model output into predictions that can be compared with the targets.
    batch_size: Number of samples evaluated per forward pass.

    Returns the accuracy of the model (`0.0` for an empty dataset).
    """

    model.to(device)
    model.eval()

    # Evaluate in batches; one sample per forward pass is needlessly slow.
    data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

    correct = 0
    total = 0

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for contexts, targets in data_loader:
            contexts = transform_contexts(contexts).to(device)
            # Make sure predictions and targets live on the same device.
            targets = targets.to(device)

            outputs = transform_outputs(model(contexts))

            total += targets.size(0)
            correct += (outputs == targets).sum().item()

    accuracy = correct / total if total > 0 else 0.0

    # Models that were never passed through `model_train` have no `name` yet.
    model_name = getattr(model, 'name', model.__class__.__name__)
    print(f'{dataset_name} | {model_name} | Accuracy {accuracy:.4f}')

    return accuracy

def model_pick_best(
    models: list[nn.Module],
    dataset: TensorDataset,
    performance_measure: typing.Callable[[nn.Module, TensorDataset], float],
):
    """
    Pick the best model from the given list of `models` on a given `dataset`.

    models: The candidate models.
    dataset: The dataset handed to `performance_measure`.
    performance_measure: Returns a score for a model on a dataset; higher is better.

    Returns a `(best_model, best_score)` tuple. On ties the earliest model
    wins. For an empty list `(None, 0.0)` is returned.
    """

    best_model = None
    best_accuracy = 0.0

    for model in models:
        accuracy = performance_measure(model, dataset)

        # The first model is always accepted, so a score of 0.0 (or below)
        # still yields a model instead of `None`.
        if best_model is None or accuracy > best_accuracy:
            best_accuracy = accuracy
            best_model = model

    return best_model, best_accuracy

# Word embeddings
This section contains the training and selecting of the best performing embeddings using `CBOW`.

In [12]:
# Size of every word embedding vector.
EMBEDDINGS_DIM = 32
# Number of consecutive words used as context for predicting the next word.
EMBEDDINGS_CONTEXT_SIZE = 5
# Batch size and number of epochs used when training the CBOW models.
EMBEDDINGS_BATCH_SIZE = 128
EMBEDDINGS_EPOCHS = 100

class CBOW(nn.Module):
    """
    Embedding model: looks up an embedding for every context word, concatenates
    them and maps the result to log-probabilities over the whole vocabulary.
    """

    def __init__(self):
        super().__init__()
        concatenated_dim = EMBEDDINGS_DIM * EMBEDDINGS_CONTEXT_SIZE
        self.embeddings = nn.Embedding(VOCABULARY_SIZE, EMBEDDINGS_DIM, sparse=True)
        self.linear = nn.Linear(concatenated_dim, VOCABULARY_SIZE)

    def forward(self, x):
        # assumes x holds vocabulary indices shaped (batch, EMBEDDINGS_CONTEXT_SIZE) - TODO confirm
        embedded = self.embeddings(x)
        # Concatenate the embeddings of all context words per sample.
        flattened = embedded.view(embedded.size(0), -1)
        logits = self.linear(flattened)
        return torch.log_softmax(logits, dim=1)

def cbow_create_dataset(
    words: list[str],
    dataset_name: str
):
    """
    Builds the CBOW dataset for the given words.

    Contexts span `EMBEDDINGS_CONTEXT_SIZE` words; the dataset is cached under
    the name `cbow.<dataset_name>`.
    """
    cache_name = 'cbow.' + dataset_name
    return dataset_create(
        words,
        EMBEDDINGS_CONTEXT_SIZE,
        dataset_name=cache_name
    )

def cbow_train(
    dataset: TensorDataset,
    model: CBOW,
    criterion: object,
    optimizer: torch.optim.Optimizer,
):
    """
    Trains a CBOW `model` on `dataset` using the embeddings hyper-parameters.

    Targets are one-hot encoded (as floats) over the vocabulary before they
    reach `criterion`. Models are stored in the `embeddings` category.
    """

    def one_hot_targets(targets):
        return torch.nn.functional.one_hot(targets, num_classes=VOCABULARY_SIZE).float()

    return model_train(
        model,
        dataset,
        criterion,
        optimizer,
        batch_size=EMBEDDINGS_BATCH_SIZE,
        epochs=EMBEDDINGS_EPOCHS,
        tranform_targets=one_hot_targets,
        model_category='embeddings'
    )

def cbow_performance(
    model: CBOW,
    dataset: TensorDataset,
    dataset_name: str = 'Validation'
):
    """
    Returns the accuracy of a CBOW `model` on `dataset`.

    The predicted word is the vocabulary index with the highest output value.
    """

    def most_likely_index(outputs):
        return torch.argmax(outputs, dim=1)

    return model_accuracy(
        model,
        dataset,
        dataset_name,
        transform_outputs=most_likely_index
    )

def cbow_create_embeddings(
    learning_rates: typing.Sequence[float] = (0.02, 0.01, 0.001)
) -> torch.Tensor:
    """
    Create multiple embeddings models and pick the best one.

    One CBOW model is trained with SGD per entry of `learning_rates` (or
    loaded when it was trained before); the model with the highest accuracy
    on the validation words wins.

    learning_rates: The SGD learning rates to try, one model each.

    Returns the embeddings of the best model.

    Raises `ValueError` when `learning_rates` is empty.
    """
    if len(learning_rates) == 0:
        raise ValueError('At least one learning rate is required')

    training_data = cbow_create_dataset(words_train, 'train')

    # One model per learning rate, trained in the given order.
    models = []
    for learning_rate in learning_rates:
        candidate = CBOW()
        cbow_train(
            training_data, candidate,
            nn.CrossEntropyLoss(),
            torch.optim.SGD(candidate.parameters(), lr=learning_rate)
        )
        models.append(candidate)

    validation_data = cbow_create_dataset(words_val, 'val')
    best_model, best_model_accuracy = model_pick_best(
        models,
        dataset = validation_data,
        performance_measure=cbow_performance
    )

    # Guard against a selection that yields no model, so the failure is
    # explicit instead of an AttributeError on `None`.
    if best_model is None:
        raise RuntimeError('No embeddings model could be selected')

    print(f'Best model on validation: {best_model.name} | Accuracy {best_model_accuracy}')

    return best_model.embeddings.weight.detach().to(device)

In [13]:
# Embedding matrix of the best CBOW model; indexed by vocabulary index in the
# cells below.
embeddings = cbow_create_embeddings()

Loaded CBOW_CrossEntropyLoss_SGD-lr0.020 (./data/embeddings/CBOW_CrossEntropyLoss_SGD-lr0.020.pt
Loaded CBOW_CrossEntropyLoss_SGD-lr0.010 (./data/embeddings/CBOW_CrossEntropyLoss_SGD-lr0.010.pt
Loaded CBOW_CrossEntropyLoss_SGD-lr0.001 (./data/embeddings/CBOW_CrossEntropyLoss_SGD-lr0.001.pt
Validation | CBOW_CrossEntropyLoss_SGD-lr0.020 | Accuracy 0.2166
Validation | CBOW_CrossEntropyLoss_SGD-lr0.010 | Accuracy 0.2084
Validation | CBOW_CrossEntropyLoss_SGD-lr0.001 | Accuracy 0.1869
Best model on validation: CBOW_CrossEntropyLoss_SGD-lr0.020 | Accuracy 0.21659497990751397


## Inspecting the embeddings
In this section we try to understand the embeddings we created in the previous section.  
We will identify which words the model believes are similar and take a look at the embeddings using the `Tensorflow Projector` tool.

In [14]:
def word_vector_similarity_cosine(word_a:torch.Tensor, word_b:torch.Tensor):
    """
    Cosine similarity of two 1-D vectors: their dot product divided by the
    product of their norms.
    """
    inner_product = torch.dot(word_a, word_b)
    norm_product = word_a.norm() * word_b.norm()
    return inner_product / norm_product

def word_vector_similarity_euclidian(word_a:torch.Tensor, word_b:torch.Tensor):
    """
    Euclidean distance between two vectors (norm of their difference);
    smaller means more similar.
    """
    difference = word_a - word_b
    return difference.norm()

def word_similarity_cosine(word_a:str, word_b:str):
    """
    Cosine similarity between the embeddings of two words.

    Both words are resolved through the global `vocabulary` and their vectors
    are read from the global `embeddings`.
    """
    vector_a, vector_b = (embeddings[vocabulary[word]] for word in (word_a, word_b))

    return word_vector_similarity_cosine(vector_a, vector_b)

def word_find_top_closest(
    word: str,
    top: int
):
    """
    Finds the `top` words whose embeddings are most similar to that of `word`.

    word: The word to look up (resolved through the global `vocabulary`).
    top: Maximum number of neighbours to return.

    Returns a list of `(word, cosine_similarity)` tuples, most similar first.
    The query word itself is never part of the result.
    """
    word_idx = vocabulary[word]
    query = embeddings[word_idx]

    # Cosine similarity of the query against every embedding in one go,
    # instead of one tensor operation (and device sync) per vocabulary entry.
    scores = (embeddings @ query) / (embeddings.norm(dim=1) * query.norm())

    tokens = vocabulary.lookup_tokens(range(len(vocabulary)))

    # Exclude the query word by index rather than assuming it sorts first.
    similarities = [
        (other, similarity)
        for other_idx, (other, similarity) in enumerate(zip(tokens, scores.tolist()))
        if other_idx != word_idx
    ]

    similarities.sort(key=lambda x: x[1], reverse=True)

    return similarities[:max(top, 0)]

def word_find_closest(
    word_vector:torch.Tensor,
):
    """
    Finds the vocabulary word whose embedding is closest to `word_vector`.

    word_vector: A vector with the same dimension as the rows of the global
        `embeddings`.

    Returns the closest word by Euclidean distance (the first one on ties), or
    `None` when the vocabulary is empty.
    """
    if len(vocabulary) == 0:
        return None

    # Distance to every embedding at once; no arbitrary "very large" sentinel
    # and no per-word token -> index round trip needed.
    distances = (embeddings - word_vector).norm(dim=1)
    closest_idx = int(torch.argmin(distances).item())

    return vocabulary.lookup_token(closest_idx)

In [15]:
def print_most_similar_words(words, top = 10):
    """
    Prints, for every word in `words`, its `top` most similar vocabulary words.

    Words that resolve to the `<unk>` index are reported as not being in the
    vocabulary.
    """
    print(f"Top {top} most similar words")

    unknown_idx = vocabulary['<unk>']
    for word in words:
        if vocabulary[word] != unknown_idx:
            neighbours = [name for name, _ in word_find_top_closest(word, top)]
            print(word, ':', neighbours)
            continue

        print(word, ':', "Not in vocabulary")

# Probe words, grouped in related or opposing pairs, to eyeball which
# neighbours the embeddings place next to them.
print_most_similar_words([
    'king', 'queen', 'man', 'woman', 'he', 'she', 'doctor', 'nurse',
    'black', 'white', 'slave', 'master',
    'poor', 'rich', 
    'smart', 'dumb', 
    'strong', 'weak',
    'good', 'bad',
])

Top 10 most similar words
king : ['earl', 'prince', 'father', 'spirit', 'building', 'aloud', 'rest', 'wrong', 'bishop', 'feel']
queen : ['fixed', 'can', 'sprang', 'desired', 'trust', 'wound', 'large', 'walls', 'spring', 'crossed']
man : ['wood', 'social', 'together', 'doctor', 'chamber', 'glancing', 'party', 'sun', 'harm', 'ye']
woman : ['fatigue', 'soul', 'paid', 'empty', 'size', 'heat', 'man', 'thick', 'singing', 'le']
he : ['who', 'she', 'growth', 'fully', 'face', 'wrong', 'count', 'they', 'example', 'i']
she : ['he', 'himself', 'everything', 'never', 'excellent', 'brothers', 'key', 'child', 'her', 'i']
doctor : ['smile', 'report', 'torn', 'glancing', 'man', 'really', 'cases', 'otherwise', 'uttered', 'dream']
nurse : Not in vocabulary
black : ['poor', 'skald', 'words', 'white', 'turned', 'o', 'minutes', 'called', 'years', 'human']
white : ['single', 'poor', 'excellent', 'rain', 'nose', 'personal', 'whose', 'black', 'horrible', 'gaze']
slave : Not in vocabulary
master : ['begged', 'b

In [16]:
def tensorflow_projector_create_data():
    """
    Exports the embeddings and the vocabulary as tab-separated files.

    Writes `embeddings.tsv` (one embedding vector per row) and
    `vocabulary.tsv` (one token per row, in vocabulary index order) to
    `<DATA_DIR>/tensorflow_projector/`, without header or index columns.
    """
    out_directory = DATA_DIR + 'tensorflow_projector/'

    # `to_csv` does not create missing directories.
    os.makedirs(out_directory, exist_ok=True)

    embedding_table = pd.DataFrame(embeddings.cpu().numpy())
    embedding_table.to_csv(out_directory + 'embeddings.tsv', sep='\t', index=False, header=False)

    token_table = pd.DataFrame(vocabulary.lookup_tokens(range(len(vocabulary))))
    token_table.to_csv(out_directory + 'vocabulary.tsv', sep='\t', index=False, header=False)

# Write the embeddings and vocabulary files for the projector tool.
tensorflow_projector_create_data()

# Conjugating _be_ and _have_

In [35]:
# Number of preceding words used as context for every prediction.
BEHAVE_CONTEXT_SIZE = 5
# Batch size and number of epochs used when training the model.
BEHAVE_BATCH_SIZE = 512
BEHAVE_EPOCHS = 1
# Target classes: the position of a word in this list is its class label.
# Label 0 ('<unk>') stands for every word that is not a form of be/have.
BEHAVE_WORDS = ['<unk>', 'be', 'am', 'are', 'is', 'was', 'were', 'been', 'being', 'have', 'has', 'had', 'having']
BEHAVE_WORDS_SIZE = len(BEHAVE_WORDS)

class BeHaveRNN(nn.Module):
    """
    Recurrent classifier over the `BEHAVE_WORDS` labels.

    Context indices are turned into vectors through the global (fixed)
    `embeddings`, passed through an RNN and two linear layers, and returned
    as log-probabilities. The RNN hidden state is remembered between calls
    until `reset` is called.
    """

    def __init__(self):
        super().__init__()

        flattened_context_dim = EMBEDDINGS_DIM * BEHAVE_CONTEXT_SIZE
        intermediate_dim = BEHAVE_WORDS_SIZE * 4

        self.rnn = nn.RNN(flattened_context_dim, EMBEDDINGS_DIM, batch_first=True)
        self.fc1 = nn.Linear(EMBEDDINGS_DIM, intermediate_dim)
        self.fc2 = nn.Linear(intermediate_dim, BEHAVE_WORDS_SIZE)
        # Hidden state carried over from the previous forward call (None = start fresh).
        self.hidden = None

    def reset(self):
        """Forget the hidden state remembered from earlier forward calls."""
        self.hidden = None

    def forward(self, x):
        # A list of inputs is processed element by element, in order.
        if isinstance(x, list):
            return [self.forward(item) for item in x]

        vectors = embeddings[x]
        # Concatenate the embeddings of all context words per sample.
        flattened = vectors.view(vectors.size(0), -1)

        # NOTE(review): the RNN input is 2-D here, so the first dimension is
        # presumably treated as the sequence axis rather than as a batch -
        # confirm this is intended.
        rnn_output, hidden = self.rnn(flattened, self.hidden)

        activated = torch.relu(rnn_output)
        activated = torch.relu(self.fc1(activated))
        log_probabilities = torch.log_softmax(self.fc2(activated), dim=1)

        # Remember the hidden state, cut loose from the autograd graph.
        self.hidden = hidden.data

        return log_probabilities

def behave_create_dataset(
    words: list[str],
    dataset_name: str
):
    """
    Creates a dataset from the given words, with targets relabelled to
    `BEHAVE_WORDS` classes.

    Every target that is one of the `BEHAVE_WORDS` gets that word's position
    in the list as label; every other target gets label 0.

    words: The tokens to build the dataset from.
    dataset_name: The dataset is cached under the name `behave.<dataset_name>`.

    Returns a `(dataset, label_to_vocabulary)` tuple, where
    `label_to_vocabulary` maps every label to the vocabulary index of its word.
    """
    label_to_vocabulary:dict[int, int] = {}

    # Start with every vocabulary index mapped to label 0.
    vocabulary_to_label = { vocabulary[word]: 0 for word in vocabulary.lookup_tokens(range(VOCABULARY_SIZE)) }

    for label, word in enumerate(BEHAVE_WORDS):
        vocabulary_index = vocabulary[word]
        label_to_vocabulary[label] = vocabulary_index

        # A word that is missing from the vocabulary resolves to the `<unk>`
        # index. Registering its label there would relabel every unknown word,
        # so only words that are really in the vocabulary are mapped.
        if word in vocabulary:
            vocabulary_to_label[vocabulary_index] = label

    return dataset_create(words, BEHAVE_CONTEXT_SIZE, vocabulary_to_label, dataset_name='behave.'+dataset_name), label_to_vocabulary

def behave_rnn_transform_targets(targets: torch.Tensor) -> list[torch.Tensor]:
    """
    One-hot encodes `targets` (as floats) over the `BEHAVE_WORDS` labels and
    returns a list holding that same tensor `BEHAVE_CONTEXT_SIZE` times.
    """
    encoded = torch.nn.functional.one_hot(targets, num_classes=BEHAVE_WORDS_SIZE).float()
    return [encoded for _ in range(BEHAVE_CONTEXT_SIZE)]

def behave_rnn_transform_contexts(contexts: torch.Tensor) -> list[torch.Tensor]:
    """
    Splits `contexts` into a list of growing prefixes along the second
    dimension, of widths 0 up to `BEHAVE_CONTEXT_SIZE - 2`.
    """
    # NOTE(review): the first prefix has width 0 and is therefore empty -
    # confirm whether the prefixes were meant to start at width 1.
    return [contexts[:, :width] for width in range(BEHAVE_CONTEXT_SIZE - 1)]

def behave_rnn_criterion(criterion, outputs: list[torch.Tensor], targets: list[torch.Tensor]) -> torch.Tensor:
    """
    Sums `criterion` over the first `BEHAVE_CONTEXT_SIZE - 1` pairs of
    `outputs` and `targets`.
    """
    step_count = BEHAVE_CONTEXT_SIZE - 1

    loss = 0
    for step in range(step_count):
        step_loss = criterion(outputs[step], targets[step])
        loss += step_loss
    return loss

def behave_rnn_create_criterion(criterion: object) -> object:
    """
    Wraps `criterion` into a two-argument loss function that applies it through
    `behave_rnn_criterion`.
    """
    def summed_criterion(x, y):
        return behave_rnn_criterion(criterion, x, y)

    return summed_criterion

def behave_rnn_train(
    dataset: TensorDataset,
    model: nn.Module,
    criterion: object,
    optimizer: torch.optim.Optimizer
):
    """
    Trains the given be/have `model` on `dataset` using the `BEHAVE_*`
    hyper-parameters.

    dataset: The (context, label) dataset to train on.
    model: The model to train (annotated as `nn.Module`; it is not a CBOW model).
    criterion: The loss function to use.
    optimizer: The optimizer to use.

    Targets are one-hot encoded (as floats) over the `BEHAVE_WORDS` labels
    before they reach `criterion`. Models are stored in the `behave` category.
    """

    def one_hot_labels(targets):
        return torch.nn.functional.one_hot(targets, num_classes=BEHAVE_WORDS_SIZE).float()

    return model_train(
        model, dataset, criterion, optimizer,
        model_category='behave',
        epochs=BEHAVE_EPOCHS, batch_size=BEHAVE_BATCH_SIZE,
        tranform_targets=one_hot_labels
    )

def behave_performance(
    dataset: TensorDataset,
    model: nn.Module,
    dataset_name: str = 'Validation'
):
    """
    Returns the accuracy of the given be/have `model` on `dataset`.

    dataset: The (context, label) dataset to evaluate on.
    model: The model to evaluate.
    dataset_name: Label used in the printed report.

    The predicted label is the index with the highest output value.
    """
    # Start from a clean hidden state when the model keeps one, so the result
    # does not depend on whatever the model processed before this call.
    reset = getattr(model, 'reset', None)
    if callable(reset):
        reset()

    return model_accuracy(
        model, dataset, dataset_name,
        transform_outputs=lambda x: torch.argmax(x, dim=1)
    )

# Build an untrained model with its loss and optimizer, plus the labelled
# training data.
model = BeHaveRNN()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
training_data, label_to_vocabulary = behave_create_dataset(words_train, 'train')

# Trains the model, or loads previously saved weights when they exist
# (see model_train).
behave_rnn_train(training_data, model, criterion, optimizer)

# Report the accuracy on the validation split.
validation_data, _ = behave_create_dataset(words_val, 'val')
behave_performance(validation_data, model, 'Validation')

Loaded BeHaveRNN_CrossEntropyLoss_Adam-lr0.001 (./data/behave/BeHaveRNN_CrossEntropyLoss_Adam-lr0.001.pt
Validation | BeHaveRNN_CrossEntropyLoss_Adam-lr0.001 | Accuracy 0.9474


KeyboardInterrupt: 