<a href="https://colab.research.google.com/github/jamadri/Transformers/blob/main/model_utils_and_training_drive.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Utils and Model

## Utils

### addAndNorm.py

In [1]:
"""Residual connection."""


def addAndNorm(x, blockOutput, norm):
    """Residual connection."""
    return norm(x + blockOutput)


### PositionalEncoding.py

In [2]:
"""Positional Encoding."""
import math
import torch


def positionalEncoding(x, dim_model):
    """Positional Encoding."""
    def sineOrCosine(i):
        """sin(alpha+pi/2) = cos(alpha)."""
        return math.pi/2*(i % 2 == 1)
    values = [
        [sineOrCosine(i) + pos/math.pow(10000, 2*(i//2)/dim_model) for
         i in range(dim_model)]
        for pos in range(x.shape[0])
    ]
    return torch.sin(torch.tensor(values))

## Model

In [40]:
from torch import nn
import torch

class Transformer(nn.Module):
    """Main transformer block.

    Inputs: - model_parameters: ordered dictionary of key-values describing the
    layer parameters of the model:
      - dim_model: dimension of the model.
      - layers: dictionary of key-values describing specific layers
        - <layer_name>: dictionary of parameters for the specific multi-head
          layer
          - attention: dictionary of parameters for the specific attention
            function
            - dim_key: dimension of the key and query.
            - dim_value: dimension of the value.
          - nb_head: number of heads.

    """

    def __init__(self, model_parameters):
        """Initialize parameters."""
        super().__init__()
        self.dim_model = model_parameters["dim_model"]
        self.encoder = Encoder(model_parameters["encoder"])
        self.decoder = Decoder(model_parameters["decoder"])
        self.embedding = Embedding(model_parameters)
        self.toProba = nn.Sequential(
            nn.Linear(self.dim_model,
                      model_parameters["vocabulary_size"]),
            nn.Softmax()
        )
        self.dropout = nn.Dropout(0.1)

    def forward(self, x, lastOutput):
        """Apply a step forward."""
        encoderInput = self.embedding(x) + positionalEncoding(
            x, self.dim_model)
        decoderInput = self.embedding(lastOutput) + positionalEncoding(
            lastOutput, self.dim_model)
        encoderInput = self.dropout(encoderInput)
        decoderInput = self.dropout(decoderInput)
        encoderOutput = self.encoder(encoderInput)
        decoderOutput = self.decoder(decoderInput, encoderOutput)
        lastOutput = self.toProba(decoderOutput)
        return lastOutput


class Encoder(nn.Module):
    """Encoder."""

    def __init__(self, encoderConfig):
        """Initialize."""
        super().__init__()
        self.nb_layers = encoderConfig["nb_layers"]
        self.dim_model = encoderConfig["dim_model"]
        self.dim_feedforward = encoderConfig["feedforward"]["dim_feedforward"]
        self.norm = nn.LayerNorm(normalized_shape=self.dim_model,
                                 elementwise_affine=True, bias=True)
        self.multiheads = [MultiHeadAttention(encoderConfig["multihead"],
                                              masked=False)
                           for i in range(self.nb_layers)]
        self.feedforwards = [nn.Sequential(nn.Linear(self.dim_model,
                                                     self.dim_feedforward),
                                           nn.ReLU(),
                                           nn.Linear(self.dim_feedforward,
                                                     self.dim_model))
                             for i in range(self.nb_layers)]
        self.dropout = nn.Dropout(0.1)

    def forward(self, x):
        """Forward."""
        for i in range(self.nb_layers):
            h1 = addAndNorm(x, self.dropout(self.multiheads[i](x, x,
                                                               x)),
                            self.norm)
            x = addAndNorm(h1, self.dropout(self.feedforwards[i](h1)),
                           self.norm)
        return x


class Decoder(nn.Module):
    """Decoder."""

    def __init__(self, decoderConfig):
        """Initialize."""
        super().__init__()
        self.dim_model = decoderConfig["dim_model"]
        self.norm = nn.LayerNorm(normalized_shape=self.dim_model)
        self.dim_feedforward = decoderConfig["feedforward"]["dim_feedforward"]
        self.nb_layers = decoderConfig["nb_layers"]
        self.layer = []
        self.multiheads1 = [MultiHeadAttention(decoderConfig["multihead"],
                                               masked=True)
                            for i in range(self.nb_layers)]
        self.multiheads2 = [MultiHeadAttention(decoderConfig["multihead"],
                                               masked=False)
                            for i in range(self.nb_layers)]
        self.feedforwards = [nn.Sequential(nn.Linear(self.dim_model,
                                                     self.dim_feedforward),
                                           nn.ReLU(),
                                           nn.Linear(self.dim_feedforward,
                                                     self.dim_model))
                             for i in range(self.nb_layers)]

    def forward(self, decoderInput, encoderOutput):
        """Forward."""
        for i in range(self.nb_layers):
            h1 = addAndNorm(decoderInput,
                            self.multiheads1[i](decoderInput,
                                                decoderInput,
                                                decoderInput),
                            self.norm)
            h2 = addAndNorm(h1,
                            self.multiheads2[i](h1,
                                                encoderOutput,
                                                encoderOutput),
                            self.norm)
            lastOutput = addAndNorm(h2,
                                    self.feedforwards[i](h2),
                                    self.norm)
        return lastOutput


class LonelyDecoder(nn.Module):
    """A lonely decoder."""

    def __init__(self, model_parameters):
        """Initialize."""
        super().__init__()
        self.decoderConfig = model_parameters["decoder"]
        self.dim_model = self.decoderConfig["dim_model"]
        self.norm = nn.LayerNorm(normalized_shape=self.dim_model)
        self.dim_feedforward = self.decoderConfig["feedforward"]["dim_feedforward"]
        self.nb_layers = self.decoderConfig["nb_layers"]
        self.layer = []
        self.embedding = Embedding(model_parameters)
        self.multiheads1 = [MultiHeadAttention(self.decoderConfig["multihead"],
                                               masked=True)
                            for i in range(self.nb_layers)]
        self.multiheads2 = [MultiHeadAttention(self.decoderConfig["multihead"],
                                               masked=False)
                            for i in range(self.nb_layers)]
        self.feedforwards = [nn.Sequential(nn.Linear(self.dim_model,
                                                     self.dim_feedforward),
                                           nn.ReLU(),
                                           nn.Linear(self.dim_feedforward,
                                                     self.dim_model))
                             for i in range(self.nb_layers)]
        self.toProba = nn.Sequential(
            nn.Linear(self.dim_model,
                      model_parameters["vocabulary_size"]),
            nn.Softmax()
        )
        self.dropout = nn.Dropout(0.1)

    def forward(self, x):
        """Forward."""
        x = self.embedding(x) + positionalEncoding(
            x, self.dim_model)
        for i in range(self.nb_layers):
            h1 = addAndNorm(x,
                            self.multiheads1[i](x, x, x),
                            self.norm)
            h2 = addAndNorm(h1,
                            self.multiheads2[i](h1, h1, h1),
                            self.norm)
            layerOutput = addAndNorm(h2,
                                     self.feedforwards[i](h2),
                                     self.norm)
        finalOutput = self.toProba(layerOutput)
        return finalOutput


class ScaledDotProductAttention(nn.Module):
    """Scaled Dot-Product Attention."""

    def __init__(self, dim_model, masked=False):
        """Initialize.

        Inputs:
        - dim_model: model dimension
        - masked: prevents tokens to attend to the following ones.
        """
        super().__init__()
        self.dim_model = dim_model
        self.masked = masked
        self.softmax = nn.Softmax()

    def forward(self, Q, K, V):
        """Forward.

        Inputs:
        - Q: query
        - K: key
        - V: value
        """
        matmul_0 = torch.matmul(Q, K.transpose(0, 1))
        scaled = torch.divide(matmul_0, torch.Tensor([self.dim_model]))
        if self.masked:
            mask = torch.ones(scaled.shape)
            mask = mask - torch.tril(mask)*mask
            mask = torch.where(mask == 1, float('-inf'), 0)
            scaled = scaled + mask
        softmaxed = self.softmax(scaled)
        sdpa = torch.matmul(softmaxed, V)
        return sdpa


class MultiHeadAttention(nn.Module):
    """Multi-Head Attention.

    Inputs:
    - multi_head_config: dictionary
    """

    def __init__(self, multi_head_config, masked=False):
        """Initialize multi-head."""
        super().__init__()
        self.dim_key = multi_head_config["attention"]["dim_key"]
        self.dim_value = multi_head_config["attention"]["dim_value"]
        self.nb_heads = multi_head_config["nb_heads"]
        self.dim_model = self.dim_key * self.nb_heads

        self.WQs = [nn.Linear(self.dim_model, self.dim_key)
                    for i in range(self.nb_heads)]
        self.WKs = [nn.Linear(self.dim_model, self.dim_key)
                    for i in range(self.nb_heads)]
        self.WVs = [nn.Linear(self.dim_model, self.dim_value)
                    for i in range(self.nb_heads)]
        self.spda = ScaledDotProductAttention(self.dim_model, masked)

    def forward(self, Q, K, V):
        """One step of the multi-head block."""
        heads = [self.spda(self.WQs[i](Q),
                           self.WKs[i](K),
                           self.WVs[i](V))
                 for i in range(self.nb_heads)]
        return torch.cat([head for head in heads], 1)


class Embedding(nn.Module):
    """Embedding."""

    def __init__(self, model_parameters):
        """Initialize embedding."""
        super().__init__()
        self.embedding = nn.Linear(model_parameters["vocabulary_size"],
                                   model_parameters["dim_model"])

    def forward(self, x):
        """Forward step."""
        return self.embedding(x)


## Data

In [6]:
from google.colab import drive
drive.mount('/content/drive')
with open('/content/drive/My Drive/foo.txt', 'w') as f:
  f.write('Hello Google Drive!')
!cat /content/drive/My\ Drive/foo.txt

Mounted at /content/drive


In [21]:
!ls /content/drive/MyDrive/Transformers/data

tiny_shakespeare.txt


In [23]:
tiny_shakespeare = open('/content/drive/MyDrive/Transformers/data/tiny_shakespeare.txt',
            'rb').read().decode(encoding='utf-8')

# Training

In [51]:
# """Training."""
import numpy as np
import torch
from torch import nn
from torch.nn import functional as F
# # import Transformers.model.transformer.Transformer

text = tiny_shakespeare
print('Length of text: {} characters'.format(len(text)))
print(text[:250])

# unique characters in the file
vocab = sorted(set(text+"@"+"#")) # @ will be the initial character
                                  # and # the final character. They
                                  # are not in the text.
print('{} unique characters'.format(len(vocab)))

# Lookup tables
char2idx = {u:i for i, u in enumerate(vocab)}
idx2char = np.array(vocab)

text_as_int = np.array([char2idx[c] for c in text])
print("char2idx")
print(char2idx)
print("idx2char")
print(idx2char)
print("text_as_int")
print(text_as_int)
print ('{} ---- characters mapped to int ---- >{}'.format(repr(text[:13]), text_as_int[:13]))

# Create training examples:
seq_length = 8
examples_per_epoch = len(text)//(seq_length)

int_text_tensor = torch.tensor(text_as_int)
chunks = torch.chunk(int_text_tensor, examples_per_epoch, 0)
print(int_text_tensor)

examples = [chunk[:-1] for chunk in chunks]
targets = [chunk[1:] for chunk in chunks]
print(f"""There are {len(examples)} chunks of {seq_length} characters available for the
network training.""")



model_parameters = {
    "dim_model": 256,
    "vocabulary_size": 67,
    "batch_size": 64,
    "encoder": {
        "nb_layers": 1,
        "dim_model": 256,
        "multihead": {
            "attention": {
                "dim_model": 256,
                "dim_key": 128,
                "dim_value": 128
                },
            "nb_heads": 2
            },
        "feedforward": {
            "dim_feedforward": 256
            }
        },
    "decoder": {
        "nb_layers": 1,
        "vocabulary_size": 67,
        "dim_model": 256,
        "multihead": {
            "attention": {
                "dim_model": 256,
                "dim_key": 128,
                "dim_value": 128,
            },
            "nb_heads": 2
        },
        "feedforward": {
            "dim_feedforward": 256
        }
    }
}

# transformer = Transformer(model_parameters)
# x = torch.randn(10, 128)
# lastOutput = torch.randn(3, 128)
# transformer(x, lastOutput)

decoder = LonelyDecoder(model_parameters)
x = torch.randn(10, model_parameters["vocabulary_size"])
lastOutput = torch.randn(3, model_parameters["vocabulary_size"])
decoder(x)


one_hot_examples = F.one_hot(torch.stack(examples[:-1]).long(),
                             model_parameters["vocabulary_size"]).float()
one_hot_targets = F.one_hot(torch.stack(targets[:-1]).long(), model_parameters["vocabulary_size"]).float()

decoder(one_hot_examples[0])
one_hot_examples[0]

loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(decoder(one_hot_examples[0]), one_hot_targets[0])
loss.backward()
optimizer = torch.optim.Adam(decoder.parameters())

from torch.utils.data import DataLoader, Dataset

data = torch.stack((one_hot_examples, one_hot_targets), dim=0)
class customDataset(Dataset):
    def __init__(self, data):
        self.data  = data
    def __len__(self):
        return data.shape[1]
    def __getitem__(self, idx):
        return data[0,idx], data[1, idx]

dataset = customDataset(data)

train_dataloader = DataLoader(customDataset(data[:,0:201]), batch_size=64, shuffle=True)

def train_loop(dataloader, model, loss_fn, optimizer):
    """Train loop. Taken from pytorch tutorial."""
    size = len(dataloader.dataset)
    # Set the model to training mode - important for batch
    # normalization and dropout layers Unnecessary in this situation
    # but added for best practices
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction and loss
        print(f"X shape: {X.shape}")
        pred = model(X)
        pred = char2idx["@"]
        pred = F.one_hot(torch.Tensor([pred]).long(),
                         model_parameters["vocabulary_size"]).float()
        for i in range(seq_length):
            new_pred = model(pred)
            print(new_pred.shape)
            pred = torch.cat((pred, torch.unsqueeze(new_pred[-1], dim=0)))
        print(pred.shape)
        loss = loss_fn(torch.unsqueeze(pred[1:],0), y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), batch + len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
            print(''.join([idx2char[i] for i in torch.max(pred[1:],
                                                          1)[1].tolist()]))

train_loop(train_dataloader, decoder, loss_fn, optimizer)



# device = (
#     "cuda"
#     if torch.cuda.is_available()
#     else "mps"
#     if torch.backends.mps.is_available()
#     else "cpu"
# )
# print(f"Using {device} device")
# #print(transformer)


Length of text: 1115394 characters
First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.

67 unique characters
char2idx
{'\n': 0, ' ': 1, '!': 2, '#': 3, '$': 4, '&': 5, "'": 6, ',': 7, '-': 8, '.': 9, '3': 10, ':': 11, ';': 12, '?': 13, '@': 14, 'A': 15, 'B': 16, 'C': 17, 'D': 18, 'E': 19, 'F': 20, 'G': 21, 'H': 22, 'I': 23, 'J': 24, 'K': 25, 'L': 26, 'M': 27, 'N': 28, 'O': 29, 'P': 30, 'Q': 31, 'R': 32, 'S': 33, 'T': 34, 'U': 35, 'V': 36, 'W': 37, 'X': 38, 'Y': 39, 'Z': 40, 'a': 41, 'b': 42, 'c': 43, 'd': 44, 'e': 45, 'f': 46, 'g': 47, 'h': 48, 'i': 49, 'j': 50, 'k': 51, 'l': 52, 'm': 53, 'n': 54, 'o': 55, 'p': 56, 'q': 57, 'r': 58, 's': 59, 't': 60, 'u': 61, 'v': 62, 'w': 63, 'x': 64, 'y': 65, 'z': 66}
idx2char
['\n' ' ' '!' '#' '$' '&' "'" ',' '-' '.' '3' ':' ';' '?' '@' 'A' 'B' 'C'
 'D' '

  return self._call_impl(*args, **kwargs)


X shape: torch.Size([64, 8, 67])


RuntimeError: The size of tensor a (8) must match the size of tensor b (64) at non-singleton dimension 1