# RNN-for-Text-Generation

Text generation (encoded variables)

In [None]:
import torch
from torch import nn
import torch.nn.functional as F

import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# Read the complete corpus into memory as a single string.
with open('shakespeare.txt', 'r', encoding='utf8') as corpus_file:
    text = corpus_file.read()

In [None]:
print(text[:1000])

In [None]:
len(text)

## Text Encoding

In [None]:
# Sort the distinct characters so that the character <-> index mapping built
# from this collection is identical on every run. Iterating a bare set of
# strings can change order between interpreter sessions, which would make a
# saved checkpoint unusable after a restart.
unique_characters = sorted(set(text))
unique_characters

In [None]:
# Map each integer index to its character.
decoder = {index: char for index, char in enumerate(unique_characters)}

In [None]:
decoder

In [None]:
# Invert the decoder: map each character back to its integer index.
encoder = {char: index for index, char in decoder.items()}

In [None]:
encoder

In [None]:
# Translate every character of the corpus into its integer code.
encoded_text = np.array(list(map(encoder.__getitem__, text)))
# Number of distinct codes that occur in the encoded corpus.
length = len(set(encoded_text))
length

In [None]:
encoded_text[:200]

# One-hot Encoding
The data needs to be one-hot encoded before it can be fed into the neural network.

In [None]:
def one_hot_encoder(encoded_text, num_uni_chars):
    """One-hot encode an array of integer character codes.

    Parameters
    ----------
    encoded_text : numpy.ndarray of int
        Character codes, each in ``[0, num_uni_chars)``. Any shape.
    num_uni_chars : int
        Size of the vocabulary, i.e. the length of each one-hot vector.

    Returns
    -------
    numpy.ndarray of float32
        Array of shape ``encoded_text.shape + (num_uni_chars,)``. A 1-D input
        of length N gives ``(N, num_uni_chars)``; a ``(batch, seq)`` input
        gives ``(batch, seq, num_uni_chars)``.
    """
    # float32 is allocated directly because torch.from_numpy keeps the dtype
    # and the network weights are float32.
    one_hot = np.zeros((encoded_text.size, num_uni_chars), dtype=np.float32)

    # One row per character: set the column named by the character's code.
    one_hot[np.arange(one_hot.shape[0]), encoded_text.flatten()] = 1.0

    # Restore the caller's layout; without this a (batch, seq) input would
    # come back flattened to (batch * seq, num_uni_chars).
    return one_hot.reshape((*encoded_text.shape, num_uni_chars))
    
    

In [None]:
one_hot_encoder(np.array([1,2,0]),3)

# Creating Training Batches
We need to create a function that will generate batches of characters along with the next character in the sequence as a label.

In [None]:
def generate_batches(encoded_text, samp_per_batch=10, seq_len=50):
    '''
    Generate (using yield) batches for training.

    Each batch is a pair (x, y) of integer arrays of shape
    (samp_per_batch, seq_len):

    x: a window of seq_len encoded characters per row
    y: the same window shifted one character to the left, i.e. the
       next-character target for every position of x

    Example:

    X:

    [[1 2 3]]

    Y:

    [[2 3 4]]

    encoded_text : 1-D numpy array holding the complete encoded text
    samp_per_batch : number of samples (rows) per batch
    seq_len : length of the character sequence in each sample

    Trailing characters that do not fill a whole batch are dropped. Every
    batch is yielded exactly once; if the text is shorter than one batch,
    nothing is yielded.
    '''
    if samp_per_batch <= 0 or seq_len <= 0:
        raise ValueError("samp_per_batch and seq_len must be positive")

    # Total number of characters consumed by one batch.
    # Example: samp_per_batch=2 and seq_len=50 -> 100 characters per batch.
    char_per_batch = samp_per_batch * seq_len

    # Number of complete batches the text can supply (floor division).
    num_batches_avail = len(encoded_text) // char_per_batch
    if num_batches_avail == 0:
        return

    # Cut off the tail that does not fit evenly into a batch.
    encoded_text = encoded_text[:num_batches_avail * char_per_batch]

    # One long row per sample; each batch is a seq_len-wide column window.
    encoded_text = encoded_text.reshape((samp_per_batch, -1))
    row_len = encoded_text.shape[1]

    for n in range(0, row_len, seq_len):

        # Feature characters for this window.
        x = encoded_text[:, n:n + seq_len]

        # Targets: x shifted left by one position.
        y = np.zeros_like(x)
        y[:, :-1] = x[:, 1:]

        if n + seq_len < row_len:
            # The final target is the character just past the window.
            y[:, -1] = encoded_text[:, n + seq_len]
        else:
            # Last window: no character follows, so wrap around to the
            # first column of each row.
            y[:, -1] = encoded_text[:, 0]

        yield x, y

