In [125]:
import numpy as np
import torch
import os
import json
import pandas as pd
from sklearn.model_selection import train_test_split

## Train-test split

In [129]:
# Load the single-near-mutant CYP3A4 table and hold out a third of it for testing.
db = 'test_db/single_near_mutant_CYP3A4_db.csv'
df = pd.read_csv(db)

# A fixed random_state keeps the split identical across kernel restarts.
train_data_df, test_data_df = train_test_split(
    df,
    test_size=0.33,
    random_state=42,
)

## Get all the mutein codes for the training data

In [137]:
# Build one report.xml path per training row by concatenating the mutein code,
# the ligand cid and the "_complex" suffix.
#
# Columns are addressed by position through .iloc (1 and 14; per the original
# variable names these hold the mutein code and the ligand cid -- TODO confirm
# against the CSV header). Plain `row[1]` on a label-indexed row relies on a
# positional fallback that pandas has deprecated.

prefix = 'protein_ligand_pdb/xml'
lst = [
    f'{prefix}/{mt_code}_{cid}_complex/report.xml'
    for mt_code, cid in zip(train_data_df.iloc[:, 1], train_data_df.iloc[:, 14])
]

In [139]:
# Show which complexes are currently available under sample_data.
os.listdir(path='sample_data')

[]

# Report the dimensions of the existing adjacency and vertex-feature matrices

In [13]:
import torch
import torch.nn.functional as F
import torch.optim as optim

