In [1]:
# Q1. Character-Level RNN Language Model

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
import math
import random

# Run on the GPU when CUDA is usable; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device:", device)

# --------------------------
# 1. Prepare Text Corpus
# --------------------------

# Small toy corpus (swap in or append a larger text for better samples)
toy_text = """
hello hello help hello hero hello helium
hi there how are you doing today
this is a simple character level language model example
we are testing recurrent neural networks on text
"""

# Optionally: if you have a text file of roughly 50-200 KB, you can use:
# with open("my_corpus.txt", "r", encoding="utf-8") as f:
#     toy_text = f.read().lower()

# Lower-case everything so each letter appears in the vocabulary only once.
text = toy_text.lower()
chars = sorted(set(text))
vocab_size = len(chars)

print("Vocab size:", vocab_size)
print("Sample characters:", chars[:50])

# Two-way lookup tables between characters and their integer ids
# (ids follow the sorted order of `chars`).
idx2char = dict(enumerate(chars))
char2idx = {ch: i for i, ch in idx2char.items()}

def encode_text(text):
    """Convert ``text`` into a list of integer ids using ``char2idx``.

    A character that is not a key of ``char2idx`` raises ``KeyError``.
    """
    ids = []
    for ch in text:
        ids.append(char2idx[ch])
    return ids

def decode_indices(indices):
    """Convert an iterable of integer ids back into a string using ``idx2char``."""
    pieces = [idx2char[i] for i in indices]
    return "".join(pieces)

# Integer-encode the whole corpus once; this list is what CharDataset slices.
encoded_text = encode_text(text)

# --------------------------
# 2. Dataset Definition
# --------------------------

class CharDataset(Dataset):
    """Sliding-window next-character dataset over an encoded corpus.

    Item ``i`` is the pair ``(data[i:i + seq_len], data[i + 1:i + 1 + seq_len])``
    returned as two ``torch.long`` tensors, i.e. the target is the input
    shifted one position to the right.

    Args:
        encoded_text: sequence of integer ids.
        seq_len: window length; must be positive.

    Raises:
        ValueError: if ``seq_len`` is not positive.
    """

    def __init__(self, encoded_text, seq_len=50):
        if seq_len <= 0:
            raise ValueError(f"seq_len must be positive, got {seq_len}")
        self.data = encoded_text
        self.seq_len = seq_len

    def __len__(self):
        # A window starting at idx consumes seq_len + 1 items (the input plus
        # the one-step-shifted target), so the valid starts are
        # 0 .. len(data) - seq_len - 1, which is len(data) - seq_len windows.
        # Clamp at zero: __len__ must not be negative for a short corpus.
        return max(0, len(self.data) - self.seq_len)

    def __getitem__(self, idx):
        """Return the ``(input, target)`` tensors for window ``idx``.

        Negative indices count from the end. An out-of-range index raises
        ``IndexError`` so that plain iteration over the dataset terminates.
        """
        count = len(self)
        if idx < 0:
            idx += count
        if not 0 <= idx < count:
            raise IndexError(f"index out of range for dataset of size {count}")
        x = self.data[idx:idx + self.seq_len]
        y = self.data[idx + 1:idx + 1 + self.seq_len]
        return torch.tensor(x, dtype=torch.long), torch.tensor(y, dtype=torch.long)

# Window length used for every training example.
seq_len = 50
dataset = CharDataset(encoded_text, seq_len=seq_len)

# Train/val split (e.g., 90/10)
# NOTE(review): random_split is not given a generator, so the split is not
# pinned by this cell. Windows at neighbouring start indices share most of
# their characters, so validation windows overlap training windows; treat
# the validation loss as a rough signal only.
train_size = int(0.9 * len(dataset))
val_size = len(dataset) - train_size
train_ds, val_ds = random_split(dataset, [train_size, val_size])

batch_size = 64
# Only the training loader shuffles; the validation loader keeps subset order.
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=batch_size)

# --------------------------
# 3. Model Definition
# --------------------------

