https://medium.com/@hhpatil001/transformers-from-scratch-in-simple-python-part-i-b290760c1040

In [None]:
%load_ext autoreload
%autoreload 2

figsize=(14, 4)

import os
import numpy as np
import pandas as pd
import seaborn as sns
from transformers import AutoTokenizer, AutoConfig
import torch
from torch import nn
import torch.nn.functional as F
from math import sqrt, log

## Encoder

In [None]:
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
text = 'I love data science.'
# Tokenize once and reuse the result for both the printout and the cells below.
# Special tokens are left out; return_tensors='pt' asks for PyTorch tensors.
inputs = tokenizer(text, add_special_tokens=False, return_tensors='pt')
print(inputs)

In [None]:
# Load the published configuration of 'bert-base-uncased'; its fields
# (vocab_size, hidden_size, ...) are read below to size the layers.
config = AutoConfig.from_pretrained('bert-base-uncased')
config  # bare expression so the notebook shows the config as the cell output

In [None]:
# Embedding table with vocab_size rows, each of width hidden_size.
token_embeddings = nn.Embedding(config.vocab_size, config.hidden_size)
print(token_embeddings)

In [None]:
# Look up the embedding vector of every token id in the tokenized text.
input_embeds = token_embeddings(inputs.input_ids)
print(input_embeds.size())
# The printed size reads as (batch size, sequence length, hidden dimension).

### Attention Weights

In [None]:
def scaled_dot_product_attention(query, key, value, mask=None, dropout=None):
    """Compute ``softmax(Q K^T / sqrt(d_k)) V``.

    Any number of leading batch dimensions is accepted, e.g.
    ``(batch, seq_len, head_dim)`` or ``(batch, heads, seq_len, head_dim)``.

    Args:
        query: Tensor of shape ``(..., target_len, head_dim)``.
        key: Tensor of shape ``(..., source_len, head_dim)``.
        value: Tensor of shape ``(..., source_len, value_dim)``.
        mask: Optional tensor broadcastable to ``(..., target_len, source_len)``.
            Non-zero / True entries mark positions that must NOT be attended to.
        dropout: Optional callable (e.g. ``nn.Dropout``) applied to the
            attention weights.

    Returns:
        Tensor of shape ``(..., target_len, value_dim)``.
    """
    dim_k = query.size(-1)
    # matmul + transpose of the last two axes behaves like bmm for 3-D input
    # but also handles extra leading batch dimensions.
    scores = torch.matmul(query, key.transpose(-2, -1)) / sqrt(dim_k)

    if mask is not None:
        # Use the most negative value representable in the score dtype so the
        # fill also works in reduced precision, where -1e9 is out of range.
        scores = scores.masked_fill(mask.bool(), torch.finfo(scores.dtype).min)

    weights = F.softmax(scores, dim=-1)
    if dropout is not None:
        weights = dropout(weights)
    return torch.matmul(weights, value)

### Multi-head attention
A single attention head tends to focus on only one kind of feature, so several heads are run in parallel and their outputs are concatenated.

In [None]:
class AttentionHead(nn.Module):
    """One attention head: project the input to query/key/value, then attend.

    Args:
        embed_dim: Size of the last dimension of the incoming hidden state.
        head_dim: Output size of each of the three projections.
        attention_dropout: Dropout probability for the attention weights.
            When it is not greater than 0.0 no dropout module is created.
    """

    def __init__(self, embed_dim, head_dim, attention_dropout=0.0):
        super().__init__()
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)
        if attention_dropout > 0.0:
            self.attention_dropout = nn.Dropout(attention_dropout)
        else:
            self.attention_dropout = None

    def forward(self, hidden_state, mask=None):
        """Self-attend over ``hidden_state``; ``mask`` is passed through as-is."""
        queries = self.q(hidden_state)
        keys = self.k(hidden_state)
        values = self.v(hidden_state)
        return scaled_dot_product_attention(
            queries,
            keys,
            values,
            mask=mask,
            dropout=self.attention_dropout,
        )

