In [1]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from torch import nn
from torch.nn.utils.rnn import pack_sequence, pad_packed_sequence

import os
import string

In [2]:
# Character-level vocabulary: ASCII letters, digits, punctuation and the space,
# followed by a final "UNK" entry.
vocab = [*string.ascii_letters, *string.digits, *string.punctuation, " ", "UNK"]

In [59]:
def tokenize(vocab: list[str], input: str) -> torch.Tensor:
    """One-hot encode a string character by character.

    Args:
        vocab: Ordered list of known symbols. The last entry doubles as the
            "unknown" slot for any character that is not in the list.
        input: Text to encode.

    Returns:
        A float tensor of shape ``(len(input), len(vocab))`` with exactly one
        ``1`` per row. An empty string yields a ``(0, len(vocab))`` tensor
        instead of raising.
    """
    # Build the symbol -> index table once so every character is a constant
    # time lookup rather than a linear ``list.index`` scan. ``setdefault``
    # keeps the first occurrence of a duplicated symbol, like ``list.index``.
    index_of: dict[str, int] = {}
    for position, symbol in enumerate(vocab):
        index_of.setdefault(symbol, position)

    unknown = len(vocab) - 1
    ids = torch.tensor(
        [index_of.get(char, unknown) for char in input], dtype=torch.long
    )

    # Allocate the whole matrix up front; this also covers the empty-input
    # case, where stacking an empty list of rows would fail.
    one_hot = torch.zeros((len(input), len(vocab)))
    one_hot[torch.arange(len(input)), ids] = 1
    return one_hot


def detokenize(vocab: list[str], input: torch.Tensor) -> str:
    """Turn a stack of token rows back into the text they encode.

    Each row of ``input`` is mapped to the ``vocab`` entry at the position of
    its non-zero element, and the symbols are concatenated in row order.
    Rows are assumed to be one-hot (a single non-zero entry each).
    """
    return "".join(vocab[torch.nonzero(row)] for row in input)


def make_training_pair(tokens, context_length):
    """Build next-character training examples from a one-hot token sequence.

    For every position ``i`` that has a successor, the example input is the
    window of at most ``context_length`` tokens ending at (and including)
    token ``i``, and the label is the vocabulary index of token ``i + 1``.

    Args:
        tokens: ``(sequence_length, vocab_size)`` one-hot tensor.
        context_length: Maximum number of tokens in one input window.

    Returns:
        A ``(PackedSequence, Tensor)`` pair: the packed variable-length input
        windows, and a 1-D integer tensor with one class index per window.

    Raises:
        ValueError: If ``context_length`` is less than one or ``tokens`` holds
            fewer than two tokens (no input/label pair can be formed).
    """
    if context_length < 1:
        raise ValueError(f"context_length must be at least 1, got {context_length}")
    if len(tokens) < 2:
        raise ValueError(
            f"need at least 2 tokens to build a training pair, got {len(tokens)}"
        )

    inputs = []
    for i in range(len(tokens) - 1):
        # The window always ends on token i (slice end is exclusive, hence
        # i + 1) so that the label, token i + 1, is the very next character.
        # Clamping the start at 0 covers the first, shorter windows.
        start = max(0, i + 1 - context_length)
        inputs.append(tokens[start : i + 1])

    # Loss function expects output class as integer and not one-hot
    labels = tokens[1:].argmax(dim=1)

    return pack_sequence(inputs, enforce_sorted=False), labels


class MovieReviewDataset(Dataset):
    """Dataset of text reviews stored one per file in a single directory.

    Indexing reads the corresponding file, one-hot encodes its characters with
    ``tokenize`` and returns the training examples built by
    ``make_training_pair``.
    """

    def __init__(self, review_dir: str, context_length: int, vocab: list[str]) -> None:
        """
        Args:
            review_dir: Directory holding the review files (a trailing path
                separator is optional).
            context_length: Maximum window length passed to
                ``make_training_pair``.
            vocab: Symbol list passed to ``tokenize``.
        """
        self.dir = review_dir
        self.context_length = context_length
        self.vocab = vocab
        # os.listdir returns entries in arbitrary order and includes
        # sub-directories: sort for a stable index -> file mapping and keep
        # regular files only.
        self.files = sorted(
            name
            for name in os.listdir(review_dir)
            if os.path.isfile(os.path.join(review_dir, name))
        )

    def __len__(self) -> int:
        """Return the number of review files found in the directory."""
        return len(self.files)

    def _read_review(self, index: int) -> str:
        """Return the raw text of the review file at ``index``."""
        # os.path.join works whether or not review_dir ends with a separator.
        path = os.path.join(self.dir, self.files[index])
        # Decode explicitly as UTF-8 instead of the platform default; bytes
        # that cannot be decoded become U+FFFD rather than aborting the read.
        with open(path, "r", encoding="utf-8", errors="replace") as file:
            return file.read()

    def __getitem__(
        self, index: int
    ) -> tuple[torch.nn.utils.rnn.PackedSequence, torch.Tensor]:
        """Return ``(packed input windows, label indices)`` for one review."""
        tokens = tokenize(self.vocab, self._read_review(index))
        return make_training_pair(tokens, self.context_length)

