In [2]:
import torch.nn as nn
import torch
from torch.nn import functional as F

In [14]:

class MultiHeadAttention(nn.Module):
    def __init__(self, embed_size, n_heads) -> None:
        super(MultiHeadAttention, self).__init__()
        self.embed_size = embed_size
        self.n_heads = n_heads
        self.head_dims = embed_size // n_heads

        assert embed_size % n_heads == 0

        self.values = nn.Linear(embed_size, embed_size)
        self.keys = nn.Linear(embed_size, embed_size)
        self.queries = nn.Linear(embed_size, embed_size)
        self.fc = nn.Linear(embed_size, embed_size)

        
    def forward(self, values, keys, queries, mask=None):
        # values.shape = [batch_size, seq_len, embed_dim]
        N = values.shape[0]
        values_len = values.shape[1]
        keys_len = keys.shape[1]
        queries_len = queries.shape[1]

        values = values.reshape(N, values_len, self.n_heads, self.head_dims)
        keys = keys.reshape(N, keys_len, self.n_heads, self.head_dims)
        queries = queries.reshape(N, queries_len, self.n_heads, self.head_dims)

        x = self.scaled_dot_product(values, keys, queries, mask)
        return self.fc(x)

        

    def scaled_dot_product(self, values, keys, queries, mask):
        N = values.shape[0]
        queries_len = queries.shape[1]

        energy = torch.einsum("nqhd,nkhd->nhqk", [queries, keys])

        if mask is not None:
            energy.masked_fill(mask == 0, float('-1e20'))
        
        attention = F.softmax((energy / (self.embed_size ** 0.5)), dim=3)

        out = torch.einsum('nhqk,nvhd->nqhd', [attention, values]).reshape(N, queries_len, self.n_heads * self.head_dims)
        return out
        

In [15]:
class TransformerBlock(nn.Module):
    def __init__(self, embed_size, n_heads, drop_out, forward_expension) -> None:
        super(TransformerBlock, self).__init__()
        self.multi_head_attention = MultiHeadAttention(embed_size, n_heads)
        self.norm_1 = nn.LayerNorm(embed_size)
        self.norm_2 = nn.LayerNorm(embed_size)

        self.feed_forward = nn.Sequential(
            nn.Linear(embed_size, forward_expension * embed_size),
            nn.ReLU(),
            nn.Linear(forward_expension * embed_size, embed_size),
        )

        self.dropout = nn.Dropout(drop_out)

    def forward(self, values, keys, queries, mask):
        attenion = self.multi_head_attention(values, keys, queries, mask)
        x = self.dropout(self.norm_1(attenion + queries))
        forward = self.feed_forward(x)
        out = self.dropout(self.norm_2(forward + x))
        return out

In [16]:
class Encoder(nn.Module):
    def __init__(
        self, vocab_size, embed_size, n_layers, n_heads, forward_expansion, dropout, device, max_length
        )  -> None:
        super(Encoder, self).__init__()
        self.embed_size = embed_size
        self.device = device
        self.word_embedding = nn.Embedding(vocab_size, embed_size)
        self.positional_embedding = nn.Embedding(max_length, embed_size)
        self.layers = nn.ModuleList([
                TransformerBlock(embed_size, n_heads, drop_out=dropout, forward_expension=forward_expansion) 
            for _ in range(n_layers)
        ]
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        N, sequence_length = x.shape
        positions = torch.arange(0, sequence_length).expand(N, sequence_length).to(self.device)
        x = self.dropout(self.word_embedding(x) + self.positional_embedding(positions))
        for layer in self.layers:
            x = layer(x, x, x, mask)
        return x


In [17]:
class DecoderBlock(nn.Module):
    def __init__(self, embed_size, n_heads, forward_expension, dropout, device) -> None:
        super(DecoderBlock, self).__init__()
        self.attention = MultiHeadAttention(embed_size, n_heads)
        self.norm = nn.LayerNorm(embed_size)
        self.transformer_block = TransformerBlock(
            embed_size, n_heads, dropout, forward_expension
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, values, keys, src_mask, trg_mask):
        attention = self.attention(x, x, x, trg_mask)
        queries = self.dropout(self.norm(attention + x))

        out = self.transformer_block(values, keys, queries, src_mask)
        return out

class Decoder(nn.Module):
    def __init__(self, trg_vocab_size, embed_size, num_layers, n_heads, forward_expansion, dropout, device, max_length) -> None:
        super(Decoder, self).__init__()
        self.device = device

        self.word_embeddings = nn.Embedding(trg_vocab_size, embed_size)
        self.positional_embeddings = nn.Embedding(max_length, embed_size)

        self.layers = nn.ModuleList([
            DecoderBlock(embed_size, n_heads, forward_expansion, dropout, device)
            for _ in range(num_layers)
        ])

        self.fc = nn.Linear(embed_size, trg_vocab_size)
        self.dropout = nn.Dropout(dropout)


    def forward(self, x, enc_out, src_mask, trg_mask):
        N, sequence_length = x.shape[0], x.shape[1]
        positions = torch.arange(0, sequence_length).expand(N, sequence_length).to(device=self.device)
        x = self.dropout(self.word_embeddings(x) + self.positional_embeddings(positions))
        

        for layer in self.layers:
            x = layer(x, enc_out, enc_out, src_mask, trg_mask)

        out = self.fc(x)
        return out


class Transformer(nn.Module):
    def __init__(self, src_vocab_size, trg_vocab_size, src_pad_idx, trg_pad_idx, embed_size=256, num_layers=6, forward_expansion=4, heads=8, dropout=0, device='cuda', max_length=100) -> None:
        super(Transformer, self).__init__()
        self.encoder = Encoder(src_vocab_size, embed_size, num_layers, heads, forward_expansion, dropout, device, max_length)
        self.decoder = Decoder(trg_vocab_size, embed_size, num_layers, heads, forward_expansion, dropout, device, max_length)

        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device

    def forward(self, src, trg):
        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)
        enc_src = self.encoder(src, src_mask)
        pred = self.decoder(trg, enc_src, src_mask, trg_mask)
        return pred


    def make_src_mask(self, src):
        src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
        return src_mask.to(self.device)

    def make_trg_mask(self, trg):
        N, trg_len = trg.shape
        trg_mask = torch.tril(torch.ones(trg_len, trg_len)).expand(
            N, 1, trg_len, trg_len
        )
        return trg_mask.to(self.device)