class GraphNN_Attention(torch.nn.Module):
    """Single-layer graph network with inverse-distance attention over atoms.

    For every atom the layer mixes an attention-weighted average of the other
    atoms' feature vectors with the atom's own features, applies a
    non-linearity to obtain per-atom embeddings, and maps each embedding to a
    probability with a sigmoid.

    Attributes:
        W_1, Y_1: (num_features, embedding_dim) weights for the neighbour
            aggregate and for the atom's own features.
        W_cost: (embedding_dim, 1) read-out weights.
        W_attent: (num_features, 1) attention scoring weights.
        alpha: threshold used to binarise the adjacency matrix.
        atom_embedding_map: complex_code -> per-atom embedding matrix from the
            most recent forward pass for that complex.
        complex_embedding_map: complex_code -> mean of the per-atom embeddings.
    """

    def __init__(self, embedding_dim=2, non_linearity=torch.nn.Sigmoid(), num_features=11):
        """
            embedding_dim (int): size of each atom embedding (default: R^2)
            non_linearity (callable): activation applied to the embeddings
            num_features (int): width of the vertex feature matrix
        """
        super(GraphNN_Attention, self).__init__()
        self.embedding_dim = embedding_dim
        self.num_features = num_features

        # Weights are wrapped in nn.Parameter so that parameters(), state_dict()
        # and .to(device) see them. The random draws happen in the same order
        # as before, so a seeded run yields the same initial values.
        self.W_1 = torch.nn.Parameter(torch.randn(num_features, self.embedding_dim))
        self.Y_1 = torch.nn.Parameter(torch.randn(num_features, self.embedding_dim))
        self.W_cost = torch.nn.Parameter(torch.randn(self.embedding_dim, 1))

        self.non_linearity = non_linearity

        self.alpha = torch.nn.Parameter(torch.abs(torch.randn(1)))
        self.W_attent = torch.nn.Parameter(torch.randn(num_features, 1))

        self.complex_embedding_map = {}  # mean over the rows of the embedding matrix
        self.atom_embedding_map = {}

    def attention_coeff(self, euclid_dist_v_i, i_neighbour_vec):
        """Unnormalised attention of atom v on atom i: (1 / distance) * (x_i . W_attent).

        Returns a (1, 1) tensor. Kept for callers that score a single pair;
        attention_mechanism computes all pairs at once.
        """
        i_neighbour_vec = i_neighbour_vec.view(1, i_neighbour_vec.shape[0])
        return (1/euclid_dist_v_i) * torch.mm(i_neighbour_vec, self.W_attent)

    def attention_mechanism(self, ori_adj_mat, feature_mat):
        """Row-wise softmax of the pairwise attention coefficients.

            ori_adj_mat (torch tensor): 2-D matrix whose entry [v, i] scales the
                attention of atom v on atom i by its reciprocal
            feature_mat (torch tensor): one feature row per atom
            return a tensor shaped like ori_adj_mat whose rows sum to 1

        Diagonal coefficients are fixed to 0 before the softmax. An
        off-diagonal zero in ori_adj_mat yields an infinite coefficient.
        """
        num_rows, num_cols = ori_adj_mat.shape
        if feature_mat.shape[0] < num_cols:
            raise IndexError(
                f'feature_mat has {feature_mat.shape[0]} rows but the adjacency '
                f'matrix has {num_cols} columns'
            )

        # One score per neighbour atom, shape (num_cols, 1).
        neighbour_scores = torch.mm(feature_mat[:num_cols], self.W_attent)

        on_diagonal = torch.eye(num_rows, num_cols, device=ori_adj_mat.device).bool()
        # The diagonal is excluded from attention; dividing by 1 there instead
        # of the stored value keeps inf/nan out of the backward pass.
        safe_dist = torch.where(on_diagonal, torch.ones_like(ori_adj_mat), ori_adj_mat)

        attention_coeff_mat = (1 / safe_dist) * neighbour_scores.t()
        attention_coeff_mat = torch.where(
            on_diagonal, torch.zeros_like(attention_coeff_mat), attention_coeff_mat
        )

        return torch.softmax(attention_coeff_mat, dim=1)

    def forward(self, adj_mat, feature_mat, complex_code):
        """
            adj_mat (torch tensor): adjacency matrix (square, one row per atom)
            feature_mat (torch tensor): feature_matrix for all atoms
            complex_code (str): cyp_ligand code
            return a (num_atoms, 1) tensor of predicted probabilities

        Side effect: stores the per-atom embeddings and their mean under
        complex_code. The stored tensors are NOT detached, so a caller can
        backpropagate through them.
        """
        # Adjacency is square, so every atom has n - 1 potential neighbours;
        # max(..., 1) keeps a single-atom graph from dividing by zero.
        num_neighbours = max(adj_mat.shape[0] - 1, 1)

        # 0 where the entry is <= alpha, 1 otherwise.
        # NOTE(review): alpha only appears inside this comparison, so no
        # gradient reaches it even though it is a Parameter.
        whitened_adj_mat = torch.logical_not(adj_mat <= self.alpha).float()

        # NOTE(review): attention receives the binarised matrix, so 1/distance
        # is either 1 or inf. If the original euclidean distances were meant,
        # pass adj_mat here instead -- confirm intent before changing.
        attent_mat = self.attention_mechanism(whitened_adj_mat, feature_mat)

        avr_neighbour_feature_linear_comb = (1/num_neighbours) * torch.mm(attent_mat, feature_mat)
        h_v_matrix = self.non_linearity(
            torch.mm(avr_neighbour_feature_linear_comb, self.W_1)
            + torch.mm(feature_mat, self.Y_1)
        )
        prob_tensor = torch.sigmoid(torch.matmul(h_v_matrix, self.W_cost))

        ## persistence
        # each row of h_v_matrix == one atom embedding; the mean row == complex embedding
        self.atom_embedding_map[complex_code] = h_v_matrix
        self.complex_embedding_map[complex_code] = torch.mean(h_v_matrix, dim=0)

        return prob_tensor

In [70]:
import random

