<a href="https://colab.research.google.com/github/Pedro69491/Neural-Networks/blob/main/LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
import re

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print("Using {} device".format(device))


Using cuda device


In [None]:
# Load the raw training corpus. The encoding is pinned so the decoded text
# does not depend on the locale of the machine running the notebook.
# NOTE(review): assumes the corpus file is UTF-8 -- confirm.
with open('800053.txt', 'r', encoding='utf-8') as f:
    text = f.read()
    




In [None]:
# Remove markers made of a single character between angle brackets.
text = re.sub(r'<.>', '', text)
# Keep letters only: every other character becomes a space, then lower-case.
text = re.sub(r'[^a-zA-Z]', ' ', text).lower()
# Collapse each run of spaces into one space. Deleting double spaces outright
# would glue together the words on either side of punctuation
# ("end. start" -> "end  start" -> "endstart").
text = re.sub(r' {2,}', ' ', text)

print(text[:10])

In [None]:
# Vocabulary of distinct characters. It is sorted so that a character always
# gets the same index: iteration order of a set of strings can differ from one
# interpreter run to the next, which would silently change the encoding.
chars = sorted(set(text))
print(len(chars))

# Index -> character and character -> index lookup tables.
ind_char = dict(enumerate(chars))
char_ind = {ch: ii for ii, ch in ind_char.items()}

# Integer-encode the first 10000 characters of the corpus.
encoded = np.array([char_ind[ch] for ch in text[:10000]])
print(encoded.shape)


In [None]:
def one_hot_encode(arr, n_labels):
    """Return a one-hot encoding of an integer array.

    Args:
        arr: NumPy integer array of any shape; each value is used as a
            column index into the one-hot axis.
        n_labels: Size of the one-hot axis (number of classes).

    Returns:
        A ``float32`` array of shape ``(*arr.shape, n_labels)`` holding 1.0
        at the position given by each value of ``arr`` and 0.0 elsewhere.

    Raises:
        IndexError: If a value of ``arr`` is out of range for ``n_labels``.
    """
    # arr.size is the element count for any rank; multiplying the unpacked
    # shape only works when the array has exactly two dimensions.
    n_items = arr.size
    one_hot = np.zeros((n_items, n_labels), dtype=np.float32)

    # One row per element: switch on the column named by that element.
    one_hot[np.arange(n_items), arr.ravel()] = 1.0

    # Restore the input's shape, with the label axis appended last.
    return one_hot.reshape((*arr.shape, n_labels))

