In [None]:
# Corpus for the character-level experiments: a short essay about
# next-character prediction. The curly quotes that wrap the essay sit inside
# the string literal, so they are part of the text (and of any vocabulary
# built from it).
# NOTE(review): a later cell assigns `text` again with a much shorter sample
# string — confirm which corpus the experiments are meant to use.
text = """“Next character prediction is a fundamental task in the field of natural language processing (NLP) that involves predicting the next character in a sequence of text based on the characters that precede it. This task is essential for various applications, including text auto-completion, spell checking, and even in the development of sophisticated AI models capable of generating human-like text.

At its core, next character prediction relies on statistical models or deep learning algorithms to analyze a given sequence of text and predict which character is most likely to follow. These predictions are based on patterns and relationships learned from large datasets of text during the training phase of the model.

One of the most popular approaches to next character prediction involves the use of Recurrent Neural Networks (RNNs), and more specifically, a variant called Long Short-Term Memory (LSTM) networks. RNNs are particularly well-suited for sequential data like text, as they can maintain information in 'memory' about previous characters to inform the prediction of the next character. LSTM networks enhance this capability by being able to remember long-term dependencies, making them even more effective for next character prediction tasks.

Training a model for next character prediction involves feeding it large amounts of text data, allowing it to learn the probability of each character's appearance following a sequence of characters. During this training process, the model adjusts its parameters to minimize the difference between its predictions and the actual outcomes, thus improving its predictive accuracy over time.

Once trained, the model can be used to predict the next character in a given piece of text by considering the sequence of characters that precede it. This can enhance user experience in text editing software, improve efficiency in coding environments with auto-completion features, and enable more natural interactions with AI-based chatbots and virtual assistants.

In summary, next character prediction plays a crucial role in enhancing the capabilities of various NLP applications, making text-based interactions more efficient, accurate, and human-like. Through the use of advanced machine learning models like RNNs and LSTMs, next character prediction continues to evolve, opening new possibilities for the future of text-based technology.”"""

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import time

# -------------------------
# Sample Data Setup
# -------------------------
# Demo corpus; swap in a longer string for real experiments.
# NOTE: this assignment replaces any `text` value defined earlier.
text = (
    "This is a sample text for training our character-level RNN models. "
    "You can replace this with a longer corpus."
)

# Vocabulary: every distinct character of the corpus, in sorted order
chars = sorted(set(text))

# Lookup tables between characters and their integer ids
char_to_idx = {ch: i for i, ch in enumerate(chars)}
idx_to_char = dict(enumerate(chars))

# The corpus expressed as a list of integer ids
encoded_text = list(map(char_to_idx.__getitem__, text))

# -------------------------
# Data Preparation Functions
# -------------------------
def create_sequences(data, seq_length):
    """
    Build sliding-window training pairs for next-character prediction.

    Window ``i`` is ``data[i:i+seq_length]`` and its label is the element that
    immediately follows it, ``data[i+seq_length]``.

    Args:
        data: Sequence of integer token ids.
        seq_length: Number of ids in every input window (must be >= 0).

    Returns:
        Tuple ``(X, y)`` of ``torch.long`` tensors. ``X`` has shape
        ``[num_windows, seq_length]`` and ``y`` has shape ``[num_windows]``,
        where ``num_windows = max(len(data) - seq_length, 0)``.

    Raises:
        ValueError: If ``seq_length`` is negative.
    """
    if seq_length < 0:
        raise ValueError("seq_length must be non-negative")

    num_windows = max(len(data) - seq_length, 0)
    windows = [data[i:i + seq_length] for i in range(num_windows)]
    labels = [data[i + seq_length] for i in range(num_windows)]

    # torch.tensor([]) is 1-D, so force the 2-D layout explicitly; this keeps
    # X two-dimensional even when `data` is too short to yield any window.
    X = torch.tensor(windows, dtype=torch.long).view(num_windows, seq_length)
    y = torch.tensor(labels, dtype=torch.long)
    return X, y

