In [1]:
import torch
from torch import nn
import numpy as np
import torch.nn.functional as F

### Data Preparation

**Load Data**

In [2]:
with open('anna.txt', 'r') as f:
    text = f.read()

In [3]:
text[:100]

'Chapter 1\n\n\nHappy families are all alike; every unhappy family is unhappy in its own\nway.\n\nEverythin'

**Tokenization**

In [4]:
chars = tuple(set(text))
int2char = dict(enumerate(chars))
char2int = {ch:ii for ii, ch in int2char.items()}
encoded = np.array([char2int[ch] for ch in text])
encoded[:100]

array([46, 29,  6, 38, 15, 78, 12, 59, 79, 49, 49, 49, 36,  6, 38, 38, 73,
       59, 50,  6, 58, 34,  2, 34, 78, 63, 59,  6, 12, 78, 59,  6,  2,  2,
       59,  6,  2, 34, 82, 78, 33, 59, 78, 75, 78, 12, 73, 59, 67, 61, 29,
        6, 38, 38, 73, 59, 50,  6, 58, 34,  2, 73, 59, 34, 63, 59, 67, 61,
       29,  6, 38, 38, 73, 59, 34, 61, 59, 34, 15, 63, 59, 64, 56, 61, 49,
       56,  6, 73, 27, 49, 49, 44, 75, 78, 12, 73, 15, 29, 34, 61])

**One-Hot Encoding**

In [5]:
def one_hot_encode(arr, n_labels):
    one_hot = np.zeros((np.multiply(*arr.shape), n_labels), dtype=np.float32)
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1.
    one_hot = one_hot.reshape((*arr.shape, n_labels))
    return one_hot

In [6]:
def get_batches(arr, batch_size, seq_length):
    batch_size_total = batch_size*seq_length
    n_batches = len(arr)//batch_size_total
    arr = arr[:n_batches*batch_size_total]
    arr = arr.reshape((batch_size, -1))
    for n in range(0, arr.shape[1], seq_length):
        x = arr[:, n:n+seq_length]
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n+seq_length] 
        except IndexError:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y

In [7]:
batches = get_batches(encoded, 8, 50)
x, y = next(batches)
print('x\n', x[:10, :10])
print('y\n', y[:10, :10])

x
 [[46 29  6 38 15 78 12 59 79 49]
 [63 64 61 59 15 29  6 15 59  6]
 [78 61 66 59 64 12 59  6 59 50]
 [63 59 15 29 78 59  4 29 34 78]
 [59 63  6 56 59 29 78 12 59 15]
 [ 4 67 63 63 34 64 61 59  6 61]
 [59 48 61 61  6 59 29  6 66 59]
 [31 53  2 64 61 63 82 73 27 59]]
y
 [[29  6 38 15 78 12 59 79 49 49]
 [64 61 59 15 29  6 15 59  6 15]
 [61 66 59 64 12 59  6 59 50 64]
 [59 15 29 78 59  4 29 34 78 50]
 [63  6 56 59 29 78 12 59 15 78]
 [67 63 63 34 64 61 59  6 61 66]
 [48 61 61  6 59 29  6 66 59 63]
 [53  2 64 61 63 82 73 27 59 17]]


### Training

In [8]:
if torch.backends.mps.is_available():
    device = "mps"
    print("Training on GPU")
else:
    device = "cpu"
    print("Training on CPU")

Training on GPU


In [9]:
class CharRNN(nn.Module):
    def __init__(self, tokens, n_hidden = 256, n_layers=2, drop_prob=0.5, lr=0.001):
        super().__init__()
        self. drop_prob = drop_prob
        self. n_layers = n_layers
        self.n_hidden = n_hidden
        self. lr = lr
        
        #creating character dicts
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}
        
        self.lstm = nn.LSTM(len(self.chars), n_hidden, n_layers, dropout=drop_prob, batch_first=True)
        self.dropout = nn.Dropout(drop_prob)
        self.fc = nn.Linear(n_hidden, len(self.chars))
        
    def forward(self, x, hidden):
        r_output, hidden = self.lstm(x, hidden)
        out = self.dropout(r_output)
        out = out.view(-1, self.n_hidden)
        out = self.fc(out)
        return out, hidden
    
    def init_hidden(self, batch_size):
        weight = next(self.parameters()).data
        hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_().to(device),
                 weight.new(self.n_layers, batch_size, self.n_hidden).zero_().to(device))
        return hidden

