**Simple Character-Level RNN for Text Generation**

Learning objective: understand the RNN architecture and its training loop.

The architecture of a **multilayer RNN** is shown below:

![Multilayer RNN](image.png)


In [None]:
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt

# Run on a CUDA GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")

In [None]:

# Report which accelerator (if any) PyTorch detected.
if not torch.cuda.is_available():
    print('GPU not found. Using CPU.')
else:
    print('GPU is available. Using CUDA.')
    print('GPU Name:', torch.cuda.get_device_name(0))


In [None]:
# Hyperparameters
HIDDEN_SIZE = 128  # hidden_size passed to CharRNN (width of each RNN layer's hidden state)
NUM_LAYERS = 2  # num_layers passed to CharRNN (number of stacked RNN layers)
LEARNING_RATE = 0.0003  # lr passed to the Adam optimizer
NUM_EPOCHS = 200  # number of passes the training loop makes over the dataloader
SEQ_LENGTH = 100  # characters per training sequence (seq_length passed to TextDataset)

In [None]:
class CharRNN(nn.Module):
    """Multi-layer vanilla RNN for character-level text generation.

    The network is an ``nn.RNN`` stack followed by a linear projection that
    maps every hidden state to one score per output class. The scores are raw
    logits: no softmax is applied inside the model.

    Args:
        input_size: Number of features per time step of the input.
        hidden_size: Width of the hidden state of every RNN layer.
        num_layers: Number of stacked RNN layers.
        output_size: Number of scores produced per time step.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # The recurrent stack only produces hidden states; turning them into
        # per-class scores is the job of the linear layer below.
        self.rnn = nn.RNN(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True
        )

        # Hidden state -> per-class logits.
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden):
        """Run the RNN over a batch of sequences and score every time step.

        Args:
            x: Input of shape (batch, seq_len, input_size).
            hidden: Initial hidden state of shape
                (num_layers, batch, hidden_size).

        Returns:
            A tuple ``(out, hidden)`` where ``out`` has shape
            (batch * seq_len, output_size), with all time steps of the first
            batch element first, and ``hidden`` is the final hidden state of
            shape (num_layers, batch, hidden_size).
        """
        out, hidden = self.rnn(x, hidden)
        # out: (batch, seq_len, hidden_size)

        # Flatten batch and time so the linear layer scores every step at once.
        out = self.fc(out.reshape(-1, self.hidden_size))

        return out, hidden

    def init_hidden(self, batch_size):
        """Return an all-zero initial hidden state for ``batch_size`` sequences.

        The tensor is allocated with the same device and dtype as the model's
        parameters, so it stays compatible after ``model.to(...)`` or
        ``model.double()``. Calling ``.to(device)`` on the result is still
        allowed and is then a no-op.

        Returns:
            Tensor of shape (num_layers, batch_size, hidden_size).
        """
        reference = next(self.parameters())
        return reference.new_zeros(self.num_layers, batch_size, self.hidden_size)


In [None]:
class TextDataset(Dataset):
    """Sliding-window dataset over a text for next-character prediction.

    Every distinct character of ``text`` becomes one vocabulary entry, with
    indices assigned in sorted character order. Item ``i`` is the window of
    ``seq_length`` characters starting at position ``i``, paired with the same
    window shifted one character to the right.

    Args:
        text: The full training text.
        seq_length: Number of characters in each input/target window.
    """

    def __init__(self, text, seq_length):
        self.text = text
        self.seq_length = seq_length
        # Vocabulary: every distinct character that occurs in the text.
        self.chars = sorted(set(text))
        self.char_to_idx = {ch: i for i, ch in enumerate(self.chars)}
        self.idx_to_char = dict(enumerate(self.chars))
        self.vocab_size = len(self.chars)
        # Encode the whole text once so __getitem__ only has to slice.
        self._encoded = torch.tensor(
            [self.char_to_idx[ch] for ch in text], dtype=torch.long
        )

    def __len__(self):
        """Return the number of full windows that still have a next character."""
        # Clamp at zero: a text shorter than seq_length holds no samples, and
        # a negative value would make len() raise.
        return max(0, len(self.text) - self.seq_length)

    def __getitem__(self, idx):
        """Return the ``(input, target)`` index tensors for window ``idx``.

        ``input`` encodes ``text[idx : idx + seq_length]`` and ``target``
        encodes ``text[idx + 1 : idx + seq_length + 1]``, so ``target[t]`` is
        the character that follows ``input[t]``.
        """
        stop = idx + self.seq_length
        # clone() hands back independent tensors rather than views of the
        # shared encoded text.
        seq = self._encoded[idx:stop].clone()
        target = self._encoded[idx + 1:stop + 1].clone()
        return seq, target

In [None]:
# Read the whole training corpus into memory as a single string.
with open('input.txt', mode='r', encoding='utf-8') as corpus_file:
    text = corpus_file.read()

In [None]:
# Each dataset item is an (input_seq, target_seq) pair of index tensors.
dataset = TextDataset(text=text, seq_length=SEQ_LENGTH)
# Each batch stacks 32 such pairs; the incomplete final batch is dropped.
dataloader = DataLoader(
    dataset,
    batch_size=32,
    shuffle=True,
    drop_last=True,
    num_workers=2,
    pin_memory=True,
)

In [None]:
# Inputs are one-hot characters and outputs are per-character scores, so both
# the input and the output width equal the vocabulary size.
model = CharRNN(dataset.vocab_size, HIDDEN_SIZE, NUM_LAYERS, dataset.vocab_size)
model = model.to(device)

In [None]:
# Cross-entropy over the model's raw scores, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

In [None]:
# Optional: resume from a previously saved checkpoint.
#op_model = torch.load('/content/char_rnn_model.pth')
#model.load_state_dict(op_model)
# To restore the optimizer as well, its state must be saved together with the model state.
# optimizer.load_state_dict(op_model['optimizer_state_dict']) # Fails with the current saving method, which stores only the model state_dict.

In [None]:
# Training loop: every batch starts from a zero hidden state, the loss of every
# batch is recorded in loss_arr, and every 10th epoch prints the mean loss plus
# how far the final optimizer step of that epoch moved each parameter.
num_batches = len(dataloader)
if num_batches == 0:
    # Fail fast instead of looping over nothing and dividing by zero later.
    raise ValueError(
        "dataloader produced no batches; the text is too short for the "
        "chosen sequence length and batch size"
    )

loss_arr = []
model.train()
for epoch in range(NUM_EPOCHS):
    total_loss = 0
    report_epoch = (epoch + 1) % 10 == 0

    for batch_idx, (sequences, targets) in enumerate(dataloader):
        current_batch_size = sequences.shape[0]
        # Fresh zero hidden state for this batch; it carries no autograd
        # history, so no detach() is needed.
        hidden = model.init_hidden(current_batch_size).to(device)

        # Move sequences and targets to the device
        sequences = sequences.to(device, non_blocking=True)
        targets = targets.to(device, non_blocking=True)

        # One-hot encode input
        sequences_onehot = torch.nn.functional.one_hot(
            sequences,
            num_classes=dataset.vocab_size
        ).float()

        # Forward pass
        outputs, hidden = model(sequences_onehot, hidden)
        loss = criterion(outputs, targets.view(-1))

        # Read the scalar once and reuse it for both the history and the
        # running total.
        batch_loss = loss.item()
        loss_arr.append(batch_loss)

        # Snapshot the weights only when they will be reported: right before
        # the last optimizer step of a reporting epoch. detach() keeps the
        # copies out of the autograd graph.
        if report_epoch and batch_idx == num_batches - 1:
            old_weights = {
                name: param.detach().clone()
                for name, param in model.named_parameters()
            }

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        #for name, param in model.named_parameters():
        #    if param.grad is not None:
        #        # Calculate the L2 norm of the gradient
        #        grad_norm = param.grad.norm().item()
        #        print(f"Layer: {name} | Gradient Norm: {grad_norm:.8f}")
        torch.nn.utils.clip_grad_norm_(model.parameters(), 5)  # Gradient clipping
        optimizer.step()

        total_loss += batch_loss

    if report_epoch:
        avg_loss = total_loss / num_batches
        print(f'Epoch [{epoch+1}/{NUM_EPOCHS}], Loss: {avg_loss:.4f}')

        # Pure diagnostics: no gradients needed.
        with torch.no_grad():
            for name, param in model.named_parameters():
                weight_change = (param - old_weights[name]).norm().item()
                if weight_change == 0:
                    print(f"ALERT: {name} weights did NOT change!")
                else:
                    print(f"{name} updated by {weight_change:.8f}")

        # Generate sample
        #sample = generate_text(model, dataset, start_str="The ", length=200)
        #print(f'Generated: {sample}\n')



In [None]:
# Persist the trained weights (model state_dict only).
torch.save(obj=model.state_dict(), f='char_rnn_model.pth')

# Plot the per-batch loss history as a red line.
plt.plot(loss_arr, 'r-')
plt.show()

# First and last recorded batch losses.
print('Loss before training', loss_arr[0])
print('Loss after training', loss_arr[-1])

In [None]:
def generate_text(model, dataset, start_str="The ", length=100):
    """Sample ``length`` characters from ``model``, seeded with ``start_str``.

    The prompt is fed through the network once to build up the hidden state.
    After that only the newly sampled character is fed at each step, while
    the hidden state carries the context. Re-feeding the whole window on top
    of a carried hidden state would make the network see the same characters
    repeatedly.

    Tensors are created on the device of the model's parameters, and the
    model's train/eval mode is restored before returning.

    Args:
        model: Network exposing ``init_hidden(batch_size)`` and
            ``model(x, hidden) -> (logits, hidden)`` with logits of shape
            (batch * seq_len, vocab_size).
        dataset: Object providing ``char_to_idx``, ``idx_to_char`` and
            ``vocab_size``.
        start_str: Prompt; every character must be in ``dataset.char_to_idx``.
        length: Number of characters to generate after the prompt.

    Returns:
        ``start_str`` followed by the generated characters.

    Raises:
        ValueError: If ``start_str`` is empty while ``length`` is positive.
        KeyError: If ``start_str`` contains a character outside the vocabulary.
    """
    chars = list(start_str)
    if length <= 0:
        return ''.join(chars)
    if not chars:
        raise ValueError("start_str must contain at least one character")

    # Follow the model's own device instead of relying on a global.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        model_device = first_param.device
    else:
        model_device = torch.device('cpu')

    # The first step consumes the whole prompt; later steps one character.
    step_input = [dataset.char_to_idx[ch] for ch in chars]

    was_training = model.training
    model.eval()
    try:
        hidden = model.init_hidden(1).to(model_device)
        with torch.no_grad():
            for _ in range(length):
                seq_tensor = torch.tensor([step_input], device=model_device)
                seq_onehot = torch.nn.functional.one_hot(
                    seq_tensor,
                    num_classes=dataset.vocab_size
                ).float()

                # Predict next character
                output, hidden = model(seq_onehot, hidden)

                # Sample from the distribution predicted at the last position.
                prob = torch.nn.functional.softmax(output[-1], dim=0)
                next_char_idx = torch.multinomial(prob, 1).item()

                chars.append(dataset.idx_to_char[next_char_idx])
                step_input = [next_char_idx]
    finally:
        # Leave the model in the mode the caller had it in.
        model.train(was_training)

    return ''.join(chars)

In [None]:
# Draw a short sample from the trained model, seeded with "the".
sample = generate_text(
    model,
    dataset,
    start_str="the",
    length=30,
)
print(f'Generated: {sample}\n')