In [18]:
pip install torch



In [19]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import math
from torch.utils.data import Dataset, DataLoader

# **LEARNABLE POSITIONAL ENCODING CLASS**

In [20]:
class LearnablePositionalEncoding(nn.Module):
    """Adds a trainable per-position vector to a batch of embeddings.

    Each position ``0 .. max_seq_len - 1`` owns one row of an ``nn.Embedding``
    table. The rows for the positions present in the input are added to the
    input and the sum is passed through dropout.
    """

    def __init__(self, d_model: int, max_seq_len: int = 512, dropout: float = 0.1):
        """
        Args:
            d_model: Width of every embedding vector.
            max_seq_len: Number of rows in the position table; longer inputs
                are rejected by ``forward``.
            dropout: Dropout probability applied to the summed output.
        """
        super().__init__()

        self.d_model, self.max_seq_len = d_model, max_seq_len

        # One trainable row per position, re-drawn from a normal distribution
        # with standard deviation d_model ** -0.5.
        self.pos_embedding = nn.Embedding(max_seq_len, d_model)
        nn.init.normal_(self.pos_embedding.weight, mean=0, std=d_model ** -0.5)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Return ``dropout(x + positional rows)``.

        Args:
            x: Tensor of shape [batch_size, seq_len, d_model].

        Returns:
            Tensor with the same shape as ``x``.

        Raises:
            ValueError: If ``seq_len`` is larger than ``max_seq_len``.
        """
        # Three-way unpacking doubles as a check that the input is rank 3.
        _, seq_len, _ = x.shape

        if seq_len > self.max_seq_len:
            raise ValueError(f"Sequence length {seq_len} exceeds max_seq_len {self.max_seq_len}")

        # Look up rows 0 .. seq_len-1 on the same device as the input.
        offsets = torch.arange(seq_len, device=x.device)
        table_rows = self.pos_embedding(offsets)  # [seq_len, d_model]

        # The leading singleton axis lets the rows broadcast over the batch.
        return self.dropout(x + table_rows.unsqueeze(0))

# **ALTERNATIVE: LEARNABLE PE WITH nn.Parameter (More flexible)**

In [21]:
class LearnablePositionalEncodingParam(nn.Module):
    """Learnable positional encoding stored directly as an ``nn.Parameter``.

    The table has shape [max_seq_len, d_model]; the first ``seq_len`` rows are
    added to the input and the sum is passed through dropout.
    """

    def __init__(self, d_model: int, max_seq_len: int = 512, dropout: float = 0.1):
        """
        Args:
            d_model: Width of every embedding vector.
            max_seq_len: Number of rows in the position table; longer inputs
                are rejected by ``forward``.
            dropout: Dropout probability applied to the summed output.
        """
        super().__init__()

        self.d_model = d_model
        self.max_seq_len = max_seq_len

        # Create learnable parameters directly
        # Shape: [max_seq_len, d_model], drawn from N(0, 1) and scaled by d_model ** -0.5
        self.pos_embedding = nn.Parameter(
            torch.randn(max_seq_len, d_model) * (d_model ** -0.5)
        )

        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: [batch_size, seq_len, d_model]
        Returns:
            [batch_size, seq_len, d_model] with positional info
        Raises:
            ValueError: If ``seq_len`` is larger than ``max_seq_len``.
        """
        seq_len = x.size(1)

        # Slicing past the end of the table silently returns fewer rows. Without
        # this check the addition below either fails with an opaque shape error
        # or, for a one-row table, broadcasts that single row over every position.
        if seq_len > self.max_seq_len:
            raise ValueError(f"Sequence length {seq_len} exceeds max_seq_len {self.max_seq_len}")

        # Slice the positional embeddings to match input sequence length
        pos_enc = self.pos_embedding[:seq_len, :]  # [seq_len, d_model]

        # [seq_len, d_model] broadcasts over the batch dimension of x.
        return self.dropout(x + pos_enc)