class CharRNN(nn.Module):
    """Embedding -> LSTM -> linear head that emits per-step vocabulary logits."""

    def __init__(self, vocab_size, embed_dim=64, hidden_size=128, num_layers=1):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):
        """Return ``(logits, hidden)`` for a batch of id sequences.

        ``x`` is (batch, seq) of ids; ``logits`` is (batch, seq, vocab_size).
        ``hidden`` is passed straight to the LSTM and its updated state is
        returned, so a caller can continue a sequence step by step.
        """
        embedded = self.embed(x)                          # (batch, seq, embed_dim)
        features, next_hidden = self.lstm(embedded, hidden)  # (batch, seq, hidden)
        return self.fc(features), next_hidden

# Instantiate a 2-layer model (overriding the class defaults for embed_dim and
# hidden_size) and move it to the selected device.
model = CharRNN(vocab_size=vocab_size, embed_dim=128, hidden_size=256, num_layers=2).to(device)
# Loss and optimizer are module-level: train_epoch/evaluate read them as globals.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# --------------------------
# 4. Training & Validation
# --------------------------

def train_epoch(model, dataloader):
    """Run one optimisation pass over ``dataloader``.

    Relies on the module-level ``device``, ``criterion`` and ``optimizer``.
    Gradients are clipped to a total norm of 5.0 before every optimizer step.

    Args:
        model: module whose call returns ``(logits, hidden)``.
        dataloader: iterable yielding ``(x, y)`` batches of ids.

    Returns:
        The mean of the per-batch losses, or ``nan`` when the loader yields no
        batches (instead of failing with a division by zero).
    """
    model.train()
    total_loss = 0.0
    num_batches = 0
    for x, y in dataloader:
        x = x.to(device)
        y = y.to(device)

        optimizer.zero_grad()
        logits, _ = model(x)
        # Flatten to (batch*seq, classes) for cross-entropy. The class count is
        # taken from the logits themselves rather than the global `vocab_size`,
        # which a later notebook cell rebinds to a different vocabulary.
        loss = criterion(logits.reshape(-1, logits.size(-1)), y.reshape(-1))
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 5.0)  # gradient clipping
        optimizer.step()
        total_loss += loss.item()
        num_batches += 1

    if num_batches == 0:
        return float("nan")
    return total_loss / num_batches

def evaluate(model, dataloader):
    """Compute the mean per-batch loss over ``dataloader`` without updating weights.

    Relies on the module-level ``device`` and ``criterion``. The model is put
    in eval mode and gradients are disabled for the whole pass.

    Args:
        model: module whose call returns ``(logits, hidden)``.
        dataloader: iterable yielding ``(x, y)`` batches of ids.

    Returns:
        The mean of the per-batch losses, or ``nan`` when the loader yields no
        batches (e.g. an empty validation split) instead of dividing by zero.
    """
    model.eval()
    total_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for x, y in dataloader:
            x = x.to(device)
            y = y.to(device)
            logits, _ = model(x)
            # Class count comes from the logits, not the global `vocab_size`,
            # which a later notebook cell rebinds to a different vocabulary.
            loss = criterion(logits.reshape(-1, logits.size(-1)), y.reshape(-1))
            total_loss += loss.item()
            num_batches += 1

    if num_batches == 0:
        return float("nan")
    return total_loss / num_batches

num_epochs = 10
# Per-epoch loss history, kept so the curves can be inspected after training.
train_losses = []
val_losses = []

# Epochs are numbered from 1 so the progress line reads "1/10 ... 10/10".
for epoch in range(1, num_epochs + 1):
    train_loss = train_epoch(model, train_loader)
    val_loss = evaluate(model, val_loader)
    train_losses.append(train_loss)
    val_losses.append(val_loss)
    print(f"Epoch {epoch}/{num_epochs} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")

# --------------------------
# 5. Text Generation
# --------------------------

