In [1]:
import torch
from torch import nn
from torch.utils.data import DataLoader

from transformers import AutoTokenizer

from datapy import DataPy, PreProcessingTransform
from embedding import EmbeddingBlock
from model import MyTransformer, Classification

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Load the pretrained CodeParrot tokenizer and register an explicit padding token.
tokenizer = AutoTokenizer.from_pretrained('codeparrot/codeparrot')
tokenizer.add_special_tokens({'pad_token': '[PAD]'})

# The values displayed below show PAD_IDX == VOCAB_SIZE, i.e. the added '[PAD]' id is one
# past `tokenizer.vocab_size`; this is why later cells size their tables with VOCAB_SIZE+1.
# NOTE(review): `len(tokenizer)` presumably already counts the added token — confirm
# before replacing the `+1` adjustments.
PAD_IDX = tokenizer.pad_token_id
VOCAB_SIZE = tokenizer.vocab_size
PAD_IDX, VOCAB_SIZE

(32768, 32768)

In [3]:
# Dataset of source files; every sample is passed through PreProcessingTransform,
# which is given the tokenizer configured above.
dataset = DataPy(
  csv_filename='./data/dataset.csv',
  path='./data/',
  transform=PreProcessingTransform(tokenizer)
)

# Shuffled mini-batches of 3 samples; batching is delegated to a collate function
# built by DataPy from PAD_IDX.
# NOTE(review): the meaning of the positional `True` flag is defined in datapy — confirm there.
dl = DataLoader(
  dataset,
  batch_size=3,
  shuffle=True,
  collate_fn=DataPy.create_collate_fn(True, PAD_IDX)
)

In [4]:
# Pull a single batch to inspect what the collate function yields: the batch tensor,
# one length per sample, and a padding mask with the same shape as the batch.
# The loader shuffles and no seed is set in the visible cells, so the sequence
# dimension displayed below can differ between runs.
X_batch, lengths, padding_mask = next(iter(dl))
X_batch.shape, X_batch.dtype, lengths.shape, lengths.dtype, padding_mask.shape

(torch.Size([3, 328]),
 torch.int64,
 torch.Size([3]),
 torch.int64,
 torch.Size([3, 328]))

In [5]:
# Hyperparameters shared by the cells below.
# d_model: feature width used by the embedding block, the backbone, the classification
# head and the noise tensor in the generation cell.
d_model = 256
# max_steps: passed to EmbeddingBlock as its last argument.
# NOTE(review): presumably the number of iterative refinement steps — confirm in embedding.py.
max_steps = 64

In [6]:
# VOCAB_SIZE+1 because PAD_IDX equals VOCAB_SIZE (see the tokenizer cell output), so the
# padding id needs one extra slot beyond the base vocabulary.
# NOTE(review): argument order is presumably (vocabulary size, width, padding index,
# step count) — confirm against embedding.py.
embedding_block = EmbeddingBlock(VOCAB_SIZE+1, d_model, PAD_IDX, max_steps)

In [7]:
def create_mask_from_lengths(X, lengths):
  """Build a boolean mask marking the valid (non-padded) positions of each row.

  Args:
    X: 2-D tensor of shape (batch_size, seq_length); only its shape is used.
    lengths: integer tensor holding one valid length per row of `X`.

  Returns:
    Bool tensor of shape (batch_size, seq_length) that is True wherever the
    position index is smaller than the row's length.

  Note:
    True marks *valid* positions here. `nn.MultiheadAttention` expects the
    opposite convention for `key_padding_mask` (True = ignore), so invert the
    result before passing it there.
  """
  batch_size, seq_length = X.shape
  # arange gives exact integer positions and is created on the same device as
  # `lengths`, so the comparison also works when the batch lives off-CPU.
  positions = torch.arange(seq_length, dtype=torch.long, device=lengths.device)
  return positions.expand(batch_size, -1) < lengths.reshape(-1, 1)
#create_mask_from_lengths(X_batch, lengths)

In [8]:
# Unpack the two dimensions of the sample batch into notebook-level names
# (none of the visible cells below read them).
batch_size, seq_length = X_batch.shape


