# Text Generation with Neural Networks

## Imports

In [None]:
# Standard Library Imports
import logging

# Third-Party Libraries
import torch
from torch import nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt

# MLflow for Experiment Tracking and Model Management
import mlflow
import mlflow.pytorch

torch.manual_seed(0)

## Define Constants and Paths and Configure Logging

In [None]:
# Define global experiment and run names to be used throughout the notebook
EXPERIMENT_SET = "RNN text generation"
RUN_NAME = "RNN Text Generation"
MODEL_NAME = "dict_torch_rnn_model.pt"

# Set up the paths
DATA_PATH = "shakespeare.txt"
MODEL_DECODER_PATH = "models/decoder.pt"
MODEL_ENCODER_PATH = "models/encoder.pt"

# Set up the chunk separator for text processing
CHUNK_SEPARATOR = "\n\n"

In [None]:
# Configure the logging module with desired format and level
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Create a logger for this notebook
logger = logging.getLogger('text-generation-Torch-notebook')
logger.info("Logging configured successfully")

## Get Text Data

This is the text we'll use as a basis for our generations: let's try to generate 'Shakespearean' texts.

This text is from Shakespeare's Sonnet 1. It's one of the 154 sonnets written by William Shakespeare that were first published in 1609. This particular sonnet, like many others, discusses themes of beauty, procreation, and the transient nature of life, urging the beautiful to reproduce so their beauty can live on through their offspring.

In [None]:
with open(DATA_PATH,'r',encoding='utf8') as f:
    text = f.read()

In [None]:
print('First 600 chars: \n')
print(text[:600])

## Preparing textual data

We need to encode our data to give the model a proper numerical representation of our text.

In [None]:
all_characters = set(text) # creates a set of unique characters found in the text

In [None]:
len(all_characters)

In [None]:
# Sort the characters before numbering them: iterating a set of strings has no
# stable order across interpreter sessions (string hashing is randomized), so an
# unsorted mapping would change on every kernel restart and stop matching a
# previously trained model.
decoder = dict(enumerate(sorted(all_characters)))
# assigns a unique integer to each character in a dictionary format,
# creating a mapping that can later be used to transform encoded predictions back into characters

In [None]:
encoder = {char: ind for ind, char in decoder.items()} 
# reverses the decoder dictionary, providing a mapping from characters to their respective assigned integers, which is used to encode the text.

In [None]:
# torch.save does not create missing parent directories, so make sure the
# target folders exist before writing the two mapping files.
import os

for _mapping_path in (MODEL_DECODER_PATH, MODEL_ENCODER_PATH):
    os.makedirs(os.path.dirname(_mapping_path) or ".", exist_ok=True)

torch.save(decoder, MODEL_DECODER_PATH)
torch.save(encoder, MODEL_ENCODER_PATH)

In [None]:
encoded_text = np.array([encoder[char] for char in text])
# encodes the entire text as an array of integers, with each integer representing the character at that position
#in the text according to the encoder dictionary

## One Hot Encoding

One-hot encoding is a way to convert categorical data into a fixed-size vector of numerical values.

This encoding allows the model to treat input data uniformly and is particularly important for models that need to determine the presence or absence of a feature, such as a particular character.

In [None]:
def one_hot_encoder(encoded_text, num_uni_chars):
    """
        Turn an array of integer character codes into one-hot float32 vectors.

        Args:
            encoded_text: NumPy array of integer codes (any shape); every code
                must be in the range [0, num_uni_chars).
            num_uni_chars: Number of unique characters, i.e. the length of
                each one-hot vector.

        Returns:
            A float32 NumPy array of shape (*encoded_text.shape, num_uni_chars)
            holding a single 1.0 per code and 0.0 everywhere else.

    """
    flat_codes = encoded_text.flatten()
    n_items = encoded_text.size

    # Allocate directly as float32, the dtype used later with pytorch
    encoded = np.zeros((n_items, num_uni_chars), dtype=np.float32)

    # Row r gets its 1.0 in the column given by the r-th code
    encoded[np.arange(n_items), flat_codes] = 1.0

    # Restore the input's leading dimensions, with the one-hot axis last
    return encoded.reshape((*encoded_text.shape, num_uni_chars))

# Creating Training Batches

Training batches are a way of dividing the dataset into smaller, manageable groups of data points that are fed into a machine learning model during the training process.