def sample_next_char(probs, temperature=1.0):
    """Sample an index from ``probs`` after temperature re-scaling.

    Args:
        probs: 1-D probability vector, either a torch tensor or anything
            ``numpy.asarray`` accepts.
        temperature: values above 1 flatten the distribution (more random),
            values below 1 sharpen it. ``0`` means greedy decoding (argmax).

    Returns:
        The sampled index as a plain ``int``.

    Raises:
        ValueError: if ``temperature`` is negative.
    """
    if temperature < 0:
        raise ValueError(f"temperature must be non-negative, got {temperature}")

    if hasattr(probs, "detach"):
        probs = probs.detach().cpu().numpy()
    # float64 keeps the normalised weights within np.random.choice's
    # sum-to-one tolerance, which float32 can miss.
    probs = np.asarray(probs, dtype=np.float64)

    if temperature == 0:
        # Limit of the temperature-scaled distribution; avoids dividing by zero.
        return int(np.argmax(probs))

    # temperature scaling in log space: higher temp = more random
    scaled = np.log(probs + 1e-9) / temperature
    # Shift so the largest entry is 0: exp() then cannot underflow to an
    # all-zero vector at low temperatures (which would produce NaNs).
    scaled -= scaled.max()
    weights = np.exp(scaled)
    weights /= weights.sum()
    return int(np.random.choice(len(weights), p=weights))

def generate_text(model, start_text="hello ", length=300, temperature=1.0):
    """Generate ``length`` characters that continue ``start_text``.

    The prompt is lower-cased and encoded with ``char2idx`` (unknown characters
    map to id 0), pushed through the model once, and the next character is
    sampled from the logits at the last prompt position. Each sampled
    character is then fed back together with the recurrent state.

    Args:
        model: module whose call takes ``(ids, hidden)`` and returns
            ``(logits, hidden)``.
        start_text: non-empty prompt string.
        length: number of characters to generate after the prompt.
        temperature: forwarded to ``sample_next_char``.

    Returns:
        The decoded prompt followed by the generated characters.

    Raises:
        ValueError: if ``start_text`` is empty (there is nothing to condition on).
    """
    chars_input = [char2idx.get(ch, 0) for ch in start_text.lower()]
    if not chars_input:
        raise ValueError("start_text must contain at least one character")

    # Remember the current mode so sampling does not silently leave the model
    # in eval mode for a caller that continues training afterwards.
    was_training = model.training
    model.eval()

    generated = chars_input.copy()
    next_input = torch.tensor(chars_input, dtype=torch.long).unsqueeze(0).to(device)
    hidden = None

    try:
        with torch.no_grad():
            for _ in range(length):
                # First pass consumes the whole prompt; later passes consume
                # only the character just sampled. Each input character is fed
                # exactly once, so the recurrent state never sees the last
                # prompt character twice.
                logits, hidden = model(next_input, hidden)
                probs = torch.softmax(logits[:, -1, :], dim=-1).squeeze(0)
                next_idx = int(sample_next_char(probs, temperature=temperature))
                generated.append(next_idx)
                next_input = torch.tensor([[next_idx]], dtype=torch.long).to(device)
    finally:
        model.train(was_training)

    return decode_indices(generated)

# Sample from the trained model at three temperatures using the same prompt.
for banner, temp in (
    ("\n=== Generated (temperature = 0.7) ===", 0.7),
    ("\n=== Generated (temperature = 1.0) ===", 1.0),
    ("\n=== Generated (temperature = 1.2) ===", 1.2),
):
    print(banner)
    print(generate_text(model, start_text="hello ", length=300, temperature=temp))


