<a href="https://colab.research.google.com/github/eunjiWon/SoftwareDefectPredictionMetricUsingDeepLearning/blob/master/Get_Test_Set_Cross_Entropy_Using_LSTM_With_Pytorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Get Test Set Cross-Entropy Using LSTM With Pytorch
I refer this site: 
  [https://machinetalk.org/2019/02/08/text-generation-with-pytorch/](https://machinetalk.org/2019/02/08/text-generation-with-pytorch/)
     

### oliver_ch1.txt as a train set and trump.txt as a test set

In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import numpy as np
from collections import Counter
import os
from argparse import Namespace

flags = Namespace(
    train_file='/content/oliver_ch1.txt',
    test_file='/content/trump.txt',
    seq_size=32,
    batch_size=16,
    embedding_size=64,
    lstm_size=64, # LSTM hidden size
    gradients_norm=5, # norm to clip gradients
    initial_words=['I', 'am'], # Initial words to start prediction from
    predict_top_k=5, # top k results ro sample word from
    # checkpoint_path='checkpoint',
)

def get_data_from_file(train_file, batch_size, seq_size):
    with open(train_file, 'r') as f:
        text = f.read()
    text = text.split()
    # Create two dictionaries, one to convert words into integers indices, 
    # and the other one to convert integer indices back to word tokens
    word_counts = Counter(text)
    sorted_vocab = sorted(word_counts, key=word_counts.get, reverse=True)
    int_to_vocab = {k: w for k, w in enumerate(sorted_vocab)}
    vocab_to_int = {w: k for k, w in int_to_vocab.items()}
    n_vocab = len(int_to_vocab)
    print('Vocabulary size', n_vocab)
    
    # Covert word tokens into integer indices. 
    # These will be the input to the network
    # We will train a mini-batch each iteration 
    # so we split the data into batches evenly. 
    # Chopping out the last uneven batch
    int_text = [vocab_to_int[w] for w in text]
    num_batches = int(len(int_text) / (seq_size * batch_size))
    in_text = int_text[:num_batches * batch_size * seq_size]
    
    # In next generation problem, 
    # the target of each input word will be its consecutive wold,
    # so we just shift the whole input data to the left by one step
    out_text = np.zeros_like(in_text)
    out_text[:-1] = in_text[1:] # in_text의 두번째 부터 out_text의 처음으로 복사
    out_text[-1] = in_text[0] # in_text의 처음을 out_text의 마지막으로 복사
    in_text = np.reshape(in_text, (batch_size, -1))
    out_text = np.reshape(out_text, (batch_size, -1))
    return int_to_vocab, vocab_to_int, n_vocab, in_text, out_text
  
def get_batches(in_text, out_text, batch_size, seq_size):
    num_batches = np.prod(in_text.shape) // (seq_size * batch_size)
    for i in range(0, num_batches * seq_size, seq_size):
        yield in_text[:, i:i+seq_size], out_text[:, i:i+seq_size]

class RNNModule(nn.Module):
  def __init__(self, n_vocab, seq_size, embedding_size, lstm_size):
    super(RNNModule, self).__init__()
    self.seq_size = seq_size
    self.lstm_size = lstm_size
    self.embedding = nn.Embedding(n_vocab, embedding_size)
    self.lstm = nn.LSTM(embedding_size, lstm_size, batch_first=True)
    self.dense = nn.Linear(lstm_size, n_vocab)

  # Take an input sequence and the previous states (hidden states) and produce the output together with states of the currents timestamp
  def forward(self, x, prev_state):
    embed = self.embedding(x)
    output, state = self.lstm(embed, prev_state)
    logits = self.dense(output)
    return logits, state # why return state variable?
  
  # Define one more method to help us set states to zero because we need to reset states at the beginning of every epoch.
  def zero_state(self, batch_size):
    return (torch.zeros(1, batch_size, self.lstm_size), # hidden state (the short-term memory)
            torch.zeros(1, batch_size, self.lstm_size)) # cell state (the long-term memory)

def get_loss_and_train_op(net, lr=0.001):
  criterion = nn.CrossEntropyLoss()
  optimizer = torch.optim.Adam(net.parameters(), lr=lr)
  return criterion, optimizer
  # gradient clipping doesn't apply here!

def main():
  device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
  int_to_vocab, vocab_to_int, n_vocab, in_text, out_text = get_data_from_file(
      flags.train_file, flags.batch_size, flags.seq_size)
  test_int_to_vocab, test_vocab_to_int, test_n_vocab, test_in_text, test_out_text = get_data_from_file(
      flags.test_file, 1, flags.seq_size)
  net = RNNModule(n_vocab, flags.seq_size, flags.embedding_size, flags.lstm_size)
  net = net.to(device)
  criterion, optimizer = get_loss_and_train_op(net, 0.01)
  iteration = 0
  
  # Train
  # for each epoch, we will loop through the batches to compute loss valuse and update network's parameters.
  for e in range(50):
    batches = get_batches(in_text, out_text, flags.batch_size, flags.seq_size)
    state_h, state_c = net.zero_state(flags.batch_size)
    
    # Transfer data to GPU
    state_h = state_h.to(device)
    state_c = state_c.to(device)
    for x, y in batches: # x is in_text and y is out_text
      iteration += 1
      
      # Tell it we are in training mode
      net.train()

      # Reset all gradients
      optimizer.zero_grad()

      # Transfer data to GPU
      x = torch.tensor(x).to(device)
      y = torch.tensor(y).to(device)

      logits, (state_h, state_c) = net(x, (state_h, state_c))
      loss = criterion(logits.transpose(1, 2), y) # why we transpose the logits?
      
      # Avoid autograd which is given by Pytorch to keep track of the tensor's flow to perform back-propagation.
      state_h = state_h.detach()
      state_c = state_c.detach()

      loss_value = loss.item() # this loss is cross-entropy which is thing I want!!! 
      
      # Perform back-propagation
      loss.backward()
      
      # Gradient clipping
      _ = torch.nn.utils.clip_grad_norm_(net.parameters(), flags.gradients_norm)


      # Update the network's parameters
      optimizer.step()

      # Print the loss value and have the model generate some text for us during training
      if iteration % 100 == 0:
        print('Epoch: {}/{}'.format(e, 200), 'Iteration: {}'.format(iteration), 'Loss (C.E): {}'.format(loss_value))

      # if iteration % 1000 == 0:
      #   predict(device, net, flags.initial_words, n_vocab, vocab_to_int, int_to_vocab, top_k=5)
      #   torch.save(net.state_dict(), 'checkpoint_pt/model-{}.pth'.format(iteration))
  
  # Test
  net.eval() # Tell it we are in evaluation mode
  # 학습된 parameters을 이용하고 hidden and cell states는 초기화 시켜야함. 
  state_h, state_c = net.zero_state(1) #
  state_h = state_h.to(device) #
  state_c = state_c.to(device) #
  # Transfer data to GPU
  x = torch.tensor(test_in_text).to(device)
  y = torch.tensor(test_out_text).to(device)
  logits, (state_h, state_c) = net(x, (state_h, state_c))
  loss = criterion(logits.transpose(1, 2), y) # why we transpose the logits?
  loss_value = loss.item() # this loss is cross-entropy which is thing I want!!! 
  print("test set loss value (C.E): ", loss_value)

if __name__ == '__main__':
  main()




Vocabulary size 4694
Vocabulary size 1421
Epoch: 3/200 Iteration: 100 Loss (C.E): 5.773972034454346
Epoch: 6/200 Iteration: 200 Loss (C.E): 4.309805870056152
Epoch: 9/200 Iteration: 300 Loss (C.E): 3.118530511856079
Epoch: 12/200 Iteration: 400 Loss (C.E): 2.314215660095215
Epoch: 16/200 Iteration: 500 Loss (C.E): 1.917165756225586
Epoch: 19/200 Iteration: 600 Loss (C.E): 1.917675256729126
Epoch: 22/200 Iteration: 700 Loss (C.E): 1.3184051513671875
Epoch: 25/200 Iteration: 800 Loss (C.E): 1.0215789079666138
Epoch: 29/200 Iteration: 900 Loss (C.E): 0.7629404664039612
Epoch: 32/200 Iteration: 1000 Loss (C.E): 0.7583263516426086
Epoch: 35/200 Iteration: 1100 Loss (C.E): 0.6304391622543335
Epoch: 38/200 Iteration: 1200 Loss (C.E): 0.43913599848747253
Epoch: 41/200 Iteration: 1300 Loss (C.E): 0.30777496099472046
Epoch: 45/200 Iteration: 1400 Loss (C.E): 0.2341328263282776
Epoch: 48/200 Iteration: 1500 Loss (C.E): 0.21265681087970734
test set loss value (C.E):  13.988075256347656


### oliver_ch1 as a train set and oliver_test.txt (which is some part of the oliver_ch1) as a test set

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import numpy as np
from collections import Counter
import os
from argparse import Namespace

flags = Namespace(
    train_file='/content/oliver_ch1.txt',
    test_file='/content/oliver_test.txt',
    seq_size=32,
    batch_size=16,
    embedding_size=64,
    lstm_size=64, # LSTM hidden size
    gradients_norm=5, # norm to clip gradients
    initial_words=['I', 'am'], # Initial words to start prediction from
    predict_top_k=5, # top k results ro sample word from
    # checkpoint_path='checkpoint',
)

def get_data_from_file(train_file, batch_size, seq_size):
    with open(train_file, 'r') as f:
        text = f.read()
    text = text.split()
    # Create two dictionaries, one to convert words into integers indices, 
    # and the other one to convert integer indices back to word tokens
    word_counts = Counter(text)
    sorted_vocab = sorted(word_counts, key=word_counts.get, reverse=True)
    int_to_vocab = {k: w for k, w in enumerate(sorted_vocab)}
    vocab_to_int = {w: k for k, w in int_to_vocab.items()}
    n_vocab = len(int_to_vocab)
    print('Vocabulary size', n_vocab)
    
    # Covert word tokens into integer indices. 
    # These will be the input to the network
    # We will train a mini-batch each iteration 
    # so we split the data into batches evenly. 
    # Chopping out the last uneven batch
    int_text = [vocab_to_int[w] for w in text]
    num_batches = int(len(int_text) / (seq_size * batch_size))
    in_text = int_text[:num_batches * batch_size * seq_size]
    
    # In next generation problem, 
    # the target of each input word will be its consecutive wold,
    # so we just shift the whole input data to the left by one step
    out_text = np.zeros_like(in_text)
    out_text[:-1] = in_text[1:] # in_text의 두번째 부터 out_text의 처음으로 복사
    out_text[-1] = in_text[0] # in_text의 처음을 out_text의 마지막으로 복사
    in_text = np.reshape(in_text, (batch_size, -1))
    out_text = np.reshape(out_text, (batch_size, -1))
    return int_to_vocab, vocab_to_int, n_vocab, in_text, out_text
  
def get_batches(in_text, out_text, batch_size, seq_size):
    num_batches = np.prod(in_text.shape) // (seq_size * batch_size)
    for i in range(0, num_batches * seq_size, seq_size):
        yield in_text[:, i:i+seq_size], out_text[:, i:i+seq_size]

class RNNModule(nn.Module):
  def __init__(self, n_vocab, seq_size, embedding_size, lstm_size):
    super(RNNModule, self).__init__()
    self.seq_size = seq_size
    self.lstm_size = lstm_size
    self.embedding = nn.Embedding(n_vocab, embedding_size)
    self.lstm = nn.LSTM(embedding_size, lstm_size, batch_first=True)
    self.dense = nn.Linear(lstm_size, n_vocab)

  # Take an input sequence and the previous states (hidden states) and produce the output together with states of the currents timestamp
  def forward(self, x, prev_state):
    embed = self.embedding(x)
    output, state = self.lstm(embed, prev_state)
    logits = self.dense(output)
    return logits, state # why return state variable?
  
  # Define one more method to help us set states to zero because we need to reset states at the beginning of every epoch.
  def zero_state(self, batch_size):
    return (torch.zeros(1, batch_size, self.lstm_size), # hidden state (the short-term memory)
            torch.zeros(1, batch_size, self.lstm_size)) # cell state (the long-term memory)

def get_loss_and_train_op(net, lr=0.001):
  criterion = nn.CrossEntropyLoss()
  optimizer = torch.optim.Adam(net.parameters(), lr=lr)
  return criterion, optimizer
  # gradient clipping doesn't apply here!

def main():
  device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
  int_to_vocab, vocab_to_int, n_vocab, in_text, out_text = get_data_from_file(
      flags.train_file, flags.batch_size, flags.seq_size)
  test_int_to_vocab, test_vocab_to_int, test_n_vocab, test_in_text, test_out_text = get_data_from_file(
      flags.test_file, 1, flags.seq_size)
  net = RNNModule(n_vocab, flags.seq_size, flags.embedding_size, flags.lstm_size)
  net = net.to(device)
  criterion, optimizer = get_loss_and_train_op(net, 0.01)
  iteration = 0
  
  # Train
  # for each epoch, we will loop through the batches to compute loss valuse and update network's parameters.
  for e in range(50):
    batches = get_batches(in_text, out_text, flags.batch_size, flags.seq_size)
    state_h, state_c = net.zero_state(flags.batch_size)
    
    # Transfer data to GPU
    state_h = state_h.to(device)
    state_c = state_c.to(device)
    for x, y in batches: # x is in_text and y is out_text
      iteration += 1
      
      # Tell it we are in training mode
      net.train()

      # Reset all gradients
      optimizer.zero_grad()

      # Transfer data to GPU
      x = torch.tensor(x).to(device)
      y = torch.tensor(y).to(device)

      logits, (state_h, state_c) = net(x, (state_h, state_c))
      loss = criterion(logits.transpose(1, 2), y) # why we transpose the logits?
      
      # Avoid autograd which is given by Pytorch to keep track of the tensor's flow to perform back-propagation.
      state_h = state_h.detach()
      state_c = state_c.detach()

      loss_value = loss.item() # this loss is cross-entropy which is thing I want!!! 
      
      # Perform back-propagation
      loss.backward()
      
      # Gradient clipping
      _ = torch.nn.utils.clip_grad_norm_(net.parameters(), flags.gradients_norm)


      # Update the network's parameters
      optimizer.step()

      # Print the loss value and have the model generate some text for us during training
      if iteration % 100 == 0:
        print('Epoch: {}/{}'.format(e, 200), 'Iteration: {}'.format(iteration), 'Loss (C.E): {}'.format(loss_value))

      # if iteration % 1000 == 0:
      #   predict(device, net, flags.initial_words, n_vocab, vocab_to_int, int_to_vocab, top_k=5)
      #   torch.save(net.state_dict(), 'checkpoint_pt/model-{}.pth'.format(iteration))
  
  # Test
  net.eval() # Tell it we are in evaluation mode
  # 학습된 parameters을 이용하고 hidden and cell states는 초기화 시켜야함. 
  state_h, state_c = net.zero_state(1) #
  state_h = state_h.to(device) #
  state_c = state_c.to(device) #
  # Transfer data to GPU
  x = torch.tensor(test_in_text).to(device)
  y = torch.tensor(test_out_text).to(device)
  logits, (state_h, state_c) = net(x, (state_h, state_c))
  loss = criterion(logits.transpose(1, 2), y) # why we transpose the logits?
  loss_value = loss.item() # this loss is cross-entropy which is thing I want!!! 
  print("test set loss value (C.E): ", loss_value)

if __name__ == '__main__':
  main()




Vocabulary size 4694
Vocabulary size 40
Epoch: 3/200 Iteration: 100 Loss (C.E): 5.741121292114258
Epoch: 6/200 Iteration: 200 Loss (C.E): 4.286619186401367
Epoch: 9/200 Iteration: 300 Loss (C.E): 2.989633321762085
Epoch: 12/200 Iteration: 400 Loss (C.E): 2.2182981967926025
Epoch: 16/200 Iteration: 500 Loss (C.E): 1.6998649835586548
Epoch: 19/200 Iteration: 600 Loss (C.E): 1.6040774583816528
Epoch: 22/200 Iteration: 700 Loss (C.E): 1.0732179880142212
Epoch: 25/200 Iteration: 800 Loss (C.E): 0.8132094740867615
Epoch: 29/200 Iteration: 900 Loss (C.E): 0.5646193623542786
Epoch: 32/200 Iteration: 1000 Loss (C.E): 0.5946887731552124
Epoch: 35/200 Iteration: 1100 Loss (C.E): 0.4816804826259613
Epoch: 38/200 Iteration: 1200 Loss (C.E): 0.33147957921028137
Epoch: 41/200 Iteration: 1300 Loss (C.E): 0.2688245475292206
Epoch: 45/200 Iteration: 1400 Loss (C.E): 0.23053620755672455
Epoch: 48/200 Iteration: 1500 Loss (C.E): 0.16258715093135834
test set loss value (C.E):  10.106332778930664
