In [None]:
import argparse
import math
import os
import sys

import joblib
import torch
import torch.nn as nn
import yaml
from torch.utils.data import DataLoader, TensorDataset
from torchsummary import summary
from torchview import draw_graph
from tqdm import tqdm
from transformers import AutoTokenizer

### Utils

In [None]:
class CustomException(Exception):
    """Project-specific error that carries a human-readable message.

    Args:
        message: Description of the failure. It is available both as
            ``exc.message`` and through the standard ``str(exc)`` / ``exc.args``.
    """

    def __init__(self, message: str):
        # Forward the message to Exception so that str(exc), exc.args and
        # tracebacks actually show it instead of an empty string.
        super(CustomException, self).__init__(message)
        self.message = message


def dump(value: str, filename: str):
    """Serialize ``value`` to ``filename`` with ``joblib.dump``.

    Args:
        value: Object to persist; must not be ``None``.
        filename: Destination path; must not be ``None``.

    Raises:
        CustomException: If either argument is ``None``.
    """
    # Guard clause: refuse to write when either piece is missing.
    if value is None or filename is None:
        raise CustomException("Cannot be dump into pickle file".capitalize())

    joblib.dump(value=value, filename=filename)


def load(filename: str):
    """Load and return an object previously stored with ``joblib.dump``.

    Args:
        filename: Path of the file to read; must not be ``None``.

    Returns:
        The deserialized object.

    Raises:
        CustomException: If ``filename`` is ``None``.
    """
    if filename is None:
        raise CustomException("Cannot be load the pickle file".capitalize())

    # NOTE(security): joblib.load unpickles data, which can execute arbitrary
    # code -- only use it on files from a trusted source.
    # The result must be returned; previously it was loaded and then dropped.
    return joblib.load(filename=filename)


def device_init(self=None, device: str = "mps"):
    """Resolve a device name to a ``torch.device`` that is actually usable.

    Args:
        self: Unused. Retained (now optional) so existing positional callers
            keep working; this is a module-level function, not a method.
        device: Requested backend: ``"cuda"``, ``"mps"`` or anything else
            for CPU.

    Returns:
        ``torch.device`` for the requested backend when it is available,
        otherwise ``torch.device("cpu")``.
    """
    if device == "cuda":
        return torch.device("cuda" if torch.cuda.is_available() else "cpu")

    if device == "mps":
        # torch.backends.mps does not exist in older PyTorch builds; treat a
        # missing backend the same as an unavailable one.
        mps_backend = getattr(torch.backends, "mps", None)
        has_mps = mps_backend is not None and mps_backend.is_available()
        return torch.device("mps" if has_mps else "cpu")

    return torch.device("cpu")


def config():
    """Read ``./config.yml`` and return its parsed YAML content.

    The path is relative to the current working directory. Parsing uses
    ``yaml.safe_load``.
    """
    with open("./config.yml", "r") as handle:
        settings = yaml.safe_load(handle)

    return settings


# Small parallel corpus: english[i] and german[i] are translations of each
# other, so both lists must have the same length and ordering.
english = [
    "The sun is shining brightly today",
    "I enjoy reading books on rainy afternoons",
    "The cat sat on the windowsill watching the birds",
    "She baked a delicious chocolate cake for dessert",
    "We went for a long walk in the park yesterday",
    "He plays the guitar beautifully during the evenings",
]

german = [
    "Die Sonne scheint heute hell",
    "Ich lese gerne Bücher an regnerischen Nachmittagen",
    "Die Katze saß auf der Fensterbank und beobachtete die Vögel",
    "Sie hat einen leckeren Schokoladenkuchen zum Nachtisch gebacken",
    "Wir sind gestern lange im Park spazieren gegangen",
    # Previously missing counterpart of the sixth English sentence.
    "Er spielt abends wunderschön Gitarre",
]

### Positional Encoding

