<a href="https://colab.research.google.com/github/Sharaborina/ChatBot/blob/main/Danish_text_generator_LSTM_pyTorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [77]:
# Colab-only setup: mount Google Drive at /content/drive so that the
# /content/drive/MyDrive/... paths used by later cells can be read and written.
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [78]:

from torch import nn
import torch.nn.functional as F
import csv
import torch
import random
import pandas as pd
import numpy as np
import requests

from sklearn.feature_extraction.text import CountVectorizer


In [95]:
import json
def load_json(directory, file):
  """Read the JSON document stored at ``directory/file`` and return it.

  Arguments
  ---------
  directory: folder that contains the file (joined to ``file`` with '/')
  file: file name inside ``directory``

  Returns the decoded Python object (whatever ``json.load`` produces).
  Raises ``OSError`` if the file cannot be opened and
  ``json.JSONDecodeError`` if its content is not valid JSON.
  """
  # Decode explicitly as UTF-8: the corpus holds non-ASCII Danish characters,
  # and relying on the platform's default encoding would garble them (or raise)
  # on any machine whose locale is not UTF-8.
  with open(f'{directory}/{file}', encoding='utf-8') as f:
    return json.load(f)

def upload_json(directory, file, db):
  """Serialize ``db`` as JSON into ``directory/file``, replacing any existing file."""
  target_path = '{}/{}'.format(directory, file)
  with open(target_path, mode='w') as out_file:
    json.dump(db, fp=out_file)

Text Generation

In [80]:
# Location of the Danish dialogue corpus on the mounted Drive.
da = "/content/drive/MyDrive/ColabNotebooks/MultiWoz/untokenized_data_danish.txt"

# The file holds a JSON list of strings; glue them into one long training text.
text = ' '.join(
    load_json('/content/drive/MyDrive/ColabNotebooks/MultiWoz', 'untokenized_data_danish.txt')
)

# Quick sanity check: show the beginning of the corpus.
print(text[:500])

Person 1: er på udkig efter et sted at bo, der har en billig prisinterval, det skal være i en type hotel Person 2: Okay, har du et bestemt område, du vil bo i? Person 1: nej, jeg skal bare sørge for, at det er billigt. åh, og jeg har brug for parkering Person 2: Jeg fandt et billigt hotel til dig, der inkluderer parkering. Kan du lide, at jeg bestiller det? Person 1: Ja tak. 6 personer 3 nætter startende på tirsdag. Person 2: Jeg er ked af det, men jeg kunne ikke booke det til dig til tirsdag. E


In [96]:
# Build the character vocabulary and its two lookup tables:
#   int2char -- integer id -> character
#   char2int -- character  -> unique integer id
import string
# chars = string.ascii_letters + ".,;:"
chars = sorted(set(text))
int2char = dict(enumerate(chars))
char2int = {symbol: index for index, symbol in int2char.items()}

# Store the vocabulary next to the corpus so the id <-> character mapping
# can be rebuilt later without re-reading the text.
upload_json('/content/drive/MyDrive/ColabNotebooks/MultiWoz', 'chars.txt', chars)

# Encode the whole text as an array of integer ids.
encoded = np.array(list(map(char2int.__getitem__, text)))

In [82]:
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder
def one_hot_encode(arr, n_labels):
    """One-hot encode an integer array along a new trailing axis.

    Arguments
    ---------
    arr: array-like of integer class ids in ``[0, n_labels)``, of any rank
    n_labels: number of classes, i.e. the size of the new last axis

    Returns a float32 array of shape ``(*arr.shape, n_labels)`` that is 1.0 at
    each element's class id and 0.0 everywhere else.
    """
    arr = np.asarray(arr)
    # Work on a flat view so the element count does not depend on the rank;
    # the previous ``np.multiply(*arr.shape)`` was only valid for 2-D input.
    flat = arr.reshape(-1)

    # Initialize the encoded array: one row per input element
    one_hot = np.zeros((flat.size, n_labels), dtype=np.float32)

    # Fill the appropriate elements with ones
    one_hot[np.arange(flat.size), flat] = 1.

    # Finally reshape it to get back to the original array layout
    return one_hot.reshape((*arr.shape, n_labels))