In [62]:
import torch.nn.functional as F
class MyEncoderTransformer(nn.Module):
  """One encoder layer: self-attention followed by a feed-forward network.

  Each sub-layer output is passed through dropout, added to its input and then
  layer-normalised (normalisation is applied after the residual sum).

  Args:
    d_model: feature width of the inputs and outputs.
    nheads: number of attention heads.
    dropout: dropout probability used inside attention and after each sub-layer.
    dim_feedforward: hidden width of the feed-forward network.
    activation: callable applied between the two feed-forward linear layers.
    device, dtype: forwarded to the parameterised submodules.
  """

  def __init__(self, d_model, nheads, dropout=0.1, dim_feedforward=2048, activation=F.gelu, device=None, dtype=None):
    super().__init__()
    tensor_opts = dict(device=device, dtype=dtype)

    # Attention sub-layer (inputs are batch-first).
    self.self_attn = nn.MultiheadAttention(
      d_model, nheads, dropout=dropout, bias=True, batch_first=True, **tensor_opts
    )
    self.dropout1 = nn.Dropout(dropout)
    self.norm1 = nn.LayerNorm(d_model, **tensor_opts)

    # Feed-forward sub-layer: d_model -> dim_feedforward -> d_model.
    self.linear1 = nn.Linear(d_model, dim_feedforward, **tensor_opts)
    self.dropout = nn.Dropout(dropout)
    self.linear2 = nn.Linear(dim_feedforward, d_model, **tensor_opts)
    self.dropout2 = nn.Dropout(dropout)

    self.norm2 = nn.LayerNorm(d_model, **tensor_opts)

    self.activation = activation

  def _sa_block(self, x, attn_mask, padding_mask) -> torch.Tensor:
    """Self-attention over `x` (query = key = value), followed by dropout."""
    attended = self.self_attn(
      x, x, x,
      attn_mask=attn_mask,
      key_padding_mask=padding_mask,
      need_weights=False,
    )[0]
    return self.dropout1(attended)

  def _ff_block(self, x) -> torch.Tensor:
    """Position-wise feed-forward network, followed by dropout."""
    hidden = self.activation(self.linear1(x))
    hidden = self.dropout(hidden)
    projected = self.linear2(hidden)
    return self.dropout2(projected)

  def forward(self, src, attn_mask=None, padding_mask=None):
    """Run the layer on `src`; both masks are handed to the attention module."""
    attn_out = self.norm1(src + self._sa_block(src, attn_mask, padding_mask))
    return self.norm2(attn_out + self._ff_block(attn_out))

class MyTransformer(nn.Module):
  """A stack of `num_layers` MyEncoderTransformer layers applied in sequence.

  All constructor arguments other than `num_layers` are forwarded unchanged to
  every layer.
  """

  def __init__(self, d_model, nheads, num_layers, dropout=0.1, dim_feedforward=2048, activation=F.gelu, device=None, dtype=None):
    super().__init__()
    tensor_opts = dict(device=device, dtype=dtype)
    self.layers = nn.ModuleList()
    for _ in range(num_layers):
      self.layers.append(
        MyEncoderTransformer(d_model, nheads, dropout, dim_feedforward, activation, **tensor_opts)
      )

  def forward(self, src, attn_mask=None, padding_mask=None):
    """Feed `src` through every layer, passing the same masks to each one."""
    hidden = src
    for encoder_layer in self.layers:
      hidden = encoder_layer(hidden, attn_mask, padding_mask)
    return hidden

In [63]:
# Bundle the two model parts: a backbone with 4 attention heads and 4 layers, and the
# classification head imported from model.py, sized VOCAB_SIZE+1 like the embedding block.
# NOTE: `MyTransformer` here is the class defined in the cell above, which shadows the
# one imported from `model` in the first cell.
# NOTE(review): no weights are trained or loaded in the visible cells, so the modules
# keep their random initialisation unless that happens elsewhere — confirm.
model = nn.ModuleDict({
  'backbone': MyTransformer(d_model, 4, 4),
  'classification': Classification(d_model, VOCAB_SIZE+1)
})

In [100]:
# Prompt to be continued, and the number of extra positions appended after it
# in the generation cell below.
code = """import numpy as np"""
new_tokens = 10

In [101]:
# Iteratively refine `new_tokens` noise vectors appended to the embedded prompt,
# then decode the refined positions back to text.
with torch.no_grad():
  # Inference only: put both the embedding block and the model in eval mode.
  embedding_block.eval()
  model.eval()

  # [1] Pre-Processing: transform the prompt and add a batch dimension.
  input_pp = dataset.transform(code).unsqueeze(0)
  prompt_len = input_pp.shape[1]

  # [2] Embedding First (word embedding)
  input_emb_f = embedding_block.first.word_embedding(input_pp)
  device = input_emb_f.device

  # [3] Noise: random vectors standing in for the positions to generate.
  noise = torch.randn(1, new_tokens, d_model, device=device, dtype=input_emb_f.dtype)

  # [4] Concatenate prompt and noise; build the mask of prompt positions
  # (True for the prompt, False for the appended noise).
  concated = torch.concat([input_emb_f, noise], dim=1)
  origin_mask = (torch.arange(concated.shape[1], device=device) < prompt_len).unsqueeze(0)

  print(concated.shape)

  # Working tensor that is refined step by step.
  tensor = concated.clone()

  for step in range(max_steps):
    # [5] Masked Replace: restore the prompt embeddings so only the noise positions evolve.
    tensor[origin_mask] = concated[origin_mask]

    # [6] Embedding Block (second): the step index counts down from max_steps-1 to 0.
    step_index = torch.tensor([max_steps - 1 - step], dtype=torch.long, device=device)
    tensor = embedding_block.second.forward(tensor, step_index, ~origin_mask)

    # [7] Backbone Model
    tensor = model.backbone(tensor)

  # [9] Logits: despite the name, this holds the arg-max token id per position.
  logits = model.classification(tensor).argmax(dim=-1)

  # [10] Reverse Vocab: decode only the generated positions. Slicing from the prompt
  # length (rather than `[-new_tokens:]`) stays correct when new_tokens == 0.
  new_text = tokenizer.decode(logits[0].tolist()[prompt_len:])

# Show Results
print(code + new_text)

torch.Size([1, 14, 256])
import numpy as np [_ [_ [_ [_ [_ [_ [_ [_ [_ [_