In [None]:
class MultiHeadAttention(nn.Module):
    """Run several ``AttentionHead`` modules in parallel and mix their outputs.

    Args:
        config: Object exposing ``hidden_size``, ``num_attention_heads``,
            ``attention_probs_dropout_prob`` and ``hidden_dropout_prob``.

    Raises:
        ValueError: If ``hidden_size`` is not divisible by
            ``num_attention_heads``. The concatenated heads would then not
            match the input width of the output projection.
    """

    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"hidden_size ({embed_dim}) must be divisible by "
                f"num_attention_heads ({num_heads})"
            )
        head_dim = embed_dim // num_heads
        # Exposed so other modules can query the layout of this attention block.
        self.num_heads = num_heads
        self.head_dim = head_dim
        self.heads = nn.ModuleList(
            [
                AttentionHead(
                    embed_dim,
                    head_dim,
                    attention_dropout=config.attention_probs_dropout_prob,
                )
                for _ in range(num_heads)
            ]
        )
        self.output_linear = nn.Linear(embed_dim, embed_dim)
        self.out_dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_state, mask=None):
        """Apply every head to ``hidden_state`` and project the concatenation.

        Args:
            hidden_state: Input tensor whose last dimension is ``hidden_size``.
            mask: Optional attention mask handed unchanged to every head
                (True / non-zero marks positions to ignore).

        Returns:
            Tensor with the same last dimension as ``hidden_state``.
        """
        x = torch.cat([h(hidden_state, mask=mask) for h in self.heads], dim=-1)
        x = self.output_linear(x)
        x = self.out_dropout(x)
        return x

In [None]:
# Smoke test: run the token embeddings through multi-head attention without a mask.
multihead_attention = MultiHeadAttention(config)
attention_output = multihead_attention(input_embeds, None)
print(attention_output.size())

### Feed-Forward

In [None]:
class FeedForward(nn.Module):
    """Feed-forward block: Linear -> GELU -> Linear -> Dropout.

    Args:
        config: Object exposing ``hidden_size``, ``intermediate_size`` and
            ``hidden_dropout_prob``.
    """

    def __init__(self, config):
        super().__init__()
        hidden, inner = config.hidden_size, config.intermediate_size
        self.linear_1 = nn.Linear(hidden, inner)
        self.linear_2 = nn.Linear(inner, hidden)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, x):
        """Expand to the intermediate width, activate, project back, drop out."""
        expanded = self.gelu(self.linear_1(x))
        return self.dropout(self.linear_2(expanded))

In [None]:
# Smoke test: feed the attention output through the feed-forward block.
feed_forward = FeedForward(config)
ff_outputs = feed_forward(attention_output)
print(ff_outputs.size())

### Positional Encoding

In [None]:
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal position table (sine on even columns, cosine on odd).

    The table is precomputed for ``config.max_position_embeddings`` positions
    and stored as a buffer, so it has no trainable parameters.

    Args:
        config: Object exposing ``max_position_embeddings`` and ``hidden_size``.
    """

    def __init__(self, config):
        super().__init__()
        self.max_len = config.max_position_embeddings
        self.hidden_size = config.hidden_size

        pos_enc = torch.zeros(self.max_len, self.hidden_size)
        position = torch.arange(0, self.max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, self.hidden_size, 2).float()
            * (-log(10000.0) / self.hidden_size)
        )
        angles = position * div_term
        pos_enc[:, 0::2] = torch.sin(angles)
        # With an odd hidden size there is one cosine column fewer than sine
        # columns, so trim the angles to the number of odd-indexed columns.
        pos_enc[:, 1::2] = torch.cos(angles[:, : self.hidden_size // 2])
        pos_enc = pos_enc.unsqueeze(0)  # leading axis of size 1 for broadcasting
        self.register_buffer('pos_enc', pos_enc)

    def forward(self, x):
        """Return the encodings for the first ``x.size(1)`` positions.

        Only the sequence length of ``x`` is used; its values are ignored.

        Args:
            x: Tensor whose second dimension is the sequence length.

        Returns:
            Tensor of shape ``(1, seq_len, hidden_size)``.

        Raises:
            ValueError: If the sequence is longer than the precomputed table.
        """
        seq_len = x.size(1)
        if seq_len > self.max_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds max_position_embeddings "
                f"({self.max_len})"
            )
        return self.pos_enc[:, :seq_len, :]


### Encoder Layer

In [None]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention and feed-forward, each followed by
    a residual connection and layer normalisation.

    Args:
        config: Object exposing ``hidden_size`` and ``hidden_dropout_prob``,
            plus whatever ``MultiHeadAttention`` and ``FeedForward`` read.
    """

    def __init__(self, config):
        super().__init__()
        self.MHA = MultiHeadAttention(config)
        self.FFN = FeedForward(config)
        self.LN1 = nn.LayerNorm(config.hidden_size, eps=1e-12)
        self.LN2 = nn.LayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, x, attention_mask=None):
        """Transform ``x`` and return a tensor of the same shape.

        Args:
            x: Hidden states whose last dimension is ``hidden_size``.
            attention_mask: Optional mask passed to the self-attention
                (True / non-zero marks positions to ignore).

        Returns:
            The normalised output of the block.
        """
        # NOTE(review): MultiHeadAttention and FeedForward already apply
        # dropout to their outputs, so each residual branch is dropped out
        # twice. Confirm whether that is intended.
        MHA_out = self.MHA(x, mask=attention_mask)
        x = x + self.dropout(MHA_out)  # Add
        x = self.LN1(x)  # And Norm

        FFN_out = self.FFN(x)
        x = x + self.dropout(FFN_out)  # Add
        x = self.LN2(x)  # And Norm
        # The result must be returned: the encoder stack feeds it to the next layer.
        return x

