In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from collections import Counter
from rich import print
from sklearn.metrics import precision_score, recall_score, mean_absolute_error


class GenomicTokenizer:
    """Split a sequence string into upper-cased, fixed-width n-gram tokens.

    Attributes:
        ngram: Width of every token, in characters.
        stride: Step between the start offsets of consecutive tokens. It is
            not applied when ``ngram == 1`` (see ``tokenize``).
    """

    def __init__(self, ngram=3, stride=1):
        self.ngram = ngram
        self.stride = stride

    def tokenize(self, t):
        """Return the n-gram tokens of ``t``.

        Args:
            t: Sequence string; it is upper-cased before being split.

        Returns:
            A list of tokens, each exactly ``self.ngram`` characters long.
            The list is empty when ``t`` is empty or shorter than
            ``self.ngram`` (previously this raised ``IndexError``).
        """
        t = t.upper()
        if self.ngram == 1:
            # Single-character tokens: every character is kept and the stride
            # is intentionally ignored, matching the historical behavior.
            return list(t)
        # Slicing past the end of the string yields a short fragment, so only
        # full-width windows are kept; no trailing clean-up is needed.
        return [
            t[i:i + self.ngram]
            for i in range(0, len(t), self.stride)
            if len(t[i:i + self.ngram]) == self.ngram
        ]


class GenomicVocab:
    """Bidirectional token <-> index mapping.

    Attributes:
        itos: List of tokens; the position of a token is its index.
        stoi: Dict mapping each token to its index in ``itos``.
    """

    def __init__(self, itos):
        self.itos = itos
        self.stoi = {v: k for k, v in enumerate(self.itos)}

    @classmethod
    def create(cls, tokens, max_vocab, min_freq):
        """Build a vocabulary from an iterable of tokens.

        The special tokens ``'<pad>'`` (index 0) and ``'<cls>'`` (index 1)
        always come first, followed by the most frequent corpus tokens.

        Args:
            tokens: Iterable of token strings to count.
            max_vocab: Upper bound on the total vocabulary size, special
                tokens included.
            min_freq: Minimum count a corpus token needs to be kept.

        Returns:
            A new vocabulary instance.
        """
        specials = ['<pad>', '<cls>']
        freq = Counter(tokens)
        # A corpus token that happens to spell a special token must not be
        # added a second time: a duplicate in ``itos`` would make ``stoi``
        # point at the later copy and move '<pad>' away from index 0.
        for special in specials:
            freq.pop(special, None)
        # Reserve room for the specials so the total never exceeds max_vocab.
        budget = max(max_vocab - len(specials), 0)
        itos = specials + [o for o, c in freq.most_common(budget) if c >= min_freq]
        return cls(itos)


class SiRNADataset(Dataset):
    """Dataset yielding two encoded token sequences and a regression target per row.

    Attributes:
        df: DataFrame holding one sample per row.
        columns: Names of the input columns (stored; ``__getitem__`` reads the
            two sequence columns by their literal names).
        vocab: Object exposing a ``stoi`` dict from token to integer index.
        tokenizer: Object exposing ``tokenize(str) -> list[str]``.
        max_length: Fixed length every encoded sequence is cut or padded to.
    """

    def __init__(self, df, columns, vocab, tokenizer, max_len):
        self.df = df
        self.columns = columns
        self.vocab = vocab
        self.tokenizer = tokenizer
        self.max_length = max_len

    def __len__(self):
        """Return the number of rows in the underlying DataFrame."""
        return len(self.df)

    def _encode(self, tokens):
        """Cut/pad ``tokens`` to ``self.max_length`` and map them to indices."""
        length = self.max_length
        # A negative repeat count produces an empty list, so over-long
        # sequences are simply truncated.
        padded = tokens[:length] + ['<pad>'] * (length - len(tokens))
        return torch.tensor(
            [self.vocab.stoi.get(token, 0) for token in padded],  # unknown tokens -> 0 (pad)
            dtype=torch.int
        )

    def __getitem__(self, idx):
        """Return ``([antisense_ids, modified_ids], target)`` for row ``idx``.

        Both id tensors have length ``self.max_length`` and start with the
        ``'<cls>'`` token. The target is the row's ``'mRNA_remaining_pct'``
        value as a float tensor.
        """
        row = self.df.iloc[idx]
        # The raw antisense sequence is split into n-grams by the tokenizer;
        # the modified sequence column is already whitespace-separated tokens.
        seq1 = ['<cls>'] + self.tokenizer.tokenize(row['siRNA_antisense_seq'])
        seq2 = ['<cls>'] + row['modified_siRNA_antisense_seq_list'].split()

        # Use the length given to this dataset rather than a notebook-level
        # global, so the dataset works regardless of surrounding kernel state.
        encoded = [self._encode(seq1), self._encode(seq2)]

        target = torch.tensor(row['mRNA_remaining_pct'], dtype=torch.float)

        return (encoded, target)


