# Character-Level LSTM in PyTorch


First let's load in our required resources for data loading and model creation.

In [1]:
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F

In [2]:
with open('data/anna.txt', 'r') as f:
    text = f.read()

In [3]:
# encode the text and map each character to an integer and vice versa

chars = tuple(set(text))
int2char = dict(enumerate(chars))
char2int = {ch: ii for ii, ch in int2char.items()}
encoded = np.array([char2int[ch] for ch in text])

In [4]:
text[:100]

'Chapter 1\n\n\nHappy families are all alike; every unhappy family is unhappy in its own\nway.\n\nEverythin'

And we can see those same characters encoded as integers.

In [5]:
encoded[:100]

array([23, 63, 20, 41, 67, 21, 14, 27, 39, 70, 70, 70, 57, 20, 41, 41, 11,
       27, 15, 20,  6, 34, 82, 34, 21, 47, 27, 20, 14, 21, 27, 20, 82, 82,
       27, 20, 82, 34,  4, 21,  3, 27, 21, 76, 21, 14, 11, 27, 59, 29, 63,
       20, 41, 41, 11, 27, 15, 20,  6, 34, 82, 11, 27, 34, 47, 27, 59, 29,
       63, 20, 41, 41, 11, 27, 34, 29, 27, 34, 67, 47, 27, 45, 12, 29, 70,
       12, 20, 11, 56, 70, 70, 36, 76, 21, 14, 11, 67, 63, 34, 29])

### Pre-processing the data


In [6]:
def one_hot_encode(arr, n_labels):
    one_hot = np.zeros((np.multiply(*arr.shape), n_labels), dtype=np.float32)
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1.
    one_hot = one_hot.reshape((*arr.shape, n_labels))
    return one_hot

### Making training mini-batches


In [7]:
def get_batches(arr, n_seqs, n_steps):
    '''Create a generator that returns batches of size
       n_seqs x n_steps from arr.
       
       Arguments
       ---------
       arr: Array you want to make batches from
       n_seqs: Batch size, the number of sequences per batch
       n_steps: Number of sequence steps per batch
    '''
    batch_size = n_seqs * n_steps
    n_batches = len(arr)//batch_size
    arr = arr[:batch_size*n_batches]
    
    arr = arr.reshape((n_seqs,-1))
    
    for n in range(0, arr.shape[1], n_steps):
        x = arr[:, n:n+n_steps]
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n+n_steps]
        except IndexError:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y

### Test the Implementation


In [8]:
batches = get_batches(encoded, 10, 50)
x, y = next(batches)

In [9]:
print('x\n', x[:10, :10])
print('\ny\n', y[:10, :10])

x
 [[23 63 20 41 67 21 14 27 39 70]
 [27 20  6 27 29 45 67 27 81 45]
 [76 34 29 56 70 70 49 31 21 47]
 [29 27  8 59 14 34 29 81 27 63]
 [27 34 67 27 34 47 35 27 47 34]
 [27 48 67 27 12 20 47 70 45 29]
 [63 21 29 27 58 45  6 21 27 15]
 [ 3 27 30 59 67 27 29 45 12 27]
 [67 27 34 47 29 19 67 56 27 24]
 [27 47 20 34  8 27 67 45 27 63]]

y
 [[63 20 41 67 21 14 27 39 70 70]
 [20  6 27 29 45 67 27 81 45 34]
 [34 29 56 70 70 49 31 21 47 35]
 [27  8 59 14 34 29 81 27 63 34]
 [34 67 27 34 47 35 27 47 34 14]
 [48 67 27 12 20 47 70 45 29 82]
 [21 29 27 58 45  6 21 27 15 45]
 [27 30 59 67 27 29 45 12 27 47]
 [27 34 47 29 19 67 56 27 24 63]
 [47 20 34  8 27 67 45 27 63 21]]


---
## Defining the network with PyTorch

