# Torch

In this notebook, I am going to use PyTorch to build a transformer.

However, I will build the tokenizer, embedding, self-attention, positional encoding, decoder, and so on without using predefined objects.

I will build them on my own.

In [1]:
import torch
import torch.nn as nn
from collections import Counter
from typing import List, Dict, Union, Optional, Any
from torch import Tensor
import math
class PyTorchTokenizer(nn.Module):
    """Whitespace tokenizer that maps words to integer indices.

    Indices 0-3 are reserved for the special tokens ``<PAD>``, ``<UNK>``,
    ``<SOS>`` and ``<EOS>``. The remaining slots, up to ``max_vocab_size``
    (special tokens included), are filled by :meth:`fit_on_texts` in
    descending order of word frequency.
    """

    def __init__(self, max_vocab_size: int = 10000):
        """
        Args:
            max_vocab_size: Upper bound on the vocabulary size, counting the
                four special tokens.
        """
        super().__init__()

        # Special token indices
        self.PAD_IDX: int = 0
        self.UNK_IDX: int = 1
        self.SOS_IDX: int = 2
        self.EOS_IDX: int = 3

        # Tokenizer attributes
        self.max_vocab_size: int = max_vocab_size
        self.word_to_index: Dict[str, int] = {}
        self.index_to_word: Dict[int, str] = {}

        # Special tokens; list order must match the *_IDX constants above
        special_tokens: List[str] = ['<PAD>', '<UNK>', '<SOS>', '<EOS>']
        for idx, token in enumerate(special_tokens):
            self.word_to_index[token] = idx
            self.index_to_word[idx] = token

        # Track vocabulary size
        self.vocab_size: int = len(special_tokens)

    def fit_on_texts(self, texts: List[str]) -> None:
        """Extend the vocabulary with the most frequent words in ``texts``.

        Words are obtained with ``str.split()`` and added in descending
        frequency order (ties keep first-seen order) until ``max_vocab_size``
        is reached. Words already in the vocabulary are skipped, so calling
        this method again only appends new words.

        Args:
            texts: Sentences to count words from.
        """
        word_counts: Counter = Counter(
            word for text in texts for word in text.split()
        )

        # most_common() sorts by count, descending, with a stable sort
        for word, _ in word_counts.most_common():
            if self.vocab_size >= self.max_vocab_size:
                break  # vocabulary is full; nothing further can be added
            if word not in self.word_to_index:
                self.word_to_index[word] = self.vocab_size
                self.index_to_word[self.vocab_size] = word
                self.vocab_size += 1

    def texts_to_sequences(
        self,
        texts: List[str],
        add_sos_eos: bool = True,
        max_length: Optional[int] = None
    ) -> torch.Tensor:
        """
        Convert texts to a padded tensor of token indices.

        Args:
            texts: List of sentences
            add_sos_eos: Wrap every sentence in ``<SOS>`` ... ``<EOS>``
            max_length: Target sequence length. Defaults to the longest
                encoded sentence in ``texts``.

        Returns:
            ``torch.long`` tensor of shape ``(len(texts), max_length)``.
            Shorter sentences are right-padded with ``PAD_IDX``. Longer ones
            are truncated to their first ``max_length`` indices, which drops
            the trailing ``<EOS>`` of a truncated sentence. Words missing
            from the vocabulary map to ``UNK_IDX``. An empty ``texts`` yields
            an empty tensor of shape ``(0, max_length or 0)``.
        """
        # Convert to indices
        sequences: List[List[int]] = []
        for text in texts:
            tokens: List[str] = text.split()

            # Add special tokens if requested
            if add_sos_eos:
                tokens = ['<SOS>'] + tokens + ['<EOS>']

            sequences.append(
                [self.word_to_index.get(word, self.UNK_IDX) for word in tokens]
            )

        # Empty batch: max() below would raise on an empty sequence
        if not sequences:
            return torch.empty((0, max_length or 0), dtype=torch.long)

        # Determine max length
        if max_length is None:
            max_length = max(len(seq) for seq in sequences)

        # Truncate, then right-pad every sequence to exactly max_length
        padded_sequences: List[List[int]] = []
        for seq in sequences:
            seq = seq[:max_length]
            seq = seq + [self.PAD_IDX] * (max_length - len(seq))
            padded_sequences.append(seq)

        return torch.tensor(padded_sequences, dtype=torch.long)

    def sequences_to_texts(
        self, sequences: Union[torch.Tensor, List[List[int]]]
    ) -> List[str]:
        """Convert index sequences back to space-joined texts.

        Args:
            sequences: 2-D tensor or nested list of token indices.

        Returns:
            One string per sequence with ``<PAD>``, ``<SOS>`` and ``<EOS>``
            removed. Indices missing from the vocabulary are rendered as
            ``<UNK>``.
        """
        removed_tokens = {'<PAD>', '<SOS>', '<EOS>'}
        texts: List[str] = []
        for sequence in sequences:
            # int() accepts both 0-d tensors and plain Python ints
            words: List[str] = [
                self.index_to_word.get(int(idx), '<UNK>') for idx in sequence
            ]
            # Remove special tokens and padding
            words = [w for w in words if w not in removed_tokens]
            texts.append(' '.join(words))
        return texts

    # Sample texts
