In [None]:
import argparse
import math
import os

import joblib
import torch
import torch.nn as nn
import yaml
from torchsummary import summary
from torchview import draw_graph

## Utils file

In [None]:
class CustomException(Exception):
    """Project-specific error raised by the persistence helpers.

    Args:
        message: Human-readable description of the failure. It is stored on
            ``self.message`` and also forwarded to ``Exception`` so that
            ``str(exc)``, ``exc.args`` and tracebacks show it.
    """

    def __init__(self, message: str):
        # Forward the message to the base class: calling ``__init__()`` with no
        # arguments leaves ``str(exc)`` empty and hides the reason in tracebacks.
        super(CustomException, self).__init__(message)
        self.message = message


def dump(value: str, filename: str):
    """Serialize ``value`` to ``filename`` using ``joblib.dump``.

    Args:
        value: Object to persist (must not be ``None``).
        filename: Destination path (must not be ``None``).

    Raises:
        CustomException: If ``value`` or ``filename`` is ``None``.
    """
    # Guard clause: both the payload and the destination are required.
    if value is None or filename is None:
        raise CustomException("Cannot be dump into pickle file".capitalize())

    joblib.dump(value=value, filename=filename)


def load(filename: str):
    """Load and return the object stored in ``filename`` via ``joblib.load``.

    Args:
        filename: Path of the file to read (must not be ``None``).

    Returns:
        The deserialized object.

    Raises:
        CustomException: If ``filename`` is ``None``.
    """
    if filename is None:
        raise CustomException("Cannot be load the pickle file".capitalize())

    # The loaded object must be returned; previously it was read and discarded,
    # so every caller received ``None``.
    # NOTE: joblib files are pickle-based -- only load files from trusted sources.
    return joblib.load(filename=filename)


def device_init(self=None, device: str = "mps"):
    """Return the requested ``torch.device``, falling back to the CPU.

    Args:
        self: Unused. This is a module-level function, so the parameter is kept
            only for backward compatibility with positional callers; it now
            defaults to ``None`` so ``device_init(device="cuda")`` works.
        device: ``"cuda"`` or ``"mps"`` to request that accelerator; any other
            value selects the CPU.

    Returns:
        ``torch.device`` for the requested backend when it reports itself as
        available, otherwise ``torch.device("cpu")``.
    """
    if device == "cuda":
        return torch.device("cuda" if torch.cuda.is_available() else "cpu")

    if device == "mps":
        return torch.device("mps" if torch.backends.mps.is_available() else "cpu")

    return torch.device("cpu")


def config():
    """Read ``../config.yml`` and return its parsed YAML content."""
    with open("../config.yml", "r") as stream:
        settings = yaml.safe_load(stream)

    return settings

## Scaled Dot-Product Attention

In [None]:
def scaled_dot_product(
    query: torch.Tensor,
    key: torch.Tensor,
    values: torch.Tensor,
    dimension: int = 512,
    mask=None,
):
    """Compute softmax attention weights and the weighted sum of ``values``.

    The scores are ``query @ key^T`` (last two axes of ``key`` swapped) divided
    by ``sqrt(dimension)``.

    Args:
        query: Query tensor.
        key: Key tensor.
        values: Value tensor.
        dimension: Value whose square root scales the scores (default 512).
        mask: Optional tensor. Two singleton axes are inserted at positions 1
            and 2 and the result is added to the scores before the softmax.
            Anything that is not a ``torch.Tensor`` is ignored.

    Returns:
        Tuple ``(weights, attention)``: the softmax over the last axis of the
        scores, and ``weights @ values``.

    Raises:
        TypeError: If ``query``, ``key`` or ``values`` is not a ``torch.Tensor``.
    """
    inputs_are_tensors = all(
        isinstance(tensor, torch.Tensor) for tensor in (query, key, values)
    )
    if not inputs_are_tensors:
        raise TypeError(
            "All inputs, for instance, query, key, and values must be of type torch.Tensor".capitalize()
        )

    scores = torch.matmul(query, key.transpose(-1, -2)) / math.sqrt(dimension)

    # Additive mask, broadcast against the scores after gaining two extra axes.
    if isinstance(mask, torch.Tensor):
        scores = scores + mask.unsqueeze(1).unsqueeze(2)

    weights = torch.softmax(scores, dim=-1)

    return weights, torch.matmul(weights, values)


