In [106]:
import math
import os
from collections import Counter
from dataclasses import dataclass

import torch
import torch.nn.functional as F
from torch import nn
from torch.utils.data import DataLoader

In [97]:
# Run on the GPU when PyTorch reports one, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Training hyperparameters (handed to train_model in the training section).
NEPOCHS = 20  # number of passes over the data loader
EVERY = 5     # write a checkpoint every EVERY epochs
LR = 1e-4     # learning rate

## 1 : Processing

In [23]:
def process_data():
    """
    Read the three corpus files and return them joined into one string.

    The first file (OldManAndSea.txt) uses straight quotes, so they are
    converted to curly ones before the files are combined:
    - A double quote followed by a space, a newline, or the end of the text
      becomes a closing quote (”); any other double quote becomes an opening
      quote (“).
    - A single quote preceded by a character other than a space or newline
      becomes a curly apostrophe (’). A single quote at the very start of the
      text, or after a space/newline, is left unchanged.

    The second and third files are read verbatim.

    Returns:
        str: The three texts joined with single spaces, first file first.
    """
    # Read the first text file and normalise its quotes and apostrophes
    with open("data/generation/OldManAndSea.txt", "r", encoding='utf-8-sig') as f:
        raw_text = f.read()

    chars = list(raw_text)  # Strings are immutable; edit a list of characters instead
    last_index = len(chars) - 1
    for i, char in enumerate(chars):
        if char == '"':
            # A quote at the very end of the text has no successor to inspect;
            # treat it as closing instead of indexing past the end.
            if i == last_index or chars[i + 1] in (' ', '\n'):
                chars[i] = '”'
            else:
                chars[i] = '“'
        elif char == "'":
            # Guard i > 0 so a leading quote does not wrap around to chars[-1].
            if i > 0 and chars[i - 1] not in (' ', '\n'):
                chars[i] = '’'

    text = "".join(chars)  # Convert the list back into a string

    # Read the second and third text files
    with open("data/generation/ToWhomTheBellTolls.txt", "r", encoding='utf-8-sig') as f:
        text1 = f.read()

    with open("data/generation/FarewellToArms.txt", "r", encoding='utf-8-sig') as f:
        text2 = f.read()

    # Combine the processed text from all three files
    combined_text = f"{text} {text1} {text2}"

    return combined_text

In [93]:
def build_vocab(text):
    """
    Tokenize the text and build word <-> integer vocabularies.

    Steps:
    - Lowercase the text and turn newlines into spaces.
    - Collect every character that is neither alphanumeric nor whitespace as a
      punctuation mark.
    - Surround each punctuation mark with spaces so it becomes its own token.
    - Split on whitespace to obtain the tokens.
    - Rank distinct tokens by descending frequency (ties keep first-seen order)
      and number them from 0.
    - Append an 'UNK' entry as the last index to stand in for unknown words.

    Args:
        text (str): The input text to be processed.

    Returns:
        tuple: A tuple containing two lists and two dictionaries:
            - punctuations (list): sorted list of the punctuation characters found
              (whitespace is not included).
            - tokens (list): all tokens of the text, in order.
            - word_to_int (dict): Mapping from words to their integer indices.
            - int_to_word (dict): Mapping from integer indices to their words.
    """
    # Convert text to lowercase and replace newlines with spaces
    text = text.lower().replace("\n", " ")

    # Whitespace is a separator, not punctuation, so it is excluded here.
    # Sorting makes the returned list independent of set iteration order.
    punctuations = sorted(
        char for char in set(text)
        if not char.isalnum() and not char.isspace()
    )

    # Pad every punctuation mark with spaces in a single pass over the text
    spacing_table = {ord(punctuation): f" {punctuation} " for punctuation in punctuations}
    text = text.translate(spacing_table)

    # Tokenize the text into words (splitting on whitespace)
    tokens = text.split()

    # Count occurrences of each word in the tokenized text
    word_frequencies = Counter(tokens)

    # Sort words by their frequency in descending order
    sorted_words = sorted(word_frequencies, key=word_frequencies.get, reverse=True)

    # Add the 'UNK' (unknown) token to handle unseen words; the text was
    # lowercased above, so it cannot collide with a real token.
    sorted_words.append("UNK")

    # Create a mapping from words to integers and from integers to words
    word_to_int = {word: idx for idx, word in enumerate(sorted_words)}
    int_to_word = {idx: word for word, idx in word_to_int.items()}

    return punctuations, tokens, word_to_int, int_to_word