class CNN(torch.nn.Module):
    """Embeds a 2-D contact map into a (1, output_dim) vector.

    Pipeline: adaptive average pooling to 50x50 -> two 5x5 single-channel
    convolutions, each followed by ELU (50 -> 46 -> 42) -> 5x5 average pooling
    with stride 1 (-> 38) -> 0.5x interpolation (-> 19) -> linear layer on the
    19 * 19 = 361 flattened values.
    """

    def __init__ (self, output_dim=11):
        super().__init__()

        self.output_dim = output_dim

        # Parameter-free layers.
        self.adaptive_layer = torch.nn.AdaptiveAvgPool2d((50, 50))
        self.elu = torch.nn.ELU(inplace=False)
        self.avg_pooling = torch.nn.AvgPool2d(5, stride=1)

        # Learnable layers.
        self.first_conv2d = torch.nn.Conv2d(in_channels=1, out_channels=1, kernel_size=(5,5))
        self.second_conv2d = torch.nn.Conv2d(in_channels=1, out_channels=1, kernel_size=(5,5))
        self.fnn = torch.nn.Linear(in_features=361, out_features=self.output_dim)

        # mt_code -> embedding returned by the latest forward pass for that code
        self.contact_map_embeddings_map = {}

    def convolution(self, tensor_2d_contact_map):
        """Run the full pipeline on one 2-D contact map; returns a (1, output_dim) tensor."""
        height, width = tensor_2d_contact_map.shape[0], tensor_2d_contact_map.shape[1]

        # Add batch and channel axes, then bring every map to the standard 50x50 size.
        image = tensor_2d_contact_map.reshape(1, 1, height, width)
        image = self.adaptive_layer(image).float()

        # Two convolution + ELU stages.
        for conv in (self.first_conv2d, self.second_conv2d):
            image = self.elu(conv(image))

        # Smooth, then halve the spatial resolution.
        image = self.avg_pooling(image)
        image = torch.nn.functional.interpolate(image, scale_factor=0.5, recompute_scale_factor=True)

        # Flatten to a single row and apply the linear read-out.
        return self.fnn(image.view(1, -1))

    def forward(self, tensor_2d_contact_map, mt_code):
        """Embed the contact map, remember the result under mt_code, and return it."""
        embedding = self.convolution(tensor_2d_contact_map)
        self.contact_map_embeddings_map[mt_code] = embedding
        return embedding

In [75]:
import random

class RegressionNN(torch.nn.Module):
    """Two-layer regressor: Linear -> Tanh -> Linear producing one value per row."""

    def __init__(self, dim=(1,11), hidden_dim=None):
        """
            dim (int or tuple): input feature count, or the shape of
                contact_complex_embedding (only its last entry is used)
            hidden_dim (int or None): width of the hidden layer. When None the
                width is drawn with random.randint(1, 10), as before; pass an
                explicit value (or seed `random`) for a reproducible model.
        """
        super(RegressionNN, self).__init__()
        if hidden_dim is None:
            hidden_dim = random.randint(1, 10)
        in_features = dim if isinstance(dim, int) else dim[-1]

        self.first_out = hidden_dim
        self.dim = dim
        self.first_layer = torch.nn.Linear(in_features=in_features, out_features=self.first_out)
        self.first_non_linear = torch.nn.Tanh()
        self.second_layer = torch.nn.Linear(in_features=self.first_out, out_features=1)

    def forward(self, contact_complex_embedding):
        """Map a (..., in_features) embedding to a (..., 1) prediction."""
        hidden = self.first_non_linear(self.first_layer(contact_complex_embedding))
        return self.second_layer(hidden)

## Sample run

In [122]:
import numpy as np
import pandas as pd

## hyper-parameters
data_base = 'test_db/single_near_mutant_CYP3A4_db.csv'
db_df = pd.read_csv(data_base)
data_dir = 'sample_data'

epochs = 10000
learning_rate = 0.02

loss_function = torch.nn.BCELoss()
regression_loss = torch.nn.MSELoss()

## model
gnn = GraphNN_Attention(embedding_dim=11)
cnn = CNN()
regressor = RegressionNN()

# The GNN weights are listed explicitly so they reach the optimiser whether or
# not the class registers them as module parameters.
gnn_params = [gnn.W_1, gnn.Y_1, gnn.W_cost, gnn.W_attent, gnn.alpha]
cnn_params = list(cnn.parameters())
regressor_params = list(regressor.parameters())
all_params = gnn_params + cnn_params + regressor_params

optimiser = torch.optim.Adam(all_params, lr=learning_rate)


