<a href="https://colab.research.google.com/github/MortezaMahdaviMortazavi/DeepLearning-Introduction/blob/master/RNN/TokenLevelRNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import numpy as np
import tensorflow as tf
import string
import re
from torch.autograd import Variable

In [10]:
def getRandomChunk(text,chunk_len=500):
  start_idx = np.random.randint(0,len(text)-chunk_len) # because if we set 0 to len(text) it may random chunk length become less than chunk_len
  end_idx = start_idx + chunk_len + 1
  theChunk = text[start_idx:end_idx]
  return theChunk

# getRandomChunk(corpus)

In [11]:
def char_to_tensor(string):
  tensor = torch.zeros((len(string))).long()
  for c in range(len(string)):
    tensor[c] = all_characters.index(string[c])
  return Variable(tensor)

In [12]:
# for word to vector you shoud get some sentences and isolate words in count of n and create tensor 

In [13]:
SOS_TOKEN = 0
EOS_TOKEN = 1
class Language:
  def __init__(self,corpus,name):
    self.name = name # name of target language
    self.dataset = corpus.split()
    self.vocab = len(sorted(corpus)) # all of unique character
    self.word2count = {}
    self.word2idx = {
        "SOS_TOKEN":SOS_TOKEN,
        "EOS_TOKEN":EOS_TOKEN
    }
    self.idx2word = {
        SOS_TOKEN:"SOS_TOKEN",
        EOS_TOKEN:"EOS_TOKEN"
    }

  def tokenizer(self):
    current_idx = 2
    for word in self.dataset:
      if word not in self.word2idx:
        # words = word.replace(string.punctuation,'')
        self.word2idx[word] = current_idx
        self.idx2word[current_idx] = word
        self.word2count[word] = 1
        current_idx += 1
      else:
        self.word2count[word] += 1

  @property
  def getWord2Idx(self):
    return self.word2idx
  @property
  def getIdx2Word(self):
    return self.idx2word

  @property
  def getWord2Count(self):
    return self.word2count

  @property
  def getCharacters(self):
    return self.vocab

corpus = open('shakespeare.txt','r').read()
language = Language(corpus=corpus,name='English')
corpus.split('\n')[:10]

['THE SONNETS',
 '',
 'by William Shakespeare',
 '',
 'From fairest creatures we desire increase,',
 "That thereby beauty's rose might never die,",
 'But as the riper should by time decease,',
 'His tender heir might bear his memory:',
 'But thou contracted to thine own bright eyes,',
 "Feed'st thy light's flame with self-substantial fuel,"]

In [14]:
class RNN(nn.Module):
  def __init__(self,input_size,hidden_size,output_size,num_layers):
    super(RNN,self).__init__()
    self.hidden_size = hidden_size
    self.num_layers = num_layers
    self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    self.encoder = nn.Embedding(input_size,hidden_size)
    self.dropout = nn.Dropout(0.2)
    self.lstm = nn.LSTM(hidden_size,hidden_size,num_layers,batch_first=True)
    self.decoder = nn.Linear(hidden_size,output_size)

  def forward(self,X,hidden,cell):
    X = X.to(self.device)
    output = self.encoder(X)
    output = self.dropout(output)
    output , (hidden,cell) = self.lstm(output.unsqueeze(1),(hidden,cell))
    output = self.decoder(output.reshape(output.shape[0],-1))
    # output = self.decoder(output)
    return output , hidden , cell
    

  def init_hidden(self,batch_size):
    hidden = torch.zeros(self.num_layers,batch_size,self.hidden_size).to(self.device)
    cell = torch.zeros(self.num_layers,batch_size,self.hidden_size).to(self.device)

    hidden = Variable(hidden)
    cell = Variable(cell)
    return hidden , cell


In [15]:
def model_testing():
  X = word2vec(lang=language,sentences=sentences)
  model = RNN(len(X),len(X),len(X),num_layers=25)
  hidden , cell = model.init_hidden(1)
  out = model.forward(X,hidden,cell)
  print(f"output shape is {out.shape}")
  print(f"hidden and cell shape is {hidden.shape}")
  print(f"out is {out}")
