<a href="https://colab.research.google.com/github/haruka-inb/pytorch_practice/blob/main/RNNLM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# RNN-based Language Model

In [None]:
import torch
import torch.nn as nn
import numpy as np
from torch.nn.utils import clip_grad_norm_

In [None]:
# show which PyTorch build this notebook is running on
print(torch.__version__)


2.1.0+cu121


In [None]:
class Dictionary(object):
  """
  Two-way vocabulary: maps each word to an integer index and each index
  back to its word. Indices are handed out in first-seen order from 0.
  """

  def __init__(self):
    self.word2idx = {}  # word -> index
    self.idx2word = {}  # index -> word
    self.idx = 0        # index that the next unseen word will receive

  def add_word(self, word):
    """Register `word` under the next free index; known words are ignored."""
    if word in self.word2idx:
      return
    next_id = self.idx
    self.word2idx[word] = next_id
    self.idx2word[next_id] = word
    self.idx = next_id + 1

  def __len__(self):
    """Return the number of distinct words registered so far."""
    return len(self.word2idx)


class Corpus():
  """
  Reads a text file, tokenizes it at word level, maps every token to an
  integer id through a Dictionary, and returns the ids as a 2-D tensor
  with `batch_size` rows.
  """
  def __init__(self):
    self.dictionary = Dictionary()

  def get_data(self, path, batch_size=20):
    """
    Tokenize the file at `path` and return its word ids as a tensor.

    Every line is split on whitespace and terminated with an '<eos>' token.
    New words are added to `self.dictionary` as they are encountered.

    Args:
      path: path of the text file to read.
      batch_size: number of rows of the returned tensor.

    Returns:
      A LongTensor of shape (batch_size, num_tokens // batch_size). Trailing
      tokens that do not fill a complete column are dropped.
    """
    # single pass over the file: register each word and record its id right away
    # (ids are assigned in first-seen order, so a separate counting pass is not needed)
    token_ids = []
    with open(path, 'r') as f:
      for line in f:
        for word in line.split() + ['<eos>']:
          self.dictionary.add_word(word)
          token_ids.append(self.dictionary.word2idx[word])

    # build the tensor from the collected ids instead of filling an uninitialized buffer
    ids = torch.tensor(token_ids, dtype=torch.long)

    # reshape the long sequence by batch_size e.g. tensor (929589) -> (20, 46479)
    num_batches = ids.size(0) // batch_size
    ids = ids[:num_batches*batch_size]
    return ids.view(batch_size, -1)

In [None]:
# device configuration: use the GPU when CUDA is available, otherwise the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# hyperparameters
embed_size = 128       # dimension of each word embedding vector
hidden_size = 1024     # size of the LSTM hidden state
num_layers = 1         # number of stacked LSTM layers
num_epochs = 100       # number of passes over the training data
num_samples = 1000     # number of words to sample from the trained model
batch_size = 20        # number of rows the token sequence is split into
seq_length = 30        # number of time steps in one training mini-batch
learning_rate = 0.002  # learning rate handed to the optimizer

# load "Penn Treebank" dataset
path = "/content/train.txt"
corpus = Corpus()
ids = corpus.get_data(path, batch_size)  # word ids with batch_size rows
vocab_size = len(corpus.dictionary)  # number of distinct tokens, '<eos>' included
num_batches = ids.size(1) // seq_length  # seq_length-wide windows per row; shown in the training log

print(f"unique number of vocaburaries in a text: {vocab_size}")
print(f"shape of curpos: {ids.shape}")
print(f"number of batches: {num_batches}")

unique number of vocaburaries in a text: 10000
shape of curpos: torch.Size([20, 46479])
number of batches: 1549


In [None]:
# nn.Embedding() is equivalent to one-hot encoding a word id and passing it through a linear layer without bias.
# Instead of taking a big one-hot encoded vector, it takes the index where the 1 would be and looks up the matching weight row.

# RNN based language model
class RNNLM(nn.Module):
  """Word-level language model: embedding -> LSTM -> linear decoder over the vocabulary."""

  def __init__(self, vocab_size, embed_size, hidden_size, num_layers):
    super().__init__()
    self.embed = nn.Embedding(vocab_size, embed_size)
    self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
    self.linear = nn.Linear(hidden_size, vocab_size)

  def forward(self, x, h):
    """
    Run one forward pass.

    Args:
      x: word ids; the LSTM is batch_first, so the layout is (batch, sequence).
      h: tuple (hidden state, cell state) passed to the LSTM as its initial state.

    Returns:
      A pair (logits, (h, c)): logits has shape (batch*sequence, vocab_size),
      and (h, c) are the states returned by the LSTM.
    """
    # look up the embedding vector of every word id
    embedded = self.embed(x)

    # run the LSTM over the whole sequence
    lstm_out, next_states = self.lstm(embedded, h)

    # merge the batch and time axes so every time step is decoded as one row
    n_batch, n_steps, n_hidden = lstm_out.size()
    flat = lstm_out.reshape(n_batch * n_steps, n_hidden)

    # project every hidden state onto the vocabulary
    logits = self.linear(flat)

    hidden, cell = next_states
    return logits, (hidden, cell)

In [None]:
# build the model (not yet moved to `device`); the bare `model` expression
# makes the notebook display its layer summary
model = RNNLM(vocab_size, embed_size, hidden_size, num_layers)
model

RNNLM(
  (embed): Embedding(10000, 128)
  (lstm): LSTM(128, 1024, batch_first=True)
  (linear): Linear(in_features=1024, out_features=10000, bias=True)
)