Using device: cpu
Vocab size: 23
Sample characters: ['\n', ' ', 'a', 'c', 'd', 'e', 'g', 'h', 'i', 'k', 'l', 'm', 'n', 'o', 'p', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y']
Epoch 1/10 | Train Loss: 3.1118 | Val Loss: 3.0561
Epoch 2/10 | Train Loss: 3.0257 | Val Loss: 2.9204
Epoch 3/10 | Train Loss: 2.8730 | Val Loss: 2.8569
Epoch 4/10 | Train Loss: 2.8275 | Val Loss: 2.7758
Epoch 5/10 | Train Loss: 2.7500 | Val Loss: 2.7214
Epoch 6/10 | Train Loss: 2.7016 | Val Loss: 2.6680
Epoch 7/10 | Train Loss: 2.6390 | Val Loss: 2.5874
Epoch 8/10 | Train Loss: 2.5511 | Val Loss: 2.4898
Epoch 9/10 | Train Loss: 2.4487 | Val Loss: 2.3773
Epoch 10/10 | Train Loss: 2.3305 | Val Loss: 2.2452

=== Generated (temperature = 0.7) ===
hello thel thwexca ruel   leale eell  eoaaes ee  eio le rheel ae heoo hol gear oaeldr eel eeh ohpa x eee oouctmounay iyeid h tee lou hairer r ehre k eelel laoeh meoro  helaee hheh enlo h huheoog haee  eo ecaepraare e amlrerlwleu lhell hhuao melho gllgomima gegd eitae ee llae e ec 

In [2]:
# Q2. Mini Transformer Encoder for Sentences

import torch
import torch.nn as nn
import math

# Run on the GPU when CUDA is usable; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device:", device)

# --------------------------
# 1. Toy Sentence Dataset
# --------------------------

sentences = [
    "i love natural language processing",
    "this course is about deep learning",
    "attention is all you need",
    "transformers use self attention",
    "rnn models process sequences",
    "pytorch makes experiments easier",
    "we are building a mini transformer",
    "nlp tasks include classification",
    "sequence to sequence models translate text",
    "embedding captures word meaning",
]

# Whitespace tokenization of the lower-cased sentences.
tokenized = [line.lower().split() for line in sentences]

# Sorted unique words; real words get ids from 2 upward because
# 0 and 1 are reserved for <pad> and <unk>.
vocab = sorted(set().union(*tokenized))
word2idx = dict(zip(vocab, range(2, len(vocab) + 2)))
word2idx.update({"<pad>": 0, "<unk>": 1})
idx2word = {index: token for token, index in word2idx.items()}

vocab_size = len(word2idx)
print("Vocab size:", vocab_size)

# Length (in tokens) of the longest sentence; shorter ones get padded to it.
max_len = len(max(tokenized, key=len))

def encode_sentence(words, max_len):
    """Encode ``words`` as exactly ``max_len`` ids (when ``max_len`` >= 0).

    Words missing from ``word2idx`` become 1 (<unk>). Short sentences are
    right-padded with 0 (<pad>); long ones are truncated.
    """
    token_ids = [word2idx.get(word, 1) for word in words]
    # A non-positive repeat count yields an empty list, so no padding is added
    # when the sentence is already long enough.
    padding = [0] * (max_len - len(token_ids))
    return (token_ids + padding)[:max_len]

# Encode every sentence to the same length so they stack into one tensor.
encoded = [encode_sentence(s, max_len) for s in tokenized]
input_ids = torch.tensor(encoded, dtype=torch.long).to(device)  # (batch, seq_len)

# NOTE: this rebinds the module-level names batch_size and seq_len to the
# dimensions of this toy batch.
batch_size, seq_len = input_ids.shape
print("Input shape:", input_ids.shape)

# --------------------------
# 2. Positional Encoding
# --------------------------

class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding added to token embeddings.

    Even feature indices hold ``sin(pos * w_i)`` and odd indices hold
    ``cos(pos * w_i)`` with ``w_i = 10000 ** (-2i / d_model)``. The table is
    stored as the non-trainable buffer ``pe`` of shape (1, max_len, d_model).

    Args:
        d_model: embedding width; may be even or odd.
        max_len: number of positions to precompute.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))
        pe[:, 0::2] = torch.sin(angles)  # even indices
        # For odd d_model there is one fewer odd column than even columns, so
        # trim the angles to the number of odd slots to avoid a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])  # odd indices
        pe = pe.unsqueeze(0)  # (1, max_len, d_model)
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        Args:
            x: tensor of shape (batch, seq_len, d_model).

        Raises:
            ValueError: if ``seq_len`` exceeds the precomputed ``max_len``.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"sequence length {seq_len} exceeds positional table size {self.pe.size(1)}"
            )
        return x + self.pe[:, :seq_len, :]

# --------------------------
# 3. Scaled Dot-Product Attention
# --------------------------