In [94]:
# Load the combined corpus, then derive the token stream and both vocabularies.
text = process_data()
vocab_outputs = build_vocab(text)
punctuations, tokens, word_to_int, int_to_word = vocab_outputs

## 2 : Batches

In [37]:
def create_batches(tokens, word_to_int, seq_len=128, batch_size=32):
    """
    Create shuffled batches of (input, target) windows for next-token training.

    The tokens are mapped to integer indices and every window of `seq_len`
    consecutive indices becomes one example:
    - The input (`x`) is the window starting at position `i`.
    - The target (`y`) is the same window shifted right by one token.

    All windows are views into one shared index tensor, so memory use grows with
    the number of tokens rather than with tokens * seq_len.

    Args:
        tokens (list of str): A list of word tokens.
        word_to_int (dict): A dictionary mapping words to their corresponding integer indices.
            If it contains an 'UNK' entry, tokens missing from the mapping are encoded
            as 'UNK'; otherwise a missing token raises KeyError.
        seq_len (int, optional): The length of each input sequence. Defaults to 128.
        batch_size (int, optional): The number of sequences per batch. Defaults to 32.

    Returns:
        DataLoader: A shuffling DataLoader yielding (inputs, targets) pairs of
        int64 tensors, each of shape (batch, seq_len).

    Raises:
        KeyError: If a token is unknown and `word_to_int` has no 'UNK' entry.
    """
    # Convert the tokens into their corresponding integer indices
    unk_index = word_to_int.get("UNK")
    if unk_index is None:
        token_indices = [word_to_int[word] for word in tokens]
    else:
        token_indices = [word_to_int.get(word, unk_index) for word in tokens]

    # One tensor for the whole corpus; slicing it yields views, not copies
    data = torch.tensor(token_indices, dtype=torch.long)

    # Window i covers [i, i + seq_len); its target is the window shifted by one
    input_output_pairs = [
        (data[i:i + seq_len], data[i + 1:i + seq_len + 1])
        for i in range(len(token_indices) - seq_len)
    ]

    # Create a DataLoader with the given batch size, shuffling the data
    data_loader = DataLoader(input_output_pairs, batch_size=batch_size, shuffle=True)

    return data_loader

In [38]:
# Build the training loader with create_batches' default seq_len and batch_size.
loader = create_batches(tokens=tokens, word_to_int=word_to_int)

## 3 : GPT Model

In [107]:
@dataclass
class ModelConfig:
    """
    Hyperparameters describing the transformer language model.

    Only `vocab_size` is required; every other field has a default. The
    model classes in this notebook read these fields when they are constructed.

    Attributes:
        vocab_size (int): Number of distinct tokens in the vocabulary.
        num_layers (int): How many transformer blocks are stacked.
        num_heads (int): Attention heads in each block.
        embedding_dim (int): Width of the token and position embeddings.
        max_sequence_length (int): Longest input sequence the model accepts.
        embedding_dropout (float): Dropout probability applied to the summed embeddings.
        residual_dropout (float): Dropout probability on the residual branches.
        attention_dropout (float): Dropout probability on the attention weights.
    """

    # Required
    vocab_size: int

    # Architecture
    num_layers: int = 3
    num_heads: int = 4
    embedding_dim: int = 256
    max_sequence_length: int = 128

    # Regularisation
    embedding_dropout: float = 0.1
    residual_dropout: float = 0.1
    attention_dropout: float = 0.1

In [108]:
# The vocabulary size is the number of entries in word_to_int;
# every other hyperparameter keeps its ModelConfig default.
vocab_size = len(word_to_int)
config = ModelConfig(vocab_size=vocab_size)

#### 3.1 : Causal Self-Attention Model

