<a href="https://colab.research.google.com/github/carloscotrini/transformers_from_scratch/blob/main/AML_MyTransfomerV2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [211]:
import random

SHAPES = ["triangle", "circle"]
PLURALS = [shape + "s" for shape in SHAPES]
ARTICLES = ["a", "one"]
TWO_ARTICLES = ["two"]
CONNECTORS = ["and", "then"]
REVERSE_CONNECTORS = ["after"]
CLASS_TOKEN = "CLS"
MASK_TOKEN = "MASK"
SEP_TOKEN = "SEP"
PAD_TOKEN = "PAD"
EOS_TOKEN = "EOS"
SPECIAL_TOKENS = [CLASS_TOKEN, MASK_TOKEN, SEP_TOKEN, PAD_TOKEN, EOS_TOKEN]
VOCABULARY = SHAPES + PLURALS + ARTICLES + CONNECTORS + REVERSE_CONNECTORS + TWO_ARTICLES + SPECIAL_TOKENS
MAX_LEN_SENTENCE = 16 # Maximum possible length of a sequence

def generate_descr_from_list(r):
  if len(r) > 4:
    raise Exception("Too many items")
  elif len(r) == 0:
    return ""
  elif len(r) == 1:
    article = random.choice(ARTICLES)
    return "{} {}".format(article, r[0])
  else:
    reversed_descr = random.random() > 0.5
    if reversed_descr:
      descr = "{} {} {}".format(r[1], random.choice(REVERSE_CONNECTORS), r[0])
      if len(r) > 2:
        return descr + " " + random.choice(CONNECTORS) + " " + generate_descr_from_list(r[2:])
      return descr
    if r[0] == r[1]:
      plural_desc = random.random() > 0.5
      if plural_desc:
          return "{} {}s".format(random.choice(TWO_ARTICLES), r[0])
    return generate_descr_from_list([r[0]]) + " " + random.choice(CONNECTORS) + " " + generate_descr_from_list(r[1:])


def generate_random_shapes():
  num_shapes = random.randint(1, 4)
  result = []
  for _ in range(num_shapes):
    result.append(random.choice(SHAPES))
  return result


for i in range(100):
  print(generate_descr_from_list(generate_random_shapes()))



one triangle
triangle after triangle
two triangles
circle after triangle then a circle and one triangle
one triangle
one circle
circle after triangle and one triangle
triangle after triangle and a triangle
one circle
one circle and circle after triangle and a circle
circle after circle and circle after triangle
one circle then triangle after triangle and one circle
circle after triangle
a circle and one triangle and one triangle then a circle
one circle
one circle
triangle after circle
circle after triangle
triangle after circle
triangle after circle and a triangle
triangle after triangle
circle after triangle
triangle after circle then one triangle
two triangles
triangle after triangle
triangle after triangle
one circle and one circle
a triangle and two triangles
triangle after triangle
circle after triangle then triangle after circle
a triangle
a triangle then a circle
triangle after circle
a circle and circle after circle and one triangle
triangle after triangle
one triangle
a trian

In [212]:
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw
import random