### Example of generating a batch

In [None]:
sample_text = np.arange(80)

In [None]:
sample_text

In [None]:
# Build a batch generator over the toy sequence and display the generator object.
batch_generator = generate_batches(sample_text,samp_per_batch=4,seq_len=5)
batch_generator

# LSTM model

In [None]:
class LSTMmodel(nn.Module):
    """Character-level language model: stacked LSTM -> dropout -> linear.

    Parameters
    ----------
    all_chars : iterable of str with a length
        The vocabulary. Characters are numbered in iteration order, so the
        caller should pass a collection whose order is stable.
    num_hidden : int
        Size of the LSTM hidden state.
    num_layers : int
        Number of stacked LSTM layers.
    drop_prob : float
        Dropout probability, used between LSTM layers and before the final
        linear layer.
    use_gpu : bool
        Stored on the instance; the model itself is placed on a device by
        the caller (e.g. ``model.to('cuda')``).
    """

    def __init__(self, all_chars, num_hidden = 256, num_layers= 4,drop_prob=0.5, use_gpu=False):

        super().__init__()
        self.drop_prob = drop_prob
        self.num_hidden = num_hidden
        self.num_layers = num_layers
        self.use_gpu = use_gpu

        # Character set, encoder (char -> index) and decoder (index -> char)
        self.all_chars = all_chars
        self.decoder = dict(enumerate(all_chars))
        self.encoder = {char: idx for idx, char in self.decoder.items()}

        # batch_first=True: inputs are (batch, seq, features). This matches
        # the (num_layers, batch, hidden) state built by hidden_state() and
        # makes the flattened output in forward() batch-major.
        self.lstm = nn.LSTM(
            input_size=len(self.all_chars),
            hidden_size=num_hidden,
            num_layers=num_layers,
            dropout=drop_prob,
            batch_first=True,
        )
        self.dropout = nn.Dropout(drop_prob)
        self.fc_linear = nn.Linear(num_hidden, len(self.all_chars))

    def forward(self, x, hidden):
        """Run one forward pass.

        x is a one-hot tensor of shape (batch, seq, len(all_chars)) and
        hidden is an (h, c) tuple as returned by hidden_state().
        Returns (output, hidden), where output holds unnormalised scores of
        shape (batch * seq, len(all_chars)).
        """
        lstm_output, hidden = self.lstm(x, hidden)
        drop_output = self.dropout(lstm_output)
        # Collapse batch and sequence so the linear layer scores every
        # position independently.
        drop_output = drop_output.reshape(-1, self.num_hidden)
        output = self.fc_linear(drop_output)

        return output, hidden

    def hidden_state(self, batch_size=128):
        """Return a zeroed (h, c) tuple of shape (num_layers, batch_size, num_hidden).

        The tensors are created on the same device as the model parameters.
        """
        device = next(self.parameters()).device
        shape = (self.num_layers, batch_size, self.num_hidden)

        hidden = (torch.zeros(shape, device=device),
                  torch.zeros(shape, device=device))

        return hidden
        
    

## Instance of the model