if __name__ == "__main__":
    # Smoke test for scaled_dot_product driven by ../config.yml.
    # Parse the YAML file once instead of re-reading it for every lookup.
    settings = config()

    batch_size = settings["embedding"]["batch_size"]
    sequence_length = settings["embedding"]["sequence_length"]

    dimension = settings["transformer"]["dimension"]
    heads = settings["transformer"]["heads"]

    # Self-attention input: query, key and values share one random tensor laid
    # out as (batch, heads, sequence, dimension // heads).
    query = key = values = torch.randn(
        (
            batch_size,
            heads,
            sequence_length,
            dimension // heads,
        )
    )
    mask = torch.randn((batch_size, sequence_length))

    parser = argparse.ArgumentParser(
        description="Scaled Dot Product Attention for Transformers".title()
    )
    parser.add_argument(
        "--dimension",
        type=int,
        default=dimension,
        help="Dimension of the input".capitalize(),
    )
    parser.add_argument(
        "--query",
        type=torch.Tensor,
        default=query,
        help="Query tensor".capitalize(),
    )
    parser.add_argument(
        "--key",
        type=torch.Tensor,
        default=key,
        help="Key tensor".capitalize(),
    )
    parser.add_argument(
        "--values",
        type=torch.Tensor,
        default=values,
        help="Values tensor".capitalize(),
    )
    parser.add_argument(
        "--mask",
        type=torch.Tensor,
        default=mask,
        help="Mask tensor for padding".capitalize(),
    )

    # parse_known_args() ignores unrelated argv entries (e.g. the
    # ``-f <connection file>`` a Jupyter kernel is started with); parse_args()
    # would exit the cell with "unrecognized arguments".
    args = parser.parse_known_args()[0]

    # --dimension was parsed but never forwarded before; pass it so the flag
    # actually controls the scaling factor.
    weights, attention = scaled_dot_product(
        query=args.query,
        key=args.key,
        values=args.values,
        dimension=args.dimension,
        mask=args.mask,
    )

    assert args.query.size() == attention.size()

## Multi-Head Attention

In [None]:
class MultiHeadAttenion(nn.Module):
    """Multi-head self-attention built on ``scaled_dot_product``.

    A single bias-free linear layer produces query, key and values at once;
    each is split into ``heads`` slices of size ``dimension // heads``, attended
    independently, merged back and passed through a bias-free output projection.

    Args:
        dimension: Size of the last axis of the input and of the output.
        heads: Number of attention heads; must divide ``dimension``.
        mask: Optional default mask forwarded to ``scaled_dot_product`` when
            ``forward`` is called without one.

    NOTE(review): ``scaled_dot_product`` is called without ``dimension``, so the
    scores are scaled by its default (sqrt(512)) whatever ``dimension`` is here.
    Left unchanged to keep numerical behaviour -- confirm whether intended.
    """

    def __init__(self, dimension: int = 512, heads: int = 8, mask=None):
        super(MultiHeadAttenion, self).__init__()

        self.dimension = dimension
        self.heads = heads
        self.mask = mask

        assert (
            self.dimension % self.heads == 0
        ), "dimension should be divisible by heads".capitalize()

        self.QKV = nn.Linear(
            in_features=self.dimension, out_features=3 * self.dimension, bias=False
        )
        self.layer = nn.Linear(
            in_features=self.dimension, out_features=self.dimension, bias=False
        )

    def forward(self, x: torch.Tensor, mask=None):
        """Apply multi-head self-attention to ``x``.

        Args:
            x: Input tensor whose first two axes are kept and whose last axis
                has size ``dimension`` (used as (batch, sequence, dimension)).
            mask: Optional mask for this call; when ``None`` the mask given to
                the constructor is used.

        Returns:
            Tensor of the same shape as ``x``.

        Raises:
            TypeError: If ``x`` is not a ``torch.Tensor``.
        """
        if not isinstance(x, torch.Tensor):
            # Previously a non-tensor input silently returned None.
            raise TypeError("Input must be a torch.Tensor".capitalize())

        batch_size, sequence_length = x.size(0), x.size(1)
        head_dimension = self.dimension // self.heads

        # A mask supplied at call time wins over the constructor default; the
        # argument used to be accepted and then ignored.
        mask = self.mask if mask is None else mask

        def split_heads(tensor: torch.Tensor) -> torch.Tensor:
            # (batch, seq, dimension) -> (batch, heads, seq, head_dimension)
            return tensor.view(
                batch_size, sequence_length, self.heads, head_dimension
            ).permute(0, 2, 1, 3)

        # Intermediates stay local so activations are not kept alive on the
        # module between calls.
        query, key, values = (
            split_heads(chunk) for chunk in torch.chunk(self.QKV(x), 3, dim=-1)
        )

        _, attention = scaled_dot_product(
            query=query, key=key, values=values, mask=mask
        )

        # Compare all four shapes; ``a == b and c and d`` only compared the
        # first pair and tested the others for truthiness.
        assert (
            attention.size() == query.size() == key.size() == values.size()
        ), "attention size is not equal to query, key and values size".capitalize()

        # Undo the head split with a real transpose. Reinterpreting the
        # (batch, heads, seq, head_dimension) memory with ``view`` as
        # (batch, seq, heads, head_dimension) mixes data from different heads
        # and positions.
        attention = (
            attention.permute(0, 2, 1, 3)
            .contiguous()
            .view(batch_size, sequence_length, -1)
        )

        assert (
            attention.size(-1) == self.dimension
        ), "attention size is not equal to dimension".capitalize()

        return self.layer(attention)