In [83]:
def get_batches(arr, batch_size, seq_length):
    """Generate (input, target) batches, each of shape batch_size x seq_length.

    Trailing characters that do not fill a complete batch are dropped. The
    target is the input shifted left by one position; for the final window the
    last target column wraps around to the first column of the reshaped data.
    """
    chars_per_batch = batch_size * seq_length
    # Number of characters that fit into whole batches
    usable = (len(arr) // chars_per_batch) * chars_per_batch

    # Lay the usable prefix out as batch_size rows
    rows = arr[:usable].reshape((batch_size, -1))
    n_cols = rows.shape[1]

    # Slide a seq_length-wide window over the columns
    for start in range(0, n_cols, seq_length):
        stop = start + seq_length
        x = rows[:, start:stop]
        y = np.zeros_like(x)
        y[:, :-1] = x[:, 1:]
        # Border condition: past the last column there is no "next" character,
        # so fall back to column 0.
        y[:, -1] = rows[:, stop] if stop < n_cols else rows[:, 0]
        yield x, y

In [84]:
# Detect CUDA once; later cells read this flag to decide where tensors live.
train_on_gpu = torch.cuda.is_available()
print('Training on GPU' if train_on_gpu
      else 'No GPU available, training on CPU; consider making n_epochs very small.')

Training on GPU


In [85]:
class CharRNN(nn.Module):
    ''' Character-level LSTM language model.

        The forward path is: ``rnn`` (LSTM over one-hot encoded characters)
        -> ``dropout`` -> ``decoder`` (linear layer, one logit per character).

        Arguments
        ---------
        tokens: sequence of unique characters; its order defines the integer ids
        n_hidden: size of the LSTM hidden state
        n_layers: number of stacked LSTM layers
        drop_prob: dropout probability (between LSTM layers and before the decoder)
        lr: stored as ``self.lr``; not used anywhere else inside this class
    '''
    def __init__(self, tokens, n_hidden=256, n_layers=2,
                               drop_prob=0.5, lr=0.001):
        super().__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr
        self.vocab_size = len(tokens)
        self.input_dim = n_hidden
        self.output_size = len(tokens)

        # NOTE: `embedding`, `lstm`, `fc` and `sigmoid` are never called by
        # forward(). They are kept, in their original creation order, so that
        # the state_dict keys (and therefore saved checkpoints) stay compatible.
        self.embedding = nn.Embedding(self.vocab_size, self.vocab_size)

        # creating character dictionaries
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}

        # Unused LSTM (see note above)
        self.lstm = nn.LSTM(self.input_dim, n_hidden, n_layers, dropout=drop_prob, batch_first=True)
        # Dropout applied to the LSTM output in forward()
        self.dropout = nn.Dropout(drop_prob)
        # The LSTM actually used in forward(): consumes one-hot vectors of size len(chars)
        self.rnn = nn.LSTM(len(self.chars), n_hidden, n_layers, dropout=drop_prob, batch_first=True)

        # Unused output head and activation (see note above)
        self.fc = nn.Linear(self.n_hidden, self.output_size)
        self.sigmoid = nn.Sigmoid()

        # Output layer used in forward(): hidden state -> character logits
        self.decoder = nn.Linear(n_hidden, len(self.chars))

        self.init_weights()
        # Move to the GPU only when one exists; an unconditional .cuda() makes
        # the class impossible to construct on CPU-only machines.
        if torch.cuda.is_available():
            self.cuda()

    def forward(self, x, hidden):
        ''' Forward pass through the network.

            x is fed to a batch_first LSTM, so it is (batch, seq, len(chars)).
            Returns the logits flattened to (batch*seq, len(chars)) and the
            new (hidden state, cell state) tuple.
        '''
        x, h = self.rnn(x, hidden)
        x = self.dropout(x)
        # Stack all time steps of all sequences into rows for the decoder
        x = x.reshape(x.size(0)*x.size(1), self.n_hidden)
        x = self.decoder(x)

        return x, h

    def init_hidden(self, batch_size):
        ''' Initializes hidden state.

            Returns a tuple of two zero tensors (hidden state, cell state), each
            of size n_layers x batch_size x n_hidden.
        '''
        # new_zeros inherits dtype and device from the model's own parameters,
        # so the state always lives where the model lives (CPU or GPU).
        weight = next(self.parameters())
        shape = (self.n_layers, batch_size, self.n_hidden)

        return (weight.new_zeros(shape), weight.new_zeros(shape))

    def init_weights(self):
        ''' Initialize weights of decoder (fully connected layer) '''

        # Set the bias tensor to all zeros
        self.decoder.bias.data.fill_(0)

        # Apply random uniform weights in [-1, 1) to decoder
        self.decoder.weight.data.uniform_(-1, 1)

        

