In [1]:
import os

import mpramnist
from mpramnist.deepstarrdataset import DeepStarrDataset
from mpramnist import transforms as t
from mpramnist import target_transforms as t_t

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data

import pytorch_lightning as pl
from pytorch_lightning import loggers as pl_loggers

In [2]:
BATCH_SIZE = 1024
# Cap the DataLoader worker count at the CPU count reported by the OS.
# The original hard-coded 103 (presumably tuned for the authors' machine);
# asking for more workers than there are cores makes PyTorch emit a warning
# and can slow down or even stall data loading.
NUM_WORKERS = min(103, os.cpu_count() or 1)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Names of the activity (target) columns, taken from the dataset class
activity_columns = DeepStarrDataset.activity_columns

## Reverse-complement

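If you want to use the reverse-complement augmentation exactly as it was applied in the original study, use the *use_original_reverse_complement* argument. Its default is *None*; as the example below shows, with the default the predefined `"train"` split already includes the reverse-complemented sequences.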

**WARNING**: in the original study, reverse-complement was applied **only** to the training set.

For example:

In [3]:
# The predefined "train" split already carries the original study's
# reverse-complement augmentation, so no random reverse-complement
# transform is added to the training pipeline here.
train_transform = t.Compose([t.Seq2Tensor()])
val_test_transform = t.Compose([t.Seq2Tensor()])

orig_rev_comp_train_dataset = DeepStarrDataset(
    activity_columns=activity_columns,
    split="train",
    transform=train_transform,
)
# Validation split: left unchanged
val_dataset = DeepStarrDataset(
    activity_columns=activity_columns,
    split="val",
    transform=val_test_transform,
)
# Test split: left unchanged
test_dataset = DeepStarrDataset(
    activity_columns=activity_columns,
    split="test",
    transform=val_test_transform,
)

Note: The training set contains reverse-complement augmentation as implemented in the original study.  
• Dataset size: 2N (N original + N reverse-complemented sequences)  
• Label consistency: y_rc ≡ y_original  
• Do not reapply this transformation during preprocessing. 


In [4]:
# Sizes of the train / val / test splits, one per line
print(
    len(orig_rev_comp_train_dataset),
    len(val_dataset),
    len(test_dataset),
    sep="\n",
)

402278
40570
41186


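However, we suggest applying the reverse-complement transformation on the fly instead, by adding **transforms.ReverseComplement(0.5)** to the training transform. The argument *0.5* is the probability with which each sequence is replaced by its reverse complement. In this case, pass *use_original_reverse_complement = False* so that the training set is not augmented twice.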

For example:

In [5]:
# Training pipeline: one-hot encode, then reverse-complement each sequence
# with probability 0.5.
train_transform = t.Compose([t.Seq2Tensor(), t.ReverseComplement(0.5)])
# Validation / test pipeline: the reverse-complement step is given
# probability 0, so sequences are expected to pass through unchanged.
val_test_transform = t.Compose([t.Seq2Tensor(), t.ReverseComplement(0)])

# The built-in augmentation is switched off because the transform above
# already performs reverse-complementing on the fly.
train_dataset = DeepStarrDataset(
    activity_columns=activity_columns,
    use_original_reverse_complement=False,
    split="train",
    transform=train_transform,
)

val_dataset = DeepStarrDataset(
    activity_columns=activity_columns,
    split="val",
    transform=val_test_transform,
)

test_dataset = DeepStarrDataset(
    activity_columns=activity_columns,
    split="test",
    transform=val_test_transform,
)

In [6]:
# Sizes of the train / val / test splits, one per line
print(
    len(train_dataset),
    len(val_dataset),
    len(test_dataset),
    sep="\n",
)

201139
40570
41186


## Data splitting

Sequences from the **first** and **second** half of chr2R were held out for validation and testing, respectively.

The remaining chromosomes are used for the training set.

You can use *"train"*, *"val"*, *"test"* to define the training, validation or test set respectively **using the same approach as the original study**

For example:

In [7]:
# Both pipelines only convert sequences to tensors.
train_transform = t.Compose([t.Seq2Tensor()])
val_test_transform = t.Compose([t.Seq2Tensor()])

# The string splits "train" / "val" / "test" select the predefined partition.
orig_train_dataset = DeepStarrDataset(
    activity_columns=activity_columns,
    split="train",
    transform=train_transform,
)

val_dataset = DeepStarrDataset(
    activity_columns=activity_columns,
    split="val",
    transform=val_test_transform,
)

test_dataset = DeepStarrDataset(
    activity_columns=activity_columns,
    split="test",
    transform=val_test_transform,
)

Note: The training set contains reverse-complement augmentation as implemented in the original study.  
• Dataset size: 2N (N original + N reverse-complemented sequences)  
• Label consistency: y_rc ≡ y_original  
• Do not reapply this transformation during preprocessing. 


In [8]:
# Sizes of the train / val / test splits, one per line
print(
    len(orig_train_dataset),
    len(val_dataset),
    len(test_dataset),
    sep="\n",
)

402278
40570
41186


Alternatively, you can pass a list of specific chromosomes (or a single chromosome name) to use as the training, validation or test set.

For example:

In [9]:
# Chromosome names that DeepStarrDataset exposes as a class attribute;
# these are the values accepted in a chromosome-based `split`.
list_of_chr = DeepStarrDataset.list_of_chr
print(list_of_chr)

['chr2L', 'chr2LHet', 'chr2RHet', 'chr3L', 'chr3LHet', 'chr3R', 'chr3RHet', 'chr4', 'chrX', 'chrXHet', 'chrYHet', 'chr2R']


In [10]:
# When `split` is a chromosome name or a list of names, the reverse-complement
# transformation is disabled. Set use_original_reverse_complement=True to
# apply the original paper's augmentation.
my_train_dataset = DeepStarrDataset(
    activity_columns=activity_columns,
    split=[
        'chr2L', 'chr2LHet', 'chr2RHet',
        'chr3L', 'chr3LHet', 'chr3R',
        'chr3RHet', 'chr4',
    ],
    transform=train_transform,
)

my_val_dataset = DeepStarrDataset(
    activity_columns=activity_columns,
    split=['chrX', 'chrXHet', 'chrYHet'],
    transform=val_test_transform,
)

my_test_dataset = DeepStarrDataset(
    activity_columns=activity_columns,
    split='chr2R',
    transform=val_test_transform,
)

In [11]:
# Sizes of the custom train / val / test splits, one per line
print(
    len(my_train_dataset),
    len(my_val_dataset),
    len(my_test_dataset),
    sep="\n",
)

151641
49498
81756


## Regression task

In [12]:
# Input channels: length of the first axis of the first training sequence
# (presumably the nucleotide channels produced by Seq2Tensor — TODO confirm).
in_channels = len(train_dataset[0][0])
# One regression output per activity column.
out_channels = len(activity_columns)

## Legnet

In [13]:
import math

import torch
import torch.nn as nn
import torch.nn.functional as F

def initialize_weights(m):
    """Initialise a single module in place (intended for ``Module.apply``).

    * ``nn.Conv1d``: weights drawn from a normal distribution with mean 0 and
      std ``sqrt(2 / (kernel_size * out_channels))``; bias, if present, zeroed.
    * ``nn.BatchNorm1d``: weight set to 1 and bias set to 0.
    * ``nn.Linear``: weights drawn from a normal distribution with mean 0 and
      std 0.001; bias, if present, zeroed.

    Modules of any other type are left untouched.
    """
    if isinstance(m, nn.BatchNorm1d):
        m.weight.data.fill_(1)
        m.bias.data.fill_(0)
        return

    if isinstance(m, nn.Conv1d):
        fan = m.kernel_size[0] * m.out_channels
        std = math.sqrt(2 / fan)
    elif isinstance(m, nn.Linear):
        std = 0.001
    else:
        # Nothing to initialise for other module types.
        return

    # Conv1d and Linear share the same recipe, differing only in the std.
    m.weight.data.normal_(0, std)
    if m.bias is not None:
        m.bias.data.fill_(0)

class SELayer(nn.Module):
    """Squeeze-and-excitation style channel gating for 3-D inputs.

    The input is averaged over its last axis, passed through a two-layer
    bottleneck MLP ending in a sigmoid, and the resulting per-channel gates
    rescale the input.

    Args:
        inp: number of channels of the input.
        reduction: divisor giving the bottleneck width ``inp // reduction``.
    """

    def __init__(self, inp, reduction=4):
        super().__init__()
        hidden = int(inp // reduction)
        gate_layers = [
            nn.Linear(inp, hidden),
            nn.SiLU(),
            nn.Linear(hidden, inp),
            nn.Sigmoid(),
        ]
        self.fc = nn.Sequential(*gate_layers)

    def forward(self, x):
        """Return ``x`` multiplied by its per-channel gates; ``x`` must be 3-D."""
        batch, channels, _ = x.size()
        # Squeeze: average over the last axis, one value per (sample, channel).
        squeezed = torch.mean(x.view(batch, channels, -1), dim=2)
        # Excite: gates in (0, 1), reshaped so they broadcast over the last axis.
        gates = self.fc(squeezed).view(batch, channels, 1)
        return x * gates

class EffBlock(nn.Module):
    """Inverted-bottleneck convolutional block with channel gating.

    Layout of ``self.block``: 1x1 expansion conv -> BN -> activation ->
    depthwise conv of size ``ks`` -> BN -> activation -> ``SELayer`` ->
    1x1 projection conv back to ``in_ch`` -> BN -> activation.

    Args:
        in_ch: number of input channels (also the number of output channels
            of the block, since the last conv projects back to ``in_ch``).
        ks: kernel size of the depthwise convolution.
        resize_factor: expansion factor; the inner width is
            ``in_ch * resize_factor``.
        activation: zero-argument callable returning an activation module.
        out_ch: stored on the instance as ``self.out_ch`` (defaults to
            ``in_ch``); it is not used when building the layers.
        se_reduction: reduction passed to ``SELayer``; defaults to
            ``resize_factor``.
    """

    def __init__(self, in_ch, ks, resize_factor, activation, out_ch=None, se_reduction=None):
        super().__init__()
        self.in_ch = in_ch
        self.out_ch = in_ch if out_ch is None else out_ch
        self.resize_factor = resize_factor
        self.se_reduction = resize_factor if se_reduction is None else se_reduction
        self.ks = ks
        self.inner_dim = in_ch * resize_factor

        width = self.inner_dim

        # Pointwise expansion to the inner width.
        expand = [
            nn.Conv1d(in_channels=in_ch, out_channels=width,
                      kernel_size=1, padding='same', bias=False),
            nn.BatchNorm1d(width),
            activation(),
        ]
        # Depthwise convolution: one filter per channel (groups == channels).
        depthwise = [
            nn.Conv1d(in_channels=width, out_channels=width,
                      kernel_size=ks, groups=width, padding='same', bias=False),
            nn.BatchNorm1d(width),
            activation(),
        ]
        # Channel gating followed by pointwise projection back to in_ch.
        project = [
            SELayer(width, reduction=self.se_reduction),
            nn.Conv1d(in_channels=width, out_channels=in_ch,
                      kernel_size=1, padding='same', bias=False),
            nn.BatchNorm1d(in_ch),
            activation(),
        ]

        self.block = nn.Sequential(*expand, *depthwise, *project)

    def forward(self, x):
        """Apply the block to ``x``."""
        return self.block(x)

class LocalBlock(nn.Module):
    """Bias-free 'same'-padded Conv1d followed by BatchNorm1d and an activation.

    Args:
        in_ch: number of input channels.
        ks: convolution kernel size.
        activation: zero-argument callable returning an activation module.
        out_ch: number of output channels; defaults to ``in_ch``.
    """

    def __init__(self, in_ch, ks, activation, out_ch=None):
        super().__init__()
        self.in_ch = in_ch
        self.out_ch = in_ch if out_ch is None else out_ch
        self.ks = ks

        conv = nn.Conv1d(
            in_channels=self.in_ch,
            out_channels=self.out_ch,
            kernel_size=self.ks,
            padding='same',
            bias=False,
        )
        norm = nn.BatchNorm1d(self.out_ch)
        self.block = nn.Sequential(conv, norm, activation())

    def forward(self, x):
        """Apply conv -> batch norm -> activation to ``x``."""
        return self.block(x)

class ResidualConcat(nn.Module):
    """Wrap ``fn`` and concatenate its output with its input along dim 1.

    The output of ``fn`` comes first, followed by the untouched input.
    """

    def __init__(self, fn):
        super().__init__()
        self.fn = fn

    def forward(self, x, **kwargs):
        """Return ``cat([fn(x, **kwargs), x], dim=1)``."""
        transformed = self.fn(x, **kwargs)
        return torch.cat((transformed, x), dim=1)

class MapperBlock(nn.Module):
    """BatchNorm1d followed by a 1x1 Conv1d that changes the channel count.

    Args:
        in_features: number of input channels.
        out_features: number of output channels.
        activation: accepted for interface compatibility; not used when
            building the block.
    """

    def __init__(self, in_features, out_features, activation=nn.SiLU):
        super().__init__()
        norm = nn.BatchNorm1d(in_features)
        pointwise = nn.Conv1d(in_channels=in_features,
                              out_channels=out_features,
                              kernel_size=1)
        self.block = nn.Sequential(norm, pointwise)

    def forward(self, x):
        """Apply batch norm then the pointwise convolution to ``x``."""
        return self.block(x)

class HumanLegNet(nn.Module):
    """LegNet-style 1-D convolutional network.

    Pipeline: a ``LocalBlock`` stem, a sequence of stages (each a
    ``ResidualConcat``-wrapped ``EffBlock``, a ``LocalBlock`` that sets the
    stage's output width, and an optional max-pool), a ``MapperBlock`` that
    doubles the channel count, global average pooling over the last axis, and
    a two-layer fully connected head.

    Args:
        in_ch: number of input channels.
        output_dim: number of outputs produced by the head.
        stem_ch: number of channels produced by the stem.
        stem_ks: kernel size of the stem convolution.
        ef_ks: kernel size used inside every stage.
        ef_block_sizes: output channel count of each stage.
        pool_sizes: max-pool size after each stage (1 means no pooling);
            must have the same length as ``ef_block_sizes``.
        resize_factor: expansion factor passed to every ``EffBlock``.
        activation: zero-argument callable returning an activation module.
    """

    def __init__(self,
                 in_ch,
                 output_dim,
                 stem_ch,
                 stem_ks,
                 ef_ks,
                 ef_block_sizes,
                 pool_sizes,
                 resize_factor,
                 activation=nn.SiLU,
                 ):
        super().__init__()
        assert len(pool_sizes) == len(ef_block_sizes)

        self.in_ch = in_ch
        self.stem = LocalBlock(in_ch=in_ch,
                               out_ch=stem_ch,
                               ks=stem_ks,
                               activation=activation)
        self.output_dim = output_dim

        # `width` tracks the channel count entering the next stage.
        width = stem_ch
        stages = []
        for pool_sz, next_width in zip(pool_sizes, ef_block_sizes):
            widen = ResidualConcat(
                EffBlock(in_ch=width,
                         out_ch=width,
                         ks=ef_ks,
                         resize_factor=resize_factor,
                         activation=activation)
            )
            # The concat doubles the channels, hence `width * 2` on input.
            mix = LocalBlock(in_ch=width * 2,
                             out_ch=next_width,
                             ks=ef_ks,
                             activation=activation)
            pool = nn.Identity() if pool_sz == 1 else nn.MaxPool1d(pool_sz)
            stages.append(nn.Sequential(widen, mix, pool))
            width = next_width
        self.main = nn.Sequential(*stages)

        # `width` is now the last stage's output (or stem_ch with no stages).
        head_dim = width * 2
        self.mapper = MapperBlock(in_features=width,
                                  out_features=head_dim)
        self.head = nn.Sequential(nn.Linear(head_dim, head_dim),
                                  nn.BatchNorm1d(head_dim),
                                  activation(),
                                  nn.Linear(head_dim, self.output_dim))

    def forward(self, x):
        """Run the network on ``x`` and return the head output.

        A trailing axis of size 1 (i.e. ``output_dim == 1``) is squeezed away.
        """
        features = self.main(self.stem(x))
        mapped = self.mapper(features)
        # Collapse the last axis to a single averaged position, then drop it.
        pooled = F.adaptive_avg_pool1d(mapped, 1).squeeze(-1)
        return self.head(pooled).squeeze(-1)

## Trainer

In [14]:
import pytorch_lightning as L
from torch.nn import functional as F

def pearson_correlation(x, y):
    """Mean Pearson correlation between matching columns of ``x`` and ``y``.

    Both inputs are centred along dim 0, a correlation is computed per
    remaining position, and the mean of those correlations is returned.
    A small constant (1e-10) is added to the denominator so that constant
    columns give 0 instead of a division by zero.
    """
    x_centered = x - x.mean(dim=0)
    y_centered = y - y.mean(dim=0)

    covariance = (x_centered * y_centered).sum(dim=0)
    x_norm = torch.sqrt((x_centered ** 2).sum(dim=0))
    y_norm = torch.sqrt((y_centered ** 2).sum(dim=0))

    per_column = covariance / (x_norm * y_norm + 1e-10)
    return per_column.mean()
    
class Seq1Model(L.LightningModule):
    """Lightning wrapper that trains a ``HumanLegNet`` regressor with MSE loss.

    The dataloaders read the notebook-level ``train_dataset``, ``val_dataset``
    and ``test_dataset`` objects together with ``BATCH_SIZE`` and
    ``NUM_WORKERS``; those names must be defined before training starts.

    Running buffers (all plain tensors):
        train_loss / val_loss: per-step losses of the current epoch.
        pred / target: predictions and targets of the current validation or
            test epoch, used to compute the Pearson correlation.

    Args:
        in_ch: number of input channels of the sequences.
        out_ch: number of regression outputs.
        lr: maximum learning rate of the one-cycle schedule.
    """

    def __init__(self, in_ch, out_ch, lr=3e-4):
        super().__init__()

        self.model = HumanLegNet(in_ch=in_ch,
                                 output_dim = out_ch,
                                 stem_ch=64,
                                 stem_ks=11,
                                 ef_ks=9,
                                 ef_block_sizes=[80, 96, 112, 128],
                                 pool_sizes=[2,2,2,2],
                                 resize_factor=4)
        self.model.apply(initialize_weights)

        self.loss = nn.MSELoss()
        self.lr = lr
        # Buffers start empty on the CPU; `_append` moves them to whichever
        # device the incoming values live on, so the class does not depend on
        # a notebook-global `device`.
        self.train_loss = torch.empty(0)
        self.val_loss = torch.empty(0)
        self.pred = torch.empty(0)
        self.target = torch.empty(0)

    @staticmethod
    def _append(buffer, values):
        """Return ``buffer`` with ``values`` appended along dim 0.

        ``values`` is detached first so that the buffer never keeps an
        autograd graph alive, and ``buffer`` is moved to the device of
        ``values`` before concatenation.
        """
        values = values.detach()
        return torch.cat([buffer.to(values.device), values], dim=0)

    def _reset_eval_buffers(self):
        """Drop the accumulated predictions and targets."""
        self.pred = torch.empty(0)
        self.target = torch.empty(0)

    def forward(self, x):
        """Return the network output for a batch of sequences."""
        return self.model(x)

    def training_step(self, batch, batch_nb):
        """Compute and log the MSE loss and current learning rate for a batch."""
        X, y = batch
        y_hat = self.model(X)

        loss = self.loss(y_hat, y)
        self.log("train_loss", loss, prog_bar=True,  on_step=True, on_epoch=True, logger = True)
        # Store a detached copy: appending the live loss would keep every
        # step's autograd graph referenced until the end of the epoch.
        self.train_loss = self._append(self.train_loss, loss.unsqueeze(0))

        lr = self.optimizers().param_groups[0]['lr']
        self.log('learning_rate', lr, on_step=True, on_epoch=True, prog_bar=True, logger=True)

        return loss

    def validation_step(self, batch, batch_idx):
        """Log the validation loss and accumulate predictions and targets."""
        x, y = batch
        y_hat = self.model(x)
        loss = self.loss(y_hat, y)

        self.log('val_loss', loss, on_step=False, on_epoch=True, prog_bar=True)
        self.val_loss = self._append(self.val_loss, loss.unsqueeze(0))

        self.pred = self._append(self.pred, y_hat)
        self.target = self._append(self.target, y)

    def on_validation_epoch_end(self):
        """Log the validation Pearson r, print an epoch summary, reset buffers."""
        train_loss = torch.mean(self.train_loss)
        val_loss = torch.mean(self.val_loss)

        res_str = '|' + ' {}: {:.5f} |'.format("current_epoch", self.current_epoch)
        res_str += ' {}: {:.5f} |'.format("val_loss", val_loss)

        corr = pearson_correlation(self.pred, self.target)
        self.log("val_pearson", corr, on_epoch=True, prog_bar=True, on_step = False)
        res_str += ' {}: {:.5f} |'.format("val_pearson_r", corr)

        res_str += ' {}: {:.5f} |'.format("train_loss", train_loss)

        border = '-'*100 #len(res_str)
        print("\n".join(['', border, res_str, border, '']))

        self.val_loss = torch.empty(0)
        self.train_loss = torch.empty(0)
        self._reset_eval_buffers()

    def on_test_epoch_start(self):
        """Start every test run from empty buffers.

        Without this, a second ``trainer.test`` call (or a test run after an
        interrupted validation epoch) would mix stale predictions into the
        reported correlation.
        """
        self._reset_eval_buffers()

    def test_step(self, batch, _):
        """Log the test loss and accumulate predictions and targets."""
        x, y = batch
        y_hat = self.model(x)
        loss = self.loss(y_hat, y)

        self.log('test_loss',
                 loss,
                 prog_bar=True,
                 on_step=False,
                 on_epoch=True)

        self.pred = self._append(self.pred, y_hat)
        self.target = self._append(self.target, y)

    def on_test_epoch_end(self):
        """Log the Pearson correlation over the whole test set."""
        corr = pearson_correlation(self.pred, self.target)
        self.log('test_pearson_r', corr, on_step=False, on_epoch=True, prog_bar=True)

    def predict_step(self, batch, batch_idx, dataloader_idx=0):
        """Return CPU float copies of the targets and predictions of a batch."""
        x, y = batch
        y_hat = self.model(x)

        return {"y": y.cpu().detach().float(), "pred": y_hat.cpu().detach().float()}

    def train_dataloader(self):
        """Shuffled loader over the notebook-level ``train_dataset``."""
        return data.DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers = NUM_WORKERS, pin_memory=True)

    def val_dataloader(self):
        """Sequential loader over the notebook-level ``val_dataset``."""
        return data.DataLoader(dataset=val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers = NUM_WORKERS, pin_memory=True)

    def test_dataloader(self):
        """Sequential loader over the notebook-level ``test_dataset``."""
        return data.DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers = NUM_WORKERS, pin_memory=True)

    def configure_optimizers(self):
        """Build AdamW with a per-step one-cycle learning-rate schedule."""
        self.optimizer = torch.optim.AdamW(self.parameters(),
                                               lr=self.lr,
                                               weight_decay = 1e-6)

        # The schedule spans the total number of optimizer steps the trainer
        # expects to run, peaking at `self.lr` after 30% of them.
        lr_scheduler = torch.optim.lr_scheduler.OneCycleLR(self.optimizer, # type: ignore
                                                        max_lr=self.lr,
                                                        three_phase=False,
                                                        total_steps=self.trainer.estimated_stepping_batches, # type: ignore
                                                        pct_start=0.3,
                                                        cycle_momentum =False)
        lr_scheduler_config = {
                    "scheduler": lr_scheduler,
                    "interval": "step",
                    "frequency": 1,
                    "name": "cycle_lr"
            }

        return [self.optimizer], [lr_scheduler_config]
        

In [15]:
# Seed Python, NumPy and PyTorch (and, with workers=True, the dataloader
# workers) before the model is created, so that weight initialisation and
# data shuffling start from a fixed state. Some GPU kernels are
# non-deterministic, so results may still differ slightly between runs.
L.seed_everything(42, workers=True)

seq_model = Seq1Model(in_ch = in_channels, out_ch = out_channels, lr = 0.002)

# Initialize a trainer
trainer = L.Trainer(
    accelerator="gpu",
    devices=[0],
    max_epochs=5,
    gradient_clip_val=1,
    precision='16-mixed', 
    enable_progress_bar = True,
    # Skip the sanity validation pass: the epoch summary printed by the model
    # averages the training losses, which would still be empty at that point.
    num_sanity_val_steps=0
)

# Train the model
trainer.fit(seq_model)

# Evaluate on the test split; the returned metrics are the cell's output
trainer.test(seq_model)

Using 16bit Automatic Mixed Precision (AMP)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
You are using a CUDA device ('NVIDIA GeForce RTX 3090') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
2025-03-24 17:10:24.752113: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-03-24 17:10:24.767020: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one 

Training: |                                                                                       | 0/? [00:00…

Validation: |                                                                                     | 0/? [00:00…


----------------------------------------------------------------------------------------------------
| current_epoch: 0.00000 | val_loss: 1.87937 | val_pearson_r: 0.57532 | train_loss: 1.88302 |
----------------------------------------------------------------------------------------------------



Validation: |                                                                                     | 0/? [00:00…


----------------------------------------------------------------------------------------------------
| current_epoch: 1.00000 | val_loss: 1.37672 | val_pearson_r: 0.68757 | train_loss: 1.39281 |
----------------------------------------------------------------------------------------------------



Validation: |                                                                                     | 0/? [00:00…


----------------------------------------------------------------------------------------------------
| current_epoch: 2.00000 | val_loss: 1.28486 | val_pearson_r: 0.71133 | train_loss: 1.21586 |
----------------------------------------------------------------------------------------------------



Validation: |                                                                                     | 0/? [00:00…


----------------------------------------------------------------------------------------------------
| current_epoch: 3.00000 | val_loss: 1.21206 | val_pearson_r: 0.72371 | train_loss: 1.06064 |
----------------------------------------------------------------------------------------------------



Validation: |                                                                                     | 0/? [00:00…

`Trainer.fit` stopped: `max_epochs=5` reached.



----------------------------------------------------------------------------------------------------
| current_epoch: 4.00000 | val_loss: 1.21670 | val_pearson_r: 0.72184 | train_loss: 0.92478 |
----------------------------------------------------------------------------------------------------



LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0,1]


Testing: |                                                                                        | 0/? [00:00…

[{'test_loss': 1.1926705837249756, 'test_pearson_r': 0.7334587574005127}]