if __name__ == "__main__":
    # Smoke test: the attention layer must preserve the input shape.
    # Parse the YAML file once instead of re-reading it for every lookup.
    settings = config()

    parser = argparse.ArgumentParser(
        description="MultiHeadAttention for Transformer Encoder".title()
    )
    parser.add_argument(
        "--dimension",
        type=int,
        default=settings["transformer"]["dimension"],
        help="dimension of the input".capitalize(),
    )
    parser.add_argument(
        "--heads",
        type=int,
        default=settings["transformer"]["heads"],
        help="number of heads".capitalize(),
    )
    parser.add_argument(
        "--mask",
        type=torch.Tensor,
        default=None,
        help="mask for attention".capitalize(),
    )

    # parse_known_args() ignores unrelated argv entries (e.g. the
    # ``-f <connection file>`` a Jupyter kernel is started with); parse_args()
    # would exit the cell with "unrecognized arguments".
    args = parser.parse_known_args()[0]

    batch_size = settings["embedding"]["batch_size"]
    sequence_length = settings["embedding"]["sequence_length"]

    attention = MultiHeadAttenion(
        dimension=args.dimension, heads=args.heads, mask=args.mask
    )

    assert attention(
        torch.randn(batch_size, sequence_length, args.dimension)
    ).size() == (
        batch_size,
        sequence_length,
        args.dimension,
    ), "MultiHeadAttention output size is not equal to input size".capitalize()

## Layer Normalization

In [None]:
class LayerNormalization(nn.Module):
    """Layer normalization over the last axis with learnable scale and shift.

    Args:
        normalized_shape: Length of the last axis of the input; ``gamma``
            (initialised to ones) and ``betas`` (initialised to zeros) have
            this length.
        epsilon: Constant added to the variance before taking the square root.
    """

    def __init__(self, normalized_shape: int = 512, epsilon: float = 1e-05):
        super().__init__()

        self.normalized_shape = normalized_shape
        self.epsilon = epsilon

        feature_shape = (self.normalized_shape,)
        self.gamma = nn.Parameter(torch.ones(feature_shape))
        self.betas = nn.Parameter(torch.zeros(feature_shape))

    def forward(self, x: torch.Tensor):
        """Normalize ``x`` along its last axis.

        The per-row mean and biased variance are stored on ``self.mean`` and
        ``self.variance`` (last axis kept with size 1).

        Raises:
            TypeError: If ``x`` is not a ``torch.Tensor``.
        """
        if not isinstance(x, torch.Tensor):
            raise TypeError("Input must be a torch.Tensor".capitalize())

        # keepdim=True yields the trailing singleton axis needed to broadcast
        # against ``x``.
        self.mean = x.mean(dim=-1, keepdim=True)
        self.variance = x.var(dim=-1, unbiased=False, keepdim=True)

        centered = x - self.mean
        spread = torch.sqrt(self.variance + self.epsilon)

        return self.gamma * (centered / spread) + self.betas