In [10]:
class CharRNN(nn.Module):
    
    def __init__(self, tokens, n_steps=100, n_hidden=256, n_layers=2,
                               drop_prob=0.5, lr=0.001):
        super().__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr
        
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}
        
        self.lstm = nn.LSTM(len(self.chars),self.n_hidden,self.n_layers, batch_first = True)
        
        self.dropout = nn.Dropout(self.drop_prob)
        
        self.fc = nn.Linear(self.n_hidden,len(self.chars))
        
        self.init_weights()
      
    
    def forward(self, x, hc):
        ''' Forward pass through the network. 
            These inputs are x, and the hidden/cell state `hc`. '''
        
        x,(h,c) = self.lstm(x,hc)
        
        x = self.dropout(x)
        
        x = x.view(x.size()[0]*x.size()[1], self.n_hidden)
        
        x = self.fc(x)
        return x, (h, c)
    
    
    def predict(self, char, h=None, cuda=False, top_k=None):
        ''' Given a character, predict the next character.
        
            Returns the predicted character and the hidden state.
        '''
        if cuda:
            self.cuda()
        else:
            self.cpu()
        
        if h is None:
            h = self.init_hidden(1)
        
        x = np.array([[self.char2int[char]]])
        x = one_hot_encode(x, len(self.chars))
        inputs = torch.from_numpy(x)
        if cuda:
            inputs = inputs.cuda()
        
        h = tuple([each.data for each in h])
        out, h = self.forward(inputs, h)

        p = F.softmax(out, dim=1).data
        if cuda:
            p = p.cpu()
        
        if top_k is None:
            top_ch = np.arange(len(self.chars))
        else:
            p, top_ch = p.topk(top_k)
            top_ch = top_ch.numpy().squeeze()
        
        p = p.numpy().squeeze()
        char = np.random.choice(top_ch, p=p/p.sum())
            
        return self.int2char[char], h
    
    def init_weights(self):
        ''' Initialize weights for fully connected layer '''
        initrange = 0.1
        
        self.fc.bias.data.fill_(0)
        self.fc.weight.data.uniform_(-1, 1)
        
    def init_hidden(self, n_seqs):
        ''' Initializes hidden state '''
        weight = next(self.parameters()).data
        return (weight.new(self.n_layers, n_seqs, self.n_hidden).zero_(),
                weight.new(self.n_layers, n_seqs, self.n_hidden).zero_())
        

### A note on the `predict`  function

The output of our RNN is from a fully-connected layer and it outputs a **distribution of next-character scores**.

To actually get the next character, we apply a softmax function, which gives us a *probability* distribution that we can then sample to predict the next character.

In [11]:
def train(net, data, epochs=10, n_seqs=10, n_steps=50, lr=0.001, clip=5, val_frac=0.1, cuda=False, print_every=10):
    
    net.train()
    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    
    # create training and validation data
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]
    
    if cuda:
        net.cuda()
    
    counter = 0
    n_chars = len(net.chars)
    
    for e in range(epochs):
        h = net.init_hidden(n_seqs)
        for x, y in get_batches(data, n_seqs, n_steps):
            
            counter += 1
            
            # One-hot encode our data and make them Torch tensors
            x = one_hot_encode(x, n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)
            
            if cuda:
                inputs, targets = inputs.cuda(), targets.cuda()

            # Creating new variables for the hidden state, otherwise
            # we'd backprop through the entire training history
            h = tuple([each.data for each in h])

            net.zero_grad()
            
            output, h = net.forward(inputs, h)
            loss = criterion(output, targets.view(n_seqs*n_steps))

            loss.backward()
            
            # `clip_grad_norm` helps prevent the exploding gradient problem in LSTMs.
            nn.utils.clip_grad_norm_(net.parameters(), clip)

            opt.step()
            
            if counter % print_every == 0:
                
                # Get validation loss
                val_h = net.init_hidden(n_seqs)
                val_losses = []
                for x, y in get_batches(val_data, n_seqs, n_steps):
                    # One-hot encode our data and make them Torch tensors
                    x = one_hot_encode(x, n_chars)
                    x, y = torch.from_numpy(x), torch.from_numpy(y)
                    
                    # Creating new variables for the hidden state, otherwise
                    # we'd backprop through the entire training history
                    val_h = tuple([each.data for each in val_h])
                    
                    inputs, targets = x, y
                    if cuda:
                        inputs, targets = inputs.cuda(), targets.cuda()

                    output, val_h = net.forward(inputs, val_h)
                    val_loss = criterion(output, targets.view(n_seqs*n_steps))
                
                    val_losses.append(val_loss.item())
                
                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(np.mean(val_losses)))

### Train the network

In [12]:
if 'net' in locals():
    del net

In [24]:
# define and print the net
net = CharRNN(chars, n_hidden=256, n_layers=3)
print(net)

CharRNN(
  (lstm): LSTM(83, 256, num_layers=3, batch_first=True)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc): Linear(in_features=256, out_features=83, bias=True)
)


In [25]:
n_seqs, n_steps = 128, 100

train(net, encoded, epochs=1, n_seqs=n_seqs, n_steps=n_steps, lr=0.001, cuda=False, print_every=10)