In [None]:
# Build the network over the corpus vocabulary.
model = LSTMmodel(
    all_chars=unique_characters,
    num_hidden=512,
    num_layers=3,
)

In [None]:
# Element count of every parameter tensor; the sum is the model size.
total_param = [int(weights.numel()) for weights in model.parameters()]

sum(total_param)

In [None]:
len(encoded_text)

## Optimizer and loss

In [None]:
# AdamW over all model weights; cross-entropy over the character classes.
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

## Training Data and Validation Data

In [None]:
# Position that separates the first 90% of the corpus from the rest.
idx = int(0.9 * len(encoded_text))

In [None]:
# Everything before idx is used for training, the remainder for validation.
train_data, val_data = encoded_text[:idx], encoded_text[idx:]

# Training Networks

In [None]:
## Training hyper-parameters

epochs = 50
batch_size = 128
seq_len = 100
# Width of the one-hot vectors: the largest character code plus one.
# ndarray.max() runs in native code; the builtin max() would walk the whole
# corpus one element at a time in Python.
num_char = int(encoded_text.max()) + 1


In [None]:
# Choose the device once and make sure the model lives on it.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

model.to(device)

###########################################################

# Enable dropout for training.
model.train()

# Global step counter across all epochs.
i = 0

for epoch in range(epochs):

    # Fresh zero state at the start of every epoch, sized for batch_size.
    hidden = model.hidden_state(batch_size)
    hidden = tuple(state.to(device) for state in hidden)

    for x, y in generate_batches(train_data, batch_size, seq_len):
        i += 1

        # The network expects (batch, seq, num_char); the reshape makes that
        # layout explicit.
        x = one_hot_encoder(x, num_char).reshape(batch_size, seq_len, -1)

        # Convert numpy arrays to tensors on the training device.
        inputs = torch.from_numpy(x).to(device)
        targets = torch.from_numpy(y).to(device)

        # Keep the state values but cut them off from the previous batch's
        # graph, so backpropagation stops at the batch boundary.
        hidden = tuple(state.detach() for state in hidden)

        model.zero_grad()

        lstm_output, hidden = model(inputs, hidden)
        loss = criterion(lstm_output, targets.view(batch_size * seq_len).long())
        loss.backward()

        # Clip the gradient norm to guard against exploding gradients.
        # max_norm=5 is a common choice for LSTMs; tune as needed.
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=5)

        optimizer.step()

        ##########################
        ### Validation set
        if i % 25 == 0:

            val_hidden = model.hidden_state(batch_size)
            val_hidden = tuple(state.to(device) for state in val_hidden)
            val_losses = []

            # Disable dropout and gradient tracking while validating.
            model.eval()
            with torch.no_grad():
                for x, y in generate_batches(val_data, batch_size, seq_len):

                    x = one_hot_encoder(x, num_char).reshape(batch_size, seq_len, -1)

                    # Convert numpy arrays to tensors on the training device.
                    inputs = torch.from_numpy(x).to(device)
                    targets = torch.from_numpy(y).to(device)

                    lstm_output, val_hidden = model(inputs, val_hidden)
                    val_loss = criterion(lstm_output, targets.view(batch_size * seq_len).long())

                    val_losses.append(val_loss.item())

            # Back to training mode once the whole validation pass is done.
            model.train()

            # Report the mean over all validation batches (skipped when the
            # validation split is too short to form a single batch).
            if val_losses:
                mean_val_loss = sum(val_losses) / len(val_losses)
                print(f" Epoch :{epoch} ,Step :{i} ,Val Loss:{mean_val_loss} ")
        
        
        
            
    

## Saving the model

In [None]:
# Persist only the learned weights (the state dict), not the whole module.
torch.save(model.state_dict(),'model_Shakspeare.pt')

## Load Model

In [None]:
# map_location='cpu' lets a checkpoint written on a GPU be opened on a machine
# without one; load_state_dict then copies the values into the model's own
# parameters, wherever they live.
model.load_state_dict(torch.load('model_Shakspeare.pt', map_location='cpu'))
# Inference mode: disables dropout.
model.eval()