In [None]:
def get_batches(arr, n_seqs, n_steps):
    """Yield ``(x, y)`` mini-batches for next-character prediction.

    ``arr`` is trimmed to a whole number of batches and laid out as
    ``n_seqs`` rows. Each batch is a window of ``n_steps`` columns.

    Args:
        arr: 1-D NumPy array of encoded characters.
        n_seqs: Number of rows (sequences) in every batch.
        n_steps: Number of columns (time steps) in every batch.

    Yields:
        Tuples ``(x, y)`` of arrays shaped ``(n_seqs, n_steps)``, where ``y``
        is ``x`` shifted one position to the left. The last column of ``y``
        is the column that follows the window, or column 0 of the row for
        the final window.
    """
    chars_per_batch = n_seqs * n_steps
    usable = (len(arr) // chars_per_batch) * chars_per_batch

    # Drop the tail that does not fill a whole batch, one row per sequence.
    grid = arr[:usable].reshape((n_seqs, -1))
    row_len = grid.shape[1]

    for start in range(0, row_len, n_steps):
        stop = start + n_steps
        x = grid[:, start:stop]

        # Targets are the inputs shifted left by one character.
        y = np.zeros_like(x)
        y[:, :-1] = x[:, 1:]
        if stop < row_len:
            y[:, -1] = grid[:, stop]
        else:
            # Final window: nothing follows it, so wrap to the row start.
            y[:, -1] = grid[:, 0]
        yield x, y

In [None]:
# Smoke test: report the corpus length, then pull the first batch
# (10 sequences of 50 steps) from the generator and show its inputs.
print(len(encoded))
batches = get_batches(encoded, 10, 50)
x, y = next(batches)
print(x)


In [None]:
class LSTM(nn.Module):
    """Character-level LSTM that maps one-hot characters to next-char logits.

    The network is a stacked ``nn.LSTM`` followed by dropout and a linear
    layer that projects every time step onto the vocabulary.

    Args:
        tokens: Sequence of distinct characters; a character's position in
            this sequence is its class index.
        n_steps: Accepted for interface compatibility; not used by the model.
        n_hidden: Size of the LSTM hidden state.
        n_layers: Number of stacked LSTM layers.
        drop_prob: Dropout probability, used between LSTM layers and on the
            LSTM output.
        lr: Stored on the instance as ``self.lr``; not used by the model.
    """

    def __init__(self, tokens, n_steps=50, n_hidden=100, n_layers=2,
                 drop_prob=0.2, lr=0.001):
        super().__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr

        # Vocabulary and the index <-> character lookup tables.
        self.chars = tokens
        self.ind_char = dict(enumerate(self.chars))
        self.char_ind = {ch: ii for ii, ch in self.ind_char.items()}

        # batch_first=True: inputs and outputs are (batch, seq, features).
        self.lstm = nn.LSTM(len(self.chars), n_hidden, n_layers,
                            dropout=drop_prob, batch_first=True)
        self.dropout = nn.Dropout(drop_prob)
        self.fc = nn.Linear(n_hidden, len(self.chars))

    def init_hidden(self, batch_size):
        """Return a random initial ``(h, c)`` state for ``batch_size`` rows.

        Both tensors have shape ``(n_layers, batch_size, n_hidden)`` and are
        placed on the device that holds the model's parameters.
        """
        model_device = next(self.parameters()).device
        shape = (self.n_layers, batch_size, self.n_hidden)
        return (torch.randn(*shape).to(model_device),
                torch.randn(*shape).to(model_device))

    def forward(self, x, hc):
        """Run the network on a batch of one-hot sequences.

        Args:
            x: Float tensor of shape ``(batch, seq, len(self.chars))``.
            hc: Tuple ``(h, c)`` as produced by ``init_hidden``.

        Returns:
            ``(logits, (h, c))`` where ``logits`` has shape
            ``(batch * seq, len(self.chars))`` and ``(h, c)`` is the updated
            LSTM state.
        """
        x, (h, c) = self.lstm(x, hc)
        x = self.dropout(x)

        # Merge batch and time so the linear layer scores every step.
        x = x.reshape(-1, self.n_hidden)
        x = self.fc(x)

        return x, (h, c)

    def predict(self, char, top_k=None):
        """Predict the most likely character to follow ``char``.

        The model is switched to evaluation mode for the call (so dropout is
        off) and its previous mode is restored afterwards. The hidden state
        comes from ``init_hidden``, which is random, so repeated calls can
        return different results.

        Args:
            char: A single character from the vocabulary.
            top_k: If given, only the ``top_k`` most probable characters are
                considered and returned.

        Returns:
            ``(character, p)``: the most probable next character, and a
            NumPy array of probabilities. ``p`` covers the whole vocabulary
            when ``top_k`` is ``None``; otherwise it holds the ``top_k``
            largest probabilities in descending order.

        Raises:
            KeyError: If ``char`` is not in the vocabulary.
        """
        model_device = next(self.parameters()).device

        # One-hot input of shape (1, 1, n_chars): one sequence, one step.
        index = torch.tensor([[self.char_ind[char]]], device=model_device)
        inputs = F.one_hot(index, num_classes=len(self.chars)).float()

        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                out, _ = self(inputs, self.init_hidden(1))
        finally:
            self.train(was_training)

        p = F.softmax(out, dim=1).cpu()

        if top_k is None:
            top_ch = torch.arange(len(self.chars))
        else:
            p, top_ch = p.topk(top_k)
            top_ch = top_ch.reshape(-1)

        # argmax is a position within p; map it back through top_ch to get
        # the vocabulary index (the two differ when top_k is used).
        best = torch.argmax(p).item()
        character = self.ind_char[top_ch[best].item()]

        return character, p.numpy().squeeze()
        

In [None]:
def train(net, data, epochs=10, n_seqs=10, n_steps=50, lr=0.001, clip=5, print_every=10):
    """Train ``net`` on integer-encoded text with Adam.

    Args:
        net: Model exposing ``chars``, ``init_hidden(batch_size)`` and a
            forward pass ``net(inputs, hidden) -> (logits, hidden)``.
        data: 1-D NumPy array of encoded characters.
        epochs: Number of passes over ``data``.
        n_seqs: Sequences per mini-batch.
        n_steps: Time steps per sequence.
        lr: Adam learning rate.
        clip: Maximum gradient norm.
        print_every: Report progress every this many optimizer steps; a
            falsy value disables the report.
    """
    net.train()

    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    # Put the batches wherever the model's parameters live, so training
    # works on both CPU and GPU.
    model_device = next(net.parameters()).device

    counter = 0
    n_chars = len(net.chars)

    for e in range(epochs):

        h = net.init_hidden(n_seqs)

        for x, y in get_batches(data, n_seqs, n_steps):
            counter += 1

            inputs = torch.from_numpy(one_hot_encode(x, n_chars)).to(model_device)
            # CrossEntropyLoss expects int64 class indices; .long() converts
            # on whatever device the tensor is on, unlike a CUDA-only type.
            targets = torch.from_numpy(y).to(model_device).reshape(-1).long()

            # Detach the carried-over state so gradients do not flow back
            # into earlier batches.
            h = tuple(each.detach() for each in h)

            opt.zero_grad()

            output, h = net(inputs, h)

            loss = criterion(output, targets)
            # NOTE(review): the optimized value is exp(cross-entropy), not
            # the cross-entropy itself; kept as written -- confirm intent.
            loss = torch.exp(loss)

            loss.backward()

            # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
            nn.utils.clip_grad_norm_(net.parameters(), clip)

            opt.step()

            if print_every and counter % print_every == 0:
                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),)

In [None]:
# Build the model over the corpus vocabulary and move it to the chosen device.
net = LSTM(chars, n_hidden=512, n_layers=2)
net.to(device)
print(net)

# Batch geometry: sequences per batch, time steps per sequence.
n_seqs, n_steps = 10, 50
train(net, encoded, epochs=3, n_seqs=n_seqs, n_steps=n_steps, lr=0.001, print_every=10)

# Show the prediction for each of the first 16 characters of the corpus.
chars_to_predict = list(text[:16])
for c in chars_to_predict:
    print(net.predict(c))