class ScaledDotProductAttention(nn.Module):
    """Parameter-free scaled dot-product attention: softmax(QK^T / sqrt(d_k)) V."""

    def __init__(self):
        super().__init__()

    def forward(self, Q, K, V, mask=None):
        """Attend ``Q`` over ``K``/``V``.

        Args:
            Q, K, V: (batch, heads, seq_len, head_dim)
            mask: optional tensor broadcastable to (batch, heads, seq, seq);
                positions where ``mask == 0`` receive no attention.

        Returns:
            ``(output, attn_weights)`` with shapes (batch, heads, seq, head_dim)
            and (batch, heads, seq, seq). A query whose keys are all masked
            gets all-zero weights and an all-zero output row instead of NaN.
        """
        dk = Q.size(-1)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(dk)  # (batch, heads, seq, seq)

        dead_rows = None
        if mask is not None:
            blocked = mask == 0
            scores = scores.masked_fill(blocked, float("-inf"))
            # Softmax over a row of only -inf is NaN (and poisons gradients),
            # so give such rows finite scores here and zero them out below.
            dead_rows = blocked.all(dim=-1, keepdim=True)
            scores = scores.masked_fill(dead_rows, 0.0)

        attn_weights = torch.softmax(scores, dim=-1)
        if dead_rows is not None:
            attn_weights = attn_weights.masked_fill(dead_rows, 0.0)

        output = torch.matmul(attn_weights, V)  # (batch, heads, seq, head_dim)
        return output, attn_weights

# --------------------------
# 4. Multi-Head Attention
# --------------------------

class MultiHeadAttention(nn.Module):
    """Multi-head self-attention: project, split into heads, attend, merge, project."""

    def __init__(self, d_model=64, num_heads=2):
        super().__init__()
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        # Query / key / value / output projections, all d_model -> d_model.
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

        self.attention = ScaledDotProductAttention()

    def _to_heads(self, t):
        """Reshape (batch, seq, d_model) into (batch, heads, seq, head_dim)."""
        n_batch, n_tokens, _ = t.size()
        return t.view(n_batch, n_tokens, self.num_heads, self.head_dim).transpose(1, 2)

    def forward(self, x, mask=None):
        """Self-attend over ``x`` of shape (batch, seq, d_model).

        Returns ``(out, attn_weights)`` where ``out`` is (batch, seq, d_model)
        and ``attn_weights`` is (batch, heads, seq, seq). ``mask`` is handed to
        the attention module unchanged.
        """
        n_batch, n_tokens, width = x.size()

        queries = self._to_heads(self.W_q(x))
        keys = self._to_heads(self.W_k(x))
        values = self._to_heads(self.W_v(x))

        context, attn_weights = self.attention(queries, keys, values, mask=mask)

        # Undo the head split: (batch, heads, seq, head_dim) -> (batch, seq, d_model).
        merged = context.transpose(1, 2).contiguous().view(n_batch, n_tokens, width)
        return self.W_o(merged), attn_weights

# --------------------------
# 5. Feed-Forward + Encoder Layer
# --------------------------

class PositionwiseFFN(nn.Module):
    """Two-layer feed-forward block applied to the last dimension of its input.

    Computes ``fc2(dropout(relu(fc1(x))))``: d_model -> d_ff -> d_model.
    """

    def __init__(self, d_model=64, d_ff=128, dropout=0.1):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        expanded = self.fc1(x)
        activated = self.relu(expanded)
        regularised = self.dropout(activated)
        return self.fc2(regularised)