# Small demo corpus used to exercise the tokenizer end to end
texts: List[str] = [
    "hello world",
    "machine learning is awesome",
    "pytorch is great for deep learning",
]

# Build a tokenizer whose vocabulary is capped at 20 entries and fit it
tokenizer: PyTorchTokenizer = PyTorchTokenizer(max_vocab_size=20)
tokenizer.fit_on_texts(texts)

# Encode the corpus into one padded index tensor (reused by the embedding cell)
sequences: torch.Tensor = tokenizer.texts_to_sequences(texts)

print("Sequences shape:", sequences.shape)
print("Vocab size:", tokenizer.vocab_size)

# Round trip: decoding should reproduce every input sentence
reconstructed_texts: List[str] = tokenizer.sequences_to_texts(sequences)

print("\nReconstructed texts:")
for original, reconstructed in zip(texts, reconstructed_texts):
    print(f"Original:     {original}")
    print(f"Reconstructed: {reconstructed}\n")

Sequences shape: torch.Size([3, 8])
Vocab size: 14

Reconstructed texts:
Original:     hello world
Reconstructed: hello world

Original:     machine learning is awesome
Reconstructed: machine learning is awesome

Original:     pytorch is great for deep learning
Reconstructed: pytorch is great for deep learning



## Embedding

In [2]:
class PyTorchEmbedding(nn.Module):
    """Embedding lookup table implemented without ``nn.Embedding``."""

    def __init__(
        self, 
        token_size: int, 
        d_model: int, 
        padding_idx: Optional[int] = None,
        init_method: str = 'uniform'
    ):
        """
        Custom embedding layer without using nn.Embedding
        
        Args:
            token_size: Number of tokens in vocabulary
            d_model: Embedding dimension
            padding_idx: Index whose embedding is fixed to zero
            init_method: Weight initialization method; one of
                'uniform' (values in [-1, 1)), 'normal' or 'xavier'

        Raises:
            ValueError: If ``init_method`` is not a known method.
        """
        super().__init__()
        # Initialize weights based on method
        if init_method == 'uniform':
            weights = torch.rand(token_size, d_model) * 2 - 1  # [-1, 1)
        elif init_method == 'normal':
            weights = torch.randn(token_size, d_model)
        elif init_method == 'xavier':
            weights = torch.nn.init.xavier_uniform_(
                torch.empty(token_size, d_model)
            )
        else:
            raise ValueError(f"Unknown init method: {init_method}")
        
        # Zero out padding index if specified (done before wrapping in a
        # Parameter so no autograd bookkeeping is involved)
        if padding_idx is not None:
            weights[padding_idx].zero_()
            if padding_idx < 0:
                # Normalise so the comparison in forward() matches token ids
                padding_idx += token_size
        
        # Registered as a Parameter so the table is trainable, appears in
        # state_dict() and follows the module across .to(device) calls
        self.weights = nn.Parameter(weights)
        self.token_size = token_size
        self.d_model = d_model
        self.padding_idx = padding_idx
    
    def forward(self, token_sequences: Tensor) -> Tensor:
        """
        Lookup embeddings for given indices
        
        Args:
            token_sequences: Integer tensor of token indices, typically of
                shape (batch size, sequence length)
        
        Returns:
            Tensor of embedded tokens with one trailing ``d_model`` axis
            appended to the shape of ``token_sequences``
        """
        # Vectorised gather: one indexing op instead of a Python double loop
        output = self.weights[token_sequences]

        if self.padding_idx is not None:
            # Keep padding positions at exactly zero and stop gradients from
            # reaching the padding row, so it stays zero during training
            pad_mask = (token_sequences == self.padding_idx).unsqueeze(-1)
            output = output.masked_fill(pad_mask, 0.0)

        print(f'Output shape: {output.shape}')
        return output

    def __call__(self, indices: Tensor) -> Tensor:
        """
        Make the class callable for convenience.

        Delegates to ``nn.Module.__call__`` so registered forward hooks
        still run before and after ``forward``.
        """
        return super().__call__(indices)