def _load_training_sample(folder):
    """Read every tensor and the regression label for one `<mt_code>_<cid>_...` folder."""
    sample_dir = f'{data_dir}/{folder}'
    mt_code = folder.split('_')[0]
    ligand_id = int(folder.split('_')[1])

    # Ground-truth label: first matching row, column position 11
    # (presumably the activity column -- TODO confirm against the CSV header).
    filtered_data = db_df[(db_df['mt_code'] == mt_code) & (db_df['cid'] == ligand_id)]
    if filtered_data.empty:
        raise IndexError(f'no row in {data_base} for mt_code={mt_code!r}, cid={ligand_id}')

    return {
        'mt_code': mt_code,
        'adj_mat': torch.from_numpy(np.load(f'{sample_dir}/{folder}_adjmat.npy')).float(),
        'feature_mat': torch.from_numpy(np.load(f'{sample_dir}/{folder}_featuremat.npy')).float(),
        'contact_map': torch.from_numpy(np.load(f'2DContactMaps/CYP3A4_Mutants_Binding/{mt_code}.npy')),
        'true_activity': torch.tensor(float(filtered_data.iloc[0, 11])).view(1, 1),
    }


# The inputs never change between epochs, so read them from disk once instead
# of once per sample per epoch.
training_samples = [(folder, _load_training_sample(folder)) for folder in os.listdir(data_dir)]

for epoch in range(epochs):
    total_loss = 0.0

    for folder, sample in training_samples:

        # GNN pass; it also caches the complex embedding used by the regressor below.
        probs_tensor = gnn(sample['adj_mat'], sample['feature_mat'], folder)
        ground_truth_probs_tensor = torch.ones(probs_tensor.shape)
        # NOTE(review): this BCE loss is computed but never backpropagated or
        # logged -- only the regression loss below trains the models.
        loss = loss_function(probs_tensor, ground_truth_probs_tensor)

        # NOTE(review): anomaly detection is a debugging aid that slows every
        # backward pass; consider disabling it for long runs.
        with torch.autograd.set_detect_anomaly(True):
            vectorised_map = cnn(sample['contact_map'], sample['mt_code'])

            # regression on the sum of the contact-map and complex embeddings
            complex_embedding = gnn.complex_embedding_map[folder]
            contact_map_complex_embedding = vectorised_map + complex_embedding
            pred_activity = regressor(contact_map_complex_embedding)

            r_loss = regression_loss(pred_activity, sample['true_activity'])

            optimiser.zero_grad()
            r_loss.backward()
            optimiser.step()
            # .item() accumulates a plain float instead of a tensor
            total_loss += r_loss.item()

    if (epoch % 100) == 0:
        print(f'current_epoch: {epoch} mean squared error loss:{total_loss}')

current_epoch: 0 mean squared error loss:1423.5853271484375
current_epoch: 100 mean squared error loss:780.7825317382812
current_epoch: 200 mean squared error loss:780.4229736328125
current_epoch: 300 mean squared error loss:780.429443359375
current_epoch: 400 mean squared error loss:780.4327392578125
current_epoch: 500 mean squared error loss:780.4345703125
current_epoch: 600 mean squared error loss:780.4357299804688
current_epoch: 700 mean squared error loss:780.4364624023438
current_epoch: 800 mean squared error loss:780.4371337890625
current_epoch: 900 mean squared error loss:780.4374389648438
current_epoch: 1000 mean squared error loss:780.4376831054688
current_epoch: 1100 mean squared error loss:780.4378662109375
current_epoch: 1200 mean squared error loss:780.43798828125
current_epoch: 1300 mean squared error loss:780.4381103515625
current_epoch: 1400 mean squared error loss:780.4381103515625
current_epoch: 1500 mean squared error loss:780.438232421875
current_epoch: 1600 mean s

In [123]:
# Scratch check: a 1-D tensor of three elements reports size [3].
torch.tensor([1, 2, 3]).size()

torch.Size([3])

