<a href="https://colab.research.google.com/github/adnaen/machine-learning-notes/blob/main/llm/transformers/decoder/decoder_block.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Decoder Implementation From Scratch**

```text
workflow
========
1. Input text
2. Right shift (prepend a start token and drop the last token of each sentence)
3. Embeddings
4. Positional Encoding
5. Masked Multi-Head Attention (each token can only attend to itself and to previous tokens)
6. Add/Norm
7. FFN
8. Add/Norm
```

In [None]:
import torch

## **Dummy Data**

## **Positional Encoding**

In [None]:
def positional_encoding(x: torch.Tensor) -> torch.Tensor:
    """Build the sinusoidal positional-encoding table matching ``x``.

    Only the shape and device of ``x`` are used; its values are never read,
    and the returned tensor is the encoding table alone (``x`` is not added
    to it).

    With ``pos`` the token index and ``i`` the index of a (sin, cos) feature
    pair, the table is:

        PE[pos, 2i]     = sin(pos / 10000 ^ (2i / d_model))
        PE[pos, 2i + 1] = cos(pos / 10000 ^ (2i / d_model))

    Both members of a pair share the same frequency; the exponent therefore
    stays in the range [0, 1).

    Args:
        x (Tensor) : 2-D tensor of shape (seq_len, d_model)

    Returns:
        Tensor : encoding table of shape (seq_len, d_model), default float
                 dtype, located on the same device as ``x``
    """
    seq_len, d_model = x.shape
    dtype = torch.get_default_dtype()
    device = x.device

    # Column vector of positions so it broadcasts against the frequencies.
    positions = torch.arange(seq_len, dtype=dtype, device=device).unsqueeze(1)

    # The even feature indices 0, 2, 4, ... are exactly the "2i" of the formula.
    even_index = torch.arange(0, d_model, 2, dtype=dtype, device=device)
    divisors = torch.pow(10000.0, even_index / d_model)

    # angles has one column per (sin, cos) pair: (seq_len, ceil(d_model / 2)).
    angles = positions / divisors

    encoding = torch.empty(seq_len, d_model, dtype=dtype, device=device)
    encoding[:, 0::2] = torch.sin(angles)
    # When d_model is odd the last sin column has no cos partner.
    encoding[:, 1::2] = torch.cos(angles[:, : d_model // 2])
    return encoding

## **Masked Multi-Head Attention Block**

In [None]:
class MaskedMultiHeadAttention(torch.nn.Module):
    """Causal (masked) multi-head self-attention for one un-batched sequence.

    The input is projected to queries, keys and values, split feature-wise
    into ``no_of_heads`` heads of width ``dk = d_model // no_of_heads``, and
    every token attends only to itself and to earlier tokens.
    """

    def __init__(self, no_of_heads: int, d_model: "int | None" = None) -> None:
        """
        Args:
            no_of_heads (int) : number of heads
            d_model (int | None) : feature width of every token. When omitted,
                the module-level variable ``d_model`` is used, which is how
                this class was originally configured.

        Returns:
            None

        Raises:
            ValueError : if no width is available, or if ``d_model`` is not a
                positive multiple of ``no_of_heads``
        """
        super().__init__()

        if d_model is None:
            # Legacy fallback: earlier versions silently read a global.
            d_model = globals().get("d_model")
            if d_model is None:
                raise ValueError(
                    "d_model was not passed and no module-level `d_model` is defined"
                )
        if no_of_heads <= 0 or d_model <= 0 or d_model % no_of_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be a positive multiple of "
                f"no_of_heads ({no_of_heads})"
            )

        self.d_model = d_model
        self.no_of_heads = no_of_heads
        self.dk = self.d_model // self.no_of_heads
        print(f"Initialize MaskedMultiHeadAttention with \n{self.d_model=}\n{self.no_of_heads=}")

        self.q_w = torch.nn.Linear(self.d_model, self.d_model)
        self.k_w = torch.nn.Linear(self.d_model, self.d_model)
        self.v_w = torch.nn.Linear(self.d_model, self.d_model)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x (Tensor)  : input data of shape (seq_len, d_model)

        Returns:
            Tensor : attention output of shape (seq_len, d_model), i.e. the
                     per-head results concatenated along the feature axis
        """
        # Kept local: forward() must not mutate the module's configuration.
        seq_len = x.shape[0]

        # calculate Q, K, V
        Q = self.q_w(x)
        K = self.k_w(x)
        V = self.v_w(x)

        # Split Q, K, V for multi-head attention: each head receives its own
        # slice of every token's features, giving (heads, seq_len, dk).
        m_q = Q.view(seq_len, self.no_of_heads, self.dk).transpose(0, 1)
        m_k = K.view(seq_len, self.no_of_heads, self.dk).transpose(0, 1)
        m_v = V.view(seq_len, self.no_of_heads, self.dk).transpose(0, 1)

        # attention(q, k, v) = softmax(Q*K^T / sqrt(dk)) * V, for all heads at once
        scores = (m_q @ m_k.transpose(-2, -1)) / (self.dk ** 0.5)

        # Masking is what distinguishes this from plain attention: the lower
        # triangular mask hides every future position. The diagonal is always
        # visible, so no row is fully masked and -inf is safe under softmax.
        causal_mask = torch.tril(
            torch.ones(seq_len, seq_len, dtype=torch.bool, device=x.device)
        )
        scores = scores.masked_fill(~causal_mask, float("-inf"))

        per_head = torch.softmax(scores, dim=-1) @ m_v

        # (heads, seq_len, dk) -> (seq_len, heads, dk) -> (seq_len, d_model)
        return per_head.transpose(0, 1).reshape(seq_len, self.d_model)


## **Add/Norm**

In [None]:
# add = just add input + sublayer output
# norm = apply layernorm

def residual_norm(
        x: torch.Tensor,
        output: torch.Tensor,
        layer_norm: torch.nn.modules.normalization.LayerNorm) -> torch.Tensor:
    """Apply the Add & Norm step of a transformer sub-layer.

    Args:
        x (Tensor)             : input that was fed to the sub-layer
        output (Tensor)        : what the sub-layer produced for ``x``
        layer_norm (LayerNorm) : normalization applied to the sum

    Returns:
        Tensor : ``layer_norm`` applied to ``x + output``
    """
    # Add: the skip connection.
    residual_sum = torch.add(x, output)
    # Norm: normalize the summed activations.
    normalized = layer_norm(residual_sum)
    return normalized

## **Full Decoder**

In [None]:
class Decoder(torch.nn.Module):
    """Single decoder block: positional encoding, masked multi-head
    self-attention and a position-wise feed-forward network, each sub-layer
    followed by Add & Norm.
    """

    def __init__(self, no_of_heads: int, d_model: int) -> None:
        """
        Args:
            no_of_heads (int) : number of attention heads
            d_model (int)     : feature width of every token

        Returns:
            None

        Raises:
            ValueError : if the attention layer was built for a width other
                than ``d_model``
        """
        super().__init__()

        self.mmha = MaskedMultiHeadAttention(no_of_heads=no_of_heads)
        # Fail early with a clear message instead of a shape error deep
        # inside forward() when the two widths disagree.
        if self.mmha.d_model != d_model:
            raise ValueError(
                f"attention layer uses d_model={self.mmha.d_model}, "
                f"but the decoder was given d_model={d_model}"
            )

        # Position-wise feed-forward network with a 4x wider hidden layer.
        self.ffn = torch.nn.Sequential(
            torch.nn.Linear(d_model, 4*d_model),
            torch.nn.ReLU(),
            torch.nn.Linear(4*d_model, d_model),
        )
        self.norm1 = torch.nn.LayerNorm(d_model)
        self.norm2 = torch.nn.LayerNorm(d_model)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x (Tensor) : token embeddings of shape (seq_len, d_model)

        Returns:
            Tensor : decoder block output of shape (seq_len, d_model)
        """
        # The positional table is ADDED to the embeddings. Using the table on
        # its own would discard the input, making the output independent of x.
        pos_table = positional_encoding(x).to(device=x.device, dtype=x.dtype)
        X = x + pos_table

        attention_out = self.mmha(X)
        r1 = residual_norm(X, attention_out, self.norm1)
        ffn_result = self.ffn(r1)
        r2 = residual_norm(r1, ffn_result, self.norm2)
        return r2

In [None]:
# Dummy input: a sequence of 10 tokens, each represented by a
# 20-dimensional embedding vector filled with random normal values.
input_embd = torch.randn(10, 20)
# Bare expression so the notebook displays the shape of the dummy input.
input_embd.shape

torch.Size([10, 20])

In [None]:
# Build a decoder block with 5 attention heads and a model width of 20.
decoder = Decoder(5, d_model=20)

Initialize MaskedMultiHeadAttention with 
self.d_model=20
self.no_of_heads=5


In [None]:
# Run the dummy embeddings through the decoder block.
res = decoder(input_embd)
# Bare expression so the notebook displays the shape of the result.
res.shape

torch.Size([10, 20])

In [None]:
# Bare expression so the notebook displays the decoder output values.
res

tensor([[-0.9239,  1.2566, -0.6877,  0.7834, -0.3338,  2.0210, -0.8523,  1.0145,
         -0.3438,  1.1300, -0.2573,  1.0362, -0.9080,  0.5436, -1.8957,  1.0866,
         -0.8638, -0.1872, -0.8759, -0.7425],
        [ 0.4649,  1.1696, -0.4438,  0.6514, -0.1518,  1.9474, -0.7795,  0.9397,
         -0.3069,  1.0864, -0.5282,  1.0733, -1.0994,  0.2523, -1.9820,  1.0067,
         -1.1976, -0.0409, -1.0935, -0.9680],
        [ 0.6025,  0.8062, -0.1172,  0.5892, -0.0522,  1.8884, -0.7160,  0.9119,
         -0.2837,  1.1774, -0.6398,  1.1506, -1.1129,  0.2182, -1.9575,  1.0264,
         -1.3581,  0.0911, -1.1973, -1.0272],
        [-0.7122,  0.2606,  0.2929,  0.6184, -0.0198,  1.8739, -0.6619,  0.9727,
         -0.2520,  1.4469, -0.6377,  1.2830, -0.9987,  0.4409, -1.8381,  1.1659,
         -1.2968,  0.1444, -1.1995, -0.8827],
        [-1.9965, -0.3706,  0.5806,  0.6356, -0.0470,  1.5997, -0.5923,  0.8805,
         -0.1600,  1.5686, -0.5690,  1.2910, -0.7630,  0.6952, -1.5380,  1.1524,
      