if __name__ == "__main__":
    # Smoke test: layer normalization must preserve the input shape.
    # Parse the YAML file once instead of re-reading it for every lookup.
    settings = config()

    parser = argparse.ArgumentParser(
        description="Layer Normalization for Transformer encoder".title()
    )
    parser.add_argument(
        "--normalized_shape",
        type=int,
        default=settings["transformer"]["dimension"],
        help="The normalized shape of the input tensor".capitalize(),
    )
    parser.add_argument(
        "--epsilon",
        type=float,
        default=settings["transformer"]["eps"],
        help="Epsilon value".capitalize(),
    )

    # parse_known_args() ignores unrelated argv entries (e.g. the
    # ``-f <connection file>`` a Jupyter kernel is started with); parse_args()
    # would exit the cell with "unrecognized arguments".
    args = parser.parse_known_args()[0]

    normalization = LayerNormalization(
        normalized_shape=args.normalized_shape, epsilon=args.epsilon
    )

    expected_shape = (
        settings["embedding"]["batch_size"],
        settings["embedding"]["sequence_length"],
        args.normalized_shape,
    )

    assert (
        normalization(torch.randn(*expected_shape)).size() == expected_shape
    ), "Dimension mismatch in the layer normalization layer".capitalize()

## Pointwise Feed Forward Network

In [None]:
class PointWiseNeuralNetwork(nn.Module):
    """Two-layer feed-forward network: Linear -> ReLU -> Dropout -> Linear.

    The first linear layer maps ``in_features`` to ``out_features`` and the
    second maps back to ``in_features``, so the output has the same last-axis
    size as the input.

    Args:
        in_features: Size of the last axis of the input and of the output.
        out_features: Size of the hidden layer.
        dropout: Dropout probability applied after the ReLU.
    """

    def __init__(
        self, in_features: int = 512, out_features: int = 512, dropout: float = 0.1
    ):
        super(PointWiseNeuralNetwork, self).__init__()

        # These attributes keep the constructor arguments. The previous loop
        # swapped them while building the layers, which left ``out_features``
        # equal to ``in_features`` afterwards.
        self.in_features = in_features
        self.out_features = out_features
        self.dropout = dropout

        # Same layers, in the same order, as before, so the Sequential indices
        # (and therefore the state_dict keys ``layer.0.*`` / ``layer.3.*``) are
        # unchanged.
        self.layers = [
            nn.Linear(in_features=self.in_features, out_features=self.out_features),
            nn.ReLU(inplace=True),
            nn.Dropout(p=self.dropout),
            nn.Linear(in_features=self.out_features, out_features=self.in_features),
        ]

        self.layer = nn.Sequential(*self.layers)

    def forward(self, x: torch.Tensor):
        """Run ``x`` through the feed-forward stack.

        Raises:
            TypeError: If ``x`` is not a ``torch.Tensor``.
        """
        if not isinstance(x, torch.Tensor):
            raise TypeError("Input must be a torch.Tensor".capitalize())

        return self.layer(x)


if __name__ == "__main__":
    # Smoke test: the feed-forward network must preserve the input shape.
    # Parse the YAML file once instead of re-reading it for every lookup.
    settings = config()

    parser = argparse.ArgumentParser(
        description="Pointwise neural network for transformer".title()
    )
    parser.add_argument(
        "--in_features",
        type=int,
        default=settings["transformer"]["dimension"],
        help="Input features".capitalize(),
    )
    parser.add_argument(
        "--out_features",
        type=int,
        default=settings["transformer"]["feed_forward"],
        help="Output features".capitalize(),
    )
    parser.add_argument(
        "--dropout", type=float, default=0.1, help="Dropout rate".capitalize()
    )

    # parse_known_args() ignores unrelated argv entries (e.g. the
    # ``-f <connection file>`` a Jupyter kernel is started with); parse_args()
    # would exit the cell with "unrecognized arguments".
    args = parser.parse_known_args()[0]

    net = PointWiseNeuralNetwork(
        in_features=args.in_features,
        out_features=args.out_features,
        dropout=args.dropout,
    )

    batch_size = settings["embedding"]["batch_size"]
    sequence_length = settings["embedding"]["sequence_length"]

    assert (
        args.in_features % settings["transformer"]["heads"] == 0
    ), "Input features must be divisible by the number of heads".capitalize()

    assert net(torch.rand((batch_size, sequence_length, args.in_features))).size() == (
        batch_size,
        sequence_length,
        args.in_features,
    ), "Dimensions do not match in the poinytwise neural network".capitalize()

