<a href="https://colab.research.google.com/github/ScierKnave/dna_aptamers_modelling/blob/main/Copy_of_transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [30]:
import math
import os
import torch
from torch import nn, Tensor
import torch.nn.functional as F
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.utils.data import TensorDataset, DataLoader
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
import json
import pandas as pd
import numpy as np
import sklearn
from sklearn.preprocessing import OneHotEncoder

if torch.cuda.is_available():
  torch.set_default_tensor_type('torch.cuda.FloatTensor')

# Hyperparameters

In [31]:
# Transformer HyperHPeters
# The length of our token sequences
ntokens_HP = 30
# Number of attention heads
nheads_HP = 8
# The encoding dimensions of our tokens
token_encode_size_HP = 4
# The embedding string size
# Pytorch will cut this embedded sequence 
# and give an equal amount to each head, different 
# than in theory 
embed_size_HP = token_encode_size_HP *  nheads_HP
# Output size of the heads, which 
# learn an embedding.
head_embedsize_HP = 4
# For some reason PyTorch needs us to do this manually
d_model_HP = head_embedsize_HP * nheads_HP
# No dropout for now
dropout_HP = 0
# Standard stuff
activation_HP = "relu"
layer_norm_eps_HP = 1e-5
batch_first_HP = True
norm_first_HP = False

# Trainig and validation Hyperparameters
datasetsize_HP = 1_000_000
split_HP = 0.9
batchsize_HP = 32
batchsize_HP = min(batchsize_HP, (int)(datasetsize_HP*0.2))
nepochs_HP = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
loss_HP = nn.MSELoss()
learnrate_HP = 1e-4

In [32]:
model_parameters = filter(lambda p: p.requires_grad, transformer.parameters())
params = sum([np.prod(p.size()) for p in model_parameters])
print(params)

139425


In [33]:
'''
embed_layer = nn.Embedding(4, 16)
input = torch.rand(10, 30).to(torch.int)
embedded = embed_layer(input)
print(output.shape)

encoder_layer = nn.TransformerEncoderLayer(d_model=16, nhead=8)
out = encoder_layer(embedded)
print(out.shape)
'''

'\nembed_layer = nn.Embedding(4, 16)\ninput = torch.rand(10, 30).to(torch.int)\nembedded = embed_layer(input)\nprint(output.shape)\n\nencoder_layer = nn.TransformerEncoderLayer(d_model=16, nhead=8)\nout = encoder_layer(embedded)\nprint(out.shape)\n'

# Import and preprocess the data

In [34]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [35]:
features_df = pd.read_json('/content/drive/MyDrive/strandenergylist.json')
energy_df = features_df[1]
energy_df = energy_df.head(datasetsize_HP)
features_df = features_df.head(datasetsize_HP)
features_df = features_df[0].str.split('', expand=True)
features_df = features_df.drop([0, 31], axis=1)
labels_df = features_df[1]

### Transformation into pytorch tensors

In [36]:
# Transform string features into as DNA tensor
dna = features_df.to_numpy()
for i in range(dna.shape[0]):
    for j in range(dna.shape[1]):
        if (dna[i,j] == 'A'): dna[i,j] = 0
        if (dna[i,j] == 'C'): dna[i,j] = 1
        if (dna[i,j] == 'G'): dna[i,j] = 2
        if (dna[i,j] == 'T'): dna[i,j] = 3
dna = dna.astype(int)
dna = torch.from_numpy(dna)
#dna = F.one_hot(dna.to(torch.int64), num_classes=4).to(device)
#dna = dna.reshape(datasetsize_HP, 120)
# Get free energy as pytorch tensor
energy = torch.tensor(energy_df.values).to(torch.float)

### Creation of test and validation iterators

In [37]:
dataset = TensorDataset(dna, energy)
ntrain = (int) (split_HP * datasetsize_HP)
ntest = datasetsize_HP - ntrain
train_set, vali_set = torch.utils.data.random_split(dataset, [ntrain, ntest])

train_dataloader = DataLoader(train_set, batch_size=batchsize_HP, shuffle=True)
vali_dataloader = DataLoader(vali_set, batch_size=batchsize_HP, shuffle=True)

RuntimeError: ignored

In [None]:
len(train_dataloader.dataset)

# Create the transformer model

In [None]:
# Created entirely by the Pytorch team and pasted here.
# Adds information of position in the encoding of the tokens.
class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)
    def forward(self, x: Tensor) -> Tensor:
        """
        Args:
            x: Tensor, shape [seq_len, batch_size, embedding_dim]
        """
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