# -------------------------
# Model Definition
# -------------------------
class CharRNN(nn.Module):
    """
    Next-character classifier: embedding -> one recurrent layer -> linear head.

    Args:
        input_size: Vocabulary size (number of rows in the embedding table).
        hidden_size: Width of both the embedding and the recurrent layer.
        output_size: Number of classes scored by the final linear layer.
        model_type: ``'rnn'``, ``'lstm'`` or ``'gru'`` (case-insensitive).

    Raises:
        ValueError: If ``model_type`` is not one of the supported names.
    """

    # Recurrent layer class for each supported `model_type`
    _RECURRENT_LAYERS = {'rnn': nn.RNN, 'lstm': nn.LSTM, 'gru': nn.GRU}

    def __init__(self, input_size, hidden_size, output_size, model_type='rnn'):
        super().__init__()
        self.hidden_size = hidden_size
        self.model_type = model_type.lower()
        self.embedding = nn.Embedding(input_size, hidden_size)
        try:
            recurrent_cls = self._RECURRENT_LAYERS[self.model_type]
        except KeyError:
            raise ValueError("Unsupported model type. Choose 'rnn', 'lstm', or 'gru'.") from None
        self.rnn = recurrent_cls(hidden_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden=None):
        """
        Score the character that follows each input sequence.

        Args:
            x: Integer ids of shape ``[batch, seq_len]``.
            hidden: Optional initial recurrent state. ``None`` lets PyTorch
                start from zeros. For ``'lstm'`` this must be an
                ``(h_0, c_0)`` tuple; for ``'rnn'``/``'gru'`` a single tensor.

        Returns:
            Tuple ``(logits, hidden)``. ``logits`` has shape
            ``[batch, output_size]`` and is computed from the last time step.
            ``hidden`` is the final recurrent state exactly as the recurrent
            layer returned it, so it can be passed straight back in as
            ``hidden`` on the next call. For ``'lstm'`` that is the
            ``(h_n, c_n)`` tuple; the cell state used to be dropped, which
            made the returned value unusable as an LSTM initial state.
        """
        embedded = self.embedding(x)  # [batch, seq_len, hidden_size]
        out, hidden = self.rnn(embedded, hidden)
        # Only the final time step feeds the classifier
        logits = self.fc(out[:, -1, :])
        return logits, hidden

# -------------------------
# Evaluation Function
# -------------------------
def evaluate(model, val_data, val_labels):
    """
    Score `model` on a validation set.

    Puts the model in eval mode, runs a single gradient-free forward pass over
    all of `val_data`, and compares the highest-scoring class of each row with
    `val_labels`.

    Returns:
        Fraction of rows predicted correctly, as a float.
    """
    model.eval()
    with torch.no_grad():
        logits, _ = model(val_data)
        predicted = logits.max(dim=1).indices
        num_correct = predicted.eq(val_labels).sum().item()
    return num_correct / val_labels.size(0)

# -------------------------
# Training Function
# -------------------------
def train_model(model, train_data, train_labels, val_data, val_labels, epochs=50, lr=0.01):
    """
    Full-batch training loop with validation after every epoch.

    Each epoch performs exactly one Adam step on the whole training set using
    cross-entropy loss, then measures validation accuracy. Progress is printed
    every 10th epoch.

    Returns:
        Tuple ``(final_loss, training_time, best_val_acc)``: the training loss
        computed in the last epoch (before that epoch's optimizer step), the
        wall-clock seconds spent in the loop, and the highest validation
        accuracy seen in any epoch.
    """
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    best_val_acc = 0.0
    start_time = time.time()

    for epoch_num in range(1, epochs + 1):
        # One gradient step on the entire training set
        model.train()
        optimizer.zero_grad()
        output, _ = model(train_data)
        loss = criterion(output, train_labels)
        loss.backward()
        optimizer.step()

        # Track the best validation accuracy seen so far
        val_acc = evaluate(model, val_data, val_labels)
        best_val_acc = max(best_val_acc, val_acc)

        if epoch_num % 10 == 0:
            print(f'Epoch {epoch_num:>2}: Loss = {loss.item():.4f}, Val Acc = {val_acc:.4f}')

    training_time = time.time() - start_time
    return loss.item(), training_time, best_val_acc

