In [2]:
import numpy as np
import torch as T
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam

In [3]:
class DATA_LOADER():
    """Character-level text dataset.

    Reads a text file, builds a character vocabulary and offers helpers for
    one-hot encoding and for cutting the encoded text into mini-batches.

    Attributes set by ``__init__``:
        filename      -- path of the text file that was read
        char_in_text  -- tuple of the distinct characters, in sorted order
        n_characters  -- number of distinct characters (vocabulary size)
        I2C           -- dict: integer index -> character
        C2I           -- dict: character -> integer index
        encoded_text  -- 1-D numpy array with the index of every character
    """

    def __init__(self, filename):
        """Read ``filename`` and encode its content as integer indices."""
        self.filename = filename
        with open(self.filename, 'r') as f:
            text_data = f.read()

        # TOKENISATION FOR DATA ENCODING
        # Sorted so that the index <-> character mapping is the same on every
        # run. Iterating a bare set of strings may give a different order in
        # each interpreter session, which would make a model restored from a
        # checkpoint decode with the wrong vocabulary.
        self.char_in_text = tuple(sorted(set(text_data)))
        self.n_characters = len(self.char_in_text)

        # DICT FOR KEY AS NOS. AND VALUE AS CHARACTER
        self.I2C = dict(enumerate(self.char_in_text))
        self.C2I = {v: k for k, v in self.I2C.items()}

        # ENCODING THE GIVEN TEXT
        self.encoded_text = np.array([self.C2I[i] for i in text_data])

    def load_data(self):
        """
        RETURNS THE ENCODED TEXT AND THE NUMBER OF DISTINCT CHARACTERS
        """
        return self.encoded_text, self.n_characters

    def one_hot(self, inp, length):
        """One-hot encode an integer array.

        Args:
            inp: numpy integer array of any shape; every value must be a
                valid column index, i.e. in ``[0, length)``.
            length: size of the one-hot axis.

        Returns:
            Array of shape ``(*inp.shape, length)`` containing 0.0 / 1.0.
        """
        a = np.zeros((inp.size, length))
        # One row per input element; set the column given by that element.
        a[np.arange(a.shape[0]), inp.flatten()] = 1
        a = a.reshape((*inp.shape, length))
        return a

    def get_batch(self, a, BATCH_SIZE, SEQ_LENGTH):
        """Yield ``(x, y)`` windows of shape ``(BATCH_SIZE, SEQ_LENGTH)``.

        ``a`` is trimmed to a whole number of batches and reshaped into
        ``BATCH_SIZE`` rows; consecutive windows of ``SEQ_LENGTH`` columns are
        then yielded. ``y`` is ``x`` shifted one step to the left, so
        ``y[r, t]`` is the element that follows ``x[r, t]`` in row ``r``.
        For the last window the final target wraps around to the first
        column of the row.

        Args:
            a: 1-D numpy array of encoded characters.
            BATCH_SIZE: number of rows in each window.
            SEQ_LENGTH: number of columns in each window.

        Yields:
            Tuples ``(x, y)``; ``x`` is a view into the reshaped input.
            Nothing is yielded when ``a`` is shorter than one full batch.
        """
        PER_BATCH_ELEMENTS = BATCH_SIZE * SEQ_LENGTH
        N = len(a) // PER_BATCH_ELEMENTS
        if N == 0:
            return

        # Drop the tail that does not fill a whole batch, then lay the data
        # out as BATCH_SIZE parallel rows.
        a = a[:N * PER_BATCH_ELEMENTS]
        a = a.reshape((BATCH_SIZE, -1))
        total_steps = a.shape[1]

        for i in range(0, total_steps, SEQ_LENGTH):
            x = a[:, i:i + SEQ_LENGTH]
            y = np.zeros_like(x)
            y[:, :-1] = x[:, 1:]

            # The target of the window's last step is the element right after
            # the window, taken from the full array; the previous code indexed
            # the window itself, which was always out of range and fell back
            # to the window's own first element.
            following = i + SEQ_LENGTH
            y[:, -1] = a[:, following] if following < total_steps else a[:, 0]

            yield x, y

