In [1]:
import os
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import pytorch_lightning as pl
from pytorch_lightning import Trainer
from pytorch_lightning.loggers import CSVLogger
from torchmetrics import Accuracy, Precision, Recall, F1Score, AUROC
from sklearn.model_selection import train_test_split
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.callbacks import EarlyStopping


In [2]:
def fasta_to_ss(input_folder, input_fasta, output):
    
    with open(input_fasta, 'r') as file:
        lines = file.readlines()

    lines_to_write = ''
    for line in lines:
        if line.startswith('>'):
            file_name = line.split()[1]
            file_path = os.path.join(input_folder, file_name)
            with open(file_path, 'r') as f:
                ctlines = f.readlines()
                sequence = ''.join( ctline.split()[1] for ctline in ctlines[1:] )
                pairings = [ ( ctline.split()[0],ctline.split()[4]) for ctline in ctlines[1:]]
        
            pairs = [ a+'-'+b for a,b in pairings if int(b)!=0 ]
            ss = ' '.join(pairs)
            lines_to_write += line.split()[0][1:] +' '+ ss + '\n'
                
    with open(output,'w') as f:
        f.write(lines_to_write)
        
fasta_to_ss('archiveII', 'archiveII.fasta', 'ss.txt')


In [3]:
def make_csv(target_file, input_fasta, split_dir, split_file, output):
    with open( os.path.join(os.getcwd(),split_dir,split_file), 'r') as f:
        split_set = [line.strip() for line in f.readlines()]
    
    with open(target_file,'r') as f:
        lines = f.readlines()
        target_dict = {}
        for line in lines:
            pairs = ' '.join(line.split()[1:])
            target_dict[ line.split()[0] ] = pairs
        
    sequence_dict = {}
    with open(input_fasta,'r') as f:
        for line in f.readlines():
            if line.startswith('>'):
                seq_name = line.split()[0][1:]
            else: 
                sequence = line.strip()
                sequence_dict[seq_name]=sequence
                
    with open( os.path.join(os.getcwd(),split_dir,output),'w') as f:
        for seq in split_set:
            f.write(seq+','+sequence_dict[seq]+','+target_dict[seq]+'\n')
        

In [4]:
make_csv('ss.txt', 'archiveII.fasta', 'data_splits','test.txt', 'test_ss.csv')
make_csv('ss.txt', 'archiveII.fasta', 'data_splits','validation.txt', 'validation_ss.csv')
make_csv('ss.txt', 'archiveII.fasta', 'data_splits','train.txt', 'train_ss.csv')


In [5]:
# dataset, one-hot encoding
class RNADataset(Dataset):
    def __init__(self, file_path,MAX_LENGTH = 500):
        self.data = []
        self.labels = []
        self.MAX_LENGTH = MAX_LENGTH
        with open(file_path, 'r') as f:
            for line in f:
                if line.strip():
                    _,seq,pairs = line.strip().split(',')
                    if len(seq) > self.MAX_LENGTH: 
                        continue
                    self.data.append(self._encode_sequence(seq))
                    try:
                        self.labels.append(self._decode_label(pairs,seq))
                    except ValueError:
                        print(line)

    def _encode_sequence(self, sequence):
        tokenizer = {'A': 0, 'C': 1, 'G': 2, 'U': 3}
        return torch.tensor([tokenizer[c] for c in sequence])
    
    def _decode_label(self, pairs,sequence):
        L = len(sequence)
        a = np.zeros((L,L))
        if pairs != '':
            keys = [ list( map(int,pair.split('-')) ) for pair in pairs.split(' ') ]
            for i,j in keys:
                if j!=0:
                    a[i-1,j-1] = 1
                    a[j-1,i-1] = 1
        return torch.tensor( a, dtype=torch.float32 )

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

# lightning datamodule
class RNADataModule(pl.LightningDataModule):
    def __init__(self, train_file, val_file, test_file, batch_size=32):
        super().__init__()
        self.train_file = train_file
        self.val_file = val_file
        self.test_file = test_file
        self.batch_size = batch_size

    def setup(self, stage=None):
        self.train_dataset = RNADataset(self.train_file)
        self.val_dataset = RNADataset(self.val_file)
        self.test_dataset = RNADataset(self.test_file)

    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=self.batch_size)

    def test_dataloader(self):
        return DataLoader(self.test_dataset, batch_size=self.batch_size)