def generate_image(word_list, filename):
    # Create a blank 32x32 image
    image_size = 32
    patch_size = 16
    image = Image.new("1", (image_size, image_size), 1)  # '1' for 1-bit pixels, black and white

    for i, word in enumerate(word_list):
        if word not in ["triangle", "circle"]:
            continue

        # Determine the top-left corner of the patch
        x_offset = (i % 2) * patch_size
        y_offset = (i // 2) * patch_size

        # Draw the shape in the corresponding patch
        draw = ImageDraw.Draw(image)
        if word == "triangle":
            points = [(random.randint(x_offset, x_offset + patch_size), random.randint(y_offset, y_offset + patch_size)) for _ in range(3)]
            draw.polygon(points, fill=0)
        elif word == "circle":
            radius = random.randint(2, patch_size // 2)
            center_x = random.randint(x_offset + radius, x_offset + patch_size - radius)
            center_y = random.randint(y_offset + radius, y_offset + patch_size - radius)
            draw.ellipse([center_x - radius, center_y - radius, center_x + radius, center_y + radius], fill=0)

    # Save the image to the specified filename
    image.save(filename)

def plot_image(filename):
    # Open the image
    image = Image.open(filename)

    # Convert the image to a NumPy array
    image_array = np.array(image)

    # Plot the image
    plt.imshow(image_array)
    plt.axis('off')  # Turn off axis labels
    plt.show()

# Example usage:
generate_image(["circle", "triangle", "circle"], "output_image.png")


In [213]:
NUMBER_CODES = [str(i) for i in range(5)]
SHAPE_CODES = [shape[0] for shape in SHAPES]
CODES = NUMBER_CODES + SHAPE_CODES
CODE_VOCABULARY = SPECIAL_TOKENS + CODES

def generate_code_str(shape_list):
  codes = []
  i = 0
  while i < len(shape_list):
    j = i + 1
    while j < len(shape_list) and shape_list[i] == shape_list[j]:
      j += 1
    codes.append(f"{NUMBER_CODES[j-i]} {shape_list[i][0]}")
    i = j
  return " ".join(codes)

In [287]:
def generate_data(n_samples):
  examples = []
  for i in range(n_samples):
    shape_list = generate_random_shapes()
    code_str = generate_code_str(shape_list)
    text = generate_descr_from_list(shape_list)
    filename = f"f{i}.png"
    generate_image(shape_list, filename)
    examples.append({"shape_list": shape_list, "code_str": code_str, "text": text, "filename": filename})
  return examples


In [215]:
from torch.utils.data import Dataset


# Step 1: Prepare the Dataset
class CountingFiguresDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_length):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_length,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
        )
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long)
        }

In [216]:
!pip install einops



In [266]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import math
import torch.nn.functional as F
from einops import rearrange
from collections import OrderedDict
from easydict import EasyDict as edict

class MyTokenizer:
    def __init__(self, vocabulary):
        self.vocabulary = vocabulary

    def encode(self, text, add_special_tokens=True, max_length=MAX_LEN_SENTENCE, return_token_type_ids=False, padding='max_length', return_attention_mask=True, return_tensors='pt'):
        tokens = text.split()
        tokens.append(EOS_TOKEN)
        input_ids = [self.vocabulary.index(token) for token in tokens]
        attention_mask = [1] * len(input_ids)
        if add_special_tokens:
            input_ids = [self.vocabulary.index(CLASS_TOKEN)] + input_ids
            attention_mask += [1]

        sen_len = len(input_ids)
        if len(input_ids) > max_length:
            input_ids = input_ids[:max_length]
            attention_mask = attention_mask[:max_length]
            sen_len = max_length
        else:
            pad_length = max_length - len(input_ids)
            if pad_length >= 0:
                input_ids += [self.vocabulary.index(PAD_TOKEN)] * pad_length
                attention_mask += [0] * pad_length

        return sen_len, input_ids, attention_mask

    def encode_plus(self, text, add_special_tokens=True, max_length=MAX_LEN_SENTENCE, return_token_type_ids=False, padding='max_length', return_attention_mask=True, return_tensors='pt'):
        _, input_ids, attention_mask = self.encode(text, add_special_tokens, max_length, return_token_type_ids, padding, return_attention_mask, return_tensors)
        if return_attention_mask:
          return {
              'input_ids': torch.tensor(input_ids),
              'attention_mask': torch.tensor(attention_mask)
          }
        else:
          return {
              'input_ids': torch.tensor(input_ids)
          }