# **SIMPLE SELF-ATTENTION MODULE**

In [22]:
class SimpleSelfAttention(nn.Module):
    """
    Basic multi-head self-attention for demonstration.

    Queries, keys and values are all projected from the same input, split into
    ``n_heads`` heads of width ``d_model // n_heads``, combined with scaled
    dot-product attention and projected back to ``d_model``.
    """

    def __init__(self, d_model: int, n_heads: int, dropout: float = 0.1):
        """
        Args:
            d_model: Input and output feature width; must be divisible by ``n_heads``.
            n_heads: Number of attention heads.
            dropout: Dropout probability applied to the attention weights.
        """
        super().__init__()

        assert d_model % n_heads == 0, "d_model must be divisible by n_heads"

        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads  # Dimension per head

        # Linear projections for Q, K, V and the output
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(dropout)
        self.scale = self.d_k ** -0.5

    def forward(self, x: torch.Tensor, mask: torch.Tensor = None) -> torch.Tensor:
        """
        Args:
            x: [batch_size, seq_len, d_model]
            mask: Optional mask; entries equal to 0 are excluded from attention.
                Accepted forms:
                  * any shape that broadcasts against
                    [batch_size, n_heads, seq_len, seq_len], used as-is;
                  * a key padding mask of shape [batch_size, seq_len], which is
                    expanded to [batch_size, 1, 1, seq_len].
                When ``batch_size == seq_len`` a 2-D mask is ambiguous and is
                kept as a [seq_len, seq_len] attention mask; pass padding masks
                as [batch_size, 1, 1, seq_len] in that case.
        Returns:
            [batch_size, seq_len, d_model]
        """
        batch_size, seq_len, _ = x.shape

        # Project to Q, K, V
        Q = self.W_q(x)  # [batch, seq_len, d_model]
        K = self.W_k(x)
        V = self.W_v(x)

        # Reshape for multi-head attention: [batch, n_heads, seq_len, d_k]
        Q = Q.view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        K = K.view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        V = V.view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)

        # Compute attention scores: [batch, n_heads, seq_len, seq_len]
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) * self.scale

        # Apply mask if provided
        if mask is not None:
            # A [batch, seq_len] padding mask lines up with the trailing
            # [seq_len, seq_len] axes under broadcasting, which is an error
            # unless batch is 1 or equals seq_len. Insert head and query axes so
            # it masks keys per batch element. Shapes that already broadcast
            # correctly are left untouched.
            if (
                mask.dim() == 2
                and batch_size != seq_len
                and tuple(mask.shape) == (batch_size, seq_len)
            ):
                mask = mask[:, None, None, :]
            attn_scores = attn_scores.masked_fill(mask == 0, float('-inf'))

        # Softmax over the key axis, then dropout on the weights
        attn_weights = F.softmax(attn_scores, dim=-1)
        attn_weights = self.dropout(attn_weights)

        # Apply attention to values: [batch, n_heads, seq_len, d_k]
        context = torch.matmul(attn_weights, V)

        # Merge heads back: [batch, seq_len, d_model]
        context = context.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)

        # Final projection
        output = self.W_o(context)

        return output

# **TRANSFORMER BLOCK WITH LEARNABLE POSITIONAL ENCODING**