In [86]:
def train(net, data, epochs=10, batch_size=10, seq_length=50, lr=0.001, clip=5, val_frac=0.1, print_every=10):
    ''' Training a network 
    
        Arguments
        ---------
        net: the network to train; must provide ``chars`` and ``init_hidden``
        data: integer-encoded text (1-D array) to train on
        epochs: number of passes over the training split
        batch_size: number of sequences per mini-batch
        seq_length: number of character steps per sequence
        lr: learning rate
        clip: gradient clipping
        val_frac: Fraction of data to hold out for validation
        print_every: Number of steps for printing training and validation loss
    
        Returns nothing; progress is printed every ``print_every`` steps.
    '''
    net.train()
    
    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    
    # create training and validation data (the tail of the text is held out)
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]
    
    if(train_on_gpu):
        net.cuda()
    # Send every batch to whatever device the model actually lives on.
    device = next(net.parameters()).device
    
    counter = 0
    n_chars = len(net.chars)
    for e in range(epochs):
        # initialize hidden state
        h = net.init_hidden(batch_size)
        
        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1
            
            # One-hot encode our data and make them Torch tensors
            inputs = torch.from_numpy(one_hot_encode(x, n_chars)).to(device)
            targets = torch.from_numpy(y).to(device)

            # Detach the hidden state from its history, otherwise
            # we'd backprop through the entire training history
            h = tuple(each.detach() for each in h)

            net.zero_grad()
            
            output, h = net(inputs, h)
            
            loss = criterion(output, targets.view(batch_size*seq_length).long())
            loss.backward()

            # `clip_grad_norm_` helps prevent the exploding gradient problem in RNNs
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            opt.step()
            
            # loss stats
            if counter % print_every == 0:
                # Get validation loss
                val_h = net.init_hidden(batch_size)
                val_losses = []
                net.eval()
                # No gradients are needed for validation; skipping the autograd
                # graph saves memory and time on these long sequences.
                with torch.no_grad():
                    for val_x, val_y in get_batches(val_data, batch_size, seq_length):
                        # One-hot encode our data and make them Torch tensors
                        val_inputs = torch.from_numpy(one_hot_encode(val_x, n_chars)).to(device)
                        val_targets = torch.from_numpy(val_y).to(device)

                        output, val_h = net(val_inputs, val_h)
                        val_loss = criterion(output, val_targets.view(batch_size*seq_length).long())
                    
                        val_losses.append(val_loss.item())
                
                net.train() # reset to train mode after iterating through validation data
                
                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(np.mean(val_losses)))

In [87]:
# Model hyperparameters: hidden size and number of stacked LSTM layers
n_hidden, n_layers = 256, 5

# Build the network over the corpus vocabulary and show its layers
net = CharRNN(chars, n_hidden=n_hidden, n_layers=n_layers)
print(net)

CharRNN(
  (embedding): Embedding(97, 97)
  (lstm): LSTM(256, 256, num_layers=5, batch_first=True, dropout=0.5)
  (dropout): Dropout(p=0.5, inplace=False)
  (rnn): LSTM(97, 256, num_layers=5, batch_first=True, dropout=0.5)
  (fc): Linear(in_features=256, out_features=97, bias=True)
  (sigmoid): Sigmoid()
  (decoder): Linear(in_features=256, out_features=97, bias=True)
)


In [88]:
# Training configuration
batch_size, seq_length = 20, 500
# start small if you are just testing initial behavior
n_epochs = 5

# Put the model on the GPU when one is available, then train it
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
net.to(device)

train(
    net,
    encoded,
    epochs=n_epochs,
    batch_size=batch_size,
    seq_length=seq_length,
    lr=0.01,
    print_every=10,
)


Epoch: 1/5... Step: 10... Loss: 3.4969... Val Loss: 3.3854
Epoch: 1/5... Step: 20... Loss: 3.3192... Val Loss: 3.2077
Epoch: 1/5... Step: 30... Loss: 3.2481... Val Loss: 3.1741
Epoch: 1/5... Step: 40... Loss: 3.1247... Val Loss: 2.9669
Epoch: 1/5... Step: 50... Loss: 2.7272... Val Loss: 2.6054
Epoch: 1/5... Step: 60... Loss: 2.5381... Val Loss: 2.4160
Epoch: 1/5... Step: 70... Loss: 2.4492... Val Loss: 2.3208
Epoch: 1/5... Step: 80... Loss: 2.3814... Val Loss: 2.2437
Epoch: 1/5... Step: 90... Loss: 2.2821... Val Loss: 2.1718
Epoch: 1/5... Step: 100... Loss: 2.1917... Val Loss: 2.0961
Epoch: 1/5... Step: 110... Loss: 2.1817... Val Loss: 2.0153
Epoch: 1/5... Step: 120... Loss: 2.0771... Val Loss: 1.9304
Epoch: 1/5... Step: 130... Loss: 1.9736... Val Loss: 1.8703
Epoch: 1/5... Step: 140... Loss: 1.9135... Val Loss: 1.7996
Epoch: 1/5... Step: 150... Loss: 1.8345... Val Loss: 1.7184
Epoch: 1/5... Step: 160... Loss: 1.8312... Val Loss: 1.6300
Epoch: 1/5... Step: 170... Loss: 1.7701... Val Lo