## Encoder Block

In [None]:
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention and feed-forward, each with a residual.

    ``forward`` computes, in order:

    1. multi-head attention -> dropout -> add input -> layer norm
    2. feed-forward network -> dropout -> add (1) -> layer norm

    Args:
        dimension: Size of the last axis of the input and output.
        heads: Number of attention heads.
        feed_forward: Hidden size of the feed-forward network.
        dropout: Dropout probability used inside the feed-forward network and
            on both sub-layer outputs.
        epsilon: Epsilon passed to the layer normalization.
        mask: Optional mask handed to the attention layer.

    NOTE(review): both sub-layers share the single ``self.layer_norm`` instance
    (one ``gamma``/``betas`` pair). Using two separate norms would change the
    parameter set and break existing checkpoints, so it is left as is.
    """

    def __init__(
        self,
        dimension: int = 512,
        heads: int = 8,
        feed_forward: int = 2048,
        dropout: float = 0.1,
        epsilon: float = 1e-5,
        mask=None,
    ):
        super(EncoderBlock, self).__init__()

        self.dimension = dimension
        self.heads = heads
        # Misspelled attribute kept as an alias so code that reads ``haeds``
        # keeps working.
        self.haeds = heads
        self.feed_forward = feed_forward
        self.dropout = dropout
        self.epsilon = epsilon
        self.mask = mask

        # Sub-modules are registered in the original order so parameter order
        # and state_dict keys are unchanged.
        self.multihead_attention = MultiHeadAttenion(
            dimension=self.dimension, heads=self.heads, mask=self.mask
        )

        self.layer_norm = LayerNormalization(
            normalized_shape=self.dimension, epsilon=self.epsilon
        )

        self.feedfoward = PointWiseNeuralNetwork(
            in_features=self.dimension,
            out_features=self.feed_forward,
            dropout=self.dropout,
        )

    def forward(self, x: torch.Tensor):
        """Apply the encoder layer to ``x`` and return a tensor of equal shape.

        Raises:
            TypeError: If ``x`` is not a ``torch.Tensor``.
        """
        if not isinstance(x, torch.Tensor):
            raise TypeError("Input must be a torch.Tensor".capitalize())

        # Sub-layer 1: self-attention with residual connection.
        attended = self.multihead_attention(x)
        attended = torch.dropout(input=attended, p=self.dropout, train=self.training)
        x = self.layer_norm(torch.add(attended, x))

        # Sub-layer 2: position-wise feed-forward with residual connection.
        transformed = self.feedfoward(x)
        transformed = torch.dropout(
            input=transformed, p=self.dropout, train=self.training
        )

        return self.layer_norm(torch.add(transformed, x))


if __name__ == "__main__":
    # Smoke test: one encoder block must preserve the input shape.
    # Parse the YAML file once instead of re-reading it for every lookup.
    settings = config()

    parser = argparse.ArgumentParser(
        description="Encoder Block for transformers".capitalize()
    )
    parser.add_argument(
        "--d_model",
        type=int,
        default=settings["transformer"]["dimension"],
        help="Dimension of the input tensor".capitalize(),
    )
    parser.add_argument(
        "--heads",
        type=int,
        default=settings["transformer"]["heads"],
        help="Number of heads in the multihead attention".capitalize(),
    )
    parser.add_argument(
        "--feedforward",
        type=int,
        default=settings["transformer"]["feed_forward"],
        help="Dimension of the feedforward layer".capitalize(),
    )
    parser.add_argument(
        "--dropout",
        type=float,
        default=settings["transformer"]["dropout"],
        help="Dropout rate".capitalize(),
    )
    parser.add_argument(
        "--epsilon",
        type=float,
        default=settings["transformer"]["eps"],
        help="Epsilon for layer norm".capitalize(),
    )
    parser.add_argument(
        "--mask",
        type=torch.Tensor,
        default=None,
        help="Mask for attention".capitalize(),
    )
    parser.add_argument(
        "--display", action="store_true", help="Display the arguments".capitalize()
    )

    # parse_known_args() ignores unrelated argv entries (e.g. the
    # ``-f <connection file>`` a Jupyter kernel is started with); parse_args()
    # would exit the cell with "unrecognized arguments".
    args = parser.parse_known_args()[0]

    dimension = args.d_model
    heads = args.heads
    feed_forward = args.feedforward
    dropout = args.dropout
    epsilon = args.epsilon
    mask = args.mask

    batch_size = settings["embedding"]["batch_size"]
    sequence_length = settings["embedding"]["sequence_length"]

    encoder = EncoderBlock(
        dimension=dimension,
        heads=heads,
        feed_forward=feed_forward,
        dropout=dropout,
        epsilon=epsilon,
        mask=mask,
    )

    assert encoder(torch.randn(batch_size, sequence_length, dimension)).size() == (
        batch_size,
        sequence_length,
        dimension,
    ), "Dimension mismatch in the EncoderBlock".capitalize()

    if args.display:
        print(summary(model=encoder, input_size=(sequence_length, dimension)), "\n")

        # Render the computation graph as a PNG under the configured files path
        # (relies on the top-level ``import os``).
        draw_graph(
            model=encoder,
            input_data=torch.randn(batch_size, sequence_length, dimension),
        ).visual_graph.render(
            filename=os.path.join(settings["path"]["FILES_PATH"], "encoder"),
            format="png",
        )

## Transformer Encoder (stack of Encoder Blocks)

In [None]:
class TransformerEncoder(nn.Module):
    """Sequential stack of identical ``EncoderBlock`` layers.

    Args:
        dimension: Size of the last axis of the input and output.
        heads: Number of attention heads in every block.
        feed_forward: Hidden size of each block's feed-forward network.
        dropout: Dropout probability used in every block.
        epsilon: Epsilon for each block's layer normalization.
        mask: Optional mask handed to every block.
        num_layers: Number of stacked blocks. When ``None`` (default) the stack
            depth equals ``heads``, which is what this class always did, so
            existing callers and checkpoints are unaffected.
    """

    def __init__(
        self,
        dimension: int = 512,
        heads: int = 8,
        feed_forward: int = 2048,
        dropout: float = 0.1,
        epsilon: float = 1e-5,
        mask=None,
        num_layers: int = None,
    ):
        super(TransformerEncoder, self).__init__()

        self.dimension = dimension
        self.heads = heads
        self.feed_forward = feed_forward
        self.dropout = dropout
        self.epsilon = epsilon
        self.mask = mask
        # Depth used to be tied to the number of heads; it can now be chosen
        # independently while keeping the old value as the default.
        self.num_layers = self.heads if num_layers is None else num_layers

        self.model = nn.Sequential(
            *[
                EncoderBlock(
                    dimension=self.dimension,
                    heads=self.heads,
                    feed_forward=self.feed_forward,
                    dropout=self.dropout,
                    epsilon=self.epsilon,
                    mask=self.mask,
                )
                for _ in range(self.num_layers)
            ]
        )

    def forward(self, x: torch.Tensor):
        """Pass ``x`` through every encoder block in order.

        Raises:
            TypeError: If ``x`` is not a ``torch.Tensor``.
        """
        if not isinstance(x, torch.Tensor):
            raise TypeError("Input must be a tensor".capitalize())

        return self.model(x)

    @staticmethod
    def total_params(model=None):
        """Return the total number of elements in ``model``'s parameters.

        Raises:
            TypeError: If ``model`` is not a ``TransformerEncoder``.
        """
        if not isinstance(model, TransformerEncoder):
            raise TypeError("Input must be a transformer encoder".capitalize())

        return sum(params.numel() for params in model.parameters())


if __name__ == "__main__":
    # Smoke test: the full encoder stack must preserve the input shape.
    # Parse the YAML file once instead of re-reading it for every lookup.
    settings = config()

    parser = argparse.ArgumentParser(
        description="Encoder Block for transformers".capitalize()
    )
    parser.add_argument(
        "--d_model",
        type=int,
        default=settings["transformer"]["dimension"],
        help="Dimension of the input tensor".capitalize(),
    )
    parser.add_argument(
        "--heads",
        type=int,
        default=settings["transformer"]["heads"],
        help="Number of heads in the multihead attention".capitalize(),
    )
    parser.add_argument(
        "--feedforward",
        type=int,
        default=settings["transformer"]["feed_forward"],
        help="Dimension of the feedforward layer".capitalize(),
    )
    parser.add_argument(
        "--dropout",
        type=float,
        default=settings["transformer"]["dropout"],
        help="Dropout rate".capitalize(),
    )
    parser.add_argument(
        "--epsilon",
        type=float,
        default=settings["transformer"]["eps"],
        help="Epsilon for layer norm".capitalize(),
    )
    parser.add_argument(
        "--mask",
        type=torch.Tensor,
        default=None,
        help="Mask for attention".capitalize(),
    )

    parser.add_argument(
        "--display", action="store_true", help="Display the arguments".capitalize()
    )

    # parse_known_args() ignores unrelated argv entries (e.g. the
    # ``-f <connection file>`` a Jupyter kernel is started with); parse_args()
    # would exit the cell with "unrecognized arguments".
    args = parser.parse_known_args()[0]

    dimension = args.d_model
    heads = args.heads
    feed_forward = args.feedforward
    dropout = args.dropout
    epsilon = args.epsilon
    mask = args.mask

    batch_size = settings["embedding"]["batch_size"]
    sequence_length = settings["embedding"]["sequence_length"]

    netTransfomer = TransformerEncoder(
        dimension=dimension,
        heads=heads,
        feed_forward=feed_forward,
        dropout=dropout,
        epsilon=epsilon,
        mask=mask,
    )
    assert netTransfomer(
        torch.randn(batch_size, sequence_length, dimension)
    ).size() == (
        batch_size,
        sequence_length,
        dimension,
    ), "Dimension mismatch in the EncoderBlock".capitalize()

    if args.display:

        print(
            "Total parameters of the model is # {}".format(
                TransformerEncoder.total_params(model=netTransfomer)
            )
        )

        print(summary(model=netTransfomer, input_size=(sequence_length, dimension)))

        # Render the computation graph as a PNG under the configured files path
        # (relies on the top-level ``import os``).
        draw_graph(
            model=netTransfomer,
            input_data=torch.randn(batch_size, sequence_length, dimension),
        ).visual_graph.render(
            filename=os.path.join(settings["path"]["FILES_PATH"], "transfomerEncoder"),
            format="png",
        )

## Inference

In [None]:
import torch

try:
    from .transformer import TransformerEncoder
except ImportError:
    # A relative import needs a parent package, which a notebook cell does not
    # have; in that case use the TransformerEncoder class defined earlier in
    # this notebook.
    pass

"""
This script initializes a Transformer Encoder with specified parameters, 
creates a random embedding tensor, and prints the shapes of the embedding 
and the output tensors.

