In [2]:
!pip install datasets

Collecting datasets
  Downloading datasets-3.3.1-py3-none-any.whl.metadata (19 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Downloading datasets-3.3.1-py3-none-any.whl (484 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m484.9/484.9 kB[0m [31m15.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m9.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading multiprocess-0.70.16-py311-none-any.whl (143 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m143.5/143.5 kB[0m [31m12.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading 

In [3]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from datasets import load_dataset
from tqdm import tqdm

In [4]:
# 1. Positional Encoding
class PositionalEncoding(nn.Module):
    def __init__(self, embed_size, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, embed_size)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embed_size, 2).float() * (-math.log(10000.0) / embed_size))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # shape: (1, max_len, embed_size)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (batch_size, seq_length, embed_size)
        return x + self.pe[:, :x.size(1), :]

In [5]:
# 2. Self-Attention
class SelfAttention(nn.Module):
    def __init__(self, embed_size, num_heads):
        super(SelfAttention, self).__init__()
        assert embed_size % num_heads == 0, "Embedding size must be divisible by num_heads"
        self.num_heads = num_heads
        self.head_dim = embed_size // num_heads

        self.query = nn.Linear(embed_size, embed_size)
        self.key   = nn.Linear(embed_size, embed_size)
        self.value = nn.Linear(embed_size, embed_size)
        self.fc_out = nn.Linear(embed_size, embed_size)

    def forward(self, x):
        batch_size, seq_length, embed_size = x.shape
        Q = self.query(x)
        K = self.key(x)
        V = self.value(x)

        # Reshape into (batch_size, num_heads, seq_length, head_dim)
        Q = Q.view(batch_size, seq_length, self.num_heads, self.head_dim).transpose(1, 2)
        K = K.view(batch_size, seq_length, self.num_heads, self.head_dim).transpose(1, 2)
        V = V.view(batch_size, seq_length, self.num_heads, self.head_dim).transpose(1, 2)

        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)
        attention = torch.softmax(scores, dim=-1)
        out = torch.matmul(attention, V)
        out = out.transpose(1, 2).contiguous().view(batch_size, seq_length, embed_size)
        return self.fc_out(out)

In [6]:
# 3. Transformer Block
class TransformerBlock(nn.Module):
    def __init__(self, embed_size, num_heads, ff_hidden_mult=4, dropout=0.1):
        super(TransformerBlock, self).__init__()
        self.attention = SelfAttention(embed_size, num_heads)
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)
        self.ff = nn.Sequential(
            nn.Linear(embed_size, ff_hidden_mult * embed_size),
            nn.ReLU(),
            nn.Linear(ff_hidden_mult * embed_size, embed_size),
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        attn_out = self.attention(x)
        x = self.norm1(x + self.dropout(attn_out))
        ff_out = self.ff(x)
        x = self.norm2(x + self.dropout(ff_out))
        return x

In [7]:
# 4. Simple Transformer Model
class SimpleTransformer(nn.Module):
    def __init__(self, vocab_size, embed_size, num_layers, num_heads, max_len, dropout=0.1):
        super(SimpleTransformer, self).__init__()
        self.token_embedding = nn.Embedding(vocab_size, embed_size)
        self.position_encoding = PositionalEncoding(embed_size, max_len)
        self.layers = nn.ModuleList([
            TransformerBlock(embed_size, num_heads, dropout=dropout) for _ in range(num_layers)
        ])
        self.fc_out = nn.Linear(embed_size, vocab_size)

    def forward(self, x):
        # x: (batch_size, seq_length)
        x = self.token_embedding(x)  # (batch_size, seq_length, embed_size)
        x = self.position_encoding(x)
        for layer in self.layers:
            x = layer(x)
        logits = self.fc_out(x)  # (batch_size, seq_length, vocab_size)
        return logits

In [8]:
# 5. Preparing the Dataset
class WikiTextDataset(Dataset):
    def __init__(self, texts, tokenizer, block_size=128):
        self.tokenizer = tokenizer
        self.block_size = block_size
        self.data = []
        for text in texts:
            tokens = self.tokenizer(text)
            # Break tokens into blocks
            for i in range(0, len(tokens) - block_size):
                self.data.append((
                    torch.tensor(tokens[i:i+block_size], dtype=torch.long),
                    torch.tensor(tokens[i+1:i+block_size+1], dtype=torch.long)
                ))
    def __len__(self):
        return len(self.data)
    def __getitem__(self, idx):
        return self.data[idx]

# A simple character-level tokenizer (for demonstration).
def simple_tokenizer(text):
    return [ord(c) for c in text if ord(c) < 128]  # only ascii

# Load WikiText-2 from Hugging Face Datasets (using only a subset for demo)
dataset = load_dataset("wikitext", "wikitext-2-raw-v1", split="train[:1%]")
texts = dataset["text"]

# Build our dataset
block_size = 64
train_dataset = WikiTextDataset(texts, simple_tokenizer, block_size=block_size)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

# Determine vocabulary size (assuming ascii)
vocab_size = 128


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/10.5k [00:00<?, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/733k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/6.36M [00:00<?, ?B/s]

validation-00000-of-00001.parquet:   0%|          | 0.00/657k [00:00<?, ?B/s]

Generating test split:   0%|          | 0/4358 [00:00<?, ? examples/s]

Generating train split:   0%|          | 0/36718 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/3760 [00:00<?, ? examples/s]

In [10]:
# 6. Initialize Model and Optimizer
embed_size = 128
num_layers = 2
num_heads = 4
max_len = block_size

model = SimpleTransformer(vocab_size, embed_size, num_layers, num_heads, max_len, dropout=0.1)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

optimizer = optim.AdamW(model.parameters(), lr=3e-4)
criterion = nn.CrossEntropyLoss()
# Optional: Learning rate scheduler
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=100)


In [11]:
# 7. Training Loop
epochs = 5
model.train()
for epoch in range(epochs):
    epoch_loss = 0
    for batch_x, batch_y in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)
        optimizer.zero_grad()
        logits = model(batch_x)  # (batch_size, seq_length, vocab_size)
        loss = criterion(logits.view(-1, vocab_size), batch_y.view(-1))
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    scheduler.step()
    print(f"Epoch {epoch+1} Loss: {epoch_loss/len(train_loader):.4f}")

Epoch 1: 100%|██████████| 4560/4560 [00:30<00:00, 149.71it/s]


Epoch 1 Loss: 0.4008


Epoch 2: 100%|██████████| 4560/4560 [00:28<00:00, 158.56it/s]


Epoch 2 Loss: 0.0358


Epoch 3: 100%|██████████| 4560/4560 [00:28<00:00, 160.79it/s]


Epoch 3 Loss: 0.0320


Epoch 4: 100%|██████████| 4560/4560 [00:28<00:00, 160.67it/s]


Epoch 4 Loss: 0.0297


Epoch 5: 100%|██████████| 4560/4560 [00:28<00:00, 160.30it/s]

Epoch 5 Loss: 0.0282





In [12]:
# 8. Inference: Generate Text Function
def generate_text(model, start_tokens, length=50, block_size=block_size):
    model.eval()
    input_ids = torch.tensor([start_tokens], dtype=torch.long).to(device)
    with torch.no_grad():
        for _ in range(length):
            # Ensure we only take the last block_size tokens
            input_cond = input_ids[:, -block_size:]
            logits = model(input_cond)
            next_token_logits = logits[:, -1, :]
            probs = F.softmax(next_token_logits, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)
            input_ids = torch.cat([input_ids, next_token], dim=1)
    return input_ids.squeeze().tolist()

In [13]:
# 9. Generate and print text
start_tokens = simple_tokenizer("The meaning of life is")
generated = generate_text(model, start_tokens, length=100)
# Convert tokens back to characters
generated_text = ''.join([chr(t) for t in generated if 0 <= t < 128])
print("Generated text:")
print(generated_text)

Generated text:
The meaning of life issssssssssaaaaaaaaaaaaaaaaaaaaaaaaaannnnnnioma thi sucomatunal also phial behold @-@ is lits , hed th
