In [3]:
import numpy as np

# =================== Hyperparams ====================
np.random.seed(42)
vocab = {}
reverse_vocab = []
vocab_size = 1000
embedding_dim = 32
max_len = 16


# =================== Tokenizer ====================
def simple_tokenize(text):
    return text.lower().replace(".", "").replace(",", "").split()

def build_vocab(sentences):
    global vocab, reverse_vocab
    words = set()
    for s in sentences:
        words.update(simple_tokenize(s))
    vocab = {"[PAD]": 0, "[CLS]": 1, "[SEP]": 2, "[MASK]": 3}
    for i, w in enumerate(sorted(words), start=4):
        vocab[w] = i
    reverse_vocab = [k for k, v in sorted(vocab.items(), key=lambda x: x[1])]

def encode(s1, s2):
    ids = [vocab["[CLS]"]] + [vocab[w] for w in simple_tokenize(s1)] + [vocab["[SEP]"]]
    ids += [vocab[w] for w in simple_tokenize(s2)] + [vocab["[SEP]"]]
    ids += [vocab["[PAD]"]] * (max_len - len(ids))
    return np.array(ids[:max_len])


# =================== Positional Encoding ====================
def positional_encoding(max_len, dim):
    pos = np.arange(max_len)[:, None]
    i = np.arange(dim)[None, :]
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(dim))
    angle_rads = pos * angle_rates
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    return angle_rads


# =================== Layer Norm ====================
def norm(x, eps=1e-6):
    mean = np.mean(x, axis=-1, keepdims=True)
    std = np.std(x, axis=-1, keepdims=True)
    return (x - mean) / (std + eps)


# =================== Attention ====================
def attention(q, k, v):
    dk = q.shape[-1]
    scores = q @ k.T / np.sqrt(dk)
    weights = np.exp(scores) / np.sum(np.exp(scores), axis=-1, keepdims=True)
    return weights @ v


# =================== Multi-Head Attention ====================
class MiniMHA:
    def __init__(self, dim):
        self.Wq = np.random.randn(dim, dim) * 0.01
        self.Wk = np.random.randn(dim, dim) * 0.01
        self.Wv = np.random.randn(dim, dim) * 0.01
        self.Wo = np.random.randn(dim, dim) * 0.01

    def forward(self, x):
        q = x @ self.Wq
        k = x @ self.Wk
        v = x @ self.Wv
        attn = attention(q, k, v)
        out = attn @ self.Wo
        return out


# =================== Feed Forward ====================
class MiniFFN:
    def __init__(self, dim):
        self.W1 = np.random.randn(dim, dim * 2) * 0.01
        self.b1 = np.zeros(dim * 2)
        self.W2 = np.random.randn(dim * 2, dim) * 0.01
        self.b2 = np.zeros(dim)

    def forward(self, x):
        h = np.maximum(0, x @ self.W1 + self.b1)
        return h @ self.W2 + self.b2


# =================== Mini BERT Encoder ====================
class MiniBERTEncoder:
    def __init__(self, vocab_size, dim, max_len):
        self.embedding = np.random.randn(vocab_size, dim) * 0.01
        self.pos_encoding = positional_encoding(max_len, dim)
        self.mha = MiniMHA(dim)
        self.ffn = MiniFFN(dim)

    def forward(self, input_ids):
        x = self.embedding[input_ids] + self.pos_encoding[:len(input_ids)]
        attn_out = self.mha.forward(x)
        x = norm(x + attn_out)
        ffn_out = self.ffn.forward(x)
        x = norm(x + ffn_out)
        return x


# =================== NSP Head ====================
class NSPHead:
    def __init__(self, dim):
        self.W = np.random.randn(dim, 2) * 0.01
        self.b = np.zeros(2)

    def forward(self, cls_vec):
        logits = cls_vec @ self.W + self.b
        probs = np.exp(logits) / np.sum(np.exp(logits))
        return probs


# =================== Full BERT NSP Model ====================
class MiniBERTForNSP:
    def __init__(self, vocab_size, dim, max_len):
        self.encoder = MiniBERTEncoder(vocab_size, dim, max_len)
        self.nsp = NSPHead(dim)

    def forward(self, input_ids):
        x = self.encoder.forward(input_ids)
        cls_vec = x[0]
        return self.nsp.forward(cls_vec)


# =================== Sample Sentences & Testing ====================
sentences = [
    "Kucing tidur di atas sofa",
    "Anjing bermain di taman",
    "Langit berwarna biru hari ini",
    "Dia sedang belajar matematika"
]

build_vocab(sentences)

# Pasangan yang benar (IsNext)
s1 = "Kucing tidur di atas sofa"
s2 = "Langit berwarna biru hari ini"

# Pasangan acak (NotNext)
s3 = "Kucing tidur di atas sofa"
s4 = "Dia sedang belajar matematika"

input_ids_true = encode(s1, s2)
input_ids_false = encode(s3, s4)

model = MiniBERTForNSP(vocab_size=len(vocab), dim=embedding_dim, max_len=max_len)

print("Input TRUE:", s1, " + ", s2)
print("NSP Output Prob (IsNext vs NotNext):", model.forward(input_ids_true))

print("\nInput FALSE:", s3, " + ", s4)
print("NSP Output Prob (IsNext vs NotNext):", model.forward(input_ids_false))


Input TRUE: Kucing tidur di atas sofa  +  Langit berwarna biru hari ini
NSP Output Prob (IsNext vs NotNext): [0.48686526 0.51313474]

Input FALSE: Kucing tidur di atas sofa  +  Dia sedang belajar matematika
NSP Output Prob (IsNext vs NotNext): [0.48686528 0.51313472]


In [None]:
tokens = set()
for s1, s2, _ in dataset:
    tokens.update(s1.lower().split())
    tokens.update(s2.lower().split())

tokens = sorted(list(tokens))
vocab = {w: i+2 for i, w in enumerate(tokens)}  # 0 = PAD, 1 = CLS
vocab["[PAD]"] = 0
vocab["[CLS]"] = 1
vocab["[SEP]"] = len(vocab)


In [None]:
s1 = "Aku pergi ke toko".lower().split()
s2 = "Lalu aku membeli roti".lower().split()

input_ids = [vocab["[CLS]"]] + [vocab[w] for w in s1] + [vocab["[SEP]"]] + [vocab[w] for w in s2] + [vocab["[SEP]"]]
max_len = 16

if len(input_ids) < max_len:
    input_ids += [vocab["[PAD]"]] * (max_len - len(input_ids))
else:
    input_ids = input_ids[:max_len]

input_ids = np.array(input_ids)


In [None]:
# Kita pakai Mini-BERT yang sebelumnya sudah kita bangun
hidden_states = mini_bert_forward(input_ids)

# Ambil output vector dari token [CLS] (posisi pertama)
cls_vector = hidden_states[0]


In [None]:
hidden_dim = cls_vector.shape[0]  # misal 32 atau 64, tergantung BERT

W = np.random.randn(hidden_dim, 2) * 0.01
b = np.zeros(2)

logits = cls_vector @ W + b

# Softmax
probs = np.exp(logits) / np.sum(np.exp(logits))
pred = np.argmax(probs)


In [None]:
for i in range(d_model):
    token_result = []
    print()
    print(f"Token {i}: {x[i]}")
    for j in range(0, d_model, 2):
        xx = x[i][j]
        yy = x[i][j + 1]
        print(f"Before Pos : (x,y) ({j}, {j+1}): ({xx}, {yy})")

        m = j
        wi = 1/10000**(2*i/d_model)
        thetha = m*wi

        xx_r = xx*np.cos(thetha) - yy*np.sin(thetha)
        yy_r = yy*np.sin(thetha) + yy*np.cos(thetha)

        print(f"After Pos : (x,y) ({j}, {j+1}): ({xx_r}, {yy_r})")

        # Store the results
        result[i][j] = xx_r
        if j + 1 < d_model: 
            result[i][j + 1] = yy_r
                
        token_result.extend([xx_r, yy_r])
        
    print(f"Token {i} with positional encoding: {result[i]}")


In [None]:
# Format: (kalimat, label) — label 1 = positif, 0 = negatif
data = [
    ("aku suka banget sama film ini", 1),
    ("filmnya bener-bener membosankan", 0),
    ("aktingnya luar biasa", 1),
    ("ngantuk banget pas nonton", 0),
    ("ceritanya bikin terharu", 1),
    ("gak masuk akal dan jelek", 0),
]


In [None]:
for b in range(batch_size):
    print(f"\n[Batch {b}]")
    sentence_ids = input[b]
    sentence_words = [id2w.get(i, "[UNK]") for i in sentence_ids]
    print("Kalimat:", " ".join(sentence_words))
    print("Token IDs:", sentence_ids)
    
    for h in range(num_heads):
        print(f"\n  Head {h}:")
        for q in range(seq_len):
            q_word = id2w.get(sentence_ids[q], "[UNK]")
            print(f"    Query Token {q:2d} [{q_word:<10}]")

            for k in range(seq_len):
                k_word = id2w.get(sentence_ids[k], "[UNK]")
                score = scores[b, h, q, k]
                weight = att_weights[b, h, q, k]
                print(f"      ↳ Key Token {k:2d} [{k_word:<10}] | Score: {score:>7.4f} | Softmax: {weight:>7.4f}")

In [None]:
# 2 contoh kalimat + label (1 = positif, 0 = negatif)
inputs = np.array([
    [1, 2, 3, 4, 5, 0, 0, 0, 0, 0],  # "aku suka banget film ini"
    [1, 6, 7, 8, 9, 0, 0, 0, 0, 0]   # "aku benci banget endingnya"
])
labels = np.array([1, 0])


np.random.seed(42)
vocab_size = 20
embedding_dim = 8
head_dim = 4
num_heads = 2
ff_hidden = 32
max_len = 10

# Word Embedding
W_embed = np.random.randn(vocab_size, embedding_dim)

# Positional Encoding
pos_embed = np.random.randn(max_len, embedding_dim)

# MHA Proj
W_q = np.random.randn(num_heads, embedding_dim, head_dim)
W_k = np.random.randn(num_heads, embedding_dim, head_dim)
W_v = np.random.randn(num_heads, embedding_dim, head_dim)
W_o = np.random.randn(num_heads * head_dim, embedding_dim)

# FFN
W1 = np.random.randn(embedding_dim, ff_hidden)
b1 = np.zeros(ff_hidden)
W2 = np.random.randn(ff_hidden, embedding_dim)
b2 = np.zeros(embedding_dim)

# Classifier
W_cls = np.random.randn(embedding_dim, 1)
b_cls = 0



## Fine Tuning Sentiment Analysis

1. data -> sentence and label
2. vocab : add cls and pad in index 1 and 0
3. tokenize : tokens the sentence and add cls and pad
4. input numeric and labels : sentence, token numeric, dim token and labels
5. embedding : seq_len, dim_token
6. positional encoding : seq_len, dim_token
7. pe = positional encoding + embedding
8. multihead attention : batch_size, seq_len, dim_token, num_heads, head_dim
9. feed forward : batch_size, seq_len, dim_token, dim_ffn
10. logits : batch_size, seq_len, num_classes
11. loss : cross entropy
12. update parameter : gradient descent