# X = word2vec(lang=language,sentences=sentences)
# model = RNN(len(word2idx),len(X),len(X),num_layers=25)
# hidden , cell = model.init_hidden(100)
# model.forward(X,hidden,cell)[0]

In [73]:
class Generator:
  def __init__(self,file):
    super().__init__()
    self.n_sentences = 15
    self.max_words = 100
    self.batch_size = 1
    self.hidden_size = 256
    self.num_layers = 10
    self.lr = 0.005
    self.epochs = 1000
    self.sentences = self.corpusProcessing(file)[0]
    self.corpus = self.corpusProcessing(file)[1]
    self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    self.language = None  
    self.rnn = None
    self.optimizer = None
    self.criterion = None
    

  def setModel(self,input_size,hidden_size,num_layers):
    self.rnn = RNN(
        input_size=input_size,
        hidden_size=hidden_size,
        output_size = input_size,
        num_layers = num_layers
      )
  
  def tokenize(self):
    self.language.tokenizer()
    
  def setLanguage(self,name):
    self.language = Language(self.corpus,name)
    self.tokenize()
  
  def setOptimizer(self):
    self.optimizer = torch.optim.Adam(filter(
        lambda p:p.requires_grad,self.rnn.parameters()),lr=self.lr)
    
  def setCriterion(self):
    self.criterion = nn.CrossEntropyLoss()



  def corpusProcessing(self,file):
    corpus = open(file,'r').read() # read text file
    corpus.translate(str.maketrans('','',string.punctuation)) # try to eliminate punctuations
    new_corpus = re.sub(r'[^\w\s]','',corpus) # eliminate all punctuations that remind
    all_characters = string.printable # all characters that exists in keyboard
    n_characters = len(all_characters) # length of characters
    sentences = new_corpus.split('\n') # split corpus base on sentences
    for item in sentences:
      if item == '' or item == ' ' or item == '  ' or item == '   ': # delete all sentences that are empty
        sentences.pop(sentences.index(item))
    return sentences , new_corpus



  def getChunkOfSentences(self):
    start_idx = np.random.randint(0,len(self.sentences)-self.n_sentences)
    end_idx = start_idx + self.n_sentences + 1
    chunk_sentences = self.sentences[start_idx:end_idx]
    theChunk = ''
    for sen in chunk_sentences:
      theChunk += (sen) + ' '
    return theChunk

  # def word2vec(self,lang,chunk_corpus):
  #   # Default argument values are evaluated at function define-time, but self is an argument only available at function call time.
  #   # Thus arguments in the argument list cannot refer each other
  #   # chunk_corpus = getChunkOfSentences(sentences=sentences,n_sentence=15) # get a chunk of corpus
  #   separate_words = chunk_corpus.split()[:self.max_words] # separate words in number of max_length
  #   # print(len(separate_words))
  #   tensor = torch.zeros(self.max_words).long() # create a tensor of zeros in length of max_length
  #   word2idx = lang.getWord2Idx
  #   for idx in range(self.max_words): 
  #     tensor[idx] = word2idx[separate_words[idx]] # for each index of tensor,add word2idx relation index
    
  #   return tensor
  def word2vec(self, lang, chunk_corpus):
    separate_words = chunk_corpus.split()
    tensor = torch.zeros(self.max_words).type(torch.LongTensor)
    word2idx = lang.getWord2Idx
    for idx in range(self.max_words):
        if idx < len(separate_words):
            tensor[idx] = word2idx[separate_words[idx]]
    return tensor

  def get_random_batch(self):
    chunks = []
    for i in range(self.batch_size):
      chunk = self.getChunkOfSentences()
      chunk = chunk.strip()
      chunks.append(chunk)
    # return chunks
    tensor_input = torch.zeros(self.batch_size,self.max_words-1)
    tensor_target = torch.zeros(self.batch_size,self.max_words-1)

    for i in range(self.batch_size):
      # w2vec = self.word2vec(lang=self.language,chunk_corpus=chunks[i])
      # tensor_input[i,:] = w2vec[:-1]
      # tensor_target[i,:] = w2vec[1:]
      tensor_input[i,:] = self.word2vec(lang=self.language,chunk_corpus=chunks[i])[:-1]
      tensor_target[i,:] = self.word2vec(lang=self.language,chunk_corpus=chunks[i])[1:]
      # break
      
    tensor_input = tensor_input.long()
    tensor_target = tensor_target.long()

    return tensor_input , tensor_target

        
  def generate(self,initial_str='From some',predict_len=100,temperature=0.85):
    hidden , cell = self.rnn.init_hidden(self.batch_size)
    initial_input = self.word2vec(self.language,initial_str)
    predicted = initial_str
    for p in range(len(initial_str.split())-1):
      _ , hidden,cell = self.rnn(
          initial_input[p].view(1).to(self.device),hidden,cell
      )
    last_token = initial_input[-1]


    for p in range(predict_len):
      print(last_token , initial_input.shape)
      break
      out , hidden , cell = self.rnn(
          last_token.view(1).to(self.device),hidden,cell
      )
      output_dist = out.data.view(-1).div(temperature).exp()
      top_char = torch.multinomial(output_dist, 1)[0]
      predicted_token = self.language.getIdx2Word[int(top_char.cpu())]
      predicted += predicted_token
      last_token = self.word2vec(self.language,predicted_token)

    return predicted

  def train(self):
    self.rnn = self.rnn.to(self.device)
    print("************ Training is start *********")
    for epoch in range(self.epochs+1):
      # try:
      inp , target = self.get_random_batch()
      hidden , cell = self.rnn.init_hidden(batch_size=self.batch_size)

      self.rnn.zero_grad()
      loss = 0
      inp = inp.to(self.device)
      target = target.to(self.device)

      for c in range(self.max_words-1):
        # return self.rnn(inp[:,c],hidden,cell)
        # break
        # print(inp[:,c] , target[:,c])
        out , hidden,cell = self.rnn(inp[:,c],hidden,cell)
        loss += self.criterion(out,target[:,c])


      loss.backward()
      self.optimizer.step()
      loss = loss.item()/self.batch_size
      # print(f"loss is {loss}")
      
      if epoch % 10 == 0:
        print(f"In epoch {epoch} loss is: {loss:2f}")
        self.generate()
      # except:
        # print(f"ExceptionError raised in epoch {epoch}")


  # def generate(self, input_seq, max_length):
  #   hidden,cell_state = self.rnn.init_hidden(self.batch_size)
  #   output_seq = torch.tensor(input_seq).to(self.device)
  #   for i in range(max_length):
  #     print(output_seq.shape)
  #     break
  #     logits, (hidden, cell_state) = self.rnn.forward(output_seq, hidden, cell_state)
  #     probs = torch.softmax(logits, dim=-1)
  #     output_token = torch.multinomial(probs, 1).item()
  #     if output_token == EOS_TOKEN:
  #         break
  #     output_seq = torch.cat([output_seq, torch.tensor([output_token]).to(self.device)])
  #   return output_seq.tolist()


  # def generate_text(self,input_text,max_length):
  #   input_tokens = input_text.split() # tokenize input text
  #   input_indices = [self.language.getWord2Idx[token] for token in input_tokens] # convert tokens to indices
  #   output_indices = self.generate(input_indices,max_length=max_length)
  #   output_tokens = [self.language.getIdx2Word[indice] for indice in output_indices]
  #   return ''.join(output_tokens)


In [74]:
generator = Generator('shakespeare.txt')
generator.setLanguage('English')
input_size = len(generator.language.getWord2Idx)
hidden_size = 512
num_layers = 15
generator.setModel(input_size=input_size,hidden_size=hidden_size,num_layers=num_layers)
generator.setOptimizer()
generator.setCriterion()
generator.language.getWord2Idx
generator.train()

************ Training is start *********
In epoch 0 loss is: 806.611328
tensor(0) torch.Size([100])


KeyboardInterrupt: ignored