## Lib

In [None]:
import os
import sys
import math
import yaml
import torch
import argparse
import torch.nn as nn
from torchview import draw_graph

## Utils

In [None]:
def dump(value=None, filename=None):
    """Serialize ``value`` as YAML into the file at ``filename``.

    Args:
        value: Object to serialize; must not be ``None``.
        filename: Path of the file to (over)write; must not be ``None``.

    Raises:
        ValueError: If ``value`` or ``filename`` is ``None``.
    """
    if value is None or filename is None:
        raise ValueError("value and filename must be provided".capitalize())

    # yaml.safe_dump takes (data, stream); it has no ``value``/``filename``
    # keywords, so the file has to be opened here and passed as the stream.
    with open(filename, "w", encoding="utf-8") as file:
        yaml.safe_dump(value, file)


def load(filename=None):
    """Parse the YAML file at ``filename`` and return its contents.

    Args:
        filename: Path of the YAML file to read; must not be ``None``.

    Returns:
        The Python object produced by ``yaml.safe_load`` for the file.

    Raises:
        ValueError: If ``filename`` is ``None``.
    """
    if filename is None:
        raise ValueError("filename must be provided".capitalize())

    # yaml.safe_load expects a stream (or string), not a ``filename`` keyword,
    # and its result must be returned for the call to be of any use.
    with open(filename, "r", encoding="utf-8") as file:
        return yaml.safe_load(file)


def config():
    """Read ``../../config.yml`` (relative to the working directory) as YAML.

    Returns:
        The object produced by ``yaml.safe_load`` for that file.
    """
    with open("../../config.yml", "r") as stream:
        settings = yaml.safe_load(stream)

    return settings

## Scaled Dot-Product Attention

In [None]:
def scaled_dot_product_attention(
    query: torch.Tensor, key: torch.Tensor, value: torch.Tensor, mask=None
):
    """Compute ``softmax(Q @ K^T / sqrt(d)) @ V`` over the last two dimensions.

    Args:
        query: Query tensor; the last two dimensions are (sequence, features).
        key: Key tensor of the same size as ``query``.
        value: Value tensor of the same size as ``query``.
        mask: When not ``None``, look-ahead (causal) masking is applied so that
            position ``i`` cannot attend to positions ``j > i``. Only the
            presence of ``mask`` is used; its values are not read.
            NOTE(review): the previous implementation also discarded the mask
            values (it used ``torch.ones_like(mask...)`` with
            ``torch.triu(..., diagonal=1)``) but then added ``1e-19`` to the
            scores, which has no numerical effect. This version makes that
            look-ahead mask effective. Confirm with callers whether a padding
            mask was intended instead.

    Returns:
        Tensor of the same size as ``value`` holding the attended values.

    Raises:
        ValueError: If ``query``, ``key`` or ``value`` is not a ``torch.Tensor``.
        AssertionError: If the three tensors do not share the same size.
    """
    if not (
        isinstance(query, torch.Tensor)
        and isinstance(key, torch.Tensor)
        and isinstance(value, torch.Tensor)
    ):
        raise ValueError("query, key, and value must be torch.Tensor".capitalize())

    assert (
        query.size() == key.size() == value.size()
    ), "query, key, and value must have the same size"

    # All three tensors share the same feature size (asserted above), so the
    # scale factor is unchanged; the key dimension is the conventional choice.
    scores = torch.matmul(query, key.transpose(-1, -2)) / math.sqrt(key.size(-1))

    if mask is not None:
        rows, cols = scores.shape[-2:]
        # Strict upper triangle marks "future" positions. They are filled with
        # -inf so softmax assigns them exactly zero weight. Every row keeps at
        # least its diagonal entry, so no row becomes all -inf (no NaNs).
        future = (
            torch.triu(torch.ones(rows, cols, device=scores.device), diagonal=1)
            == 1
        )
        scores = scores.masked_fill(future, float("-inf"))

    weights = torch.softmax(scores, dim=-1)

    return torch.matmul(weights, value)