# -------------------------
# Main Experiment Loop
# -------------------------
# Experiment grid: every sequence length is paired with every model type
seq_lengths = [10, 20, 30]
model_types = ['rnn', 'lstm', 'gru']
hidden_size = 128
# Input and output dimensions both equal the vocabulary size
input_size = output_size = len(chars)

# Maps (model_type, seq_length) -> dict of metrics for that run
results = {}

for seq_len in seq_lengths:
    # Windows and labels for this sequence length
    X, y = create_sequences(encoded_text, seq_len)

    # Ordered split: the first 80% of windows train, the remainder validates
    total_samples = X.size(0)
    split_idx = int(0.8 * total_samples)
    X_train, X_val = X[:split_idx], X[split_idx:]
    y_train, y_val = y[:split_idx], y[split_idx:]

    for mtype in model_types:
        print(f'\nTraining {mtype.upper()} model with sequence length {seq_len}')
        model = CharRNN(input_size, hidden_size, output_size, model_type=mtype)

        # Total element count over all model parameters
        total_params = sum(p.numel() for p in model.parameters())
        print(f'Number of trainable parameters: {total_params}')

        # Train, then report loss / time / best validation accuracy
        final_loss, train_time, best_val_acc = train_model(
            model, X_train, y_train, X_val, y_val, epochs=50, lr=0.01
        )
        print(f'Model: {mtype.upper()}, Seq Length: {seq_len}, Final Loss: {final_loss:.4f}, '
              f'Training Time: {train_time:.2f}s, Best Val Acc: {best_val_acc:.4f}')

        # Record this run under its (model type, sequence length) key
        results[(mtype, seq_len)] = dict(
            final_loss=final_loss,
            train_time=train_time,
            best_val_acc=best_val_acc,
            params=total_params,
        )

# -------------------------
# Summary of Results
# -------------------------
# One tab-separated row per (model type, sequence length) run
print("\nSummary of Results:")
print("Model\tSeqLen\tFinal Loss\tTrain Time (s)\tBest Val Acc\tParams")
for (mtype, seq_len), val in results.items():
    row = (f"{mtype.upper()}\t{seq_len}\t{val['final_loss']:.4f}\t\t{val['train_time']:.2f}\t\t"
           f"{val['best_val_acc']:.4f}\t\t{val['params']}")
    print(row)



Training RNN model with sequence length 10
Number of trainable parameters: 39963
Epoch 10: Loss = 0.0977, Val Acc = 0.0500
Epoch 20: Loss = 0.0044, Val Acc = 0.0500
Epoch 30: Loss = 0.0011, Val Acc = 0.0500
Epoch 40: Loss = 0.0006, Val Acc = 0.0500
Epoch 50: Loss = 0.0004, Val Acc = 0.0500
Model: RNN, Seq Length: 10, Final Loss: 0.0004, Training Time: 0.71s, Best Val Acc: 0.1000

Training LSTM model with sequence length 10
Number of trainable parameters: 139035
Epoch 10: Loss = 0.3333, Val Acc = 0.1000
Epoch 20: Loss = 0.0109, Val Acc = 0.0500
Epoch 30: Loss = 0.0020, Val Acc = 0.1000
Epoch 40: Loss = 0.0009, Val Acc = 0.1000
Epoch 50: Loss = 0.0006, Val Acc = 0.1000
Model: LSTM, Seq Length: 10, Final Loss: 0.0006, Training Time: 1.35s, Best Val Acc: 0.1000

Training GRU model with sequence length 10
Number of trainable parameters: 106011
Epoch 10: Loss = 0.1864, Val Acc = 0.1000
Epoch 20: Loss = 0.0057, Val Acc = 0.1000
Epoch 30: Loss = 0.0012, Val Acc = 0.1000
Epoch 40: Loss = 0.000

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
import requests
import time

# ============================
# Data Preparation Functions
# ============================

_TINY_SHAKESPEARE_URL = (
    "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
)
# Downloaded corpora keyed by URL, so repeated calls do not hit the network again
_corpus_cache = {}