Epoch: 1/1... Step: 10... Loss: 3.4843... Val Loss: 3.4631
Epoch: 1/1... Step: 20... Loss: 3.3642... Val Loss: 3.3801
Epoch: 1/1... Step: 30... Loss: 3.3459... Val Loss: 3.3475
Epoch: 1/1... Step: 40... Loss: 3.2925... Val Loss: 3.3150
Epoch: 1/1... Step: 50... Loss: 3.2927... Val Loss: 3.2734
Epoch: 1/1... Step: 60... Loss: 3.1886... Val Loss: 3.1984
Epoch: 1/1... Step: 70... Loss: 3.0765... Val Loss: 3.1283
Epoch: 1/1... Step: 80... Loss: 3.0220... Val Loss: 3.0219
Epoch: 1/1... Step: 90... Loss: 2.9619... Val Loss: 2.9448
Epoch: 1/1... Step: 100... Loss: 2.8174... Val Loss: 2.8217
Epoch: 1/1... Step: 110... Loss: 2.6873... Val Loss: 2.6984
Epoch: 1/1... Step: 120... Loss: 2.5965... Val Loss: 2.6239
Epoch: 1/1... Step: 130... Loss: 2.5657... Val Loss: 2.5708


In [26]:
# Will be using a different model trained on gpu
# saving the model
model_name = 'rnn_1_epoch.net'

checkpoint = {'n_hidden': net.n_hidden,
              'n_layers': net.n_layers,
              'state_dict': net.state_dict(),
              'tokens': net.chars}

with open(model_name, 'wb') as f:
    torch.save(checkpoint, f)

### Sampling

In [27]:
def sample(net, size, prime='The', top_k=None, cuda=False):
        
    if cuda:
        net.cuda()
    else:
        net.cpu()

    net.eval()
    
    # First off, run through the prime characters
    chars = [ch for ch in prime]
    h = net.init_hidden(1)
    for ch in prime:
        char, h = net.predict(ch, h, cuda=cuda, top_k=top_k)

    chars.append(char)
    
    # Now pass in the previous character and get a new one
    for ii in range(size):
        char, h = net.predict(chars[-1], h, cuda=cuda, top_k=top_k)
        chars.append(char)

    return ''.join(chars)

In [28]:
print(sample(net, 2000, prime='Anna', top_k=5, cuda=False))

Annad whin ghe ho tand of he an ta than sathit has andad int hime her tha the wor on ther sater on ha calte of the wartin to the wos anding hersin the the
dalenton, the the that he hose seut in so oled thing, and hit tham so hin har tas ath ansd his the had to the sor on sane he sarent to te hindes hin an he that ha sorad he and the hans ofo ho thin the wor has hod th men he withe whe he the whing an thered ho the he ha dimoned, saten thas sere tho chat and ane tith ath the ho sasede son of tho he and to has of onte anthis hint asd touther, suuthe whes ad oun hithe hud to ha too tore shus hat thand asd at hers at onle sare the se ther, singing ant that he sint hess, tenthe to her othe he souderithe afet the tounes ant he wathis as ol the thes saud the he timt one whas, shad
as the aride he the hered as ile orathe he she what on the and the sater an he se ole on oteringet the the whas hat to so ha seed the salting her here he ane thas thumt the the he sar tha seont ange atonedens thes t

## Loading a checkpoint

In [30]:
# Here we have loaded in a model that trained over on the gpu
with open('rnn_model.net', 'rb') as f:
    checkpoint = torch.load(f,map_location=torch.device('cpu'))
    
loaded = CharRNN(checkpoint['tokens'], n_hidden=checkpoint['n_hidden'], n_layers=checkpoint['n_layers'])
loaded.load_state_dict(checkpoint['state_dict'])

<All keys matched successfully>

In [31]:
# Change cuda to True if you are using GPU!
print(sample(loaded, 2000, cuda=False, top_k=5, prime="And Levin said"))

And Levin said.

"I have not come in. It is it all that I'm ashamed to her. Won't it all is a
pink,."

"Was. We've been so late..."

And he sat down to his wife's beauty. "What are you thinking of the tin only
and
freshness. I shall be so late, a man and say that is the day all of his wife
to make. I did not speak, but it appaner for me that it's between that
it of many to me that it was an one another, and I don't know., Stiva
said, I don't know why, the ministrous arouse. When that setting you
think that things to arrange him for anything in them, and she were see
that," said Vronsky, standing the same to this position.

The sense of the priving came in a sense of her. "In a position?" though
was, and the same weight still, as soon and his heads that the particular artray
stairched her face.

"I am so solution?" he remembered to him. "What a strange is the standarry in this
sense of the thing in the second seconds that to say
anything and the court, that is she we arrived or impares 