In [None]:
import torch
import torch.nn as nn
import torch.optim as optim




import math
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import random
from typing import List, Dict, Tuple

# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

# 1. Sample Data (Sentences and Labels)
# In practice, you would load your own dataset
class SentimentDataset:
    def __init__(self):
        self.sentences = [
            "I love this movie so much!",
            "This was a terrible waste of time.",
            "The acting was quite good in this film.",
            "I would never recommend watching this.",
            "An absolute masterpiece of cinema.",
            "The plot makes no sense at all.",
            "Such a heartwarming story throughout.",
            "I fell asleep halfway through this boring movie.",
            "The director did an outstanding job.",
            "The characters were completely unrealistic."
        ]
        # Labels: 0 = negative, 1 = positive
        self.labels = [1, 0, 1, 0, 1, 0, 1, 0, 1, 0]
    
    def get_data(self):
        return self.sentences, self.labels


# 2. Vocabulary Creation
class Vocabulary:
    def __init__(self):
        # Initialize with special tokens
        self.token2idx = {"[PAD]": 0, "[CLS]": 1, "[UNK]": 2}
        self.idx2token = {0: "[PAD]", 1: "[CLS]", 2: "[UNK]"}
        self.vocab_size = 3  # Start with special tokens
    
    def build_vocab(self, sentences: List[str], min_freq=1):
        """Build vocabulary from the sentences"""
        word_freq = {}
        
        # Count word frequencies
        for sentence in sentences:
            # Simple tokenization by splitting on spaces
            for word in sentence.lower().split():
                # Remove punctuation (simple version)
                word = ''.join(c for c in word if c.isalnum())
                if word:
                    if word in word_freq:
                        word_freq[word] += 1
                    else:
                        word_freq[word] = 1
        
        # Add words that meet minimum frequency
        for word, freq in word_freq.items():
            if freq >= min_freq and word not in self.token2idx:
                self.token2idx[word] = self.vocab_size
                self.idx2token[self.vocab_size] = word
                self.vocab_size += 1
        
        print(f"Vocabulary built with {self.vocab_size} tokens")
    
    def tokenize(self, sentence: str, max_len=128) -> List[int]:
        """Tokenize a sentence and return token indices"""
        # Simple tokenization
        words = sentence.lower().split()
        words = [''.join(c for c in word if c.isalnum()) for word in words]
        words = [word for word in words if word]
        
        # Convert to indices with [CLS] at the beginning
        token_ids = [self.token2idx["[CLS]"]]
        
        for word in words:
            if word in self.token2idx:
                token_ids.append(self.token2idx[word])
            else:
                token_ids.append(self.token2idx["[UNK]"])
        
        # Truncate or pad to max_len
        if len(token_ids) > max_len:
            token_ids = token_ids[:max_len]
        else:
            token_ids += [self.token2idx["[PAD]"]] * (max_len - len(token_ids))
        
        return token_ids


# 3. Sentiment Dataset Class for Training
class SentimentDatasetLoader(Dataset):
    def __init__(self, sentences, labels, vocab, max_len=128):
        self.sentences = sentences
        self.labels = labels
        self.vocab = vocab
        self.max_len = max_len
    
    def __len__(self):
        return len(self.sentences)
    
    def __getitem__(self, idx):
        sentence = self.sentences[idx]
        label = self.labels[idx]
        
        # Tokenize sentence
        token_ids = self.vocab.tokenize(sentence, self.max_len)
        
        # Create attention mask (1 for tokens, 0 for padding)
        attention_mask = [1 if id != self.vocab.token2idx["[PAD]"] else 0 for id in token_ids]
        
        return {
            'input_ids': torch.tensor(token_ids, dtype=torch.long),
            'attention_mask': torch.tensor(attention_mask, dtype=torch.long),
            'labels': torch.tensor(label, dtype=torch.long)
        }


# 4. Model Components

# Embedding Layer
class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size, embed_size):
        super(TokenEmbedding, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.embed_size = embed_size
    
    def forward(self, x):
        return self.embedding(x) * math.sqrt(self.embed_size)


# Positional Encoding
class PositionalEncoding(nn.Module):
    def __init__(self, embed_size, max_len=512):
        super(PositionalEncoding, self).__init__()
        
        # Create positional encoding matrix
        pe = torch.zeros(max_len, embed_size)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embed_size, 2).float() * (-math.log(10000.0) / embed_size))
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        # Register buffer (won't be updated during backprop)
        self.register_buffer('pe', pe.unsqueeze(0))
    
    def forward(self, x):
        # x shape: [batch_size, seq_len, embed_size]
        return x + self.pe[:, :x.size(1), :]


# Multi-Head Attention
class MultiHeadAttention(nn.Module):
    def __init__(self, embed_size, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert embed_size % num_heads == 0, "Embedding size must be divisible by number of heads"
        
        self.embed_size = embed_size
        self.num_heads = num_heads
        self.head_dim = embed_size // num_heads
        
        # Linear projections
        self.query = nn.Linear(embed_size, embed_size)
        self.key = nn.Linear(embed_size, embed_size)
        self.value = nn.Linear(embed_size, embed_size)
        self.out_proj = nn.Linear(embed_size, embed_size)
    
    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)
        
        # Linear projections and reshape for multi-head attention
        q = self.query(query).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        k = self.key(key).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        v = self.value(value).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        
        # Scaled dot-product attention
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)
        
        # Apply mask if provided
        if mask is not None:
            # Expand mask for multi-head attention
            mask = mask.unsqueeze(1).unsqueeze(2)
            scores = scores.masked_fill(mask == 0, -1e9)
        
        # Apply softmax and get weighted sum
        attention_weights = torch.softmax(scores, dim=-1)
        context = torch.matmul(attention_weights, v)
        
        # Reshape back and apply final projection
        context = context.transpose(1, 2).contiguous().view(batch_size, -1, self.embed_size)
        output = self.out_proj(context)
        
        return output


# Feed Forward Layer
class FeedForward(nn.Module):
    def __init__(self, embed_size, ff_dim):
        super(FeedForward, self).__init__()
        self.linear1 = nn.Linear(embed_size, ff_dim)
        self.linear2 = nn.Linear(ff_dim, embed_size)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(0.1)
    
    def forward(self, x):
        return self.linear2(self.dropout(self.gelu(self.linear1(x))))


# Layer Normalization
class LayerNorm(nn.Module):
    def __init__(self, embed_size, eps=1e-6):
        super(LayerNorm, self).__init__()
        self.gamma = nn.Parameter(torch.ones(embed_size))
        self.beta = nn.Parameter(torch.zeros(embed_size))
        self.eps = eps
    
    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.gamma * (x - mean) / (std + self.eps) + self.beta


# Transformer Encoder Layer
class EncoderLayer(nn.Module):
    def __init__(self, embed_size, num_heads, ff_dim, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(embed_size, num_heads)
        self.feed_forward = FeedForward(embed_size, ff_dim)
        self.norm1 = LayerNorm(embed_size)
        self.norm2 = LayerNorm(embed_size)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x, mask=None):
        # Self-attention with residual connection and normalization
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        
        # Feed-forward with residual connection and normalization
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        
        return x


# 5. Full BERT Model for Sentiment Analysis
class BERT(nn.Module):
    def __init__(self, vocab_size, embed_size=768, num_heads=12, ff_dim=3072, 
                 num_layers=6, max_len=128, num_classes=2):
        super(BERT, self).__init__()
        
        # Token embedding and positional encoding
        self.token_embedding = TokenEmbedding(vocab_size, embed_size)
        self.position_encoding = PositionalEncoding(embed_size, max_len)
        self.dropout = nn.Dropout(0.1)
        
        # Transformer encoder layers
        self.encoder_layers = nn.ModuleList([
            EncoderLayer(embed_size, num_heads, ff_dim)
            for _ in range(num_layers)
        ])
        
        # Classification head
        self.classifier = nn.Linear(embed_size, num_classes)
    
    def forward(self, input_ids, attention_mask=None):
        # Get embeddings with positional encoding
        x = self.token_embedding(input_ids)
        x = self.position_encoding(x)
        x = self.dropout(x)
        
        # Apply attention mask if provided
        if attention_mask is not None:
            mask = attention_mask.unsqueeze(1).unsqueeze(2)
        else:
            mask = None
        
        # Pass through encoder layers
        for encoder_layer in self.encoder_layers:
            x = encoder_layer(x, mask)
        
        # Classification using the [CLS] token (first token)
        cls_token = x[:, 0, :]
        logits = self.classifier(cls_token)
        
        return logits


