In [2]:
import torch
import torch.nn as nn
import torch.optim as optim

# import torchtext
# from torchtext.legacy.data import Field, BucketIterator#, Iterator
# from torchtext.legacy import data

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

# import spacy
import numpy as np
import pandas as pd

import random
import math
import time
import io


In [None]:
torchtext.__version__

# Reading the text file

In [None]:
f = open(r"english_python_data.txt", "r", encoding="utf8")
file_lines = f.readlines()

In [None]:
file_lines[:7]

In [None]:
dps = []
dp = None
for line in file_lines:
    if line[0] == "#":
        if dp:
            dp['solution'] = ''.join(dp['solution'])
            dps.append(dp)
        dp = {"question": None, "solution": []}
        dp['question'] = line[1:]
    else:
        dp["solution"].append(line)

In [None]:
i=0
for dp in dps:
    print("\n Question no: ", i+1)
    i+=1
    print(dp['question'][1:])
    print(dp['solution'])
    if i>4:
        break

In [None]:
print("Dataset size:", len(dps))

## Using a custom tokenizer to tokenize python code

Python is a programming language with its own unique syntax. Regular tokenizers like spacy are meant to tokenize english scentences and are not optimized towards Python's syntax. Here, we write our own custom tokenizer that makes use of Python's default tokenize library. When we make use of this library we only extract the token type and the token string.

In [None]:
from tokenize import tokenize, untokenize


def tokenize_python_code(python_code_str):
    python_tokens = list(tokenize(io.BytesIO(python_code_str.encode('utf-8')).readline))
    tokenized_output = []
    for i in range(0, len(python_tokens)):
        tokenized_output.append((python_tokens[i].type, python_tokens[i].string))
    return tokenized_output

In [None]:
tokenized_sample = tokenize_python_code(dps[1]['solution'])
print(tokenized_sample)

In [None]:
print(untokenize(tokenized_sample).decode('utf-8'))