In [58]:
class CausalSelfAttention(nn.Module):
    """
    Implements multi-head causal self-attention for transformer models.

    The module:
    - Projects the input to query, key, and value vectors with one linear layer.
    - Computes scaled dot-product attention per head, masking out positions
      that lie in the future of the querying position.
    - Applies dropout to the attention weights and to the projected output.

    Attributes:
        query_key_value_proj (nn.Linear): Linear layer to compute query, key, and value vectors.
        output_proj (nn.Linear): Linear layer to project the output back to the embedding dimension.
        attention_dropout (nn.Dropout): Dropout applied to attention weights.
        residual_dropout (nn.Dropout): Dropout applied to the projected output.
        bias_mask (torch.Tensor): Lower-triangular causal mask of shape
            (1, 1, max_sequence_length, max_sequence_length); 1 = may attend.
        num_heads (int): Number of attention heads.
        embedding_dim (int): Dimensionality of the input embeddings.
        head_size (int): Dimensionality of each head (embedding_dim // num_heads).
    """

    def __init__(self, config):
        """
        Initializes the CausalSelfAttention module with the specified configuration.

        Args:
            config (ModelConfig): Configuration object containing model parameters.

        Raises:
            ValueError: If `config.embedding_dim` is not divisible by `config.num_heads`.
        """
        super().__init__()

        # The embedding is split evenly across heads in forward(); fail early
        # with a clear message rather than with a reshape error later.
        if config.embedding_dim % config.num_heads != 0:
            raise ValueError(
                f"embedding_dim ({config.embedding_dim}) must be divisible by "
                f"num_heads ({config.num_heads})"
            )

        self.query_key_value_proj = nn.Linear(config.embedding_dim, 3 * config.embedding_dim)
        self.output_proj = nn.Linear(config.embedding_dim, config.embedding_dim)
        self.attention_dropout = nn.Dropout(config.attention_dropout)
        self.residual_dropout = nn.Dropout(config.residual_dropout)

        # Causal mask: lower-triangular ones, so position i may attend to
        # positions <= i only. Registered as a buffer so it follows the module
        # across devices and is stored in the state dict.
        self.register_buffer(
            "bias_mask",
            torch.tril(torch.ones(config.max_sequence_length, config.max_sequence_length))
                 .view(1, 1, config.max_sequence_length, config.max_sequence_length)
        )

        self.num_heads = config.num_heads
        self.embedding_dim = config.embedding_dim
        self.head_size = config.embedding_dim // config.num_heads

    def forward(self, x):
        """
        Forward pass of the causal self-attention mechanism.

        Args:
            x (torch.Tensor): Input tensor of shape (batch_size, sequence_length, embedding_dim).
                sequence_length must not exceed the max_sequence_length the mask was built for.

        Returns:
            torch.Tensor: Output tensor of the same shape as the input.
        """
        batch_size, sequence_length, embedding_dim = x.size()

        # Compute query, key, and value vectors from input
        query, key, value = self.query_key_value_proj(x).split(self.embedding_dim, dim=2)

        # Reshape to (batch, heads, sequence, head_size) for multi-head attention
        head_size = embedding_dim // self.num_heads
        query = query.view(batch_size, sequence_length, self.num_heads, head_size).transpose(1, 2)
        key = key.view(batch_size, sequence_length, self.num_heads, head_size).transpose(1, 2)
        value = value.view(batch_size, sequence_length, self.num_heads, head_size).transpose(1, 2)

        # Scaled dot-product attention: (batch, heads, sequence, sequence)
        attention_scores = (query @ key.transpose(-2, -1)) / math.sqrt(head_size)

        # Masked positions get -inf so softmax assigns them zero weight
        causal_mask = self.bias_mask[:, :, :sequence_length, :sequence_length]
        attention_scores = attention_scores.masked_fill(causal_mask == 0, float('-inf'))

        # Compute attention weights
        attention_weights = F.softmax(attention_scores, dim=-1)
        attention_weights = self.attention_dropout(attention_weights)

        # Weighted sum of values, then merge the heads back into embedding_dim
        attention_output = attention_weights @ value
        attention_output = attention_output.transpose(1, 2).contiguous().view(batch_size, sequence_length, embedding_dim)

        # Apply output projection and residual dropout
        output = self.residual_dropout(self.output_proj(attention_output))

        return output

#### 3.2 : GPT Block