In [None]:
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional-encoding table.

    Entry ``(pos, j)`` of the table is ``sin(pos / constant ** (2i / dimension))``
    for even ``j = 2i`` and ``cos(pos / constant ** (2i / dimension))`` for odd
    ``j = 2i + 1``, so every sin/cos pair shares a single frequency.

    Args:
        sequence_length: Number of positions (rows) in the table.
        dimension: Width of each encoding vector (columns).
        constant: Base of the geometric frequency progression.

    Attributes:
        position_encode: The ``(sequence_length, dimension)`` table as a plain
            tensor attribute (kept for code that reads it directly).
        position_encoding: Registered buffer of shape
            ``(1, sequence_length, dimension)``; this is what ``forward`` reads,
            so the encoding follows the module across ``.to(device)``.
    """

    def __init__(
        self, sequence_length: int = 200, dimension: int = 512, constant: int = 10000
    ):
        super(PositionalEncoding, self).__init__()

        self.sequence_length = sequence_length
        self.model_dimension = dimension
        self.constant = constant

        # Build the whole table with tensor ops instead of a Python double loop.
        # float64 keeps the large-position angles accurate before the final cast.
        positions = torch.arange(sequence_length, dtype=torch.float64).unsqueeze(1)
        columns = torch.arange(dimension)
        is_even = columns % 2 == 0

        # Columns 2i and 2i + 1 share the exponent 2i / dimension.
        exponents = (columns - columns % 2).to(torch.float64) / dimension
        base = torch.tensor(float(constant), dtype=torch.float64)
        angles = positions / torch.pow(base, exponents)

        table = torch.where(is_even, torch.sin(angles), torch.cos(angles)).to(
            torch.get_default_dtype()
        )

        self.position_encode = table
        self.register_buffer("position_encoding", table.unsqueeze(0))

        print("Positional Encoding initialized".capitalize())

    def forward(self, x: torch.Tensor):
        """Return the encoding rows/columns matching the trailing two axes of ``x``.

        Args:
            x: Tensor whose last axis is the feature axis and, when it has at
                least two axes, whose second-to-last axis is the position axis.

        Returns:
            A 2-D slice ``(positions, features)`` of the table. For 1-D input
            all ``sequence_length`` rows are returned.

        Raises:
            TypeError: If ``x`` is not a ``torch.Tensor``.
        """
        if not isinstance(x, torch.Tensor):
            raise TypeError("Input must be a torch.Tensor".capitalize())

        # Slice the position axis too, so inputs shorter than sequence_length
        # still broadcast when the encoding is added to them.
        rows = x.size(-2) if x.dim() >= 2 else self.sequence_length
        return self.position_encoding[0, :rows, : x.size(-1)]


if __name__ == "__main__":
    # Smoke test: build the positional encoder from CLI-style options and
    # check the shape of what it returns.
    parser = argparse.ArgumentParser(
        description="Positional Encoder for Transformer".title()
    )
    parser.add_argument(
        "--seq_length",
        type=int,
        default=200,
        help="Define the sequence length".capitalize(),
    )
    parser.add_argument(
        "--dimension",
        type=int,
        default=512,
        help="Define the dimension of the model".capitalize(),
    )

    # parse_known_args() ignores the extra argv entries a Jupyter kernel
    # injects (e.g. "-f <connection-file>"); parse_args() would exit the cell.
    args = parser.parse_known_args()[0]

    sequence_length = args.seq_length
    model_dimension = args.dimension

    positional_encode = PositionalEncoding(
        sequence_length=sequence_length, dimension=model_dimension
    )

    assert positional_encode(
        torch.randn((sequence_length, model_dimension))
    ).size() == (
        sequence_length,
        model_dimension,
    )

### Embedding Layer

In [None]:
class EmbeddingLayer(nn.Module):
    """Token embedding with the positional encoding added on top.

    Args:
        vocabulary_size: Number of rows of the embedding table.
        sequence_length: Number of positions handed to ``PositionalEncoding``.
        dimension: Embedding width, also handed to ``PositionalEncoding``.
    """

    def __init__(
        self,
        vocabulary_size: int = 1000,
        sequence_length: int = 200,
        dimension: int = 100,
    ):
        super().__init__()

        self.vocabulary_size = vocabulary_size
        self.sequence_length = sequence_length
        self.model_dimension = dimension

        # Learnable lookup table followed by the positional table.
        self.embedding = nn.Embedding(vocabulary_size, dimension)
        self.positional_encoding = PositionalEncoding(
            sequence_length=sequence_length, dimension=dimension
        )

    def forward(self, tokenize: torch.Tensor):
        """Embed ``tokenize`` and add the positional encoding of the result.

        Raises:
            TypeError: If ``tokenize`` is not a ``torch.Tensor``.
        """
        if not isinstance(tokenize, torch.Tensor):
            raise TypeError("Input must be a torch.Tensor".capitalize())

        embedded = self.embedding(tokenize)
        positions = self.positional_encoding(embedded)
        return embedded + positions


if __name__ == "__main__":
    # Smoke test: embed a random batch of token ids and check the output shape.
    parser = argparse.ArgumentParser(
        description="Embedding Layer for Transformer".title()
    )
    parser.add_argument(
        "--vocab_size", type=int, default=100, help="Vocabulary Size".capitalize()
    )
    parser.add_argument(
        "--seq_len", type=int, default=200, help="Sequence Length".capitalize()
    )
    parser.add_argument(
        "--dim", type=int, default=512, help="Dimension of the Model".capitalize()
    )

    # parse_known_args() ignores the extra argv entries a Jupyter kernel
    # injects (e.g. "-f <connection-file>"); parse_args() would exit the cell.
    args = parser.parse_known_args()[0]

    sequence_length = args.seq_len
    vocabulary_size = args.vocab_size
    model_dimension = args.dim

    embedding = EmbeddingLayer(
        vocabulary_size=vocabulary_size,
        sequence_length=sequence_length,
        dimension=model_dimension,
    )
    input_ids = torch.randint(0, vocabulary_size, (400, sequence_length))

    assert embedding(input_ids).size() == (
        400,
        sequence_length,
        model_dimension,
    ), "Dimension Mismatch in the embedding layer".title()

### Layer Normalization

In [None]:
class LayerNormalization(nn.Module):
    """Layer normalization over the last axis with a learnable scale and shift.

    Args:
        normalized_shape: Size of the last axis of the inputs.
        epsilon: Constant added to the variance before the square root.

    Attributes:
        gamma: Learnable scale, initialised to ones.
        beta: Learnable shift, initialised to zeros.
        mean, variance: Statistics of the most recent ``forward`` call, each
            with a trailing singleton axis.
    """

    def __init__(self, normalized_shape: int = 512, epsilon: float = 1e-5):
        super(LayerNormalization, self).__init__()

        self.normalized_shape = normalized_shape
        self.epsilon = epsilon

        self.gamma = nn.Parameter(
            data=torch.ones((normalized_shape,)), requires_grad=True
        )
        self.beta = nn.Parameter(
            data=torch.zeros((normalized_shape,)), requires_grad=True
        )

    def forward(self, x: torch.Tensor):
        """Normalize ``x`` along its last axis.

        Raises:
            TypeError: If ``x`` is not a ``torch.Tensor`` (previously this case
                silently returned ``None``).
        """
        if not isinstance(x, torch.Tensor):
            raise TypeError("Input must be a torch.Tensor".capitalize())

        self.mean = torch.mean(x, dim=-1, keepdim=True)
        # Layer norm uses the biased (population) variance, as torch.nn.LayerNorm
        # does; torch.var defaults to the unbiased estimator.
        self.variance = torch.var(x, dim=-1, unbiased=False, keepdim=True)

        normalized = (x - self.mean) / torch.sqrt(self.variance + self.epsilon)
        return normalized * self.gamma + self.beta


if __name__ == "__main__":
    # Smoke test: normalize a random batch and check that the shape is kept.
    parser = argparse.ArgumentParser(
        description="Layer Normalization for transformer".title()
    )
    parser.add_argument(
        "--normalized_shape",
        type=int,
        default=512,
        help="The normalized shape of the input tensor".capitalize(),
    )
    parser.add_argument(
        "--epsilon",
        type=float,
        default=1e-6,
        help="The epsilon value for the variance".capitalize(),
    )

    # parse_known_args() ignores the extra argv entries a Jupyter kernel
    # injects (e.g. "-f <connection-file>"); parse_args() would exit the cell.
    args = parser.parse_known_args()[0]

    normalized_shape = args.normalized_shape
    epsilon = args.epsilon

    layer_norm = LayerNormalization(normalized_shape=normalized_shape, epsilon=epsilon)

    # The input width follows --normalized_shape; a hard-coded 512 breaks the
    # gamma/beta broadcast for any other value.
    assert layer_norm(torch.randn((40, 200, normalized_shape))).size() == (
        40,
        200,
        normalized_shape,
    ), "Layer Normalization is not working properly, check the dimensions".title()

### PointWise Neural Network

In [None]:
class PointWiseFeedForward(nn.Module):
    """Two-layer feed-forward network: Linear -> ReLU -> Dropout -> Linear.

    The first linear layer maps ``in_features`` to ``out_features`` and the
    second maps back to ``in_features``; both are created without bias.

    Args:
        in_features: Width of the input (and of the output).
        out_features: Width of the hidden layer.
        dropout: Drop probability applied after the activation.
        display: Stored on the instance; not used by this class.
    """

    def __init__(
        self,
        in_features: int = 512,
        out_features: int = 2048,
        dropout: float = 0.1,
        display: bool = False,
    ):
        super(PointWiseFeedForward, self).__init__()

        # Keep the constructor arguments as given (the hidden width used to be
        # overwritten with in_features while the layers were being built).
        self.in_features = in_features
        self.out_features = out_features
        self.dropout = dropout
        self.display = display

        # Same positions as before (0: Linear, 1: ReLU, 2: Dropout, 3: Linear),
        # so state_dict keys are unchanged.
        self.layers = [
            nn.Linear(in_features=in_features, out_features=out_features, bias=False),
            nn.ReLU(inplace=True),
            # Element-wise dropout. Dropout1d treats axis 1 of a 3-D input as
            # channels and zeroes whole slices, which is not intended here.
            nn.Dropout(p=dropout),
            nn.Linear(in_features=out_features, out_features=in_features, bias=False),
        ]

        self.model = nn.Sequential(*self.layers)

    def forward(self, x: torch.Tensor):
        """Apply the network to the last axis of ``x``.

        Raises:
            TypeError: If ``x`` is not a ``torch.Tensor``.
        """
        if not isinstance(x, torch.Tensor):
            raise TypeError("Input type is not a torch.Tensor".capitalize())

        return self.model(x)


if __name__ == "__main__":
    # Smoke test for the feed-forward network, optionally rendering its graph.
    parser = argparse.ArgumentParser(
        description="Feed Forward Network for Transformer".title()
    )
    parser.add_argument(
        "--in_features", type=int, default=512, help="Input features".capitalize()
    )
    parser.add_argument(
        "--out_features", type=int, default=2048, help="Output features".capitalize()
    )
    parser.add_argument(
        "--dropout",
        type=float,
        default=0.1,
        help="Dropout rate".capitalize(),
    )
    parser.add_argument(
        "--display",
        # type=bool would turn every non-empty string (including "False")
        # into True, so parse the text explicitly.
        type=lambda value: str(value).strip().lower() in {"1", "true", "yes", "y"},
        default=False,
        help="Display the model".capitalize(),
    )

    # parse_known_args() ignores the extra argv entries a Jupyter kernel
    # injects (e.g. "-f <connection-file>"); parse_args() would exit the cell.
    args = parser.parse_known_args()[0]

    net = PointWiseFeedForward(
        in_features=args.in_features,
        out_features=args.out_features,
        dropout=args.dropout,
        display=args.display,
    )

    assert net(torch.randn((40, 200, args.in_features))).size() == (
        40,
        200,
        args.in_features,
    )
    if args.display:
        # summary() prints its own report, so it is not wrapped in print().
        # Sizes follow --in_features instead of a hard-coded 512.
        summary(model=net, input_size=(200, args.in_features))

        path = config()["path"]["FILES_PATH"]

        draw_graph(
            model=net, input_data=torch.randn((40, 200, args.in_features))
        ).visual_graph.render(
            filename=os.path.join(path, "feedforward_network"), format="png"
        )

### Masking

In [None]:
def padding_mask(mask: torch.Tensor):
    """Turn a keep/pad indicator into an additive attention mask.

    Args:
        mask: Tensor in which ``0`` marks positions to hide and any other
            value marks positions to keep. Integer, boolean and floating
            inputs are accepted.

    Returns:
        Floating tensor with two singleton axes inserted after axis 0
        (``(batch, length)`` becomes ``(batch, 1, 1, length)``). Kept
        positions are ``0.0`` and hidden positions are ``-inf``, so adding it
        to attention scores leaves kept scores untouched.
    """
    # Always build a floating result: -inf cannot be stored in an integer or
    # boolean tensor, and kept positions must contribute exactly 0.
    dtype = mask.dtype if mask.is_floating_point() else torch.get_default_dtype()
    additive = torch.zeros(mask.shape, dtype=dtype, device=mask.device)
    additive = additive.masked_fill(mask == 0, float("-inf"))
    return additive.unsqueeze(1).unsqueeze(2)


def target_mask(sequence_length: int = 200):
    """Build a square additive mask that hides everything above the diagonal.

    Args:
        sequence_length: Side length of the mask.

    Returns:
        ``(sequence_length, sequence_length)`` tensor holding ``-inf`` strictly
        above the main diagonal and ``0.0`` on and below it.
    """
    size = (sequence_length, sequence_length)

    # 1.0 strictly above the diagonal marks the entries to block.
    upper = torch.ones(size).triu(diagonal=1)

    causal = torch.zeros(size)
    causal[upper.bool()] = float("-inf")
    return causal


if __name__ == "__main__":
    # Padding mask for an all-ones (40, 200) indicator: two singleton axes
    # are expected after the batch axis.
    mask = padding_mask(mask=torch.ones((40, 200)))
    assert tuple(mask.shape) == (40, 1, 1, 200)

    # Target mask: its size is only printed, not asserted.
    mask = target_mask(sequence_length=200)
    print(mask.shape)

### Transformer Attention - Scaled dot product

In [None]:
def scaled_dot_product(
    query: torch.Tensor,
    key: torch.Tensor,
    values: torch.Tensor,
    mask=None,
    type: str = "src",
):
    """Scaled dot-product attention: ``softmax(Q K^T / sqrt(d_k)) V``.

    Args:
        query: ``(..., query_len, d_k)`` tensor.
        key: ``(..., key_len, d_k)`` tensor with the same leading axes.
        values: ``(..., key_len, d_v)`` tensor with the same leading axes.
        mask: Optional keep/pad indicator passed to ``padding_mask``; the
            resulting additive mask is added to the scores.
        type: ``"target"`` additionally adds the mask from ``target_mask``
            (this requires ``query_len == key_len``); any other value adds no
            causal mask.

    Returns:
        Tensor of shape ``(..., query_len, d_v)``.

    Raises:
        TypeError: If an argument is not a tensor or the shapes are not
            compatible as described above.
    """
    compatible = (
        isinstance(query, torch.Tensor)
        and isinstance(key, torch.Tensor)
        and isinstance(values, torch.Tensor)
        and query.dim() == key.dim() == values.dim()
        and query.dim() >= 2
        # Leading (batch/head) axes must agree ...
        and query.shape[:-2] == key.shape[:-2] == values.shape[:-2]
        # ... but only the feature width of Q/K and the length of K/V must
        # match, so query and key sequences may differ in length.
        and query.size(-1) == key.size(-1)
        and key.size(-2) == values.size(-2)
    )
    if not compatible:
        raise TypeError(
            "The query, key, and values must be of type torch.Tensor and same shape".capitalize()
        )

    # Scale by sqrt(d_k), the per-head feature width -- not by
    # sqrt(heads * d_k), which over-damps the scores.
    scores = torch.matmul(query, key.transpose(-1, -2)) / (query.size(-1) ** 0.5)

    if mask is not None:
        # Move the mask next to the scores so this also works off-CPU.
        scores = scores + padding_mask(mask=mask).to(
            device=scores.device, dtype=scores.dtype
        )

    if type == "target":
        causal = target_mask(sequence_length=scores.size(-1))
        scores = scores + causal.to(device=scores.device, dtype=scores.dtype)

    weights = torch.softmax(input=scores, dim=-1)

    return torch.matmul(weights, values)


if __name__ == "__main__":
    # Smoke test: attention over random tensors keeps the value shape.
    parser = argparse.ArgumentParser(
        description="Scaled dot product for Transformer".title()
    )
    # parse_known_args() ignores the extra argv entries a Jupyter kernel
    # injects (e.g. "-f <connection-file>"); parse_args() would exit the cell.
    parser.parse_known_args()

    query = torch.randn((40, 8, 200, 64))
    key = torch.randn((40, 8, 200, 64))
    values = torch.randn((40, 8, 200, 64))

    attention_output = scaled_dot_product(query=query, key=key, values=values)

    assert attention_output.size() == (40, 8, 200, 64)

### Multihead Attention

In [None]:
class MultiHeadAttentionLayer(nn.Module):
    """Multi-head self-attention built on ``scaled_dot_product``.

    A single bias-free linear layer produces query, key and value; each is
    split into ``heads`` slices of width ``dimension // heads``, attended
    independently, merged back and passed through a bias-free output layer.

    Args:
        dimension: Feature width of the input and output.
        heads: Number of attention heads; must divide ``dimension``.
        dropout: Stored on the instance; not applied inside this layer.
    """

    def __init__(self, dimension=512, heads: int = 8, dropout: float = 0.1):
        super(MultiHeadAttentionLayer, self).__init__()

        self.dimension = dimension
        self.heads = heads
        self.dropout = dropout

        assert (
            self.dimension % self.heads == 0
        ), "Dimension must be divisible by heads".title()

        self.QKV = nn.Linear(
            in_features=self.dimension, out_features=3 * self.dimension, bias=False
        )
        self.layer = nn.Linear(
            in_features=self.dimension, out_features=self.dimension, bias=False
        )

    def forward(self, x: torch.Tensor, mask=None):
        """Attend over ``x``.

        Args:
            x: ``(batch, length, dimension)`` tensor.
            mask: Optional mask forwarded unchanged to ``scaled_dot_product``.

        Returns:
            ``(batch, length, dimension)`` tensor.

        Raises:
            TypeError: If ``x`` is not a ``torch.Tensor``. Errors raised by
                ``scaled_dot_product`` propagate to the caller instead of being
                printed and turned into a ``None`` return value.
        """
        if not isinstance(x, torch.Tensor):
            raise TypeError("Input must be a torch.Tensor".capitalize())

        batch, length = x.size(0), x.size(1)
        head_dim = self.dimension // self.heads

        def split_heads(tensor: torch.Tensor) -> torch.Tensor:
            # (batch, length, dimension) -> (batch, heads, length, head_dim)
            return tensor.view(batch, length, self.heads, head_dim).permute(0, 2, 1, 3)

        # Intermediate activations stay local so the module does not keep the
        # previous batch (and its autograd graph) alive between calls.
        query, key, values = torch.chunk(input=self.QKV(x), chunks=3, dim=-1)

        attention = scaled_dot_product(
            query=split_heads(query),
            key=split_heads(key),
            values=split_heads(values),
            mask=mask,
        )

        # Undo the head split: move heads back next to head_dim BEFORE
        # flattening. Viewing (batch, heads, length, head_dim) directly as
        # (batch, length, dimension) mixes different tokens and heads.
        attention = attention.permute(0, 2, 1, 3).contiguous()
        attention = attention.view(batch, length, self.dimension)

        return self.layer(attention)


if __name__ == "__main__":
    # Smoke test: the attention layer keeps the input shape with and without
    # a mask.
    parser = argparse.ArgumentParser(
        description="MultiHeadAttention Layer for Transformer".title()
    )
    parser.add_argument(
        "--dimension",
        type=int,
        default=512,
        help="Dimension of the input tensor".title(),
    )
    parser.add_argument(
        "--heads",
        type=int,
        default=8,
        help="Number of heads for the multihead attention".title(),
    )
    parser.add_argument(
        "--dropout",
        type=float,
        default=0.1,
        help="Dropout rate for the multihead attention".title(),
    )

    # parse_known_args() ignores the extra argv entries a Jupyter kernel
    # injects (e.g. "-f <connection-file>"); parse_args() would exit the cell.
    args = parser.parse_known_args()[0]

    dimension = args.dimension
    heads = args.heads
    dropout = args.dropout

    attention = MultiHeadAttentionLayer(
        dimension=dimension, heads=heads, dropout=dropout
    )

    input = torch.randn((40, 200, dimension))
    masked = torch.ones((40, 200))

    assert attention(input, masked).size() == (
        40,
        200,
        dimension,
    ), "Dimension of the output tensor must be equal to the input dimension".capitalize()

    masked = None

    assert attention(input).size() == (
        40,
        200,
        dimension,
    ), "Dimension of the output tensor must be equal to the input dimension".capitalize()

### Encoder Block

In [None]:
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention and feed-forward sub-layers.

    Each sub-layer output goes through dropout, is added to the sub-layer
    input, and the sum is normalized by ``self.layer_norm``.

    NOTE(review): both sub-layers use the same ``layer_norm`` instance and so
    share one gamma/beta pair -- confirm this is intended.

    Args:
        dimension: Feature width of the input and output.
        heads: Number of attention heads.
        feedforward: Hidden width of the feed-forward network.
        dropout: Drop probability used after each sub-layer and passed on to
            the sub-modules.
        epsilon: Epsilon of the layer normalization.
        display: Stored on the instance; not used by this class.
    """

    def __init__(
        self,
        dimension: int = 512,
        heads: int = 8,
        feedforward: int = 2048,
        dropout: float = 0.1,
        epsilon: float = 1e-6,
        display: bool = False,
    ):
        super().__init__()

        self.dimension = dimension
        self.heads = heads
        self.feedforward = feedforward
        self.dropout = dropout
        self.epsilon = epsilon
        self.display = display

        # Sub-modules are created in this order: attention, norm, feed-forward.
        self.multihead_attention = MultiHeadAttentionLayer(
            dimension=dimension, heads=heads, dropout=dropout
        )
        self.layer_norm = LayerNormalization(
            normalized_shape=dimension, epsilon=epsilon
        )
        self.feedforward_network = PointWiseFeedForward(
            in_features=dimension, out_features=feedforward, dropout=dropout
        )

    def forward(self, x: torch.Tensor, mask=None):
        """Run the block on ``x``; ``mask`` is forwarded to the attention layer.

        Raises:
            TypeError: If ``x`` is not a ``torch.Tensor``.
        """
        if not isinstance(x, torch.Tensor):
            raise TypeError("Input must be a tensor".capitalize())

        self.mask = mask

        # Sub-layer 1: self-attention -> dropout -> add input -> normalize.
        attended = self.multihead_attention(x=x, mask=self.mask)
        attended = torch.dropout(input=attended, p=self.dropout, train=self.training)
        hidden = self.layer_norm(torch.add(attended, x))

        # Sub-layer 2: feed-forward -> dropout -> add input -> normalize.
        projected = self.feedforward_network(x=hidden)
        projected = torch.dropout(
            input=projected, p=self.dropout, train=self.training
        )
        return self.layer_norm(torch.add(hidden, projected))