In [4]:
# Reviews under the aclImdb train/unsup directory, with input windows capped
# at 128 characters.
dataset = MovieReviewDataset(review_dir="./data/aclImdb/train/unsup/", context_length=128, vocab=vocab)

In [12]:
class MovieModel(nn.Module):
    """LSTM followed by a linear layer and log-softmax over the vocabulary.

    ``forward`` takes a ``PackedSequence`` of input windows and returns one
    row of log-probabilities per window, computed from the LSTM output at the
    last valid time step of that window.
    """

    def __init__(self, vocab_length, hidden_size, num_layers) -> None:
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=vocab_length, hidden_size=hidden_size, num_layers=num_layers
        )
        self.linear = nn.Linear(in_features=hidden_size, out_features=vocab_length)
        self.sm = nn.LogSoftmax(dim=1)

    def forward(self, x):
        packed_states, _ = self.lstm(x)
        # The LSTM hands back a PackedSequence; unpack it into a padded
        # (time, batch, hidden) tensor plus the true length of each sequence.
        padded_states, seq_lengths = pad_packed_sequence(packed_states)
        # For every batch entry pick the state at its own final time step.
        batch_positions = torch.arange(0, len(seq_lengths))
        final_states = padded_states[seq_lengths - 1, batch_positions, :]
        logits = self.linear(final_states)
        return self.sm(logits)

In [108]:
model = MovieModel(len(vocab), 512, 3)
# NOTE(review): lr=0.1 is far above AdamW's default of 1e-3 and may keep the
# LSTM from converging -- value left unchanged here, confirm it is intended.
optim = torch.optim.AdamW(model.parameters(), lr=0.1)
loss_func = nn.NLLLoss()

losses = []
# Train on one review per step; never index past the end of a dataset that
# holds fewer than 100 reviews.
for sample in range(min(100, len(dataset))):
    print(sample)
    optim.zero_grad()

    # Named packed_inputs (not `input`) so the builtin is not shadowed at
    # notebook scope.
    packed_inputs, labels = dataset[sample]
    loss = loss_func(model(packed_inputs), labels)
    # Record a detached copy: appending the live loss tensor would keep every
    # step's autograd graph reachable from the history list.
    losses.append(loss.detach())

    loss.backward()
    optim.step()

0
1
2
3
4
5
6


In [100]:
def gen_text(model, vocab, chars, context_length):
    """Greedily generate ``chars`` one-hot tokens from ``model``.

    The first prediction is made from a single all-zero vector. Every later
    prediction is made from the most recent ``context_length`` generated
    tokens, fed to the model as ONE sequence, and the highest-scoring
    vocabulary entry is appended as the next token.

    Args:
        model: Callable taking a ``PackedSequence`` holding one sequence and
            returning scores over the vocabulary for the next token.
        vocab: Symbol list; only its length is used here.
        chars: Number of tokens to generate.
        context_length: Maximum number of previous tokens fed to the model.

    Returns:
        A ``(chars, len(vocab))`` one-hot tensor; ``(0, len(vocab))`` when
        ``chars`` is not positive.
    """
    vocab_size = len(vocab)
    if chars <= 0:
        # torch.vstack cannot stack an empty list, so answer directly.
        return torch.zeros((0, vocab_size))

    start = torch.zeros((1, vocab_size))
    outputs = []
    # Generation needs no gradients; skip building the autograd graph.
    with torch.no_grad():
        for i in range(chars):
            # Last `context_length` generated tokens (all of them while fewer
            # exist); empty on the first step, where the zero vector is used.
            window = outputs[max(0, i - context_length):]
            # Stack the (1, vocab) rows into a single (length, vocab)
            # sequence. Passing the list itself to pack_sequence would be
            # read as a batch of separate length-1 sequences.
            context = torch.vstack(window) if window else start

            scores = model(pack_sequence([context]))
            idx = int(scores.argmax(dim=-1))

            output = torch.zeros((1, vocab_size))
            output[:, idx] = 1
            outputs.append(output)

    return torch.vstack(outputs)

In [105]:
# Generate 100 characters using a 100-token context window and decode them.
detokenize(vocab, gen_text(model, vocab, chars=100, context_length=100))

'                                                                                                    '