def _fetch_corpus(url, timeout=30):
    """Return the text served at `url`, downloading it at most once per process."""
    if url not in _corpus_cache:
        response = requests.get(url, timeout=timeout)
        # Raise on 4xx/5xx so an error page is never used as training text
        response.raise_for_status()
        _corpus_cache[url] = response.text
    return _corpus_cache[url]


def download_and_prepare(seq_length, batch_size=128):
    """
    Build train/test DataLoaders of next-character examples from Tiny Shakespeare.

    The corpus is downloaded (once per process), every distinct character is
    mapped to an integer id, and each window of `seq_length` consecutive ids
    is paired with the id that follows it. The examples are then split 80/20
    at random.

    Args:
        seq_length: Number of characters in each input window.
        batch_size: Batch size used by both DataLoaders.

    Returns:
        Tuple ``(train_loader, test_loader, char_to_int, int_to_char,
        vocab_size)``. Only the training loader shuffles.

    Raises:
        ValueError: If `seq_length` is not in ``1 .. len(corpus) - 1``.
        requests.RequestException: If the download fails or times out.
    """
    text = _fetch_corpus(_TINY_SHAKESPEARE_URL)
    if not 0 < seq_length < len(text):
        raise ValueError(
            f"seq_length must be between 1 and {len(text) - 1}, got {seq_length}"
        )

    # Create character mapping
    chars = sorted(set(text))
    char_to_int = {ch: i for i, ch in enumerate(chars)}
    int_to_char = dict(enumerate(chars))

    # Encode the text as integers
    encoded_text = torch.tensor([char_to_int[ch] for ch in text], dtype=torch.long)

    # unfold() exposes every length-`seq_length` window as a view of the
    # encoded corpus instead of materialising millions of Python lists.
    # The last window has no following character, so it is dropped.
    sequences = encoded_text.unfold(0, seq_length, 1)[:-1]
    targets = encoded_text[seq_length:]

    class CharDataset(Dataset):
        """Pairs each input window with its next-character target."""

        def __init__(self, sequences, targets):
            self.sequences = sequences
            self.targets = targets

        def __len__(self):
            return len(self.sequences)

        def __getitem__(self, index):
            return self.sequences[index], self.targets[index]

    dataset = CharDataset(sequences, targets)

    # Split into train and test (80/20 split)
    total_samples = len(dataset)
    train_size = int(0.8 * total_samples)
    test_size = total_samples - train_size
    train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

    train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
    test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)

    return train_loader, test_loader, char_to_int, int_to_char, len(chars)

# ============================
# Model Definition
# ============================

class CharModel(nn.Module):
    """
    Next-character classifier: embedding -> stacked LSTM/GRU -> linear head.

    The head is either a single linear layer or, when `fc_hidden_size` is
    given, linear -> ReLU -> linear.
    """

    def __init__(self, input_size, hidden_size, output_size, model_type='lstm', num_layers=1, fc_hidden_size=None):
        """
        input_size: number of rows in the embedding table
        hidden_size: width of the embedding and of every recurrent layer
        output_size: number of classes scored by the final layer
        model_type: 'lstm' or 'gru' (case-insensitive)
        num_layers: number of stacked recurrent layers
        fc_hidden_size: if not None, width of an extra FC layer before the output
        """
        super().__init__()
        self.model_type = model_type.lower()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.embedding = nn.Embedding(input_size, hidden_size)

        recurrent_classes = {'lstm': nn.LSTM, 'gru': nn.GRU}
        if self.model_type not in recurrent_classes:
            raise ValueError("Unsupported model type; choose 'lstm' or 'gru'.")
        self.rnn = recurrent_classes[self.model_type](
            hidden_size, hidden_size, num_layers=num_layers, batch_first=True
        )

        # fc1 is None when no extra hidden layer was requested
        if fc_hidden_size is None:
            self.fc1 = None
            self.fc2 = nn.Linear(hidden_size, output_size)
        else:
            self.fc1 = nn.Linear(hidden_size, fc_hidden_size)
            self.relu = nn.ReLU()
            self.fc2 = nn.Linear(fc_hidden_size, output_size)

    def forward(self, x, hidden=None):
        """
        x: integer ids of shape [batch, seq_length]
        hidden: optional initial recurrent state, passed to the recurrent layer
            unchanged (None means PyTorch's default zero state)

        Returns (logits, hidden): logits of shape [batch, output_size] computed
        from the last time step, and the state returned by the recurrent layer.
        """
        embedded = self.embedding(x)  # [batch, seq_length, hidden_size]
        rnn_out, hidden = self.rnn(embedded, hidden)
        # Keep only the final time step for classification
        features = rnn_out[:, -1, :]
        if self.fc1 is not None:
            features = self.relu(self.fc1(features))
        return self.fc2(features), hidden