class RegressionTranformer(nn.Module):
    def __init__(self):
        super(RegressionTranformer, self).__init__()
        self.encoder = nn.TransformerEncoderLayer(
            d_model = embed_size_HP,
            nhead = nheads_HP,
            dropout = dropout_HP,
            activation = activation_HP,
            layer_norm_eps = layer_norm_eps_HP,
            batch_first = batch_first_HP,
            norm_first = norm_first_HP
        )
        # Takes indexed sequences of tokens and embeds them.
        # [batch_size, seq_len] -> [seq_len, batch_size, embedding_dim] TODO check this!
        self.embedder = nn.Embedding(ntokens_HP, embed_size_HP)
        # Will add information about position in sequence to each token.
        # The second argument is the dropout probability.
        # [seq_len, batch_size, embedding_dim] -> [seq_len, batch_size, embedding_dim] TODO check this!
        self.posi_encoder = PositionalEncoding(embed_size_HP, 0.5)

        # This final layer will take the represented ouput and maps to a real number.
        # [batch_size, flatten_seq_len] -> [1]
        self.linear = nn.Linear(embed_size_HP * ntokens_HP, 1)


    def forward(self, x):
        #print("embedder input: ", x.shape)
        ŷ = self.embedder(x)
        #print("embedder output: ", ŷ.shape)

        ŷ = torch.permute(ŷ, (1,0,2)).to(device) # Necessary for the posi. enc. function.
        #print("posi input: ", ŷ.shape)
        ŷ = self.posi_encoder(ŷ)
        #print("posi output: ", ŷ.shape)

        ŷ = torch.permute(ŷ, (1,0,2)).to(device) # Back to normal
        #print("encod input: ", ŷ.shape)
        ŷ = self.encoder(ŷ)
        #print("encod output: ", ŷ.shape)

        #print("concat input: ", ŷ.shape)
        ŷ = ŷ.flatten(1,2).to(device)
        #print("concat output: ", ŷ.shape)

        #print("linear input: ", ŷ.shape)
        ŷ = self.linear(ŷ)
        #print("linear output: ", ŷ.shape)

        return ŷ
        
transformer = RegressionTranformer().to(device)

# Train the Transformer

In [None]:
def get_reg_accuracy():
    total = 0
    
    with torch.inference_mode():
      transformer.eval()
      for x_batch, y_batch in vali_dataloader:
            x_batch = x_batch.to(device)
            y_batch = y_batch.to(device)
            ŷ_batch = transformer(x_batch)
            batch_loss = loss_HP(ŷ_batch, y_batch.unsqueeze(1))
            total += batchsize_HP * batch_loss.item()

    transformer.train()
    return total / (datasetsize_HP*(1-split_HP))


transformer_optimizer = torch.optim.Adam(transformer.parameters(), lr=learnrate_HP)
def trainroutine():
    for epoch in range(nepochs_HP):
        running_loss = 0.0
        for i, (x_batch, y_batch) in enumerate(train_dataloader):
            x_batch.to(device)
            y_batch.to(device)

            transformer_optimizer.zero_grad()
            
            # Forward propagation
            ŷ_batch = transformer(x_batch)
            
            # Backpropagation
            batch_loss = loss_HP(ŷ_batch, y_batch.unsqueeze(1))
            batch_loss.backward()
            transformer_optimizer.step()
            
            

            # Print statistics
            running_loss += batch_loss.item()*batchsize_HP
        print("[%d] Training loss: %.3f" %(epoch+1, running_loss/(datasetsize_HP*split_HP)))
        print("[%d] Test loss: %.3f" %(epoch+1, get_reg_accuracy()))
        
trainroutine()
torch.save(transformer.state_dict(),"transformer_weights.pt")


# Get validation accuracy

In [None]:
def get_reg_accuracy():
    total = 0
    
    with torch.inference_mode():
      transformer.eval()
      for x_batch, y_batch in vali_dataloader:
            x_batch = x_batch.to(device)
            y_batch = y_batch.to(device)
            ŷ_batch = transformer(x_batch)
            batch_loss = loss_HP(ŷ_batch, y_batch.unsqueeze(1))
            total += batchsize_HP * batch_loss.item()

    transformer.train()
#print( get_reg_accuracy() )

# Notes

- We could add a positionnal encoding (with
PositionalEncoding) (see https://pytorch.org/tutorials/beginner/transformer_tutorial.html)

Usually, embedding is done in order to compress the representation of words (loss of information).
In our case, their are 4 possible tokens, which means that no compression is needed.