HOW TO USE OPTIMUS MODEL

- First install and import everything.
- Set the paths.
- Run every cell in the SAVING DATA and SPLITTING DATA sections.
- Then you can run the training.

The SAVING DATA and SPLITTING DATA sections need to be run only once, to create the datasets.

In [1]:
import torch
import torch.nn as nn
import pytorch_lightning as pl
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data
import math
import copy



In [None]:
datapath = "" # path to the CSV file containing the "sequence" and "label" columns
savepath = "" # path to a folder named Dataset that contains three empty folders named ids, att_mask and labels

# Once the dataset has been created, you need two more folders named Dataset_validation and Dataset_testing, each containing three empty folders named ids, att_mask and labels.

training_path=""    # equal to savepath
validation_path=""  # path of Dataset_validation
testing_path=""     # path of Dataset_testing
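
The folder layout described in the comments above can also be created programmatically instead of by hand. The following is a minimal sketch, not part of the original notebook: the helper name `make_dataset_folders` and the `base_dir` argument are assumptions introduced for illustration only.

```python
import os

# Folder names taken from the comments in the path cell above
SUBFOLDERS = ("ids", "att_mask", "labels")
DATASET_FOLDERS = ("Dataset", "Dataset_validation", "Dataset_testing")

def make_dataset_folders(base_dir):
    """Create the three dataset folders, each with ids/att_mask/labels inside.

    `base_dir` is a hypothetical parent directory chosen by the user.
    Returns a dict mapping each dataset folder name to its full path.
    """
    paths = {}
    for name in DATASET_FOLDERS:
        root = os.path.join(base_dir, name)
        for sub in SUBFOLDERS:
            # exist_ok=True makes the call safe to repeat
            os.makedirs(os.path.join(root, sub), exist_ok=True)
        paths[name] = root
    return paths
```

With this helper, `savepath` and `training_path` would be set to `paths["Dataset"]`, `validation_path` to `paths["Dataset_validation"]`, and `testing_path` to `paths["Dataset_testing"]`.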


ARCHITECTURE


POSITIONAL ENCODING
Positional Encoding is used to inject the position information of each token in the input sequence. It uses sine and cosine functions of different frequencies to generate the positional encoding.

In [2]:
class PositionalEncoding(nn.Module): #subclass of nn.Module allowing it to be used as a pytorch layer
    def __init__(self, d_model, max_seq_length):
        # d_model: embedding dimension of the model
        # max_seq_length: maximum length of the input sequence
        super(PositionalEncoding, self).__init__()
        
        # zero tensor that will be filled with the positional encodings
        pe = torch.zeros(max_seq_length, d_model) 
        
        # torch.arange creates the position indices 0 .. max_seq_length - 1;
        # unsqueeze(1) changes the shape from [max_len] to [max_len, 1]
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        
        # one inverse frequency per sin/cos pair: 10000^(-2i / d_model)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term) # sine on the even indices
        pe[:, 1::2] = torch.cos(position * div_term) # cosine on the odd indices
        
        # registered as a buffer: it follows the module across devices but is not trained
        self.register_buffer('pe', pe.unsqueeze(0)) 
        
        
    def forward(self, x):
        #The forward method simply adds the positional encodings to the input x.

        #It uses the first x.size(1) elements of pe to ensure that the positional 
        # encodings match the actual sequence length of x.
        
        return x + self.pe[:, :x.size(1)]

The PositionalEncoding class adds information about the position of tokens within the sequence. Since the transformer model lacks inherent knowledge of the order of tokens (due to its self-attention mechanism), this class helps the model to consider the position of tokens in the sequence. The sinusoidal functions used are chosen to allow the model to easily learn to attend to relative positions, as they produce a unique and smooth encoding for each position in the sequence.
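
To make the formula concrete, the encoding of a single position can be computed by hand. The sketch below uses only the standard library and mirrors the sine/cosine pairing of the class above; the function name `sinusoidal_encoding` is an assumption introduced for illustration, and the code assumes an even `d_model`, as the class does.

```python
import math

def sinusoidal_encoding(position, d_model):
    """Return the sinusoidal encoding of one position as a list of d_model floats."""
    encoding = []
    for i in range(0, d_model, 2):
        # Same inverse frequency as div_term in the class: 10000^(-i / d_model)
        freq = math.exp(-i * math.log(10000.0) / d_model)
        encoding.append(math.sin(position * freq))  # even index
        encoding.append(math.cos(position * freq))  # odd index
    return encoding

pe0 = sinusoidal_encoding(0, 4)
pe1 = sinusoidal_encoding(1, 4)
print(pe0)  # [0.0, 1.0, 0.0, 1.0]
print(pe1)
```

Position 0 always encodes to alternating 0 and 1, because sin(0) = 0 and cos(0) = 1. Later positions move along each sine/cosine pair at a different rate, which is what makes each position's encoding distinct.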

Position-wise Feed-Forward Networks
The PositionWiseFeedForward class defines a position-wise feed-forward network: two linear layers with a ReLU activation in between. In a transformer, this network is applied to each position separately and identically. It transforms the features produced by the attention mechanism, acting as an additional processing step on the attention outputs.

In [3]:
class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionWiseFeedForward, self).__init__()
        #definition of the 2 linear transformation layer  
        self.fc1 = nn.Linear(d_model, d_ff) 
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))
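
The phrase "applied to each position separately and identically" can be checked on a toy example. The sketch below is a pure-Python stand-in for the PyTorch layer, with hand-picked weights that are purely illustrative (d_model = 2, d_ff = 3); the helper names are assumptions and do not come from the notebook.

```python
def linear(x, weight, bias):
    """y = W x + b for a single vector x; weight is a list of rows."""
    return [sum(w * v for w, v in zip(row, x)) + b for row, b in zip(weight, bias)]

def relu(x):
    return [max(0.0, v) for v in x]

def feed_forward(x, w1, b1, w2, b2):
    """Two linear maps with a ReLU in between, applied to one position."""
    return linear(relu(linear(x, w1, b1)), w2, b2)

# Illustrative weights only: d_model = 2, d_ff = 3
w1 = [[1.0, 0.0], [0.0, 1.0], [-1.0, -1.0]]
b1 = [0.0, 0.0, 0.0]
w2 = [[1.0, 1.0, 1.0], [1.0, -1.0, 0.0]]
b2 = [0.0, 0.5]

# Positions 0 and 2 hold the same token vector
sequence = [[1.0, 2.0], [-1.0, 0.5], [1.0, 2.0]]
outputs = [feed_forward(token, w1, b1, w2, b2) for token in sequence]
print(outputs)  # [[3.0, -0.5], [1.0, 0.0], [3.0, -0.5]]
```

Positions 0 and 2 produce the same output because the same weights are used at every position and no information flows between positions inside this block.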

Multi-head Attention
The Multi-Head Attention mechanism computes the attention between each pair of positions in a sequence. It consists of multiple “attention heads” that capture different aspects of the input sequence.

In [4]:

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        # Ensure that the model dimension (d_model) is divisible by the number of heads
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        
        #The initialization checks if d_model is divisible by num_heads, 
        #and then defines the transformation weights for query, key, value, and output.
                
        # Initialize dimensions
        self.d_model = d_model # Model's dimension
        self.num_heads = num_heads # Number of attention heads
        self.d_k = d_model // num_heads # Dimension of each head's key, query, and value
        
        # Linear layers for transforming inputs
        self.W_q = nn.Linear(d_model, d_model) # Query transformation layer
        self.W_k = nn.Linear(d_model, d_model) # Key transformation layer
        self.W_v = nn.Linear(d_model, d_model) # Value transformation layer 
        self.W_o = nn.Linear(d_model, d_model) # Output transformation layer
        # all four projections map d_model -> d_model, matching the input size
        
        
    def scaled_dot_product_attention(self, Q, K, V, mask=None): # the attention mask is applied here
        # Calculate attention scores: the dot product of queries (Q) and keys (K),
        # scaled by the square root of the key dimension (d_k).
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        
        # Apply mask if provided 
        # (useful for preventing attention to certain parts like padding).
        # This is fundamental in our use case: sequences have different lengths,
        # so the attention mask restricts the scores to the actual (unpadded) length.
        if mask is not None:
            #print(mask.shape)
            #print(attn_scores.shape)
            # Reshape mask from [batch, seq] to [batch, 1, 1, seq] so it broadcasts over heads and query positions
            mask = mask.unsqueeze(1).unsqueeze(2)  # Add two singleton dimensions

            attn_scores = attn_scores.masked_fill(mask == 0, -1e9)
        
        # Softmax is applied to obtain attention probabilities
        attn_probs = torch.softmax(attn_scores, dim=-1)
        
        # Multiply by values to obtain the final output
        output = torch.matmul(attn_probs, V)
        return output
        
    def split_heads(self, x):
        #This method reshapes the input x into the shape (batch_size, num_heads, seq_length, d_k). 
        # It enables the model to process multiple attention heads concurrently, allowing for parallel computation.
        # Reshape the input to have num_heads for multi-head attention
        batch_size, seq_length, d_model = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)
        
    def combine_heads(self, x):
        # Combine the multiple heads back to original shape
        # After applying attention to each head separately, this method combines the results back into 
        # a single tensor of shape (batch_size, seq_length, d_model). This prepares the result for further processing.
        batch_size, _, seq_length, d_k = x.size()
        return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)
        
    def forward(self, Q, K, V, mask=None):

        # Apply linear transformations and split heads
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))
        
        
        # Perform scaled dot-product attention
        attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
        
        # Combine heads and apply output transformation
        output = self.W_o(self.combine_heads(attn_output))
        return output
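
The effect of `masked_fill(mask == 0, -1e9)` followed by softmax can be seen on a single row of scores. The sketch below is a pure-Python illustration rather than the PyTorch code itself; the function name `masked_softmax` and the example scores are assumptions.

```python
import math

def masked_softmax(scores, mask, fill_value=-1e9):
    """Softmax over one row of attention scores; mask[i] == 0 marks padding."""
    filled = [s if m != 0 else fill_value for s, m in zip(scores, mask)]
    peak = max(filled)  # subtract the maximum for numerical stability
    exps = [math.exp(s - peak) for s in filled]
    total = sum(exps)
    return [e / total for e in exps]

scores = [2.0, 1.0, 0.5, 3.0]
mask = [1, 1, 1, 0]  # the last position is padding
probs = masked_softmax(scores, mask)
print(probs)
```

The padded position had the highest raw score, but it receives a probability of zero after masking, and the remaining probabilities still sum to one. If every position in a row is masked, all filled scores are equal and the softmax becomes uniform instead of zero.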

Encoder Layer

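The encoder layer follows the standard design: a self-attention block and a feed-forward block, each followed by dropout, a residual connection, and layer normalization.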

In [5]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, mask):
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x

In [6]:
import pytorch_lightning as pl
import torch.nn.functional as F
import torchmetrics

In [7]:
import torchmetrics.classification


class Transformer(pl.LightningModule):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super(Transformer, self).__init__()
        # The constructor takes the following parameters:

        # src_vocab_size: Source vocabulary size.
        # tgt_vocab_size: Output size of the final linear layer (1 for binary classification).
        # d_model: The dimensionality of the model's embeddings.
        # num_heads: Number of attention heads in the multi-head attention mechanism.
        # num_layers: Number of encoder layers (this model has no decoder).
        # d_ff: Dimensionality of the inner layer in the feed-forward network.
        # max_seq_length: Maximum sequence length for positional encoding.
        # dropout: Dropout rate for regularization.

        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        
        self.fc = nn.Linear(d_model, tgt_vocab_size)

        self.output_activation = nn.Sigmoid()
          
        self.dropout = nn.Dropout(dropout)
        # for validation/testing
        self.accuracy = torchmetrics.Accuracy(task="binary")
        self.f1=torchmetrics.classification.BinaryF1Score()
        self.precision=torchmetrics.classification.BinaryPrecision()
        self.recall=torchmetrics.classification.BinaryRecall()



    def forward(self, src, tgt, src_mask):

        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        #print("Output shape pre fc:" + str(enc_output.shape))
        cls_output = enc_output[:,0]

        #print("CLS output shape: "+str(cls_output.shape))

        output = self.fc(cls_output)
        #print("Output shape after:" + str(output.shape))

        # squeeze only the last dimension so that a batch of size 1 keeps its batch dimension
        output = self.output_activation(output.squeeze(-1))

    
        return output
    
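    def cross_entropy_loss(self, logits, labels):
      # despite the name, `logits` are already sigmoid probabilities here,
      # so plain binary cross-entropy is the matching loss
      return F.binary_cross_entropy(logits, labels)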

    def training_step(self, train_batch, batch_idx):
        x, y, att = train_batch
        
        logits = self.forward(x,y,att)
        loss = self.cross_entropy_loss(logits, y)
        self.log('train_loss', loss)
        return loss


    def validation_step(self, val_batch, batch_idx):
        x, y, att = val_batch
        logits = self.forward(x,y,att)
        loss = self.cross_entropy_loss(logits, y)
        acc = self.accuracy(logits,y)
        f1_val=self.f1(logits,y)
        precision_val=self.precision(logits,y)
        recall_val=self.recall(logits,y)
        self.log('val_loss', loss)
        self.log('val_accuracy', acc)
        self.log('val_f1', f1_val)
        self.log('val_precision', precision_val)
        self.log('val_recall', recall_val)

    def test_step(self, test_batch, batch_idx):
        x, y, att = test_batch
        logits = self.forward(x,y,att)
        loss = self.cross_entropy_loss(logits, y)
        acc = self.accuracy(logits,y)
        f1_test=self.f1(logits,y)
        precision_test=self.precision(logits,y)
        recall_test=self.recall(logits,y)

        self.log('test_loss', loss)
        self.log('test_f1', f1_test)
        self.log('test_accuracy', acc)
        self.log('test_precision', precision_test)
        self.log('test_recall', recall_test)

    def configure_optimizers(self):
      optimizer = torch.optim.AdamW(self.parameters(), lr=0.00001)
      return optimizer
    
    

In [8]:
src_vocab_size = 5000
tgt_vocab_size = 1
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
max_seq_length = 500
dropout = 0.1


In [9]:
transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)

In [10]:
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("zhihan1996/DNABERT-2-117M", cls_token="[token]",  trust_remote_code=True)

SAVING DATA

In [11]:
from torch.utils.data import Dataset
import pandas as pd


class CustomDataSet(Dataset):
    def __init__(self, csv_file):
        self.df = pd.read_csv(csv_file)

    def __len__(self):
        return self.df.shape[0]

    def __getitem__(self, index):
        sequence = self.df["sequence"][index]
        sequence = "[token]" + sequence #adding the cls_token for classification purpose
        label = self.df["label"][index]
        return sequence, label

In [12]:
import torch
# Create custom dataset object
train_data_object = CustomDataSet(datapath)

train_loader = torch.utils.data.DataLoader(train_data_object,
        batch_size=32, shuffle = False)

In [17]:


for i,item in enumerate(train_loader):
  dna,label = item
  inputs = tokenizer(dna, return_tensors = 'pt', add_special_tokens=False, padding=True)
  
  ids = inputs["input_ids"]
  att_mask = inputs["attention_mask"]
  
  torch.save(ids, savepath + "/ids/%d.pt" % i)
  torch.save(att_mask, savepath + "/att_mask/%d.pt" % i)
  torch.save(label, savepath + "/labels/%d.pt" % i )
  

SPLITTING DATA


In [18]:
# code used to split the dataset into Training, Validation and Testing with an 80/10/10 ratio



import os
import shutil



training_ids_path = os.path.join(training_path, "ids")
training_att_mask_path = os.path.join(training_path, "att_mask")
training_labels_path = os.path.join(training_path, "labels")

validation_ids_path = os.path.join(validation_path, "ids")
validation_att_mask_path = os.path.join(validation_path, "att_mask")
validation_labels_path = os.path.join(validation_path, "labels")

testing_ids_path = os.path.join(testing_path, "ids")
testing_att_mask_path = os.path.join(testing_path, "att_mask")
testing_labels_path = os.path.join(testing_path, "labels")


for i, el in enumerate(os.listdir(training_ids_path)):
    if i % 10 == 0:
        shutil.move(os.path.join(training_ids_path, el), validation_ids_path)
        shutil.move(os.path.join(training_att_mask_path, el), validation_att_mask_path)
        shutil.move(os.path.join(training_labels_path, el), validation_labels_path)
    if i % 10 == 1:
        shutil.move(os.path.join(training_ids_path, el), testing_ids_path)
        shutil.move(os.path.join(training_att_mask_path, el), testing_att_mask_path)
        shutil.move(os.path.join(training_labels_path, el), testing_labels_path)
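
The modulo rule in the loop above sends one file in ten to validation, one in ten to testing, and leaves the rest in the training folder. The sketch below reproduces only the counting logic, without touching any files. The function names are assumptions, and 533 is an illustrative file count chosen because it yields the same batch counts (425 and 54) that appear in the training log further down.

```python
def assign_split(index):
    """Mirror the modulo rule used in the splitting loop."""
    if index % 10 == 0:
        return "validation"
    if index % 10 == 1:
        return "testing"
    return "training"

def split_counts(num_files):
    """Count how many files end up in each split for a given total."""
    counts = {"training": 0, "validation": 0, "testing": 0}
    for i in range(num_files):
        counts[assign_split(i)] += 1
    return counts

print(split_counts(533))  # {'training': 425, 'validation': 54, 'testing': 54}
```

The ratio is only approximately 80/10/10 when the file count is not a multiple of ten. Since `os.listdir` returns names in arbitrary order, which files land in which split can differ between machines.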


LOAD DATA

In [11]:
from torch.utils.data import Dataset
import pandas as pd

In [12]:
import os
class MyDataSet(Dataset):
    def __init__(self, path):
        self.path = path
        # The same file name (e.g. "12.pt") exists in ids, att_mask and labels.
        # Listing only ids and reusing the name keeps the three tensors aligned,
        # since os.listdir gives no ordering guarantee across folders.
        self.file_names = os.listdir(os.path.join(path, 'ids'))

    def __len__(self):
        return len(self.file_names)

    def __getitem__(self, index):
        name = self.file_names[index]
        ids = torch.load(os.path.join(self.path, 'ids', name))
        att_mask = torch.load(os.path.join(self.path, 'att_mask', name))
        label = torch.load(os.path.join(self.path, 'labels', name)).float()
        print("loading file"+self.path+'/ids/'+name)

        return ids,label,att_mask

In [13]:

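# Create the dataset objects from the paths defined at the top of the notebook
train_data_object = MyDataSet(training_path)
test_data_object = MyDataSet(testing_path)
val_data_object = MyDataSet(validation_path)

def collate(batch): # each stored file is already a full batch, so unwrap the single item
  (a, b, c) = batch[0]
  return (a,b,c)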

In [14]:
import pytorch_lightning as pl

In [14]:
class MyDataModule(pl.LightningDataModule):

  def setup(self, stage):
    self.dataset = ""#MyDataSet("")


  def train_dataloader(self):
    return torch.utils.data.DataLoader(train_data_object,
        batch_size=1, shuffle = False, collate_fn=collate)
  def val_dataloader(self):
    return torch.utils.data.DataLoader(val_data_object,
        batch_size=1, shuffle = False, collate_fn=collate)
  def test_dataloader(self):
    return torch.utils.data.DataLoader(test_data_object,
       batch_size=1, shuffle = False, collate_fn=collate)


In [17]:
trainer = pl.Trainer(max_epochs=4)

data_module = MyDataModule()

trainer.fit(transformer, data_module)
# Model evaluation
p = trainer.test(transformer, data_module)
print("Test set metrics:", p)


GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]



  | Name                | Type               | Params
-----------------------------------------------------------
0 | encoder_embedding   | Embedding          | 2.6 M 
1 | positional_encoding | PositionalEncoding | 0     
2 | encoder_layers      | ModuleList         | 18.9 M
3 | fc                  | Linear             | 513   
4 | output_activation   | Sigmoid            | 0     
5 | dropout             | Dropout            | 0     
6 | accuracy            | BinaryAccuracy     | 0     
-----------------------------------------------------------
21.5 M    Trainable params
0         Non-trainable params
21.5 M    Total params
85.899    Total estimated model params size (MB)


Sanity Checking: |          | 0/? [00:00<?, ?it/s]loading file/home/antoniodeblasi/Scaricati/Dataset_testing/ids/385.pt
Sanity Checking DataLoader 0:  50%|█████     | 1/2 [00:00<00:00,  2.90it/s]loading file/home/antoniodeblasi/Scaricati/Dataset_testing/ids/373.pt
Epoch 0:   0%|          | 0/425 [00:00<?, ?it/s]                           loading file/home/antoniodeblasi/Scaricati/Dataset/ids/297.pt
Epoch 0:   0%|          | 1/425 [00:00<00:35, 11.91it/s, v_num=7]loading file/home/antoniodeblasi/Scaricati/Dataset/ids/245.pt
Epoch 0:   0%|          | 2/425 [00:00<00:57,  7.34it/s, v_num=7]loading file/home/antoniodeblasi/Scaricati/Dataset/ids/102.pt
Epoch 0:   1%|          | 3/425 [00:01<02:30,  2.80it/s, v_num=7]loading file/home/antoniodeblasi/Scaricati/Dataset/ids/487.pt
Epoch 0:   1%|          | 4/425 [00:02<03:38,  1.93it/s, v_num=7]loading file/home/antoniodeblasi/Scaricati/Dataset/ids/213.pt
Epoch 0:   1%|          | 5/425 [00:03<04:18,  1.62it/s, v_num=7]loading file/home/antonio

`Trainer.fit` stopped: `max_epochs=4` reached.


Epoch 3: 100%|██████████| 425/425 [07:14<00:00,  0.98it/s, v_num=7]


LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: |          | 0/? [00:00<?, ?it/s]loading file/home/antoniodeblasi/Scaricati/Dataset_validation/ids/368.pt
Testing DataLoader 0:   2%|▏         | 1/54 [00:00<00:20,  2.61it/s]loading file/home/antoniodeblasi/Scaricati/Dataset_validation/ids/234.pt
Testing DataLoader 0:   4%|▎         | 2/54 [00:00<00:19,  2.63it/s]loading file/home/antoniodeblasi/Scaricati/Dataset_validation/ids/22.pt
Testing DataLoader 0:   6%|▌         | 3/54 [00:01<00:19,  2.56it/s]loading file/home/antoniodeblasi/Scaricati/Dataset_validation/ids/181.pt
Testing DataLoader 0:   7%|▋         | 4/54 [00:01<00:19,  2.57it/s]loading file/home/antoniodeblasi/Scaricati/Dataset_validation/ids/56.pt
Testing DataLoader 0:   9%|▉         | 5/54 [00:01<00:19,  2.58it/s]loading file/home/antoniodeblasi/Scaricati/Dataset_validation/ids/446.pt
Testing DataLoader 0:  11%|█         | 6/54 [00:02<00:18,  2.58it/s]loading file/home/antoniodeblasi/Scaricati/Dataset_validation/ids/244.pt
Testing DataLoader 0:  13%|█▎        | 7/