In [1]:
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
import pdb
from torch.utils.data import Dataset, DataLoader

%load_ext autoreload
%autoreload 2

torch.set_printoptions(linewidth=200)

In [2]:
# Run on the first CUDA GPU when one is available, otherwise on the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Width of the RNN hidden state vector.
hidden_size = 100

# Seed every RNG the notebook relies on (Python, NumPy, PyTorch) so that
# weight initialisation, DataLoader shuffling and name sampling are repeatable
# after "Restart Kernel -> Run All".
SEED = 0
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

In [3]:
import torch
from torch.utils.data import Dataset

class DinosDataset(Dataset):
    """Character-level dataset built from a text file with one name per line.

    The file is lower-cased, and the vocabulary is the sorted set of characters
    it contains plus the newline character, which is used as the end-of-name
    target. Each item is a pair ``(x, y)`` for next-character prediction:

    * ``x`` -- float tensor of shape ``[len(name) + 1, vocab_size]``. Row 0 is
      all zeros (start-of-name placeholder); row ``i`` (``i >= 1``) is the
      one-hot encoding of ``name[i - 1]``.
    * ``y`` -- long tensor of shape ``[len(name) + 1]`` holding the vocabulary
      indices of ``name + '\\n'``, i.e. the character to predict at each step.

    Args:
        path: Text file to read. Defaults to ``'dinos.txt'`` in the current
            working directory.
    """

    def __init__(self, path='dinos.txt'):
        super().__init__()
        with open(path, encoding='utf-8') as f:
            content = f.read().lower()  # names are treated case-insensitively

        # '\n' is always a target (end of name), so make sure it is in the
        # vocabulary even when the file has a single line and no trailing newline.
        self.vocab = sorted(set(content) | {'\n'})
        self.vocab_size = len(self.vocab)

        self.lines = content.splitlines()

        # Lookup tables between characters and their vocabulary indices.
        self.ch_to_idx = {c: i for i, c in enumerate(self.vocab)}
        self.idx_to_ch = {i: c for i, c in enumerate(self.vocab)}

    def __getitem__(self, index):
        """Return the ``(x, y)`` tensor pair for the name at ``index``."""
        line = self.lines[index]

        # Targets: every character of the name followed by the newline marker.
        targets = [self.ch_to_idx[ch] for ch in line + '\n']
        y = torch.tensor(targets, dtype=torch.long)

        # Inputs: the targets shifted right by one step. Row 0 stays all-zero
        # and plays the role of the start-of-name placeholder.
        x = torch.zeros([len(targets), self.vocab_size], dtype=torch.float)
        if line:
            x[torch.arange(1, len(targets)), y[:-1]] = 1

        return x, y

    def __len__(self):
        """Return the number of lines (names) read from the file."""
        return len(self.lines)


In [4]:
# Build the name dataset; with no arguments it reads 'dinos.txt' from the working directory.
trn_ds = DinosDataset()
# One name per batch, visited in a freshly shuffled order on every pass.
trn_dl = DataLoader(trn_ds, batch_size=1, shuffle=True)

With this tokenization, the first vector in $x$ is always the zero vector (it stands in for the start-of-name placeholder `" "`), and the last element in $y$ is always 0, the index of `"\n"`, which marks the end of the name.

In [5]:
# Inspect the (x, y) tensor pair produced for the second line of the file.
trn_ds[1]

