In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as func
import numpy as np
from tqdm import tqdm

In [2]:
# open text file and read
with open(r"C:\Users\Gokul\Downloads\anna.txt", 'r') as f:
    text = f.read()

In [3]:
# encoding and mapping text to int and vice versa
chars = tuple(set(text))                           #83 chars
int2char = dict(enumerate(chars))                  #mapping char to unique integers
char2int = {ch: i for i, ch in int2char.items()}   #mapping integers to char

# encode text
encoded = np.array([char2int[ch] for ch in text])  #encoding all char in txt to integers

In [4]:
np.array([[3,4,5]])

array([[3, 4, 5]])

In [6]:
#Preprocessing - creating one-hot vectors
def one_hot_encode(arr, vector_len):

    #initialize array
    one_hot = np.zeros((arr.size, vector_len), dtype = np.float32)
    #print(one_hot.shape[0])
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1
    #print(arr.shape, *arr.shape)
    #reshape it to get back to the original array
    one_hot = one_hot.reshape((*arr.shape, vector_len))
    return one_hot

In [7]:
# To check whether the one_hot_encode function works as expected
test_seq = np.array([[3, 5, 1]]) # 2 dimensional
one_hot_trial = one_hot_encode(test_seq, 8)
print(one_hot_trial)

[[[0. 0. 0. 1. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0. 1. 0. 0.]
  [0. 1. 0. 0. 0. 0. 0. 0.]]]


In [8]:
def get_batches(arr, batch_size, seq_length):
    # arr = [1,2,..13], batch_size = 2 -> [1,2,..6],[7,8,..,12]
    # batch_size - 2 and seq_length - num of elements to go in a batch = 3
    total_batch_size = batch_size * seq_length  # 2*3 = 6
    n_batch = len(arr) // total_batch_size      # num of batches = 13//6 = 2
    # Keep only enough characters to make full batches
    arr = arr[:(n_batch * total_batch_size)]    # now arr = [1,2,..12]
    # Reshape into batch_size rows
    arr = arr.reshape(batch_size, -1)
    #print(arr.shape)

    # Iterate over the batches using a window of size seq_length
    for i in range(0, arr.shape[1], seq_length):
        # features
        x = arr[:, i:i+seq_length]
        # targets - shifted by one
        y = np.zeros_like(x)
        # whole thing below is a wrap-around - last element of y is first element of x
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, i+seq_length]
        except IndexError:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        # yield is generator func, gives o/p generator object, can be read using for loop
        yield x, y
    #return x, y

In [9]:
# To check whether the get_batches function works as expected
batches = get_batches(encoded, batch_size = 8, seq_length = 50)
x, y = next(batches)
# printing out the first 10 items in a sequence
print('x\n', x[:10, :10])
print('\ny\n', y[:10, :10])

x
 [[40 44 58 77 76 32 59 14 65 26]
 [81 55 45 14 76 44 58 76 14 58]
 [32 45 16 14 55 59 14 58 14 11]
 [81 14 76 44 32 14 82 44 34 32]
 [14 81 58 66 14 44 32 59 14 76]
 [82 46 81 81 34 55 45 14 58 45]
 [14 36 45 45 58 14 44 58 16 14]
 [49 52  1 55 45 81 51 80  5 14]]

y
 [[44 58 77 76 32 59 14 65 26 26]
 [55 45 14 76 44 58 76 14 58 76]
 [45 16 14 55 59 14 58 14 11 55]
 [14 76 44 32 14 82 44 34 32 11]
 [81 58 66 14 44 32 59 14 76 32]
 [46 81 81 34 55 45 14 58 45 16]
 [36 45 45 58 14 44 58 16 14 81]
 [52  1 55 45 81 51 80  5 14 67]]