In [None]:
def generate_batches(encoded_text, samp_per_batch=10, seq_len=50):

    '''
    Generate (using yield) batches for training.

    X: Encoded Text of length seq_len
    Y: Encoded Text shifted by one

    Example:

    X:

    [[1 2 3]]

    Y:

    [[ 2 3 4]]

    encoded_text : Complete Encoded Text to make batches from
                   (1-D NumPy array or any array-like of integer codes)
    samp_per_batch : Number of samples (rows) per batch
    seq_len : Length of character sequence

    Yields (x, y) pairs of shape (samp_per_batch, seq_len). Characters at the
    end of the text that do not fill a whole batch are dropped; if the text is
    shorter than one full batch, nothing is yielded.

    '''

    encoded_text = np.asarray(encoded_text)

    # Total number of characters per batch
    # Example: If samp_per_batch is 2 and seq_len is 50, then 100
    # characters come out per batch.
    char_per_batch = samp_per_batch * seq_len

    # Number of whole batches available (integer division rounds down)
    num_batches_avail = len(encoded_text) // char_per_batch
    if num_batches_avail == 0:
        return

    # Cut off end of encoded_text that
    # won't fit evenly into a batch
    encoded_text = encoded_text[:num_batches_avail * char_per_batch]

    # Reshape text into samp_per_batch rows; each row is one contiguous
    # stretch of the text
    encoded_text = encoded_text.reshape((samp_per_batch, -1))
    row_len = encoded_text.shape[1]

    # Slide a seq_len-wide window along the columns.
    for n in range(0, row_len, seq_len):
        # Grab feature characters
        x = encoded_text[:, n:n+seq_len]
        # y is the target shifted over by 1
        y = np.zeros_like(x)
        y[:, :-1] = x[:, 1:]
        if n + seq_len < row_len:
            # The last target is the character right after the window
            y[:, -1] = encoded_text[:, n+seq_len]
        else:
            # Final window: nothing follows it, so wrap around to the
            # first character of each row
            y[:, -1] = encoded_text[:, 0]

        yield x, y

# Creating the LSTM Model

In [None]:
class CharModel(nn.Module):
    """Character-level LSTM: one-hot characters in, next-character scores out."""

    def __init__(self, all_chars, num_hidden=256, num_layers=4,drop_prob=0.5, use_gpu=False):
        """Initializes CharModel

        Args:
            all_chars: Set of unique characters found in the text.
            num_hidden: Number of hidden units in each LSTM layer. Defaults to 256.
            num_layers: Number of stacked LSTM layers. Defaults to 4.
            drop_prob: Dropout probability, used between LSTM layers and on
                the LSTM output. Defaults to 0.5.
            use_gpu: If the model uses GPU. Defaults to False.
        """
        super().__init__()
        self.drop_prob = drop_prob
        self.num_layers = num_layers
        self.num_hidden = num_hidden
        self.use_gpu = use_gpu

        self.all_chars = all_chars
        # Index<->character mappings written earlier in this notebook.
        # NOTE: torch.load unpickles the file; only load mapping files you created yourself.
        self.decoder = torch.load(MODEL_DECODER_PATH)
        self.encoder = torch.load(MODEL_ENCODER_PATH)

        vocab_size = len(self.all_chars)
        # nn.LSTM only applies dropout between stacked layers and warns when
        # dropout > 0 is combined with a single layer, so disable it there.
        self.lstm = nn.LSTM(
            vocab_size,
            num_hidden,
            num_layers,
            dropout=drop_prob if num_layers > 1 else 0.0,
            batch_first=True,
        )
        self.dropout = nn.Dropout(drop_prob)
        self.fc_linear = nn.Linear(num_hidden, vocab_size)

    def forward(self, x, hidden):
        """Run one forward pass.

        Args:
            x: One-hot input of shape (batch, seq_len, len(all_chars)).
            hidden: Tuple (h, c) of LSTM states, e.g. from hidden_state().

        Returns:
            Tuple (final_out, hidden): final_out has shape
            (batch * seq_len, len(all_chars)) and holds unnormalized scores;
            hidden is the updated (h, c) tuple.
        """
        lstm_output, hidden = self.lstm(x, hidden)
        drop_output = self.dropout(lstm_output)
        # Merge the batch and time axes so the linear layer scores every position
        drop_output = drop_output.contiguous().view(-1, self.num_hidden)
        final_out = self.fc_linear(drop_output)

        return final_out, hidden

    def hidden_state(self, batch_size):
        """Return a zeroed (h, c) tuple, each (num_layers, batch_size, num_hidden).

        The tensors are created on CUDA when use_gpu is set, otherwise on CPU.
        """
        device = torch.device("cuda" if self.use_gpu else "cpu")
        shape = (self.num_layers, batch_size, self.num_hidden)

        return (torch.zeros(shape, device=device),
                torch.zeros(shape, device=device))
        