In [326]:
class MyAttention(nn.Module):
    def __init__(self, input_dim, hidden_key_dim, hidden_val_dim, enc_emb_dim=None, num_heads=1):
        """
          Implements an attention mechanism

          Args:
          input_dim: Dimensionality of input embedding.
          hidden_key_dim: Dimensionality of key and query vectors.
          hidden_val_dim: Dimensionality of value vectors.
          enc_emb_dim: Dimensionality of encoder embeddings. If None, self-attention is used.
          mask: Whether to apply masking. If True, the attention scores for masked positions are set to -inf.
          num_heads: Number of attention heads.
        """
        super(MyAttention, self).__init__()
        self.input_dim = input_dim
        self.hidden_key_dim = hidden_key_dim
        self.hidden_val_dim = hidden_val_dim
        self.enc_emb_dim = enc_emb_dim
        self.num_heads = num_heads

        self.to_q = nn.Linear(self.input_dim, self.hidden_key_dim * self.num_heads, bias=False)

        for i in range(self.num_heads):
          if enc_emb_dim is None:
              self.to_k = nn.Linear(self.input_dim, self.hidden_key_dim * self.num_heads, bias=False)
              self.to_v = nn.Linear(self.input_dim, self.hidden_val_dim * self.num_heads, bias=False)
          else:
              self.to_k = nn.Linear(self.enc_emb_dim, self.hidden_key_dim * self.num_heads, bias=False)
              self.to_v = nn.Linear(self.enc_emb_dim, self.hidden_val_dim * self.num_heads, bias=False)

        self.to_out = nn.Linear(self.hidden_val_dim * self.num_heads, self.input_dim)

    def forward(self, embeddings, encoder_embeddings=None, attention_mask=None):

        if encoder_embeddings is not None and attention_mask is not None:
            raise Exception("In cross attention there is no masking.")

        if encoder_embeddings is None:
            Q = self.to_q(embeddings)
            K = self.to_k(embeddings)
            V = self.to_v(embeddings)
        else:
            Q = self.to_q(embeddings)
            K = self.to_k(encoder_embeddings)
            V = self.to_v(encoder_embeddings)

        Q = rearrange(Q, 'B T (H D) -> B H T D', H=self.num_heads, D=self.hidden_key_dim)
        K = rearrange(K, 'B T (H D) -> B H T D', H=self.num_heads, D=self.hidden_key_dim)
        V = rearrange(V, 'B T (H D) -> B H T D', H=self.num_heads, D=self.hidden_val_dim)

        scores = torch.einsum("BHTD,BHSD->BHTS", Q, K)

        if attention_mask is not None and encoder_embeddings is None:
            # Originally, attention_mask has shape (batch_size, sequence_len)
            # To ensure propagation to the scores matrix, which has shape (batch_size, num_heads, sequence_len, sequence_len),
            # We need to make attention_mask's shape (batch_size, 1, 1, sequence_len)
            # We do this with the unsqueeze method, which adds a new dimension.
            attention_mask = attention_mask.unsqueeze(1).unsqueeze(1)
            print(scores.shape)
            print(attention_mask.shape)
            scores = scores.masked_fill(attention_mask == 0, float('-inf'))

        attnmats = F.softmax(scores / math.sqrt(self.hidden_key_dim), dim=-1)

        ctx_vecs = torch.einsum("BHTS,BHSD->BHTD", attnmats, V)
        ctx_vecs = rearrange(ctx_vecs, 'B H T D -> B T (H D)', H=self.num_heads, D=self.hidden_val_dim)
        return self.to_out(ctx_vecs)

In [219]:
class MyPositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=MAX_LEN_SENTENCE):
        super(MyPositionalEncoding, self).__init__()

        # Create a matrix of shape (max_len, d_model) with all zeros
        pe = torch.zeros(max_len, d_model)

        # Create a column vector of shape (max_len, 1) with values [0, 1, ..., max_len-1]
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)

        # Create a row vector of shape (1, d_model // 2) with values [0, 1, ..., d_model//2-1]
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        # Apply sine to even indices and cosine to odd indices
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        # Add a batch dimension (1, max_len, d_model) and register as buffer
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # Add positional encoding to the input tensor (B, T, D)
        x = x + self.pe[:, :x.size(1), :]
        return x