In [None]:
# load model: build a fresh RNNLM and move it to the selected device
model = RNNLM(vocab_size, embed_size, hidden_size, num_layers).to(device)

# loss and optimizer
# cross-entropy between the model outputs and the target word ids
criterion = nn.CrossEntropyLoss()
# Adam over all model parameters with the configured learning rate
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# truncated backpropagation through time:
# cutting the states out of the graph stops gradients from flowing into earlier windows
def detach(states):
  """Return a list holding a detached copy of every tensor in `states`."""
  return [tensor.detach() for tensor in states]

# train the model
for e in range(num_epochs):
  # set initial hidden and cell states (all zeros) at the start of every epoch
  states = (torch.zeros(num_layers, batch_size, hidden_size).to(device),
           torch.zeros(num_layers, batch_size, hidden_size).to(device))

  # slide a seq_length-wide window over the columns of ids; stopping at
  # ids.size(1) - seq_length leaves room for the targets, which are shifted by one
  for i in range(0, ids.size(1) - seq_length, seq_length):
    # get mini-batch inputs and targets
    # the model learns to predict the next word, so the targets are the inputs shifted one position ahead
    inputs = ids[:, i:i+seq_length].to(device)
    targets = ids[:, (i+1):(i+1)+seq_length].to(device)

    # forward pass
    # detach first so backpropagation is truncated at the start of this window
    states = detach(states)
    outputs, states = model(inputs, states)
    # outputs has one row per (batch, time step) pair, so the 2-D targets are flattened to match
    loss = criterion(outputs, targets.reshape(-1))

    # backward and optimizer
    optimizer.zero_grad()
    loss.backward()
    clip_grad_norm_(model.parameters(), 0.5) # rescale the gradients when their norm exceeds 0.5
    optimizer.step()

    # log progress every 1500 steps; perplexity is the exponential of the loss
    step = (i+1) // seq_length
    if step % 1500 == 0:
      print("Epoch {}/{}, Step {}/{}, Loss: {:.4f}, Perplexity: {:5.2f}"
      .format(e+1, num_epochs, step, num_batches, loss.item(), np.exp(loss.item())))

# test the model: sample `num_samples` words from the trained model and write them to a text file
sample_path = 'sample.txt'
with torch.no_grad(), open(sample_path, 'w') as f:
  # set initial hidden and cell states (batch size 1: a single sequence is generated)
  states = (torch.zeros(num_layers, 1, hidden_size).to(device),
            torch.zeros(num_layers, 1, hidden_size).to(device))

  # select one word id uniformly at random as the first input, shape (1, 1)
  prob = torch.ones(vocab_size)
  input_ids = torch.multinomial(prob, num_samples=1).unsqueeze(1).to(device)

  for i in range(num_samples):
    # forward propagate RNN
    output, states = model(input_ids, states)

    # sample a word id; exp() turns the outputs into positive weights,
    # which torch.multinomial accepts without them summing to one
    prob = output.exp()
    word_id = torch.multinomial(prob, num_samples=1).item()

    # fill input with sampled word id for the next time step
    input_ids.fill_(word_id)

    # file write: '<eos>' ends the line, every other word is followed by a space
    # so that consecutive words are not glued together
    word = corpus.dictionary.idx2word[word_id]
    word = '\n' if word == '<eos>' else word + ' '
    f.write(word)

    if (i+1) % 100 == 0:
      print("Sampled {}/{} words and save to {}"
            .format(i+1, num_samples, sample_path))

# save the model checkpoint: only the state_dict is written, to "model.ckpt"
torch.save(model.state_dict(), "model.ckpt")

Epoch 1/100, Step 0/1549, Loss: 9.2089, Perplexity: 9985.80
Epoch 1/100, Step 1500/1549, Loss: 5.1501, Perplexity: 172.45
Epoch 2/100, Step 0/1549, Loss: 5.4171, Perplexity: 225.24
Epoch 2/100, Step 1500/1549, Loss: 4.3822, Perplexity: 80.01
Epoch 3/100, Step 0/1549, Loss: 4.3853, Perplexity: 80.26
Epoch 3/100, Step 1500/1549, Loss: 3.6564, Perplexity: 38.72
Epoch 4/100, Step 0/1549, Loss: 3.6055, Perplexity: 36.80
Epoch 4/100, Step 1500/1549, Loss: 3.1436, Perplexity: 23.19
Epoch 5/100, Step 0/1549, Loss: 3.0865, Perplexity: 21.90
Epoch 5/100, Step 1500/1549, Loss: 2.8264, Perplexity: 16.88
Epoch 6/100, Step 0/1549, Loss: 2.7846, Perplexity: 16.19
Epoch 6/100, Step 1500/1549, Loss: 2.6163, Perplexity: 13.69
Epoch 7/100, Step 0/1549, Loss: 2.5256, Perplexity: 12.50
Epoch 7/100, Step 1500/1549, Loss: 2.5134, Perplexity: 12.35
Epoch 8/100, Step 0/1549, Loss: 2.4436, Perplexity: 11.51
Epoch 8/100, Step 1500/1549, Loss: 2.4229, Perplexity: 11.28
Epoch 9/100, Step 0/1549, Loss: 2.2983, Perp

In [None]:
# read the sampled text back: one entry per line of the file, each split on '<eos>'
# NOTE(review): the sampling cell writes '\n' in place of '<eos>', so this split
# presumably never finds a separator - confirm the intent
result = []

with open("/content/sample.txt", 'r') as f:
  result.extend(line.split('<eos>') for line in f)

result