In [59]:
class TransformerBlock(nn.Module):
    """
    One pre-norm transformer block: attention sub-layer followed by an MLP sub-layer.

    Each sub-layer normalizes its input, transforms it, and adds the result
    back onto the running activations (residual connection).

    Attributes:
        layer_norm_1 (nn.LayerNorm): Normalization applied before attention.
        attention (CausalSelfAttention): Causal self-attention layer.
        layer_norm_2 (nn.LayerNorm): Normalization applied before the MLP.
        mlp (nn.Sequential): Linear -> GELU -> Linear -> Dropout, expanding to
            four times the embedding width in the middle.
    """

    def __init__(self, config):
        """
        Build the block's sub-layers from the configuration.

        Args:
            config (ModelConfig): Configuration object containing model parameters.
        """
        super().__init__()

        width = config.embedding_dim
        hidden_width = 4 * width

        # Attention sub-layer
        self.layer_norm_1 = nn.LayerNorm(width)
        self.attention = CausalSelfAttention(config)

        # Feed-forward sub-layer
        self.layer_norm_2 = nn.LayerNorm(width)
        self.mlp = nn.Sequential(
            nn.Linear(width, hidden_width),
            nn.GELU(),
            nn.Linear(hidden_width, width),
            nn.Dropout(config.residual_dropout),
        )

    def forward(self, x):
        """
        Run the input through both residual sub-layers.

        Args:
            x (torch.Tensor): Input tensor of shape (batch_size, sequence_length, embedding_dim).

        Returns:
            torch.Tensor: Output tensor of the same shape as the input.
        """
        # Residual attention branch
        attended = self.attention(self.layer_norm_1(x))
        x = x + attended

        # Residual feed-forward branch
        fed_forward = self.mlp(self.layer_norm_2(x))
        return x + fed_forward

In [60]:
class TransformerLanguageModel(nn.Module):
    """
    A transformer-based language model for autoregressive text generation.

    Token embeddings and learned positional embeddings are summed, passed through
    a stack of transformer blocks and a final layer normalization, and projected
    to vocabulary logits for next-token prediction.

    Attributes:
        max_sequence_length (int): Maximum length of the input sequences.
        transformer (nn.ModuleDict): Dictionary containing the embedding layers, transformer blocks,
                                     and the final layer normalization.
        lm_head (nn.Linear): Linear layer (no bias) mapping transformer outputs to vocabulary logits.
    """

    def __init__(self, config):
        """
        Initializes the transformer language model with the specified configuration.

        Args:
            config (ModelConfig): Configuration object containing model parameters.
        """
        super().__init__()

        self.max_sequence_length = config.max_sequence_length

        # Transformer components
        self.transformer = nn.ModuleDict({
            "token_embedding": nn.Embedding(config.vocab_size, config.embedding_dim),
            "position_embedding": nn.Embedding(config.max_sequence_length, config.embedding_dim),
            "dropout": nn.Dropout(config.embedding_dropout),
            "transformer_blocks": nn.ModuleList([TransformerBlock(config)
                                                 for _ in range(config.num_layers)]),
            "layer_norm": nn.LayerNorm(config.embedding_dim)
        })

        # Output head: project the transformer output back to the vocabulary size for token prediction
        self.lm_head = nn.Linear(config.embedding_dim, config.vocab_size, bias=False)

        # Scaled initialization for the projections that write into the residual stream
        self._initialize_parameters(config)

    def _initialize_parameters(self, config):
        """
        Re-initialize the residual projection weights with a depth-scaled normal.

        The layers that write back into the residual stream -- the attention
        output projection and the second linear layer of each block's MLP
        (index 2 of the `mlp` Sequential) -- are drawn from
        N(0, (0.02 / sqrt(2 * num_layers))^2). All other parameters keep
        PyTorch's default initialization.

        Args:
            config (ModelConfig): Configuration object containing model parameters.
        """
        # The previous filter looked for 'c_proj.weight', a name no parameter in
        # this model carries, so the scaled initialization was never applied.
        residual_projection_suffixes = ('attention.output_proj.weight', 'mlp.2.weight')
        for param_name, param in self.named_parameters():
            if param_name.endswith(residual_projection_suffixes):
                torch.nn.init.normal_(param, mean=0.0, std=0.02 / math.sqrt(2 * config.num_layers))

    def forward(self, input_indices, targets=None):
        """
        Forward pass through the transformer model.

        Args:
            input_indices (torch.Tensor): Input tensor of shape (batch_size, sequence_length) containing token indices.
            targets (torch.Tensor, optional): Accepted for interface compatibility but not
                used; no loss is computed here. Default is None.

        Returns:
            torch.Tensor: Logits of shape (batch_size, sequence_length, vocab_size) representing the next-token predictions.

        Raises:
            ValueError: If sequence_length exceeds `max_sequence_length`.
        """
        batch_size, sequence_length = input_indices.size()

        # Positions beyond the embedding table would otherwise fail inside the
        # embedding lookup with a far less helpful error.
        if sequence_length > self.max_sequence_length:
            raise ValueError(
                f"Cannot forward a sequence of length {sequence_length}; "
                f"max_sequence_length is {self.max_sequence_length}"
            )

        # Position indices 0..sequence_length-1, created directly on the input's device
        position_indices = torch.arange(
            0, sequence_length, dtype=torch.long, device=input_indices.device
        ).unsqueeze(0)

        # Token and position embeddings
        token_embeddings = self.transformer["token_embedding"](input_indices)
        position_embeddings = self.transformer["position_embedding"](position_indices)

        # Combine token and positional embeddings (broadcast over the batch), then apply dropout
        x = self.transformer["dropout"](token_embeddings + position_embeddings)

        # Pass through each transformer block
        for block in self.transformer["transformer_blocks"]:
            x = block(x)

        # Final layer normalization
        x = self.transformer["layer_norm"](x)

        # Output logits for vocabulary prediction
        logits = self.lm_head(x)

        return logits