if __name__ == "__main__":
    # Smoke test for one encoder block, optionally rendering its graph.
    parser = argparse.ArgumentParser(
        description="Encoder Block for Transfomers".title()
    )

    parser.add_argument(
        "--dimension",
        type=int,
        default=512,
        help="Dimension of the input tensor".capitalize(),
    )
    parser.add_argument(
        "--heads",
        type=int,
        default=8,
        help="Number of heads in the multi-head attention".capitalize(),
    )
    parser.add_argument(
        "--feedforward",
        "--feedfoward",  # original (misspelled) flag kept as an alias
        dest="feedforward",
        type=int,
        default=2048,
        help="Dimension of the feedforward network".capitalize(),
    )
    parser.add_argument(
        "--dropout", type=float, default=0.1, help="Dropout rate".capitalize()
    )
    parser.add_argument(
        "--eps",
        "-eps",  # original single-dash spelling kept as an alias
        dest="eps",
        type=float,
        default=1e-6,
        help="Epsilon value for layer normalization".capitalize(),
    )
    parser.add_argument(
        "--display",
        # type=bool would turn every non-empty string (including "False")
        # into True, so parse the text explicitly.
        type=lambda value: str(value).strip().lower() in {"1", "true", "yes", "y"},
        default=False,
        help="Display the model".capitalize(),
    )

    # parse_known_args() ignores the extra argv entries a Jupyter kernel
    # injects (e.g. "-f <connection-file>"); parse_args() would exit the cell.
    args = parser.parse_known_args()[0]

    dimension = args.dimension
    heads = args.heads
    dropout = args.dropout
    feedforward = args.feedforward
    eps = args.eps
    display = args.display

    # Every parsed option is forwarded; feedforward/eps/display used to be
    # read from the command line and then ignored.
    encoder = EncoderBlock(
        dimension=dimension,
        heads=heads,
        feedforward=feedforward,
        dropout=dropout,
        epsilon=eps,
        display=display,
    )
    masked = torch.ones((40, 200))

    assert encoder(torch.randn((40, 200, dimension)), masked).size() == (
        40,
        200,
        dimension,
    ), "Encoder block is not working properl as dimension is not equal".title()

    masked = None

    assert encoder(torch.randn((40, 200, dimension))).size() == (
        40,
        200,
        dimension,
    ), "Encoder block is not working properl as dimension is not equal".title()

    if display:

        path = config()["path"]["FILES_PATH"]

        draw_graph(
            model=encoder, input_data=torch.randn((40, 200, dimension))
        ).visual_graph.render(filename=os.path.join(path, "one_encoder"), format="png")