In [220]:
class MyTransformerEncoderLayer(nn.Module):
    def __init__(self, input_dim, hidden_key_dim, hidden_val_dim, output_dim, num_heads=1):
        super(MyTransformerEncoderLayer, self).__init__()
        self.input_dim = input_dim
        self.hidden_key_dim = hidden_key_dim
        self.hidden_val_dim = hidden_val_dim
        self.output_dim = output_dim
        self.num_heads = num_heads

        self.attention = MyAttention(self.input_dim, self.hidden_key_dim, self.hidden_val_dim, enc_emb_dim=None, num_heads=self.num_heads)
        self.norm1 = nn.LayerNorm(self.input_dim)
        self.feed_forward = nn.Sequential(
            nn.Linear(self.input_dim, self.output_dim),
            nn.ReLU()
        )
        self.norm2 = nn.LayerNorm(self.output_dim)

    def forward(self, x, attention_mask=None):
        x = self.norm1(self.attention(x, attention_mask=attention_mask) + x)
        x = self.norm2(self.feed_forward(x) + x)
        return x

In [221]:
class MyTransformerEncoder(nn.Module):
    def __init__(self, num_tokens, input_dim, hidden_key_dim, hidden_val_dim, output_dim, max_length, num_layers=1, num_heads=1):
        super(MyTransformerEncoder, self).__init__()
        self.num_tokens = num_tokens
        self.input_dim = input_dim
        self.hidden_key_dim = hidden_key_dim
        self.hidden_val_dim = hidden_val_dim
        self.output_dim = output_dim
        self.max_length = max_length
        self.num_heads = num_heads

        self.embedding = nn.Embedding(num_tokens, self.input_dim)
        self.positional_encoding = MyPositionalEncoding(self.input_dim, max_length)
        self.layers = nn.ModuleList([MyTransformerEncoderLayer(self.input_dim, self.hidden_key_dim, self.hidden_val_dim, self.input_dim, num_heads) for _ in range(num_layers)])
        self.linear = nn.Linear(self.input_dim, self.output_dim)
        self.norm = nn.LayerNorm(self.output_dim)

    def forward(self, x, attention_mask=None):
        x = self.embedding(x)
        x = self.positional_encoding(x)
        for layer in self.layers:
            x = layer(x, attention_mask=attention_mask)
        x = self.linear(x)
        x = self.norm(x)
        return x

In [222]:
class MyTransformerClassifier(nn.Module):
    def __init__(self, transf_enc, num_classes):
        super(MyTransformerClassifier, self).__init__()
        self.num_classes = num_classes
        self.transf_enc = transf_enc

        self.linear = nn.Linear(self.transf_enc.output_dim, self.num_classes)

    def forward(self, input_ids, labels=None, attention_mask=None):
        x = self.transf_enc(input_ids, attention_mask)[:, 0, :] # Just the embedding of the first token, which is the CLS token.
        logits = self.linear(x)

        loss = None
        if labels is not None:
            criterion = nn.CrossEntropyLoss()
            loss = criterion(logits, labels)
        return (loss, logits) if loss is not None else logits

In [223]:
import torch
from transformers import Trainer, TrainingArguments
import pandas as pd

# Step 2: Tokenizer
# tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
tokenizer = MyTokenizer(VOCABULARY)
tokenizer.encode_plus(
            "one circle after one circle and two triangles",
            add_special_tokens=True,
            max_length=MAX_LEN_SENTENCE,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
        )

{'input_ids': tensor([10,  5,  1,  8,  5,  1,  6,  9,  2, 14, 13, 13, 13, 13, 13, 13]),
 'attention_mask': tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0])}

In [224]:
# Create Dataset
lengths = list(map(len, shape_lists))
dataset = CountingFiguresDataset(descriptions, lengths, tokenizer, max_length=MAX_LEN_SENTENCE)

In [225]:
num_tokens = len(VOCABULARY)
input_dim = 16
hidden_key_dim = 8
hidden_val_dim = 8
num_heads = 2
output_dim = 16
num_layers = 3
num_labels = 5

# Step 3: Model
transf_enc = MyTransformerEncoder(num_tokens, input_dim=input_dim, hidden_key_dim=hidden_key_dim, hidden_val_dim=hidden_val_dim, output_dim=output_dim, max_length=MAX_LEN_SENTENCE, num_layers=num_layers, num_heads=num_heads)
model = MyTransformerClassifier(transf_enc, num_labels)

# Step 4: Training
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=200,
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    warmup_steps=10,
    weight_decay=0.001,
    logging_dir='./logs',
    logging_steps=10,
    eval_strategy="epoch"
)