if __name__ == "__main__":
    # Smoke run: 40 sequences, 8 heads, 200 positions, 64 features per head
    # (512 // 8), together with a (40, 200) mask tensor.
    scaled = scaled_dot_product_attention(
        torch.randn(40, 8, 200, 64),
        torch.randn(40, 8, 200, 64),
        torch.randn(40, 8, 200, 64),
        torch.randn(40, 200),
    )

## Multi Head Attention Layer

In [None]:
class MultiHeadAttentionLayer(nn.Module):
    """Multi-head self-attention with a fused QKV projection.

    The input is projected once to ``3 * dimension`` features, split into
    query/key/value, reshaped into ``nheads`` heads, passed through
    ``scaled_dot_product_attention``, merged back and projected by a final
    linear layer.

    Args:
        dimension: Feature size of the input and output.
        nheads: Number of attention heads; must divide ``dimension``.
        dropout: Stored on the instance; not applied anywhere in this class.
        bias: Whether the two linear layers use a bias term.

    Raises:
        ValueError: If ``dimension`` is not divisible by ``nheads``.
    """

    def __init__(
        self,
        dimension: int = 512,
        nheads: int = 8,
        dropout: float = 0.1,
        bias: bool = True,
    ):
        super(MultiHeadAttentionLayer, self).__init__()

        self.dimension = dimension
        self.nheads = nheads
        self.dropout = dropout
        self.bias = bias

        # The original line was a bare tuple expression (missing ``assert``),
        # so the check never ran. Raise explicitly so it also survives ``-O``.
        if self.dimension % self.nheads != 0:
            raise ValueError("dimension must be divisible by nheads".capitalize())

        self.QKV = nn.Linear(
            in_features=self.dimension, out_features=3 * self.dimension, bias=self.bias
        )

        self.layer = nn.Linear(
            in_features=self.dimension, out_features=self.dimension, bias=self.bias
        )

    def _split_heads(self, tensor: torch.Tensor) -> torch.Tensor:
        """Reshape (batch, seq, dimension) into (batch, nheads, seq, head_dim)."""
        return tensor.view(
            tensor.size(0),
            tensor.size(1),
            self.nheads,
            self.dimension // self.nheads,
        ).permute(0, 2, 1, 3)

    def forward(self, x: torch.Tensor, mask=None):
        """Apply multi-head self-attention to ``x``.

        Args:
            x: Tensor whose first two dimensions are read as (batch, sequence)
                and whose last dimension is ``dimension``.
            mask: Forwarded unchanged to ``scaled_dot_product_attention``.

        Returns:
            Tensor of size (batch, sequence, dimension).

        Raises:
            TypeError: If ``x`` is not a ``torch.Tensor``.
        """
        if not isinstance(x, torch.Tensor):
            # Previously a non-tensor input silently returned None.
            raise TypeError("Input must be a torch.Tensor".capitalize())

        QKV = self.QKV(x)

        self.query, self.key, self.value = torch.chunk(input=QKV, chunks=3, dim=-1)

        assert (
            self.query.size() == self.key.size() == self.value.size()
        ), "QKV must have the same size".capitalize()

        self.query = self._split_heads(self.query)
        self.key = self._split_heads(self.key)
        self.value = self._split_heads(self.value)

        self.attention = scaled_dot_product_attention(
            query=self.query, key=self.key, value=self.value, mask=mask
        )

        assert (
            self.attention.size()
            == self.query.size()
            == self.key.size()
            == self.value.size()
        ), "Attention must have the same size as QKV".capitalize()

        # Move heads back next to the feature axis BEFORE flattening. Viewing
        # the (batch, heads, seq, head_dim) tensor directly as
        # (batch, seq, heads * head_dim) would interleave tokens and heads.
        merged = self.attention.permute(0, 2, 1, 3).contiguous()
        self.attention = merged.view(
            merged.size(0),
            merged.size(1),
            merged.size(2) * merged.size(3),
        )

        return self.layer(self.attention)


