In [1]:
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

Looking in indexes: https://download.pytorch.org/whl/cu118


In [2]:
import torch
from torch.utils.data import Dataset, DataLoader
from collections import Counter
import itertools

# Synthetic dataset of simple code snippets and their outputs
data = [
    ("print('hello')", "hello"),
    ("x = 5\nprint(x)", "5"),
    ("a = 10\nb = 20\nprint(a + b)", "30"),
    ("for i in range(3):\n  print(i)", "0\n1\n2"),
    ("def greet(name):\n  return f'Hi, {name}'\nprint(greet('Alice'))", "Hi, Alice")
]

# Separate input and output sequences
input_sequences, output_sequences = zip(*data)

print("Input Sequences:")
for seq in input_sequences:
    print(seq)

print("\nOutput Sequences:")
for seq in output_sequences:
    print(seq)

Input Sequences:
print('hello')
x = 5
print(x)
a = 10
b = 20
print(a + b)
for i in range(3):
  print(i)
def greet(name):
  return f'Hi, {name}'
print(greet('Alice'))

Output Sequences:
hello
5
30
0
1
2
Hi, Alice


In [3]:
# Tokenize and build vocabulary for input sequences
input_tokens = [list(seq) for seq in input_sequences]
input_vocab = Counter(itertools.chain(*input_tokens))
input_stoi = {token: i + 2 for i, (token, count) in enumerate(input_vocab.most_common())}
input_stoi['<pad>'] = 0
input_stoi['<unk>'] = 1
input_itos = {i: token for token, i in input_stoi.items()}

# Tokenize and build vocabulary for output sequences
output_tokens = [list(seq) for seq in output_sequences]
output_vocab = Counter(itertools.chain(*output_tokens))
output_stoi = {token: i + 2 for i, (token, count) in enumerate(output_vocab.most_common())}
output_stoi['<pad>'] = 0
output_stoi['<unk>'] = 1
output_stoi['<sos>'] = len(output_stoi) # Start of sequence token
output_stoi['<eos>'] = len(output_stoi) # End of sequence token
output_itos = {i: token for token, i in output_stoi.items()}

print("Input Vocabulary Size:", len(input_stoi))
print("Output Vocabulary Size:", len(output_stoi))
print("\nInput stoi:", input_stoi)
print("\nOutput stoi:", output_stoi)

Input Vocabulary Size: 38
Output Vocabulary Size: 20

Input stoi: {' ': 2, 'r': 3, 'e': 4, 'i': 5, 'n': 6, 't': 7, '(': 8, ')': 9, "'": 10, '\n': 11, 'p': 12, 'a': 13, 'l': 14, '=': 15, 'f': 16, 'g': 17, 'o': 18, 'x': 19, '0': 20, 'b': 21, ':': 22, 'm': 23, 'h': 24, '5': 25, '1': 26, '2': 27, '+': 28, '3': 29, 'd': 30, 'u': 31, 'H': 32, ',': 33, '{': 34, '}': 35, 'A': 36, 'c': 37, '<pad>': 0, '<unk>': 1}

Output stoi: {'l': 2, 'e': 3, '0': 4, '\n': 5, 'i': 6, 'h': 7, 'o': 8, '5': 9, '3': 10, '1': 11, '2': 12, 'H': 13, ',': 14, ' ': 15, 'A': 16, 'c': 17, '<pad>': 0, '<unk>': 1, '<sos>': 18, '<eos>': 19}


In [4]:
# Convert tokens to numerical sequences
input_numerical_sequences = [[input_stoi.get(token, input_stoi['<unk>']) for token in seq] for seq in input_tokens]
output_numerical_sequences = [[output_stoi.get(token, output_stoi['<unk>']) for token in seq] for seq in output_tokens]

# Add <sos> and <eos> tokens to output sequences
output_numerical_sequences = [[output_stoi['<sos>']] + seq + [output_stoi['<eos>']] for seq in output_numerical_sequences]

# Determine maximum sequence lengths for padding
max_input_length = max(len(seq) for seq in input_numerical_sequences)
max_output_length = max(len(seq) for seq in output_numerical_sequences)

# Pad sequences
padded_input_sequences = [seq + [input_stoi['<pad>']] * (max_input_length - len(seq)) for seq in input_numerical_sequences]
padded_output_sequences = [seq + [output_stoi['<pad>']] * (max_output_length - len(seq)) for seq in output_numerical_sequences]