In [23]:
class TransformerBlock(nn.Module):
    """
    One encoder block: self-attention and a position-wise feed-forward
    network, each applied to a layer-normalised copy of its input and added
    back through a residual connection (Pre-LN layout).
    """

    def __init__(self, d_model: int, n_heads: int, d_ff: int, dropout: float = 0.1):
        """
        Args:
            d_model: Feature width of the block's input and output.
            n_heads: Number of attention heads.
            d_ff: Hidden width of the feed-forward network.
            dropout: Dropout probability used in attention and the feed-forward network.
        """
        super().__init__()

        self.attention = SimpleSelfAttention(d_model, n_heads, dropout)

        # Expand -> GELU -> dropout -> contract -> dropout
        ffn_layers = [
            nn.Linear(d_model, d_ff),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, d_model),
            nn.Dropout(dropout),
        ]
        self.feed_forward = nn.Sequential(*ffn_layers)

        # One LayerNorm in front of each sublayer
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x: torch.Tensor, mask: torch.Tensor = None) -> torch.Tensor:
        """
        Args:
            x: Input tensor; the last dimension is d_model.
            mask: Optional mask handed unchanged to the attention sublayer.
        Returns:
            Tensor with the same shape as ``x``.
        """
        # Attention sublayer: normalise, attend, add residual
        attended = self.attention(self.norm1(x), mask)
        x = x + attended

        # Feed-forward sublayer: normalise, transform, add residual
        transformed = self.feed_forward(self.norm2(x))
        return x + transformed


# **COMPLETE TRANSFORMER MODEL**

In [24]:

class SimpleTransformerClassifier(nn.Module):
    """
    Complete transformer for sequence classification.

    Pipeline: token embedding -> learnable positional encoding -> stack of
    transformer blocks -> final LayerNorm -> mean pooling over the sequence ->
    linear classification head.
    """

    def __init__(
        self,
        vocab_size: int,
        d_model: int = 256,
        n_heads: int = 8,
        n_layers: int = 4,
        d_ff: int = 1024,
        max_seq_len: int = 128,
        n_classes: int = 2,
        dropout: float = 0.1
    ):
        """
        Args:
            vocab_size: Number of rows in the token embedding table.
            d_model: Embedding / hidden width.
            n_heads: Attention heads per block.
            n_layers: Number of transformer blocks.
            d_ff: Hidden width of each block's feed-forward network.
            max_seq_len: Longest sequence the positional encoding accepts.
            n_classes: Number of output logits.
            dropout: Dropout probability used throughout the model.
        """
        super().__init__()

        self.d_model = d_model

        # Token embedding, initialised with std d_model ** -0.5
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        nn.init.normal_(self.token_embedding.weight, mean=0, std=d_model ** -0.5)

        # LEARNABLE POSITIONAL ENCODING - the key component
        self.positional_encoding = LearnablePositionalEncoding(
            d_model=d_model,
            max_seq_len=max_seq_len,
            dropout=dropout
        )

        # Stack of transformer blocks
        self.transformer_blocks = nn.ModuleList([
            TransformerBlock(d_model, n_heads, d_ff, dropout)
            for _ in range(n_layers)
        ])

        # Final normalization and classification head
        self.final_norm = nn.LayerNorm(d_model)
        self.classifier = nn.Linear(d_model, n_classes)

    def forward(self, x: torch.Tensor, mask: torch.Tensor = None) -> torch.Tensor:
        """
        Args:
            x: [batch_size, seq_len] - token indices
            mask: Optional mask. A 2-D mask with the same shape as ``x`` is
                treated as a padding mask (0 = padded position): it is reshaped
                to [batch_size, 1, 1, seq_len] for attention and padded
                positions are left out of the pooled average. This also applies
                when batch_size == seq_len. A mask of any other shape is handed
                to the blocks unchanged and pooling averages every position.
        Returns:
            [batch_size, n_classes] - class logits
        """
        # Recognise the documented padding-mask form before x is replaced by embeddings.
        keep = None
        if mask is not None and mask.dim() == 2 and tuple(mask.shape) == tuple(x.shape):
            keep = mask != 0
            # Head and query axes are inserted so the mask broadcasts against
            # [batch, n_heads, seq_len, seq_len] attention scores.
            mask = mask[:, None, None, :]

        # Token embedding scaled by sqrt(d_model) (from original Transformer paper)
        x = self.token_embedding(x) * math.sqrt(self.d_model)

        # Add learnable positional encoding
        x = self.positional_encoding(x)

        # Pass through transformer blocks
        for block in self.transformer_blocks:
            x = block(x, mask)

        # Final normalization
        x = self.final_norm(x)

        # Pool over the sequence dimension -> [batch_size, d_model]
        # Alternative: use [CLS] token like BERT
        if keep is None:
            x = x.mean(dim=1)
        else:
            # Average only the kept positions so padding does not dilute the
            # representation; the clamp avoids dividing by zero when a row has
            # no kept positions.
            weights = keep.unsqueeze(-1).to(x.dtype)
            x = (x * weights).sum(dim=1) / weights.sum(dim=1).clamp(min=1.0)

        # Classification
        logits = self.classifier(x)  # [batch_size, n_classes]

        return logits