### Encoder

In [None]:
class TransformerEncoder(nn.Module):
    """Stack of ``EncoderBlock`` layers applied one after another.

    Args:
        d_model: Feature width handed to every block.
        nhead: Number of attention heads per block.
        dim_feedforward: Hidden width of each block's feed-forward network.
        num_encoder_layers: Number of blocks in the stack.
        dropout: Drop probability handed to every block.
        layer_norm_eps: Epsilon handed to every block's layer normalization.
        display: Stored on the instance; not used by this class.
    """

    def __init__(
        self,
        d_model: int = 512,
        nhead: int = 8,
        dim_feedforward: int = 2048,
        num_encoder_layers: int = 8,
        dropout: float = 0.1,
        layer_norm_eps: float = 1e-6,
        display: bool = False,
    ):
        super().__init__()

        self.dimension = d_model
        self.heads = nhead
        self.feedforward = dim_feedforward
        self.num_encoder_layers = num_encoder_layers
        self.dropout = dropout
        self.epsilon = layer_norm_eps
        self.display = display

        # Build the blocks one at a time; tqdm shows construction progress.
        blocks = []
        for _ in tqdm(range(self.num_encoder_layers)):
            blocks.append(
                EncoderBlock(
                    dimension=self.dimension,
                    heads=self.heads,
                    feedforward=self.feedforward,
                    dropout=self.dropout,
                    epsilon=self.epsilon,
                )
            )

        self.encoder = nn.Sequential(*blocks)

    def forward(self, x: torch.Tensor, mask=None):
        """Pass ``x`` through every block, giving each the same ``mask``.

        Raises:
            TypeError: If ``x`` is not a ``torch.Tensor``.
        """
        if not isinstance(x, torch.Tensor):
            raise TypeError("Input must be a tensor".capitalize())

        hidden = x
        # The container is iterated manually because each block takes two
        # arguments (input and mask).
        for block in self.encoder:
            hidden = block(hidden, mask)

        return hidden

    @staticmethod
    def display_parameters(model: nn.Module):
        """Return the total number of parameter elements in ``model``.

        Raises:
            TypeError: If ``model`` is not a ``TransformerEncoder``.
        """
        if not isinstance(model, TransformerEncoder):
            raise TypeError("Input must be a transformer encoder".capitalize())

        total = 0
        for tensor in model.parameters():
            total += tensor.numel()

        return total


if __name__ == "__main__":
    # Smoke test for the full encoder stack, optionally reporting its
    # parameter count and rendering its graph.
    parser = argparse.ArgumentParser(description="Encoder for Transformer".title())
    parser.add_argument(
        "--d_model", type=int, default=512, help="Dimension of the model".capitalize()
    )
    parser.add_argument(
        "--nhead", type=int, default=8, help="Number of heads".capitalize()
    )
    parser.add_argument(
        "--feedforward",
        type=int,
        default=2048,
        help="Feedforward dimension".capitalize(),
    )
    parser.add_argument(
        "--dropout", type=float, default=0.1, help="Dropout rate".capitalize()
    )
    parser.add_argument(
        "--epsilon", type=float, default=1e-6, help="Epsilon for LayerNorm".capitalize()
    )
    parser.add_argument(
        "--display",
        # type=bool would turn every non-empty string (including "False")
        # into True, so parse the text explicitly.
        type=lambda value: str(value).strip().lower() in {"1", "true", "yes", "y"},
        default=False,
        help="Display the model".capitalize(),
    )
    parser.add_argument(
        "--num_encoder_layers",
        type=int,
        default=8,
        help="Number of encoder layers".capitalize(),
    )

    # parse_known_args() ignores the extra argv entries a Jupyter kernel
    # injects (e.g. "-f <connection-file>"); parse_args() would exit the cell.
    args = parser.parse_known_args()[0]

    d_model = args.d_model
    nheads = args.nhead
    feedforward = args.feedforward
    num_encoder_layers = args.num_encoder_layers
    dropout = args.dropout
    epsilon = args.epsilon
    display = args.display

    input = torch.randn((40, 200, d_model))
    masked = torch.ones((40, 200))

    encoderTransformer = TransformerEncoder(
        d_model=d_model,
        nhead=nheads,
        dim_feedforward=feedforward,
        dropout=dropout,
        layer_norm_eps=epsilon,
        display=display,
        num_encoder_layers=num_encoder_layers,
    )

    assert encoderTransformer(input, masked).size() == (
        40,
        200,
        d_model,
    ), "Transformer Encoder block is not working properl as dimension is not equal"

    masked = None

    assert encoderTransformer(input).size() == (
        40,
        200,
        d_model,
    ), "Transformer Encoder block is not working properl as dimension is not equal"

    if display:
        print(
            f"Total parameters of the transformer encoder {TransformerEncoder.display_parameters(model=encoderTransformer)}"
        )

        # Rendering is best-effort: a failure is reported, not raised.
        try:
            path = config()["path"]["FILES_PATH"]

            draw_graph(
                model=encoderTransformer, input_data=torch.randn((40, 200, d_model))
            ).visual_graph.render(
                filename=os.path.join(path, "encoderTransformer"), format="png"
            )

        except Exception as e:
            print("An error occurred: ", e)

        else:
            print(f"Encoder Transformer graph saved successfully in the path {path}")

