In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class GeneInteractionModel(nn.Module):

    def __init__(self, hidden_size, num_layers, num_features=24, dropout=0.1):
        super(GeneInteractionModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.c1 = nn.Sequential(
            nn.Conv2d(in_channels=4, out_channels=128, kernel_size=(2, 3), stride=1, padding=(0, 1)),
            nn.BatchNorm2d(128),
            nn.GELU(),
        )
        self.c2 = nn.Sequential(
            nn.Conv1d(in_channels=128, out_channels=108, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(108),
            nn.GELU(),
            nn.AvgPool1d(kernel_size=2, stride=2),

            nn.Conv1d(in_channels=108, out_channels=108, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(108),
            nn.GELU(),
            nn.AvgPool1d(kernel_size=2, stride=2),

            nn.Conv1d(in_channels=108, out_channels=128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(128),
            nn.GELU(),
            nn.AvgPool1d(kernel_size=2, stride=2),
        )

        self.r = nn.GRU(128, hidden_size, num_layers, batch_first=True, bidirectional=True)

        self.s = nn.Linear(2 * hidden_size, 12, bias=False)

        self.d = nn.Sequential(
            nn.Linear(num_features, 96, bias=False),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(96, 64, bias=False),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(64, 128, bias=False)
        )

        self.head = nn.Sequential(
            nn.BatchNorm1d(140),
            nn.Dropout(dropout),
            nn.Linear(140, 1, bias=True),
        )

    def forward(self, g, x):
        g = torch.squeeze(self.c1(g), 2)
        g = self.c2(g)
        g, _ = self.r(torch.transpose(g, 1, 2))
        g = self.s(g[:, -1, :])

        x = self.d(x)

        out = self.head(torch.cat((g, x), dim=1))

        return F.softplus(out)

In [4]:
import sys, os
import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F

class DeepCas9Model(nn.Module):
    def __init__(self, filter_size, filter_num, node_1=80, node_2=60):
        super(DeepCas9Model, self).__init__()
        length = 30

        # Define layers using torch.nn
        self.conv1 = nn.Conv2d(4, filter_num[0], kernel_size=(1, filter_size[0]))
        self.pool1 = nn.AvgPool2d(kernel_size=(1, 2), stride=(1, 2))
        self.conv2 = nn.Conv2d(filter_num[0], filter_num[1], kernel_size=(1, filter_size[1]))
        self.pool2 = nn.AvgPool2d(kernel_size=(1, 2), stride=(1, 2))
        self.conv3 = nn.Conv2d(filter_num[1], filter_num[2], kernel_size=(1, filter_size[2]))
        self.pool3 = nn.AvgPool2d(kernel_size=(1, 2), stride=(1, 2))
        self.flatten = nn.Flatten()
        self.dense1 = nn.Linear(filter_num[0] * ((length - filter_size[0]) // 2 + 1), node_1)
        self.dense2 = nn.Linear(filter_num[1] * ((length - filter_size[1]) // 2 + 1), node_2)
        self.output_layer = nn.Linear(filter_num[2] * ((length - filter_size[2]) // 2 + 1), 1)

    def forward(self, inputs):
        x = F.relu(self.conv1(inputs))
        x = self.pool1(x)
        x = F.relu(self.conv2(x))
        x = self.pool2(x)
        x = F.relu(self.conv3(x))
        x = self.pool3(x)
        x = self.flatten(x)
        x = F.relu(self.dense1(x))
        x = F.relu(self.dense2(x))
        outputs = self.output_layer(x)
        return outputs


def preprocess_seq(data, seq_length):

    seq_onehot = np.zeros((len(data), 1, seq_length, 4), dtype=float)

    for l in range(len(data)):
        for i in range(seq_length):
            try:
                data[l][i]
            except Exception:
                print(data[l], i, seq_length, len(data))

            if   data[l][i] in "Aa":  seq_onehot[l, 0, i, 0] = 1
            elif data[l][i] in "Cc":  seq_onehot[l, 0, i, 1] = 1
            elif data[l][i] in "Gg":  seq_onehot[l, 0, i, 2] = 1
            elif data[l][i] in "Tt":  seq_onehot[l, 0, i, 3] = 1
            elif data[l][i] in "Xx":  pass
            elif data[l][i] in "Nn.": pass
            else:
                print("[Input Error] Non-ATGC character " + data[l])
                sys.exit()

    return seq_onehot


# Create an instance of the custom model
filter_size = [3, 5, 7]
filter_num  = [100, 70, 40]
node_1 = 80
node_2 = 60

custom_model = DeepCas9Model(filter_size=filter_size, filter_num=filter_num, node_1=node_1, node_2=node_2)

# Define the optimizer and loss function
optimizer = torch.optim.Adam(custom_model.parameters(), lr=0.005)
loss_object = nn.MSELoss()

# Function for training the model
def train_step(inputs, targets):
    optimizer.zero_grad()
    predictions = custom_model(inputs)
    loss = loss_object(predictions, targets)
    loss.backward()
    optimizer.step()
    return loss.item()

# Function for predicting using the model
def predict(inputs):
    with torch.no_grad():
        outputs = custom_model(inputs)
    return outputs.numpy()


In [5]:
df = pd.read_csv('docs/dataset/DeepSpCas9_train.csv')
df

Unnamed: 0,Target_context,indel
0,TTCTGCCTTGTTTCTTTCCTCTCTGGGTCG,24.287805
1,ACGACCTTCAGCTCAGTGACAGTGAGGACA,69.500438
2,AGGACGACGACTACAATAAGCCTCTGGATC,25.994760
3,GCAGCAAACTGACGGAGAACCTTGTGGCCC,57.964590
4,CCGGCAGATATCCGTGAAGGCTCTAGGTAC,39.355020
...,...,...
12827,GGGAATACGACGACCAGAGAGCGCTGGAGA,40.853256
12828,TCATGGATTTCCTGGCTCGGGGACTGGTCT,11.480880
12829,GCCTTGTTTCTTTCCTCTCTGGGTCGGATT,63.861469
12830,TCGCACCTGATAGAGCATGTGACAAGGAGA,51.650932


In [19]:
# Prepare train dataset

random_seed = 0
device = 'cuda' if torch.cuda.is_available() else 'cpu'

torch.manual_seed(random_seed)
torch.cuda.manual_seed(random_seed)
torch.cuda.manual_seed_all(random_seed)
np.random.seed(random_seed)

x_train = preprocess_seq(df['Target_context'], 30)
y_train = df['indel']

x_train = torch.tensor(x_train, dtype=torch.float32, device=device)
y_train = torch.tensor(y_train.to_numpy(), dtype=torch.float32, device=device)

In [18]:
x_train.shape

torch.Size([12832, 1, 30, 4])

In [14]:
data = preprocess_seq(df['Target_context'], 30)
data.shape

(12832, 1, 30, 4)

In [12]:
def input_generator(TEST_X):
    test_batch = 500
    TEST_Z = np.zeros((TEST_X.shape[0], 1), dtype=float)

    for i in range(int(np.ceil(float(TEST_X.shape[0]) / float(test_batch)))):
        TEST_X[i * test_batch:(i + 1) * test_batch]

    list_score = sum(TEST_Z.tolist(), [])

    return list_score

tensor([24.2878, 69.5004, 25.9948,  ..., 63.8615, 51.6509, 40.0191])

In [21]:
# TRAINING

# Create an instance of the custom model
filter_size = [3, 5, 7]
filter_num  = [100, 70, 40]
node_1 = 80
node_2 = 60

model = DeepCas9Model(filter_size=filter_size, filter_num=filter_num, node_1=node_1, node_2=node_2).to(device)

# Define the optimizer and loss function
loss_object = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.005)

# Function for training the model
def train_step(inputs, targets):
    optimizer.zero_grad()
    predictions = model(inputs)
    loss = loss_object(predictions, targets)
    loss.backward()
    optimizer.step()
    return loss.item()

train_ = train_step(x_train, y_train)

RuntimeError: Calculated padded input size per channel: (30 x 1). Kernel size: (1 x 5). Kernel size can't be greater than actual input size