In [6]:
testint = 'sequence1818,GGGCCCGUCGUCUAGCCUGGCUAAGAUGCGGGGUACGGGACCCCGUGGUCCGGGGUUCAAAUCCCCGCGGGCCCACCA,1-74 2-73 3-72 4-71 5-70 6-69 7-68 10-27 11-26 12-25 13-24 24-13 25-12 26-11 27-10 29-45 30-44 31-43 32-42 33-41 41-33 42-32 43-31 44-30 45-29 51-67 52-66 53-65 54-64 55-63 63-55 64-54 65-53 66-52 67-51 68-7 69-6 70-5 71-4 72-3 73-2 74-1'
_,sequence,pairs = testint.strip().split(',')
pairs

keys = [ list( map(int,pair.split('-')) ) for pair in pairs.split(' ') ]
L = len(sequence)
a = np.zeros((L,L))
for i,j in keys:
    if j!=0:
        a[i-1,j-1] = 1
        a[j-1,i-1] = 1
torch.tensor( a, dtype=torch.float32 )

tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        ...,
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]])

In [7]:
class ResidualBlock(pl.LightningModule):
    def __init__(self, in_channels, out_channels, kernel_size=3, padding=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size, padding=padding)
        self.batch_norm1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(in_channels, out_channels, kernel_size, padding=padding)
        self.batch_norm2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        
    def forward(self, x):
        residual = x 
        out = self.batch_norm1(x)
        out = self.conv1(out)
        out = self.relu(out)
        out = self.batch_norm2(x)
        out = self.conv2(out)
        return out + residual
    
class RNASecondaryStructurePredictor(pl.LightningModule):
    def __init__(self, vocab_size, embedding_dim, num_residual_blocks=1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        
        self.residual_blocks = nn.ModuleList([
            ResidualBlock(in_channels=2*embedding_dim, out_channels=2*embedding_dim)
            for _ in range(num_residual_blocks)
        ])
        self.relu = nn.ReLU()
        self.linear = nn.Linear(2*embedding_dim, 1)
        self.sigmoid = nn.Sigmoid()
        
        # loss fn
        self.loss_fn = nn.BCELoss()
        # additional metrics:
        self.accuracy = Accuracy(task='binary')
        self.precision = Precision(task='binary')
        self.recall = Recall(task='binary')
        self.f1 = F1Score(task='binary')
        self.auc = AUROC(task='binary')
        
    def forward(self, x):
        N, L = x.size()
        # (N,L)->(N, L, E)
        embeddings = self.embedding(x)  
        
        # (N, L, E) -> (N, L, L, 2E), outer concatenetion into pair-embedding representation
        # first (N,L,E)->(N,L,L,E)
        e1 = embeddings.unsqueeze(2).expand(-1, -1, L, -1) 
        e2 = embeddings.unsqueeze(1).expand(-1, L, -1, -1)  
        # concat along last dim, (N,L,L,E)->(N,L,L,2E)
        concatenated = torch.cat((e1,e2), dim=-1)
        
        # (N,L,L,2E) -> (N,2E,L,L), conv2d wants (batch,filters,x,y)
        concatenated = concatenated.permute(0, 3, 1, 2)  
        # relu
        concatenated = self.relu(concatenated)
        for block in self.residual_blocks:
            concatenated = block(concatenated)
        
        # go back to (N,2E,L,L) -> (N,L,L,2E)
        concatenated = concatenated.permute(0,2,3,1)
        # (N,L,L,2E) -> (N,L,L,1)
        logits = self.sigmoid( self.linear(concatenated) )
        return logits

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x).squeeze(-1) 
        loss = self.loss_fn(y_hat, y)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x).squeeze(-1) 
        loss = self.loss_fn(y_hat, y)
        self.log('val_loss', loss)
        
        preds = torch.round(y_hat)
        self.log('val_accuracy', self.accuracy(preds, y))
        self.log('val_precision', self.precision(preds, y))
        self.log('val_recall', self.recall(preds, y))
        self.log('val_f1', self.f1(preds, y))
        self.log('val_auc', self.auc(y_hat, y))
        return loss

    def test_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x).squeeze(-1) 
        loss = self.loss_fn(y_hat, y)
        self.log('test_loss', loss)
        
        preds = torch.round(y_hat)
        self.log('test_accuracy', self.accuracy(preds, y))
        self.log('test_precision', self.precision(preds, y))
        self.log('test_recall', self.recall(preds, y))
        self.log('test_f1', self.f1(preds, y))
        self.log('test_auc', self.auc(y_hat, y))
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=0.001)


