Built an RNN model from scratch as well as using pytorch built-in nn.RNN module. The model realized here is based on character instead of word as the original intent is to use on Chinese text.

In [15]:
import torch
import torch.nn as nn
import time
import math
import sys

In [27]:
def load_data_text():
    with open('./tale-of-two-cities.txt') as f:
        corpus_chars = f.read()
        
    corpus_chars = corpus_chars.replace('\n', ' ').replace('\r', ' ')
    corpus_chars = corpus_chars[0:500000]
    idx_to_char = list(set(corpus_chars))
    char_to_idx = dict([(char, i) for i, char in enumerate(idx_to_char)])
    vocab_size = len(char_to_idx)
    corpus_indices = [char_to_idx[char] for char in corpus_chars]
    
    return corpus_indices, char_to_idx, idx_to_char, vocab_size

In [28]:
corpus_indices, char_to_idx, idx_to_char, vocab_size = load_data_text()

In [31]:
# corpus_indices
# vocab_size

67

In [6]:
# One Hot encoding to represent a word as a vector 

def one_hot(x, n_class, dtype=torch.float32):
    
    result = torch.zeros(x.shape[0], n_class, 
                         dtype=dtype, device=x.device)  # shape: (n, n_class)
    result.scatter_(1, x.long().view(-1, 1), 1)  # result[i, x[i, 0]] = 1
    return result
    
x = torch.tensor([0, 2])
x_one_hot = one_hot(x, vocab_size)

print(x_one_hot)
print(x_one_hot.shape)
print(x_one_hot.sum(axis=1))

tensor([[1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0.]])
torch.Size([2, 61])
tensor([1., 1.])


In [7]:
def to_onehot(X, n_class):
    return [one_hot(X[:, i], n_class) for i in range(X.shape[1])]

X = torch.arange(10).view(2, 5)
inputs = to_onehot(X, vocab_size)
print(len(inputs), inputs[0].shape)

5 torch.Size([2, 61])


### One Hot drawbacks:
Cannot represent the similarity or relationships between words, can be very sparse, bad for NN model performance

In [11]:
# initializaing model params

# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
num_inputs, num_hiddens, num_outputs = vocab_size, 256, vocab_size
## num_inputs: d
## num_hiddens: h, the number of hidden units is Hyperparam
## num_outputs: q

def get_params():
    def _one(shape):
        param = torch.zeros(shape, device=device, dtype=torch.float32)
        nn.init.normal_(param, 0, 0.01)
        return torch.nn.Parameter(param)

    # params for hidden layer
    W_xh = _one((num_inputs, num_hiddens))
    W_hh = _one((num_hiddens, num_hiddens))
    b_h = torch.nn.Parameter(torch.zeros(num_hiddens, device=device))
    
    # params for output layer
    W_hq = _one((num_hiddens, num_outputs))
    b_q = torch.nn.Parameter(torch.zeros(num_outputs, device=device))
    
    return (W_xh, W_hh, b_h, W_hq, b_q)

In [12]:
# define RNN model
def rnn(inputs, state, params):
# inputs & outputs are num_steps numbers of matrices batch_size*vocab_size

    W_xh, W_hh, b_h, W_hq, b_q = params
    H, = state
    outputs = []
    
    for X in inputs:
        H = torch.tanh(torch.matmul(X, W_xh) + torch.matmul(H, W_hh) + b_h)
        Y = torch.matmul(H, W_hq) + b_q
        outputs.append(Y)
        
    return outputs, (H,)

In [13]:
# define initial hidden state params
def init_rnn_state(batch_size, num_hiddens, device):
    return (torch.zeros((batch_size, num_hiddens), device=device), )

In [14]:
print(X.shape)
print(num_hiddens)
print(vocab_size)

state = init_rnn_state(X.shape[0], num_hiddens, device)
inputs = to_onehot(X.to(device), vocab_size)
params = get_params()
outputs, state_new = rnn(inputs, state, params)

print(len(inputs), inputs[0].shape)
print(len(outputs), outputs[0].shape)
print(len(state), state[0].shape)
print(len(state_new), state_new[0].shape)