In [79]:
# Scratch check: Tensor.add returns a new tensor and leaves both operands untouched.
a, b = torch.tensor(1), torch.tensor(2)
c = torch.add(a, b)
print(c, a, b, sep='\n')

tensor(3)
tensor(1)
tensor(2)


In [26]:
def pca(mat, n_components=2):
    """Standardise the columns of `mat`, then project onto `n_components` principal components.

    Returns the projected matrix produced by PCA.fit_transform.
    """
    from sklearn.decomposition import PCA
    from sklearn.preprocessing import StandardScaler

    standardised = StandardScaler().fit_transform(mat)
    projector = PCA(n_components=n_components)
    return projector.fit_transform(standardised)

def check_bond_info(np_vec):
    """Return a '/'-joined label for every interaction flag set to 1 in `np_vec`.

    Only positions 1, 2, 3, 4, 6, 7, 9 and 10 carry a label; other positions
    are ignored. Labels are emitted in position order, so the same vector
    always yields the same string (building the list through a set made the
    order depend on string hashing, which changes between interpreter runs).

    Returns '' when no flag is set or when the vector has fewer than two
    entries. A vector with 2 to 10 entries raises IndexError, as before.
    """
    labelled_slots = (
        (1, 'halogen_bonds'),
        (2, 'h2_bonds'),
        (3, 'hydrophobic_int'),
        (4, 'metal complex'),
        (6, 'pi cation int'),
        (7, 'pi stack'),
        (9, 'salt bridge'),
        (10, 'h2o bridge'),
    )

    # Vectors this short never reached a labelled position in the old loop.
    if len(np_vec) < 2:
        return ''

    return '/'.join(label for slot, label in labelled_slots if np_vec[slot] == 1)

def prep_pandas_df(embeddings_mat, columns, atom_to_feature_row, feature_mat):
    """Wrap projected atom embeddings in a DataFrame annotated per atom.

    embeddings_mat: one row per atom, one column per entry of `columns`.
    atom_to_feature_row: mapping looked up with str(row index); its value is
        stored in the 'atoms' column.
    feature_mat: torch tensor; row i is passed to check_bond_info to fill the
        'bond_info' column.
    """
    import pandas as pd

    embeddings_df = pd.DataFrame(embeddings_mat, columns=columns)

    # Resolve both annotations row by row, in row order.
    annotations = [
        (atom_to_feature_row[str(row_idx)], check_bond_info(feature_mat.numpy()[row_idx, :]))
        for row_idx in embeddings_df.index
    ]

    embeddings_df['atoms'] = [atom for atom, _ in annotations]
    embeddings_df['bond_info'] = [bonds for _, bonds in annotations]

    return embeddings_df

def tsne(mat, perplexity=5, n_components=2, random_state=None):
    """Project the rows of `mat` with t-SNE.

    mat: 2-D array-like, one sample per row.
    perplexity: requested perplexity. scikit-learn requires it to be smaller
        than the number of samples, so it is lowered to n_samples - 1 when the
        input is too small for the requested value.
    n_components: dimension of the projection.
    random_state: forwarded to TSNE; pass an int for a reproducible layout.
    """
    from sklearn.manifold import TSNE

    n_samples = len(mat)
    if n_samples > 1 and perplexity >= n_samples:
        perplexity = n_samples - 1

    projector = TSNE(n_components=n_components, perplexity=perplexity, random_state=random_state)
    return projector.fit_transform(mat)

data_dir = 'sample_data'
pca_df_map = {}
tsne_df_map = {}