if __name__ == "__main__":
    # Smoke run: build the layer with its default-sized configuration and print
    # the output size for a random (40, 200, 512) batch.
    attention = MultiHeadAttentionLayer(bias=True, dropout=0.1, nheads=8, dimension=512)

    print(attention(torch.randn(40, 200, 512)).shape)

## Feed Forward Neural Network

In [None]:
class FeedForwardNeuralNetwork(nn.Module):
    """Two-layer position-wise feed-forward block.

    The stack is ``Linear(in_features, out_features)`` -> activation ->
    ``Dropout`` -> ``Linear(out_features, in_features)``.

    Args:
        in_features: Size of the input and of the final output.
        out_features: Size of the hidden (expanded) representation.
        dropout: Dropout probability applied after the activation.
        activation: One of ``"relu"``, ``"gelu"`` or ``"leaky_relu"``.
        bias: Whether the linear layers use a bias term.

    Raises:
        ValueError: If ``activation`` is not one of the supported names.
    """

    def __init__(
        self,
        in_features: int = 512,
        out_features: int = 2048,
        dropout: float = 0.1,
        activation: str = "relu",
        bias: bool = True,
    ):
        super(FeedForwardNeuralNetwork, self).__init__()

        self.in_features = in_features
        self.out_features = out_features
        self.dropout = dropout
        self.activation = activation
        self.bias = bias

        # Factories (not instances) so only the selected activation is built.
        activations = {
            "relu": lambda: nn.ReLU(inplace=True),
            "gelu": lambda: nn.GELU(),
            "leaky_relu": lambda: nn.LeakyReLU(inplace=True, negative_slope=0.2),
        }

        if self.activation not in activations:
            # Previously an unknown name left ``activation_fn`` undefined and
            # surfaced later as an unrelated AttributeError.
            raise ValueError(
                f"Unsupported activation {self.activation!r}; "
                f"expected one of {sorted(activations)}"
            )

        self.activation_fn = activations[self.activation]()

        # Built explicitly instead of in a loop that swapped
        # ``self.in_features`` / ``self.out_features``; that loop left
        # ``self.out_features`` equal to ``in_features`` after construction.
        self.layers = [
            nn.Linear(
                in_features=self.in_features,
                out_features=self.out_features,
                bias=self.bias,
            ),
            self.activation_fn,
            nn.Dropout(p=self.dropout),
            nn.Linear(
                in_features=self.out_features,
                out_features=self.in_features,
                bias=self.bias,
            ),
        ]

        self.model = nn.Sequential(*self.layers)

    def forward(self, x: torch.Tensor):
        """Run ``x`` through the feed-forward stack.

        Args:
            x: Tensor whose last dimension is ``in_features``.

        Returns:
            Tensor with the same size as ``x``.

        Raises:
            TypeError: If ``x`` is not a ``torch.Tensor``.
        """
        if not isinstance(x, torch.Tensor):
            raise TypeError("Input must be a torch.Tensor".capitalize())

        return self.model(x)


if __name__ == "__main__":
    # Smoke run: 512 -> 2048 -> 512 GELU network applied to a random
    # (40, 200, 512) batch; prints the output size.
    network = FeedForwardNeuralNetwork(
        bias=True,
        dropout=0.1,
        activation="gelu",
        out_features=2048,
        in_features=512,
    )

    print(network(torch.randn(40, 200, 512)).shape)

## Layer Normalization