In [4]:
class LSTM(nn.Module):
    """Stacked LSTM followed by dropout and a linear projection.

    Args:
        in_dims: size of each input feature vector.
        hid_dims: size of the LSTM hidden state.
        n_layers: number of stacked LSTM layers.
        out_dims: size of the linear layer's output.

    The sub-module attribute names (``LSTM``, ``dropout``, ``fc``) determine
    the ``state_dict`` keys and are therefore kept unchanged.
    """

    def __init__(self, in_dims, hid_dims, n_layers, out_dims):
        super().__init__()

        self.in_dims = in_dims
        self.hid_dims = hid_dims
        self.n_layers = n_layers
        self.out_dims = out_dims
        self.drop_prob = 0.5
        self.lr = 0.001
        # Preferred device at construction time; the module is not moved here.
        self.device = 'cuda' if T.cuda.is_available() else 'cpu'

        # nn.LSTM applies dropout only between stacked layers, so passing a
        # non-zero value with a single layer does nothing except emit a warning.
        self.LSTM = nn.LSTM(self.in_dims, self.hid_dims, n_layers,
                            dropout=self.drop_prob if n_layers > 1 else 0.0,
                            batch_first=True)

        self.dropout = nn.Dropout(self.drop_prob)

        self.fc = nn.Linear(self.hid_dims, self.out_dims)

    def forward(self, x, hidden):
        """Run the network on a batch.

        Args:
            x: input passed to the batch-first ``nn.LSTM``.
            hidden: ``(h, c)`` state tuple passed to the ``nn.LSTM``.

        Returns:
            ``(out, hidden)`` where ``out`` has been flattened to two
            dimensions with ``out_dims`` columns (one row per batch element
            and time step) and ``hidden`` is the updated state tuple.
        """
        out, hidden = self.LSTM(x, hidden)
        out = self.dropout(out)

        # Merge the batch and time axes so the linear layer sees one row per step.
        out = out.reshape(-1, self.hid_dims)
        out = self.fc(out)

        return out, hidden

    def init_hidden(self, batch_size):
        """Return a zero ``(h, c)`` tuple, each of shape
        ``(n_layers, batch_size, hid_dims)``.

        The tensors are created with the dtype and device of the module's
        current parameters, so they always match the weights even when the
        module was moved after construction.
        """
        weight = next(self.parameters())
        shape = (self.n_layers, batch_size, self.hid_dims)

        hidden = (weight.new_zeros(shape), weight.new_zeros(shape))

        return hidden