In [63]:
# Instantiate the model on the configured device.
model = TransformerLanguageModel(config)
model.to(DEVICE)

# NOTE: only the parameters under `model.transformer` are counted here;
# the separate `lm_head` projection is not part of this figure.
num = sum(map(torch.numel, model.transformer.parameters()))
print(f"Number of parameters: {num/1e6:.2f} M")

Number of parameters: 5.12 M


## 4 : Training

In [96]:
def train_model(model, data_loader, num_epochs, learning_rate, save_frequency):
    """
    Train a given model using the specified data loader.

    Runs `num_epochs` passes over `data_loader`, minimizing the cross-entropy
    between the model's logits and the targets with the Adam optimizer and
    gradient-norm clipping at 1.0. The average loss is printed after every
    epoch and the model's state dict is saved to
    'models/generation/GPT_<epoch>.pth' every `save_frequency` epochs.

    Batches are moved to the device the model's parameters live on.

    Args:
        model (nn.Module): The model to be trained; called as `model(inputs)` and
            expected to return logits whose last dimension is the vocabulary size.
        data_loader (DataLoader): Iterable of (inputs, targets) batches.
        num_epochs (int): The number of epochs to train the model.
        learning_rate (float): The learning rate for the optimizer.
        save_frequency (int): Frequency of saving the model state (in epochs).
            A value of 0 (or None) disables checkpointing.

    Returns:
        None

    Raises:
        ValueError: If `data_loader` yields no batches.
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    loss_function = nn.CrossEntropyLoss()

    # Follow the model's own placement rather than a module-level constant, so
    # inputs and parameters always end up on the same device.
    device = next(model.parameters()).device

    model.train()

    for epoch in range(1, num_epochs + 1):
        total_loss = 0.0
        num_batches = 0
        for inputs, targets in data_loader:
            inputs, targets = inputs.to(device), targets.to(device)

            # Forward pass
            outputs = model(inputs)

            # Flatten (batch, sequence) so every position is one classification example
            loss = loss_function(outputs.view(-1, outputs.size(-1)), targets.view(-1))

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1

        if num_batches == 0:
            raise ValueError("data_loader yielded no batches; nothing to train on")

        # Average loss for the epoch
        average_loss = total_loss / num_batches
        print(f'Epoch {epoch} loss: {average_loss:.3f}')

        # Save model state if necessary (a falsy save_frequency disables saving)
        if save_frequency and epoch % save_frequency == 0:
            checkpoint_path = f'models/generation/GPT_{epoch}.pth'
            # Create the target directory on demand so a long training run is
            # not lost to a missing folder at the first checkpoint.
            os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)
            torch.save(model.state_dict(), checkpoint_path)

In [99]:
# Train with the hyperparameters from the configuration cell.
train_model(
    model,
    data_loader=loader,
    num_epochs=NEPOCHS,
    learning_rate=LR,
    save_frequency=EVERY,
)

Epoch 1 loss: 2.129
Epoch 2 loss: 1.770
Epoch 3 loss: 1.512
Epoch 4 loss: 1.316
Epoch 5 loss: 1.164
Epoch 6 loss: 1.044
Epoch 7 loss: 0.945
Epoch 8 loss: 0.865
Epoch 9 loss: 0.800
Epoch 10 loss: 0.746
Epoch 11 loss: 0.700
Epoch 12 loss: 0.662
Epoch 13 loss: 0.629
Epoch 14 loss: 0.600
Epoch 15 loss: 0.575
Epoch 16 loss: 0.553
Epoch 17 loss: 0.533
Epoch 18 loss: 0.516
Epoch 19 loss: 0.500
Epoch 20 loss: 0.485


## 5 : Prediction

In [100]:
def generate_samples(config, input_indices, model, max_new_tokens, 
                     temperature=1.0, top_k=None):
    """
    Sample new tokens from the language model, continuing a given input sequence.

    One token is sampled per step and appended to the running sequence, which is
    fed back to the model (cropped to the last `config.max_sequence_length`
    tokens). Sampling can be shaped with temperature scaling and top-k filtering.
    The model is switched to eval mode and no gradients are tracked.

    Args:
        config (ModelConfig): Configuration object; only `max_sequence_length` is read.
        input_indices (torch.Tensor): Input tensor of shape (batch_size, sequence_length) containing token indices.
        model (nn.Module): The language model; called with the index tensor and expected
            to return logits of shape (batch_size, sequence_length, vocab_size).
        max_new_tokens (int): Number of new tokens to generate.
        temperature (float, optional): Logits are divided by this value before sampling. Default is 1.0.
        top_k (int, optional): If specified, only the top k tokens will be considered for sampling.
            Values larger than the vocabulary size are clamped to it. Default is None.

    Returns:
        torch.Tensor: A tensor of shape (batch_size, max_new_tokens) containing only the
        newly generated tokens, on the same device as `input_indices`.
    """
    model.eval()
    original_length = input_indices.size(1)  # Keep track of the original length of the input indices

    # Run the model wherever its parameters live; fall back to the input's
    # device for a parameter-free model.
    first_parameter = next(model.parameters(), None)
    model_device = first_parameter.device if first_parameter is not None else input_indices.device

    # Inference only: skip autograd bookkeeping for every generated token
    with torch.no_grad():
        for _ in range(max_new_tokens):
            # Keep at most the last max_sequence_length tokens as context
            # (a no-op while the sequence is still short enough)
            conditional_indices = input_indices[:, -config.max_sequence_length:]

            # Predict the logits for the next token
            logits = model(conditional_indices.to(model_device))

            # Get logits for the last token in the sequence and apply temperature scaling
            logits = logits[:, -1, :] / temperature

            # Apply top-k filtering if specified
            if top_k is not None:
                # torch.topk rejects k larger than the vocabulary, so clamp it
                k = min(top_k, logits.size(-1))
                top_values, _ = torch.topk(logits, k)
                # Everything below the k-th largest logit gets zero probability
                logits[logits < top_values[:, [-1]]] = -float('Inf')

            # Convert logits to probabilities
            probabilities = F.softmax(logits, dim=-1)

            # Sample the next token from the distribution
            next_token = torch.multinomial(probabilities, num_samples=1)

            # Append the sampled token, keeping the sequence on the caller's device
            input_indices = torch.cat((input_indices, next_token.to(input_indices.device)), dim=1)

    # Return only the newly generated tokens
    return input_indices[:, original_length:]

def generate_text(config, prompt, model, max_new_tokens, punctuation_marks, word_to_int, 
                  temperature=1.0, top_k=None):
    """
    Generate text based on a given prompt using a language model.

    The prompt is lowercased, split into tokens (punctuation marks become their
    own tokens) and encoded with `word_to_int`; words missing from the mapping
    are encoded as 'UNK'. New tokens are sampled with `generate_samples`, decoded
    with the inverse of `word_to_int`, and joined into a string from which the
    spaces around common punctuation are removed again.

    Args:
        config (ModelConfig): Configuration object containing model parameters.
        prompt (str): The initial text prompt to generate text from.
        model (nn.Module): The language model used for generating new tokens.
        max_new_tokens (int): Maximum number of new tokens to generate.
        punctuation_marks (list): Punctuation marks to split off as separate tokens in the prompt.
        word_to_int (dict): Mapping from words to their corresponding integer indices;
            must contain an 'UNK' entry.
        temperature (float, optional): Controls the randomness of predictions. Default is 1.0.
        top_k (int, optional): If specified, only the top k tokens will be considered for sampling. Default is None.

    Returns:
        str: The original prompt, a space, and the formatted generated text.

    Raises:
        AssertionError: If `prompt` is the empty string.
        ValueError: If `prompt` contains no tokens (e.g. only whitespace).
    """
    assert len(prompt) > 0, "Prompt must contain at least one token"

    # Preprocess the prompt: lowercase and replace newlines
    processed_text = prompt.lower().replace("\n", " ")

    # Add spaces around punctuation marks for tokenization
    for punctuation in punctuation_marks:
        processed_text = processed_text.replace(f"{punctuation}", f" {punctuation} ")

    # Tokenize the processed text
    tokenized_text = processed_text.split()
    if not tokenized_text:
        # A whitespace-only prompt would otherwise reach the model as an empty sequence
        raise ValueError("Prompt must contain at least one token")

    unk_index = word_to_int["UNK"]
    token_indices = [word_to_int.get(word, unk_index) for word in tokenized_text]
    token_tensor = torch.LongTensor(token_indices).unsqueeze(0)

    # Generate new tokens based on the prompt
    generated_indices = generate_samples(config, token_tensor, model, max_new_tokens, temperature, top_k)

    # Decode with the inverse of the mapping that was passed in, rather than
    # relying on a module-level int_to_word that may not match it.
    index_to_word = {idx: word for word, idx in word_to_int.items()}

    # Row 0 is the single prompt in the batch; indexing (rather than squeeze)
    # still yields a list when exactly one token was generated.
    generated_tokens = [index_to_word[i] for i in generated_indices[0].tolist()]
    generated_text = " ".join(generated_tokens)

    # Format the generated text to remove unwanted spaces around punctuation
    for punctuation in '''”).:;!?,-‘’''':
        generated_text = generated_text.replace(f" {punctuation}", f"{punctuation}")
    for punctuation in '''“(-‘’''':
        generated_text = generated_text.replace(f"{punctuation} ", f"{punctuation}")

    return prompt + " " + generated_text

In [103]:
%%capture
# Restore the checkpoint written after epoch 20 and switch to inference mode.
checkpoint_path = "models/generation/GPT_20.pth"
trained_weights = torch.load(checkpoint_path, map_location=DEVICE)
model.load_state_dict(trained_weights)
model.eval()

In [105]:
# Seed generation with the placeholder prompt "UNK".
prompt = "UNK"
generated_text = generate_text(
    config,
    prompt,
    model,
    max_new_tokens=200,
    punctuation_marks=punctuations,
    word_to_int=word_to_int,
)

# generate_text returns the prompt, a space, then the new text;
# slice off that prefix so only the generated part is displayed.
generated_text[len(prompt) + 1:]

', over his head. “he’s been around a long line.” it showed bright in the water that ran across his shoulders rose slowly over the hole behind his shoulders. he was touching its tail-gate of the water splashing down from the sudden eruption of a geyser. the deaf man shook his head at robert jordan and grinned in delight. he continued to shake his head happily as pilar went on vilifying and robert jordan knew that it was all right again now. finally she stopped cursing, reached for the water jug, tipped it up and took a drink and said, calmly, “then just shut up about what we are to do afterwards, will you, inglés? you go back to the republic and you take your piece with you and leave us others alone here to decide what part of these hills we’ll die in.” “live in,” el sordo said. he had put his hand in the deep voice up to the table. “live thee,” pilar said. “'