In [10]:
def train(net, data, epochs=10, batch_size=10, seq_length=50, lr=0.001, clip=5, val_frac=0.1, print_every=10):
    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]
    
    net.to(device)
    counter = 0
    n_chars = len(net.chars)
    for e in range(epochs):
        h = net.init_hidden(batch_size)
        for x, y in get_batches(data, batch_size, seq_length):
            counter+=1
            
            #one hot encode data and make them torch tensors
            x = one_hot_encode(x, n_chars)
            inputs, target = torch.from_numpy(x), torch.from_numpy(y)
            
            #Move data to device
            inputs = inputs.to(device)
            target = target.to(device)
            
            #Create new var for hidden state to avoid backprop thru entire training history
            h = tuple([x.data for x in h])
            
            #zero accumulated gradients
            net.zero_grad()
            
            #Get output from model
            output, h = net(inputs, h)
            
            #loss & backprop
            loss = criterion(output, target.view(batch_size*seq_length))
            loss.backward()
            
            #clip gradient - Prevent exploding gradient problem
            nn.utils.clip_grad_norm(net.parameters(), clip)
            opt.step()
            
            #loss stats
            if counter % print_every == 0:
                val_h = net.init_hidden(batch_size)
                val_losses = []
                for x, y in get_batches(val_data, batch_size, seq_length):
                    x = one_hot_encode(x, n_chars)
                    x, y = torch.from_numpy(x), torch.from_numpy(y)
                    val_h = tuple([x.data for x in val_h])
                    inputs, target = x, y
                    
                    #Move data to device
                    inputs = inputs.to(device)
                    target = target.to(device)
                    
                    output, val_h = net(inputs, val_h)
                    val_loss = criterion(output, target.view(batch_size*seq_length))
                    val_losses.append(val_loss)
                print("Epoch: {}/{}...".format(e+1, epochs),
                     "Step: {}...".format(counter),
                     "Loss: {:4f}...".format(loss.item()),
                     "Val Loss: {:4f}...".format(np.mean(val_losses)))

In [11]:
n_hidden = 512
n_layers = 2
net = CharRNN(chars, n_hidden, n_layers)
print(net)

CharRNN(
  (lstm): LSTM(83, 128, batch_first=True, dropout=0.5)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc): Linear(in_features=128, out_features=83, bias=True)
)




In [None]:
batch_size = 128
seq_length = 100
n_epochs = 20

#train the model
train(net, encoded, epochs=n_epochs, batch_size=batch_size, seq_length=seq_length, lr=0.001, print_every=10)

In [None]:
model_name = 'rnn_20_epoch.net'
checkpoint = {'n_hidden':net.n_hidden, 'n_layers':net.n_layers, 'state_dict':net.state_dict(), 'tokens':chars}
with open(model_name, 'wb') as f:
    torch.save(checkpoint, f)

In [None]:
def predict(net, char, h=None, top_k=None):
        ''' Given a character, predict the next character.
            Returns the predicted character and the hidden state.
        '''
        
        # tensor inputs
        x = np.array([[net.char2int[char]]])
        x = one_hot_encode(x, len(net.chars))
        inputs = torch.from_numpy(x)
        
        if(train_on_gpu):
            inputs = inputs.cuda()
        
        # detach hidden state from history
        h = tuple([each.data for each in h])
        # get the output of the model
        out, h = net(inputs, h)

        # get the character probabilities
        p = F.softmax(out, dim=1).data
        if(train_on_gpu):
            p = p.cpu() # move to cpu
        
        # get top characters
        if top_k is None:
            top_ch = np.arange(len(net.chars))
        else:
            p, top_ch = p.topk(top_k)
            top_ch = top_ch.numpy().squeeze()
        
        # select the likely next character with some element of randomness
        p = p.numpy().squeeze()
        char = np.random.choice(top_ch, p=p/p.sum())
        
        # return the encoded value of the predicted char and the hidden state
        return net.int2char[char], h

In [None]:
def sample(net, size, prime='The', top_k=None):
        
    if(train_on_gpu):
        net.cuda()
    else:
        net.cpu()
    
    net.eval() # eval mode
    
    # First off, run through the prime characters
    chars = [ch for ch in prime]
    h = net.init_hidden(1)
    for ch in prime:
        char, h = predict(net, ch, h, top_k=top_k)

    chars.append(char)
    
    # Now pass in the previous character and get a new one
    for ii in range(size):
        char, h = predict(net, chars[-1], h, top_k=top_k)
        chars.append(char)

    return ''.join(chars)

In [None]:
print(sample(net, 1000, prime='Anna', top_k=5))

**Loading a checkpoint**

In [None]:
# Here we have loaded in a model that trained over 20 epochs `rnn_20_epoch.net`
with open('rnn_20_epoch.net', 'rb') as f:
    checkpoint = torch.load(f)
    
loaded = CharRNN(checkpoint['tokens'], n_hidden=checkpoint['n_hidden'], n_layers=checkpoint['n_layers'])
loaded.load_state_dict(checkpoint['state_dict'])

In [None]:
print(sample(loaded, 2000, top_k=5, prime="And Levin said"))