# ============================
# Training and Evaluation Functions
# ============================

def evaluate(model, data_loader, device):
    """
    Compute mean cross-entropy loss and accuracy of `model` over `data_loader`.

    The model is switched to eval mode and run without gradients. Each batch
    is moved to `device`; its loss is weighted by the batch size so the
    returned loss is a per-example average.

    Returns:
        Tuple ``(avg_loss, accuracy)``.
    """
    criterion = nn.CrossEntropyLoss()
    model.eval()
    loss_total, correct, total = 0.0, 0, 0
    with torch.no_grad():
        for inputs, targets in data_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            outputs, _ = model(inputs)
            # criterion returns the batch mean; re-weight by batch size
            loss_total += criterion(outputs, targets).item() * inputs.size(0)
            predicted = outputs.max(dim=1).indices
            correct += (predicted == targets).sum().item()
            total += targets.size(0)
    return loss_total / total, correct / total

def train_model(model, train_loader, test_loader, device, epochs=10, lr=0.001):
    """
    Train `model` with Adam and cross-entropy loss, validating after every epoch.

    Args:
        model: Module whose forward returns ``(logits, hidden)``.
        train_loader: Iterable of ``(inputs, targets)`` training batches; must
            expose ``.dataset`` so the epoch loss can be averaged per example.
        test_loader: Iterable of ``(inputs, targets)`` batches used for
            validation and for the inference timing.
        device: Device (string or ``torch.device``) the model and batches are
            moved to.
        epochs: Number of passes over `train_loader`.
        lr: Adam learning rate.

    Returns:
        Dict with keys ``final_train_loss``, ``final_val_loss``,
        ``best_val_acc``, ``training_time`` (seconds), ``inference_time``
        (seconds for one forward pass on the first test batch),
        ``num_params``, ``train_losses`` and ``val_losses`` (per-epoch lists).
        With ``epochs=0`` the two "final" losses are ``nan``.
    """
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    best_val_acc = 0.0
    train_losses = []
    val_losses = []
    # perf_counter is monotonic, so the interval cannot be skewed by clock changes
    start_time = time.perf_counter()

    for epoch in range(1, epochs + 1):
        model.train()
        running_loss = 0.0
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs, _ = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            # loss is a batch mean; weight it so the epoch figure is per example
            running_loss += loss.item() * inputs.size(0)
        avg_train_loss = running_loss / len(train_loader.dataset)
        train_losses.append(avg_train_loss)

        val_loss, val_acc = evaluate(model, test_loader, device)
        val_losses.append(val_loss)
        best_val_acc = max(best_val_acc, val_acc)
        print(f"Epoch {epoch:2d}: Train Loss = {avg_train_loss:.4f}, Val Loss = {val_loss:.4f}, Val Acc = {val_acc:.4f}")

    total_time = time.perf_counter() - start_time
    model_params = sum(p.numel() for p in model.parameters())

    # Time a single forward pass on one test batch. CUDA kernels are launched
    # asynchronously, so without synchronising before and after the call the
    # timer would only measure the launch, not the computation.
    on_cuda = torch.device(device).type == 'cuda'
    model.eval()
    with torch.no_grad():
        inputs, _ = next(iter(test_loader))
        inputs = inputs.to(device)
        if on_cuda:
            torch.cuda.synchronize(device)
        start_inf = time.perf_counter()
        model(inputs)
        if on_cuda:
            torch.cuda.synchronize(device)
        inference_time = time.perf_counter() - start_inf

    return {
        # The loss lists are empty when epochs == 0; report nan instead of raising
        'final_train_loss': train_losses[-1] if train_losses else float('nan'),
        'final_val_loss': val_losses[-1] if val_losses else float('nan'),
        'best_val_acc': best_val_acc,
        'training_time': total_time,
        'inference_time': inference_time,
        'num_params': model_params,
        'train_losses': train_losses,
        'val_losses': val_losses
    }