# Split dataset into train and eval
train_size = int(0.8 * len(dataset))
eval_size = len(dataset) - train_size
train_dataset, eval_dataset = torch.utils.data.random_split(dataset, [train_size, eval_size])

# Custom Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset
)

trainer.train()



Epoch,Training Loss,Validation Loss
1,1.3619,1.538641
2,1.6947,1.52977
3,1.3827,1.519997
4,1.3731,1.509507
5,1.5346,1.500978
6,1.3953,1.490364
7,1.4567,1.482291
8,1.4457,1.473454
9,1.4391,1.463923
10,1.5855,1.457543


TrainOutput(global_step=8000, training_loss=0.7032698403298855, metrics={'train_runtime': 146.4317, 'train_samples_per_second': 109.266, 'train_steps_per_second': 54.633, 'total_flos': 0.0, 'train_loss': 0.7032698403298855, 'epoch': 200.0})

In [226]:
from torch.utils.data import DataLoader

# Step 5: Evaluation
def evaluate_model(texts, labels):
    eval_dataset = CountingFiguresDataset(texts, labels, tokenizer, max_length=MAX_LEN_SENTENCE)
    eval_loader = DataLoader(eval_dataset, batch_size=2)
    total_correct = 0
    total_samples = len(labels)
    model.eval()
    with torch.no_grad():
        for batch in eval_loader:
            input_ids = batch['input_ids']
            attention_mask = batch['attention_mask']
            labels = batch['labels']
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            _, preds = torch.max(outputs, dim=-1)
            total_correct += (preds == labels).sum().item()

    accuracy = total_correct / total_samples
    print(f'Accuracy: {accuracy * 100:.2f}%')


# Generate descriptions and images for each shape list

test_shape_lists = [generate_random_shapes() for _ in range(100)]

eval_descriptions = []
eval_lengths = []
for i, shape_list in enumerate(test_shape_lists):
  eval_descriptions.append(generate_descr_from_list(shape_list))
  eval_lengths.append(len(shape_list))

evaluate_model(eval_descriptions, eval_lengths)

Accuracy: 88.00%


In [227]:
class MyTransformerDecoderLayer(nn.Module):
    def __init__(self, input_dim, hidden_key_dim, hidden_val_dim, output_dim, enc_emb_dim, num_heads):
        super(MyTransformerDecoderLayer, self).__init__()
        self.input_dim = input_dim
        self.hidden_key_dim = hidden_key_dim
        self.hidden_val_dim = hidden_val_dim
        self.output_dim = output_dim
        self.enc_emb_dim = enc_emb_dim
        self.num_heads = num_heads

        self.masked_att = MyAttention(self.input_dim, self.hidden_key_dim, self.hidden_val_dim, enc_emb_dim=None, num_heads=self.num_heads)
        self.norm1 = nn.LayerNorm(self.input_dim)
        self.cross_att = MyAttention(self.input_dim, self.hidden_key_dim, self.hidden_val_dim, enc_emb_dim=self.enc_emb_dim, num_heads=self.num_heads)
        self.norm2 = nn.LayerNorm(self.input_dim)
        self.self_att = MyAttention(self.input_dim, self.hidden_key_dim, self.hidden_val_dim, enc_emb_dim=None, num_heads=self.num_heads)
        self.norm3 = nn.LayerNorm(self.input_dim)
        self.feed_forward = nn.Sequential(
            nn.Linear(self.input_dim, self.output_dim),
            nn.ReLU()
        )

    def forward(self, x, enc_emb, attention_mask):
        x = self.norm1(x + self.masked_att(x, attention_mask=attention_mask))
        x = self.norm2(x + self.cross_att(x, encoder_embeddings=enc_emb))
        x = self.norm3(x + self.self_att(x, attention_mask=attention_mask))
        x = self.feed_forward(x)
        return x