In [11]:
def train(data_loader, net_object, dataset, epochs=10, batch_size=8, 
                seq_length=50, lr=0.001, clip_grad_value=5, val_frac=0.1, print_every=10, device = 'cpu',
                checkpoint_path=r'/content/drive/MyDrive/CHARACTER_LEVEL_RNN/checkpoint.pth'):
    """Train ``net_object`` and checkpoint the weights with the best validation loss.

    Args:
        data_loader: object providing ``get_batch(data, batch_size, seq_length)``
            (yielding ``(x, y)`` integer arrays) and ``one_hot(x, n)``.
        net_object: module providing ``init_hidden(batch_size)`` and called as
            ``net_object(inputs, hidden) -> (logits, hidden)``.
        dataset: tuple ``(data, no_of_char)``; ``data`` is split into a
            training part and a trailing validation part.
        epochs: number of passes over the training part.
        batch_size: passed to ``get_batch`` and ``init_hidden``.
        seq_length: passed to ``get_batch``.
        lr: Adam learning rate.
        clip_grad_value: maximum gradient norm given to ``clip_grad_norm_``.
        val_frac: fraction of ``data`` held out at the end for validation.
        print_every: run validation and print every this many training steps.
        device: device the input and target tensors are moved to.
        checkpoint_path: file the ``state_dict`` is saved to whenever the mean
            validation loss is at least as good as the best seen so far.

    Returns:
        None. Progress is printed and checkpoints are written as side effects.
    """
    net_object.train()

    data, no_of_char = dataset

    optimizer = Adam(net_object.parameters(), lr = lr)
    loss_fn = nn.CrossEntropyLoss()

    # VALIDATION SPLIT: THE LAST val_frac OF THE DATA IS HELD OUT
    val_index = int(len(data) * (1 - val_frac))
    train_data, val_data = data[:val_index], data[val_index:]

    ctr = 0
    min_val_loss = float('inf')
    for e in range(epochs):

        # FRESH (H, C) STATE AT THE START OF EVERY EPOCH
        h = net_object.init_hidden(batch_size)

        for x, y in data_loader.get_batch(train_data, batch_size, seq_length):
            ctr += 1
            # Detach so gradients do not flow back into earlier batches.
            h = tuple(state.detach() for state in h)
            net_object.zero_grad()

            inputs = T.as_tensor(data_loader.one_hot(x, no_of_char), dtype=T.float32).to(device)
            # PYTORCH EXPECTS TARGET OF TYPE LONG FOR CE-LOSS
            targets = T.as_tensor(y, dtype=T.long).reshape(-1).to(device)

            out, h = net_object(inputs, h)
            loss = loss_fn(out, targets)
            loss.backward()

            # TO PREVENT THE PROBLEM OF EXPLODING GRADIENTS WE CLIP THE GRADIENTS
            nn.utils.clip_grad_norm_(net_object.parameters(), clip_grad_value)
            optimizer.step()

            if ctr % print_every == 0:
                val_h = net_object.init_hidden(batch_size)
                val_losses = []
                net_object.eval()  # evaluation mode for the validation pass
                with T.no_grad():  # no graph is needed for validation
                    for val_x, val_y in data_loader.get_batch(val_data, batch_size, seq_length):
                        val_inputs = T.as_tensor(data_loader.one_hot(val_x, no_of_char),
                                                 dtype=T.float32).to(device)
                        val_targets = T.as_tensor(val_y, dtype=T.long).reshape(-1).to(device)

                        output, val_h = net_object(val_inputs, val_h)
                        val_losses.append(loss_fn(output, val_targets).item())

                net_object.train()

                # An empty validation split is reported as nan and never checkpoints.
                mean_val_loss = float(np.mean(val_losses)) if val_losses else float('nan')

                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(ctr),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(mean_val_loss))

                # Checkpoint on the mean validation loss. The earlier version
                # used only the last validation batch and additionally required
                # it to be below the current training batch loss, which could
                # prevent any checkpoint from being written.
                if val_losses and mean_val_loss <= min_val_loss:
                    min_val_loss = mean_val_loss
                    T.save(net_object.state_dict(), checkpoint_path)

In [12]:
data_loader = DATA_LOADER(r'/content/drive/MyDrive/CHARACTER_LEVEL_RNN/data.txt')
encoded_text, no_of_char = data_loader.load_data()

# Input and output widths both equal the vocabulary size.
lstm = LSTM(no_of_char, 512, 2, no_of_char)
lstm.to(lstm.device)

batch_size = 128
seq_length = 100
n_epochs = 20  # start smaller if you are just testing initial behavior

# train the model
# The batches are sent to the same device the model was moved to above, so the
# cell also runs on a machine without a GPU instead of failing on 'cuda'.
train(data_loader, lstm, (encoded_text, no_of_char), epochs=n_epochs, batch_size=batch_size,
      seq_length=seq_length, lr=0.001, print_every=10, device=lstm.device)

Epoch: 1/20... Step: 10... Loss: 3.2653... Val Loss: 3.2310
Epoch: 1/20... Step: 20... Loss: 3.1762... Val Loss: 3.1426
Epoch: 1/20... Step: 30... Loss: 3.1483... Val Loss: 3.1291
Epoch: 1/20... Step: 40... Loss: 3.1504... Val Loss: 3.1228
Epoch: 1/20... Step: 50... Loss: 3.1297... Val Loss: 3.1217
Epoch: 1/20... Step: 60... Loss: 3.1138... Val Loss: 3.1208
Epoch: 1/20... Step: 70... Loss: 3.1102... Val Loss: 3.1182
Epoch: 1/20... Step: 80... Loss: 3.1128... Val Loss: 3.1155
Epoch: 1/20... Step: 90... Loss: 3.0846... Val Loss: 3.1099
Epoch: 1/20... Step: 100... Loss: 3.0915... Val Loss: 3.0965
Epoch: 2/20... Step: 110... Loss: 3.0608... Val Loss: 3.0646
Epoch: 2/20... Step: 120... Loss: 2.9805... Val Loss: 2.9918
Epoch: 2/20... Step: 130... Loss: 2.9263... Val Loss: 2.9178
Epoch: 2/20... Step: 140... Loss: 2.8005... Val Loss: 2.7771
Epoch: 2/20... Step: 150... Loss: 2.7759... Val Loss: 2.7168
Epoch: 2/20... Step: 160... Loss: 2.6518... Val Loss: 2.6202
Epoch: 2/20... Step: 170... Loss:

**TESTING WITH THE IN-MEMORY MODEL (STRAIGHT AFTER TRAINING)**

In [23]:
# Evaluation mode for sampling.
lstm.eval()

# `F` (torch.nn.functional) is imported in the notebook's top import cell.
prime = 'Anna'

# TO GIVE A PROPER START TO THE PREDICTION
doc = list(prime)

# Zero state for a single sequence (batch size 1).
h = lstm.init_hidden(1)

# Sampling is restricted to the TOP_K most probable next characters.
TOP_K = 5

def prediction(dataloader, lstm, character, h, top_k=None):
    """Sample the next character given the current one and the hidden state.

    Args:
        dataloader: object exposing ``C2I``, ``I2C``, ``n_characters`` and
            ``one_hot(array, length)``.
        lstm: model called as ``lstm(inputs, h) -> (logits, h)``.
        character: the current character; must be a key of ``dataloader.C2I``.
        h: hidden-state tuple carried over from the previous call.
        top_k: how many of the most probable characters to sample from.
            ``None`` (the default) uses the notebook-level ``TOP_K``.

    Returns:
        ``(next_character, h)`` with the updated hidden state.
    """
    k = TOP_K if top_k is None else top_k

    # Follow the model's actual device rather than assuming a GPU is present.
    device = next(lstm.parameters()).device

    # Detach the carried state from any earlier graph and keep it next to the weights.
    h = tuple(state.detach().to(device) for state in h)

    index = np.array([[dataloader.C2I[character]]])
    encoded = dataloader.one_hot(index, dataloader.n_characters)
    inputs = T.as_tensor(encoded, dtype=T.float32).to(device)

    with T.no_grad():  # inference only
        out, h = lstm(inputs, h)
        prob = F.softmax(out, dim = 1)

    # Never ask for more candidates than there are output classes.
    k = min(k, prob.shape[1])
    prob, top_ch = prob.topk(k)

    # reshape(-1) keeps both arrays 1-D even when k == 1.
    top_ch = top_ch.cpu().numpy().reshape(-1)
    prob = prob.cpu().numpy().reshape(-1)

    # Renormalise the kept probabilities so they sum to one before sampling.
    char = np.random.choice(top_ch, p = prob/prob.sum())

    return dataloader.I2C[int(char)], h

# Warm-up: push every prime character through the network so the hidden
# state reflects the prompt. Each iteration overwrites the prediction, so
# only the one made after the final prime character is kept.
for prime_char in doc:
    next_char, h = prediction(data_loader, lstm, prime_char, h)

doc.append(next_char)

# Generate 1000 further characters, each conditioned on the one just emitted.
for _ in range(1000):
    next_char, h = prediction(data_loader, lstm, doc[-1], h)
    doc.append(next_char)

print(''.join(doc))

Anna Alexendrovna's later wished to say
and would have to succe divines, with the country of his.

"I'm not thinking of the rain of a son well, that
you were about their perflections
of the sound that have a
possibility of
his
wife, to brought them with her sincery, what in the wead as some work.

"You know," answered Levin, gave him something
something the sat down.

"Well, as to discrest man in the means to get to be, and I should be as a little," said Levin.

"Oh, I have think I will not talk of to have to be a significance to be say, a mind to supposing you there was to say.

"I was
a pretty was solether to the
religion they, and
that I
could so say ill in a self-girls. But was the famely confinited the certain of
thichif tears when you know why is their same the moment
wourd a stear and the princess and something, I've come
into the sand that I can great this feelings of all other, would have
soliced as a present mild and she have both there's now it went to me to the same hunor a