# Embed the padded index tensor produced by the tokenizer cell
embedding = PyTorchEmbedding(token_size=20, d_model=6)
outputs = embedding.forward(sequences)

# NOTE(review): axis 1 of `outputs` is the padded sequence length taken from
# `sequences`, not a count of unique tokens as the printed label suggests.
print(
    f'There are {outputs.shape[0]} sentences/batch size;',
    f"{outputs.shape[1]} unique tokens;",
    f"{outputs.shape[2]} d_model",
)

Output shape: torch.Size([3, 8, 6])
There are 3 sentences/batch size; 8 unique tokens; 6 d_model


## Positional Encoding

In [3]:
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding added to token embeddings."""

    def __init__(self, d_model: int, max_seq_length: int = 5000):
        """
        Args:
            d_model: Embedding dimension (even or odd)
            max_seq_length: Longest sequence length the table is built for
        """
        super().__init__()
        
        # Angle for every (position, frequency) pair
        position = torch.arange(max_seq_length).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        angles = position * div_term  # (max_seq_length, ceil(d_model / 2))
        
        # Initialize the positional encoding buffer
        pe = torch.zeros(1, max_seq_length, d_model)
        
        # Sine on even feature indices, cosine on odd ones. For odd d_model
        # there is one fewer odd index than even, so the cosine term drops
        # the last frequency instead of failing with a shape mismatch.
        pe[0, :, 0::2] = torch.sin(angles)
        pe[0, :, 1::2] = torch.cos(angles[:, : d_model // 2])
        
        # Register as buffer (not a parameter, but part of the model state)
        self.register_buffer('pe', pe)
        
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Add positional encoding to input embeddings
        
        Args:
            x: Tensor of shape [batch_size, seq_length, d_model]
            
        Returns:
            Tensor with positional encoding added

        Raises:
            ValueError: If ``seq_length`` exceeds the ``max_seq_length`` the
                table was built with.
        """
        seq_length = x.size(1)
        if seq_length > self.pe.size(1):
            # Fail explicitly; otherwise the addition either raises an opaque
            # broadcast error or (for a length-1 table) silently broadcasts
            raise ValueError(
                f"Sequence length {seq_length} exceeds max_seq_length "
                f"{self.pe.size(1)}"
            )
        # Add positional encoding to input (only up to the sequence length)
        return x + self.pe[:, :seq_length, :]

## Self-attention