In [228]:
class MyTransformerDecoder(nn.Module):
    def __init__(self, num_tokens, input_dim, hidden_key_dim, hidden_val_dim, output_dim, enc_emb_dim, max_length, num_layers, num_heads):
        super(MyTransformerDecoder, self).__init__()
        self.num_tokens = num_tokens
        self.input_dim = input_dim
        self.hidden_key_dim = hidden_key_dim
        self.hidden_val_dim = hidden_val_dim
        self.output_dim = output_dim
        self.enc_emb_dim = enc_emb_dim
        self.max_length = max_length
        self.num_heads = num_heads

        self.embedding = nn.Embedding(num_tokens, self.input_dim)
        self.positional_encoding = MyPositionalEncoding(self.input_dim, max_length)
        self.layers = nn.ModuleList([MyTransformerDecoderLayer(input_dim=self.input_dim,
                                                               hidden_key_dim=self.hidden_key_dim,
                                                               hidden_val_dim=self.hidden_val_dim,
                                                               enc_emb_dim=self.enc_emb_dim,
                                                               output_dim=self.output_dim,
                                                               num_heads=num_heads) for _ in range(num_layers)])
        self.linear = nn.Linear(self.input_dim, self.output_dim)
        self.norm = nn.LayerNorm(self.output_dim)

    def forward(self, x, enc_emb, attention_mask):
        x = self.embedding(x)
        x = self.positional_encoding(x)
        for layer in self.layers:
            x = layer(x, enc_emb, attention_mask=attention_mask)
        x = self.linear(x)
        x = self.norm(x)
        return x

In [296]:
class MyTransformerTranslator(nn.Module):
    def __init__(self, transf_enc, transf_dec, num_tokens_target_vocab):
        super(MyTransformerTranslator, self).__init__()
        self.transf_enc = transf_enc
        self.transf_dec = transf_dec
        self.linear = nn.Linear(self.transf_dec.output_dim, num_tokens_target_vocab)

    def forward(self, source_tokens, target_tokens, attention_mask, labels=None):
        source_embeddings = self.transf_enc(source_tokens)
        decoded_embeddings = self.transf_dec(target_tokens, source_embeddings, attention_mask)[:, 0, :]
        logits = self.linear(decoded_embeddings)

        loss = None
        if labels is not None:
            criterion = nn.CrossEntropyLoss()
            loss = criterion(logits, labels)
        return (loss, logits) if loss is not None else logits

In [267]:

class MyRandomMaskTokenizer(MyTokenizer):
  def __init__(self, vocabulary):
    super().__init__(vocabulary)

  def encode_plus(self, text, add_special_tokens=True, max_length=MAX_LEN_SENTENCE, return_token_type_ids=False, padding='max_length', return_attention_mask=True, return_tensors='pt'):
    sen_len, input_ids, attention_mask = super().encode(text, add_special_tokens, max_length, return_token_type_ids, padding, return_attention_mask, return_tensors)
    if return_attention_mask:
      new_len = random.randint(1, sen_len-1)
      new_attention_mask = torch.cat((torch.ones(new_len), torch.zeros(len(input_ids) - new_len)))
      return {
              'input_ids': torch.tensor(input_ids),
              'attention_mask': new_attention_mask,
              'sen_len': new_len
      }
    else:
      return {
              'input_ids': torch.tensor(input_ids),
              'sen_len': new_len
      }

tokenizer = MyRandomMaskTokenizer(CODE_VOCABULARY)
tokenizer.encode_plus(
            "c 2 t 2",
            add_special_tokens=True,
            max_length=MAX_LEN_SENTENCE,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
        )


{'input_ids': tensor([ 0, 11,  7, 10,  7,  4,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3]),
 'attention_mask': tensor([1., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]),
 'sen_len': 2}