## Decoder

In [None]:
class DecoderLayer(nn.Module):
    """One decoder block: masked self-attention, cross-attention over the
    encoder output, and feed-forward, each followed by a residual connection
    and layer normalisation.

    Args:
        config: Object exposing ``hidden_size`` and ``hidden_dropout_prob``,
            plus whatever ``MultiHeadAttention`` and ``FeedForward`` read.
    """

    def __init__(self, config):
        super().__init__()
        self.masked_MHA = MultiHeadAttention(config)
        self.cross_MHA = MultiHeadAttention(config)
        self.FFN = FeedForward(config)
        self.LN1 = nn.LayerNorm(config.hidden_size, eps=1e-12)
        self.LN2 = nn.LayerNorm(config.hidden_size, eps=1e-12)
        self.LN3 = nn.LayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, x, enc_out, masked_self_attn_mask=None, cross_attn_mask=None):
        """Run the three sub-layers and return the updated decoder states.

        Args:
            x: Decoder hidden states.
            enc_out: Encoder output that the cross-attention attends over.
            masked_self_attn_mask: Optional mask for the self-attention
                (True / non-zero marks positions to ignore).
            cross_attn_mask: Optional mask for the cross-attention, same
                convention.

        Returns:
            Tensor with the same shape as ``x``.
        """
        self_attn_out = self.masked_MHA(x, mask=masked_self_attn_mask)
        x = x + self.dropout(self_attn_out)
        x = self.LN1(x)

        # Cross-attention: decoder states query the encoder output.
        cross_attn_out = self._cross_attention(x, enc_out, cross_attn_mask)
        x = x + self.dropout(cross_attn_out)
        x = self.LN2(x)

        FFN_out = self.FFN(x)
        x = x + self.dropout(FFN_out)
        x = self.LN3(x)
        return x

    def _cross_attention(self, dec_state, enc_state, mask=None):
        """Attend from ``dec_state`` (queries) to ``enc_state`` (keys/values).

        The projections registered in ``self.cross_MHA`` are used, so the
        weights are trained, saved with the model, and stable between calls.
        Creating new ``nn.Linear`` layers here would give new random weights
        on every forward pass.

        Args:
            dec_state: Decoder hidden states.
            enc_state: Encoder output.
            mask: Optional mask (True / non-zero marks positions to ignore).

        Returns:
            Output of the cross-attention output projection. Dropout on this
            result is applied by the caller.
        """
        # Reuse the shared attention function instead of repeating the
        # score / softmax computation for every head.
        head_outputs = [
            scaled_dot_product_attention(
                head.q(dec_state),
                head.k(enc_state),
                head.v(enc_state),
                mask=mask,
                dropout=head.attention_dropout,
            )
            for head in self.cross_MHA.heads
        ]
        concat = torch.cat(head_outputs, dim=-1)
        return self.cross_MHA.output_linear(concat)


## Stack