In [10]:
# RNN
class RNN(nn.Module):
    def __init__(self, tokens, n_hidden=256, n_layers=2, drop_prob = 0.5, lr = 0.001):
        super().__init__()
        self.n_hidden = n_hidden
        self.n_layers = n_layers
        self.drop_prob = drop_prob
        self.lr = lr

        # creating character dictionaries
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: i for i, ch in self.int2char.items()}

        # LSTM(input, hidden, layers, dropout, batch_first)
        self.lstm = nn.LSTM(len(self.chars), n_hidden, n_layers, dropout = drop_prob, batch_first = True)
        self.dropout = nn.Dropout(drop_prob)
        # Linear(hidden, output)
        self.fc = nn.Linear(n_hidden, len(self.chars))

    def forward(self, x, hidden):
        r_out, hidden = self.lstm(x, hidden)
        out = self.dropout(r_out)
        # change shape for FC layer
        out = out.contiguous().view(-1, self.n_hidden)
        out = self.fc(out)
        return out, hidden

    def init_hidden(self, batch_size):
        # Create two new tensors with sizes n_layers x batch_size x n_hidden
        # initialized to zero, for hidden state and cell state of LSTM
        weight = next(self.parameters()).data
        hidden = (weight.new(n_layers, batch_size, self.n_hidden).zero_(), weight.new(n_layers, batch_size, self.n_hidden).zero_())
        return hidden