In [268]:
class TranslationDataset(Dataset):
    def __init__(self, src_texts, tgt_texts, src_tokenizer, tgt_tokenizer, src_max_len, tgt_max_len):
        self.src_texts = src_texts
        self.tgt_texts = tgt_texts
        self.src_tokenizer = src_tokenizer
        self.tgt_tokenizer = tgt_tokenizer
        self.src_max_len = src_max_len
        self.tgt_max_len = tgt_max_len

        self.tgt_encodings = [self.tgt_tokenizer.encode_plus(
              txt,
              add_special_tokens=True,
              max_length=self.tgt_max_len,
              return_token_type_ids=False,
              padding='max_length',
              return_attention_mask=True,
              return_tensors='pt'
            ) for txt in self.tgt_texts]

        self.labels=[]
        for tgt_encoding in self.tgt_encodings:
          input_ids = tgt_encoding['input_ids']
          label = input_ids[tgt_encoding['sen_len']]
          self.labels.append(label)

    def __len__(self):
        return len(self.src_texts)

    def __getitem__(self, idx):
        src_encoding = self.src_tokenizer.encode_plus(
            self.src_texts[idx],
            add_special_tokens=True,
            max_length=self.src_max_len,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
        )
        return {
            'source_embeddings': src_encoding['input_ids'].flatten(),
            'target_embeddings': self.tgt_encodings[idx]['input_ids'].flatten(),
            'attention_mask': self.tgt_encodings[idx]['attention_mask'].flatten(),
            'labels': self.labels[idx]
        }



In [291]:
MAX_CODE_LEN=8

n_examples = 200
data = generate_data(n_examples)
data[0].keys()

dict_keys(['shape_list', 'code_str', 'text', 'filename'])

In [292]:
descriptions = [d["text"] for d in data]
code_lists = [d["code_str"] for d in data]

In [293]:
transl_dataset = TranslationDataset(descriptions, code_lists, MyTokenizer(VOCABULARY), MyRandomMaskTokenizer(CODE_VOCABULARY), MAX_LEN_SENTENCE, tgt_max_len=MAX_CODE_LEN)
for example in transl_dataset:
  print(example)