# For every complex the GNN has seen, project its atom embeddings with PCA and
# t-SNE and keep one annotated DataFrame per projection.
for complex_id in gnn.atom_embedding_map:
    vertex_feature_mat_path = f'{data_dir}/{complex_id}/{complex_id}_featuremat.npy'
    # `with` closes the JSON file instead of leaving the handle to the garbage collector.
    with open(f'{data_dir}/{complex_id}/{complex_id}_atom_to_row_vertex_feature_mat.json') as json_file:
        atom_to_feat_row = json.load(json_file)
    feature_mat = torch.from_numpy(np.load(vertex_feature_mat_path)).float()

    # Detach once; both projections read the same array.
    atom_embeddings = gnn.atom_embedding_map[complex_id].detach().numpy()

    embeddings_pca_mat = pca(atom_embeddings)
    pca_embeddings_df = prep_pandas_df(embeddings_pca_mat, ['x', 'y'], atom_to_feat_row, feature_mat)

    embeddings_tsne_mat = tsne(atom_embeddings)
    tsne_embeddings_df = prep_pandas_df(embeddings_tsne_mat, ['x', 'y'], atom_to_feat_row, feature_mat)

    pca_df_map[complex_id] = pca_embeddings_df
    tsne_df_map[complex_id] = tsne_embeddings_df

In [24]:
import plotly.express as px

# `df_map` is not defined anywhere in this notebook; the PCA frames live in
# pca_df_map. The title now names the complex that is actually plotted rather
# than whatever `folder` was left over from the training loop.
pca_complex_id = 'CYP3A4.12_848780_complex'
fig = px.scatter(pca_df_map[pca_complex_id], x="x", y="y", color="bond_info",
                title=f"PCA projected Atoms embedding for {pca_complex_id}")
fig.show()

In [33]:
import plotly.express as px

# The title names the complex that is actually plotted rather than whatever
# `folder` was left over from the training loop.
tsne_complex_id = 'CYP3A4-T309V_6013_complex'
tsne_fig = px.scatter(tsne_df_map[tsne_complex_id], x="x", y="y", color="bond_info",
                title=f"TSNE projected Atoms embedding for {tsne_complex_id}")
tsne_fig.show()

In [55]:
# Stack every complex embedding into one matrix and remember which row
# belongs to which complex.
tensors = []
id_to_row_map = {}

for i, complex_id in enumerate(gnn.complex_embedding_map):
    tensors.append(gnn.complex_embedding_map[complex_id])
    id_to_row_map[complex_id] = i

all_complex_tensor = torch.stack(tensors, dim=0)
id_to_row_map

{'CYP3A4-T309V_6013_complex': 0,
 'CYP3A4-V376T_6013_complex': 1,
 'CYP3A4.12_848780_complex': 2}

In [56]:
def prep_pandas_complex_df(embeddings_mat, columns, id_to_row_map):
    """Wrap projected complex embeddings in a DataFrame with a 'complex_id' column.

    embeddings_mat: one row per complex, one column per entry of `columns`.
    id_to_row_map: complex id -> row position in `embeddings_mat`. If several
        ids map to the same row, the first one in the mapping's order is used.

    Raises ValueError when a row has no id in `id_to_row_map`.
    """
    import pandas as pd

    embeddings_df = pd.DataFrame(embeddings_mat, columns=columns)

    # Invert the mapping once instead of scanning it for every row.
    row_to_id = {}
    for complex_id, row in id_to_row_map.items():
        row_to_id.setdefault(row, complex_id)

    unmapped_rows = [row for row in embeddings_df.index if row not in row_to_id]
    if unmapped_rows:
        raise ValueError(f'no complex id registered for rows {unmapped_rows}')

    embeddings_df['complex_id'] = [row_to_id[row] for row in embeddings_df.index]

    return embeddings_df

In [58]:
complex_embeddings_np = all_complex_tensor.detach().numpy()
# t-SNE needs perplexity < number of samples; with only a handful of complexes
# the default of 5 is rejected by current scikit-learn, so cap it explicitly.
complex_perplexity = min(5, max(len(complex_embeddings_np) - 1, 1))
embeddings_tsne_mat = tsne(complex_embeddings_np, perplexity=complex_perplexity)
complex_df = prep_pandas_complex_df(embeddings_tsne_mat, ['x', 'y'], id_to_row_map)

In [59]:
import plotly.express as px

# One point per complex, coloured by complex id.
tsne_fig = px.scatter(
    complex_df,
    x="x",
    y="y",
    color="complex_id",
    title="TSNE projected complex embeddings",
)
tsne_fig.show()