In [4]:
class PyTorchSelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention with a residual connection.

    The projections are plain ``(d_model, d_model)`` parameter matrices with
    no bias. ``forward`` returns ``x + attention(x) @ W_o``.
    """

    def __init__(self, num_head: int = 8, d_model: int = 64):
        """
        Args:
            num_head: Number of attention heads; must divide ``d_model``
            d_model: Model (embedding) dimension
        """
        super().__init__()
        assert (d_model % num_head == 0), 'd_model must be divisible by num_head'
        
        self.num_head = num_head
        self.d_model = d_model
        self.d_head = d_model // num_head

        # Scale the initial weights by 1/sqrt(d_model) so projected
        # activations keep roughly the input's variance; unit-variance
        # matrices produce huge attention scores and a saturated softmax.
        init_scale = 1.0 / math.sqrt(d_model)
        self.W_q = nn.Parameter(torch.randn(d_model, d_model) * init_scale)
        self.W_v = nn.Parameter(torch.randn(d_model, d_model) * init_scale)
        self.W_k = nn.Parameter(torch.randn(d_model, d_model) * init_scale)
        self.W_o = nn.Parameter(torch.randn(d_model, d_model) * init_scale)

    def split_heads(self, x: torch.Tensor) -> torch.Tensor:
        """Reshape (batch, seq_len, d_model) -> (batch, num_head, seq_len, d_head).

        The feature axis is first split into (num_head, d_head) and only then
        swapped with the sequence axis. A direct reshape to the target shape
        would mix token positions and features across heads.
        """
        batch_size = x.shape[0]
        seq_len = x.shape[1]
        per_token = x.reshape(batch_size, seq_len, self.num_head, self.d_head)
        return per_token.transpose(1, 2)
    
    def join_heads(self, x: torch.Tensor) -> torch.Tensor:
        """Inverse of ``split_heads``: (batch, num_head, seq_len, d_head) -> (batch, seq_len, d_model)."""
        batch_size = x.shape[0]
        seq_len = x.shape[2]
        # Bring the sequence axis back in front of the heads before merging
        return x.transpose(1, 2).reshape(batch_size, seq_len, self.d_model)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Apply multi-head self-attention followed by a residual connection.

        Args:
            x: Tensor of shape (batch_size, seq_len, d_model)

        Returns:
            Tensor of shape (batch_size, seq_len, d_model)
        """
        # Linear projections: (batch_size, seq_len, d_model)
        q = torch.matmul(x, self.W_q)
        v = torch.matmul(x, self.W_v)
        k = torch.matmul(x, self.W_k)
        
        # Split heads: (batch_size, num_heads, seq_len, d_head)
        q = self.split_heads(q)
        k = self.split_heads(k)
        v = self.split_heads(v)
        
        # Scaled dot-product scores
        # q:   (batch_size, num_heads, seq_len, d_head)
        # k^T: (batch_size, num_heads, d_head, seq_len)
        # ->   (batch_size, num_heads, seq_len, seq_len)
        attention_scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_head)
        
        # Softmax over the key axis, so every query row sums to 1
        attention_weights = torch.softmax(attention_scores, dim=-1)

        # Weighted sum of values: (batch_size, num_heads, seq_len, d_head)
        head_output = torch.matmul(attention_weights, v)
        
        # Combine heads back to (batch_size, seq_len, d_model)
        output = self.join_heads(head_output)
        # Output projection (batch_size, seq_len, d_model)
        output = torch.matmul(output, self.W_o)
        # Residual connection, out of place so no tensor is mutated
        return output + x

In [5]:
# (batch_size, seq_len, d_model) combinations to smoke-test
test_configs = [
    (2, 10, 64),   # batch_size=2, seq_len=10, d_model=64
    (4, 20, 128),  # batch_size=4, seq_len=20, d_model=128
]