Attributes:
    batch_size (int): The batch size for the input tensor.
    sequence_length (int): The sequence length for the input tensor.
    model_dimension (int): The dimension of the model.
    feed_forward (int): The dimension of the feed forward network.
    number_heads (int): The number of attention heads.
    dropout (float): The dropout rate.
    epsilon (float): The epsilon value for numerical stability.
"""

batch_size = 64
sequence_length = 512
model_dimension = 768
feed_forward = 2048
number_heads = 12
dropout = 0.1
epsilon = 1e-6

# Create a random embedding tensor with the specified shape
embedding = torch.randn((batch_size, sequence_length, model_dimension))

# Create a random padding mask tensor
padding_masked = torch.randn((batch_size, sequence_length))

# Initialize the Transformer Encoder with the specified parameters
netTransformer = TransformerEncoder(
    dimension=model_dimension,
    heads=number_heads,
    feed_forward=feed_forward,
    dropout=dropout,
    epsilon=epsilon,
    mask=padding_masked,
)

# Inference only: eval() switches dropout off so the output is deterministic
# for a given input.
netTransformer.eval()

# Print the divider line
print("|", "-" * 100, "|")

# Print the shape of the embedding tensor
print("|", "\tThe embedding shape is: ", embedding.size())

# Run the forward pass without autograd bookkeeping; no gradients are needed
# here and skipping them avoids retaining every intermediate activation.
with torch.no_grad():
    output = netTransformer(embedding)

# Print the output shape
print(
    "|",
    "\tThe output shape is: ",
    output.size(),
)  # (batch_size, sequence_length, model_dimension)

# Print the closing divider line
print("|", "-" * 100, "|")