Since we have mere 5000 data points, we make use of data augmentations to increase the size of our dataset. While tokenizing the python code, we mask the names of certain variables randomly(with 'var_1, 'var_2' etc) to ensure that the model that we train does not merly fixate on the way the variables are named and actually tries to understand the inhrent logic and syntax of the python code.

But, while randomly picking varibles to mask we avoid keyword literals(keyword.kwlist), control structures(as can be seen in below skip_list) and object properties. We add all such literals that need to be skipped into the skip_list

In [None]:
skip_list = ['range', 'enumerate', 'print', 'ord', 'int', 'float', 'char', 'list', 'dict', 'tuple', 'set', 'len', 'sum', 'min', 'max']

In [None]:
import keyword

print(keyword.kwlist)

In [None]:
def augment_tokenize_python_code(python_code_str, mask_factor=0.3):


    var_dict = {} # Dictionary that stores masked variables

    # certain reserved words that should not be treated as normal variables and
    # hence need to be skipped from our variable mask augmentations
    skip_list = ['range', 'enumerate', 'print', 'ord', 'int', 'float', 'zip'
                 'char', 'list', 'dict', 'tuple', 'set', 'len', 'sum', 'min', 'max']
    skip_list.extend(keyword.kwlist)

    var_counter = 1
    python_tokens = list(tokenize(io.BytesIO(python_code_str.encode('utf-8')).readline))
    tokenized_output = []

    for i in range(0, len(python_tokens)):
        if python_tokens[i].type == 1 and python_tokens[i].string not in skip_list:
        
            if i>0 and python_tokens[i-1].string in ['def', '.', 'import', 'raise', 'except', 'class']: # avoid masking modules, functions and error literals
                skip_list.append(python_tokens[i].string)
                tokenized_output.append((python_tokens[i].type, python_tokens[i].string))
            elif python_tokens[i].string in var_dict:  # if variable is already masked
                tokenized_output.append((python_tokens[i].type, var_dict[python_tokens[i].string]))
            elif random.uniform(0, 1) > 1-mask_factor: # randomly mask variables
                var_dict[python_tokens[i].string] = 'var_' + str(var_counter)
                var_counter+=1
                tokenized_output.append((python_tokens[i].type, var_dict[python_tokens[i].string]))
            else:
                skip_list.append(python_tokens[i].string)
                tokenized_output.append((python_tokens[i].type, python_tokens[i].string))
      
        else:
            tokenized_output.append((python_tokens[i].type, python_tokens[i].string))
    
    return tokenized_output

In [None]:
tokenized_sample = augment_tokenize_python_code(dps[1]['solution'])
print(tokenized_sample)

In [None]:
print(untokenize(tokenized_sample).decode('utf-8'))

## Building Train and Validation Dataset

In [None]:
python_problems_df = pd.DataFrame(dps)
python_problems_df.head()

In [None]:
python_problems_df.shape

In [None]:
from sklearn.model_selection import train_test_split

train_df, val_df = train_test_split(python_problems_df, test_size=0.15)

In [None]:
train_df.shape

In [None]:
val_df.shape

# Creating vocabulary using torchtext

In this section we will use torchtext Fields to construct the vocabulary for our sequence-to-sequence learning problem.

In [None]:
SEED = 1234

random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

In [None]:
spacy.load('en_core_web_sm')

In [None]:
Input = data.Field(tokenize = 'spacy', init_token='', eos_token='', lower=True)

Output = data.Field(tokenize = augment_tokenize_python_code, init_token='', eos_token='', lower=False)

In [None]:
fields = [('Input', Input),('Output', Output)]

Since our data augmentations have the potential to increase the vocabulary beyond what it initially is, we must ensure that we capture as many variations as possible in the vocabulary that we develop. In the the below code we apply our data augmentations 100 times to ensure that we can capture a majority of augmentations into our vocabulary.

In [None]:
train_example = []
val_example = []

train_expansion_factor = 100
for j in range(train_expansion_factor):
  for i in range(train_df.shape[0]):
      try:
          ex = data.Example.fromlist([train_df.question[i], train_df.solution[i]], fields)
          train_example.append(ex)
      except:
          pass

for i in range(val_df.shape[0]):
    try:
        ex = data.Example.fromlist([val_df.question[i], val_df.solution[i]], fields)
        val_example.append(ex)
    except:
        pass    

In [None]:
print(len(train_example))
print(len(val_example))

In [None]:
train_data = data.Dataset(train_example, fields)
valid_data =  data.Dataset(val_example, fields)

In [None]:
Input.build_vocab(train_data, min_freq = 0)
Output.build_vocab(train_data, min_freq = 0)

In [None]:
Input.vocab
     

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
train_data[0].Output

In [None]:
print(vars(train_data.examples[1]))

Our Encoder accepts a batch of source sequences and sequence masks as input. The source mask contains 1 in locations where the input sequence has valid values and 0 where the input sequence has values. This ensures that the attention mechanism within the encoder does not pay attention to values.

We convert our source sequence tokens into embeddings(‘tok_embedding’) of ‘hid_dim’ length. Since were are not using any recurrent networks we need to tag each token with its positional indices in order to preserve sequential information. We create an indices tensor(i.e. ‘pos’) and convert this into an embedding(‘pos_embedding’) of length ‘hid_dim’. This is combined with the source sequence embeddings to create our initial Encoder Layer input tensor src. This src tensor is passed through a series of Encoder Layers.

In [None]:
class Encoder(nn.Module):
    def __init__(self, 
                 input_dim, 
                 hid_dim, 
                 n_layers, 
                 n_heads, 
                 pf_dim,
                 dropout, 
                 device,
                 max_length = 1000):
        super().__init__()

        self.device = device
        
        self.tok_embedding = nn.Embedding(input_dim, hid_dim)
        self.pos_embedding = nn.Embedding(max_length, hid_dim)
        
        self.layers = nn.ModuleList([EncoderLayer(hid_dim, 
                                                  n_heads, 
                                                  pf_dim,
                                                  dropout, 
                                                  device) 
                                     for _ in range(n_layers)])
        
        self.dropout = nn.Dropout(dropout)
        
        self.scale = torch.sqrt(torch.FloatTensor([hid_dim])).to(device)
        
    def forward(self, src, src_mask):
        
        #src = [batch size, src len]
        #src_mask = [batch size, 1, 1, src len]
        
        batch_size = src.shape[0]
        src_len = src.shape[1]

        pos = torch.arange(0, src_len).unsqueeze(0).repeat(batch_size, 1).to(self.device)
        
        #pos = [batch size, src len]
        src = self.dropout((self.tok_embedding(src) * self.scale) + self.pos_embedding(pos))
        
        #src = [batch size, src len, hid dim]
        
        for layer in self.layers:
            src = layer(src, src_mask)
            
        #src = [batch size, src len, hid dim]
            
        return src

An EncoderLayer is the basic building block of our Transformer’s Encoder component. Our src tensor along with its ‘src_mask’ are sent into a multi-head self-attention operation to help our model focus on the necessary aspects of the src tensor. The output from the attention operation is combined with the src tensor(via skip connection) and normalized to avoid vanishing/exploding gradients(during training). This combined output is sent into a PositionwiseFeedForwardLayer.

In [None]:
class EncoderLayer(nn.Module):
    def __init__(self, 
                 hid_dim, 
                 n_heads, 
                 pf_dim,  
                 dropout, 
                 device):
        super().__init__()
        
        self.self_attn_layer_norm = nn.LayerNorm(hid_dim)
        self.ff_layer_norm = nn.LayerNorm(hid_dim)
        self.self_attention = MultiHeadAttentionLayer(hid_dim, n_heads, dropout, device)
        self.positionwise_feedforward = PositionwiseFeedforwardLayer(hid_dim, 
                                                                     pf_dim, 
                                                                     dropout)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, src, src_mask):
        
        #src = [batch size, src len, hid dim]
        #src_mask = [batch size, 1, 1, src len] 
                
        #self attention
        _src, _ = self.self_attention(src, src, src, src_mask)
        
        #dropout, residual connection and layer norm
        src = self.self_attn_layer_norm(src + self.dropout(_src))
        
        #src = [batch size, src len, hid dim]
        
        #positionwise feedforward
        _src = self.positionwise_feedforward(src)
        
        #dropout, residual and layer norm
        src = self.ff_layer_norm(src + self.dropout(_src))
        
        #src = [batch size, src len, hid dim]
        
        return src
     

A PositionwiseFeedForwardLayer takes the combined input and processes it further using two fully connected layers and a Relu activation function between them. This in combination with the src embedding is the final output of an EncoderLayer. This process repeats for each EncoderLayer block.