# Baseline LSTM model

Code inspired by [this blog post](https://www.analyticsvidhya.com/blog/2020/08/build-a-natural-language-generation-nlg-system-using-pytorch/)

### Dataset Class

In [None]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset
import re

class RapDataset(Dataset):
    """Next-word-prediction dataset built from a CSV file of lyrics.

    Every lyric text is cleaned, cut into sliding windows of ``n + 1`` tokens,
    and each window yields one training pair: the first ``n`` tokens as input
    and the last ``n`` tokens (the input shifted by one) as target.

    Attributes set by ``__init__``:
      int2token / token2int -- vocabulary mappings (ids follow sorted token order)
      inputs_int / targets_int -- ``torch.long`` tensors of token ids
      vocab_size -- number of distinct tokens
    """

    def __init__(self, file_name, n=5):
        """Read ``file_name`` and build the vocabulary and id tensors.

        Args:
          file_name: path of the CSV file to read with ``pandas.read_csv``.
          n: number of input tokens per training pair (window is ``n + 1``).
        """
        file_pd = pd.read_csv(file_name)
        # Artist (first column) not considered.
        # NOTE(review): the row slice starts at 1, so the first data row is
        # skipped too, and at most 999 rows are used -- confirm this is intended.
        raw = file_pd.iloc[1:1000, 1:].values.flatten().tolist()

        # Clean text. Lower-case first: the whitelist only keeps a-z, so
        # without it every capital letter is silently deleted ("I'm" -> "'m").
        # Non-string cells (e.g. NaN for an empty field) are skipped.
        disallowed = re.compile(r"[^-a-z0-9' ]")
        x = [disallowed.sub("", i.lower()) for i in raw if isinstance(i, str)]

        # Create ngrams, flattened into a single list (linear, unlike sum(..., []))
        ngrams = [gram for line in x for gram in self.create_ngram(line, n)]

        # create inputs and targets
        inputs = []
        targets = []
        for s in ngrams:
            tokens = s.split()
            # Texts with <= n tokens come back whole from create_ngram; they
            # would make the tensors below ragged, so they are left out.
            if len(tokens) != n + 1:
                continue
            inputs.append(" ".join(tokens[:-1]))
            targets.append(" ".join(tokens[1:]))

        # create integer-to-token mapping; sorting makes the ids reproducible
        # between interpreter sessions (set order depends on string hashing)
        vocabulary = sorted(set(" ".join(x).split()))
        self.int2token = dict(enumerate(vocabulary))

        # create token-to-integer mapping
        self.token2int = {t: i for i, t in self.int2token.items()}

        # convert text sequences to integer sequences
        self.inputs_int = torch.tensor(
            [self.get_integer_seq(i) for i in inputs], dtype=torch.long)
        self.targets_int = torch.tensor(
            [self.get_integer_seq(i) for i in targets], dtype=torch.long)

        # set vocabulary size
        self.vocab_size = len(self.int2token)

    def create_ngram(self, text, n=5):
        """Return all windows of ``n + 1`` consecutive tokens of ``text``.

        When ``text`` has ``n`` tokens or fewer, ``[text]`` is returned unchanged.
        """
        words = text.split()
        if len(words) <= n:
            return [text]
        return [" ".join(words[i - n:i + 1]) for i in range(n, len(words))]

    def get_integer_seq(self, seq):
        """Map a whitespace-separated token string to a list of token ids."""
        return [self.token2int[w] for w in seq.split()]

    def __len__(self):
        """Number of (input, target) pairs."""
        return len(self.inputs_int)

    def __getitem__(self, idx):
        """Return the ``(input_ids, target_ids)`` tensors at ``idx``."""
        return self.inputs_int[idx], self.targets_int[idx]


In [None]:
# Create Dataset: reads the lyrics CSV and builds the vocabulary and the
# input/target id tensors (all done inside RapDataset.__init__)
D = RapDataset('/data-disk/rap/cleaned_lyrics.csv')

### LSTM Model Class

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class WordLSTM(nn.Module):
    """Word-level language model: embedding -> stacked LSTM -> dropout -> linear.

    Args:
      vocab_size: number of distinct tokens (embedding rows and output logits).
      n_hidden: LSTM hidden size.
      n_layers: number of stacked LSTM layers.
      drop_prob: dropout probability, used between LSTM layers and on the
        LSTM output.
      lr: stored on the instance only; nothing in this class reads it.
      emb_dim: width of the token embeddings.
    """

    def __init__(self, vocab_size, n_hidden=256, n_layers=4, drop_prob=0.3,
                 lr=0.001, emb_dim=200):
        super().__init__()

        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr
        self.emb_dim = emb_dim

        self.emb_layer = nn.Embedding(vocab_size, emb_dim)

        ## define the LSTM (batch_first: inputs are batch x sequence x feature)
        self.lstm = nn.LSTM(emb_dim, n_hidden, n_layers,
                            dropout=drop_prob, batch_first=True)

        ## define a dropout layer
        self.dropout = nn.Dropout(drop_prob)

        ## define the fully-connected layer
        self.fc = nn.Linear(n_hidden, vocab_size)

    def forward(self, x, hidden):
        ''' Forward pass through the network.

            Args:
              x: integer token ids, batch first.
              hidden: `(h, c)` tuple as produced by `init_hidden` or by a
                previous call.

            Returns:
              `(out, hidden)`; `out` holds one row of `vocab_size` logits per
              token position (batch and sequence axes are flattened together)
              and `hidden` is the updated LSTM state. '''

        ## pass input through embedding layer
        embedded = self.emb_layer(x)

        ## Get the outputs and the new hidden state from the lstm
        lstm_output, hidden = self.lstm(embedded, hidden)

        ## pass through a dropout layer
        out = self.dropout(lstm_output)

        ## stack output embeddings: one row per token position
        out = out.reshape(-1, self.n_hidden)

        ## put "out" through the fully-connected layer
        out = self.fc(out)

        # return the final output and the hidden state
        return out, hidden

    def init_hidden(self, batch_size):
        ''' Return a zero `(h, c)` tuple, each n_layers x batch_size x n_hidden.

            The tensors take dtype and device from the model's own parameters,
            so they always match wherever the model currently lives (checking
            `torch.cuda.is_available()` would put them on the GPU even when
            the model is still on the CPU). '''
        weight = next(self.parameters()).detach()
        shape = (self.n_layers, batch_size, self.n_hidden)

        return (weight.new_zeros(shape), weight.new_zeros(shape))

In [None]:
# instantiate the model; the output layer is sized from the dataset vocabulary
net = WordLSTM(vocab_size=D.vocab_size)

# move the parameters to the GPU (this call needs a CUDA device)
net.cuda()

# show the layer summary
print(net)

WordLSTM(
  (emb_layer): Embedding(21291, 200)
  (lstm): LSTM(200, 256, num_layers=4, batch_first=True, dropout=0.3)
  (dropout): Dropout(p=0.3, inplace=False)
  (fc): Linear(in_features=256, out_features=21291, bias=True)
)


### Model Training

In [None]:
def train(net, data_set, epochs=10, batch_size=32, lr=0.001, clip=1, print_every=32):
    """Train `net` in place on `data_set` with Adam and cross-entropy loss.

    The model is moved to the GPU when CUDA is available and is trained on the
    CPU otherwise.

    Args:
      net: model exposing `init_hidden(batch_size)` and `net(inputs, hidden)`
        returning `(logits, hidden)` with one logits row per target token.
      data_set: dataset yielding `(inputs, targets)` tensors of token ids.
      epochs: number of passes over `data_set`.
      batch_size: samples per step; incomplete final batches are dropped.
      lr: Adam learning rate.
      clip: maximum gradient norm.
      print_every: print a progress line every this many steps.

    Returns:
      None.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # push model to the training device before the optimizer captures its parameters
    net.to(device)

    # optimizer
    opt = torch.optim.Adam(net.parameters(), lr=lr)

    # loss
    criterion = nn.CrossEntropyLoss()

    # Data Loader; drop_last keeps every batch at `batch_size`, which the
    # hidden state carried between steps relies on
    train_loader = torch.utils.data.DataLoader(
        data_set, batch_size=batch_size, shuffle=True, drop_last=True)

    counter = 0

    net.train()

    for e in range(epochs):

        # initialize hidden state
        h = net.init_hidden(batch_size)

        for inputs, targets in train_loader:
            counter += 1

            # push tensors to the training device
            inputs, targets = inputs.to(device), targets.to(device)

            # detach hidden states so backprop stops at the batch boundary
            h = tuple(each.detach().to(device) for each in h)

            # zero accumulated gradients
            net.zero_grad()

            # get the output from the model
            output, h = net(inputs, h)

            # calculate the loss (targets flattened to line up with the logits rows)
            loss = criterion(output, targets.view(-1))

            # back-propagate error
            loss.backward()

            # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
            nn.utils.clip_grad_norm_(net.parameters(), clip)

            # update weights
            opt.step()

            if counter % print_every == 0:

                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter))

In [None]:
# Train the model (20 epochs, progress line every 256 steps)
train(net, D, batch_size = 32, epochs=20, print_every=256)
# Save only the learned parameters (state_dict) to disk
torch.save(net.state_dict(), '/data-disk/rap/baseline.md')

Epoch: 1/20... Step: 256...
Epoch: 1/20... Step: 512...
Epoch: 1/20... Step: 768...
Epoch: 1/20... Step: 1024...
Epoch: 1/20... Step: 1280...
Epoch: 1/20... Step: 1536...
Epoch: 1/20... Step: 1792...
Epoch: 1/20... Step: 2048...
Epoch: 1/20... Step: 2304...
Epoch: 1/20... Step: 2560...
Epoch: 1/20... Step: 2816...
Epoch: 1/20... Step: 3072...
Epoch: 1/20... Step: 3328...
Epoch: 1/20... Step: 3584...
Epoch: 1/20... Step: 3840...
Epoch: 1/20... Step: 4096...
Epoch: 1/20... Step: 4352...
Epoch: 1/20... Step: 4608...
Epoch: 1/20... Step: 4864...
Epoch: 1/20... Step: 5120...
Epoch: 1/20... Step: 5376...
Epoch: 1/20... Step: 5632...
Epoch: 1/20... Step: 5888...
Epoch: 1/20... Step: 6144...
Epoch: 1/20... Step: 6400...
Epoch: 1/20... Step: 6656...
Epoch: 1/20... Step: 6912...
Epoch: 1/20... Step: 7168...
Epoch: 1/20... Step: 7424...
Epoch: 1/20... Step: 7680...
Epoch: 1/20... Step: 7936...
Epoch: 1/20... Step: 8192...
Epoch: 1/20... Step: 8448...
Epoch: 1/20... Step: 8704...
Epoch: 1/20... St

### Prediction

In [None]:
# Restore the parameters from the state_dict file written after training
net.load_state_dict(torch.load('/data-disk/rap/baseline.md'))

<All keys matched successfully>

In [None]:
import random

# predict next token
def predict(net, tkn, h=None, top_k=3):
    """Feed one token to `net` and sample the next token.

    The next token is drawn uniformly at random from the `top_k` most
    probable tokens.

    Args:
      net: model exposing `init_hidden(batch_size)` and `net(inputs, hidden)`
        returning `(logits, hidden)`.
      tkn: input token; must be a key of the module-level `D.token2int`.
      h: hidden state from a previous call, or None to start from a fresh
        state obtained with `net.init_hidden(1)`.
      top_k: number of most-probable candidates to sample from.

    Returns:
      `(next_token, hidden)`.

    Raises:
      KeyError: `tkn` is not in the vocabulary.
      ValueError: `top_k` is smaller than 1.
    """
    if top_k < 1:
        raise ValueError("top_k must be at least 1")

    # run on whatever device the model already lives on
    device = next(net.parameters()).device

    if h is None:
        h = net.init_hidden(1)

    # detach hidden state from history
    h = tuple(each.detach().to(device) for each in h)

    # tensor inputs: a batch holding a single one-token sequence
    inputs = torch.tensor([[D.token2int[tkn]]], dtype=torch.long, device=device)

    # inference only, so no gradient bookkeeping is needed
    with torch.no_grad():
        # get the output of the model
        out, h = net(inputs, h)

        # get the token probabilities
        p = F.softmax(out, dim=1)

    p = p.cpu().numpy().reshape(-1)

    # get indices of the top_k values, most probable first
    top_n_idx = p.argsort()[-top_k:][::-1].tolist()

    # randomly select one of the candidate indices
    sampled_token_index = random.choice(top_n_idx)

    # return the predicted token and the hidden state
    return D.int2token[sampled_token_index], h


# function to generate text
def sample(net, size, prime='it is'):
    """Generate text by repeatedly calling `predict`, starting from `prime`.

    The prime tokens are fed through the model one by one to warm up the
    hidden state; only the prediction made after the last prime token is kept.
    Each further token is predicted from the previously generated one.

    Args:
      net: trained model; moved to the GPU when CUDA is available and
        switched to evaluation mode.
      size: number of generated tokens when `size >= 1`.
      prime: whitespace-separated seed text; must contain at least one token.

    Returns:
      The prime tokens followed by the generated tokens, joined by spaces.

    Raises:
      ValueError: `prime` contains no tokens (there would be nothing to
        predict from).
    """
    toks = prime.split()
    if not toks:
        raise ValueError("prime must contain at least one token")

    # push to GPU when there is one, otherwise stay on the CPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    net.to(device)

    net.eval()

    # batch size is 1
    h = net.init_hidden(1)

    # warm up on the prime; `token` ends up as the prediction after its last word
    for t in toks:
        token, h = predict(net, t, h)

    toks.append(token)

    # predict subsequent tokens
    for _ in range(size - 1):
        token, h = predict(net, toks[-1], h)
        toks.append(token)

    return ' '.join(toks)

In [None]:
# Generate 100 tokens continuing the given prime text
sample(net, 100, prime = "this rap is lit")

"this rap is lit it out the club bottle full of bub t all on my dick 'm the one in a pot prinkle the money on me 'm so whoa whoa whoa 'll be your best friends close for me ou know that we handsome e ballin away for a track of a kind of bub ook mami come up with me 'm a smooth nigga you ain't a game and the hood you can be my bitch and the sun and my name is noccut eft up in a pot a lil mama put me in a pot prinkle a little something"

In [None]:
# Pickle the whole module object (not only its state_dict).
# NOTE(review): this is the same path the state_dict was written to after
# training, so that file gets overwritten; the earlier
# `net.load_state_dict(torch.load(...))` cell would presumably stop working
# on it -- consider a separate file name.
torch.save(net, '/data-disk/rap/baseline.md')

  "type " + obj.__name__ + ". It won't be checked "