### MultiCross Attention Layer

In [None]:
class MultiCrossAttentionLayer(nn.Module):
    """Multi-head attention whose keys/values and queries come from two inputs.

    Keys and values are projected from ``x``, queries from ``y``; all
    projections and the output layer are bias-free linear layers.

    Args:
        dimension: Feature width of both inputs and of the output.
        heads: Number of attention heads; must divide ``dimension``.
        dropout: Stored on the instance; not applied inside this layer.
    """

    def __init__(self, dimension: int = 512, heads: int = 8, dropout: float = 0.1):
        super(MultiCrossAttentionLayer, self).__init__()

        self.dimension = dimension
        self.heads = heads
        self.dropout = dropout

        assert (
            self.dimension % self.heads == 0
        ), "Dimension must be divisible by heads".title()

        self.KV = nn.Linear(
            in_features=self.dimension, out_features=2 * self.dimension, bias=False
        )
        self.Q = nn.Linear(
            in_features=self.dimension, out_features=self.dimension, bias=False
        )

        self.layer = nn.Linear(
            in_features=self.dimension, out_features=self.dimension, bias=False
        )

    def forward(self, x: torch.Tensor, y: torch.Tensor, mask=None):
        """Let ``y`` attend over ``x``.

        Args:
            x: ``(batch, length, dimension)`` tensor supplying keys and values.
            y: ``(batch, length, dimension)`` tensor supplying queries.
            mask: Optional mask forwarded to ``scaled_dot_product`` (it used to
                be accepted and then ignored).

        Returns:
            Tensor with the batch and length of ``y`` and width ``dimension``.

        Raises:
            TypeError: If ``x`` or ``y`` is not a ``torch.Tensor``.
        """
        if not (isinstance(x, torch.Tensor) and isinstance(y, torch.Tensor)):
            raise TypeError("x and y must be torch.Tensor".capitalize())

        head_dim = self.dimension // self.heads

        def split_heads(tensor: torch.Tensor) -> torch.Tensor:
            # (batch, length, dimension) -> (batch, heads, length, head_dim)
            return tensor.view(
                tensor.size(0), tensor.size(1), self.heads, head_dim
            ).permute(0, 2, 1, 3)

        key, value = torch.chunk(input=self.KV(x), chunks=2, dim=-1)
        query = self.Q(y)

        # Cross-attention may look at every position of x, so no causal
        # ("target") mask is requested here; only the optional mask applies.
        result = scaled_dot_product(
            query=split_heads(query),
            key=split_heads(key),
            values=split_heads(value),
            mask=mask,
            type="src",
        )

        # Move heads back next to head_dim BEFORE flattening; viewing
        # (batch, heads, length, head_dim) directly as (batch, length,
        # dimension) mixes different tokens and heads.
        result = result.permute(0, 2, 1, 3).contiguous()
        result = result.view(result.size(0), result.size(1), self.dimension)

        return self.layer(result)


if __name__ == "__main__":
    # Smoke test: cross-attention keeps the (batch, length, dimension) shape.
    parser = argparse.ArgumentParser(
        description="Multi Cross Attention Layer for Transformer".title()
    )
    parser.add_argument(
        "--dimension", type=int, default=512, help="dimension".capitalize()
    )
    parser.add_argument("--heads", type=int, default=8, help="heads".capitalize())
    parser.add_argument(
        "--dropout", type=float, default=0.1, help="dropout".capitalize()
    )

    # parse_known_args() ignores the extra argv entries a Jupyter kernel
    # injects (e.g. "-f <connection-file>"); parse_args() would exit the cell.
    args = parser.parse_known_args()[0]

    dimension = args.dimension
    heads = args.heads
    dropout = args.dropout

    attention = MultiCrossAttentionLayer(
        dimension=dimension,
        heads=heads,
        dropout=dropout,
    )

    assert attention(
        torch.randn((40, 200, dimension)), torch.randn((40, 200, dimension))
    ).size() == (
        40,
        200,
        dimension,
    ), "Multi Cross Attention Layer for Transformer is not working properly".capitalize()

### Decoder Block

In [None]:
class DecoderBlock(nn.Module):
    """One decoder layer: self-attention, cross-attention and feed-forward.

    Each sub-layer output goes through dropout, is added to the sub-layer
    input, and the sum is normalized by ``self.layer_norm``.

    NOTE(review): all three sub-layers use the same ``layer_norm`` instance,
    and the self-attention call passes only ``mask`` -- no causal mask is
    requested here. Confirm both are intended.

    Args:
        dimension: Feature width of both inputs and of the output.
        heads: Number of attention heads.
        feedforward: Hidden width of the feed-forward network.
        dropout: Drop probability used after each sub-layer and passed on to
            the sub-modules.
        epsilon: Epsilon of the layer normalization.
        display: Stored on the instance; not used by this class.
    """

    def __init__(
        self,
        dimension: int = 512,
        heads: int = 8,
        feedforward: int = 2048,
        dropout: float = 0.1,
        epsilon: float = 1e-6,
        display: bool = False,
    ):
        super(DecoderBlock, self).__init__()

        self.dimension = dimension
        self.heads = heads
        self.feedforward = feedforward
        self.dropout = dropout
        self.epsilon = epsilon
        self.display = display

        self.masked_multihead_attention = MultiHeadAttentionLayer(
            dimension=self.dimension,
            heads=self.heads,
            dropout=self.dropout,
        )

        self.layer_norm = LayerNormalization(
            normalized_shape=self.dimension,
            epsilon=self.epsilon,
        )

        # Attribute name (including its spelling) is kept so existing
        # state_dict keys still match.
        self.encoder_deecoder_attention = MultiCrossAttentionLayer(
            dimension=self.dimension,
            heads=self.heads,
            dropout=self.dropout,
        )

        self.feedforward_network = PointWiseFeedForward(
            in_features=self.dimension,
            # Honour the requested hidden width; it used to be dropped, so the
            # network always fell back to its own default.
            out_features=self.feedforward,
            dropout=self.dropout,
        )

    def forward(self, x: torch.Tensor, y: torch.Tensor, mask=None):
        """Update ``y`` using itself and ``x``.

        Args:
            x: Tensor handed to the cross-attention as the key/value source.
            y: Tensor that is transformed and returned.
            mask: Optional mask forwarded to the self-attention over ``y``.

        Raises:
            TypeError: If ``x`` or ``y`` is not a ``torch.Tensor``.
        """
        if not (isinstance(x, torch.Tensor) and isinstance(y, torch.Tensor)):
            raise TypeError("Input must be a torch.Tensor".capitalize())

        # Sub-layer 1: self-attention over y.
        residual = y
        y = self.masked_multihead_attention(x=y, mask=mask)
        y = torch.dropout(input=y, p=self.dropout, train=self.training)
        y = self.layer_norm(torch.add(y, residual))

        # Sub-layer 2: y attends over x.
        residual = y
        y = self.encoder_deecoder_attention(x=x, y=y, mask=None)
        y = torch.dropout(input=y, p=self.dropout, train=self.training)
        y = self.layer_norm(torch.add(y, residual))

        # Sub-layer 3: feed-forward network.
        residual = y
        y = self.feedforward_network(y)
        y = torch.dropout(input=y, p=self.dropout, train=self.training)
        y = self.layer_norm(torch.add(y, residual))

        return y