def generate_text(model, seed, int_to_char, gen_length=100, device='cpu'):
    """
    Sample `gen_length` characters from `model`, continuing the string `seed`.

    The whole seed is fed once to prime the recurrent state; after that only
    the newly sampled character is fed, together with the carried state.
    (Previously the full window was re-fed on every step *and* the state was
    carried over, so the same context was processed repeatedly.)

    Args:
        model: Module whose forward takes ``(ids, hidden)`` and returns
            ``(logits, hidden)`` with logits of shape ``[batch, vocab]``.
        seed: Non-empty start string; every character must be a value of
            `int_to_char`.
        int_to_char: Mapping from integer id to character. The reverse
            mapping is derived from it, so no global lookup table is needed.
        gen_length: Number of characters to append to the seed.
        device: Device the input tensors are created on.

    Returns:
        `seed` followed by the generated characters.

    Raises:
        ValueError: If `seed` is empty.
        KeyError: If `seed` contains a character that is not in the vocabulary.

    Note:
        Sampling uses ``torch.multinomial``, so seed PyTorch's RNG
        (``torch.manual_seed``) for reproducible output.
    """
    if not seed:
        raise ValueError("seed must contain at least one character")

    # Invert the id -> char table instead of relying on a module-level global
    char_to_int = {ch: idx for idx, ch in int_to_char.items()}

    model.eval()
    step_input = torch.tensor(
        [[char_to_int[ch] for ch in seed]], dtype=torch.long, device=device
    )
    pieces = [seed]
    hidden = None
    with torch.no_grad():
        for _ in range(gen_length):
            output, hidden = model(step_input, hidden)
            probs = nn.functional.softmax(output, dim=1)
            # multinomial samples directly from the float32 distribution;
            # it has no strict "probabilities must sum to 1" check to trip
            # over rounding error.
            next_idx = torch.multinomial(probs, num_samples=1)  # [1, 1]
            pieces.append(int_to_char[next_idx.item()])
            # The carried state already encodes the context, so only the new
            # character is fed on the next step
            step_input = next_idx
    return "".join(pieces)

# ============================
# Experiment Configurations
# ============================

# Sequence lengths covered by the experiments
seq_lengths = [20, 30, 50]

# Recurrent architectures to compare
model_types = ['lstm', 'gru']

# Hyperparameter configurations (adjustable)
# Config 1: smaller model, single recurrent layer, no extra FC layer
config1 = {'num_layers': 1, 'hidden_size': 128, 'fc_hidden_size': None, 'lr': 0.001, 'epochs': 10}
# Config 2: bigger model, two recurrent layers plus an extra FC layer
config2 = {'num_layers': 2, 'hidden_size': 256, 'fc_hidden_size': 128, 'lr': 0.001, 'epochs': 10}

# Full grid, ordered by sequence length, then model type, then configuration.
# Each entry references the shared config dict rather than copying it.
experiments = [
    {
        'seq_length': seq_len,
        'model_type': mtype,
        'config': config,
        'config_name': config_name,
    }
    for seq_len in seq_lengths
    for mtype in model_types
    for config_name, config in (('Config1', config1), ('Config2', config2))
]

# ============================
# Run Experiments
# ============================

# Use the GPU when one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# One metrics dict per experiment, in execution order
results = []

