<a href="https://colab.research.google.com/github/devWithDeepak/CodeBreakerProject/blob/master/char_rnn_book_writer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [35]:
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
import torch
import numpy as np
from torch import nn
import torch.nn.functional as F

In [0]:
def one_hot_encode(arr, n_labels):
    x = np.eye(n_labels)[arr]
    return x.astype(float)

In [0]:
def get_batches(arr, batch_size, seq_length):
    total_batch_size = batch_size * seq_length
    
    # get total number of batches
    n_batches = len(arr) // total_batch_size
    
    # charactes included
    arr = arr[:n_batches * total_batch_size]
    
    # resize arr to batch size
    arr = arr.reshape((batch_size, -1))
    
    for n in range(0, arr.shape[1], seq_length):
        # features
        x = arr[: , n: n + seq_length]
        # targets
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[: , 1] = x[:, 1:], arr[:, n+seq_length]
        except IndexError:
            y[:, :-1], y[: , 1] = x[:, 1:], arr[:, 0]
            
    yield x, y

In [39]:
# check if cuda is available
train_on_gpu = torch.cuda.is_available()
if(train_on_gpu):
    print("Training on GPU")
else:
    print("Training on CPU")

Training on CPU


In [0]:
class CharRNN(nn.Module):
    def __init__(self, tokens, n_hidden=256, n_layers=2, drop_prob=0.2, lr=0.001):
        super().__init__()
        
        # create dictionary from text
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch:ii for ii, ch in self.int2char.items()}
        
        # create instance variable
        self.n_hidden = n_hidden
        self.n_layers = n_layers
        self.drop_prob = drop_prob
        self.lr = lr
        
        # define lstm layer
        self.lstm = nn.LSTM(len(self.chars), n_hidden, n_layers, dropout=drop_prob, batch_first=True)
        
        # Dropout layer
        self.dropout = nn.Dropout(drop_prob)
        
        # define fully connected layer
        self.fc = nn.Linear(n_hidden, len(self.chars))
        
    def forward(self, x, hidden):
        # lstm layer 
        r_out, hidden = self.lstm(x, hidden)

        # dropout layer
        r_out = self.dropout(r_out)

        # reshape output
        r_out = r_out.contiguous().view(-1, n_hidden)

        # fully connected layer
        output = self.fc(r_out)

        return output, hidden

    def init_hidden(self, batch_size):
        weight = next(self.parameters()).data
        # create two tensors for two hidden layer in lstm
        if(train_on_gpu):
            hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda(),
                      weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda())
        else:
            hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_(),
                      weight.new(self.n_layers, batch_size, self.n_hidden).zero_())
        return hidden

In [0]:
# train function
def train(net, data, n_epochs = 10, batch_size=10, seq_length=50,
          lr=0.01, val_split = 0.1, clip=5, print_every=10):
    # enable training mode
    net.train()
    
    # define loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    
    # validation index
    val_idx = int(len(data) * (1-val_split))
    data, val_data = data[:val_idx], data[val_idx:]
    
    if(train_on_gpu):
        net.cuda()
    counter = 0
    n_labels = len(net.chars)
    for epoch in range(n_epochs):
        # initialize weights
        h = net.init_hidden(batch_size)
        for x, y in get_batches(data, batch_size, seq_length):
            # one hot encoding
            x = one_hot_encode(x, n_labels)
            # convert into tensor
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)
            # switch to GPU
            if(train_on_gpu):
                inputs, targets = inputs.cuda(), targets.cuda()
                
            # hidden variable state 
            h = tuple([each.data for each in h])
            
            # initialize gradients of all tensor to zero
            optimizer.zero_grad()
            # train the model
            output, h = net(inputs, h)
            # caluclate loss
            loss = criterion(output, targets.view(batch_size * seq_length).long())
            # backpropogation
            loss.backward()
            # clipping to prevent from exploding gradient
            nn.utils.clip_grad_norm(net.parameters(), clip)
            optimizer.step()
            # validation data
            if counter % print_every == 0:
                # initialize validation hidden state
                val_h = net.init_hidden(batch_size)
                
                val_losses = []
                # enable evaluation mode
                for x,y in get_batches(val_data, batch_size, seq_length):
                    x = one_hot_encode(x, n_labels)
                    # convert into tensor
                    x, y = torch.from_numpy(x), torch.from_numpy(y)
                    # validation hidden state
                    val_h = tuple([each.data for each in val_h])
                    
                    # convert into cuda
                    if(train_on_gpu):
                        inputs, targets = x.cuda(), y.cuda()
                    else:
                        inputs, targets = x, y
                    
                    # output
                    output, h = net(inputs, h)
                    # calculate validation loss
                    val_loss = criterion(output, targets.view(batch_size * seq_length).long())
                    val_losses.append(val_loss)
                net.train()
                print("Epoch: {}".format(epoch),
                     "step: {}".format(counter),
                     "loss: {:.4f}".format(loss.item()),
                     "val_loss: {:.4f}".format(np.mean(val_losses)))
                    

In [45]:
with open("gdrive/My Drive/dataset/anna.txt") as fr:
    text = fr.read()
chars = tuple(set(text))
int2char = dict(enumerate(chars))
char2int = {ch:ii for ii, ch in int2char.items()}
encoded = np.array([char2int[ch] for ch in text])

n_hidden = 512
n_layers = 2

net = CharRNN(chars, n_hidden, n_layers)
print(net)

CharRNN(
  (lstm): LSTM(83, 512, num_layers=2, batch_first=True, dropout=0.2)
  (dropout): Dropout(p=0.2)
  (fc): Linear(in_features=512, out_features=83, bias=True)
)


In [46]:
n_epochs = 20
seq_length = 100
batch_size = 128

# train the model
train(net, encoded, n_epochs=n_epochs, batch_size=batch_size, seq_length=seq_length, lr=0.001, print_every=10)

RuntimeError: ignored