In [None]:
import random
import torch
import torch.nn as nn
import math
import torch.optim as optim
from torch.nn import CrossEntropyLoss
import numpy as np

# 1. Data Preprocessing

## (a) Defining the token vocabulary:

We need to map each amino acid in our sequence to a different token, which is represented by a unique integer.

In [None]:
# Token vocabulary: maps each amino acid that can be part of a protein sequence
# to a unique integer. The position of a letter in the string below is its token id.
token_vocabulary = {
    residue: token_id for token_id, residue in enumerate("YACDEFGHIKLMNPQRSTVW")
}

# Special tokens are appended after the 20 amino acids:
#   <UNK>  - replaces any character that is not in the vocabulary (see tokenize_sequence)
#   <PAD>  - pads sequences of the same batch to the same length
#   <MASK> - marks tokens that are masked during training
token_vocabulary.update({"<UNK>": 20, "<PAD>": 21, "<MASK>": 22})

def tokenize_sequence(sequence, token_vocabulary):
    """Translate an amino acid sequence into a list of integer tokens.

    Args:
    sequence: Iterable of single-character amino acid symbols (e.g. a string)
    token_vocabulary: Dictionary mapping tokens to integers; must contain "<UNK>"

    Returns:
    List of integer tokens, one per element of `sequence`. Symbols that are
    missing from the vocabulary are mapped to the <UNK> token.
    """
    tokens = []
    for residue in sequence:
        # Look up the fallback id per symbol, exactly like the lookup itself
        unknown_id = token_vocabulary["<UNK>"]
        tokens.append(token_vocabulary.get(residue, unknown_id))
    return tokens

# Smoke test: "X" and "*" are not in the vocabulary, so they map to <UNK>
sequence = "ACDEFGHIKLMNPQRSTX*"
tokens = tokenize_sequence(sequence=sequence, token_vocabulary=token_vocabulary)
print(tokens)

## (b) Define masking function

<span style="color:red; font-weight:bold">Exercise 3.1 (a) Define the function mask_tokenized_sequence following the instructions from the docstring of the function below: </span>

In [None]:

def mask_tokenized_sequence(tokenized_sequence, token_vocabulary, mask_percentage=0.15):
    """
    Masks a given percentage of tokens in a sequence for MLM (Masked Language Modeling) training.

    The positions to mask are drawn uniformly at random (without replacement) and
    are replaced by the <MASK> token. The input list itself is never modified.
    For a non-empty sequence and a positive mask_percentage, at least one token
    is masked so that every sequence contributes to the training loss.

    Args:
    tokenized_sequence: List of tokens representing the input sequence
    token_vocabulary: Dictionary mapping tokens to integers (must contain "<MASK>")
    mask_percentage: Fraction of tokens to mask, between 0 and 1 (0.15 masks 15%)

    Returns:
    masked_tokenized_sequence: List of input tokens with a subset of tokens masked
    tokenized_sequence: List of input tokens without masking

    Raises:
    ValueError: If mask_percentage is not between 0 and 1
    """
    if not 0 <= mask_percentage <= 1:
        raise ValueError(f"mask_percentage must be between 0 and 1, got {mask_percentage}")

    mask_token = token_vocabulary["<MASK>"]

    # Work on a copy so the unmasked sequence can be returned as the target
    masked_tokenized_sequence = list(tokenized_sequence)
    sequence_length = len(masked_tokenized_sequence)
    if sequence_length == 0 or mask_percentage == 0:
        return masked_tokenized_sequence, tokenized_sequence

    num_tokens_to_mask = max(1, round(sequence_length * mask_percentage))
    for position in random.sample(range(sequence_length), num_tokens_to_mask):
        masked_tokenized_sequence[position] = mask_token

    return masked_tokenized_sequence, tokenized_sequence



<span style="color:red; font-weight:bold"> Is your function working? Test your function, using the following code:</span>

In [None]:
# Example usage: tokenize a sequence, then mask 15% of its tokens
tokenized_sequence = tokenize_sequence("ACDEFGHIKLMNPQRSTVWYX", token_vocabulary)
masked_sequence, target_sequence = mask_tokenized_sequence(
    tokenized_sequence,
    token_vocabulary,
    mask_percentage=0.15,
)
# Display the masked input next to the unmasked target
masked_sequence, target_sequence

### (c) Define padding function