(tensor([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.

In [6]:
class RNN(nn.Module):
    """Single recurrent cell with a tanh hidden state and a linear read-out.

    One call to ``forward`` advances the cell by one time step:

        h = tanh(linear_hh(h_prev) + linear_hx(x))
        y = linear_output(h)

    Args:
        input_size: Number of features in each input vector ``x``.
        hidden_size: Number of features in the hidden state.
        output_size: Number of features in each output vector ``y``.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        # hidden -> hidden; this layer carries the only bias of the recurrence
        self.linear_hh = nn.Linear(in_features=hidden_size, out_features=hidden_size)
        # input -> hidden, deliberately bias-free
        self.linear_hx = nn.Linear(in_features=input_size, out_features=hidden_size, bias=False)
        # hidden -> output
        self.linear_output = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, h_prev, x):
        """Advance one step and return ``(new_hidden_state, output)``."""
        recurrent_term = self.linear_hh(h_prev)
        input_term = self.linear_hx(x)
        h_next = torch.tanh(recurrent_term + input_term)
        return h_next, self.linear_output(h_next)

In [7]:
# Input and output widths both equal the vocabulary size: one-hot characters go in,
# one score per vocabulary character comes out.
model = RNN(trn_ds.vocab_size, hidden_size, trn_ds.vocab_size).to(device)
# Cross-entropy between the output scores and the target character index.
loss_fn = nn.CrossEntropyLoss()
# Plain SGD with a fixed learning rate of 1e-2.
optimizer = optim.SGD(model.parameters(), lr=1e-2)

In [8]:
def print_sample(sample_idxs, idx_to_ch=None):
    """Print a sampled name with its first letter capitalised.

    Args:
        sample_idxs: Sequence of vocabulary indices. No line break is added by
            this function; the output ends with a newline only if the indices
            themselves end with the newline character.
        idx_to_ch: Optional mapping from index to character. When omitted, the
            notebook-level ``trn_ds.idx_to_ch`` is used.

    An empty ``sample_idxs`` prints nothing.
    """
    if idx_to_ch is None:
        idx_to_ch = trn_ds.idx_to_ch
    name = ''.join(idx_to_ch[idx] for idx in sample_idxs)
    # Slicing (rather than name[0]) keeps the empty-sample case from raising.
    print(name[:1].upper() + name[1:], end='')

In [9]:
def sample(model, trn_ds, device, max_len=50):
    """Generate one name by sampling the model character by character.

    Starting from a zero hidden state and a zero input vector, the model's
    output is turned into a probability distribution with softmax, one
    character is drawn from it, and that same character (one-hot encoded) is
    fed back as the next input. Generation stops when the newline character is
    drawn or after ``max_len`` characters.

    Args:
        model: Callable ``model(h_prev, x) -> (h, y)`` exposing ``hidden_size``.
        trn_ds: Object providing ``ch_to_idx`` (must contain ``'\\n'``) and
            ``vocab_size``.
        device: Device on which the state and input tensors are created.
        max_len: Maximum number of characters drawn before the name is cut off.

    Returns:
        List of ``int`` vocabulary indices that always ends with exactly one
        newline index (appended if the length limit was hit first).
    """
    newline_idx = trn_ds.ch_to_idx['\n']
    vocab_size = trn_ds.vocab_size
    indices = []

    # Remember the current mode so sampling does not leave the model in eval mode.
    was_training = model.training
    model.eval()
    try:
        h_prev = torch.zeros([1, model.hidden_size], dtype=torch.float, device=device)
        # All-zero input: the start-of-name placeholder.
        x = h_prev.new_zeros([1, vocab_size])

        with torch.no_grad():
            while len(indices) < max_len:
                h_prev, y_pred = model(h_prev, x)
                probs = torch.softmax(y_pred, dim=1).cpu().numpy().ravel().astype(np.float64)
                # Renormalise so float32 rounding cannot trip the sum-to-one check.
                probs /= probs.sum()
                # Draw from the global NumPy RNG. It is deliberately not reseeded
                # here: reseeding on every step shrinks the set of possible names.
                idx = int(np.random.choice(vocab_size, p=probs))
                indices.append(idx)
                if idx == newline_idx:
                    break

                # Feed back the character that was actually sampled (not the
                # arg-max), so the model is conditioned on the emitted name.
                x = torch.zeros_like(x)
                x[0, idx] = 1.0
    finally:
        model.train(was_training)

    # Terminate names that were cut off by the length limit.
    if not indices or indices[-1] != newline_idx:
        indices.append(newline_idx)

    return indices

In [10]:
def train_one_epoch(model, loss_fn, optimizer, trn_dl, device, sample_every=100):
    """Run one pass over ``trn_dl``, updating the model after every batch.

    For each ``(x, y)`` batch the hidden state is reset to zeros, the model is
    unrolled over the time dimension (``x.shape[1]``), the per-step losses are
    summed, gradients are clipped to a total norm of 5 and one optimizer step
    is taken.

    Args:
        model: Callable ``model(h_prev, x_t) -> (h, y_pred)`` exposing
            ``hidden_size``.
        loss_fn: Loss applied to ``(y_pred, y[:, t])`` at every time step.
        optimizer: Optimizer over ``model.parameters()``.
        trn_dl: Iterable of ``(x, y)`` batches with the batch dimension first
            and the time dimension second.
        device: Device the batches and the hidden state are moved to.
        sample_every: Print a generated name every this many batches. ``0`` or
            ``None`` disables sampling.

    Returns:
        Mean summed-over-time loss per batch as a ``float`` (``0.0`` when
        ``trn_dl`` yields nothing).
    """
    total_loss = 0.0
    num_batches = 0

    for line_num, (x, y) in enumerate(trn_dl, start=1):
        # sample() may switch the model to eval mode, so re-enter train mode each batch.
        model.train()
        optimizer.zero_grad()
        x, y = x.to(device), y.to(device)
        # Fresh hidden state for every name, one row per sequence in the batch.
        h_prev = torch.zeros([x.shape[0], model.hidden_size], dtype=torch.float, device=device)

        # Unroll over time, accumulating the loss of every step.
        loss = 0
        for i in range(x.shape[1]):
            h_prev, y_pred = model(h_prev, x[:, i])
            loss += loss_fn(y_pred, y[:, i])

        # Periodically show what the model currently generates.
        # NOTE: this relies on the notebook-level `trn_ds` for the vocabulary.
        if sample_every and line_num % sample_every == 0:
            print_sample(sample(model, trn_ds, device))

        loss.backward()
        # Clip gradients to keep the recurrent updates from exploding.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 5)
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    return total_loss / num_batches if num_batches else 0.0

In [11]:
def train(model, loss_fn, optimizer, trn_dl, device, epochs=1):
    """Run ``train_one_epoch`` ``epochs`` times, printing a banner before each pass.

    All arguments other than ``epochs`` are forwarded unchanged to
    ``train_one_epoch``. Epochs are numbered from 1 in the banner.
    """
    divider = "-" * 20
    for completed in range(epochs):
        epoch_no = completed + 1
        print(f'{divider} Epoch {epoch_no} {divider}')
        train_one_epoch(model, loss_fn, optimizer, trn_dl, device)

In [12]:
# Train for five epochs; train_one_epoch prints a generated name every 100 lines.
train(model, loss_fn, optimizer, trn_dl, device, epochs=5)

-------------------- Epoch 1 --------------------
Fastrrxusa
Quookssr
Guafeos
Pssysrsrnraolus
Rrhvsonaurur
Sacuoemaurusturus
Dumlcnerusalrus
Esplytngus
Jmairanuras
Laresashur
Xpsesourus
Jyrnsiueus
Guburrbrus
Lermr
Tarussnrus
-------------------- Epoch 2 --------------------
Etrounurus
Inais
Sisakluaus
Hlcucaurus
Turiseusus
Suaos
Gucurserus
Lfrir
Tcivntlaurus
Rnvusaur
Inbis
Slsailucus
Hlcucaurus
Tureshusus
Suans
-------------------- Epoch 3 --------------------
Fucuotdrus
Lhrir
Tcrustmrus
Etrotiurus
Inbgs
Slsaimugus
Hlaucitaurus
Llrusuurus
Amraroirus
Burasaurus
Ansrsaurus
Ssrysauruh
Lbruaiuras
Larasaumus
Wrsassurus
-------------------- Epoch 4 --------------------
Kyrksluhus
Fuauiscrus
Lirhr
Tcrupteaurus
Snvuriuras
Alrasaunus
Aalros
Tcrusturus
Ehurus
Rusaranrus
Guaasacros
Hbraoturesauros
Kyrosmulus
Eucuotirus
Lirio
-------------------- Epoch 5 --------------------
Tcrusttrus
Diurysauruo
Lalrasaurus
Anhuragras
Pueuspurus
Yurustcrus
Scretalruf
Cuoibtlrus
Bururaurus
Lsruaisaurus
Tbeunasaur

## Print training data (used for debugging, you can ignore this)

In [13]:
def print_ds(ds, num_examples=10):
    """Print the first ``num_examples`` items of ``ds`` decoded back to text.

    For every item a separator line of 50 asterisks is printed, followed by the
    ``repr`` of the decoded target string ``y`` and the ``repr`` of the decoded
    input string ``x``. The first row of ``x`` is skipped when decoding (in
    this notebook's dataset it is the all-zero start placeholder).

    Args:
        ds: Dataset supporting ``len()``, integer indexing that returns
            ``(x, y)`` tensors, and an ``idx_to_ch`` mapping.
        num_examples: Maximum number of items to print; values larger than
            ``len(ds)`` print the whole dataset, ``0`` or less prints nothing.
    """
    for i in range(min(num_examples, len(ds))):
        x, y = ds[i]
        print('*' * 50)

        # Targets are stored as vocabulary indices.
        y_str = ''.join(ds.idx_to_ch[idx.item()] for idx in y)
        print(repr(y_str))

        # Inputs are one-hot rows, so arg-max recovers the character index.
        x_str = ''.join(ds.idx_to_ch[t.argmax().item()] for t in x[1:])
        print(repr(x_str))

In [14]:
# Decode and show the first five training examples as a sanity check.
print_ds(trn_ds, 5)

**************************************************
'aachenosaurus\n'
'aachenosaurus'
**************************************************
'aardonyx\n'
'aardonyx'
**************************************************
'abdallahsaurus\n'
'abdallahsaurus'
**************************************************
'abelisaurus\n'
'abelisaurus'
**************************************************
'abrictosaurus\n'
'abrictosaurus'