# 6. Training Functions
def train_epoch(model, dataloader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    
    for batch in dataloader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        
        # Forward pass
        optimizer.zero_grad()
        logits = model(input_ids, attention_mask)
        
        # Calculate loss
        loss = criterion(logits, labels)
        
        # Backward pass and update
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        
        # Calculate accuracy
        _, predicted = torch.max(logits, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)
    
    avg_loss = total_loss / len(dataloader)
    accuracy = correct / total
    
    return avg_loss, accuracy


def evaluate(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            
            # Forward pass
            logits = model(input_ids, attention_mask)
            
            # Calculate loss
            loss = criterion(logits, labels)
            total_loss += loss.item()
            
            # Calculate accuracy
            _, predicted = torch.max(logits, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)
    
    avg_loss = total_loss / len(dataloader)
    accuracy = correct / total
    
    return avg_loss, accuracy


# 7. Main Training Pipeline
def main():
    # Check for GPU
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")
    
    # Load data
    data = SentimentDataset()
    sentences, labels = data.get_data()
    
    # Create vocabulary
    vocab = Vocabulary()
    vocab.build_vocab(sentences)
    
    # Create dataset
    dataset = SentimentDatasetLoader(sentences, labels, vocab)
    
    # Split into train and validation sets (80-20 split)
    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])
    
    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=2)
    
    # Initialize model
    model = BERT(
        vocab_size=vocab.vocab_size,
        embed_size=256,     # Smaller than original BERT for demonstration
        num_heads=8,
        ff_dim=512,
        num_layers=2,       # Fewer layers for faster training
        max_len=128,
        num_classes=2       # Binary sentiment classification
    ).to(device)
    
    # Initialize loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=2e-5)
    
    # Training loop
    num_epochs = 10
    train_losses = []
    val_losses = []
    train_accs = []
    val_accs = []
    
    print("Starting training...")
    
    for epoch in range(num_epochs):
        train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion, device)
        val_loss, val_acc = evaluate(model, val_loader, criterion, device)
        
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        train_accs.append(train_acc)
        val_accs.append(val_acc)
        
        print(f"Epoch {epoch+1}/{num_epochs}")
        print(f"  Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
        print(f"  Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
    
    # Save the model
    torch.save(model.state_dict(), "bert_sentiment.pt")
    print("Model saved to bert_sentiment.pt")
    
    # Plot training history
    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Val Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('Loss History')
    
    plt.subplot(1, 2, 2)
    plt.plot(train_accs, label='Train Accuracy')
    plt.plot(val_accs, label='Val Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.title('Accuracy History')
    
    plt.tight_layout()
    plt.savefig('training_history.png')
    print("Training history plot saved to training_history.png")


# 8. Test the model on new sentences
def predict_sentiment(model, vocab, sentence, device):
    model.eval()
    
    # Tokenize sentence
    tokens = vocab.tokenize(sentence)
    input_ids = torch.tensor([tokens]).to(device)
    attention_mask = torch.tensor([[1 if id != vocab.token2idx["[PAD]"] else 0 for id in tokens]]).to(device)
    
    # Get prediction
    with torch.no_grad():
        logits = model(input_ids, attention_mask)
        _, predicted = torch.max(logits, 1)
    
    return "Positive" if predicted.item() == 1 else "Negative"


if __name__ == "__main__":
    # Run the training pipeline
    main()
    
    # Optional: Demonstrate prediction
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Initialize model and load weights
    data = SentimentDataset()
    sentences, _ = data.get_data()
    
    vocab = Vocabulary()
    vocab.build_vocab(sentences)
    
    model = BERT(
        vocab_size=vocab.vocab_size,
        embed_size=256,
        num_heads=8,
        ff_dim=512,
        num_layers=2,
        max_len=128,
        num_classes=2
    ).to(device)
    
    # Load the saved model (if available)
    try:
        model.load_state_dict(torch.load("bert_sentiment.pt"))
        print("Loaded saved model.")
        
        # Test on new sentences
        test_sentences = [
            "This movie was amazing, I loved it!",
            "I really hated the ending of this show.",
            "The film was neither good nor bad, just average."
        ]
        
        print("\nTesting on new sentences:")
        for sentence in test_sentences:
            sentiment = predict_sentiment(model, vocab, sentence, device)
            print(f"Sentence: '{sentence}'")
            print(f"Predicted sentiment: {sentiment}\n")
            
    except FileNotFoundError:
        print("No saved model found. Run training first.")

In [None]:
## DATA

# sentences -> label 1 = positif, 0 = negatif
sentences = [
    ("aku suka banget sama film ini", 1),
    ("filmnya bener-bener membosankan", 0),
    ("aktingnya luar biasa", 1),
    ("ngantuk banget pas nonton", 0),
    ("ceritanya bikin terharu", 1),
    ("gak masuk akal dan jelek", 0),
]

# vocab
vocab = {}
vocab['[CLS]'] = 1
vocab['[PAD]'] = 0

idx = 2

for sentence, labels in sentences:
    for word in sentence.split():
        vocab[word] = idx
        idx += 1
    
## word to index and index to word
w2id = {word : idx for idx, word in enumerate(vocab)}
id2w = {idx : word for word, idx in vocab.items()}

## tokenize
def tokenize(sentence, max_len=10):
    tokens = sentence.lower().split()
    ids = [vocab["[CLS]"]] + [vocab.get(tok, 0) for tok in tokens]
    # Padding
    if len(ids) < max_len:
        ids += [vocab["[PAD]"]] * (max_len - len(ids))
    else:
        ids = ids[:max_len]
    return ids

## input and label
import numpy as np

input = []
lables = []

for sentence, labels in sentences:
    ids = tokenize(sentence)
    input.append(ids)
    lables.append(labels)

input = np.array(input)
labels = np.array(lables)

## embedding layer
dim = 4

np.random.seed(4)
embedding_matrix = np.random.randn(len(vocab)+1, dim)
input_embedding = embedding_matrix[input]

## positional encoding
max_len = 10

pos = np.arange(max_len)[:, np.newaxis]  # (10, 1)
i = np.arange(dim)[np.newaxis, :]        # (1, 8)

# Hitung angle rates
angle_rates = 1 / np.power(10000, (2 * (i // 2)) / dim)
np.set_printoptions(precision=4, suppress=True)


# Hitung angle radians
angle_rads = pos * angle_rates

# Terapkan sin ke index genap dan cos ke index ganjil
angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])  # even
angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])  # odd

pe = input_embedding + angle_rads[np.newaxis, :input_embedding.shape[1], :]

## multihead self attention
# Parameters
batch_size, seq_len, embed_dim = pe.shape
num_heads = 4
head_dim = embed_dim // num_heads

print("batch_size:", batch_size)
print("seq_len   :", seq_len)
print("embed_dim :", embed_dim)
print("num_heads :", num_heads)
print("head_dim  :", head_dim)

# Weights
wq = np.random.randn(num_heads, embed_dim, head_dim)
wk = np.random.randn(num_heads, embed_dim, head_dim)
wv = np.random.randn(num_heads, embed_dim, head_dim)
wo = np.random.randn(num_heads * head_dim, embed_dim)

# Proyeksi Q, K, V
# hasil: (batch, head, seq_len, head_dim)
Q = np.einsum('bse,hed->bhsd', pe, wq)
K = np.einsum('bse,hed->bhsd', pe, wk)
V = np.einsum('bse,hed->bhsd', pe, wv)

# Scaled dot-product attention
# (b, h, s, s)
scores = np.einsum('bhsd,bhtd->bhst', Q, K) / np.sqrt(head_dim)

# Softmax
scores -= np.max(scores, axis=-1, keepdims=True)
att_weights = np.exp(scores)
att_weights /= np.sum(att_weights, axis=-1, keepdims=True)

# Attention output: (b, h, s, d)
att_out = np.einsum('bhst,bhtd->bhsd', att_weights, V)

# Concatenate heads: (b, s, h*d)
att_concat = att_out.transpose(0, 2, 1, 3).reshape(batch_size, seq_len, -1)

# Output projection: (b, s, embed_dim)
att_projected = np.einsum('bsd,df->bsf', att_concat, wo)


## NORM AND ADD
def norm(x):
    eps = 1e-6
    mean = np.mean(x, axis=-1, keepdims=True)
    std = np.std(x, axis=-1, keepdims=True)
    return (x - mean) / (std + eps)

# Residual + norm
add = pe + att_projected
norm1 = norm(add)

## FFN
batch_size, seq_len, embed_dim = pe.shape
ff_dim = 2 * embed_dim
W1 = np.random.randn(embed_dim, ff_dim)
b1 = np.zeros(ff_dim)
W2 = np.random.randn(ff_dim, embed_dim)
b2 = np.zeros(embed_dim)

# Dense -> ReLU -> Dense

expanded_input = norm1 @ W1 + b1

relu = np.maximum(0, expanded_input) 

compressed_output = relu @ W2 + b2

## NORM AND ADD
add1 = norm1 + compressed_output
norm2 = norm(add1)

## CLASSIFICATION HEAD
sentence_ids = input[b]
sentence_words = [id2w.get(i, "[UNK]") for i in sentence_ids]
true_label = labels[b]

cls_output = norm2[:, 0, :]
num_classes = 2
W_cls = np.random.randn(embed_dim, num_classes)
b_cls = np.zeros(num_classes)

# Logits & Probabilities
logit = cls_output @ W_cls + b_cls
prob = softmax(logit)
pred_label = np.argmax(prob)
loss = cross_entropy(prob, true_label)




In [1]:
import numpy as np

class VocabProcessor:
    def __init__(self):
        self.vocab = {
            '[CLS]': 1,
            '[PAD]': 0
        }
        self.w2id = None
        self.id2w = None
    
    def build_vocab(self, sentences):
        """Build vocabulary from list of sentences"""
        idx = 2  # Start after special tokens
        
        for sentence, _ in sentences:
            for word in sentence.split():
                if word not in self.vocab:
                    self.vocab[word] = idx
                    idx += 1
        
        # Create mappings
        self.w2id = {word: idx for word, idx in self.vocab.items()}
        self.id2w = {idx: word for word, idx in self.w2id.items()}
        
        return self.vocab
    
    def tokenize(self, sentence, max_len=10):
        """Convert sentence to token IDs with padding/truncation"""
        tokens = sentence.lower().split()
        ids = [self.vocab["[CLS]"]] + [self.vocab.get(tok, 0) for tok in tokens]
        
        # Padding
        if len(ids) < max_len:
            ids += [self.vocab["[PAD]"]] * (max_len - len(ids))
        else:
            ids = ids[:max_len]
        
        return ids


class EmbeddingLayer:
    def __init__(self, vocab_size, embed_dim, seed=4):
        np.random.seed(seed)
        self.embed_dim = embed_dim
        self.embedding_matrix = np.random.randn(vocab_size, embed_dim)
    
    def forward(self, input_ids):
        """Convert token IDs to embeddings"""
        return self.embedding_matrix[input_ids]


class PositionalEncoder:
    def __init__(self, max_len, embed_dim):
        self.max_len = max_len
        self.embed_dim = embed_dim
        self.pe = self._create_positional_encoding()
    
    def _create_positional_encoding(self):
        pos = np.arange(self.max_len)[:, np.newaxis]  # (max_len, 1)
        i = np.arange(self.embed_dim)[np.newaxis, :]  # (1, embed_dim)
        
        # Calculate angle rates
        angle_rates = 1 / np.power(10000, (2 * (i // 2)) / self.embed_dim)
        
        # Calculate angle radians
        angle_rads = pos * angle_rates
        
        # Apply sin to even indices and cos to odd indices
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
        
        return angle_rads
    
    def forward(self, embeddings):
        """Add positional encoding to embeddings"""
        batch_size, seq_len, _ = embeddings.shape
        return embeddings + self.pe[:seq_len, :]


class MultiHeadAttention:
    def __init__(self, embed_dim, num_heads):
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        
        # Initialize weights
        self.wq = np.random.randn(num_heads, embed_dim, self.head_dim)
        self.wk = np.random.randn(num_heads, embed_dim, self.head_dim)
        self.wv = np.random.randn(num_heads, embed_dim, self.head_dim)
        self.wo = np.random.randn(num_heads * self.head_dim, embed_dim)
    
    def forward(self, x):
        batch_size, seq_len, _ = x.shape
        
        # Project to Q, K, V
        Q = np.einsum('bse,hed->bhsd', x, self.wq)
        K = np.einsum('bse,hed->bhsd', x, self.wk)
        V = np.einsum('bse,hed->bhsd', x, self.wv)
        
        # Scaled dot-product attention
        scores = np.einsum('bhsd,bhtd->bhst', Q, K) / np.sqrt(self.head_dim)
        
        # Softmax
        scores -= np.max(scores, axis=-1, keepdims=True)
        att_weights = np.exp(scores)
        att_weights /= np.sum(att_weights, axis=-1, keepdims=True)
        
        # Attention output
        att_out = np.einsum('bhst,bhtd->bhsd', att_weights, V)
        
        # Concatenate heads
        att_concat = att_out.transpose(0, 2, 1, 3).reshape(batch_size, seq_len, -1)
        
        # Output projection
        att_projected = np.einsum('bsd,df->bsf', att_concat, self.wo)
        
        return att_projected


class LayerNorm:
    def __init__(self, eps=1e-6):
        self.eps = eps
    
    def forward(self, x):
        mean = np.mean(x, axis=-1, keepdims=True)
        std = np.std(x, axis=-1, keepdims=True)
        return (x - mean) / (std + self.eps)


class FeedForward:
    def __init__(self, embed_dim, ff_dim=None):
        if ff_dim is None:
            ff_dim = 2 * embed_dim
            
        self.W1 = np.random.randn(embed_dim, ff_dim)
        self.b1 = np.zeros(ff_dim)
        self.W2 = np.random.randn(ff_dim, embed_dim)
        self.b2 = np.zeros(embed_dim)
    
    def forward(self, x):
        # First dense layer
        expanded_input = x @ self.W1 + self.b1
        
        # ReLU activation
        relu = np.maximum(0, expanded_input)
        
        # Second dense layer
        return relu @ self.W2 + self.b2


class TransformerBlock:
    def __init__(self, embed_dim, num_heads):
        self.attention = MultiHeadAttention(embed_dim, num_heads)
        self.norm1 = LayerNorm()
        self.feed_forward = FeedForward(embed_dim)
        self.norm2 = LayerNorm()
    
    def forward(self, x):
        # Multi-head attention
        att_output = self.attention.forward(x)
        
        # Add & norm
        add1 = x + att_output
        norm1 = self.norm1.forward(add1)
        
        # Feed-forward network
        ff_output = self.feed_forward.forward(norm1)
        
        # Add & norm
        add2 = norm1 + ff_output
        norm2 = self.norm2.forward(add2)
        
        return norm2


class ClassificationHead:
    def __init__(self, embed_dim, num_classes=2):
        self.W_cls = np.random.randn(embed_dim, num_classes)
        self.b_cls = np.zeros(num_classes)
    
    def forward(self, x):
        # Use the [CLS] token output
        cls_output = x[:, 0, :]
        
        # Calculate logits
        logits = cls_output @ self.W_cls + self.b_cls
        
        return logits
    
    def softmax(self, x):
        x -= np.max(x, axis=-1, keepdims=True)
        exp_x = np.exp(x)
        return exp_x / np.sum(exp_x, axis=-1, keepdims=True)
    
    def predict(self, x):
        logits = self.forward(x)
        probs = self.softmax(logits)
        return np.argmax(probs, axis=-1)
    
    def cross_entropy_loss(self, logits, labels):
        batch_size = logits.shape[0]
        probs = self.softmax(logits)
        
        # Get probability of the true class for each sample
        true_probs = np.array([probs[i, labels[i]] for i in range(batch_size)])
        
        # Calculate cross-entropy loss
        return -np.log(true_probs + 1e-10)


class BERTSentimentClassifier:
    def __init__(self, max_len=10, embed_dim=4, num_heads=4):
        self.max_len = max_len
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.vocab_processor = VocabProcessor()
        self.embedding_layer = None  # Will be initialized after vocabulary is built
        self.positional_encoder = PositionalEncoder(max_len, embed_dim)
        self.transformer = TransformerBlock(embed_dim, num_heads)
        self.classifier = ClassificationHead(embed_dim)
    
    def initialize(self, sentences):
        # Build vocabulary
        self.vocab_processor.build_vocab(sentences)
        
        # Initialize embedding layer with the vocabulary size
        vocab_size = len(self.vocab_processor.vocab) + 1  # +1 for handling unknown tokens
        self.embedding_layer = EmbeddingLayer(vocab_size, self.embed_dim)
    
    def _prepare_batch(self, sentences):
        # Convert sentences to token IDs
        input_ids = np.array([self.vocab_processor.tokenize(sentence) for sentence, _ in sentences])
        labels = np.array([label for _, label in sentences])
        return input_ids, labels
    
    def forward(self, input_ids):
        # Embedding layer
        embeddings = self.embedding_layer.forward(input_ids)
        
        # Add positional encoding
        pos_embeddings = self.positional_encoder.forward(embeddings)
        
        # Transformer block
        transformer_output = self.transformer.forward(pos_embeddings)
        
        # Classification logits
        logits = self.classifier.forward(transformer_output)
        
        return logits
    
    def train_step(self, batch, learning_rate=0.01):
        # This is a simplified training step - in a real implementation, 
        # you would compute gradients and update weights
        input_ids, labels = self._prepare_batch(batch)
        logits = self.forward(input_ids)
        loss = self.classifier.cross_entropy_loss(logits, labels)
        
        # Here you would add backpropagation and parameter updates
        # For demonstration purposes, we're returning the loss
        return np.mean(loss)
    
    def predict(self, sentences):
        # For single sentence or list of sentence strings
        if isinstance(sentences, str):
            sentences = [(sentences, 0)]  # Dummy label
        elif isinstance(sentences[0], str):
            sentences = [(sent, 0) for sent in sentences]  # Dummy labels
            
        input_ids, _ = self._prepare_batch(sentences)
        logits = self.forward(input_ids)
        return self.classifier.predict(logits)
    
    def evaluate(self, test_data):
        input_ids, true_labels = self._prepare_batch(test_data)
        logits = self.forward(input_ids)
        pred_labels = self.classifier.predict(logits)
        
        accuracy = np.mean(pred_labels == true_labels)
        return accuracy


# Example usage
if __name__ == "__main__":
    # Sample data
    sentences = [
        ("aku suka banget sama film ini", 1),
        ("filmnya bener-bener membosankan", 0),
        ("aktingnya luar biasa", 1),
        ("ngantuk banget pas nonton", 0),
        ("ceritanya bikin terharu", 1),
        ("gak masuk akal dan jelek", 0),
    ]
    
    # Create and initialize the model
    model = BERTSentimentClassifier(max_len=10, embed_dim=4, num_heads=4)
    model.initialize(sentences)
    
    # Training loop (simplified)
    num_epochs = 10
    for epoch in range(num_epochs):
        loss = model.train_step(sentences)
        print(f"Epoch {epoch+1}, Loss: {loss:.4f}")
    
    # Evaluate
    accuracy = model.evaluate(sentences)
    print(f"Accuracy: {accuracy:.4f}")
    
    # Predict new sentences
    new_sentences = [
        ("film ini sangat bagus", 0),  # Should predict 1 (positive)
        ("saya kecewa dengan ceritanya", 0)  # Should predict 0 (negative)
    ]
    
    predictions = model.predict(new_sentences)
    for i, (sentence, _) in enumerate(new_sentences):
        sentiment = "Positive" if predictions[i] == 1 else "Negative"
        print(f"Sentence: '{sentence}', Predicted: {sentiment}")

Epoch 1, Loss: 4.0190
Epoch 2, Loss: 4.0190
Epoch 3, Loss: 4.0190
Epoch 4, Loss: 4.0190
Epoch 5, Loss: 4.0190
Epoch 6, Loss: 4.0190
Epoch 7, Loss: 4.0190
Epoch 8, Loss: 4.0190
Epoch 9, Loss: 4.0190
Epoch 10, Loss: 4.0190


IndexError: too many indices for array: array is 2-dimensional, but 3 were indexed

In [4]:
import numpy as np

class BERTSentimentClassifier:
    def __init__(self, max_len=10, embed_dim=4, num_heads=4):
        self.max_len = max_len
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.vocab_processor = VocabProcessor()
        self.embedding_layer = None  # Will be initialized after vocabulary is built
        self.positional_encoder = PositionalEncoder(max_len, embed_dim)
        self.transformer = TransformerBlock(embed_dim, num_heads)
        self.classifier = ClassificationHead(embed_dim)
    
    def initialize(self, sentences):
        # Build vocabulary
        self.vocab_processor.build_vocab(sentences)
        
        # Initialize embedding layer with the vocabulary size
        vocab_size = len(self.vocab_processor.vocab) + 1  # +1 for handling unknown tokens
        self.embedding_layer = EmbeddingLayer(vocab_size, self.embed_dim)
    
    def _prepare_batch(self, sentences):
        # Convert sentences to token IDs
        input_ids = np.array([self.vocab_processor.tokenize(sentence) for sentence, _ in sentences])
        labels = np.array([label for _, label in sentences])
        return input_ids, labels
    
    def forward(self, input_ids, cache=True):
        # Embedding layer
        embeddings = self.embedding_layer.forward(input_ids, cache)
        
        # Add positional encoding
        batch_size, seq_len, _ = embeddings.shape
        pos_embeddings = self.positional_encoder.forward(embeddings)
        
        # Transformer block
        transformer_output = self.transformer.forward(pos_embeddings, cache)
        
        # Classification logits
        logits = self.classifier.forward(transformer_output, cache)
        
        return logits
    
    def backward(self, logits, labels):
        # Loss and gradients through classifier
        loss = np.mean(self.classifier.cross_entropy_loss(logits, labels))
        
        # Backpropagate through classifier
        dW_cls, db_cls, dcls_output = self.classifier.backward()
        
        # Create gradient for transformer output
        # Only the [CLS] token is used for classification
        dtransformer_output = np.zeros((dcls_output.shape[0], self.max_len, self.embed_dim))
        dtransformer_output[:, 0, :] = dcls_output  # Only pass gradient to [CLS] token
        
        # Backpropagate through transformer
        transformer_grads = self.transformer.backward(dtransformer_output)
        
        # Backpropagate through embedding layer
        dembedding_matrix = self.embedding_layer.backward(transformer_grads['dx'])
        
        # Collect all gradients to return
        gradients = {
            'dW_cls': dW_cls,
            'db_cls': db_cls,
            'dembedding_matrix': dembedding_matrix,
            'transformer': transformer_grads
        }
        
        return loss, gradients
    
    def update_parameters(self, gradients, learning_rate):
        # Update classifier weights
        self.classifier.W_cls -= learning_rate * gradients['dW_cls']
        self.classifier.b_cls -= learning_rate * gradients['db_cls']
        
        # Update embedding matrix
        self.embedding_layer.embedding_matrix -= learning_rate * gradients['dembedding_matrix']
        
        # Update transformer parameters
        # Update attention weights
        self.transformer.attention.wq -= learning_rate * gradients['transformer']['wq']
        self.transformer.attention.wk -= learning_rate * gradients['transformer']['wk']
        self.transformer.attention.wv -= learning_rate * gradients['transformer']['wv']
        self.transformer.attention.wo -= learning_rate * gradients['transformer']['wo']
        
        # Update feed-forward weights
        self.transformer.feed_forward.W1 -= learning_rate * gradients['transformer']['W1']
        self.transformer.feed_forward.b1 -= learning_rate * gradients['transformer']['b1']
        self.transformer.feed_forward.W2 -= learning_rate * gradients['transformer']['W2']
        self.transformer.feed_forward.b2 -= learning_rate * gradients['transformer']['b2']
    
    def train_step(self, batch, learning_rate=0.01):
        # Prepare batch data
        input_ids, labels = self._prepare_batch(batch)
        
        # Forward pass
        logits = self.forward(input_ids)
        
        # Backward pass to compute gradients
        loss, gradients = self.backward(logits, labels)
        
        # Update parameters
        self.update_parameters(gradients, learning_rate)
        
        return loss
    
    def train(self, data, epochs=10, learning_rate=0.01, batch_size=None):
        """Train the model for multiple epochs"""
        # If batch_size is None, use the entire dataset as one batch
        if batch_size is None:
            batch_size = len(data)
        
        # Training history
        history = {'loss': []}
        
        for epoch in range(epochs):
            # Shuffle data
            np.random.shuffle(data)
            
            # Initialize epoch loss
            epoch_loss = 0
            
            # Process batches
            for i in range(0, len(data), batch_size):
                batch = data[i:i+batch_size]
                batch_loss = self.train_step(batch, learning_rate)
                epoch_loss += batch_loss
            
            # Average loss for the epoch
            avg_loss = epoch_loss / (len(data) / batch_size)
            history['loss'].append(avg_loss)
            
            print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")
        
        return history
    
    def predict(self, sentences):
        # For single sentence or list of sentence strings
        if isinstance(sentences, str):
            sentences = [(sentences, 0)]  # Dummy label
        elif isinstance(sentences[0], str):
            sentences = [(sent, 0) for sent in sentences]  # Dummy labels
            
        input_ids, _ = self._prepare_batch(sentences)
        logits = self.forward(input_ids, cache=False)  # No need to cache during inference
        return self.classifier.predict(logits)
    
    def evaluate(self, test_data):
        input_ids, true_labels = self._prepare_batch(test_data)
        logits = self.forward(input_ids, cache=False)
        pred_labels = self.classifier.predict(logits)
        
        accuracy = np.mean(pred_labels == true_labels)
        return accuracy

class VocabProcessor:
    def __init__(self):
        self.vocab = {
            '[CLS]': 1,
            '[PAD]': 0
        }
        self.w2id = None
        self.id2w = None
    
    def build_vocab(self, sentences):
        """Build vocabulary from list of sentences"""
        idx = 2  # Start after special tokens
        
        for sentence, _ in sentences:
            for word in sentence.split():
                if word not in self.vocab:
                    self.vocab[word] = idx
                    idx += 1
        
        # Create mappings
        self.w2id = {word: idx for word, idx in self.vocab.items()}
        self.id2w = {idx: word for word, idx in self.w2id.items()}
        
        return self.vocab
    
    def tokenize(self, sentence, max_len=10):
        """Convert sentence to token IDs with padding/truncation"""
        tokens = sentence.lower().split()
        ids = [self.vocab["[CLS]"]] + [self.vocab.get(tok, 0) for tok in tokens]
        
        # Padding
        if len(ids) < max_len:
            ids += [self.vocab["[PAD]"]] * (max_len - len(ids))
        else:
            ids = ids[:max_len]
        
        return ids


class EmbeddingLayer:
    def __init__(self, vocab_size, embed_dim, seed=4):
        np.random.seed(seed)
        self.embed_dim = embed_dim
        self.vocab_size = vocab_size
        self.embedding_matrix = np.random.randn(vocab_size, embed_dim) * 0.01
        self.cache = {}
    
    def forward(self, input_ids, cache=True):
        """Convert token IDs to embeddings"""
        embeddings = self.embedding_matrix[input_ids]
        
        if cache:
            self.cache['input_ids'] = input_ids
        
        return embeddings
    
    def backward(self, dembeddings):
        """Backpropagate gradient to embedding matrix"""
        input_ids = self.cache['input_ids']
        batch_size, seq_len = input_ids.shape
        
        # Initialize gradients for embedding matrix
        dembedding_matrix = np.zeros_like(self.embedding_matrix)
        
        # For each position in the batch, accumulate gradients for the corresponding word vector
        for b in range(batch_size):
            for s in range(seq_len):
                word_idx = input_ids[b, s]
                dembedding_matrix[word_idx] += dembeddings[b, s]
        
        return dembedding_matrix


class PositionalEncoder:
    def __init__(self, max_len, embed_dim):
        self.max_len = max_len
        self.embed_dim = embed_dim
        self.pe = self._create_positional_encoding()
    
    def _create_positional_encoding(self):
        pos = np.arange(self.max_len)[:, np.newaxis]  # (max_len, 1)
        i = np.arange(self.embed_dim)[np.newaxis, :]  # (1, embed_dim)
        
        # Calculate angle rates
        angle_rates = 1 / np.power(10000, (2 * (i // 2)) / self.embed_dim)
        
        # Calculate angle radians
        angle_rads = pos * angle_rates
        
        # Apply sin to even indices and cos to odd indices
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
        
        return angle_rads
    
    def forward(self, embeddings):
        """Add positional encoding to embeddings"""
        batch_size, seq_len, _ = embeddings.shape
        return embeddings + self.pe[:seq_len, :]
    
    # No need for backward method as positional encodings are fixed


class MultiHeadAttention:
    def __init__(self, embed_dim, num_heads):
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        
        # Initialize weights with small random values
        self.wq = np.random.randn(num_heads, embed_dim, self.head_dim) * 0.01
        self.wk = np.random.randn(num_heads, embed_dim, self.head_dim) * 0.01
        self.wv = np.random.randn(num_heads, embed_dim, self.head_dim) * 0.01
        self.wo = np.random.randn(num_heads * self.head_dim, embed_dim) * 0.01
        self.cache = {}
    
    def forward(self, x, cache=True):
        batch_size, seq_len, _ = x.shape
        
        # Project to Q, K, V
        Q = np.einsum('bse,hed->bhsd', x, self.wq)
        K = np.einsum('bse,hed->bhsd', x, self.wk)
        V = np.einsum('bse,hed->bhsd', x, self.wv)
        
        # Scaled dot-product attention
        scores = np.einsum('bhsd,bhtd->bhst', Q, K) / np.sqrt(self.head_dim)
        
        # Softmax
        scores_max = np.max(scores, axis=-1, keepdims=True)
        exp_scores = np.exp(scores - scores_max)
        att_weights = exp_scores / np.sum(exp_scores, axis=-1, keepdims=True)
        
        # Attention output
        att_out = np.einsum('bhst,bhtd->bhsd', att_weights, V)
        
        # Concatenate heads
        att_concat = att_out.transpose(0, 2, 1, 3).reshape(batch_size, seq_len, -1)
        
        # Output projection
        att_projected = np.einsum('bsd,df->bsf', att_concat, self.wo)
        
        if cache:
            self.cache['x'] = x
            self.cache['Q'] = Q
            self.cache['K'] = K
            self.cache['V'] = V
            self.cache['scores'] = scores
            self.cache['att_weights'] = att_weights
            self.cache['att_out'] = att_out
            self.cache['att_concat'] = att_concat
        
        return att_projected
    
    def backward(self, datt_projected):
        # Retrieve from cache
        x = self.cache['x']
        Q = self.cache['Q']
        K = self.cache['K']
        V = self.cache['V']
        att_weights = self.cache['att_weights']
        att_out = self.cache['att_out']
        att_concat = self.cache['att_concat']
        
        batch_size, seq_len, _ = x.shape
        
        # Gradient w.r.t output projection (wo)
        dwo = np.einsum('bsd,bsf->df', att_concat, datt_projected)
        
        # Gradient w.r.t concatenated attention
        datt_concat = np.einsum('bsf,df->bsd', datt_projected, self.wo)
        
        # Reshape back to multi-head format
        datt_out = datt_concat.reshape(batch_size, seq_len, self.num_heads, self.head_dim).transpose(0, 2, 1, 3)
        
        # Gradient w.r.t V
        dwv = np.einsum('bhst,bhsd->hed', att_weights, datt_out)
        
        # Gradient w.r.t attention weights
        datt_weights = np.einsum('bhsd,bhtd->bhst', datt_out, V)
        
        # Gradient through softmax
        dscores = att_weights * (datt_weights - np.sum(att_weights * datt_weights, axis=-1, keepdims=True))
        
        # Gradient w.r.t Q and K
        dwq = np.einsum('bhst,bse->hed', dscores / np.sqrt(self.head_dim), x)
        dwk = np.einsum('bhst,bte->hed', dscores.transpose(0, 1, 3, 2) / np.sqrt(self.head_dim), x)
        
        # Gradient w.r.t input x (for Q)
        dx_q = np.einsum('bhsd,hed->bse', dscores / np.sqrt(self.head_dim), self.wq)
        
        # Gradient w.r.t input x (for K)
        dx_k = np.einsum('bhts,hed->bte', dscores / np.sqrt(self.head_dim), self.wk)
        
        # Gradient w.r.t input x (for V)
        dx_v = np.einsum('bhst,hed->bte', att_weights, dwv)
        
        # Combine gradients from Q, K, and V paths
        dx = dx_q + dx_k + dx_v
        
        return dwq, dwk, dwv, dwo, dx


class LayerNorm:
    def __init__(self, eps=1e-6):
        self.eps = eps
        self.cache = {}
    
    def forward(self, x, cache=True):
        mean = np.mean(x, axis=-1, keepdims=True)
        var = np.var(x, axis=-1, keepdims=True)
        std = np.sqrt(var + self.eps)
        normalized = (x - mean) / std
        
        if cache:
            self.cache['x'] = x
            self.cache['mean'] = mean
            self.cache['var'] = var
            self.cache['std'] = std
            self.cache['normalized'] = normalized
        
        return normalized
    
    def backward(self, dout):
        # Retrieve from cache
        x = self.cache['x']
        mean = self.cache['mean']
        var = self.cache['var']
        std = self.cache['std']
        
        N = x.shape[-1]
        
        # Gradient calculations for layer normalization
        # Based on derivation of layer norm backward pass
        dx_normalized = dout
        
        # Gradient w.r.t variance
        dvar = -0.5 * np.sum(dx_normalized * (x - mean) * np.power(var + self.eps, -1.5), axis=-1, keepdims=True)
        
        # Gradient w.r.t mean
        dmean = -np.sum(dx_normalized / std, axis=-1, keepdims=True) + dvar * np.mean(-2.0 * (x - mean), axis=-1, keepdims=True)
        
        # Gradient w.r.t input
        dx = dx_normalized / std + dvar * 2.0 * (x - mean) / N + dmean / N
        
        return dx


class FeedForward:
    def __init__(self, embed_dim, ff_dim=None):
        if ff_dim is None:
            ff_dim = 2 * embed_dim
            
        self.W1 = np.random.randn(embed_dim, ff_dim) * 0.01
        self.b1 = np.zeros(ff_dim)
        self.W2 = np.random.randn(ff_dim, embed_dim) * 0.01
        self.b2 = np.zeros(embed_dim)
        self.cache = {}
    
    def forward(self, x, cache=True):
        # First dense layer
        expanded_input = x @ self.W1 + self.b1
        
        # ReLU activation
        relu = np.maximum(0, expanded_input)
        
        # Second dense layer
        output = relu @ self.W2 + self.b2
        
        if cache:
            self.cache['input'] = x
            self.cache['expanded_input'] = expanded_input
            self.cache['relu'] = relu
        
        return output
    
    def backward(self, doutput):
        # Retrieve from cache
        x = self.cache['input']
        expanded_input = self.cache['expanded_input']
        relu = self.cache['relu']
        
        # Gradient w.r.t W2 and b2
        dW2 = relu.T @ doutput
        db2 = np.sum(doutput, axis=0)
        
        # Gradient w.r.t relu
        drelu = doutput @ self.W2.T
        
        # Gradient through ReLU (ReLU gradient is 1 where input > 0, 0 otherwise)
        dexpanded_input = drelu * (expanded_input > 0)
        
        # Gradient w.r.t W1 and b1
        dW1 = x.T @ dexpanded_input
        db1 = np.sum(dexpanded_input, axis=0)
        
        # Gradient w.r.t input
        dx = dexpanded_input @ self.W1.T
        
        return dW1, db1, dW2, db2, dx


class TransformerBlock:
    def __init__(self, embed_dim, num_heads):
        self.attention = MultiHeadAttention(embed_dim, num_heads)
        self.norm1 = LayerNorm()
        self.feed_forward = FeedForward(embed_dim)
        self.norm2 = LayerNorm()
        self.cache = {}
    
    def forward(self, x, cache=True):
        # Multi-head attention
        att_output = self.attention.forward(x, cache)
        
        # Add & norm
        add1 = x + att_output
        norm1 = self.norm1.forward(add1, cache)
        
        # Feed-forward network
        ff_output = self.feed_forward.forward(norm1, cache)
        
        # Add & norm
        add2 = norm1 + ff_output
        norm2 = self.norm2.forward(add2, cache)
        
        if cache:
            self.cache['x'] = x
            self.cache['att_output'] = att_output
            self.cache['add1'] = add1
            self.cache['norm1'] = norm1
            self.cache['ff_output'] = ff_output
            self.cache['add2'] = add2
        
        return norm2
    
    def backward(self, dout):
        # Retrieve from cache
        x = self.cache['x']
        att_output = self.cache['att_output']
        add1 = self.cache['add1']
        norm1 = self.cache['norm1']
        ff_output = self.cache['ff_output']
        
        # Backward through norm2
        dadd2 = self.norm2.backward(dout)
        
        # Gradient split at residual connection
        dnorm1 = dadd2
        dff_output = dadd2
        
        # Backward through feed-forward
        dW1, db1, dW2, db2, dnorm1_ff = self.feed_forward.backward(dff_output)
        
        # Add gradients from residual connection and FF path
        dnorm1 += dnorm1_ff
        
        # Backward through norm1
        dadd1 = self.norm1.backward(dnorm1)
        
        # Gradient split at residual connection
        dx = dadd1
        datt_output = dadd1
        
        # Backward through attention
        dwq, dwk, dwv, dwo, dx_att = self.attention.backward(datt_output)
        
        # Add gradients from residual connection and attention path
        dx += dx_att
        
        return {
            'wq': dwq, 'wk': dwk, 'wv': dwv, 'wo': dwo,
            'W1': dW1, 'b1': db1, 'W2': dW2, 'b2': db2,
            'dx': dx
        }


class ClassificationHead:
    def __init__(self, embed_dim, num_classes=2):
        self.W_cls = np.random.randn(embed_dim, num_classes) * 0.01  # Initialize with small values
        self.b_cls = np.zeros(num_classes)
        self.cache = {}  # For storing intermediates for backpropagation
    
    def forward(self, x, cache=True):
        # Use the [CLS] token output
        cls_output = x[:, 0, :]
        
        # Calculate logits
        logits = cls_output @ self.W_cls + self.b_cls
        
        if cache:
            self.cache['cls_output'] = cls_output
            self.cache['logits'] = logits
        
        return logits
    
    def softmax(self, x):
        x_shifted = x - np.max(x, axis=-1, keepdims=True)
        exp_x = np.exp(x_shifted)
        probs = exp_x / np.sum(exp_x, axis=-1, keepdims=True)
        return probs
    
    def predict(self, x):
        logits = self.forward(x, cache=False)
        probs = self.softmax(logits)
        return np.argmax(probs, axis=-1)
    
    def cross_entropy_loss(self, logits, labels):
        batch_size = logits.shape[0]
        probs = self.softmax(logits)
        
        # Store for backpropagation
        self.cache['probs'] = probs
        self.cache['labels'] = labels
        
        # Get probability of the true class for each sample
        true_probs = np.array([probs[i, labels[i]] for i in range(batch_size)])
        
        # Calculate cross-entropy loss
        return -np.log(true_probs + 1e-10)
    
    def backward(self):
        # Retrieve from cache
        probs = self.cache['probs']
        labels = self.cache['labels']
        cls_output = self.cache['cls_output']
        
        batch_size = probs.shape[0]
        
        # Gradient of cross entropy w.r.t. softmax output
        dprobs = probs.copy()
        # Subtract 1 from the correct class probability
        for i in range(batch_size):
            dprobs[i, labels[i]] -= 1
        dprobs /= batch_size  # Average over batch
        
        # Gradient w.r.t. weights and biases
        dW_cls = cls_output.T @ dprobs
        db_cls = np.sum(dprobs, axis=0)
        
        # Gradient w.r.t. input (cls_output)
        dcls_output = dprobs @ self.W_cls.T
        
        return dW_cls, db_cls, dcls_output


class BERTSentimentClassifier:
    def __init__(self, max_len=10, embed_dim=4, num_heads=4):
        self.max_len = max_len
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.vocab_processor = VocabProcessor()
        self.embedding_layer = None  # Will be initialized after vocabulary is built
        self.positional_encoder = PositionalEncoder(max_len, embed_dim)
        self.transformer = TransformerBlock(embed_dim, num_heads)
        self.classifier = ClassificationHead(embed_dim)
    
    def initialize(self, sentences):
        # Build vocabulary
        self.vocab_processor.build_vocab(sentences)
        
        # Initialize embedding layer with the vocabulary size
        vocab_size = len(self.vocab_processor.vocab) + 1  # +1 for handling unknown tokens
        self.embedding_layer = EmbeddingLayer(vocab_size, self.embed_dim)
    
    def _prepare_batch(self, sentences):
        # Convert sentences to token IDs
        input_ids = np.array([self.vocab_processor.tokenize(sentence) for sentence, _ in sentences])
        labels = np.array([label for _, label in sentences])
        return input_ids, labels
    
    def forward(self, input_ids):
        # Embedding layer
        embeddings = self.embedding_layer.forward(input_ids)
        
        # Add positional encoding
        pos_embeddings = self.positional_encoder.forward(embeddings)
        
        # Transformer block
        transformer_output = self.transformer.forward(pos_embeddings)
        
        # Classification logits
        logits = self.classifier.forward(transformer_output)
        
        return logits
    
    def train_step(self, batch, learning_rate=0.01):
        # This is a simplified training step - in a real implementation, 
        # you would compute gradients and update weights
        input_ids, labels = self._prepare_batch(batch)
        logits = self.forward(input_ids)
        loss = self.classifier.cross_entropy_loss(logits, labels)
        
        # Here you would add backpropagation and parameter updates
        # For demonstration purposes, we're returning the loss
        return np.mean(loss)
    
    def predict(self, sentences):
        # For single sentence or list of sentence strings
        if isinstance(sentences, str):
            sentences = [(sentences, 0)]  # Dummy label
        elif isinstance(sentences[0], str):
            sentences = [(sent, 0) for sent in sentences]  # Dummy labels
            
        input_ids, _ = self._prepare_batch(sentences)
        logits = self.forward(input_ids)
        return self.classifier.predict(logits)
    
    def evaluate(self, test_data):
        input_ids, true_labels = self._prepare_batch(test_data)
        logits = self.forward(input_ids)
        pred_labels = self.classifier.predict(logits)
        
        accuracy = np.mean(pred_labels == true_labels)
        return accuracy


# Example usage
if __name__ == "__main__":
    # Sample data
    sentences = [
        ("aku suka banget sama film ini", 1),
        ("filmnya bener-bener membosankan", 0),
        ("aktingnya luar biasa", 1),
        ("ngantuk banget pas nonton", 0),
        ("ceritanya bikin terharu", 1),
        ("gak masuk akal dan jelek", 0),
    ]
    
    # Create and initialize the model
    model = BERTSentimentClassifier(max_len=10, embed_dim=4, num_heads=4)
    model.initialize(sentences)
    
    # Training loop with proper backpropagation and parameter updates
    print("Starting training...")
    history = model.train_step(sentences, epochs=50, learning_rate=0.05, batch_size=3)
    
    # Evaluate
    accuracy = model.evaluate(sentences)
    print(f"Training accuracy: {accuracy:.4f}")
    
    # Predict new sentences
    new_sentences = [
        ("film ini sangat bagus", 0),  # Should predict 1 (positive)
        ("saya kecewa dengan ceritanya", 0)  # Should predict 0 (negative)
    ]
    
    predictions = model.predict(new_sentences)
    for i, (sentence, _) in enumerate(new_sentences):
        sentiment = "Positive" if predictions[i] == 1 else "Negative"
        print(f"Sentence: '{sentence}', Predicted: {sentiment}")
    
    # Example of how to trace through a single forward pass to understand the model
    def trace_example(model, sentence="aku suka banget sama film ini"):
        print(f"\nTracing forward pass for: '{sentence}'")
        
        # Tokenize
        tokens = sentence.lower().split()
        print(f"Tokens: {tokens}")
        token_ids = model.vocab_processor.tokenize(sentence)
        print(f"Token IDs: {token_ids}")
        
        # Word representations
        token_words = [model.vocab_processor.id2w.get(id, "[UNK]") for id in token_ids]
        print(f"Token representations: {token_words}")
        
        # Make prediction
        pred = model.predict(sentence)
        sentiment = "Positive" if pred[0] == 1 else "Negative"
        print(f"Predicted sentiment: {sentiment}")
    
    # Trace an example
    trace_example(model)

Starting training...


TypeError: BERTSentimentClassifier.train_step() got an unexpected keyword argument 'epochs'

In [17]:
import numpy as np

class Vocab:
    def __init__(self):
        self.vocab = {
            '[PAD]' : 0,
            '[CLS]' : 1,
        }
        self.w2i = {}
        self.i2w = {}
    
    def build_vocab(self, sentences):
        start_idx = len(self.vocab)
        
        for sentence, _ in sentences:
            for word in sentence.lower().split():
                if word not in self.vocab:
                    self.vocab[word] = start_idx
                    start_idx += 1
        
        # mapping
        self.w2i = {w: i  for i, w in enumerate(self.vocab)}
        self.i2w = {i: w for w, i in self.w2i.items()}

        return self.vocab

    def tokenize(self, sentence, max_len=10):
        tokens = sentence.lower().split()
        ids = [self.vocab["[CLS]"]] + [self.vocab.get(tok, 0) for tok in tokens]  # Fixed: Should use CLS not PAD for first token

        # padding
        if len(ids) < max_len:
            ids += [self.vocab["[PAD]"]] * (max_len - len(ids))
        else:
            ids = ids[:max_len]
        
        return ids

class EmbeddingLayer:
    def __init__(self, vocab_size, d_model):
        np.random.seed(42)
        self.d_model = d_model
        self.embedding_matrix = np.random.rand(vocab_size, d_model)
        self.cache = {}
    
    def forward(self, input_ids, cache=True):
        # Fixed: Handle input_ids properly as a batch of sequences
        batch_size = input_ids.shape[0]
        seq_len = input_ids.shape[1]
        embeddings = np.zeros((batch_size, seq_len, self.d_model))
        
        for i in range(batch_size):
            for j in range(seq_len):
                word_id = input_ids[i, j]
                embeddings[i, j] = self.embedding_matrix[word_id]
    
        if cache:
            self.cache['input_ids'] = input_ids
        
        return embeddings
    
    def backward(self, dembeddings):
        input_ids = self.cache['input_ids']
        batch_size, seq_len = input_ids.shape

        # initialize gradients
        dembedding_matrix = np.zeros_like(self.embedding_matrix)

        # for each position in batch, accumulate gradients for corresponding words vectors
        for i in range(batch_size):
            for j in range(seq_len):
                word_id = input_ids[i, j]
                dembedding_matrix[word_id] += dembeddings[i, j]
        
        return dembedding_matrix

class PositionalEncoder:
    def __init__(self, max_len, d_model):
        self.max_len = max_len
        self.d_model = d_model
        self.pe = self._create_postional_encoding()
    
    def _create_postional_encoding(self):
        pos = np.arange(self.max_len)[:, np.newaxis]
        i = np.arange(self.d_model)[np.newaxis, :]

        # calculate angle rates
        angle_rates = 1 / np.power(10000, (2*(i//2) / self.d_model))

        # calculate angle radians
        angle_rads = pos * angle_rates

        # apply sin to even indices and cos to odd idices
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])

        return angle_rads
    
    def forward(self, embedding):
        batch_size, seq_len, _ = embedding.shape
        # Fixed: broadcast PE correctly to all items in batch
        return embedding + self.pe[:seq_len, :][np.newaxis, :, :]

class MultiHeadAttention:
    def __init__(self, d_model, num_heads):
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
    
        # weight initialization with proper scaling
        self.wq = np.random.randn(d_model, num_heads * self.head_dim) * 0.01  # Fixed: shape should be (d_model, d_model)
        self.wk = np.random.randn(d_model, num_heads * self.head_dim) * 0.01
        self.wv = np.random.randn(d_model, num_heads * self.head_dim) * 0.01
        self.wo = np.random.randn(d_model, d_model) * 0.01

        self.cache = {}

    def forward(self, x, cache=True):
        batch_size, seq_len, _ = x.shape
        
        # Fixed: project to q, k, v with correct dimensions
        q = x @ self.wq  # (batch_size, seq_len, d_model)
        k = x @ self.wk
        v = x @ self.wv
        
        # Reshape for multi-head attention
        q = q.reshape(batch_size, seq_len, self.num_heads, self.head_dim).transpose(0, 2, 1, 3)  # (batch_size, num_heads, seq_len, head_dim)
        k = k.reshape(batch_size, seq_len, self.num_heads, self.head_dim).transpose(0, 2, 1, 3)
        v = v.reshape(batch_size, seq_len, self.num_heads, self.head_dim).transpose(0, 2, 1, 3)

        # scale dot product attention
        scores = np.matmul(q, k.transpose(0, 1, 3, 2)) / np.sqrt(self.head_dim)  # (batch_size, num_heads, seq_len, seq_len)

        # softmax
        scores -= np.max(scores, axis=-1, keepdims=True)
        attn_weight = np.exp(scores)
        attn_weight /= np.sum(attn_weight, axis=-1, keepdims=True) + 1e-9  # Added epsilon for numerical stability

        # attention output
        attn_output = np.matmul(attn_weight, v)  # (batch_size, num_heads, seq_len, head_dim)
        
        # concatenate heads and transpose back to original shape
        attn_concat = attn_output.transpose(0, 2, 1, 3).reshape(batch_size, seq_len, -1)  # (batch_size, seq_len, d_model)

        # output projection
        attn_projected = attn_concat @ self.wo  # (batch_size, seq_len, d_model)

        if cache:
            self.cache['x'] = x
            self.cache['q'] = q
            self.cache['k'] = k
            self.cache['v'] = v
            self.cache['scores'] = scores
            self.cache['attn_weight'] = attn_weight
            self.cache['attn_output'] = attn_output
            self.cache['attn_concat'] = attn_concat

        return attn_projected

    def backward(self, dattn_projected):
        # get from cache
        x = self.cache['x']
        q = self.cache['q']
        k = self.cache['k']
        v = self.cache['v']
        attn_weight = self.cache['attn_weight']
        attn_output = self.cache['attn_output']
        attn_concat = self.cache['attn_concat']

        batch_size, seq_len, _ = x.shape
        
        # Fixed: gradient wrt output projection wo
        dwo = attn_concat.transpose(0, 2, 1) @ dattn_projected  # Correct: (d_model, d_model)
        dwo = np.sum(dwo, axis=0)  # Sum over batch dimension
        
        # gradient wrt concatenated attention heads concat
        dattn_concat = dattn_projected @ self.wo.T  # (batch_size, seq_len, d_model)
        
        # reshape back to multihead-format
        dattn_output = dattn_concat.reshape(batch_size, seq_len, self.num_heads, self.head_dim).transpose(0, 2, 1, 3)
        
        # gradient wrt v
        dattn_weight = np.matmul(dattn_output, v.transpose(0, 1, 3, 2))
        dv = np.matmul(attn_weight.transpose(0, 1, 3, 2), dattn_output)
        
        # gradient through softmax
        dscores = attn_weight * (dattn_weight - np.sum(attn_weight * dattn_weight, axis=-1, keepdims=True))
        
        # gradient wrt q and k
        dk_transpose = np.matmul(q.transpose(0, 1, 3, 2), dscores) / np.sqrt(self.head_dim)
        dk = dk_transpose.transpose(0, 1, 3, 2)
        dq = np.matmul(dscores, k) / np.sqrt(self.head_dim)
        
        # Reshape gradients to original dimensions
        dq = dq.transpose(0, 2, 1, 3).reshape(batch_size, seq_len, -1)
        dk = dk.transpose(0, 2, 1, 3).reshape(batch_size, seq_len, -1)
        dv = dv.transpose(0, 2, 1, 3).reshape(batch_size, seq_len, -1)
        
        # gradient wrt input x
        dx = (dq @ self.wq.T) + (dk @ self.wk.T) + (dv @ self.wv.T)
        
        # gradient wrt weights
        dwq = x.transpose(0, 2, 1) @ dq
        dwq = np.sum(dwq, axis=0)
        dwk = x.transpose(0, 2, 1) @ dk
        dwk = np.sum(dwk, axis=0)
        dwv = x.transpose(0, 2, 1) @ dv
        dwv = np.sum(dwv, axis=0)

        return dwq, dwk, dwv, dwo, dx

class LayerNorm:
    def __init__(self, d_model, eps=1e-6):
        self.eps = eps
        self.gamma = np.ones(d_model)  # Added scale parameter
        self.beta = np.zeros(d_model)  # Added shift parameter
        self.cache = {}
    
    def forward(self, x, cache=True):
        mean = np.mean(x, axis=-1, keepdims=True)
        var = np.var(x, axis=-1, keepdims=True)
        std = np.sqrt(var + self.eps)
        normalized = (x - mean) / std
        out = self.gamma * normalized + self.beta  # Apply scale and shift

        if cache:
            self.cache['x'] = x
            self.cache['mean'] = mean
            self.cache['var'] = var
            self.cache['std'] = std
            self.cache['normalized'] = normalized
        
        return out
    
    def backward(self, dout):
        # get from cache
        x = self.cache['x']
        mean = self.cache['mean']
        std = self.cache['std']
        normalized = self.cache['normalized']
        
        batch_size, seq_len, d_model = x.shape
        
        # gradient wrt gamma and beta
        dgamma = np.sum(dout * normalized, axis=(0, 1))
        dbeta = np.sum(dout, axis=(0, 1))
        
        # gradient wrt normalized x
        dx_normalized = dout * self.gamma
        
        # gradient wrt var
        dvar = np.sum(dx_normalized * (x - mean) * -0.5 * std**(-3), axis=-1, keepdims=True)
        
        # gradient wrt mean
        dxmean1 = dx_normalized * -1.0/std
        dxmean2 = dvar * -2.0 * np.mean(x - mean, axis=-1, keepdims=True)
        
        # gradient wrt x
        dx = dx_normalized / std + dvar * 2 * (x - mean) / d_model + (dxmean1 + dxmean2) / d_model
        
        return dx, dgamma, dbeta

class FeedForward:
    def __init__(self, d_model, d_ff=None):
        if d_ff is None:
            d_ff = 4 * d_model
    
        # weights with proper initialization
        self.w1 = np.random.randn(d_model, d_ff) * 0.01
        self.b1 = np.zeros(d_ff)
        self.w2 = np.random.randn(d_ff, d_model) * 0.01
        self.b2 = np.zeros(d_model)
        self.cache = {}
    
    def forward(self, x, cache=True):
        batch_size, seq_len, _ = x.shape
        
        # linear 1 - Expansion
        f1 = x @ self.w1 + self.b1
        # relu
        r = np.maximum(0, f1)
        # Linear 2 - Compression
        f2 = r @ self.w2 + self.b2

        if cache:
            self.cache['x'] = x
            self.cache['f1'] = f1
            self.cache['r'] = r
        
        return f2
    
    def backward(self, doutput):
        # get from cache
        x = self.cache['x']
        f1 = self.cache['f1']
        r = self.cache['r']
        
        batch_size, seq_len, _ = x.shape

        # gradient wrt w2 and b2
        dr = doutput @ self.w2.T
        dw2 = r.reshape(-1, r.shape[-1]).T @ doutput.reshape(-1, doutput.shape[-1])
        db2 = np.sum(doutput, axis=(0, 1))

        # gradient through relu
        df1 = dr * (f1 > 0)

        # gradient wrt w1 and b1
        dw1 = x.reshape(-1, x.shape[-1]).T @ df1.reshape(-1, df1.shape[-1])
        db1 = np.sum(df1, axis=(0, 1))

        # gradient wrt x
        dx = df1 @ self.w1.T
        
        return dw1, db1, dw2, db2, dx

class BertModel:
    def __init__(self, d_model, num_heads):
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.norm1 = LayerNorm(d_model)
        self.ff = FeedForward(d_model, 4*d_model)
        self.norm2 = LayerNorm(d_model)
        self.cache = {}
    
    def forward(self, x, cache=True):
        # multi-head attention
        attn_output = self.attention.forward(x)

        # add & norm (residual connection)
        add1 = x + attn_output
        norm1 = self.norm1.forward(add1)

        # feed-forward
        ff_output = self.ff.forward(norm1)  # Fixed: should use norm1 not x

        # add & norm (residual connection)
        add2 = norm1 + ff_output
        norm2 = self.norm2.forward(add2)

        if cache:
            self.cache['x'] = x
            self.cache['attn_output'] = attn_output
            self.cache['add1'] = add1
            self.cache['norm1'] = norm1
            self.cache['ff_output'] = ff_output
            self.cache['add2'] = add2

        return norm2
    
    def backward(self, dout):
        # get from cache
        x = self.cache['x']
        attn_output = self.cache['attn_output']
        add1 = self.cache['add1']
        norm1 = self.cache['norm1']
        ff_output = self.cache['ff_output']
        add2 = self.cache['add2']

        # backward through norm2
        dadd2, dgamma2, dbeta2 = self.norm2.backward(dout)

        # gradient split at residual connection
        dnorm1 = dadd2
        dff_output = dadd2

        # backward through feed-forward
        dw1, db1, dw2, db2, dnorm1_ff = self.ff.backward(dff_output)

        # add gradients from residual connection
        dnorm1 += dnorm1_ff

        # backward through norm1
        dadd1, dgamma1, dbeta1 = self.norm1.backward(dnorm1)

        # gradient split at residual connection
        dx = dadd1
        dattn_output = dadd1

        # backward through attention
        dwq, dwk, dwv, dwo, dx_att = self.attention.backward(dattn_output)

        # add gradients from residual connection
        dx += dx_att

        return {
            'dwq': dwq,
            'dwk': dwk,
            'dwv': dwv,
            'dwo': dwo,
            'dw1': dw1,
            'db1': db1,
            'dw2': dw2,
            'db2': db2,
            'dgamma1': dgamma1,
            'dbeta1': dbeta1,
            'dgamma2': dgamma2,
            'dbeta2': dbeta2,
            'dx': dx,
        }

class ClassifierHead:
    def __init__(self, d_model, num_classes=2):
        self.w_cls = np.random.randn(d_model, num_classes) * 0.01
        self.b_cls = np.zeros(num_classes)
        self.cache = {}
    
    def forward(self, x, cache=True):
        # take cls per batch
        cls_output = x[:, 0, :]

        # logits of that cls
        logits = cls_output @ self.w_cls + self.b_cls

        if cache:
            self.cache['cls_output'] = cls_output
            self.cache['logits'] = logits
            self.cache['x'] = x

        return logits

    def softmax(self, x):
        x -= np.max(x, axis=-1, keepdims=True)
        exp_x = np.exp(x)
        return exp_x / (np.sum(exp_x, axis=-1, keepdims=True) + 1e-9)  # Added epsilon for numerical stability
    
    def predict(self, x):
        logits = self.forward(x)
        softmax = self.softmax(logits)
        return np.argmax(softmax, axis=-1)
    
    def binary_cross_entropy(self, logits, labels):
        batch_size = logits.shape[0]
        probs = self.softmax(logits)

        # store for backward propagation
        self.cache['probs'] = probs
        self.cache['labels'] = labels

        # one-hot encode labels
        y_one_hot = np.zeros_like(probs)
        for i in range(batch_size):
            y_one_hot[i, labels[i]] = 1
            
        # cross entropy loss
        loss = -np.sum(y_one_hot * np.log(probs + 1e-9)) / batch_size  # Use sum for entire batch
        return loss
    
    def backward(self):
        # get from cache
        probs = self.cache['probs']
        labels = self.cache['labels']
        cls_output = self.cache['cls_output']
        x = self.cache['x']
        
        batch_size = probs.shape[0]

        # gradient of cross entropy wrt softmax output (probs)
        dprobs = probs.copy()
        for i in range(batch_size):
            dprobs[i, labels[i]] -= 1
        dprobs /= batch_size

        # gradient wrt weights and biases
        dw_cls = cls_output.T @ dprobs
        db_cls = np.sum(dprobs, axis=0)

        # gradient wrt cls output
        dcls_output = dprobs @ self.w_cls.T
        
        # gradient wrt full bert output (only CLS token position gets gradient)
        dx = np.zeros_like(x)
        dx[:, 0, :] = dcls_output

        return dw_cls, db_cls, dx

class BERTSentimentClassifier:
    def __init__(self, max_len=10, d_model=16, num_heads=4):
        # parameters
        self.max_len = max_len
        self.d_model = d_model
        self.num_heads = num_heads
        # vocab
        self.vocab = Vocab()
        # embedding
        self.embedding = None
        # positional encoding
        self.pos_encoding = PositionalEncoder(max_len, d_model)
        # bert model
        self.model = BertModel(d_model, num_heads)
        # classifier head
        self.classifier = ClassifierHead(d_model)
    
    def initialize(self, sentences):
        # build vocab
        self.vocab.build_vocab(sentences)
        # initialize embedding with vocab size
        vocab_size = len(self.vocab.vocab) + 1
        self.embedding = EmbeddingLayer(vocab_size, self.d_model)
    
    def prepare_input(self, sentences):
        # convert sentences to tokens to indices
        input_ids = [self.vocab.tokenize(sentence[0], self.max_len) for sentence in sentences]
        input_ids = np.array(input_ids)
        labels = [label for _, label in sentences]
        labels = np.array(labels)
        return input_ids, labels
    
    def forward(self, input_ids, cache=True):
        # Embedding Layer
        embeddings = self.embedding.forward(input_ids, cache)
        # Positional Encoding
        pe = self.pos_encoding.forward(embeddings)
        # Bert Model
        bert_output = self.model.forward(pe, cache)
        # Classifier Head
        logits = self.classifier.forward(bert_output, cache)
        return logits
    
    def backward(self, logits, labels):
        # loss 
        loss = self.classifier.binary_cross_entropy(logits, labels)

        # backprop through classifier
        dw_cls, db_cls, dbert_output = self.classifier.backward()

        # backprop through bert model
        dbert_grads = self.model.backward(dbert_output)

        # backprop through embedding layer
        dembedding_matrix = self.embedding.backward(dbert_grads['dx'])

        # collect all gradients for return
        grads = {
            'dw_cls': dw_cls,
            'db_cls': db_cls,
            'dembedding_matrix': dembedding_matrix,
            'dwq': dbert_grads['dwq'],
            'dwk': dbert_grads['dwk'],
            'dwv': dbert_grads['dwv'],
            'dwo': dbert_grads['dwo'],
            'dw1': dbert_grads['dw1'],
            'db1': dbert_grads['db1'],
            'dw2': dbert_grads['dw2'],
            'db2': dbert_grads['db2'],
            'dgamma1': dbert_grads['dgamma1'],
            'dbeta1': dbert_grads['dbeta1'],
            'dgamma2': dbert_grads['dgamma2'],
            'dbeta2': dbert_grads['dbeta2']
        }

        return loss, grads

    def update_parameters(self, grads, lr=0.01):
        # update classifier weight parameter
        self.classifier.w_cls -= lr * grads['dw_cls']
        self.classifier.b_cls -= lr * grads['db_cls']

        # update embedding weight parameter
        self.embedding.embedding_matrix -= lr * grads['dembedding_matrix']

        # update attention weight parameter
        self.model.attention.wq -= lr * grads['dwq']
        self.model.attention.wk -= lr * grads['dwk']
        self.model.attention.wv -= lr * grads['dwv']
        self.model.attention.wo -= lr * grads['dwo']

        # update layer norm parameters
        self.model.norm1.gamma -= lr * grads['dgamma1']
        self.model.norm1.beta -= lr * grads['dbeta1']
        self.model.norm2.gamma -= lr * grads['dgamma2']
        self.model.norm2.beta -= lr * grads['dbeta2']

        # update ffn weight parameter
        self.model.ff.w1 -= lr * grads['dw1']
        self.model.ff.b1 -= lr * grads['db1']
        self.model.ff.w2 -= lr * grads['dw2']
        self.model.ff.b2 -= lr * grads['db2']

    def train_step(self, batch, lr=0.01):
        # prepare batch data
        input_ids, labels = self.prepare_input(batch)

        # forward pass
        logits = self.forward(input_ids)

        # backward pass
        loss, grads = self.backward(logits, labels)

        # update parameters
        self.update_parameters(grads, lr)

        return loss
        
    def evaluate(self, sentences):
        input_ids, labels = self.prepare_input(sentences)
        logits = self.forward(input_ids, cache=False)
        predictions = np.argmax(self.classifier.softmax(logits), axis=1)
        accuracy = np.mean(predictions == labels)
        return accuracy

# Example
if __name__ == "__main__":
    sentences = [
        ('aku suka film ini', 1),
        ('aku tidak suka film ini', 0),
        ('film ini bagus sekali', 1),
        ('film ini sangat buruk', 0),
    ]

    # Create and initialize model
    model = BERTSentimentClassifier(max_len=10, d_model=16, num_heads=4)
    model.initialize(sentences)

    # Training loop
    num_epochs = 1000
    learning_rate = 0.01
    
    for epoch in range(num_epochs):
        loss = model.train_step(sentences, lr=learning_rate)
        
        if epoch % 100 == 0:
            accuracy = model.evaluate(sentences)
            print(f"Epoch {epoch}, Loss: {loss:.4f}, Accuracy: {accuracy:.4f}")

Epoch 0, Loss: 0.6935, Accuracy: 0.5000
Epoch 100, Loss: 0.6931, Accuracy: 0.5000
Epoch 200, Loss: 0.6931, Accuracy: 1.0000
Epoch 300, Loss: 0.6931, Accuracy: 1.0000
Epoch 400, Loss: 0.6931, Accuracy: 1.0000
Epoch 500, Loss: 0.6931, Accuracy: 1.0000
Epoch 600, Loss: 0.6931, Accuracy: 1.0000
Epoch 700, Loss: 0.6931, Accuracy: 1.0000
Epoch 800, Loss: 0.6931, Accuracy: 1.0000
Epoch 900, Loss: 0.6931, Accuracy: 1.0000


In [15]:
import numpy as np

x = [[1,2],[2,2]]

# feed forward

w1 = np.random.rand(2,4)
b1 = np.zeros(4)
w2 = np.random.rand(4,2)
b2 = np.zeros(2)

f1 = x @ w1 + b1

r = np.maximum(0, f1)

f2 = r @ w2 + b2

In [16]:
f2

array([[1.97258695, 3.64779082],
       [3.06845175, 5.19621885]])