class EncoderLayer(nn.Module):
    """One post-norm encoder layer: self-attention and FFN, each with Add & Norm."""

    def __init__(self, d_model=64, num_heads=2, d_ff=128, dropout=0.1):
        super().__init__()
        self.mha = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.ffn = PositionwiseFFN(d_model=d_model, d_ff=d_ff, dropout=dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Return ``(x, attn_weights)`` after the two residual sub-layers."""
        # Sub-layer 1: self-attention, dropout on the branch, residual, LayerNorm.
        attended, attn_weights = self.mha(x, mask)
        x = self.norm1(x + self.dropout(attended))
        # Sub-layer 2: position-wise FFN with the same residual pattern.
        x = self.norm2(x + self.dropout(self.ffn(x)))
        return x, attn_weights

# --------------------------
# 6. Whole Encoder
# --------------------------

class MiniTransformerEncoder(nn.Module):
    """Token embedding + positional encoding followed by a stack of encoder layers."""

    def __init__(self, vocab_size, d_model=64, num_heads=2, num_layers=1, max_len=100):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len=max_len)
        # Each layer uses EncoderLayer's default d_ff and dropout.
        self.layers = nn.ModuleList()
        for _ in range(num_layers):
            self.layers.append(EncoderLayer(d_model=d_model, num_heads=num_heads))

    def forward(self, input_ids, mask=None):
        """Encode ``input_ids`` and collect each layer's attention weights.

        Returns ``(hidden, attn_weights_list)``: the final (batch, seq, d_model)
        representations and one attention tensor per layer, in layer order.
        """
        hidden = self.pos_encoding(self.embed(input_ids))  # (batch, seq, d_model)
        attn_weights_list = []
        for block in self.layers:
            hidden, layer_weights = block(hidden, mask)
            attn_weights_list.append(layer_weights)
        return hidden, attn_weights_list

encoder = MiniTransformerEncoder(vocab_size=vocab_size, d_model=64, num_heads=2, num_layers=1, max_len=seq_len).to(device)
# torch.no_grad() does not disable dropout; switch to eval mode so this
# inspection-only forward pass gives deterministic embeddings.
encoder.eval()

# Padding mask: 1 for real tokens, 0 for <pad> (id 0). Shaped
# (batch, 1, 1, seq) so it broadcasts over heads and query positions and
# stops every token from attending to padded key positions.
pad_mask = (input_ids != 0).unsqueeze(1).unsqueeze(2)

# no training here, just a forward pass to see contextual embeddings & attention
with torch.no_grad():
    contextual_embeddings, attn_weights_list = encoder(input_ids, mask=pad_mask)

print("Contextual embeddings shape:", contextual_embeddings.shape)  # (batch, seq, d_model)

# --------------------------
# 7. Show Outputs
# --------------------------

# Print input tokens and first sentence contextual vectors
# (ids are mapped back to words, so padding shows up as "<pad>").
first_sentence_ids = input_ids[0].cpu().tolist()
first_tokens = [idx2word[idx] for idx in first_sentence_ids]

print("\n=== Input Tokens (Sentence 1) ===")
print(first_tokens)

print("\n=== Contextual Embeddings (Sentence 1, first 5 tokens) ===")
print(contextual_embeddings[0, :5, :])  # first 5 tokens, all dims

# Attention heatmap (head 0, layer 0, sentence 1)
attn_weights = attn_weights_list[0]  # from first (and only) layer
# shape: (batch, heads, seq, seq)
attn_sentence1_head0 = attn_weights[0, 0].cpu().numpy()  # (seq, seq)

# The printed label is 1-based; the indices used above are 0-based.
print("\n=== Attention Weights (Layer 1, Head 1, Sentence 1) ===")
print(attn_sentence1_head0)

Using device: cpu
Vocab size: 46
Input shape: torch.Size([10, 6])
Contextual embeddings shape: torch.Size([10, 6, 64])

=== Input Tokens (Sentence 1) ===
['i', 'love', 'natural', 'language', 'processing', '<pad>']

=== Contextual Embeddings (Sentence 1, first 5 tokens) ===
tensor([[ 6.5521e-01,  7.3332e-01, -1.4841e+00,  9.0919e-01, -8.4982e-01,
          1.9129e+00, -5.5165e-01,  2.7812e-01,  1.4224e-01,  7.4082e-01,
         -1.8592e+00, -6.2796e-01, -1.0853e-01,  4.8727e-01, -2.8122e-01,
          1.1624e+00, -3.7091e-01,  5.0361e-01, -1.0767e+00,  2.2217e-02,
         -1.9465e-01,  1.0568e+00, -9.0730e-01,  1.4075e+00, -1.2517e+00,
         -6.6078e-01, -1.5069e-01,  1.2949e+00, -2.5458e-01,  5.5996e-01,
         -6.4785e-02, -5.9826e-01,  2.9142e-01,  9.4069e-01, -4.6100e-01,
          1.5428e+00,  2.4195e-01,  6.1674e-01, -1.4491e-01, -1.5349e-02,
         -1.2044e+00, -1.1046e+00, -1.2592e+00,  1.8414e+00,  2.9753e-02,
          5.9955e-01, -8.7926e-01,  1.1300e+00, -1.7822e+00,

In [3]:
# Q3. Implement Scaled Dot-Product Attention

import torch
import torch.nn.functional as F
import math

def scaled_dot_product_attention(Q, K, V, mask=None):
    """Compute scaled dot-product attention and print every intermediate.

    Args:
        Q, K, V: (batch, seq_len, d_k)
        mask: optional (batch, seq_len, seq_len) tensor (or anything
            broadcastable to that) with 0 where masked and 1 where valid.

    Returns:
        ``(output, attn_weights)`` with shapes (batch, seq, d_k) and
        (batch, seq, seq). A query whose keys are all masked gets all-zero
        weights and an all-zero output row instead of NaN.
    """
    dk = Q.size(-1)

    # scores: (batch, seq, seq)
    scores = torch.matmul(Q, K.transpose(-2, -1))  # unscaled
    print("Raw scores (no scaling):")
    print(scores)

    scaled_scores = scores / math.sqrt(dk)
    print("\nScaled scores (divided by sqrt(d_k)):")
    print(scaled_scores)

    dead_rows = None
    if mask is not None:
        blocked = mask == 0
        scaled_scores = scaled_scores.masked_fill(blocked, float("-inf"))
        # Softmax over a row of only -inf is NaN, so give fully masked rows
        # finite scores here and zero their weights after the softmax.
        dead_rows = blocked.all(dim=-1, keepdim=True)
        scaled_scores = scaled_scores.masked_fill(dead_rows, 0.0)

    attn_weights = F.softmax(scaled_scores, dim=-1)
    if dead_rows is not None:
        attn_weights = attn_weights.masked_fill(dead_rows, 0.0)
    print("\nAttention weights after softmax:")
    print(attn_weights)

    # output: (batch, seq, d_k)
    output = torch.matmul(attn_weights, V)
    print("\nAttention output vectors:")
    print(output)

    return output, attn_weights

# --------------------------
# Test with random Q, K, V
# --------------------------

# Tiny problem size so the printed matrices stay readable.
# NOTE: these assignments rebind the module-level batch_size and seq_len.
batch_size = 1
seq_len = 4
d_k = 8

# Fixed seed so the random Q, K, V (and the printed results) are reproducible.
torch.manual_seed(42)
Q = torch.randn(batch_size, seq_len, d_k)
K = torch.randn(batch_size, seq_len, d_k)
V = torch.randn(batch_size, seq_len, d_k)

print("Q shape:", Q.shape)
print("K shape:", K.shape)
print("V shape:", V.shape)

# No mask is passed, so every query attends to every key.
output, attn_weights = scaled_dot_product_attention(Q, K, V)

Q shape: torch.Size([1, 4, 8])
K shape: torch.Size([1, 4, 8])
V shape: torch.Size([1, 4, 8])
Raw scores (no scaling):
tensor([[[-5.8108, -3.6962, -4.3623, -8.6428],
         [-3.5381,  4.3192, -1.4130,  1.1963],
         [-3.0255, -1.0973, -1.4376,  1.4581],
         [-1.5793, -1.7662, -1.5380,  0.7021]]])

Scaled scores (divided by sqrt(d_k)):
tensor([[[-2.0544, -1.3068, -1.5423, -3.0557],
         [-1.2509,  1.5271, -0.4996,  0.4229],
         [-1.0697, -0.3879, -0.5083,  0.5155],
         [-0.5584, -0.6244, -0.5437,  0.2482]]])

Attention weights after softmax:
tensor([[[0.1942, 0.4102, 0.3242, 0.0714],
         [0.0408, 0.6555, 0.0864, 0.2173],
         [0.1041, 0.2057, 0.1824, 0.5078],
         [0.1926, 0.1803, 0.1955, 0.4316]]])

Attention output vectors:
tensor([[[ 0.2667,  0.2371, -0.0554,  0.1298,  0.3541, -0.1906, -0.6448,
          -0.0085],
         [ 0.1086,  0.2444, -0.2164,  0.3814,  0.0631, -0.5633, -1.1007,
          -0.3306],
         [ 0.4947, -0.1095, -0.5350,  0.34