In [11]:
def train(model, data, epochs=5, batch_size=10, seq_length=50, lr=0.001, clip=4, val_frac=0.1, print_every=10):
    model.train()
    # loss and optimizer
    loss_func = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr = lr)

    # Creating training and validation data
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]

    counter = 0
    n_chars = len(model.chars)
    for epoch in tqdm(range(epochs)):
        h = model.init_hidden(batch_size)
        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1

            # One-hot encode our data and make them Torch tensors
            x = one_hot_encode(x, n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)

            # Creating new variables for the hidden state, otherwise
            # we'd backprop through the entire training history
            h = tuple([each.data for each in h])
            output, h = model(inputs,h)
            loss = loss_func(output, targets.view(batch_size*seq_length).long())

            model.zero_grad()
            loss.backward()
            # prevents from exploding gradient problem
            nn.utils.clip_grad_norm_(model.parameters(), clip)
            optimizer.step()

            # loss stats
            if counter % print_every == 0:
                # Get validation loss
                val_h = model.init_hidden(batch_size)
                val_losses = []
                model.eval()
                for x, y in get_batches(val_data, batch_size, seq_length):
                    # One-hot encode our data and make them Torch tensors
                    x = one_hot_encode(x, n_chars)
                    x, y = torch.from_numpy(x), torch.from_numpy(y)

                    # Creating new variables for the hidden state, otherwise
                    # we'd backprop through the entire training history
                    val_h = tuple([each.data for each in val_h])

                    inputs, targets = x, y

                    output, val_h = model(inputs, val_h)
                    val_loss = loss_func(output, targets.view(batch_size*seq_length).long())
                    val_losses.append(val_loss.item())

                model.train() # reset to train mode after iterating through validation data

                print("Epoch: {}/{}...".format(epoch+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(np.mean(val_losses)))

In [21]:
# hyperparameters
n_hidden = 512
n_layers = 2
batch_size = 128
seq_length = 100
n_epochs = 3
lr = 0.001
print_every = 10

model = RNN(chars, n_hidden, n_layers, lr)
print(model)

RNN(
  (lstm): LSTM(83, 512, num_layers=2, batch_first=True, dropout=0.001)
  (dropout): Dropout(p=0.001, inplace=False)
  (fc): Linear(in_features=512, out_features=83, bias=True)
)


In [22]:
train(model, encoded, epochs=n_epochs, batch_size=batch_size, seq_length=seq_length, lr=lr, print_every=print_every)

100%|██████████| 3/3 [35:07<00:00, 702.64s/it]


Epoch: 1/3... Step: 10... Loss: 3.2230... Val Loss: 3.2260
Epoch: 1/3... Step: 20... Loss: 3.1178... Val Loss: 3.1430
Epoch: 1/3... Step: 30... Loss: 3.1130... Val Loss: 3.1271
Epoch: 1/3... Step: 40... Loss: 3.0887... Val Loss: 3.1213
Epoch: 1/3... Step: 50... Loss: 3.1223... Val Loss: 3.1200
Epoch: 1/3... Step: 60... Loss: 3.1007... Val Loss: 3.1176
Epoch: 1/3... Step: 70... Loss: 3.0913... Val Loss: 3.1175
Epoch: 1/3... Step: 80... Loss: 3.1096... Val Loss: 3.1130
Epoch: 1/3... Step: 90... Loss: 3.1046... Val Loss: 3.1037
Epoch: 1/3... Step: 100... Loss: 3.0771... Val Loss: 3.0807
Epoch: 1/3... Step: 110... Loss: 3.0323... Val Loss: 3.0254
Epoch: 1/3... Step: 120... Loss: 2.9315... Val Loss: 2.9375
Epoch: 1/3... Step: 130... Loss: 2.9602... Val Loss: 2.8542
Epoch: 2/3... Step: 140... Loss: 2.7949... Val Loss: 2.7726
Epoch: 2/3... Step: 150... Loss: 2.6988... Val Loss: 2.9193
Epoch: 2/3... Step: 160... Loss: 2.6476... Val Loss: 2.6562
Epoch: 2/3... Step: 170... Loss: 2.5511... Val Lo

In [23]:
# Top-k sampling used
def predict(model, char, h = None, top_k = None):
    #tensor inputs
    x = np.array([[model.char2int[char]]])
    x = one_hot_encode(x, len(model.chars))
    inputs = torch.from_numpy(x)

    h = tuple([each.data for each in h])
    out, h = model(inputs, h)
    # getting char probabilities
    p = func.softmax(out, dim=1).data
    #get top characters
    if top_k is None:
        top_ch = np.arange(len(model.chars))
    else:
        p, top_ch = p.topk(top_k)
        top_ch = top_ch.numpy().squeeze()

    # select the likely next character with some element of randomness
    p = p.numpy().squeeze()
    char = np.random.choice(top_ch, p=p/p.sum())

    # return the encoded value of the predicted char and hidden state
    return model.int2char[char], h

In [24]:
def sample(model, size, prime='The', top_k = None):
    model.eval()  #evaluation mode

    # Running through prime characters
    chars = [ch for ch in prime]
    h = model.init_hidden(1)
    for ch in prime:
        char, h = predict(model, ch, h, top_k = top_k)

    chars.append(char)

    # Passing prev char and getting new one
    for i in range(size):
        char, h = predict(model, chars[-1], h, top_k = top_k)
        chars.append(char)
    return ''.join(chars)

In [25]:
print(sample(model, 1000, prime='Anna', top_k=5))

Annad of and has soming. "To shiss therester, and shith the said his fentiting of him. He sad
tha dersed the could of a preated this tha dead as the cardor her his dacking harders ander, hit
attoor her the mastere to beat the parer thing har hould the sad thome thine han stale and andor the
heas to be that and her, and and has deas frongh with of ham all the sis astere as the hang his, wat
her hore thoughing
sould and har at to ham
anes, he
prostions and to
the sered to sade to
beno he
werlits of then stere has hen at han a froos at had. He
sing alle and anding and thet and steled oun he sin the was, and the
pound har her astire his soont har her. The pas than somate, and all the serting to be what his was ate han astelled."

"The wis ofer ater, and alk the sad as and ther the her, all, be atten his, andad his santed to thind tho sander. He countren,
shis her atrowe him thin sout in ther. The proster aster sime, and, and she hid not in the simithing tith her here to thoughter a to to t

#### Due to computational limitation, only few epochs were run but as we can see from the above paragraph of text generated, which minimal training, some of the commonly used words are generated without any spelling mistakes. With more training epochs, we can generate text with complete sense and no errors.