In [None]:
class Encoder(nn.Module):
    """Stack of ``EncoderLayer`` blocks followed by a final layer norm.

    Args:
        config: Configuration forwarded to every ``EncoderLayer``; its
            ``hidden_size`` also sizes the final ``LayerNorm``.
        num_layers: Number of encoder layers to stack.
    """

    def __init__(self, config, num_layers=6):
        super().__init__()
        self.layers = nn.ModuleList()
        for _ in range(num_layers):
            self.layers.append(EncoderLayer(config))
        self.ln_final = nn.LayerNorm(config.hidden_size, eps=1e-12)

    def forward(self, x, attn_mask=None):
        """Pass ``x`` through every layer in order, then normalise the result."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, attention_mask=attn_mask)
        return self.ln_final(hidden)

In [None]:
class Decoder(nn.Module):
    """Stack of ``DecoderLayer`` blocks followed by a final layer norm.

    Args:
        config: Configuration forwarded to every ``DecoderLayer``; its
            ``hidden_size`` also sizes the final ``LayerNorm``.
        num_layers: Number of decoder layers to stack.
    """

    def __init__(self, config, num_layers=6):
        super().__init__()
        self.layers = nn.ModuleList()
        for _ in range(num_layers):
            self.layers.append(DecoderLayer(config))
        self.ln_final = nn.LayerNorm(config.hidden_size, eps=1e-12)

    def forward(self, x, enc_state, masked_self_attn_mask=None, cross_attn_mask=None):
        """Pass ``x`` through every layer in order (each layer also receives
        ``enc_state`` and both masks), then normalise the result."""
        hidden = x
        for block in self.layers:
            hidden = block(
                hidden,
                enc_state,
                masked_self_attn_mask=masked_self_attn_mask,
                cross_attn_mask=cross_attn_mask,
            )
        return self.ln_final(hidden)

## Transformer
<div>
<img src="images/transformer_architecture.png" width="500"/>
</div>

In [None]:
class SimpleTransformer(nn.Module):
    """Encoder-decoder transformer with a shared token embedding table.

    Args:
        config: Object exposing ``vocab_size``, ``hidden_size``,
            ``hidden_dropout_prob`` and the fields read by the sub-modules.
        encoder_layers: Number of layers in the encoder stack.
        decoder_layers: Number of layers in the decoder stack.
    """

    def __init__(self, config, encoder_layers=6, decoder_layers=6):
        super().__init__()
        print(config)
        self.config = config
        self.token_embeddings = nn.Embedding(config.vocab_size, config.hidden_size)
        self.pos_encoder = PositionalEncoding(config)
        self.encoder = Encoder(config, num_layers=encoder_layers)
        self.decoder = Decoder(config, num_layers=decoder_layers)
        self.final_linear = nn.Linear(config.hidden_size, config.vocab_size)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def _embed(self, token_ids):
        """Embed ids, add positional encodings, and apply dropout."""
        # Embed once and reuse the tensor: the positional encoder only needs
        # its sequence length.
        tokens = self.token_embeddings(token_ids)
        return self.dropout(tokens + self.pos_encoder(tokens))

    def forward(self, input_ids, output_ids, input_mask=None, output_mask=None):
        """Compute next-token logits for ``output_ids`` given ``input_ids``.

        Args:
            input_ids: Source token ids of shape ``(batch, input_len)``.
            output_ids: Decoder input token ids of shape ``(batch, output_len)``.
            input_mask: Optional ``(batch, input_len)`` mask, True / non-zero
                at source positions to ignore (e.g. padding).
            output_mask: Optional ``(batch, output_len)`` mask, True / non-zero
                at decoder positions to ignore.

        Returns:
            Logits of shape ``(batch, output_len, vocab_size)``.
        """
        input_embeddings = self._embed(input_ids)
        output_embeddings = self._embed(output_ids)

        batch, input_len = input_ids.size()
        output_len = output_ids.size(1)

        # Encoder self-attention and decoder cross-attention both hide the
        # masked source positions from every query position.
        encoder_attn_mask = None
        cross_attn_mask = None
        if input_mask is not None:
            source_keys = input_mask.unsqueeze(1)
            encoder_attn_mask = source_keys.expand(batch, input_len, input_len)
            cross_attn_mask = source_keys.expand(batch, output_len, input_len)

        # Causal mask: position i may not attend to any position j > i.
        causal_mask = torch.triu(
            torch.ones((output_len, output_len), device=output_ids.device),
            diagonal=1,
        ).bool()
        masked_self_attn_mask = causal_mask.unsqueeze(0).expand(batch, output_len, output_len)

        if output_mask is not None:
            pad_mask = output_mask.unsqueeze(1).expand(batch, output_len, output_len)
            masked_self_attn_mask = masked_self_attn_mask | pad_mask.bool()

        enc_out = self.encoder(input_embeddings, attn_mask=encoder_attn_mask)
        dec_out = self.decoder(
            output_embeddings,
            enc_out,
            masked_self_attn_mask=masked_self_attn_mask,
            cross_attn_mask=cross_attn_mask,
        )

        logits = self.final_linear(dec_out)
        return logits

In [None]:
import torch
import torch.nn as nn
import random

# --------------------------
# 1️⃣ Tiny English–Norwegian dictionary
# --------------------------
# Word-level English -> Norwegian lookup table; make_sentence() translates a
# sentence by looking up each of its words here.
eng2nor = {
    "i": "jeg",
    "you": "du",
    "he": "han",
    "she": "hun",
    "we": "vi",
    "they": "de",
    "like": "liker",
    "eat": "spiser",
    "play": "spiller",
    "see": "ser",
    "love": "elsker",
    "dog": "hund",
    "cat": "katt",
    "pizza": "pizza",
    "apple": "eple",
    "soccer": "fotball",
    "milk": "melk",
    "bread": "brød"
}

# Word pools for generating simple sentences like “i like pizza”, “they play soccer”.
# Every word listed here is also a key of eng2nor.
subjects = ["i", "you", "he", "she", "we", "they"]
verbs = ["like", "eat", "play", "see", "love"]
objects = ["pizza", "apple", "soccer", "milk", "bread", "dog", "cat"]

def make_sentence():
    """Draw a random subject-verb-object sentence and translate it word by word.

    Returns:
        Tuple ``(english, norwegian)`` of space-separated strings.
    """
    # Draw in subject, verb, object order, one random.choice call per pool.
    chosen = [random.choice(pool) for pool in (subjects, verbs, objects)]
    english = f"{chosen[0]} {chosen[1]} {chosen[2]}"
    translated = [eng2nor[word] for word in english.split()]
    return english, " ".join(translated)

# Sample 50 sentence pairs as the toy corpus (random draws, so duplicates are possible).
pairs = [make_sentence() for _ in range(50)]
print("Example pairs:")
# Show the first five pairs; the English side is padded to a 20-character column.
for e, n in pairs[:5]:
    print(f"{e:20s} → {n}")

# --------------------------
# 2️⃣ Build vocabulary
# --------------------------
def build_vocab(pairs):
    """Build one token -> id mapping shared by both sides of ``pairs``.

    Ids 0, 1 and 2 are reserved for ``<pad>``, ``<s>`` and ``</s>``. Every
    other whitespace-separated token gets the next free id in order of first
    appearance (source sentence before target sentence within a pair).

    Args:
        pairs: Iterable of ``(source_sentence, target_sentence)`` strings.

    Returns:
        Tuple ``(vocab, inv_vocab)``: token -> id and id -> token dicts.
    """
    token_to_id = {"<pad>": 0, "<s>": 1, "</s>": 2}
    for source, target in pairs:
        for sentence in (source, target):
            for token in sentence.split():
                # setdefault leaves tokens that already have an id untouched.
                token_to_id.setdefault(token, len(token_to_id))
    id_to_token = dict(zip(token_to_id.values(), token_to_id.keys()))
    return token_to_id, id_to_token

# One vocabulary shared by both languages, plus the reverse id -> token map.
vocab, inv_vocab = build_vocab(pairs)
vocab_size = len(vocab)
print("Vocab size:", vocab_size)

# --------------------------
# 3️⃣ Encode sequences
# --------------------------
def encode(sentence, vocab, max_len):
    """Convert a sentence into a list of exactly ``max_len`` token ids.

    The sentence is split on whitespace, wrapped in ``<s>`` ... ``</s>``,
    right-padded with ``<pad>`` if it is too short, and cut to ``max_len``
    if it is too long (which can drop the closing ``</s>``).

    Args:
        sentence: Whitespace-separated words, all of them present in ``vocab``.
        vocab: Token -> id mapping containing ``<pad>``, ``<s>`` and ``</s>``.
        max_len: Target length of the returned list.

    Returns:
        List of token ids.

    Raises:
        KeyError: If a word is missing from ``vocab``.
    """
    ids = [vocab["<s>"], *(vocab[word] for word in sentence.split()), vocab["</s>"]]
    shortfall = max_len - len(ids)
    if shortfall > 0:
        ids.extend([vocab["<pad>"]] * shortfall)
    return ids[:max_len]

# Longest single sentence (source or target) plus the <s> and </s> markers.
# Counting the words of both sentences together would only add padding to
# every sequence.
max_len = max(max(len(e.split()), len(n.split())) for e, n in pairs) + 2

input_ids = torch.tensor([encode(e, vocab, max_len) for e, _ in pairs])
output_ids = torch.tensor([encode(n, vocab, max_len) for _, n in pairs])
# Boolean masks that are True at padding positions (the positions to ignore).
input_mask = (input_ids == vocab["<pad>"])
output_mask = (output_ids == vocab["<pad>"])

# --------------------------
# 4️⃣ Init Transformer
# --------------------------
cfg = AutoConfig.from_pretrained('bert-base-uncased')
# Size the embedding table and the output projection for the toy vocabulary.
# With the pretrained vocab_size the logits would not match the
# `vocab_size` classes that the loss below reshapes them to.
cfg.vocab_size = vocab_size
model = SimpleTransformer(cfg, encoder_layers=2, decoder_layers=2)

optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)
# Padding targets contribute nothing to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=vocab["<pad>"])

# --------------------------
# 5️⃣ Training loop (very small)
# --------------------------
model.train()

# Teacher forcing: the decoder sees tokens [0, T-1) and is trained to predict
# tokens [1, T). These slices are the same every epoch, so build them once.
decoder_input = output_ids[:, :-1]
target_output = output_ids[:, 1:]

for epoch in range(20):
    optimizer.zero_grad()

    logits = model(input_ids, decoder_input, input_mask, output_mask[:, :-1])
    # Flatten using the class dimension the model actually produced, so the
    # reshape cannot disagree with the size of the output layer.
    loss = criterion(logits.reshape(-1, logits.size(-1)), target_output.reshape(-1))
    loss.backward()
    optimizer.step()

    if (epoch + 1) % 5 == 0:
        print(f"Epoch {epoch+1}: loss = {loss.item():.3f}")


In [None]:
# --------------------------
# 6️⃣ Try your own sentence!
# --------------------------
def translate(sentence, model, vocab, inv_vocab, max_len):
    """Greedily decode a translation of ``sentence``.

    Puts ``model`` into eval mode, encodes the source sentence, and then
    repeatedly feeds the tokens generated so far back into the decoder. The
    highest-scoring next token is taken each step until ``</s>`` is predicted
    or ``max_len`` tokens have been generated.

    Args:
        sentence: Whitespace-separated source words, all present in ``vocab``.
        model: Callable as ``model(src_ids, decoder_ids, src_pad_mask)`` that
            returns logits of shape ``(batch, decoder_len, num_classes)``.
        vocab: Token -> id mapping containing ``<pad>``, ``<s>`` and ``</s>``.
        inv_vocab: Id -> token mapping.
        max_len: Source padding length and maximum number of decoding steps.

    Returns:
        The generated tokens joined by single spaces.

    Raises:
        KeyError: If a source word is missing from ``vocab``.
    """
    model.eval()
    # Create the tensors on the model's device so decoding also works when the
    # model is not on the CPU.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    with torch.no_grad():
        src = torch.tensor([encode(sentence, vocab, max_len)], device=device)
        src_mask = (src == vocab["<pad>"])

        # Start token for the decoder.
        decoder_input = torch.tensor([[vocab["<s>"]]], device=device)
        output = []

        for _ in range(max_len):
            logits = model(src, decoder_input, src_mask)
            next_id = int(torch.argmax(logits[0, -1]))
            if next_id == vocab["</s>"]:
                break
            # The output layer may have more classes than the vocabulary has
            # entries, so fall back to a placeholder instead of a KeyError.
            output.append(inv_vocab.get(next_id, "<unk>"))
            next_token = torch.tensor([[next_id]], device=device)
            decoder_input = torch.cat([decoder_input, next_token], dim=1)
        return " ".join(output)

# Demo: greedy-decode one sentence with the model trained above.
test_sentence = "i like pizza"
print(f"\nInput: {test_sentence}")
print("Output:", translate(test_sentence, model, vocab, inv_vocab, max_len))