if __name__ == "__main__":
    # Smoke test for one decoder block, optionally rendering its graph.
    parser = argparse.ArgumentParser(
        description="Decoder block for the Transformer model".title()
    )
    parser.add_argument(
        "--dimension",
        type=int,
        default=512,
        help="Dimension of the input tensor".capitalize(),
    )
    parser.add_argument(
        "--heads",
        type=int,
        default=8,
        help="Number of attention heads".capitalize(),
    )
    parser.add_argument(
        "--dropout",
        type=float,
        default=0.1,
        help="Dropout rate".capitalize(),
    )
    parser.add_argument(
        "--epsilon",
        type=float,
        default=1e-6,
        help="Epsilon for the layer normalization".capitalize(),
    )
    parser.add_argument(
        "--feedforward",
        type=int,
        default=2048,
        help="Feedforward dimension".capitalize(),
    )
    parser.add_argument(
        "--display",
        # type=bool would turn every non-empty string (including "False")
        # into True, so parse the text explicitly.
        type=lambda value: str(value).strip().lower() in {"1", "true", "yes", "y"},
        default=False,
        help="Display the model architecture".capitalize(),
    )

    # parse_known_args() ignores the extra argv entries a Jupyter kernel
    # injects (e.g. "-f <connection-file>"); parse_args() would exit the cell.
    args = parser.parse_known_args()[0]

    dimension = args.dimension
    heads = args.heads
    dropout = args.dropout
    epsilon = args.epsilon
    feedforward = args.feedforward

    decoder = DecoderBlock(
        dimension=dimension,
        heads=heads,
        dropout=dropout,
        epsilon=epsilon,
        feedforward=feedforward,
    )
    X = torch.randn((40, 200, dimension))
    y = torch.randn((40, 200, dimension))

    assert decoder(X, y).size() == (
        40,
        200,
        dimension,
    ), "Model output size is incorrect from decoder block".capitalize()

    if args.display:
        path = config()["path"]["FILES_PATH"]

        draw_graph(model=decoder, input_data=(X, y)).visual_graph.render(
            filename=os.path.join(path, "one_decoder"), format="png"
        )

        print(f"Model architecture is saved in the path {path}".capitalize())

### Decoder

In [None]:
class TransformerDecoder(nn.Module):
    """A stack of ``DecoderBlock`` layers applied one after another.

    Args:
        d_model: Passed to every block as ``dimension``.
        nhead: Passed to every block as ``heads``.
        dim_feedforward: Passed to every block as ``feedforward``.
        dropout: Passed to every block as ``dropout``.
        layer_norm_eps: Passed to every block as ``epsilon``.
        num_decoder_layers: How many blocks to create.
        display: Stored on the instance; this class does not read it.
    """

    def __init__(
        self,
        d_model: int = 512,
        nhead: int = 8,
        dim_feedforward: int = 2048,
        dropout: float = 0.1,
        layer_norm_eps: float = 1e-6,
        num_decoder_layers: int = 6,
        display: bool = False,
    ):
        super().__init__()

        self.dimension = d_model
        self.heads = nhead
        self.feedforward = dim_feedforward
        self.dropout = dropout
        self.epsilon = layer_norm_eps
        self.number_of_layers = num_decoder_layers
        self.display = display

        # Every block shares the same hyper-parameters.
        block_options = dict(
            dimension=self.dimension,
            heads=self.heads,
            feedforward=self.feedforward,
            dropout=self.dropout,
            epsilon=self.epsilon,
        )

        # tqdm shows a progress bar while the blocks are being created.
        blocks = []
        for _ in tqdm(range(self.number_of_layers)):
            blocks.append(DecoderBlock(**block_options))

        # The container is only iterated over in forward(), never called directly.
        self.decoder = nn.Sequential(*blocks)

    def forward(self, x: torch.Tensor, y: torch.Tensor, mask=None):
        """Run ``y`` through every block, handing ``x`` and ``mask`` to each one.

        Args:
            x: Tensor given unchanged to every block as ``x``.
            y: Tensor that is replaced by each block's output in turn.
            mask: Optional value forwarded to every block as ``mask``.

        Returns:
            The output of the last block (``y`` itself when there are no blocks).

        Raises:
            TypeError: If ``x`` or ``y`` is not a ``torch.Tensor``.
        """
        if not (isinstance(x, torch.Tensor) and isinstance(y, torch.Tensor)):
            raise TypeError(
                "Input and output must be of type torch.Tensor".capitalize()
            )

        for block in self.decoder:
            y = block(x=x, y=y, mask=mask)

        return y


if __name__ == "__main__":
    # Smoke test for the stacked decoder: build it, run random tensors plus a
    # padding mask through it and check the output shape.
    parser = argparse.ArgumentParser(
        description="Decoder layer for the transformer".title()
    )
    parser.add_argument(
        "--d_model", type=int, default=512, help="Dimension of the model".capitalize()
    )
    parser.add_argument(
        "--num_decoder_layers",
        type=int,
        default=8,
        help="Number of decoder layers".capitalize(),
    )
    parser.add_argument(
        "--dim_feedforward",
        type=int,
        default=2048,
        help="Dimension of the feedforward layer".capitalize(),
    )
    parser.add_argument(
        "--dropout", type=float, default=0.1, help="Dropout rate".capitalize()
    )
    parser.add_argument(
        "--layer_norm_eps",
        type=float,
        default=1e-6,
        help="Layer norm epsilon".capitalize(),
    )
    parser.add_argument(
        "--display",
        # ``type=bool`` would turn every non-empty string (even "False") into
        # True, so the text is interpreted explicitly instead.
        type=lambda text: str(text).strip().lower() in {"1", "true", "yes", "y", "on"},
        default=False,
        help="Display the model".capitalize(),
    )

    # parse_known_args() skips options this parser does not define, such as the
    # ``-f <connection file>`` entry a Jupyter kernel leaves in sys.argv; plain
    # parse_args() would exit on them.
    args = parser.parse_known_args()[0]
    d_model = args.d_model
    num_decoder_layers = args.num_decoder_layers
    dim_feedforward = args.dim_feedforward
    dropout = args.dropout
    layer_norm_eps = args.layer_norm_eps

    # The number of heads is not exposed here, so the class default is used.
    decoderTransformer = TransformerDecoder(
        d_model=d_model,
        num_decoder_layers=num_decoder_layers,
        dim_feedforward=dim_feedforward,
        dropout=dropout,
        layer_norm_eps=layer_norm_eps,
    )

    X = torch.randn((40, 200, d_model))
    y = torch.randn((40, 200, d_model))
    # An all-ones mask, matching the mask used by the full Transformer check,
    # instead of random noise that does not resemble a real padding mask.
    padding_masked = torch.ones((40, 200))

    assert decoderTransformer(X, y, padding_masked).size() == (
        40,
        200,
        d_model,
    ), "Dimension mismatch in the decoder".title()

    if args.display:
        print(
            "Total parameters of the decoder transformer: ",
            sum(params.numel() for params in decoderTransformer.parameters()),
        )
        # Output directory comes from config.yml via the notebook's config() helper.
        path = config()["path"]["FILES_PATH"]

        draw_graph(
            model=decoderTransformer, input_data=(X, y, padding_masked)
        ).visual_graph.render(
            filename=os.path.join(path, "decoderTransformer"), format="png"
        )

        print(f"Decoder model saved in the path {path}")

## Transformer

