In [2]:
# Using Google Colab for faster computation and GPU utilization
# My Google Drive contains all the required files

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# Installing dependencies

!pip install rdkit

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting rdkit
  Downloading rdkit-2023.3.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (29.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m29.7/29.7 MB[0m [31m23.4 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: rdkit
Successfully installed rdkit-2023.3.1


In [4]:
# Importing dependencies

import numpy as np
from rdkit import Chem
from scipy.spatial import distance_matrix
from rdkit.Chem.rdmolops import GetAdjacencyMatrix
import pickle as pkl
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.utils.data as data_utils

In [5]:
# Loading train and test keys

with open ('drive/MyDrive/keys/train_keys.pkl', 'rb') as fp:
  train_keys = pkl.load(fp)
with open ('drive/MyDrive/keys/test_keys.pkl', 'rb') as fp:
  test_keys = pkl.load(fp)

In [6]:
# This cell gives the functions for the one-hot encoded feature vector of the protein molecule

def one_of_k_encoding(x, allowable_set):
    if x not in allowable_set:
        raise Exception("input {0} not in allowable set{1}:".format(x, allowable_set))
    return list(map(lambda s: x == s, allowable_set))

def one_of_k_encoding_unk(x, allowable_set):
    if x not in allowable_set:
        x = allowable_set[-1]
    return list(map(lambda s: x == s, allowable_set))

def atom_feature(m, atom_i, i_donor, i_acceptor):
    atom = m.GetAtomWithIdx(atom_i)
    return np.array(one_of_k_encoding_unk(atom.GetSymbol(),
                                      ['C', 'N', 'O', 'S', 'F', 'P', 'Cl', 'Br', 'B', 'H']) +
                    one_of_k_encoding(atom.GetDegree(), [0, 1, 2, 3, 4, 5, 6]) +
                    one_of_k_encoding_unk(atom.GetTotalNumHs(), [0, 1, 2, 3, 4]) +
                    one_of_k_encoding_unk(atom.GetImplicitValence(), [0, 1, 2, 3, 4, 5, 6]) +
                    [atom.GetIsAromatic()])    # (10, 7, 5, 7, 1)

def get_atom_feature(m):
    n = m.GetNumAtoms()
    H = []
    for i in range(n):
        H.append(atom_feature(m, i, None, None))
    H = np.array(H)
    return H+0

In [7]:
# Creating a Dataset object and custom collate function that can be passed into the DataLoader

class Dataset:
    def __init__(self, keys):
        self.keys = keys
        
    def __len__(self):
        return len(self.keys)

    def __getitem__(self, index):

        key = self.keys[index]

        mol_w = Chem.MolFromPDBFile('drive/MyDrive/wild_pdb/' + key + '_wild.pdb')
        mol_m = Chem.MolFromPDBFile('drive/MyDrive/mutation_pdb/' + key + '_mutation.pdb')
        with open('drive/MyDrive/ddg/'+key, 'rb') as f:
            labels = pkl.load(f)

        # mutation type information
        # n_m = mol_m.GetNumAtoms()
        # c_m = mol_m.GetConformers()[0]
        # P_m = np.array(c_m.GetPositions())  
        # adj_m = GetAdjacencyMatrix(mol_m)+np.eye(n_m)
        H_m = get_atom_feature(mol_m)

        # wild type information
        # n_w = mol_w.GetNumAtoms()
        # c_w = mol_w.GetConformers()[0]
        # P_w = np.array(c_w.GetPositions())  
        # adj_w = GetAdjacencyMatrix(mol_w)+np.eye(n_w)
        H_w = get_atom_feature(mol_w)
        labels=labels
        sample = {'H_m': H_m,\
                  'H_w': H_w,\
                  'labels': labels, \
                  'key': key, \
                  }
        return sample

def collate_fn(batch):
    max_natoms_m = 50
    max_natoms_w = 50
    H_m = np.zeros((len(batch), max_natoms_m, 30))
    H_w = np.zeros((len(batch), max_natoms_w, 30))

    keys = [] 
    labels=[]   
    for i in range(len(batch)):
        natom1 = len(batch[i]['H_m'])
        natom2 = len(batch[i]['H_w'])
        if (natom1 > 50 or natom2 > 50):
          if (natom1 > 50):
            natom1 = 50
            H_m[i,:natom1] = batch[i]['H_m'][:50]
          if(natom2 > 50):
            natom2 = 50
            H_w[i,:natom2] = batch[i]['H_w'][:50]
        else:
          H_m[i,:natom1] = batch[i]['H_m']
          H_w[i,:natom2] = batch[i]['H_w']
        keys.append(batch[i]['key'])
        labels.append(batch[i]['labels'])
    H_m = torch.from_numpy(H_m).float()
    H_w = torch.from_numpy(H_w).float()
    return H_m, H_w, labels, keys

In [8]:
train_dataset = Dataset(train_keys)
test_dataset = Dataset(test_keys)

In [9]:
train_dataloader = DataLoader(train_dataset, 16, shuffle = True, num_workers = 0, collate_fn = collate_fn)
test_dataloader = DataLoader(test_dataset, 16, shuffle = True, num_workers = 0, collate_fn = collate_fn)

In [10]:
for i_batch, sample in enumerate(train_dataloader):
  H_m, H_w, labels, keys = sample
  print(H_w.shape)
  print(H_m.shape)
  break

torch.Size([16, 50, 30])
torch.Size([16, 50, 30])


In [11]:
# Code for the Convolutional Autoencoder
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class CAE(nn.Module):
    def __init__(self):
        super(CAE, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv1d(in_channels=50, out_channels=32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv1d(in_channels=32, out_channels=16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv1d(in_channels=16, out_channels=8, kernel_size=3, padding=1),
            nn.ReLU(),
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(in_channels=8, out_channels=16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.ConvTranspose1d(in_channels=16, out_channels=32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.ConvTranspose1d(in_channels=32, out_channels=50, kernel_size=3, padding=1),
            nn.ReLU(),
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

def train(cae, train_loader, test_loader, criterion, optimizer, num_epochs):
    for epoch in range(num_epochs):
        print(epoch)
        running_loss_m = 0.0
        running_loss_w = 0.0

        # Train on H_m from full dataset
        for batch_idx, data in enumerate(train_loader):
            optimizer.zero_grad()
            H_m, H_w, labels, keys = data
            inputs_m = H_m.float().to(device)
            print("-", end='')
            outputs_m = cae(inputs_m)
            loss_m = criterion(outputs_m, inputs_m)
            loss_m.backward()
            optimizer.step()
            running_loss_m += loss_m.item()
        for batch_idx, data in enumerate(test_loader):
            optimizer.zero_grad()
            H_m, H_w, labels, keys = data
            inputs_m = H_m.float().to(device)
            print("-", end='')
            outputs_m = cae(inputs_m)
            loss_m = criterion(outputs_m, inputs_m)
            loss_m.backward()
            optimizer.step()
            running_loss_m += loss_m.item()
        # Train on H_w from full dataset
        for batch_idx, data in enumerate(train_loader):
            optimizer.zero_grad()
            H_m, H_w, labels, keys = data
            inputs_w = H_w.float().to(device)
            print("+", end='')
            outputs_w = cae(inputs_w)
            loss_w = criterion(outputs_w, inputs_w)
            loss_w.backward()
            optimizer.step()
            running_loss_w += loss_w.item()
        for batch_idx, data in enumerate(test_loader):
            optimizer.zero_grad()
            H_m, H_w, labels, keys = data
            inputs_w = H_w.float().to(device)
            print("+", end='')
            outputs_w = cae(inputs_w)
            loss_w = criterion(outputs_w, inputs_w)
            loss_w.backward()
            optimizer.step()
            running_loss_w += loss_w.item()
        print('Epoch {} Loss_m: {:.6f} Loss_w: {:.6f}'.format(epoch+1, running_loss_m/len(train_loader), running_loss_w/len(train_loader)))

In [12]:
print(torch.cuda.is_available())

True


In [13]:
# create a CAE instance and set the device
cae = CAE()
cae = cae.to(device)
# define the loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(cae.parameters(), lr=0.001)

# train the CAE
num_epochs = 20
train(cae, train_dataloader, test_dataloader, criterion, optimizer, num_epochs)

0
----------------------------------------------------------------------------------------------------------------------------------------++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++Epoch 1 Loss_m: 0.064782 Loss_w: 0.051787
1
----------------------------------------------------------------------------------------------------------------------------------------++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++Epoch 2 Loss_m: 0.044347 Loss_w: 0.042368
2
----------------------------------------------------------------------------------------------------------------------------------------++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++Epoch 3 Loss_m: 0.039509 Loss_w: 0.038107
3
--------------------------------------------------

In [14]:
class LatentSpaceDataset(Dataset):
    def __init__(self):
        self.data = []

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        # return (self.data[index][0].view(-1, 8*30), self.data[index][1])
        return  self.data[index]

    def append(self, x, y):
        self.data.append((x, y))

In [15]:
# Storing the output of the CAE for the whole dataset into a Latent Space Dataset

train_latent_space_dataset = LatentSpaceDataset()
test_latent_space_dataset = LatentSpaceDataset()
for batch_idx, data in enumerate(train_dataloader):
    H_m, H_w, labels, keys = data
    M_in = H_m.float().to(device)
    M_latent = cae.encoder(M_in)
    W_in = H_w.float().to(device)
    W_latent = cae.encoder(W_in)
    for i in range(len(M_latent)):
        diff_latent = M_latent[i] - W_latent[i]
        x = diff_latent.detach()
        y = labels[i]
        train_latent_space_dataset.append(x, y)
for batch_idx, data in enumerate(test_dataloader):
    H_m, H_w, labels, keys = data
    M_in = H_m.float().to(device)
    M_latent = cae.encoder(M_in)
    W_in = H_w.float().to(device)
    W_latent = cae.encoder(W_in)
    for i in range(len(M_latent)):
        diff_latent = M_latent[i] - W_latent[i]
        x = diff_latent.detach()
        y = labels[i]
        test_latent_space_dataset.append(x, y)

In [16]:
print(len(train_latent_space_dataset))
print(len(test_latent_space_dataset))

1944
216


In [17]:
train_latent_space_dataset = [(x, torch.FloatTensor([y])) for (x, y) in train_latent_space_dataset]
test_latent_space_dataset = [(x, torch.FloatTensor([y])) for (x, y) in test_latent_space_dataset]

In [18]:
train_ls_dataloader = DataLoader(train_latent_space_dataset, batch_size=32, shuffle=True)
test_ls_dataloader = DataLoader(test_latent_space_dataset, batch_size=32, shuffle=True)

In [19]:
# Code for the Dense Neural Network / Fully Connected Neural Network

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(8*30, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, x):
        x = x.view(-1, 8*30)
        x = nn.functional.relu(self.fc1(x))
        x = nn.functional.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [27]:
# Define the training parameters
learning_rate = 0.001
num_epochs = 10

# Initialize the network and optimizer
net = Net()
net = net.to(device)
optimizer = optim.Adam(net.parameters(), lr=learning_rate)

# Define the loss function as Mean Squared Error (MSE) loss
criterion = nn.MSELoss()

# Run the training loop for a specified number of epochs
for epoch in range(num_epochs):
    list1_train = []
    list2_train = []
    running_loss = 0.0
    for i, data in enumerate(train_ls_dataloader):
        inputs, labels = data
        optimizer.zero_grad()
        outputs = net(inputs.to(device))
        loss = criterion(outputs, labels.to(device))
        labels = labels.data.cpu().numpy()
        outputs = outputs.detach().data.cpu().numpy()
        list1_train = np.append(list1_train, labels)
        list2_train = np.append(list2_train, outputs)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    rp_train = np.corrcoef(list2_train, list1_train)[0,1]
    # x = np.array(list1_test).reshape(-1,1)
    # y = np.array(list2_test).reshape(-1,1)
    # rmse = np.sqrt(((y - x) ** 2).mean())
    # Print the average loss for the epoch
    print('Epoch [%d/%d], Loss: %.4f, PCC_Train: %.4f' % (epoch+1, num_epochs, running_loss/len(train_ls_dataloader), rp_train))


Epoch [1/10], Loss: 2.1599, PCC_Train: 0.2093
Epoch [2/10], Loss: 1.7841, PCC_Train: 0.4387
Epoch [3/10], Loss: 1.6038, PCC_Train: 0.5263
Epoch [4/10], Loss: 1.4750, PCC_Train: 0.5774
Epoch [5/10], Loss: 1.3571, PCC_Train: 0.6214
Epoch [6/10], Loss: 1.2013, PCC_Train: 0.6765
Epoch [7/10], Loss: 1.1082, PCC_Train: 0.7059
Epoch [8/10], Loss: 1.0363, PCC_Train: 0.7299
Epoch [9/10], Loss: 0.9061, PCC_Train: 0.7686
Epoch [10/10], Loss: 0.8249, PCC_Train: 0.7918


In [28]:
# Displaying the Pearson Correlation Coefficient between the actual and predicted value and the RMSE of the predictions from the actual value

list1_test = []
list2_test = []
for batch_idx, data in enumerate(test_ls_dataloader):
    inputs, labels = data
    outputs = net(inputs.to(device))
    labels = labels.data.cpu().numpy()
    outputs = outputs.detach().data.cpu().numpy()
    list1_test = np.append(list1_test, labels)
    list2_test = np.append(list2_test, outputs)
rp_test = np.corrcoef(list2_test, list1_test)[0,1]
x = np.array(list1_test).reshape(-1,1)
y = np.array(list2_test).reshape(-1,1)
rmse = np.sqrt(((y - x) ** 2).mean())
print('PCC_Test: %.4f, RMSE: %.4f' % (rp_test, rmse))

PCC_Test: 0.5353, RMSE: 1.1877