<span style="color:red; font-weight:bold">Exercise 3.1 (b) Define the function pad_tokenized_sequences following the instructions from the docstring of the function below: </span>

In [None]:
def pad_tokenized_sequences(tokenized_sequences, max_length, pad_token=None):
    """Pad sequences to the same length (max_length).
    If sequence is longer than allowed max_length, truncate it.

    The input sequences are not modified; new lists are returned.

    Args:
    tokenized_sequences: List of tokenized sequences
    max_length: Maximum length of a sequence
    pad_token: Integer token used for padding. Defaults to the <PAD> entry of
        the notebook-level token_vocabulary.

    Returns:
    padded_sequences: List of padded tokenized sequences

    Raises:
    ValueError: If max_length is negative
    """
    if max_length < 0:
        raise ValueError(f"max_length must be non-negative, got {max_length}")
    if pad_token is None:
        # Fall back to the notebook-level vocabulary defined in the preprocessing section
        pad_token = token_vocabulary["<PAD>"]

    padded_sequences = []
    for tokens in tokenized_sequences:
        truncated = list(tokens[:max_length])
        num_padding = max_length - len(truncated)
        padded_sequences.append(truncated + [pad_token] * num_padding)

    return padded_sequences

<span style="color:red; font-weight:bold"> Is your function working? Test your function, using the following code:</span>

In [None]:
# Example usage: a 16-token sequence padded to length 20
sequence = "ACDEFGHIKRSTVWYX"
tokenized_sequence = tokenize_sequence(sequence, token_vocabulary)
padded_sequence = pad_tokenized_sequences(
    [tokenized_sequence],
    20,
)
# Display the padded result
padded_sequence

### (d) Combining padding and masking


In [None]:
def mask_tokens_and_pad(tokenized_sequences, token_vocabulary, max_length):
    """Given a list of sequences, this function masks and pads the sequences and returns
    the masked sequences and the labels for the masked tokens.

    Each sequence is first truncated to max_length, so that masks are only placed
    on positions that are kept and labels stay aligned with the padded input.
    A label is the original token at every masked position and -100 everywhere
    else (unmasked tokens and padding).

    Args:
    tokenized_sequences: List of tokenized sequences
    token_vocabulary: Dictionary mapping tokens to integers
    max_length: Maximum length of a sequence (allowed for the Transformer Network)

    Returns:
    padded_inputs: List with one entry per sequence; each entry is a one-element
        list holding the masked and padded sequence of length max_length
    all_labels: List of labels for each token in the padded sequences
        (one list of length max_length per sequence)
    """
    mask_token = token_vocabulary["<MASK>"]
    ignore_label = -100

    all_labels = []
    padded_inputs = []
    for tokens in tokenized_sequences:
        # Truncate before masking so inputs and labels cover the same positions
        tokens = list(tokens[:max_length])

        # Mask, then pad the masked sequence to max_length
        masked_tokens, _ = mask_tokenized_sequence(tokens, token_vocabulary, mask_percentage=0.15)
        padded_input = pad_tokenized_sequences([masked_tokens], max_length=max_length)
        padded_inputs.append(padded_input)

        # Keep the original token only where the input was masked
        labels = [
            original if masked == mask_token else ignore_label
            for original, masked in zip(tokens, masked_tokens)
        ]
        # Padding positions never carry a label
        labels += [ignore_label] * (max_length - len(labels))
        all_labels.append(labels)
    return padded_inputs, all_labels

<span style="color:red; font-weight:bold">Exercise 3.1 (c) What labels do the masked tokens have? What labels do the tokens that are not masked have? Can you guess why?</span>

## (e) Implement batching:

Implementing a function for batching a list of sequences

In [None]:

def create_batches(sequences, batch_size, shuffle = False):
    """Split a list of sequences into consecutive batches.

    Args:
    sequences: List of sequences to batch
    batch_size: Number of sequences per batch (the last batch may be smaller)
    shuffle: If True, the sequences are shuffled before batching. Shuffling is
        done on a copy, so the list passed in keeps its original order.

    Returns:
    batches: List of batches, each a list of at most batch_size sequences

    Raises:
    ValueError: If batch_size is not a positive integer
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")
    if shuffle:
        # Copy first: random.shuffle works in place and would reorder the caller's data
        sequences = list(sequences)
        random.shuffle(sequences)
    batches = [sequences[start:start + batch_size] for start in range(0, len(sequences), batch_size)]
    return batches

# Small toy dataset used to illustrate batching
toy_sequences = [
    'KHCIGHNWNCDDCCTTMD',
    'QNIGNYLGKGXC',
    'DPTYSMMMFRLSFYPCCKH',
    'LKVMASPAXTVQSSHKEPW',
    'YDEPITQGMDETHWAG',
    'RPILVYCQXSE',
    'CMLIGYHRALPSGTDHP',
    'TVLVYVYFEVCWCVEACFT',
    'HLDMTHDCGQX',
    'KTEWCAPTMVHAEDPCG',
]

# Batching configuration
batch_size = 2
max_input_length = 18

# Split the toy dataset into batches (no shuffling)
batches = create_batches(toy_sequences, batch_size)

# Keep the first batch around for illustration
first_batch = batches[0]

# Show all batches, then only the first one
print("All batches for the toy dataset:")
print(batches)
print("First batch of the toy dataset:")
print(first_batch)


# 2. Model Implementation

## (a) Implementing positional encoding

In [None]:
class PositionalEncoding(nn.Module):
    """Fixed (non-learned) sinusoidal positional encoding.

    For position `pos` and dimension pair index `k`:
        encoding[pos, 2k]     = sin(pos / 10000 ** (2k / d_model))
        encoding[pos, 2k + 1] = cos(pos / 10000 ** (2k / d_model))

    The table is precomputed once for `max_length` positions and stored as a
    non-persistent buffer: it follows the module across devices (`.to(device)`)
    but is not written to the state dict.

    Args:
    d_model: Dimension of the token embeddings
    max_length: Largest sequence length the encoding is precomputed for
    """

    def __init__(self, d_model, max_length=512):
        super(PositionalEncoding, self).__init__()
        positions = torch.arange(max_length, dtype=torch.float32).unsqueeze(1)
        # even_dims holds 2k for k = 0, 1, ..., so the exponent is 2k / d_model
        even_dims = torch.arange(0, d_model, 2, dtype=torch.float32)
        inverse_frequencies = torch.exp(-math.log(10000.0) * even_dims / d_model)
        angles = positions * inverse_frequencies

        encoding = torch.zeros(max_length, d_model)
        encoding[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one cosine column fewer than sine columns
        encoding[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Shape (1, max_length, d_model)
        self.register_buffer("encoding", encoding.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return the positional encoding matching the layout of `x`.

        Args:
        x: Tensor of shape (seq_length, batch_size, d_model); only its first two
            dimensions are read

        Returns:
        Tensor of shape (seq_length, batch_size, d_model) with the encoding of the
        first seq_length positions, repeated for every batch element

        Raises:
        ValueError: If seq_length exceeds the precomputed max_length
        """
        seq_length, batch_size, _ = x.size()
        if seq_length > self.encoding.size(1):
            raise ValueError(
                f"Sequence length {seq_length} exceeds the maximum length {self.encoding.size(1)}"
            )
        # Expand the encoding to match the batch size
        encoding = self.encoding[:, :seq_length].expand(batch_size, -1, -1)
        # reshape to shape (seq_length, batch_size, d_model)
        return encoding.permute(1, 0, 2)

# Test the module

positional_encoding = PositionalEncoding(d_model = 12, max_length=18)
# Create a dummy input tensor with batch size 2, sequence length 18.
# Named dummy_input so the builtin input() is not shadowed for the rest of the notebook.
dummy_input = torch.ones(18, 2, 1)
pos_encoding = positional_encoding(dummy_input)
pos_encoding.shape

<span style="color:red; font-weight:bold">Exercise 3.1 (d) What kind of positional encoding are we using in the function defined above? </span>

<span style="color:red; font-weight:bold">Exercise 3.1 (e) For the ProteinEncoder defined below, briefly describe the forward pass of an input batch. </span>

<span style="color:red; font-weight:bold">Exercise 3.1 (f) For the defined ProteinEncoder below, define the function return_representations by following the instructions from the function's docstring. </span>

In [None]:


class ProteinEncoder(nn.Module):
    """Transformer encoder over amino acid tokens with a per-token prediction head.

    Args:
    vocab_size: Number of entries in the token vocabulary (size of the embedding table)
    d_model: Dimension of the token embeddings and of the encoder layers
    n_heads: Number of attention heads per encoder layer
    dim_feedforward: Hidden dimension of the feedforward block in each encoder layer
    num_encoder_layers: Number of stacked encoder layers
    max_length: Maximum sequence length supported by the positional encoding
    num_classes: Number of output classes of the prediction layer. The default of 21
        matches the token ids 0-20 of the notebook vocabulary (20 amino acids and <UNK>).
    """

    def __init__(self, vocab_size, d_model, n_heads, dim_feedforward, num_encoder_layers, max_length,
                 num_classes=21):
        super(ProteinEncoder, self).__init__()
        self.d_model = d_model
        self.dropout = 0.1

        # token embeddings and positional encoding
        self.embedding = nn.Embedding(vocab_size, self.d_model)
        self.pos_encoder = PositionalEncoding(self.d_model, max_length)

        # transformer encoder (sequence-first layout: (seq_length, batch_size, d_model))
        encoder_layer = nn.TransformerEncoderLayer(self.d_model, n_heads, dim_feedforward, self.dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_encoder_layers)

        # prediction layer
        self.prediction_layer = nn.Linear(d_model, num_classes)

        # initialize weights
        self.init_weights()

    def init_weights(self):
        """Initialize embedding and prediction weights uniformly in [-0.1, 0.1]; zero the bias."""
        initrange = 0.1
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.prediction_layer.bias.data.zero_()
        self.prediction_layer.weight.data.uniform_(-initrange, initrange)

    def forward(self, src, src_mask=None):
        """Predict a class distribution (logits) for every input position.

        Args:
        src: Tokenized input sequences of shape (seq_length, batch_size)
        src_mask: Optional boolean key padding mask of shape (batch_size, seq_length);
            True marks positions (padding) that attention should ignore

        Returns:
        output: Logits of shape (seq_length, batch_size, num_classes)
        """
        representations = self.return_representations(src, src_mask)
        output = self.prediction_layer(representations)
        return output

    def return_representations(self, src, src_mask=None):
        '''
        This function returns all representations of all amino acids of the input sequences.

        The representations are the outputs of the last encoder layer, i.e. the
        values that forward() feeds into the prediction layer.

        Args:
        src: Tokenized input sequences
        src_mask: Attention masking for the input sequences (key padding mask,
            True marks positions to ignore)

        Returns:
        output: All representations of the input sequences of dim (max_seq_length, batch_size, d_model)
        '''
        embedded = self.embedding(src)
        # Use only the positions present in the input, on the same device as the embeddings
        positions = self.pos_encoder(embedded)[: embedded.size(0)].to(embedded.device)
        output = self.transformer_encoder(positions + embedded, src_key_padding_mask=src_mask)
        return output

# 3. Model Training

The following code initializes the encoder and defines functions for training the model for one epoch and for evaluating the model.

In [None]:
# Model hyperparameters
d_model, n_heads = 256, 4
dim_feedforward, num_encoder_layers = 512, 4
# Also read as a global by train_one_epoch and evaluate when padding the batches
max_length = 512

# Encoder sized to the token vocabulary
model = ProteinEncoder(
    vocab_size=len(token_vocabulary),
    d_model=d_model,
    n_heads=n_heads,
    dim_feedforward=dim_feedforward,
    num_encoder_layers=num_encoder_layers,
    max_length=max_length,
)

# Optimizer and loss; CrossEntropyLoss skips targets equal to its default ignore_index (-100)
optimizer = optim.Adam(model.parameters(), lr=5e-5)
loss_fn = CrossEntropyLoss()


In [None]:
def train_one_epoch(model, dataset, optimizer, loss_fn):
    """Train the model for one pass over the dataset and return the mean batch loss.

    Relies on the notebook-level `token_vocabulary` and `max_length`.

    Args:
    model: Encoder called as model(src=..., src_mask=...) with src of shape
        (seq_length, batch_size) and a key padding mask of shape (batch_size, seq_length)
    dataset: List of batches, each a list of amino acid sequences (strings)
    optimizer: Optimizer updating the model parameters
    loss_fn: Loss taking (logits, labels); positions with label -100 are expected to be ignored

    Returns:
    Mean of the per-batch losses
    """
    model.train()
    pad_token = token_vocabulary["<PAD>"]
    total_loss = 0
    for batch in dataset:
        tokenized = [tokenize_sequence(seq, token_vocabulary) for seq in batch] # Tokenize sequences
        padded_inputs, labels = mask_tokens_and_pad(tokenized, token_vocabulary, max_length=max_length)
        labels = torch.tensor(labels)  # (batch_size, max_length)
        # mask_tokens_and_pad wraps every sequence in a one-element list: drop that axis
        inputs = torch.tensor(padded_inputs)[:, 0, :]  # (batch_size, max_length)

        # Key padding mask: True marks the padding positions attention must ignore
        padding_mask = inputs == pad_token

        optimizer.zero_grad()
        # The encoder expects sequence-first input: (max_length, batch_size)
        outputs = model(src=inputs.transpose(0, 1), src_mask=padding_mask)

        # Outputs are sequence-first but labels are batch-first: bring the outputs to
        # batch-first before flattening so every logit row meets its own label
        outputs = outputs.transpose(0, 1).reshape(-1, outputs.shape[-1])
        labels = labels.reshape(-1)

        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
    return total_loss / len(dataset)