{'source_embeddings': tensor([10,  5,  1, 14, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13]), 'target_embeddings': tensor([ 0,  6, 11,  4,  3,  3,  3,  3]), 'attention_mask': tensor([1., 1., 0., 0., 0., 0., 0., 0.]), 'labels': tensor(11)}
{'source_embeddings': tensor([10,  9,  2, 14, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13]), 'target_embeddings': tensor([ 0,  9, 10,  4,  3,  3,  3,  3]), 'attention_mask': tensor([1., 1., 0., 0., 0., 0., 0., 0.]), 'labels': tensor(10)}
{'source_embeddings': tensor([10,  5,  1,  7,  5,  0,  6,  5,  1, 14, 13, 13, 13, 13, 13, 13]), 'target_embeddings': tensor([ 0,  6, 11,  6, 10,  6, 11,  4]), 'attention_mask': tensor([1., 1., 1., 1., 1., 0., 0., 0.]), 'labels': tensor(6)}
{'source_embeddings': tensor([10,  0,  8,  1,  6,  5,  1, 14, 13, 13, 13, 13, 13, 13, 13, 13]), 'target_embeddings': tensor([ 0,  6, 11,  6, 10,  6, 11,  4]), 'attention_mask': tensor([1., 1., 0., 0., 0., 0., 0., 0.]), 'labels': tensor(11)}
{'source_embeddings': tensor([10,  5,

In [295]:
num_src_tokens=len(VOCABULARY)
num_tgt_tokens=len(CODE_VOCABULARY)
input_dim = 16
hidden_key_dim = 8
hidden_val_dim = 8
num_heads = 2
output_dim = 16
num_layers = 3

transf_enc = MyTransformerEncoder(num_src_tokens, input_dim=input_dim, hidden_key_dim=hidden_key_dim, hidden_val_dim=hidden_val_dim, output_dim=output_dim, max_length=MAX_LEN_SENTENCE, num_layers=num_layers, num_heads=num_heads)
transf_dec = MyTransformerDecoder(num_tgt_tokens, input_dim=input_dim, hidden_key_dim=hidden_key_dim, hidden_val_dim=hidden_val_dim, output_dim=output_dim, enc_emb_dim=output_dim, max_length=MAX_CODE_LEN, num_layers=num_layers, num_heads=num_heads)
model = MyTransformerTranslator(transf_enc, transf_dec, len(CODE_VOCABULARY))

# Step 4: Training
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=200,
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    warmup_steps=10,
    weight_decay=0.001,
    logging_dir='./logs',
    logging_steps=10,
    eval_strategy="epoch"
)

# Split dataset into train and eval
train_size = int(0.8 * len(transl_dataset))
eval_size = len(transl_dataset) - train_size
train_dataset, eval_dataset = torch.utils.data.random_split(transl_dataset, [train_size, eval_size])

# Custom Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset
)

trainer.train()

Epoch,Training Loss,Validation Loss
1,2.4939,2.537457
2,2.335,2.285964
3,2.1258,2.190226
4,2.0992,2.112291
5,2.1885,2.029186
6,1.9187,1.96803
7,1.8225,1.880706
8,1.7398,1.793312
9,1.6478,1.727037
10,1.6993,1.654134


TrainOutput(global_step=16000, training_loss=0.8446894781142473, metrics={'train_runtime': 710.8454, 'train_samples_per_second': 45.017, 'train_steps_per_second': 22.508, 'total_flos': 0.0, 'train_loss': 0.8446894781142473, 'epoch': 200.0})

In [300]:
n_examples = 200
data = generate_data(n_examples)
data[0].keys()

dict_keys(['shape_list', 'code_str', 'text', 'filename'])

In [298]:
texts = [d["text"] for d in data]
code_lists = [d["code_str"] for d in data]

In [299]:
eval_dataset = TranslationDataset(texts, code_lists, MyTokenizer(VOCABULARY), MyRandomMaskTokenizer(CODE_VOCABULARY), MAX_LEN_SENTENCE, tgt_max_len=MAX_CODE_LEN)
eval_loader = DataLoader(eval_dataset, batch_size=2)
total_correct = 0
total_samples = len(texts)
model.eval()
with torch.no_grad():
    for batch in eval_loader:
        src_emb = batch['source_embeddings']
        tgt_emb = batch['target_embeddings']
        attention_mask = batch['attention_mask']
        labels = batch['labels']
        outputs = model(source_embeddings=src_emb, target_embeddings=tgt_emb, attention_mask=attention_mask)
        _, preds = torch.max(outputs, dim=-1)
        total_correct += (preds == labels).sum().item()

accuracy = total_correct / total_samples
print(f'Accuracy: {accuracy * 100:.2f}%')




Accuracy: 72.50%


In [333]:
def translate(text, src_tokenizer, tgt_tokenizer, src_max_length, tgt_max_length):
    src_tokens = src_tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=src_max_length,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
        )
    translation = ""
    tgt_tokens = tgt_tokenizer.encode_plus(
            translation,
            add_special_tokens=True,
            max_length=tgt_max_length,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
        )
    token_count = 1
    while True and token_count < tgt_max_length:
      src_token_list = src_tokens["input_ids"].unsqueeze(0)
      tgt_token_list = tgt_tokens["input_ids"].unsqueeze(0)
      att_mask = torch.cat((torch.ones(token_count), torch.zeros(tgt_token_list.shape[1] - token_count)))
      print(src_token_list)
      print(tgt_token_list)
      print(att_mask)
      print(src_token_list.shape[1])
      print(tgt_token_list.shape[1])
      print(len(att_mask))
      outputs = model(source_embeddings=src_tokens["input_ids"], target_embeddings=tgt_tokens["input_ids"], attention_mask=tgt_tokens["attention_mask"])
      _, pred = torch.max(output, dim=-1)
      next_word = CODE_VOCABULARY[pred]
      if next_word == EOS_TOKEN:
        break
      translation.append(next_word)
    return " ".join(translation)

n_examples = 10
data = generate_data(n_examples)
data[0].keys()

dict_keys(['shape_list', 'code_str', 'text', 'filename'])

In [334]:
for d in data:
  print(translate(d["text"], MyTokenizer(VOCABULARY), MyRandomMaskTokenizer(CODE_VOCABULARY), MAX_LEN_SENTENCE, MAX_CODE_LEN))

tensor([[10,  9,  3, 14, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13]])
tensor([[0, 4, 3, 3, 3, 3, 3, 3]])
tensor([1., 0., 0., 0., 0., 0., 0., 0.])
16
8
8


RuntimeError: The size of tensor a (8) must match the size of tensor b (2) at non-singleton dimension 1