# **DUMMY DATASET FOR DEMONSTRATION**

In [25]:
class DummySequenceDataset(Dataset):
    """
    Synthetic two-class dataset of fixed-length token sequences.

    Each sample is built in three steps:
      1. A label (0 or 1) is drawn uniformly.
      2. The whole sequence is drawn from one half of the vocabulary (lower
         half excluding token 0 for class 0, upper half for class 1). Then
         ``seq_len // 4`` randomly chosen positions, which may repeat, are
         overwritten with tokens from the other half as noise.
      3. The first ``seq_len // 4`` positions are overwritten with tokens from
         the lowest quarter (class 0) or highest quarter (class 1) of the
         vocabulary, so early positions carry a clean class signal.

    ``vocab_size`` must be at least 8 so that every sampling range
    (for example ``[1, vocab_size // 4)``) is non-empty.
    """

    def __init__(self, vocab_size: int, seq_len: int, num_samples: int, seed: int = None):
        """
        Args:
            vocab_size: Size of the token vocabulary.
            seq_len: Length of every generated sequence.
            num_samples: Number of sequences to generate.
            seed: Optional seed. When given, samples are drawn from a private
                ``torch.Generator`` so the dataset is reproducible and the
                global RNG state is untouched. When ``None`` the global RNG is
                used.
        """
        self.vocab_size = vocab_size
        self.seq_len = seq_len
        self.num_samples = num_samples

        # generator=None makes torch.randint fall back to the global RNG.
        generator = None
        if seed is not None:
            generator = torch.Generator()
            generator.manual_seed(seed)

        half = vocab_size // 2
        n_noise = seq_len // 4
        n_early = seq_len // 4

        sequences = []
        labels = []

        for _ in range(num_samples):
            # Randomly choose class
            label = torch.randint(0, 2, (1,), generator=generator).item()

            if label == 0:
                # Class 0: base tokens from the lower half, noise from the upper half
                seq = torch.randint(1, half, (seq_len,), generator=generator)
                noise_positions = torch.randint(0, seq_len, (n_noise,), generator=generator)
                seq[noise_positions] = torch.randint(
                    half, vocab_size, (len(noise_positions),), generator=generator
                )
                # Early positions: lowest quarter of the vocabulary
                seq[:n_early] = torch.randint(1, vocab_size // 4, (n_early,), generator=generator)
            else:
                # Class 1: base tokens from the upper half, noise from the lower half
                seq = torch.randint(half, vocab_size, (seq_len,), generator=generator)
                noise_positions = torch.randint(0, seq_len, (n_noise,), generator=generator)
                seq[noise_positions] = torch.randint(
                    1, half, (len(noise_positions),), generator=generator
                )
                # Early positions: highest quarter of the vocabulary
                seq[:n_early] = torch.randint(
                    3 * vocab_size // 4, vocab_size, (n_early,), generator=generator
                )

            sequences.append(seq)
            labels.append(label)

        self.data = torch.stack(sequences)   # [num_samples, seq_len]
        self.labels = torch.tensor(labels)   # [num_samples]

    def __len__(self):
        """Return the number of generated samples."""
        return self.num_samples

    def __getitem__(self, idx):
        """Return the ``(sequence, label)`` pair stored at ``idx``."""
        return self.data[idx], self.labels[idx]

# **TRAINING FUNCTION**

In [26]:
def _run_epoch(model, loader, criterion, device, optimizer=None, scheduler=None):
    """Run one pass over ``loader`` and return ``(mean batch loss, accuracy in %)``.

    With an ``optimizer`` the model is put in training mode and updated after
    every batch (gradients clipped to norm 1.0, ``scheduler`` stepped per batch
    when given). Without one the model is put in eval mode and gradients are
    disabled.
    """
    is_training = optimizer is not None
    model.train(is_training)

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    with torch.set_grad_enabled(is_training):
        for sequences, labels in loader:
            sequences, labels = sequences.to(device), labels.to(device)

            if is_training:
                optimizer.zero_grad()

            logits = model(sequences)
            loss = criterion(logits, labels)

            if is_training:
                loss.backward()
                # Gradient clipping (important for transformer stability)
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()
                if scheduler is not None:
                    scheduler.step()

            loss_sum += loss.item()
            _, predicted = logits.max(1)
            n_correct += predicted.eq(labels).sum().item()
            n_seen += labels.size(0)

    return loss_sum / len(loader), 100.0 * n_correct / n_seen


def _report_positional_embeddings(model):
    """Print summary statistics of the model's learned positional embedding table."""
    print("\n--- Learned Positional Encoding Analysis ---")
    with torch.no_grad():
        pos_weights = model.positional_encoding.pos_embedding.weight.cpu()
        print(f"Positional embedding shape: {pos_weights.shape}")
        print(f"Position 0 mean: {pos_weights[0].mean():.4f}, std: {pos_weights[0].std():.4f}")
        print(f"Position 31 mean: {pos_weights[31].mean():.4f}, std: {pos_weights[31].std():.4f}")

        # Cosine similarity between each of the first (up to) ten adjacent pairs
        similarities = [
            F.cosine_similarity(
                pos_weights[i].unsqueeze(0),
                pos_weights[i + 1].unsqueeze(0)
            ).item()
            for i in range(min(10, pos_weights.shape[0] - 1))
        ]
        print(f"Cosine similarities between adjacent positions (0-9): {[f'{s:.3f}' for s in similarities]}")


def train_transformer(seed: int = None):
    """
    Complete training loop demonstrating learnable positional encoding.

    Builds synthetic train/validation datasets, trains a
    ``SimpleTransformerClassifier`` with AdamW and a one-cycle learning-rate
    schedule, prints per-epoch metrics and a short analysis of the learned
    positional embeddings.

    Args:
        seed: Optional seed passed to ``torch.manual_seed`` before any data or
            weights are created, making data generation, initialisation and
            shuffling repeatable. ``None`` leaves the RNG state untouched.

    Returns:
        The trained model, left on the device used for training.
    """
    if seed is not None:
        torch.manual_seed(seed)

    # Configuration
    VOCAB_SIZE = 1000
    D_MODEL = 128
    N_HEADS = 4
    N_LAYERS = 3
    D_FF = 512
    MAX_SEQ_LEN = 64
    N_CLASSES = 2
    DROPOUT = 0.1

    BATCH_SIZE = 32
    NUM_EPOCHS = 20
    LEARNING_RATE = 3e-4

    TRAIN_SAMPLES = 2000
    VAL_SAMPLES = 500

    # Device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Create datasets
    train_dataset = DummySequenceDataset(VOCAB_SIZE, MAX_SEQ_LEN, TRAIN_SAMPLES)
    val_dataset = DummySequenceDataset(VOCAB_SIZE, MAX_SEQ_LEN, VAL_SAMPLES)

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

    # Create model
    model = SimpleTransformerClassifier(
        vocab_size=VOCAB_SIZE,
        d_model=D_MODEL,
        n_heads=N_HEADS,
        n_layers=N_LAYERS,
        d_ff=D_FF,
        max_seq_len=MAX_SEQ_LEN,
        n_classes=N_CLASSES,
        dropout=DROPOUT
    ).to(device)

    # Count parameters
    total_params = sum(p.numel() for p in model.parameters())
    pos_params = model.positional_encoding.pos_embedding.weight.numel()
    print(f"Total parameters: {total_params:,}")
    print(f"Positional encoding parameters: {pos_params:,} ({100*pos_params/total_params:.1f}%)")

    # Loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=0.01)

    # One-cycle schedule stepped once per batch; the first 10% of steps ramp up
    scheduler = optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=LEARNING_RATE,
        epochs=NUM_EPOCHS,
        steps_per_epoch=len(train_loader),
        pct_start=0.1
    )

    # Training loop
    print("\n" + "="*60)
    print("Starting training...")
    print("="*60)

    best_val_acc = 0.0

    for epoch in range(NUM_EPOCHS):
        train_loss, train_acc = _run_epoch(
            model, train_loader, criterion, device, optimizer, scheduler
        )
        val_loss, val_acc = _run_epoch(model, val_loader, criterion, device)

        # Update best
        best_val_acc = max(best_val_acc, val_acc)

        # Print progress
        print(f"Epoch {epoch+1:2d}/{NUM_EPOCHS} | "
              f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.1f}% | "
              f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.1f}%")

    print("="*60)
    print(f"Training complete! Best validation accuracy: {best_val_acc:.1f}%")
    print("="*60)

    # Demonstrate positional encoding values learned
    _report_positional_embeddings(model)

    return model