In [8]:
rna = 'AUCGCUCUGUCGUCCACACUCUAAAAA'
tokenizer = {'A': 0, 'C': 1, 'G': 2, 'U': 3}
x = torch.tensor([tokenizer[c] for c in rna])
x = x.unsqueeze(0)

In [9]:
RNASecondaryStructurePredictor(vocab_size=4,embedding_dim=10)(x).shape

torch.Size([1, 27, 27, 1])

In [10]:
x

tensor([[0, 3, 1, 2, 1, 3, 1, 3, 2, 3, 1, 2, 3, 1, 1, 0, 1, 0, 1, 3, 1, 3, 0, 0,
         0, 0, 0]])

In [11]:
 # File paths
train_file = "data_splits/train_ss.csv"
val_file = "data_splits/validation_ss.csv"
test_file = "data_splits/test_ss.csv"

# Initialize DataModule
data_module = RNADataModule(train_file, val_file, test_file, batch_size=1)


In [12]:
data_module.setup()

In [13]:
test_batch = next(iter(data_module.train_dataloader()))
test_batch

[tensor([[2, 1, 2, 0, 2, 1, 1, 2, 1, 3, 2, 3, 3, 0, 1, 1, 1, 2, 3, 2, 1, 2, 2, 2,
          2, 2, 3, 1, 2, 2, 1, 3, 1, 2, 2, 3, 2, 2, 0, 2, 2, 1, 1, 3, 1, 0, 2, 3,
          2, 2, 3, 2, 1, 1, 2, 1, 3, 2, 3, 0, 2, 3, 2, 3, 2, 2, 2, 2, 3, 2, 3, 3,
          0, 2, 1, 2, 2, 1, 1, 0, 0, 0, 1, 2, 1, 1, 1, 1, 0, 1, 3, 2, 2, 2, 0, 3,
          1, 2, 1, 0, 1, 1, 1, 1, 1, 1, 2, 2, 1, 2, 0, 3, 1, 1, 1, 1, 2, 1, 2, 1,
          2, 2, 0, 2, 2, 1, 2, 2, 2, 0, 2, 2, 0, 3, 1, 2, 1, 3, 2, 2, 0, 2, 0, 3,
          2, 1, 3, 2, 1, 2, 2, 1, 2, 1, 2, 1, 0, 0, 1, 2, 1, 1, 1, 1, 0, 2, 2, 1,
          3, 2, 2, 0, 0, 0, 1, 0, 2, 0, 2, 1, 0, 3, 2, 3, 3, 0, 0, 0, 2, 3, 2, 1,
          1, 1, 2, 1, 3, 2, 1, 2, 3, 3, 1, 1, 2, 1, 2, 2, 3, 2, 2, 2, 0, 1, 0, 0,
          1, 2, 2, 2, 2, 2, 2, 1, 1, 2, 1, 2, 1, 1, 1, 2, 2, 2, 1, 3, 2, 1, 2, 2,
          1, 2, 2, 1, 1, 1, 2, 1, 2, 2, 1, 2, 2, 1, 1, 0, 0, 1, 2, 1, 1, 2, 0, 2,
          1, 3, 3, 3, 3]]),
 tensor([[[0., 0., 0.,  ..., 0., 0., 0.],
          [0., 0., 0.,  ...,

In [14]:
model = RNASecondaryStructurePredictor(vocab_size=4,embedding_dim=10,num_residual_blocks=1)

checkpoint_dir = os.path.join('logs', model.__class__.__name__)
os.makedirs(checkpoint_dir, exist_ok=True)

checkpoint_callback = ModelCheckpoint(
    monitor="val_loss",      
    mode="min",              
    save_top_k=1,           
    dirpath=checkpoint_dir, 
    filename="{epoch}-{val_loss:.2f}"  
)
early_stopping = EarlyStopping(
    monitor="val_loss",
    mode="min",
    patience=2, 
    verbose=True
)

csv_logger = CSVLogger("logs", name="ss_predict")

trainer = pl.Trainer(
    max_epochs=20,
    callbacks=[checkpoint_callback, early_stopping],
    logger=csv_logger)

trainer.fit(model, datamodule=data_module)
best_model_path = checkpoint_callback.best_model_path
print(f"Best model saved at: {best_model_path}")

best_model = model.load_from_checkpoint(best_model_path)

model_metrics = trainer.test(best_model, datamodule=data_module)
model_metrics

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
  rank_zero_warn(f"Checkpoint directory {dirpath} exists and is not empty.")

   | Name            | Type            | Params
-----------------------------------------------------
0  | embedding       | Embedding       | 40    
1  | residual_blocks | ModuleList      | 7.3 K 
2  | relu            | ReLU            | 0     
3  | linear          | Linear          | 21    
4  | sigmoid         | Sigmoid         | 0     
5  | loss_fn         | BCELoss         | 0     
6  | accuracy        | BinaryAccuracy  | 0     
7  | precision       | BinaryPrecision | 0     
8  | recall          | BinaryRecall    | 0     
9  | f1              | BinaryF1Score   | 0     
10 | auc             | BinaryAUROC     | 0     
-----------------------------------------------------
7.4 K     Trainable params
0         Non-trainable params
7.4 K     Total params
0.030     

Sanity Checking: 0it [00:00, ?it/s]

  rank_zero_warn(
  rank_zero_warn(


Training: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Metric val_loss improved. New best score: 0.026


Validation: 0it [00:00, ?it/s]

Metric val_loss improved by 0.000 >= min_delta = 0.0. New best score: 0.026


Validation: 0it [00:00, ?it/s]

Metric val_loss improved by 0.000 >= min_delta = 0.0. New best score: 0.026


Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Metric val_loss improved by 0.000 >= min_delta = 0.0. New best score: 0.026


Validation: 0it [00:00, ?it/s]

Metric val_loss improved by 0.000 >= min_delta = 0.0. New best score: 0.026


Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Monitored metric val_loss did not improve in the last 2 records. Best score: 0.026. Signaling Trainer to stop.


Best model saved at: /home/fgloblek/Documents/Programiranje/rna/logs/RNASecondaryStructurePredictor/epoch=5-val_loss=0.03.ckpt


TypeError: RNASecondaryStructurePredictor.__init__() missing 2 required positional arguments: 'vocab_size' and 'embedding_dim'

In [None]:
best_model = model.load_from_checkpoint(best_model_path,vocab_size=4,embedding_dim=10,num_residual_blocks=1)

model_metrics = trainer.test(best_model, datamodule=data_module)
model_metrics

  rank_zero_warn(


Testing: 0it [00:00, ?it/s]



In [208]:
tokenizer = {'A': 0, 'C': 1, 'G': 2, 'U': 3}
sequence = 'ACGUG'
test =  torch.tensor([tokenizer[c] for c in sequence])

model(test.unsqueeze(0)).squeeze(0)

tensor([[[0.0010],
         [0.0047],
         [0.0037],
         [0.0041],
         [0.0033]],

        [[0.0052],
         [0.0037],
         [0.0055],
         [0.0042],
         [0.0018]],

        [[0.0033],
         [0.0050],
         [0.0074],
         [0.0056],
         [0.0037]],

        [[0.0039],
         [0.0038],
         [0.0056],
         [0.0042],
         [0.0025]],

        [[0.0032],
         [0.0014],
         [0.0035],
         [0.0027],
         [0.0028]]], grad_fn=<SqueezeBackward1>)