for exp in experiments:
    seq_len, mtype = exp['seq_length'], exp['model_type']
    config, config_name = exp['config'], exp['config_name']
    print("\n" + "="*60)
    print(f"Running {mtype.upper()} with sequence length {seq_len} [{config_name}]")

    # Data loaders and vocabulary tables for this sequence length
    train_loader, test_loader, char_to_int, int_to_char, vocab_size = download_and_prepare(seq_len)

    # Model built from this experiment's hyperparameters
    model = CharModel(
        input_size=vocab_size,
        hidden_size=config['hidden_size'],
        output_size=vocab_size,
        model_type=mtype,
        num_layers=config['num_layers'],
        fc_hidden_size=config['fc_hidden_size'],
    )

    # Total element count over all model parameters
    total_params = sum(p.numel() for p in model.parameters())
    print(f"Total trainable parameters: {total_params}")

    # Train and collect the metrics
    exp_result = train_model(
        model, train_loader, test_loader, device,
        epochs=config['epochs'],
        lr=config['lr'],
    )

    # Sample some text from the trained model, starting from a short seed
    seed_str = "The "  # Adjust as desired
    sample_text = generate_text(model, seed_str, int_to_char, gen_length=100, device=device)

    # Experiment identity first, then the training metrics, then extras
    metric_keys = ('final_train_loss', 'final_val_loss', 'best_val_acc',
                   'training_time', 'inference_time')
    result = {
        'seq_length': seq_len,
        'model_type': mtype,
        'config_name': config_name,
        **{key: exp_result[key] for key in metric_keys},
        'num_params': total_params,
        'sample_text': sample_text,
    }
    results.append(result)

    # Per-experiment report
    print("--- Summary ---")
    print(f"Sequence Length: {seq_len}")
    print(f"Model Type: {mtype.upper()} | {config_name}")
    print(f"Final Train Loss: {result['final_train_loss']:.4f}")
    print(f"Final Val Loss: {result['final_val_loss']:.4f}")
    print(f"Best Val Accuracy: {result['best_val_acc']:.4f}")
    print(f"Training Time: {result['training_time']:.2f} sec")
    print(f"Inference Time (per batch): {result['inference_time']*1000:.2f} ms")
    print(f"Model Size (params): {total_params}")
    print(f"Sample Output: {sample_text}")

# ============================
# Overall Summary of Experiments
# ============================
# Final table: one tab-separated row per experiment, in execution order
print("\n\n=== OVERALL SUMMARY ===")
print("SeqLen\tModel\tConfig\tTrainLoss\tValLoss\tValAcc\tTrainTime(s)\tParams")
for r in results:
    row = (f"{r['seq_length']}\t{r['model_type'].upper()}\t{r['config_name']}\t"
           f"{r['final_train_loss']:.4f}\t\t{r['final_val_loss']:.4f}\t{r['best_val_acc']:.4f}\t"
           f"{r['training_time']:.2f}\t\t{r['num_params']}")
    print(row)




Running LSTM with sequence length 20 [Config1]
Total trainable parameters: 148801
Epoch  1: Train Loss = 1.8374, Val Loss = 1.6412, Val Acc = 0.5105
Epoch  2: Train Loss = 1.5817, Val Loss = 1.5531, Val Acc = 0.5323
Epoch  3: Train Loss = 1.5154, Val Loss = 1.5101, Val Acc = 0.5438
Epoch  4: Train Loss = 1.4778, Val Loss = 1.4876, Val Acc = 0.5497
Epoch  5: Train Loss = 1.4532, Val Loss = 1.4687, Val Acc = 0.5547
Epoch  6: Train Loss = 1.4352, Val Loss = 1.4574, Val Acc = 0.5568
Epoch  7: Train Loss = 1.4205, Val Loss = 1.4503, Val Acc = 0.5586
Epoch  8: Train Loss = 1.4087, Val Loss = 1.4410, Val Acc = 0.5602
Epoch  9: Train Loss = 1.3993, Val Loss = 1.4327, Val Acc = 0.5632
Epoch 10: Train Loss = 1.3907, Val Loss = 1.4324, Val Acc = 0.5624
--- Summary ---
Sequence Length: 20
Model Type: LSTM | Config1
Final Train Loss: 1.3907
Final Val Loss: 1.4324
Best Val Accuracy: 0.5632
Training Time: 271.68 sec
Inference Time (per batch): 0.57 ms
Model Size (params): 148801
Sample Output: The m