print("Padded Input Sequences (first):", padded_input_sequences[0])
print("Padded Output Sequences (first):", padded_output_sequences[0])
print("\nMax Input Length:", max_input_length)
print("Max Output Length:", max_output_length)

Padded Input Sequences (first): [12, 3, 5, 6, 7, 8, 10, 24, 4, 14, 14, 18, 10, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Padded Output Sequences (first): [18, 7, 3, 2, 2, 8, 19, 0, 0, 0, 0]

Max Input Length: 61
Max Output Length: 11


In [5]:
# Convert padded sequences to PyTorch tensors
input_tensors = torch.LongTensor(padded_input_sequences)
output_tensors = torch.LongTensor(padded_output_sequences)

# For teacher forcing, the target is the output sequence shifted by one position
target_tensors = output_tensors[:, 1:] # Exclude the <sos> token

print("Input Tensors shape:", input_tensors.shape)
print("Output Tensors shape:", output_tensors.shape)
print("Target Tensors shape:", target_tensors.shape)

Input Tensors shape: torch.Size([5, 61])
Output Tensors shape: torch.Size([5, 11])
Target Tensors shape: torch.Size([5, 10])


In [6]:
import torch.nn as nn

class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()

        self.hid_dim = hid_dim
        self.n_layers = n_layers

        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout=dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        # src shape: (sequence_length, batch_size)

        embedded = self.dropout(self.embedding(src))
        # embedded shape: (sequence_length, batch_size, emb_dim)

        outputs, (hidden, cell) = self.rnn(embedded)
        # outputs shape: (sequence_length, batch_size, hid_dim * num_directions)
        # hidden shape: (n_layers * num_directions, batch_size, hid_dim)
        # cell shape: (n_layers * num_directions, batch_size, hid_dim)

        # We only need the final hidden and cell states for the decoder
        return hidden, cell

class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()

        self.output_dim = output_dim
        self.hid_dim = hid_dim
        self.n_layers = n_layers

        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim + hid_dim, hid_dim, n_layers, dropout=dropout)
        self.fc_out = nn.Linear(emb_dim + hid_dim * 2, output_dim) # Adjust size for concat
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, cell):
        # input shape: (batch_size) -> needs unsqueezing
        # hidden shape: (n_layers, batch_size, hid_dim)
        # cell shape: (n_layers, batch_size, hid_dim)

        input = input.unsqueeze(0)
        # input shape: (1, batch_size)

        embedded = self.dropout(self.embedding(input))
        # embedded shape: (1, batch_size, emb_dim)

        # Context vector from encoder (last hidden state)
        # We will concatenate the embedded input with the context vector
        context = hidden[-1, :, :].unsqueeze(0) # Using the last layer's hidden state as context
        # context shape: (1, batch_size, hid_dim)


        rnn_input = torch.cat((embedded, context), dim=2)
        # rnn_input shape: (1, batch_size, emb_dim + hid_dim)


        output, (hidden, cell) = self.rnn(rnn_input, (hidden, cell))
        # output shape: (1, batch_size, hid_dim * num_directions)
        # hidden shape: (n_layers * num_directions, batch_size, hid_dim)
        # cell shape: (n_layers * num_directions, batch_size, hid_dim)

        # output is from the top RNN layer
        # hidden and cell are from all layers

        # For the linear layer, we need to concatenate the output, embedded input, and context
        output = output.squeeze(0) # Remove the sequence length dimension (which is 1)
        embedded = embedded.squeeze(0) # Remove the sequence length dimension (which is 1)
        context = context.squeeze(0) # Remove the sequence length dimension (which is 1)


        prediction = self.fc_out(torch.cat((output, embedded, context), dim=1))
        # prediction shape: (batch_size, output_dim)


        return prediction, hidden, cell


In [7]:
import random
import torch.optim as optim
import torch.nn.functional as F

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

        assert encoder.hid_dim == decoder.hid_dim, \
            "Hidden dimensions of encoder and decoder must be equal!"
        assert encoder.n_layers == decoder.n_layers, \
            "Number of layers of encoder and decoder must be equal!"

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        # src shape: (sequence_length, batch_size)
        # trg shape: (sequence_length, batch_size)
        # teacher_forcing_ratio is probability to use teacher forcing

        batch_size = trg.shape[1]
        trg_len = trg.shape[0]
        trg_vocab_size = self.decoder.output_dim

        # Tensor to store decoder outputs
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)

        # Encoder outputs the final hidden and cell states
        hidden, cell = self.encoder(src)

        # First input to the decoder is the <sos> tokens
        input = trg[0, :]

        for t in range(1, trg_len):
            # Insert input token embedding, previous hidden and cell states
            # receive output prediction and new hidden and cell states
            output, hidden, cell = self.decoder(input, hidden, cell)

            # Place predictions in a tensor holding predictions for each token
            outputs[t] = output

            # Decide if we are going to use teacher forcing or not
            teacher_force = random.random() < teacher_forcing_ratio

            # Get the highest predicted token from the output
            top1 = output.argmax(1)

            # If teacher forcing, use actual next token as next input
            # if not, use predicted token
            input = trg[t, :] if teacher_force else top1

        return outputs