## Instance of the Model

In [None]:
# Train on the GPU only when CUDA is actually available, so the notebook
# also runs (more slowly) on CPU-only machines instead of crashing.
model = CharModel(
    all_chars=all_characters,
    num_hidden=512,
    num_layers=3,
    drop_prob=0.5,
    use_gpu=torch.cuda.is_available(),
)

### Optimizer and Loss

In [None]:
optimizer = torch.optim.Adam(model.parameters(),lr=0.001)
criterion = nn.CrossEntropyLoss()

## Training Data and Validation Data

In [None]:
# percentage of data to be used for training
train_percent = 0.5

In [None]:
int(len(encoded_text) * (train_percent))

In [None]:
train_ind = int(len(encoded_text) * (train_percent))

In [None]:
train_data = encoded_text[:train_ind]
val_data = encoded_text[train_ind:]

# Training the Network

## Variables

In [None]:
# Epochs to train for
epochs = 30
# Number of sequences per batch
batch_size = 128
# Length of each character sequence
seq_len = 100
# Global step counter, used to decide when to run validation and for reporting;
# always start at 0
tracker = 0
# Number of distinct character codes (width of the one-hot vectors).
# ndarray.max() is vectorized; the builtin max() would walk the array
# element by element in Python.
num_char = int(encoded_text.max()) + 1

In [None]:
mlflow.set_experiment(EXPERIMENT_SET)

In [None]:
def _to_tensors(x, y):
    """One-hot encode x and return (inputs, targets) as tensors, on GPU if the model uses it."""
    inputs = torch.from_numpy(one_hot_encoder(x, num_char))
    targets = torch.from_numpy(y)

    # Adjust for GPU if necessary
    if model.use_gpu:
        inputs = inputs.cuda()
        targets = targets.cuda()

    return inputs, targets


# Using the run as a context manager guarantees it is ended even when training
# raises part-way through, so no dangling active run is left behind.
with mlflow.start_run(run_name = RUN_NAME):

    mlflow.log_param("epochs", epochs)
    mlflow.log_param("batch_size", batch_size)

    # Set model to train
    model.train()

    # Check to see if using GPU
    if model.use_gpu:
        torch.cuda.manual_seed_all(0)
        model.cuda()

    # Mean validation loss of the most recent validation pass (None until one has run)
    mean_val_loss = None

    for i in range(epochs):

        hidden = model.hidden_state(batch_size)

        for x, y in generate_batches(train_data, batch_size, seq_len):

            tracker += 1

            inputs, targets = _to_tensors(x, y)

            # Detach the hidden state from the previous batch's graph so
            # backpropagation stops at the batch boundary
            hidden = tuple(state.detach() for state in hidden)

            model.zero_grad()

            lstm_output, hidden = model(inputs, hidden)
            loss = criterion(lstm_output, targets.view(batch_size*seq_len).long())

            loss.backward()

            # Clipping gradients to avoid explosion
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=5)

            optimizer.step()

            # Run a full validation pass every 100 training steps
            if tracker % 100 == 0:
                val_hidden = model.hidden_state(batch_size)
                val_losses = []
                model.eval()

                # No gradients are needed for validation; skipping them saves memory
                with torch.no_grad():
                    for val_x, val_y in generate_batches(val_data, batch_size, seq_len):
                        val_inputs, val_targets = _to_tensors(val_x, val_y)

                        val_output, val_hidden = model(val_inputs, val_hidden)
                        val_loss = criterion(val_output, val_targets.view(batch_size*seq_len).long())

                        val_losses.append(val_loss.item())

                # Report the average over the whole validation set rather than
                # only the loss of its last batch
                if val_losses:
                    mean_val_loss = float(np.mean(val_losses))
                    mlflow.log_metric("Val Loss", mean_val_loss, step=tracker)

                model.train()

        val_report = "n/a" if mean_val_loss is None else mean_val_loss
        logger.info(f"Epoch: {i} Step: {tracker} Val Loss: {val_report}")