for batch_size, seq_len, d_model in test_configs:
    # Fresh 8-head attention module sized for this configuration
    sa = PyTorchSelfAttention(num_head=8, d_model=d_model)

    # All-zero input of the requested shape, pushed through the module
    x = torch.zeros(batch_size, seq_len, d_model)
    output = sa(x)

    print(f"\nConfig: batch_size={batch_size}, seq_len={seq_len}, d_model={d_model}")
    print("Input shape:", x.shape)

    # Inspect the intermediate shape of the per-head query tensor
    q = x @ sa.W_q
    q_split = sa.split_heads(q)
    print("Q after split shape:", q_split.shape)

    # The module must preserve the input shape
    print("Output shape:", output.shape)
    assert output.shape == x.shape, "Output shape must match input shape"


Config: batch_size=2, seq_len=10, d_model=64
Input shape: torch.Size([2, 10, 64])
Q after split shape: torch.Size([2, 8, 10, 8])
Output shape: torch.Size([2, 10, 64])

Config: batch_size=4, seq_len=20, d_model=128
Input shape: torch.Size([4, 20, 128])
Q after split shape: torch.Size([4, 8, 20, 16])
Output shape: torch.Size([4, 20, 128])


## Encoder block

In [6]:
class TransformerEncoderLayer(nn.Module):
    """Post-norm transformer encoder layer: self-attention, then feed-forward.

    Each sub-layer output passes through dropout, is added to the sub-layer
    input, and is then layer-normalised.
    """

    def __init__(
        self,
        d_model: int,
        num_head: int = 8,
        d_ff: int = 2048,
        dropout: float = 0.1,
        add_positional_encoding: bool = True,
    ):
        """
        Args:
            d_model: Model (embedding) dimension
            num_head: Number of attention heads
            d_ff: Hidden width of the feed-forward network
            dropout: Dropout probability applied to each sub-layer output
            add_positional_encoding: Add the sinusoidal positional encoding
                to the input of this layer. Defaults to True (the original
                behaviour). Pass False for every layer after the first when
                stacking layers, so positions are only encoded once.
        """
        super().__init__()
        self.self_attn = PyTorchSelfAttention(num_head=num_head, d_model=d_model)
        # Always constructed so the module's state_dict layout does not
        # depend on the flag below
        self.pos_encoder = PositionalEncoding(d_model)
        self.add_positional_encoding = add_positional_encoding
        
        # Feed-forward network
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )
        
        # Layer normalization
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        
        # Dropout for regularization
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: Tensor of shape (batch_size, seq_len, d_model)

        Returns:
            Tensor of the same shape as ``x``
        """
        # Add positional encoding (optional, see __init__)
        if self.add_positional_encoding:
            x = self.pos_encoder(x)
        
        # Self-attention block with residual connection and layer norm.
        # PyTorchSelfAttention.forward already returns x + attention(x), so
        # the input is subtracted to recover the bare sub-layer output;
        # otherwise the residual below would be added twice.
        attn_output = self.self_attn(x) - x
        x = self.norm1(x + self.dropout(attn_output))
        
        # Feed-forward block with residual connection and layer norm
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        
        return x

In [7]:
# (batch_size, seq_len, d_model) combinations to smoke-test
test_configs = [
    (2, 10, 64),   # batch_size=2, seq_len=10, d_model=64
    (4, 20, 128),  # batch_size=4, seq_len=20, d_model=128
]

for batch_size, seq_len, d_model in test_configs:
    # Fresh encoder layer with 8 attention heads for this configuration
    encoder = TransformerEncoderLayer(d_model=d_model, num_head=8)

    # All-zero input of the requested shape, pushed through the layer
    x = torch.zeros(batch_size, seq_len, d_model)
    output = encoder(x)

    print(f"\nConfig: batch_size={batch_size}, seq_len={seq_len}, d_model={d_model}")
    print("Input shape:", x.shape)

    # The layer must preserve the input shape
    print("Output shape:", output.shape)
    assert output.shape == x.shape, "Output shape must match input shape"


Config: batch_size=2, seq_len=10, d_model=64
Input shape: torch.Size([2, 10, 64])
Output shape: torch.Size([2, 10, 64])

Config: batch_size=4, seq_len=20, d_model=128
Input shape: torch.Size([4, 20, 128])
Output shape: torch.Size([4, 20, 128])