torch.Size([2, 5])
256
61
5 torch.Size([2, 61])
5 torch.Size([2, 61])
1 torch.Size([2, 256])
1 torch.Size([2, 256])


In [None]:
# RNN may have gradient vanishing, so we need to perform gradient clipping
def grad_clipping(params, theta, device):
    norm = torch.tensor([0.0], device=device)
    for param in params:
        norm += (param.grad.data ** 2).sum()
    norm = norm.sqrt().item()
    
    if norm > theta:
        for param in params:
            param.grad.data *= (theta / norm)

In [None]:
# define prediction function
def predict_rnn(prefix, num_chars, rnn, params, init_rnn_state,
                num_hiddens, vocab_size, device, idx_to_char, char_to_idx):
    
    state = init_rnn_state(1, num_hiddens, device)
    output = [char_to_idx[prefix[0]]]   # output takes prefix + num_chars characters from predicyion
    
    for t in range(num_chars + len(prefix) - 1): 
        
        # the output of previous timestep is the input of current timestep
        X = to_onehot(torch.tensor([[output[-1]]], device=device), vocab_size)
        
        # calculate output and update hidden state
        (Y, state) = rnn(X, state, params)
        
        # the input of next timestep is either the char in prefix or the best char in prediction
        if t < len(prefix) - 1:
            output.append(char_to_idx[prefix[t + 1]])
        else:
            output.append(Y[0].argmax(dim=1).item())
            
    return ''.join([idx_to_char[i] for i in output])


In [None]:
# define training function
def train_and_predict_rnn(rnn, get_params, init_rnn_state, num_hiddens,
                          vocab_size, device, corpus_indices, idx_to_char,
                          char_to_idx, is_random_iter, num_epochs, num_steps,
                          lr, clipping_theta, batch_size, pred_period,
                          pred_len, prefixes):
    
    if is_random_iter:
        data_iter_fn = d2l.data_iter_random 
    else:
        data_iter_fn = d2l.data_iter_consecutive
    params = get_params()
    loss = nn.CrossEntropyLoss()

    for epoch in range(num_epochs):
        if not is_random_iter:  # if not using random sampling, initiate hidden state at the start of epoch
            state = init_rnn_state(batch_size, num_hiddens, device)
        l_sum, n, start = 0.0, 0, time.time()
        data_iter = data_iter_fn(corpus_indices, batch_size, num_steps, device)
        for X, Y in data_iter:
            if is_random_iter:  # if  using random sampling, initiate hidden state before batch update
                state = init_rnn_state(batch_size, num_hiddens, device)
            else:  # or need to detach hidden state using detach
                for s in state:
                    s.detach_()
            

            # inputs & outputs are num_steps numbers of matrices batch_size*vocab_size
            inputs = to_onehot(X, vocab_size)
            (outputs, state) = rnn(inputs, state, params)
            
            # shape is (num_steps * batch_size, vocab_size) after concatenation
            outputs = torch.cat(outputs, dim=0)
            
            # Y shape is (batch_size, num_steps), transpose and reshape to
            # (num_steps * batch_size,) vector to match the output
            y = torch.flatten(Y.T)
            # calculate loss using cross entropy
            l = loss(outputs, y.long())
            
            # reset gradient to zero
            if params[0].grad is not None:
                for param in params:
                    param.grad.data.zero_()
                    
            l.backward()
            
            grad_clipping(params, clipping_theta, device)  # grad clipping
            d2l.sgd(params, lr, 1)  # no need to average grad as the loss is already mean
            l_sum += l.item() * y.shape[0]
            n += y.shape[0]

        if (epoch + 1) % pred_period == 0:
            print('epoch %d, perplexity %f, time %.2f sec' % (
                epoch + 1, math.exp(l_sum / n), time.time() - start))
            
            for prefix in prefixes:
                print(' -', predict_rnn(prefix, pred_len, rnn, params, init_rnn_state,
                    num_hiddens, vocab_size, device, idx_to_char, char_to_idx))

In [None]:
# testing

## Using Pytorch built-in nn.RNN module to build model

