<a href="https://colab.research.google.com/github/Sindhu213/Pytorch/blob/main/NLP/rnn_language_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount("/content/drive",force_remount=True)

Mounted at /content/drive


In [2]:
%cd drive/My\ Drive/assets

/content/drive/My Drive/assets


In [3]:
import re
import torch
from pathlib import Path
from torch import nn, Tensor
from typing import List,Tuple
from torchtext.vocab import vocab
from collections import Counter,OrderedDict
from torch.utils.data import DataLoader,Dataset

## Text Preprocessing

In [4]:
file_dir = Path('./AndThenThereWereNone.txt')
with open(file_dir, 'r') as file:
  text = file.read()

In [5]:
def get_tokenizer(sentence):
  tokenized = re.sub(r'[^\w\s]+',' ',sentence.lower())
  return tokenized.split()

In [6]:
counter = Counter(get_tokenizer(text))
sorted_by_freq = sorted(counter.items(),key=lambda x: x[1], reverse=True)
ordered_dict = OrderedDict(sorted_by_freq)

Vocab = vocab(ordered_dict,min_freq=2,specials=["<unk>"])    
Vocab.set_default_index(0)

In [7]:
text_pipeline = lambda x: Vocab(get_tokenizer(x))  

In [8]:
class TextDataset:

  def __init__(self,input:List[str],seq_length:int):    
    self.input = input
    self.sl = seq_length

  def collate(self) -> Tuple[Tensor,Tensor]:
    container = []
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    for i in range(0,len(self.input)-self.sl,self.sl):
        data = torch.tensor(self.input[i:i+self.sl], dtype=torch.float32, device=device)   
        label = torch.tensor(self.input[i+1:i+self.sl+1], dtype=torch.float32, device=device)    
        container.append((data,label))

    return container

In [9]:
input_data = text_pipeline(text)

In [10]:
## sanity check

torch.manual_seed(42)
input_dataset = TextDataset(input_data,seq_length =4).collate()
test_dataloader = DataLoader(input_dataset, batch_size=3,shuffle=True,drop_last=False)

for input,label in test_dataloader:
  print("Input: ",input)
  print("Label: ",label)
  break

Input:  tensor([[  12.,   16., 1892.,   58.],
        [  23.,   76.,   19.,   46.],
        [  11.,  153.,   34.,   64.]])
Label:  tensor([[  16., 1892.,   58.,   12.],
        [  76.,   19.,   46.,  444.],
        [ 153.,   34.,   64.,  529.]])


In [11]:
seq_length = 60       ## small seq_length due to long term dependency problem, might change 
batch_size = 32

torch.manual_seed(42)
input_dataset = TextDataset(input_data,seq_length).collate()
dataloader = DataLoader(input_dataset,batch_size=batch_size,drop_last=True,shuffle=True)  

## Model Definition

In [12]:
vocab_size = len(Vocab)
embed_dim = 128
rnn_hidden_size = 64

In [58]:
class LanguageModelling(nn.Module):

  def __init__(self,vocab_size,embed_dim,rnn_hidden_dim):
    super(LanguageModelling,self).__init__()
    self.embedding = nn.Embedding(vocab_size, embed_dim)  
    self.lstm_layer1 = nn.LSTM(embed_dim,rnn_hidden_size,batch_first=True)   
    self.lstm_layer2 = nn.LSTM(rnn_hidden_dim,vocab_size,batch_first=True)
    self.softmax = nn.Softmax(dim=1)

  def forward(self,input,hidden_1,cell_1,hidden_2,cell_2):
    input = input.to(torch.int64) 
    out = self.embedding(input)
    out,(_,_) = self.lstm_layer1(out,(hidden_1,cell_1))
    out,(_,_) = self.lstm_layer2(out,(hidden_2,cell_2))    ## might output of hidden and cell states if needed
    out = self.softmax(out) 
    return out

  @staticmethod
  def init_hidden_and_cell(batch_size,rnn_hidden_size):
    hidden = torch.zeros(1,batch_size,rnn_hidden_size)
    cell = torch.zeros(1,batch_size,rnn_hidden_size)
    return hidden,cell

## Model Training and Evaluation

In [59]:
model = LanguageModelling(vocab_size,embed_dim,rnn_hidden_size)
model

LanguageModelling(
  (embedding): Embedding(2618, 128)
  (lstm_layer1): LSTM(128, 64, batch_first=True)
  (lstm_layer2): LSTM(64, 2618, batch_first=True)
  (softmax): Softmax(dim=1)
)

In [60]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(),lr=0.001) 

In [61]:
num_epochs = 100
def train(data_iter):
  for epoch in range(num_epochs):
    x_batch, y_batch = next(iter(data_iter))
    hidden, cell = model.init_hidden_and_cell(batch_size)
    optimizer.zero_grad()
    loss = 0.0
    for c in range(seq_length):
      out, hidden, cell = model(x_batch[:,c],hidden,cell)
      loss += loss_fn(y_batch[:,c],out)
    loss.backward()
    optimizer.step()
    loss = loss.item()/seq_length
    if epoch % 10 == 0:
      print(f'Epoch {epoch} loss: {loss:.4f}')

In [62]:
train(dataloader)

TypeError: ignored

In [None]:
def generate_text(seeded_text,max_length):
  pass