# Instantiate models and set up training parameters
INPUT_DIM = len(input_stoi)
OUTPUT_DIM = len(output_stoi)
ENC_EMB_DIM = 256
DEC_EMB_DIM = 256
HID_DIM = 512
N_LAYERS = 2
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

encoder = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, N_LAYERS, ENC_DROPOUT)
decoder = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, N_LAYERS, DEC_DROPOUT)

model = Seq2Seq(encoder, decoder, device).to(device)

# Define optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=0.001)
TRG_PAD_IDX = output_stoi['<pad>']
criterion = nn.CrossEntropyLoss(ignore_index=TRG_PAD_IDX)

# Simple DataLoader (for demonstration purposes, a proper DataLoader should be used)
# Assuming input_tensors and output_tensors are already created and on the correct device
# In a real scenario, you would create a Dataset and DataLoader
# For this simple case, we'll treat the entire dataset as a single batch for demonstration
train_input = input_tensors.T.to(device) # Transpose to get (sequence_length, batch_size)
train_output = output_tensors.T.to(device) # Transpose to get (sequence_length, batch_size)

# Training loop (one epoch for demonstration)
def train(model, src, trg, optimizer, criterion, clip):
    model.train()
    optimizer.zero_grad()

    output = model(src, trg)

    # trg shape: (trg_len, batch_size)
    # output shape: (trg_len, batch_size, output_dim)

    output_dim = output.shape[-1]

    # Reshape for criterion (batch_size * trg_len, output_dim)
    # Target needs to be (batch_size * trg_len)
    output = output[1:].view(-1, output_dim) # Exclude <sos> token output
    trg = trg[1:].contiguous().view(-1)     # Exclude <sos> token target

    loss = criterion(output, trg)

    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
    optimizer.step()

    return loss.item()

# Training (demonstration with a single "batch")
CLIP = 1
loss = train(model, train_input, train_output, optimizer, criterion, CLIP)

print(f'Loss after one training step: {loss:.4f}')

Loss after one training step: 3.0091


In [10]:
import numpy as np
import nltk
from nltk.translate.bleu_score import sentence_bleu

def evaluate(model, src, trg, criterion, output_itos, trg_pad_idx, device):
    model.eval()
    epoch_loss = 0
    all_predicted_tokens = []
    all_target_tokens = []

    with torch.no_grad():
        # Iterate through the data (treating each sequence as a batch for simplicity)
        for i in range(src.shape[1]):
            single_src = src[:, i].unsqueeze(1) # Shape: (seq_len, 1)
            single_trg = trg[:, i].unsqueeze(1) # Shape: (seq_len, 1)

            # Forward pass to get encoder hidden and cell states
            hidden, cell = model.encoder(single_src)

            # Decoder's first input is the <sos> token
            input = single_trg[0, :] # Shape: (1)

            # Tensor to store decoder outputs for this sequence
            # Initialize with a size based on max_output_length to avoid index errors
            outputs = torch.zeros(max_output_length, model.decoder.output_dim).to(device)
            predicted_tokens = []

            # Greedy decoding
            for t in range(1, max_output_length): # Iterate up to max_output_length
                output, hidden, cell = model.decoder(input, hidden, cell)

                # Store the output for loss calculation
                outputs[t] = output.squeeze(0)

                # Get the highest predicted token
                top1 = output.argmax(1)

                # Use the predicted token as the next input
                input = top1

                # Convert predicted token index back to token and store
                predicted_token = output_itos[top1.item()]
                if predicted_token == '<eos>':
                    break # Stop decoding if <eos> is predicted
                if predicted_token != '<pad>': # Don't include padding in prediction
                     predicted_tokens.append(predicted_token)


            # Calculate loss for this sequence
            # Only consider the predicted tokens up to the point decoding stopped or max length reached
            actual_len = min(t + 1, single_trg.shape[0]) # Length of target sequence including <eos>
            predicted_len = len(predicted_tokens) + 1 # Length of predicted sequence including <sos> and potential <eos>

            # Ensure that the output tensor used for loss calculation matches the length of the target tensor slice
            loss = criterion(outputs[1:actual_len].view(-1, outputs.shape[-1]), single_trg[1:actual_len].contiguous().view(-1))
            epoch_loss += loss.item()


            # Convert target sequence to tokens for BLEU calculation
            # Exclude <sos> and <eos> for BLEU calculation
            target_tokens = [output_itos[token.item()] for token in single_trg[1:] if output_itos[token.item()] != '<eos>' and token.item() != trg_pad_idx]

            # Convert token lists to tuples of strings for nltk
            all_predicted_tokens.append(tuple(predicted_tokens))
            all_target_tokens.append(tuple(target_tokens))

    # Calculate average loss
    avg_loss = epoch_loss / src.shape[1]

    # Calculate BLEU score (corpus BLEU is more robust for small datasets)
    # Need to format for sentence_bleu or corpus_bleu
    # reference_corpus = list of list of tokens (each inner list is a reference)
    # candidate_corpus = list of tokens
    reference_corpus = [[list(tokens)] for tokens in all_target_tokens] # sentence_bleu expects a list of references
    candidate_corpus = [list(tokens) for tokens in all_predicted_tokens]

    # Calculate sentence BLEU for each example and average
    # Ensure references are lists of lists, candidates are lists
    bleu_scores = [sentence_bleu([list(ref)], list(cand)) for ref, cand in zip(all_target_tokens, all_predicted_tokens)]
    avg_bleu = np.mean(bleu_scores)


    return avg_loss, avg_bleu