1. input_size - The number of expected features in the input x
2. hidden_size – The number of features in the hidden state h
3. nonlinearity – The non-linearity to use. Can be either 'tanh' or 'relu'. Default: 'tanh'
4. batch_first – If True, then the input and output tensors are provided as (batch_size, num_steps, input_size). Default: False

Here, the batch_first decides the input shape, we use the default False, which corresponds to the (num_steps, batch_size, input_size) shape.

#### forward function params：

- input of shape (num_steps, batch_size, input_size): 
tensor containing the features of the input sequence.

- h_0 of shape (num_layers * num_directions, batch_size, hidden_size): 
tensor containing the initial hidden state for each element in the batch. 
Defaults to zero if not provided. If the RNN is bidirectional, num_directions should be 2, else it should be 1.

#### forward function returns:

- output of shape (num_steps, batch_size, num_directions * hidden_size): 
tensor containing the output features (h_t) from the last layer of the RNN, for each t.

- h_n of shape (num_layers * num_directions, batch_size, hidden_size): 
tensor containing the hidden state for t = num_steps.

In [1]:
## Now build an nn.RNN model to see the output shape.

In [None]:
class RNNModel(nn.Module):
    
    def __init__(self, rnn_layer, vocab_size):
        super(RNNModel, self).__init__()
        self.rnn = rnn_layer
        self.hidden_size = rnn_layer.hidden_size * (2 if rnn_layer.bidirectional else 1) 
        self.vocab_size = vocab_size
        self.dense = nn.Linear(self.hidden_size, vocab_size)

    def forward(self, inputs, state):
        # inputs.shape: (batch_size, num_steps)
        
        X = to_onehot(inputs, vocab_size)
        X = torch.stack(X)  # gather all the input together, X.shape: (num_steps, batch_size, vocab_size)
        hiddens, state = self.rnn(X, state)
        hiddens = hiddens.view(-1, hiddens.shape[-1])  # hiddens.shape: (num_steps * batch_size, hidden_size)
        output = self.dense(hiddens)
        
        return output, state

In [None]:
def predict_rnn_pytorch(prefix, num_chars, model, vocab_size, device, idx_to_char,
                      char_to_idx):
    
    state = None
    output = [char_to_idx[prefix[0]]]  # output takes prefix + number of num_chars chars
    
    for t in range(num_chars + len(prefix) - 1):
        X = torch.tensor([output[-1]], device=device).view(1, 1)
        (Y, state) = model(X, state)  # no param input in forward function
        if t < len(prefix) - 1:
            output.append(char_to_idx[prefix[t + 1]])
        else:
            output.append(Y.argmax(dim=1).item())
            
    return ''.join([idx_to_char[i] for i in output])

In [None]:
def train_and_predict_rnn_pytorch(model, num_hiddens, vocab_size, device,
                                corpus_indices, idx_to_char, char_to_idx,
                                num_epochs, num_steps, lr, clipping_theta,
                                batch_size, pred_period, pred_len, prefixes):
    
    loss = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    model.to(device)
    for epoch in range(num_epochs):
        l_sum, n, start = 0.0, 0, time.time()
        data_iter = d2l.data_iter_consecutive(corpus_indices, batch_size, num_steps, device) # non-shuffle sampling
        state = None
        for X, Y in data_iter:
            if state is not None:
                # detach hidden state
                if isinstance (state, tuple): # LSTM, state:(h, c)  
                    state[0].detach_()
                    state[1].detach_()
                else: 
                    state.detach_()
            (output, state) = model(X, state) # output.shape: (num_steps * batch_size, vocab_size)
            y = torch.flatten(Y.T)
            l = loss(output, y.long())
            
            optimizer.zero_grad()
            l.backward()
            grad_clipping(model.parameters(), clipping_theta, device)
            optimizer.step()
            l_sum += l.item() * y.shape[0]
            n += y.shape[0]
        

        if (epoch + 1) % pred_period == 0:
            print('epoch %d, perplexity %f, time %.2f sec' % (
                epoch + 1, math.exp(l_sum / n), time.time() - start))
            
            for prefix in prefixes:
                print(' -', predict_rnn_pytorch(
                    prefix, pred_len, model, vocab_size, device, idx_to_char,
                    char_to_idx))