def evaluate(model, dataset, loss_fn):
    """Compute the mean batch loss of the model on a dataset without updating it.

    Relies on the notebook-level `token_vocabulary` and `max_length`. The inputs are
    masked by mask_tokens_and_pad on every call, just as during training.

    Args:
    model: Encoder called as model(src, src_mask=...) with src of shape
        (seq_length, batch_size) and a key padding mask of shape (batch_size, seq_length)
    dataset: List of batches, each a list of amino acid sequences (strings)
    loss_fn: Loss taking (logits, labels); positions with label -100 are expected to be ignored

    Returns:
    Mean of the per-batch losses
    """
    model.eval()
    pad_token = token_vocabulary["<PAD>"]
    total_loss = 0
    with torch.no_grad():
        for batch in dataset:
            tokenized = [tokenize_sequence(seq, token_vocabulary) for seq in batch] # Tokenize sequences
            padded_inputs, labels = mask_tokens_and_pad(tokenized, token_vocabulary, max_length=max_length)
            labels = torch.tensor(labels)  # (batch_size, max_length)
            # mask_tokens_and_pad wraps every sequence in a one-element list: drop that axis
            inputs = torch.tensor(padded_inputs)[:, 0, :]  # (batch_size, max_length)

            # Key padding mask: True marks the padding positions attention must ignore
            padding_mask = inputs == pad_token

            # No gradients are computed here, so there is nothing for an optimizer to reset
            outputs = model(inputs.transpose(0, 1), src_mask=padding_mask)

            # Bring the sequence-first outputs to batch-first so they line up with the labels
            outputs = outputs.transpose(0, 1).reshape(-1, outputs.shape[-1])
            loss = loss_fn(outputs, labels.reshape(-1))
            total_loss += loss.item()
    return total_loss / len(dataset)

<span style="color:red; font-weight:bold">Exercise 3.1 (g) Load the training, validation, and test datasets from the optimal pH prediction task from Worksheet 1. Store all training sequences in train_sequences, the validation sequences in val_sequences, and the test sequences in test_sequences. For sequences that are longer than 511 amino acids, only use the first 511 amino acids: </span>

<span style="color:red; font-weight:bold">Exercise 3.1 (h) Train the Protein encoder on the training sequences for at least 5 epochs. If you cannot train the model on your own PC, try to reduce the model size, batch size, and/or maximum sequence length. Alternatively, you can run the training on the HPC. 
</span>

In [None]:
# The validation batches are fixed, so they are built once up front
val_dataset = create_batches(val_sequences, batch_size=8)

# Training loop
num_epochs = 5
for epoch in range(num_epochs):
    # Rebuild the training batches every epoch so they are reshuffled
    train_dataset = create_batches(
        train_sequences,
        batch_size=8,
        shuffle=True,
    )
    train_loss = train_one_epoch(model, train_dataset, optimizer, loss_fn)
    val_loss = evaluate(model, val_dataset, loss_fn)
    print(f'Epoch {epoch}, Train Loss: {train_loss}, Validation Loss: {val_loss}')

<span style="color:red; font-weight:bold">Exercise 3.1 (i) Define a function that calculates for a given protein amino acid sequence a single numerical representation. This representation should be the element-wise mean of all amino acid representations. Test this function for the first sequence of train_sequences </span>

<span style="color:red; font-weight:bold">Exercise 3.1 (j) How many learnable parameters are in the transformer network? How many trainable parameters are in the attention heads, feedforward blocks, embedding layer and the final prediction layer? </span>