## Saving the Model

In [None]:
model_name = MODEL_NAME

In [None]:
torch.save(model.state_dict(), f'models/{model_name}')

## Load Model

In [None]:
model = CharModel(
    all_chars=all_characters,
    num_hidden=512,
    num_layers=3,
    drop_prob=0.5,
    use_gpu=False
)

In [None]:
# map_location="cpu": the checkpoint may hold CUDA tensors (training can run on
# the GPU), while the model built just above is CPU-only (use_gpu=False).
# Without it, loading fails on a machine that has no CUDA device.
model.load_state_dict(torch.load(f'models/{model_name}', map_location="cpu"))
model.eval()

# Generating Predictions

--------

In [None]:
def predict_next_char(model, char, hidden=None, k=1):
    """Sample the character that follows `char`.

    Args:
        model: Trained CharModel; its encoder, decoder, all_chars, use_gpu
            and hidden_state() are used.
        char: Single input character; must be a key of model.encoder.
        hidden: (h, c) LSTM state from the previous step. When None, a fresh
            zero state for a batch of one is created.
        k: Number of most likely candidates to sample from. With k=1 the
            most likely character is always returned.

    Returns:
        Tuple (next_char, hidden) with the sampled character and the updated
        LSTM state to pass to the next call.

    Raises:
        KeyError: If `char` is not in model.encoder.
    """
    if hidden is None:
        hidden = model.hidden_state(1)

    # Encode the character as a (1, 1, vocab) one-hot batch
    encoded_text = np.array([[model.encoder[char]]])
    encoded_text = one_hot_encoder(encoded_text, len(model.all_chars))
    inputs = torch.from_numpy(encoded_text)

    if model.use_gpu:
        inputs = inputs.cuda()

    # Inference only: no autograd graph is needed
    with torch.no_grad():
        hidden = tuple(state.detach() for state in hidden)
        lstm_out, hidden = model(inputs, hidden)
        probs = F.softmax(lstm_out, dim=1)

    probs = probs.cpu()

    # Getting the top 'k' for next char probs
    probs, index_positions = probs.topk(k)
    # flatten() (not squeeze()) keeps a 1-D array even for k=1; a 0-d array
    # would be read by np.random.choice as a population size, not a candidate
    index_positions = index_positions.numpy().flatten()
    # Renormalize in float64 so the k probabilities sum to 1
    probs = probs.numpy().flatten().astype(np.float64)
    probs = probs / probs.sum()
    char = np.random.choice(index_positions, p=probs)

    return model.decoder[int(char)], hidden

In [None]:
def generate_text(model, size, seed='The', k=1):
    """Generate text by feeding the model its own predictions.

    Args:
        model: Trained CharModel. It is moved to the GPU or CPU according to
            model.use_gpu and switched to evaluation mode.
        size: Number of characters to generate after the seed.
        seed: Non-empty starting text; every character must be known to the
            model's encoder.
        k: Number of most likely candidates to sample from at each step.

    Returns:
        The seed followed by exactly `size` generated characters.

    Raises:
        ValueError: If `seed` is empty.
    """
    if not seed:
        raise ValueError("seed must contain at least one character")

    if model.use_gpu:
        model.cuda()
    else:
        model.cpu()

    model.eval()
    output_chars = list(seed)
    hidden = model.hidden_state(1)

    # Warm up the hidden state on the seed; the prediction made after the
    # last seed character is the first generated character.
    for char in seed:
        next_char, hidden = predict_next_char(model, char, hidden, k=k)

    if size > 0:
        output_chars.append(next_char)

    # One character is already generated, so size - 1 remain
    for _ in range(size - 1):
        next_char, hidden = predict_next_char(model, output_chars[-1], hidden, k=k)
        output_chars.append(next_char)

    return ''.join(output_chars)

#### Generating 1000 characters of text starting with the word 'Confidence'

In [None]:
print(generate_text(model, 1000, seed='Confidence ', k=3))

#### Generating 1000 characters of text starting with the word 'Love'

In [None]:
print(generate_text(model, 1000, seed='Love ', k=3))