# Call the evaluation function
eval_loss, eval_bleu = evaluate(model, train_input, train_output, criterion, output_itos, TRG_PAD_IDX, device)

print(f'Evaluation Loss: {eval_loss:.4f}')
print(f'Evaluation BLEU Score: {eval_bleu:.4f}')

Evaluation Loss: 2.8140
Evaluation BLEU Score: 0.0000


The hypothesis contains 0 counts of 2-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()
The hypothesis contains 0 counts of 3-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()
The hypothesis contains 0 counts of 4-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()


In [11]:
import torch

def translate_sequence(sequence, model, input_stoi, output_itos, device, max_output_length=50):
    model.eval() # Set model to evaluation mode

    with torch.no_grad():
        # Preprocess the input sequence
        tokens = [input_stoi.get(token, input_stoi['<unk>']) for token in list(sequence)]
        src_tensor = torch.LongTensor(tokens).unsqueeze(1).to(device) # Add batch dimension and move to device

        # Pass through encoder
        hidden, cell = model.encoder(src_tensor)

        # Initialize decoder input with <sos> token
        input_tensor = torch.LongTensor([output_stoi['<sos>']]).to(device) # Shape: (1)

        generated_indices = []

        # Generate output sequence
        for _ in range(max_output_length):
            # Pass current input and previous hidden/cell states to decoder
            output, hidden, cell = model.decoder(input_tensor, hidden, cell)

            # Get the index of the predicted next token
            predicted_token_index = output.argmax(1).item()
            generated_indices.append(predicted_token_index)

            # Stop if <eos> token is predicted
            if predicted_token_index == output_stoi['<eos>']:
                break

            # Use the predicted token as the input for the next step
            input_tensor = torch.LongTensor([predicted_token_index]).to(device)

        # Convert generated indices back to tokens
        generated_tokens = [output_itos[index] for index in generated_indices]

        # Remove <sos> and <eos> tokens if present
        if generated_tokens[0] == '<sos>':
            generated_tokens = generated_tokens[1:]
        if generated_tokens and generated_tokens[-1] == '<eos>':
            generated_tokens = generated_tokens[:-1]

    return ''.join(generated_tokens) # Join tokens back into a string

# Demonstrate usage with an example input sequence
example_input = "print('hello')"
translated_output = translate_sequence(example_input, model, input_stoi, output_itos, device)

print(f"Input: {example_input}")
print(f"Generated Output: {translated_output}")

example_input_2 = "x = 5\nprint(x)"
translated_output_2 = translate_sequence(example_input_2, model, input_stoi, output_itos, device)

print(f"\nInput: {example_input_2}")
print(f"Generated Output: {translated_output_2}")

Input: print('hello')
Generated Output: 01

Input: x = 5
print(x)
Generated Output: 0101