In [None]:
class Transformer(nn.Module):
    """Encoder-decoder model built from ``TransformerEncoder`` and ``TransformerDecoder``.

    Args:
        d_model: Forwarded to both sub-modules as ``d_model``.
        nhead: Forwarded to both sub-modules as ``nhead``.
        num_encoder_layers: Forwarded to the encoder.
        num_decoder_layers: Forwarded to the decoder.
        dim_feedforward: Forwarded to both sub-modules.
        dropout: Forwarded to both sub-modules.
        layer_norm_eps: Forwarded to both sub-modules.
    """

    def __init__(
        self,
        d_model: int = 512,
        nhead: int = 8,
        num_encoder_layers: int = 8,
        num_decoder_layers: int = 6,
        dim_feedforward: int = 2048,
        dropout: float = 0.1,
        layer_norm_eps: float = 1e-05,
    ):
        super(Transformer, self).__init__()
        self.d_model = d_model
        self.nhead = nhead
        self.num_encoder_layers = num_encoder_layers
        self.num_decoder_layers = num_decoder_layers
        self.dim_feedforward = dim_feedforward
        self.dropout = dropout
        self.layer_norm_eps = layer_norm_eps

        self.transformerEncoder = TransformerEncoder(
            d_model=self.d_model,
            nhead=self.nhead,
            dim_feedforward=self.dim_feedforward,
            num_encoder_layers=self.num_encoder_layers,
            dropout=self.dropout,
            layer_norm_eps=self.layer_norm_eps,
        )

        self.transformerDecoder = TransformerDecoder(
            d_model=self.d_model,
            nhead=self.nhead,
            dim_feedforward=self.dim_feedforward,
            num_decoder_layers=self.num_decoder_layers,
            dropout=self.dropout,
            layer_norm_eps=self.layer_norm_eps,
        )

    def forward(
        self,
        x: torch.Tensor,
        y: torch.Tensor,
        encoder_padding_mask=None,
        decoder_padding_mask=None,
    ):
        """Encode ``x``, then decode ``y`` against the encoder output.

        Args:
            x: Tensor fed to the encoder.
            y: Tensor fed to the decoder as ``y``.
            encoder_padding_mask: Optional value passed to the encoder as ``mask``.
            decoder_padding_mask: Optional value passed to the decoder as ``mask``.

        Returns:
            The decoder output.

        Raises:
            TypeError: If ``x`` or ``y`` is not a ``torch.Tensor``.
        """
        # Fail loudly instead of silently returning None, in line with the
        # type check TransformerDecoder.forward performs.
        if not (isinstance(x, torch.Tensor) and isinstance(y, torch.Tensor)):
            raise TypeError("Inputs x and y must be of type torch.Tensor")

        x = self.transformerEncoder(x=x, mask=encoder_padding_mask)
        x = self.transformerDecoder(x=x, y=y, mask=decoder_padding_mask)

        return x


if __name__ == "__main__":
    # Smoke test for the full encoder-decoder model: build it, run random
    # tensors plus all-ones padding masks through it and check the output shape.
    parser = argparse.ArgumentParser(description="Transformer model".capitalize())
    parser.add_argument(
        "--d_model", type=int, default=512, help="Embedding dimension".capitalize()
    )
    parser.add_argument(
        # The help text must be a string, so .capitalize has to be called.
        "--nhead", type=int, default=8, help="Number of heads".capitalize()
    )
    parser.add_argument(
        "--ffn", type=int, default=2048, help="Feed forward dimension".capitalize()
    )
    parser.add_argument(
        "--encoder_layers",
        type=int,
        default=8,
        help="Number of encoder layers".capitalize(),
    )
    parser.add_argument(
        "--decoder_layers",
        type=int,
        default=8,
        help="Number of decoder layers".capitalize(),
    )
    parser.add_argument(
        "--dropout", type=float, default=0.1, help="Dropout".capitalize()
    )
    parser.add_argument(
        "--eps", type=float, default=1e-6, help="Layer norm epsilon".capitalize()
    )
    parser.add_argument(
        "--display",
        # ``type=bool`` would turn every non-empty string (even "False") into
        # True, so the text is interpreted explicitly instead.
        type=lambda text: str(text).strip().lower() in {"1", "true", "yes", "y", "on"},
        default=False,
        help="Display model".capitalize(),
    )

    # parse_known_args() skips options this parser does not define, such as the
    # ``-f <connection file>`` entry a Jupyter kernel leaves in sys.argv; plain
    # parse_args() would exit on them.
    args = parser.parse_known_args()[0]

    d_model = args.d_model
    nhead = args.nhead
    dim_feedforward = args.ffn
    num_encoder_layers = args.encoder_layers
    num_decoder_layers = args.decoder_layers
    dropout = args.dropout
    layer_norm_eps = args.eps

    transformer = Transformer(
        d_model=d_model,
        nhead=nhead,
        dim_feedforward=dim_feedforward,
        # Use the encoder count here; passing the decoder count made
        # --encoder_layers have no effect.
        num_encoder_layers=num_encoder_layers,
        num_decoder_layers=num_decoder_layers,
        dropout=dropout,
        layer_norm_eps=layer_norm_eps,
    )

    X = torch.randn((40, 200, d_model))
    y = torch.randn((40, 200, d_model))

    encoder_padding_mask = torch.ones((40, 200))
    decoder_padding_mask = torch.ones((40, 200))

    assert (
        transformer(
            x=X,
            y=y,
            encoder_padding_mask=encoder_padding_mask,
            decoder_padding_mask=decoder_padding_mask,
        ).size()
    ) == (
        40,
        200,
        d_model,
    ), "Output of the transformer is not correct as the dimensions are not matching".title()

    if args.display:
        print(
            "Total parameters of the transformer: ",
            sum(params.numel() for params in transformer.parameters()),
        )

        # Output directory comes from config.yml via the notebook's config() helper.
        path = config()["path"]["FILES_PATH"]

        draw_graph(
            model=transformer,
            input_data=(X, y, encoder_padding_mask, decoder_padding_mask),
        ).visual_graph.render(filename=os.path.join(path, "Transformer"), format="png")

        # This cell renders the whole transformer, not only the decoder.
        print(f"Transformer model saved in the path {path}")

### Tokenize

In [None]:
class Tokenizer:
    """Tokenize a list of sentences with a pretrained Hugging Face tokenizer.

    Args:
        text: Sentences to tokenize; must be a ``list``.
        padding: Forwarded to the tokenizer call as ``padding``.
        truncation: Forwarded to the tokenizer call as ``truncation``.
        return_tensors: Forwarded to the tokenizer call as ``return_tensors``.
        max_length: Forwarded to the tokenizer call as ``max_length``.
        batch_size: Batch size of the ``DataLoader`` built by
            :meth:`create_dataloader`.
        return_attention_mask: Forwarded to the tokenizer call.
        shuffle: Whether the ``DataLoader`` shuffles its rows. Defaults to
            ``True``, the value that used to be hard-coded. Pass ``False`` when
            the row order must stay aligned with another loader.
        model_name: Name handed to ``AutoTokenizer.from_pretrained``. Defaults
            to ``"bert-base-uncased"``, the value that used to be hard-coded.

    Raises:
        Exception: Whatever ``AutoTokenizer.from_pretrained`` raises is
            propagated, so a failed download cannot leave a half-built object
            without a ``tokenizer`` attribute.
    """

    def __init__(
        self,
        text: list,
        padding: str = "max_length",
        truncation: bool = True,
        return_tensors: str = "pt",
        max_length: int = 200,
        batch_size: int = 4,
        return_attention_mask: bool = True,
        shuffle: bool = True,
        model_name: str = "bert-base-uncased",
    ):
        self.text = text
        self.padding = padding
        self.truncation = truncation
        self.return_tensors = return_tensors
        self.max_length = max_length
        self.batch_size = batch_size
        self.return_attention_mask = return_attention_mask
        self.shuffle = shuffle
        self.model_name = model_name

        # Deliberately not wrapped in try/except: swallowing the error here
        # only postponed the failure to an AttributeError in tokenize_text().
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)

    def tokenize_text(self):
        """Tokenize ``self.text`` with the configured options.

        Returns:
            dict with the raw tokenizer output (``"tokenize_inputs"``), its
            ``"input_ids"`` and ``"attention_mask"`` entries, and the
            tokenizer's ``"vocab_size"``.

        Raises:
            TypeError: If ``self.text`` is not a ``list``.
        """
        if not isinstance(self.text, list):
            raise TypeError("Input must be a list of strings".capitalize())

        tokenizer_inputs = self.tokenizer(
            self.text,
            padding=self.padding,
            truncation=self.truncation,
            return_tensors=self.return_tensors,
            max_length=self.max_length,
            return_attention_mask=self.return_attention_mask,
        )

        return {
            "tokenize_inputs": tokenizer_inputs,
            "input_ids": tokenizer_inputs["input_ids"],
            "attention_mask": tokenizer_inputs["attention_mask"],
            "vocab_size": self.tokenizer.vocab_size,
        }

    def create_dataloader(self):
        """Tokenize the text, wrap it in a ``DataLoader`` and persist the loader.

        The loader yields ``(input_ids, attention_mask)`` batches. It is also
        written to ``dataloader.pkl`` under the ``PROCESSED_PATH`` entry of the
        configuration, using the notebook's ``dump`` and ``config`` helpers.

        Returns:
            dict with ``"tokenizer_object"``, ``"input_ids"``, ``"dataloader"``
            and ``"vocab_size"``.

        Raises:
            Exception: Errors from tokenizing, reading the configuration or
                writing the pickle are propagated. Previously they were printed
                and ``None`` was returned, which callers then failed to index.
        """
        tokenize = self.tokenize_text()

        input_ids = tokenize["input_ids"]
        attention_mask = tokenize["attention_mask"]
        vocab_size = tokenize["vocab_size"]

        datasets = TensorDataset(input_ids, attention_mask)
        dataloader = DataLoader(
            dataset=datasets, batch_size=self.batch_size, shuffle=self.shuffle
        )

        dump(
            value=dataloader,
            filename=os.path.join(
                config()["path"]["PROCESSED_PATH"], "dataloader.pkl"
            ),
        )

        return {
            "tokenizer_object": self.tokenizer,
            "input_ids": input_ids,
            "dataloader": dataloader,
            "vocab_size": vocab_size,
        }


