In [None]:
import os
import random
import torch
import numpy as np
import pandas as pd
import copy

seed = 12345

os.environ['PYTHONHASHSEED']=str(seed)
random.seed(seed)
torch.manual_seed(seed)
np.random.seed(seed)
torch.cuda.manual_seed_all(seed)

torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ChannelAttention(nn.Module):
    def __init__(self, in_planes, ratio=8):
        super(ChannelAttention, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)

        self.fc1 = nn.Conv2d(in_planes, in_planes // ratio, kernel_size=1, bias=False)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Conv2d(in_planes // ratio, in_planes, kernel_size=1, bias=False)
        
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        avg_out = self.fc2(self.relu1(self.fc1(self.avg_pool(x))))
        max_out = self.fc2(self.relu1(self.fc1(self.max_pool(x))))
        out = avg_out + max_out
        return self.sigmoid(out)

class SpatialAttention(nn.Module):
    def __init__(self, kernel_size=7):
        super(SpatialAttention, self).__init__()

        assert kernel_size in (3, 7), 'Kernel size must be 3 or 7'
        padding = 3 if kernel_size == 7 else 1

        self.conv1 = nn.Conv2d(2, 1, kernel_size, padding=padding, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        avg_out = torch.mean(x, dim=1, keepdim=True)
        max_out, _ = torch.max(x, dim=1, keepdim=True)
        x = torch.cat([avg_out, max_out], dim=1)
        x = self.conv1(x)
        return self.sigmoid(x)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MultiBranchConv(nn.Module):
    def __init__(self, output_channels=16, attention=True):
        super(MultiBranchConv, self).__init__()
        
        self.branch1 = nn.Conv2d(in_channels=1, out_channels=output_channels, kernel_size=(1, 16))
        self.branch2 = nn.Conv2d(in_channels=1, out_channels=output_channels, kernel_size=(2, 16))
        self.branch3 = nn.Conv2d(in_channels=1, out_channels=output_channels, kernel_size=(3, 16))
        self.branch4 = nn.Conv2d(in_channels=1, out_channels=output_channels, kernel_size=(4, 16))
        
        self.attn = attention
        self.ca1 = ChannelAttention(output_channels)
        self.ca2 = ChannelAttention(output_channels)
        self.ca3 = ChannelAttention(output_channels)
        self.ca4 = ChannelAttention(output_channels)
        self.sa1 = SpatialAttention(kernel_size=7)
        self.sa2 = SpatialAttention(kernel_size=7)
        self.sa3 = SpatialAttention(kernel_size=7)
        self.sa4 = SpatialAttention(kernel_size=7)

    def forward(self, x):

        # Branch 1: No padding needed
        out1 = F.relu(self.branch1(x))

        # Branch 2: Pad to the right (end) 
        x_pad2 = F.pad(x, (0, 0, 0, 1))  # Padding only at the end (on the height dimension)
        out2 = F.relu(self.branch2(x_pad2))

        # Branch 3: Pad one row at the beginning and one at the end
        x_pad3 = F.pad(x, (0, 0, 1, 1))  # One padding at the start and one at the end
        out3 = F.relu(self.branch3(x_pad3))

        # Branch 4: Pad two rows at the beginning and one at the end
        x_pad4 = F.pad(x, (0, 0, 1, 2))  # One at the start, Two at the end
        out4 = F.relu(self.branch4(x_pad4))

        # Apply attention if enabled
        if self.attn:
            out1 = out1 * self.ca1(out1)
            out1 = out1 * self.sa1(out1)

            out2 = out2 * self.ca2(out2)
            out2 = out2 * self.sa2(out2)

            out3 = out3 * self.ca3(out3)
            out3 = out3 * self.sa3(out3)

            out4 = out4 * self.ca4(out4)
            out4 = out4 * self.sa4(out4)

        # Remove last dimension of size 1 (from Conv2D)
        out1 = out1.squeeze(-1)
        out2 = out2.squeeze(-1)
        out3 = out3.squeeze(-1)
        out4 = out4.squeeze(-1)

        # Transpose for concatenation later
        out1 = out1.transpose(1, 2)
        out2 = out2.transpose(1, 2)
        out3 = out3.transpose(1, 2)
        out4 = out4.transpose(1, 2)

        # Concatenate along the last dimension
        output = torch.cat((out1, out2, out3, out4), dim=-1)
        
        return output


In [None]:
import torch
import torch.nn as nn
import math

class CRISPRTransformerModel(nn.Module):
    def __init__(self, config):
        super(CRISPRTransformerModel, self).__init__()
        
        # Model parameters
        self.input_dim = 64
        self.num_layers = config.get("num_layers", 2)
        self.num_heads = config.get("num_heads", 8)
        self.dropout_prob = config["dropout_prob"]
        self.number_hidden_layers = config["number_hidder_layers"]
        self.seq_length = config.get("seq_length", 20)
        
        
        # Positional encoding
        self.pos_encoder = nn.Parameter(torch.randn(1, self.seq_length, self.input_dim))
        
        # Transformer encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=self.input_dim,
            nhead=self.num_heads,
            dim_feedforward=self.input_dim * 4,
            dropout=self.dropout_prob,
            batch_first=True,
            norm_first=True  
        )
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=self.num_layers
        )
        
        # Convolutional preprocessing
        self.conv = MultiBranchConv(attention=config["attn"])
        
        # Hidden layers
        self.hidden_layers = []
        start_size = self.seq_length*self.input_dim
        for i in range(self.number_hidden_layers):
            layer = nn.Sequential(
                nn.Linear(start_size, start_size // 2),
                nn.GELU(),
                nn.Dropout(self.dropout_prob)
            )
            self.hidden_layers.append(layer)
            start_size = start_size // 2
        self.hidden_layers = nn.ModuleList(self.hidden_layers)
        
        # Output layer
        self.output = nn.Linear(start_size, 2)

    def forward(self, x, src_mask=None):
        # Apply conv layer
        x = self.conv(x)  # Shape: [batch_size, seq_len, input_dim]
        
        # Add positional encoding
        x = x + self.pos_encoder
        
        # Apply transformer encoder
        x = self.transformer_encoder(x)
        
        x = x.view(x.size(0), -1)
        
        # Apply hidden layers
        for layer in self.hidden_layers:
            x = layer(x)
        
        x = self.output(x)
        
        return x

In [7]:
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

class TrainerDataset(Dataset):
    def __init__(self, inputs, targets):
        self.inputs = torch.tensor(inputs, dtype=torch.float32).unsqueeze(1)  # Add channel dimension
        self.targets = torch.tensor(targets, dtype=torch.long)

    def __len__(self):
        return len(self.targets)

    def __getitem__(self, idx):
        return self.inputs[idx], self.targets[idx]

In [8]:
def tester(model, test_x, test_y):
    test_dataset = TrainerDataset(test_x, test_y)
    test_dataloader = DataLoader(test_dataset, batch_size=128, shuffle=False)
    model.eval()
    results = []
    true_labels = []
    with torch.no_grad():
        for test_features, test_labels in test_dataloader:
            outputs = model(test_features.to(device)).detach().to("cpu")
            results.extend(outputs)
            true_labels.extend(test_labels)
    return true_labels, results

In [9]:
class Stats:
    def __init__(self):
        self.acc = 0
        self.pre = 0
        self.re = 0
        self.f1 = 0
        self.roc = 0
        self.prc = 0
        self.tn = 0
        self.fp = 0
        self.fn = 0
        self.tp = 0
    def print(self):
        print('Accuracy: %.4f' %self.acc)
        print('Precision: %.4f' %self.pre)
        print('Recall: %.4f' %self.re)
        print('F1 Score: %.4f' %self.f1)
        print('ROC: %.4f' %self.roc)
        print('PR AUC: %.4f' %self.prc)
        print("Confusion Matrix")
        print(self.tn, "\t", self.fp)
        print(self.fn, "\t", self.tp)

In [10]:
from sklearn.metrics import classification_report
from sklearn.metrics import roc_auc_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_recall_curve
from sklearn.metrics import auc
from sklearn.metrics import accuracy_score

def eval_matrices(model, test_x, test_y, debug = True):
    true_y, results = tester(model, test_x, test_y)
    predictions = [torch.nn.functional.softmax(r) for r in results]
    pred_y = np.array([y[1].item() for y in predictions])
    pred_y_list = []
    test_y = np.array([y.item() for y in true_y])

    for x in pred_y:
        if(x>0.5):
            pred_y_list.append(1)
        else:
            pred_y_list.append(0)

    pred_y_list = np.array(pred_y_list)
    tn, fp, fn, tp = confusion_matrix(test_y, pred_y_list).ravel()
    precision, recall, _ = precision_recall_curve(test_y, pred_y)
    auc_score = auc(recall, precision)
    acc = accuracy_score(test_y, pred_y_list)

    pr = -1
    re = -1
    f1 = -1
    try:
        pr = tp / (tp+fp)
        re = tp / (tp+fn)
        f1 = 2*pr*re / (pr+re)
    except:
        f1 = -1

    stats = Stats()
    stats.acc = acc
    stats.pre = pr
    stats.re = re
    stats.f1 = f1
    stats.roc = roc_auc_score(test_y, pred_y)
    stats.prc = auc_score
    stats.tn = tn
    stats.fp = fp
    stats.fn = fn
    stats.tp = tp

    if debug:
        print('Accuracy: %.4f' %acc)
        print('Precision: %.4f' %pr)
        print('Recall: %.4f' %re)
        print('F1 Score: %.4f' %f1)
        print('ROC:',roc_auc_score(test_y, pred_y))
        print('PR AUC: %.4f' % auc_score)

        # print(classification_report(test_y, pred_y_list, digits=4))
        # print("Confusion Matrix")
        # print(confusion_matrix(test_y, pred_y_list))

    return stats

In [11]:
import numpy as np
import pandas as pd

def one_hot_features(df):
    print("Generating One hot encoding features...")
    
    # Nucleotides and possible pairs
    nucleotides = ['A', 'T', 'G', 'C']
    pairs = [f'{n1}{n2}' for n1 in nucleotides for n2 in nucleotides]  # 16 possible pairs
    
    # Initialize the pairwise feature matrix (rows = positions, columns = 16 pairs)
    pairwise_features = np.zeros((len(df), 20, len(pairs)))  # (samples, positions=20, pairs=16)
    
    # Loop through each row in the DataFrame and populate the pairwise features
    for idx, row in df.iterrows():
        on_seq = row['On']
        off_seq = row['Off']
        
        for pos in range(20):  # Loop through positions 1 to 20
            pair = on_seq[pos] + off_seq[pos]  # Create the pair from the same position in both sequences
            if pair in pairs:
                pair_idx = pairs.index(pair)  # Get the index of the pair
                pairwise_features[idx, pos, pair_idx] = 1  # Set the feature value to 1
    
    # Return a DataFrame with the pairwise features
    # Reshape to (len(df), 20, 16) as the final output
    return pairwise_features


In [None]:
def eval(model):
    print("Circleseq")
    data_path = "datasets/circleseq_all.csv"
    test_data = pd.read_csv(data_path)
    test_x = one_hot_features(test_data)
    test_y = test_data['Active'].to_numpy()
    stats = eval_matrices(model, test_x, test_y)

    print("surroseq")
    data_path = "datasets/surroseq.csv"
    test_data = pd.read_csv(data_path)
    test_x = one_hot_features(test_data)
    test_y = test_data['Active'].to_numpy()
    stats = eval_matrices(model, test_x, test_y)

    print("guideseq")
    data_path = "datasets/guideseq.csv"
    test_data = pd.read_csv(data_path)
    test_x = one_hot_features(test_data)
    test_y = test_data['Active'].to_numpy()
    stats = eval_matrices(model, test_x, test_y)

    print("ttiss")
    data_path = "datasets/ttiss.csv"
    test_data = pd.read_csv(data_path)
    test_x = one_hot_features(test_data)
    test_y = test_data['Active'].to_numpy()
    stats = eval_matrices(model, test_x, test_y)

    

In [13]:
import torch
from torch.utils.data import DataLoader

def train_model(model, train_loader, optimizer, scheduler, criterion, device, num_epochs):
    model = model.to(device)
    history = {'train_loss': []}
    
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        
        # Training loop
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            
            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            
            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            # Loss tracking
            train_loss += loss.item()
        
        # Average loss for the epoch
        train_loss /= len(train_loader)
        history['train_loss'].append(train_loss)
        
        print(f"Epoch {epoch + 1}/{num_epochs} | Train Loss: {train_loss:.4f}")
        scheduler.step()
    
    return model, history

In [None]:
def trainer(config, train_x, train_y):
    seed = 12345
    os.environ['PYTHONHASHSEED']=str(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    np.random.seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

    train_dataset = TrainerDataset(train_x, train_y)
    train_loader = DataLoader(train_dataset, batch_size=config['batch_size'], shuffle=True)

    model = CRISPRTransformerModel(config)
    optimizer = torch.optim.Adam(model.parameters(), lr=config['learning_rate'])
    scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2)
    class_weights = torch.tensor([1.0, config['pos_weight']]).to(device)
    criterion = nn.CrossEntropyLoss(weight=class_weights)
    
    trained_model, history = train_model(model, train_loader, optimizer, scheduler, criterion, device, config['epochs'])
    return trained_model, history

In [None]:
from sklearn.model_selection import train_test_split

# Load dataset
data_path = "datasets/changeseq_siteseq.csv"
train_data = pd.read_csv(data_path)
train_x = one_hot_features(train_data)
train_y = train_data['Active'].to_numpy()

Generating One hot encoding features...


In [16]:
config = {
    'num_layers': 2, 
    'num_heads': 4, 
    'number_hidder_layers': 2, 
    'dropout_prob': 0.2, 
    'batch_size': 128, 
    'epochs': 50, 
    'learning_rate': 0.001, 
    'pos_weight': 30, 
    'attn': False
}
model, history = trainer(config, train_x, train_y)
eval(model)



Epoch 1/50 | Train Loss: 0.2372
Epoch 2/50 | Train Loss: 0.1883
Epoch 3/50 | Train Loss: 0.1733
Epoch 4/50 | Train Loss: 0.1625
Epoch 5/50 | Train Loss: 0.1537
Epoch 6/50 | Train Loss: 0.1448
Epoch 7/50 | Train Loss: 0.1364
Epoch 8/50 | Train Loss: 0.1310
Epoch 9/50 | Train Loss: 0.1250
Epoch 10/50 | Train Loss: 0.1224
Epoch 11/50 | Train Loss: 0.1594
Epoch 12/50 | Train Loss: 0.1607
Epoch 13/50 | Train Loss: 0.1605
Epoch 14/50 | Train Loss: 0.1563
Epoch 15/50 | Train Loss: 0.1552
Epoch 16/50 | Train Loss: 0.1551
Epoch 17/50 | Train Loss: 0.1498
Epoch 18/50 | Train Loss: 0.1499
Epoch 19/50 | Train Loss: 0.1459
Epoch 20/50 | Train Loss: 0.1387
Epoch 21/50 | Train Loss: 0.1350
Epoch 22/50 | Train Loss: 0.1312
Epoch 23/50 | Train Loss: 0.1280
Epoch 24/50 | Train Loss: 0.1237
Epoch 25/50 | Train Loss: 0.1209
Epoch 26/50 | Train Loss: 0.1166
Epoch 27/50 | Train Loss: 0.1143
Epoch 28/50 | Train Loss: 0.1122
Epoch 29/50 | Train Loss: 0.1116
Epoch 30/50 | Train Loss: 0.1109
Epoch 31/50 | Train

  predictions = [torch.nn.functional.softmax(r) for r in results]


Accuracy: 0.9772
Precision: 0.5261
Recall: 0.6930
F1 Score: 0.5981
ROC: 0.9706498645845513
PR AUC: 0.6366
surroseq
Generating One hot encoding features...


  predictions = [torch.nn.functional.softmax(r) for r in results]


Accuracy: 0.6306
Precision: 0.1972
Recall: 0.6752
F1 Score: 0.3053
ROC: 0.717570604726568
PR AUC: 0.4767
guideseq
Generating One hot encoding features...


  predictions = [torch.nn.functional.softmax(r) for r in results]


Accuracy: 0.9795
Precision: 0.0549
Recall: 0.9649
F1 Score: 0.1039
ROC: 0.9944579290851235
PR AUC: 0.5227
ttiss
Generating One hot encoding features...


  predictions = [torch.nn.functional.softmax(r) for r in results]


Accuracy: 0.9902
Precision: 0.0977
Recall: 0.7427
F1 Score: 0.1727
ROC: 0.9730377878558901
PR AUC: 0.4207