class SiRNATransformerModel(nn.Module):
    """Transformer-encoder regressor over a pair of token-id sequences.

    Each of the two input sequences is embedded and passed through the same
    Transformer encoder; the hidden state at position 0 of each sequence is
    taken, the two vectors are concatenated, and a linear layer maps them to
    one scalar per sample.

    Args:
        n_tokens: Size of the embedding table (vocabulary size).
        d_embed: Embedding / model width.
        n_heads: Number of attention heads.
        d_feedforward: Hidden width of the encoder feed-forward sublayer.
        n_layers: Number of stacked encoder layers.
        dropout: Dropout probability, used inside the encoder layers and on
            the pooled vectors.

    NOTE(review): no ``src_key_padding_mask`` is passed to the encoder and no
    positional encoding is added to the embeddings, so padding positions take
    part in attention and token order is not encoded.
    """

    def __init__(
        self,
        n_tokens: int,
        d_embed: int=200,
        n_heads: int=4,
        d_feedforward: int=256,
        n_layers: int=3,
        dropout: float=0.5
    ):
        super().__init__()
        self.d_embed = d_embed
        # Index 0 is the padding token; its embedding stays at zero.
        self.embedding = nn.Embedding(n_tokens, d_embed, padding_idx=0)
        self.dropout = nn.Dropout(dropout)
        ##########################
        # Model 2: Transformer Encoder
        ##########################
        transformer_encoder_layer = nn.TransformerEncoderLayer(d_model=d_embed, nhead=n_heads, dim_feedforward=d_feedforward, dropout=dropout, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(transformer_encoder_layer, num_layers=n_layers)
        # Two pooled vectors of width d_embed are concatenated before the head.
        self.linear = nn.Linear(d_embed*2, 1)

    def forward(self, input):
        """Predict one value per sample.

        Args:
            input: Sequence of two integer tensors of token ids, each laid out
                batch-first as ``(batch, seq_len)``.

        Returns:
            Tensor of shape ``(batch,)``. The batch dimension is kept even
            when ``batch == 1``.
        """
        embedded = [self.embedding(seq) for seq in input]
        outputs = []
        for embed in embedded:
            x = self.transformer_encoder(embed)
            # x = self.dropout(x.mean(dim=1)) # mean pooling
            x = self.dropout(x[:, 0, :])  # Use the first hidden state
            outputs.append(x)
        x = torch.cat(outputs, dim=1)
        x = self.linear(x)
        # Drop only the trailing size-1 feature axis. A bare squeeze() would
        # also remove the batch axis for a batch of one and return a 0-d
        # tensor that no longer matches the target shape.
        return x.squeeze(-1)
    ##########################
    
    ##########################
    # Model 1: GRU
    ##########################
    #     self.gru = nn.GRU(d_embed, d_feedforward, n_layers, bidirectional=True, batch_first=True, dropout=dropout)
    #     self.linear = nn.Linear(d_feedforward*4, 1)
    # def forward(self, input):
    #     embedded = [self.embedding(seq) for seq in input]
    #     outputs = []
    #     for embed in embedded:
    #         x, _ = self.gru(embed)
    #         x = self.dropout(x[:, -1, :])  # Use last hidden state
    #         outputs.append(x)
    #     x = torch.cat(outputs, dim=1)
    #     x = self.linear(x)
    #     return(x.squeeze())
    ##########################


def calculate_metrics(y_true, y_pred, threshold=30):
    """Compute the combined validation score for remaining-percentage predictions.

    The score is
    ``0.5 * (1 - MAE/100) + 0.5 * (1 - range_MAE/100) * F1`` where

    * ``MAE`` is the mean absolute error over all samples,
    * ``range_MAE`` is the MAE restricted to samples whose prediction lies in
      ``[0, threshold]`` (100 when there are none), and
    * ``F1`` is computed on the binary labels ``value < threshold``.

    Args:
        y_true: Array-like of ground-truth values.
        y_pred: Array-like of predicted values, same length as ``y_true``.
        threshold: Cut-off defining the positive class and the scored range.

    Returns:
        The score as a float. When there are no true positives the F1 term is
        0 instead of NaN, so the result is always comparable.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    mae = np.mean(np.abs(y_true - y_pred))

    y_true_binary = (y_true < threshold).astype(int)
    y_pred_binary = (y_pred < threshold).astype(int)

    mask = (y_pred >= 0) & (y_pred <= threshold)
    range_mae = mean_absolute_error(y_true[mask], y_pred[mask]) if mask.sum() > 0 else 100

    # zero_division=0 keeps the returned value (0.0) but silences the
    # per-epoch warning when a class has no predicted / true members.
    precision = precision_score(y_true_binary, y_pred_binary, average='binary', zero_division=0)
    recall = recall_score(y_true_binary, y_pred_binary, average='binary', zero_division=0)

    # precision + recall is 0 early in training when nothing is predicted
    # below the threshold; dividing would give NaN (or raise), and a NaN score
    # can never be selected as "best".
    pr_sum = precision + recall
    f1 = 2 * precision * recall / pr_sum if pr_sum > 0 else 0.0

    score = (1 - mae / 100) * 0.5 + (1 - range_mae / 100) * f1 * 0.5
    return score

def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler=None, num_epochs=50, device='cuda'):
    """Train ``model`` and return the weights of the best-scoring epoch.

    After every epoch the model is evaluated on ``val_loader`` and scored with
    ``calculate_metrics``; the state dict of the highest-scoring epoch is
    snapshotted.

    Args:
        model: Module called as ``model(list_of_tensors)``.
        train_loader: Iterable of ``(list_of_input_tensors, target)`` batches.
        val_loader: Iterable of validation batches in the same format.
        criterion: Loss callable ``criterion(outputs, targets)``.
        optimizer: Optimizer over ``model``'s parameters.
        scheduler: Optional LR scheduler, stepped once per epoch.
        num_epochs: Number of passes over ``train_loader``.
        device: Device the model and every batch are moved to.

    Returns:
        An independent copy of the best epoch's ``state_dict``, or ``None`` if
        no epoch produced a score greater than ``-inf`` (e.g. ``num_epochs``
        is 0 or every score was NaN).
    """
    import copy  # local import: only needed for the best-weights snapshot

    model.to(device)
    best_score = -float('inf')
    best_model = None

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0
        for x, y in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}'):
            x = [temp.to(device) for temp in x]
            # Tensor.to() is not in-place: the result must be re-bound or the
            # targets stay on the CPU.
            y = y.to(device)

            optimizer.zero_grad()
            outputs = model(x)
            loss = criterion(outputs, y)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        model.eval()
        val_loss = 0
        val_preds = []
        val_targets = []

        with torch.no_grad():
            for x, y in val_loader:
                # Validation batches need the same device placement as
                # training batches.
                x = [temp.to(device) for temp in x]
                y = y.to(device)
                outputs = model(x)
                loss = criterion(outputs, y)
                val_loss += loss.item()
                # atleast_1d guards against a 0-d array (batch of one), which
                # cannot be iterated by extend().
                val_preds.extend(np.atleast_1d(outputs.cpu().numpy()))
                val_targets.extend(np.atleast_1d(y.cpu().numpy()))

        train_loss /= len(train_loader)
        val_loss /= len(val_loader)

        val_preds = np.array(val_preds)
        val_targets = np.array(val_targets)
        score = calculate_metrics(val_targets, val_preds)

        print(f'Epoch {epoch+1}/{num_epochs}')
        print(f'Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
        print(f'Learning Rate: {optimizer.param_groups[0]["lr"]:.6f}')
        print(f'Validation Score: {score:.4f}')

        if score > best_score:
            best_score = score
            # state_dict() holds references to the live parameters, and a
            # shallow .copy() would keep following later updates. Deep-copy so
            # the snapshot really is this epoch's weights.
            best_model = copy.deepcopy(model.state_dict())
            print(f'New best model found with score: {best_score:.4f}')
        if scheduler is not None:
            scheduler.step()

    return best_model




In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Load data
train_data = pd.read_csv('train_data.csv')
test_data = pd.read_csv('sample_submission.csv')

# Rows missing either input sequence or the target cannot be used for training.
columns = ['siRNA_antisense_seq', 'modified_siRNA_antisense_seq_list']
train_data.dropna(subset=columns + ['mRNA_remaining_pct'], inplace=True)
train_data, val_data = train_test_split(train_data, test_size=0.1, random_state=42)

# Create vocabulary (from the training split only)
tokenizer = GenomicTokenizer(ngram=3, stride=1)

all_tokens = []
for seq in train_data['siRNA_antisense_seq']:
    all_tokens.extend(tokenizer.tokenize(seq))
for seq in train_data['modified_siRNA_antisense_seq_list']:
    all_tokens.extend(seq.split())
# for seq in train_data['gene_target_symbol_name']:
#     all_tokens.append(seq)

vocab = GenomicVocab.create(all_tokens, max_vocab=10000, min_freq=1)
print('Vocabulary Size: '+str(len(vocab.itos)))

# Find max sequence length. Values containing a space are already
# whitespace-separated tokens; anything else goes through the tokenizer.
# The +1 leaves room for the '<cls>' token the dataset puts in front of every
# sequence; without it the longest sequences would lose their last token.
max_len = 1 + max(max(len(seq.split()) if ' ' in seq else len(tokenizer.tokenize(seq))
                      for seq in train_data[col]) for col in columns)

# Create datasets
train_dataset = SiRNADataset(train_data, columns, vocab, tokenizer, max_len)
val_dataset = SiRNADataset(val_data, columns, vocab, tokenizer, max_len)
test_dataset = SiRNADataset(test_data, columns, vocab, tokenizer, max_len)

# Create data loaders (only the training set is shuffled)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)
test_loader = DataLoader(test_dataset, batch_size=32)


In [None]:

# Training objective: mean squared error on the regression target.
criterion = nn.MSELoss()

# Network, with its embedding table sized to the vocabulary built above.
model = SiRNATransformerModel(
    n_tokens=len(vocab.itos), d_embed=200, n_heads=2,
    d_feedforward=256, n_layers=3, dropout=0.5,
)


In [None]:

model_optimizer = optim.Adam(model.parameters(), lr=0.001)
optim_scheduler = None
# optim_scheduler = optim.lr_scheduler.StepLR(model_optimizer, step_size=1, gamma=0.95)

training_epochs = 300
# train_model returns the state dict of the best-scoring epoch. Load it so the
# following cells predict with those weights instead of whatever the last
# epoch happened to leave in the model.
best_state = train_model(model, train_loader, val_loader, criterion, model_optimizer, optim_scheduler, training_epochs, device)
if best_state is not None:
    model.load_state_dict(best_state)
print("Finished training.")


In [None]:
import csv
import os

# Put the model in inference mode explicitly (dropout off) instead of relying
# on the state the training loop happened to leave it in, and skip autograd
# bookkeeping while predicting.
model.eval()
predictions = []
with torch.no_grad():
    for test_inputs, targets in tqdm(test_loader):
        inputs = [x.to(device) for x in test_inputs]
        outputs = model(inputs)
        # atleast_1d keeps a final batch of one sample iterable for extend().
        predictions.extend(np.atleast_1d(outputs.cpu().numpy()))
print("Finished get the prediction output.")

input_file = 'sample_submission.csv'
output_file = 'processed_submission.csv'

# Check if the output file exists and remove it if it does
if os.path.exists(output_file):
    os.remove(output_file)
# Copy the submission template row by row, replacing only the target column
# with the prediction at the same row position.
with open(input_file, mode='r') as infile, open(output_file, mode='w', newline='') as outfile:
    reader = csv.DictReader(infile)
    fieldnames = reader.fieldnames
    writer = csv.DictWriter(outfile, fieldnames=fieldnames)

    writer.writeheader()
    for i, row in enumerate(reader):
        row['mRNA_remaining_pct'] = predictions[i]
        writer.writerow(row)
print("Finished save outputs to result file(processed_submission.csv).")