if __name__ == "__main__":
    # Build the loader for the English sample sentences in one step; the name
    # ``tokenizer`` ends up holding the result dictionary of create_dataloader().
    tokenizer = Tokenizer(
        text=english,
        padding="max_length",
        truncation=True,
        return_tensors="pt",
        max_length=200,
        batch_size=4,
        return_attention_mask=True,
    ).create_dataloader()

    dataloader, vocab_size = tokenizer["dataloader"], tokenizer["vocab_size"]

    # The reported vocabulary size is expected to be exactly 30522.
    assert vocab_size == 30522

### Inference

In [None]:
"""
This script tests the implementation of a Transformer model from scratch using a dummy dataset.
It includes tokenization, embedding, and the Transformer model itself. The script ensures that the
code runs correctly and the Transformer model produces the expected output.

Modules used:
    - os
    - sys
    - transformers.AutoTokenizer
    - utils: Provides the dummy English and German sentences.
    - tokenizer: Custom Tokenizer class for tokenizing and processing text.
    - embedding_layer: Custom EmbeddingLayer class for creating embeddings.
    - transformer: Custom Transformer class for the Transformer model.

The script performs the following steps:
    1. Imports necessary modules and sets up paths.
    2. Checks if the lengths of the English and German sentences are equal.
    3. Initializes Tokenizers and DataLoaders for English and German sentences.
    4. Initializes the Embedding Layer with the appropriate vocabulary size and dimensions.
    5. Initializes the Transformer model with specified hyperparameters.
    6. Tests the Transformer model by feeding it embeddings from the dummy dataset and prints the output size.

Usage:
    This script is intended to verify that the implemented Transformer model works correctly with a dummy dataset.
    You can also use your own embeddings instead of the provided ones.
"""

# Variable values for easy configuration
MAX_LENGTH = 200
BATCH_SIZE = 40
EMBEDDING_DIMENSION = 512
NUM_ENCODER_LAYERS = 8
NUM_DECODER_LAYERS = 8
NUM_HEADS = 8
DIM_FEEDFORWARD = 2048
DROPOUT = 0.1
LAYER_NORM_EPS = 1e-5

# Ensure that the lengths of sentences match
# NOTE(review): the sample lists near the top of the notebook appear to hold
# 6 English and 5 German sentences, so this check would fire as-is - confirm
# and add the missing German sentence there.
if len(english) != len(german):
    raise ValueError(
        "Length of the sentences are not equal "
        f"(english={len(english)}, german={len(german)})"
    )

# Initialize Tokenizers and DataLoaders
english_tokenizer = Tokenizer(
    text=english,
    padding="max_length",
    truncation=True,
    return_tensors="pt",
    max_length=MAX_LENGTH,
    batch_size=BATCH_SIZE,
)
english_tokenizer_results = english_tokenizer.create_dataloader()
english_dataloader = english_tokenizer_results["dataloader"]
english_vocab_size = english_tokenizer_results["vocab_size"]

german_tokenizer = Tokenizer(
    text=german,
    padding="max_length",
    truncation=True,
    return_tensors="pt",
    max_length=MAX_LENGTH,
    batch_size=BATCH_SIZE,
)
german_tokenizer_results = german_tokenizer.create_dataloader()
german_dataloader = german_tokenizer_results["dataloader"]
german_vocab_size = german_tokenizer_results["vocab_size"]

# Initialize Embedding Layer
# The same embedding layer is used for both languages below.
embedding_layer = EmbeddingLayer(
    vocabulary_size=english_vocab_size,
    dimension=EMBEDDING_DIMENSION,
    sequence_length=MAX_LENGTH,
)

# Initialize Transformer
transformer_model = Transformer(
    d_model=EMBEDDING_DIMENSION,
    nhead=NUM_HEADS,
    num_encoder_layers=NUM_ENCODER_LAYERS,
    num_decoder_layers=NUM_DECODER_LAYERS,
    dim_feedforward=DIM_FEEDFORWARD,
    dropout=DROPOUT,
    layer_norm_eps=LAYER_NORM_EPS,
)

# Inference only: evaluation mode keeps any dropout layers inactive, and
# no_grad() avoids building an autograd graph that is never used.
transformer_model.eval()

# Test the Transformer with embeddings
# NOTE(review): each loader is created with shuffling enabled and shuffles on
# its own, so row i of the two batches is not guaranteed to be a translation
# pair. That does not affect this shape check.
with torch.no_grad():
    for (english_batch, english_padding_mask), (
        german_batch,
        german_padding_mask,
    ) in zip(english_dataloader, german_dataloader):
        english_embeddings = embedding_layer(english_batch)
        german_embeddings = embedding_layer(german_batch)

        transformer_output = transformer_model(
            x=english_embeddings,
            y=german_embeddings,
            encoder_padding_mask=english_padding_mask,
            decoder_padding_mask=german_padding_mask,
        )
        print(transformer_output.size())
        break  # Test with only the first batch


####################################################################################################################
####################################################################################################################
#                            THIS IS ANOTHER APPROACH THAT YOU CAN USE TO RUN THE TRANSFORMER                      #
####################################################################################################################
####################################################################################################################

# AutoTokenizer, DataLoader and TensorDataset are already imported in the
# notebook's first cell, so they are not imported again here.

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")


############################
#          English         #
############################

english_tokenizer = tokenizer(
    english,
    padding="max_length",
    truncation=True,
    return_tensors="pt",
    max_length=MAX_LENGTH,
)

print("Tokenized Input IDs:", english_tokenizer["input_ids"].size())
print("Attention Mask:", english_tokenizer["attention_mask"].size())

print("*" * 50, "\n")

english_vocab_size = tokenizer.vocab_size

english_tokenizer_results = TensorDataset(
    english_tokenizer["input_ids"], english_tokenizer["attention_mask"]
)
# shuffle=False: the English and German loaders are zipped below, and
# shuffling each one independently would pair unrelated sentences.
english_tokenizer_dataloader = DataLoader(
    english_tokenizer_results, batch_size=BATCH_SIZE, shuffle=False
)

############################
#          German          #
############################

german_tokenizer = tokenizer(
    german,
    padding="max_length",
    truncation=True,
    return_tensors="pt",
    max_length=MAX_LENGTH,
)

print("Tokenized Input IDs:", german_tokenizer["input_ids"].size())
print("Attention Mask:", german_tokenizer["attention_mask"].size())

print("*" * 50, "\n")

german_vocab_size = tokenizer.vocab_size

german_tokenizer_results = TensorDataset(
    german_tokenizer["input_ids"], german_tokenizer["attention_mask"]
)
# Kept in the original order for the same reason as the English loader.
german_tokenizer_dataloader = DataLoader(
    german_tokenizer_results, batch_size=BATCH_SIZE, shuffle=False
)

###########################
#         Embedding       #
###########################

# Both sides are embedded by the same layer, so the vocabularies must agree.
assert german_vocab_size == english_vocab_size, "Vocabulary sizes must be equal"

embedding = EmbeddingLayer(
    vocabulary_size=english_vocab_size,
    sequence_length=MAX_LENGTH,
    dimension=EMBEDDING_DIMENSION,
)

# Inference only: evaluation mode keeps any dropout layers inactive, and
# no_grad() avoids building an autograd graph that is never used.
transformer_model.eval()

# Test the Transformer with embeddings
with torch.no_grad():
    for (english_batch, english_padding_mask), (
        german_batch,
        german_padding_mask,
    ) in zip(english_tokenizer_dataloader, german_tokenizer_dataloader):
        english_embeddings = embedding(english_batch)
        german_embeddings = embedding(german_batch)

        transformer_output = transformer_model(
            x=english_embeddings,
            y=german_embeddings,
            encoder_padding_mask=english_padding_mask,
            decoder_padding_mask=german_padding_mask,
        )
        print(transformer_output.size())
        break  # Test with only the first batch