In [100]:
# Persist only the learned parameters (the state_dict), not the module object.
torch.save(
    net.state_dict(),
    '/content/drive/MyDrive/ColabNotebooks/Generative_model_Sharaborina_state_dict.pt',
)

In [90]:
def predict(net, char, h=None, top_k=None):
    ''' Given a character, predict the next character.
        Returns the predicted character and the hidden state.

        Arguments
        ---------
        net: trained network providing ``chars``, ``char2int``, ``int2char``
             and ``init_hidden``
        char: the current character; must be a key of ``net.char2int``
        h: (hidden state, cell state) tuple from the previous step; when None
           a fresh zero state for a batch of one is created
        top_k: if given, sample only among the ``top_k`` most likely characters;
               otherwise sample from the full distribution
    '''

    # tensor inputs: a 1 x 1 x n_chars one-hot tensor on the model's device
    x = np.array([[net.char2int[char]]])
    x = one_hot_encode(x, len(net.chars))
    device = next(net.parameters()).device
    inputs = torch.from_numpy(x).to(device)

    # The signature advertises h=None, so start from a zero state in that case
    if h is None:
        h = net.init_hidden(1)
    # detach hidden state from history
    h = tuple(each.detach() for each in h)

    # Inference only: no autograd graph is needed
    with torch.no_grad():
        # get the output of the model
        out, h = net(inputs, h)
        # get the character probabilities (on the CPU for numpy)
        p = F.softmax(out, dim=1).cpu()

    # get top characters
    if top_k is None:
        top_ch = np.arange(len(net.chars))
    else:
        p, top_ch = p.topk(top_k)
        # reshape(-1) rather than squeeze(): with top_k=1 squeeze() would give
        # 0-d arrays, which np.random.choice rejects.
        top_ch = top_ch.numpy().reshape(-1)

    # select the likely next character with some element of randomness
    p = p.numpy().reshape(-1)
    char = np.random.choice(top_ch, p=p/p.sum())

    # return the predicted character (decoded from its id) and the hidden state
    return net.int2char[int(char)], h

In [91]:
def sample(net, size, prime='The', top_k=None):
    ''' Generate text from the network, starting from the string ``prime``.

        Arguments
        ---------
        net: trained network, passed through to ``predict``
        size: number of prediction steps to run after the priming phase
        prime: non-empty seed text; every character must be known to the network
        top_k: forwarded to ``predict`` (restricts sampling to the k most
               likely characters)

        Returns ``prime`` followed by ``size + 1`` generated characters (one
        produced at the end of priming, then one per step).
        Raises ValueError if ``prime`` is empty.
    '''
    # Without at least one priming character there is nothing to predict from;
    # previously this surfaced later as an UnboundLocalError on `char`.
    if not prime:
        raise ValueError("prime must contain at least one character")

    if(train_on_gpu):
        net.cuda()
    else:
        net.cpu()

    net.eval() # eval mode

    # First off, run through the prime characters to warm up the hidden state
    chars = list(prime)
    h = net.init_hidden(1)
    for ch in prime:
        char, h = predict(net, ch, h, top_k=top_k)

    # Only the prediction made after the last prime character is kept
    chars.append(char)

    # Now pass in the previous character and get a new one
    for _ in range(size):
        char, h = predict(net, chars[-1], h, top_k=top_k)
        chars.append(char)

    return ''.join(chars)

In [99]:
# Generate a continuation of a Danish prompt, sampling among the 2 most likely characters
print(sample(net, size=300, prime='Er der hund?', top_k=2))

Er der hund? Person 1: Ja det skal være i centrum af byen. Person 2: Der er flere, hvis du har brug for at rejse efter 10:15 Person 1: Jeg leder også efter oplysninger om et hotel i centrum af byen og en moderat prisklasse. Person 2: Ja, det er et hotel i den østlige del af byen og er dyrt. Vil du have mig til a