In [27]:
if __name__ == "__main__":
    # Banner
    print("Learnable Positional Encoding Demonstration")
    print("=" * 60)

    # Train, then switch the returned model to inference mode
    trained_model = train_transformer()
    trained_model.eval()

    print("\n--- Inference Example ---")

    # Run on whichever device the trained weights live on
    device = next(trained_model.parameters()).device

    # One random sequence of 64 tokens drawn from [1, 500), i.e. the class-0 range
    sample_seq = torch.randint(1, 500, (1, 64))
    sample_seq = sample_seq.to(device)

    with torch.no_grad():
        output = trained_model(sample_seq)
        probs = F.softmax(output, dim=-1)
        pred_class = output.argmax(dim=-1).item()

    print("Sample sequence (tokens 1-500 dominant)")
    print(f"Predicted class: {pred_class}")
    print(f"Class probabilities: {probs[0].cpu().numpy()}")

Learnable Positional Encoding Demonstration
Using device: cuda
Total parameters: 731,522
Positional encoding parameters: 8,192 (1.1%)

Starting training...
Epoch  1/20 | Train Loss: 0.6256 | Train Acc: 70.6% | Val Loss: 0.2975 | Val Acc: 99.8%
Epoch  2/20 | Train Loss: 0.0365 | Train Acc: 99.9% | Val Loss: 0.0003 | Val Acc: 100.0%
Epoch  3/20 | Train Loss: 0.0005 | Train Acc: 100.0% | Val Loss: 0.0003 | Val Acc: 100.0%
Epoch  4/20 | Train Loss: 0.0003 | Train Acc: 100.0% | Val Loss: 0.0002 | Val Acc: 100.0%
Epoch  5/20 | Train Loss: 0.0003 | Train Acc: 100.0% | Val Loss: 0.0002 | Val Acc: 100.0%
Epoch  6/20 | Train Loss: 0.0002 | Train Acc: 100.0% | Val Loss: 0.0001 | Val Acc: 100.0%
Epoch  7/20 | Train Loss: 0.0002 | Train Acc: 100.0% | Val Loss: 0.0001 | Val Acc: 100.0%
Epoch  8/20 | Train Loss: 0.0002 | Train Acc: 100.0% | Val Loss: 0.0001 | Val Acc: 100.0%
Epoch  9/20 | Train Loss: 0.0001 | Train Acc: 100.0% | Val Loss: 0.0001 | Val Acc: 100.0%
Epoch 10/20 | Train Loss: 0.0001 | Tr