In [None]:
# Rebuild the network with the architecture used for training. The
# hyper-parameters must match the checkpoint or the weights will not load.
model = LSTMmodel(
    all_chars=unique_characters,
    num_hidden=512,
    num_layers=3,
    drop_prob=0.5,
    use_gpu=True,
)
# A freshly constructed model has random weights, so restore the trained ones
# and switch to inference mode (dropout off).
model.load_state_dict(torch.load('model_Shakspeare.pt', map_location='cpu'))
model.eval()

# Generating Predictions

In [4]:
def predict_next_char(model,char, hidden=None,k=1):
    """Sample the character that follows ``char``.

    Parameters
    ----------
    model : trained network exposing ``encoder``, ``decoder``, ``all_chars``
        and ``hidden_state``.
    char : str
        A single character that is a key of ``model.encoder``.
    hidden : tuple of tensors or None
        Recurrent state carried over from the previous call. ``None`` starts
        from a zero state.
    k : int
        The next character is sampled from the k most probable candidates,
        weighted by their probabilities (k=1 is greedy decoding).

    Returns
    -------
    (str, tuple)
        The sampled character and the updated hidden state.
    """
    device = next(model.parameters()).device

    # Encode the raw character with the model's own mapping.
    encoded_text = np.array([[model.encoder[char]]])

    # One-hot encode as a batch of one sequence of length one: (1, 1, C).
    encoded_text = one_hot_encoder(encoded_text, len(model.all_chars))
    encoded_text = encoded_text.reshape(1, 1, -1)

    # Convert to a tensor on the model's device.
    inputs = torch.from_numpy(encoded_text).to(device)

    if hidden is None:
        hidden = model.hidden_state(1)
    # Detach so no autograd history accumulates across calls.
    hidden = tuple(state.detach().to(device) for state in hidden)

    # Run the model and get the scores for the next character.
    with torch.no_grad():
        lstm_out, hidden = model(inputs, hidden)

    probs = F.softmax(lstm_out, dim=1).cpu()

    # Keep only the k most likely characters.
    top_probs, top_idxs = probs.topk(k)

    # flatten() (not squeeze()) keeps a 1-D array even when k == 1, which is
    # what np.random.choice needs to treat it as a list of candidates.
    top_idxs = top_idxs.numpy().flatten()
    top_probs = top_probs.numpy().flatten().astype(np.float64)

    # Renormalise the k probabilities so they sum to one, then sample.
    char_idx = np.random.choice(top_idxs, p=top_probs / top_probs.sum())

    return model.decoder[int(char_idx)], hidden
    

In [11]:
def generate_text(model, size, seed="The", k = 1):
    """Generate text by repeatedly sampling the next character.

    Parameters
    ----------
    model : trained network accepted by ``predict_next_char``.
    size : int
        Number of characters to generate after the seed has been consumed
        and its first continuation appended.
    seed : str
        Non-empty starting text; every character must be in the model's
        vocabulary.
    k : int
        Sample each character from the k most probable candidates.

    Returns
    -------
    str
        The seed followed by ``size + 1`` generated characters.

    Raises
    ------
    ValueError
        If ``seed`` is empty.
    """
    if not seed:
        raise ValueError("seed must contain at least one character")

    # Inference mode: disables dropout.
    model.eval()

    output_chars = list(seed)
    hidden = model.hidden_state(1)

    # Prime the hidden state with the seed; only the prediction made after
    # the last seed character is kept.
    for char in seed:
        char, hidden = predict_next_char(model,char,hidden, k=k)

    output_chars.append(char)

    # Feed each generated character back in to produce the next one.
    for _ in range(size):

        char, hidden = predict_next_char(model,output_chars[-1], hidden, k=k)
        output_chars.append(char)

    return ''.join(output_chars)