**LOADING THE MODEL WITH THE STATE-DICT AND TESTING**

In [27]:
lstm = LSTM(data_loader.n_characters, 512, 2, data_loader.n_characters)

# map_location remaps the stored tensors onto the device chosen by the model,
# so a checkpoint written during a GPU run can also be restored on a
# CPU-only machine.
chkpt = T.load(r'/content/drive/MyDrive/CHARACTER_LEVEL_RNN/checkpoint.pth',
               map_location=lstm.device)
lstm.load_state_dict(chkpt)
lstm.to(lstm.device)
lstm.eval()

# `F` (torch.nn.functional) is imported in the notebook's top import cell.
prime = 'Anna'

# TO GIVE A PROPER START TO THE PREDICTION
doc = list(prime)

# Zero state for a single sequence (batch size 1).
h = lstm.init_hidden(1)

# Sampling is restricted to the TOP_K most probable next characters.
TOP_K = 5

def prediction(dataloader, lstm, character, h, top_k=None):
    """Sample the next character given the current one and the hidden state.

    Args:
        dataloader: object exposing ``C2I``, ``I2C``, ``n_characters`` and
            ``one_hot(array, length)``.
        lstm: model called as ``lstm(inputs, h) -> (logits, h)``.
        character: the current character; must be a key of ``dataloader.C2I``.
        h: hidden-state tuple carried over from the previous call.
        top_k: how many of the most probable characters to sample from.
            ``None`` (the default) uses the notebook-level ``TOP_K``.

    Returns:
        ``(next_character, h)`` with the updated hidden state.
    """
    k = TOP_K if top_k is None else top_k

    # Follow the model's actual device rather than assuming a GPU is present.
    device = next(lstm.parameters()).device

    # Detach the carried state from any earlier graph and keep it next to the weights.
    h = tuple(state.detach().to(device) for state in h)

    index = np.array([[dataloader.C2I[character]]])
    encoded = dataloader.one_hot(index, dataloader.n_characters)
    inputs = T.as_tensor(encoded, dtype=T.float32).to(device)

    with T.no_grad():  # inference only
        out, h = lstm(inputs, h)
        prob = F.softmax(out, dim = 1)

    # Never ask for more candidates than there are output classes.
    k = min(k, prob.shape[1])
    prob, top_ch = prob.topk(k)

    # reshape(-1) keeps both arrays 1-D even when k == 1.
    top_ch = top_ch.cpu().numpy().reshape(-1)
    prob = prob.cpu().numpy().reshape(-1)

    # Renormalise the kept probabilities so they sum to one before sampling.
    char = np.random.choice(top_ch, p = prob/prob.sum())

    return dataloader.I2C[int(char)], h

# Warm-up: push every prime character through the network so the hidden
# state reflects the prompt. Each iteration overwrites the prediction, so
# only the one made after the final prime character is kept.
for prime_char in doc:
    next_char, h = prediction(data_loader, lstm, prime_char, h)

doc.append(next_char)

# Generate 1000 further characters, each conditioned on the one just emitted.
for _ in range(1000):
    next_char, h = prediction(data_loader, lstm, doc[-1], h)
    doc.append(next_char)

print(''.join(doc))

Anna it serioully corround, but she droused her, and he
could be
dispositible, and seemed.

"I should have a mature, that I want to be insult. It's so
matter at that
secration of her
musch, and would never be considering that it was stort, too must
hissout and sething tears. Why she
had to see them.
What do
you ask you the porstion
made the strusgle of it?" the doctor's smile was talking of the door.

"Yom to
go to something with you," said Varenka in said of
a finseros of the presence.

"What shall I should not be sure and what you do not ask me," said
Levin, smiling, "I want to see her arms to the master."

"Why, it were that you was to be such as a suffer of tender short for a listling for that man's answer and the carriage, but yes tell her husband and was now he had been an interest it asseed, but
and there's the convisted of sorry with the contorried, there is so sereat in the same socint,
why have not but a lot on the consequence."

"If you do you were!" he asked, graying smile.