In [None]:
class LayerNormalization(nn.Module):
    """Layer normalization over the last dimension with a learnable affine.

    Args:
        normalized_shape: Size of the last dimension being normalized.
        eps: Value added to the variance for numerical stability.
        bias: When ``True`` a learnable shift ``beta`` is added; when ``False``
            ``beta`` is registered as ``None`` and no shift is applied.
    """

    def __init__(
        self, normalized_shape: int = 512, eps: float = 1e-05, bias: bool = True
    ):
        super(LayerNormalization, self).__init__()

        self.normalized_shape = normalized_shape
        self.epsilon = eps
        self.bias = bias

        self.gamma = nn.Parameter(data=torch.ones((normalized_shape,)))

        if self.bias:
            self.beta = nn.Parameter(data=torch.zeros((normalized_shape,)))
        else:
            # Honor ``bias=False``; the flag used to be stored but ignored.
            self.register_parameter("beta", None)

    def forward(self, x: torch.Tensor):
        """Normalize ``x`` along its last dimension.

        Args:
            x: Tensor whose last dimension has size ``normalized_shape``.

        Returns:
            Tensor of the same size as ``x``.

        Raises:
            TypeError: If ``x`` is not a ``torch.Tensor``.
        """
        if not isinstance(x, torch.Tensor):
            # Previously a non-tensor input silently returned None.
            raise TypeError("Input must be a torch.Tensor".capitalize())

        self.mean = torch.mean(x, dim=-1, keepdim=True)
        # Layer normalization uses the biased (population) variance;
        # torch.var defaults to the unbiased estimator, which the original used.
        self.variance = torch.var(x, dim=-1, unbiased=False, keepdim=True)

        normalized = (
            self.gamma * (x - self.mean) / torch.sqrt(self.variance + self.epsilon)
        )

        if self.beta is not None:
            normalized = normalized + self.beta

        return normalized


if __name__ == "__main__":
    # Smoke run: normalize a random (40, 200, 512) batch and print its size.
    layer_norm = LayerNormalization(bias=True, eps=1e-5, normalized_shape=512)

    print(layer_norm(torch.randn(40, 200, 512)).shape)

## Positional Encoding

In [None]:
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional-encoding table.

    For position ``pos`` and even column ``c`` the table holds
    ``sin(pos / 10000 ** (c / dimension))``; the following odd column ``c + 1``
    holds ``cos`` of the same angle.

    Args:
        sequence_length: Number of positions stored in the table.
        dimension: Number of columns (features) per position.
    """

    def __init__(self, sequence_length: int = 200, dimension: int = 512):
        super(PositionalEncoding, self).__init__()

        self.sequence_length = sequence_length
        self.dimension = dimension

        positions = torch.arange(
            self.sequence_length, dtype=torch.float32
        ).unsqueeze(1)
        # Even column indices already equal "2i" in the textbook formula; the
        # original multiplied the column index by two again, which doubled the
        # exponent and produced the wrong frequencies.
        even_columns = torch.arange(0, self.dimension, 2, dtype=torch.float32)
        angles = positions / torch.pow(10000.0, even_columns / self.dimension)

        table = torch.zeros((self.sequence_length, self.dimension))
        table[:, 0::2] = torch.sin(angles)
        # For an odd ``dimension`` there is one fewer odd column than even ones.
        table[:, 1::2] = torch.cos(angles[:, : self.dimension // 2])

        # Registered as a non-persistent buffer: it follows ``.to(device)`` but
        # stays out of ``state_dict``, matching the previous plain attribute.
        self.register_buffer("positional_encoding", table, persistent=False)

    def forward(self, x: torch.Tensor):
        """Return the encoding for the first ``x.size(1)`` positions.

        Args:
            x: Tensor whose second dimension is read as the sequence length.
                Only its size is used; the encoding is not added to ``x``.

        Returns:
            Tensor of size (1, min(x.size(1), sequence_length), dimension).

        Raises:
            TypeError: If ``x`` is not a ``torch.Tensor``.
        """
        if not isinstance(x, torch.Tensor):
            # Previously a non-tensor input silently returned None.
            raise TypeError("Input must be a torch.Tensor".capitalize())

        return self.positional_encoding.unsqueeze(0)[:, : x.size(1), :]


if __name__ == "__main__":
    # Smoke run: build a 200 x 512 table and print the size of the slice
    # returned for a random (40, 200, 512) batch.
    positional_encoding = PositionalEncoding(dimension=512, sequence_length=200)

    print(positional_encoding(torch.randn(40, 200, 512)).shape)