In [49]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

x = torch.tensor([[1, 2, 3, 4, 5, 6]]).to(device)
y = torch.tensor([[4, 1, 3, 9, 5, 8]]).to(device)

src_pad_idx = 0
trg_pad_idx = 0
src_vocab_size = 8
trg_vocab_size = 10

model = Transformer(src_vocab_size, trg_vocab_size, src_pad_idx, trg_pad_idx, device=device).to(device)

out = model(x, y[:, :1])
probs, indx = torch.max(out, axis=-1)
translation = torch.tensor([[1, 2]])
for _ in range(99):
    out = model(x, translation)
    probs, indx = torch.max(out, axis=-1)
    translation = torch.concat([translation[0], indx[0, -1:]]).unsqueeze(0)
    # translation = torch.concat([translation[0], indx[0, -1:]]).unsqueeze(0)
translation

cpu


tensor([[1, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
         3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
         3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
         3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
         3, 3, 3, 3, 3]])

In [58]:
import json
import os
filepath = "../data/python/final/jsonl/train/python_train_0.jsonl"
with open(filepath) as f:
    data = [json.loads(line) for line in f]

In [78]:
import pprint
pprint.pprint(
data[1]["code"]
)

('def predict(X_img_path, knn_clf=None, model_path=None, '
 'distance_threshold=0.6):\n'
 '    """\n'
 '    Recognizes faces in given image using a trained KNN classifier\n'
 '\n'
 '    :param X_img_path: path to image to be recognized\n'
 '    :param knn_clf: (optional) a knn classifier object. if not specified, '
 'model_save_path must be specified.\n'
 '    :param model_path: (optional) path to a pickled knn classifier. if not '
 'specified, model_save_path must be knn_clf.\n'
 '    :param distance_threshold: (optional) distance threshold for face '
 'classification. the larger it is, the more chance\n'
 '           of mis-classifying an unknown person as a known one.\n'
 '    :return: a list of names and face locations for the recognized faces in '
 'the image: [(name, bounding box), ...].\n'
 "        For faces of unrecognized persons, the name 'unknown' will be "
 'returned.\n'
 '    """\n'
 '    if not os.path.isfile(X_img_path) or os.path.splitext(X_img_path)[1][1:] '
 'not in 

In [76]:
valid_samples = 0
invalid_samples = 0
for row in data:
    func_length = len(row["code"].split('"""'))
    func_length_single_quote = len(row["code"].split("'''"))
    if func_length == 3 or func_length_single_quote == 3:
        valid_samples += 1
        val_example = row
    else:
        invalid_samples +=1
        inval_example = row
print(f"Percentage valid: {valid_samples / (valid_samples + invalid_samples)}")



Percentage valid: 0.96


In [10]:
import torch
from torch import nn
pos_emb = nn.Parameter(torch.randn(1, 12, 10))
print(pos_emb)
pos_emb[:, :2, :]


Parameter containing:
tensor([[[ 0.7726,  0.1959, -0.2742, -0.4414, -0.1205,  0.5896,  0.2676,
           0.2441,  1.3759, -0.0921],
         [-0.7718, -1.2759, -0.1908,  0.4687,  0.5650,  0.2740,  0.0949,
           0.0123,  1.1677,  0.1628],
         [ 0.2920, -1.5287, -1.2610, -0.3102,  0.0399, -0.5319,  0.2206,
          -0.8736,  0.2447, -0.4729],
         [ 0.4001,  0.0745,  0.8375, -0.1703, -1.0116, -1.2133,  1.6221,
          -0.9419, -1.3988,  0.0463],
         [ 0.9865, -0.9952,  0.4861, -0.0090,  0.9086, -0.1690,  0.3133,
           1.3860, -0.1374, -0.4883],
         [ 1.0023, -1.8284,  0.3466,  1.0263, -1.5314, -1.4405,  1.4825,
          -0.6440, -0.5110, -0.3031],
         [-0.9094,  1.4731, -0.1728, -0.2754,  1.6843,  0.8862,  0.2473,
           0.0916, -0.3818,  1.0779],
         [ 0.6954, -0.9430, -0.3950,  0.9595, -1.8052,  0.6318,  1.0113,
          -0.1440, -1.6109, -0.8765],
         [-0.4137, -0.9999, -0.5800,  2.2038,  0.7193, -0.5238, -0.2172,
           0.6618

tensor([[[ 0.7726,  0.1959, -0.2742, -0.4414, -0.1205,  0.5896,  0.2676,
           0.2441,  1.3759, -0.0921],
         [-0.7718, -1.2759, -0.1908,  0.4687,  0.5650,  0.2740,  0.0949,
           0.0123,  1.1677,  0.1628]]], grad_fn=<SliceBackward0>)

In [None]:
class GPTConfig:

    embedding_dropout = 0.1
    residual_dropout = 0.1
    attention_dropout = 0.1

class GPT2Config(GPTConfig):
    embedding_size = 768
    n_heads = 12
    n_layers = 12
    max_sequence_length = 256

class CausalSelfAttention(nn.Module):

    def __init__(self, config: GPT2Config) -> None:
        super().__init__()
        assert config.embedding_size % config.n_heads == 0
        self.values = nn.Linear(config.embedding_size, config.embedding_size)
        self.keys = nn.Linear(config.embedding_size, config.embedding_size)
        self.queries = nn.Linear(config.embedding_size, config.embedding_size)

        self.attention_dropout = nn.Dropout(config.attention_dropout)
        self.residual_dropout = nn.Dropout(config.residual_dropout)

        self.fc = nn.Linear(config.embedding_size, config.embedding_size)
        
        # triangular lower filled with ones 
        self.causal_mask = torch.tril(torch.ones(config.max_sequence_length, config.max_sequence_length)).view(1, 1, config.embedding_size, config.embedding_size)

        self.n_heads = config.n_heads

    def forward(self, x):
        N, sequence_length, embed_size = x.shape

        # B, sequence_length, n_heads, head_size
        values = self.values(x).view(N, sequence_length, self.n_heads, embed_size // self.n_heads)
        keys = self.keys(x).view(N, sequence_length, self.n_heads, embed_size // self.n_heads)
        queries = self.queries(x).view(N, sequence_length, self.n_heads, embed_size // self.n_heads)

        attention = torch.einsum('nqhd,nkhd->nhqk', [queries, keys]) * ( 1 / torch.sqrt(keys.shape[0]))
        attention = attention.masked_fill(self.causal_mask == 0, float('-1e20'))
        attention = F.softmax(attention, dim=-1)
        attention = self.attention_dropout(attention)
        out = torch.einsum('nhqk,nkhd->nqhd', [attention, values]).reshape(N, sequence_length, embed_size)
        out = self.residual_dropout(out)
        return out
        
class Decoder(nn.Module):

    def __init__(self, config: GPT2Config) -> None:
        super().__init__()
        self.norm_1 = nn.LayerNorm(config.embedding_size)
        self.norm_2 = nn.LayerNorm(config.embedding_size)
        self.attention = CausalSelfAttention(config)
        self.feed_forward = nn.Sequential(
            nn.Linear(config.embedding_size, config.embedding_size * 4),
            nn.GELU(),
            nn.Linear(config.embedding_size * 4, config.embedding_size),
            nn.Dropout(config.residual_dropout)
        )

    def forward(self, x):
        x = x + self.attention(self.norm_1(x))
        x = x + self.feed_forward(self.norm_2(x))
        return x


class GPT(nn.Module):
    """  the full GPT language model, with a context size of block_size """

    def __init__(self, config: GPT2Config):
        super().__init__()

        self.word_embedding = nn.Embedding(config.vocab_size, config.embedding_size)
        self.positional_embedding = nn.parameter(1, config.max_sequence_length, config.embedding_size)
        self.dropout = nn.Dropout(config.embedding_dropout)

        self.layers = nn.ModuleList(
            [
                Decoder(config) for _ in config.n_layers
            ]
        )

        self.layer_norm = nn.LayerNorm(config.embedding_size)
        self.fc = nn.Linear(config.embedding_size, config.vocab_size, bias=False)


    def forward(self, x):
        _, sequence_length = x.shape

        # word_embedding.shape = batch_size x sequence_length x embedding_dim
        word_embedding = self.word_embedding(x)
        # positional_encoding.shape = 1 x sequence_length x embedding_dim
        positional_encoding = self.positional_embedding[:, :sequence_length, :]
        x = self.dropout(word_embedding + positional_encoding)
        x = self.layers(x)
        x = self.layer_norm(x)
        logits = self.fc(x)
        return logits



tensor([[[0.0444,   -inf,   -inf],
         [0.0729, 0.5914,   -inf]]])