In [None]:
import htmd.ui as ht
from moleculekit.molecule import Molecule
import moleculekit.tools.voxeldescriptors as vd
from moleculekit.tools.voxeldescriptors import getVoxelDescriptors, viewVoxelFeatures
from moleculekit.tools.atomtyper import prepareProteinForAtomtyping
from moleculekit.smallmol.smallmol import SmallMol
from moleculekit.home import home
import os

import csv
from tqdm import *
import os
import pickle
import numpy as np
import pandas as ps
import multiprocessing as mp
from sklearn.model_selection import train_test_split
from oddt import toolkit
from oddt import datasets
from oddt.datasets import pdbbind

import warnings
warnings.filterwarnings("ignore", category=RuntimeWarning) 
warnings.filterwarnings("ignore", category=FutureWarning) 

### SqueezeNet Model Training

In [None]:
import os
import h5py
data_dir = ".\DLSCORE-CNN-master\DLSCORE-CNN-master\dataset"
pdbbind_dir = os.path.join(data_dir, "refined-set-2016\\")
pdbbind_dataset = pdbbind(home = pdbbind_dir, default_set='refined', version=2016)
h5f = h5py.File(os.path.join(data_dir, "data_1.h5"), 'r')
train_x_1, train_y = h5f['train_x_1'][:], h5f['train_y'][:]
valid_x_1, valid_y = h5f['valid_x_1'][:], h5f['valid_y'][:]
test_x_1, test_y = h5f['test_x_1'][:], h5f['test_y'][:]
train_x_2, valid_x_2, test_x_2 = h5f['train_x_2'][:], h5f['valid_x_2'][:], h5f['test_x_2'][:]
h5f.close()
print("Data shapes: ", train_x_1.shape, valid_x_1.shape, test_x_1.shape, train_x_2.shape, valid_x_2.shape, test_x_2.shape)
print("Y shape: ", train_y.shape, valid_y.shape, test_y.shape)

In [None]:
pip install torchinfo

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchinfo import summary

class ExpandBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(ExpandBlock, self).__init__()
        self.conv1 = nn.Conv3d(in_channels, out_channels, kernel_size=1, padding=0)
        self.conv2 = nn.Conv3d(in_channels, out_channels, kernel_size=3, padding=1)
    
    def forward(self, x):
        expand1 = F.relu(self.conv1(x))
        expand2 = F.relu(self.conv2(x))
        return torch.cat([expand1, expand2], dim=1)

class CrossAttention(nn.Module):
    def __init__(self, d_in, d_out_kq, d_out_v):
        super().__init__()
        self.d_out_kq=d_out_kq
        self.W_query=nn.Parameter(torch.rand(d_in, d_out_kq))
        self.W_key  = nn.Parameter(torch.rand(d_in, d_out_kq))
        self.W_value=nn.Parameter(torch.rand(d_in, d_out_v))
    
    def forward(self, x_1, x_2):
        queries_1=x_1.matmul(self.W_query)
        keys_2=x_2.matmul(self.W_key)
        values_2=x_2.matmul(self.W_value)
        
        attn_scores=queries_1.matmul(keys_2.T)
        attn_weights=torch.softmax(
            attn_scores/self.d_out_kq**0.5, dim=-1
        )
        
        context_vec=attn_weights.matmul(values_2)
        return context_vec


class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()

        self.conv0 = nn.Conv3d(16, 128,kernel_size = 3, stride=2, padding = 1)
        
        self.conv1 = nn.Conv3d(128, 256, kernel_size=1, padding=1)
        self.bn1 = nn.BatchNorm3d(256)
        self.pool1 = nn.MaxPool3d(kernel_size=3, stride=2)
        self.expand_block1 = ExpandBlock(256, 64)
        
        self.conv2 = nn.Conv3d(128, 32, kernel_size=1, padding=1)
        self.bn2 = nn.BatchNorm3d(32)
        self.pool2 = nn.MaxPool3d(kernel_size=3, stride=2)
        self.expand_block2 = ExpandBlock(32, 64)

        self.conv3 = nn.Conv3d(128, 32, kernel_size=1, padding=1)
        self.bn3 = nn.BatchNorm3d(32)
        self.pool3 = nn.MaxPool3d(kernel_size=3, stride=2)
        self.expand_block3 = ExpandBlock(32, 128)

        #self.avg_pool_1 = nn.AvgPool3d(kernel_size=3, padding=1)


        self.conv4 = nn.Conv3d(256, 32, kernel_size=1, padding=1)
        self.bn4 = nn.BatchNorm3d(32)
        self.pool4 = nn.MaxPool3d(kernel_size=2, stride=1, padding = 1)
        self.expand_block4 = ExpandBlock(32, 128)

        self.conv5 = nn.Conv3d(256, 128, kernel_size=1, padding = 1)
        self.bn5 = nn.BatchNorm3d(128)
        self.pool5 = nn.MaxPool3d(kernel_size=2, stride=1, padding = 1)
        self.expand_block5 = ExpandBlock(128, 192)


        self.conv6 = nn.Conv3d(384, 128, kernel_size=1, padding = 1)
        self.bn6 = nn.BatchNorm3d(128)
        self.pool6 = nn.MaxPool3d(kernel_size=2, stride=1, padding = 1)
        self.expand_block6 = ExpandBlock(128, 32)

        self.conv7 = nn.Conv3d(64, 16, kernel_size=1, padding = 1)
        self.bn7 = nn.BatchNorm3d(16)
        self.pool7 = nn.MaxPool3d(kernel_size=3, stride=1)
        self.expand_block7 = ExpandBlock(16, 16)

        
        self.avg_pool_2 = nn.AvgPool3d(kernel_size=5)

        self.crossattn = CrossAttention(256, 5, 256)


        self.flatten = nn.Flatten(1) 
        
        # self.fc_1 = nn.Linear(256, 512)
        # self.fc_2 = nn.Linear(512, 720)
        # self.fc_3 = nn.Linear(720, 1028)
        # self.fc_4 = nn.Linear(1028, 512)
        self.fc_5 = nn.Linear(256, 128)
        self.fc_6 = nn.Linear(128, 32)
        self.fc_7 = nn.Linear(32, 16)
        self.fc_8 = nn.Linear(16, 1)
                

    def forward(self, x, x1):

        x = F.relu(self.conv0(x))
        
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.pool2(x)
        x = self.expand_block1(x)
        
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.pool2(x)
        x = self.expand_block2(x)

        x = F.relu(self.bn3(self.conv3(x)))
        x = self.pool3(x)
        x = self.expand_block3(x)

        #x = self.avg_pool_1(x)

        x = F.relu(self.bn4(self.conv4(x)))
        x = self.pool4(x)
        x = self.expand_block4(x)

        x = F.relu(self.bn5(self.conv5(x)))
        x = self.pool5(x)
        x = self.expand_block5(x)

        x = F.relu(self.bn6(self.conv6(x)))
        x = self.pool6(x)
        x = self.expand_block6(x)

        x = F.relu(self.bn7(self.conv7(x)))
        x = self.pool7(x)
        x = self.expand_block7(x)

        x = self.avg_pool_2(x)
        
        
        x = self.flatten(x)

        x = self.crossattn(x, x1)

        #print(x.shape) #(batch_size x length) = (50, 512)
        
        # x = F.relu(self.fc_1(x))
        # x = F.relu(self.fc_2(x))
        # x = F.relu(self.fc_3(x))
        # x = F.relu(self.fc_4(x))
        x = F.relu(self.fc_5(x))
        x = F.relu(self.fc_6(x))
        x = F.relu(self.fc_7(x))
        x = self.fc_8(x)
        return x

# Create an instance of the model
model = MyModel()
summary(model, input_size=[(50, 16, 24, 24, 24), (50, 256,)])

In [None]:
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torch.optim.lr_scheduler import ReduceLROnPlateau
from ignite.engine import Events, create_supervised_trainer, create_supervised_evaluator
from ignite.handlers import ModelCheckpoint, EarlyStopping
from ignite.metrics import Loss
import numpy as np


X1_train_tensor = torch.tensor(train_x_1, dtype=torch.float32)
X2_train_tensor = torch.tensor(train_x_2, dtype=torch.float32)
y_train_tensor = torch.tensor(train_y, dtype=torch.float32)

X1_valid_tensor = torch.tensor(valid_x_1, dtype=torch.float32)
X2_valid_tensor = torch.tensor(valid_x_2, dtype=torch.float32)
y_valid_tensor = torch.tensor(valid_y, dtype=torch.float32)


X1_test_tensor = torch.tensor(test_x_1, dtype=torch.float32)
X2_test_tensor = torch.tensor(test_x_2, dtype=torch.float32)
y_test_tensor = torch.tensor(test_y, dtype=torch.float32)



train_dataset = TensorDataset(X1_train_tensor, X2_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=50, shuffle=True)

test_dataset = TensorDataset(X1_test_tensor, X2_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=50, shuffle=False)

valid_dataset = TensorDataset(X1_valid_tensor, X2_valid_tensor, y_valid_tensor)
valid_loader = DataLoader(valid_dataset, batch_size=50, shuffle=False)


criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001, amsgrad=True)


best_loss = np.inf
patience = 10
counter = 0


num_epochs = 30

# Lists to store training and validation losses
train_losses = []
val_losses = []


ep = []
l = []
for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    running_loss = 0.0
    for input1, input2, labels in train_loader:
        optimizer.zero_grad()  # Zero the parameter gradients
        #print(input1.shape)
        #print(input1.shape)
        outputs = model(input1, input2)  # Forward pass
        loss = criterion(outputs, labels)  # Calculate the loss
        loss.backward()  # Backward pass
        optimizer.step()  # Optimize
        running_loss += loss.item() * input1.size(0)
    epoch_loss = running_loss / len(train_dataset)
    l.append(epoch_loss)
    ep.append(epoch+1)
    train_losses.append(epoch_loss)
    #print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}")
    

    model.eval()
    running_val_loss = 0.0
    with torch.no_grad():
        for input1, input2, labels in valid_loader:
            outputs = model(input1, input2)
            val_loss = criterion(outputs, labels)
            running_val_loss += val_loss.item() * input1.size(0)
        epoch_val_loss = running_val_loss / len(valid_loader.dataset)
        val_losses.append(epoch_val_loss)
    
    print(f"Epoch [{epoch + 1}/{num_epochs}], Train Loss: {epoch_loss:.4f}, Val Loss: {epoch_val_loss:.4f}")

    # Check for early stopping
    if epoch_val_loss < best_loss:
        best_loss = epoch_val_loss
        counter = 0
        torch.save(model.state_dict(), 'best_model.pth')  # Save the best model
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered.")
            break
    

In [None]:
import matplotlib.pyplot as plt
plt.plot(ep, l, color='red', label='Training Loss')
plt.xlabel('Epochs')
plt.ylabel('Training Loss')
plt.title(f'Training Loss')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
import sklearn
model.eval()  # Set model to evaluation mode
test_loss = 0.0
yhat = []
y = []
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)
with torch.no_grad():
    for input1, input2, label in test_loader:
        output = model(input1, input2)
        yhat.append(output.numpy().reshape(-1))
        y.append(label.numpy().reshape(-1))
        loss = criterion(outputs, labels)
        test_loss += loss.item() * input1.size(0)
    avg_test_loss = test_loss / len(test_dataset)
    print(f"Test Loss: {avg_test_loss:.4f}")

print(len(yhat))
print(len(y))

y = np.concatenate(y).reshape(-1)
yhat = np.concatenate(yhat).reshape(-1)

y = np.flip(y)

evaluations ={'RMSE': sklearn.metrics.root_mean_squared_error(y, yhat),
        'MAE': sklearn.metrics.mean_absolute_error(y, yhat),
        'Max Error': sklearn.metrics.max_error(y, yhat),
        'CORR': np.corrcoef(y, yhat),
    }
print(evaluations)

In [None]:
print(y)

In [None]:
print(yhat)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import pearsonr, linregress

# Calculate Pearson correlation coefficient
corr_coefficient, _ = pearsonr(yhat, y)
RMSE = sklearn.metrics.root_mean_squared_error(y, yhat)

# Calculate line of best fit
slope, intercept, _, _, _ = linregress(y, yhat)
line = slope * y + intercept

# Plotting
plt.scatter(y, yhat)
plt.plot(y, line, color='red', label='Correlation')
plt.xlabel('Experimental pK')
plt.ylabel('Predicted pK')
plt.title(f'Pearson Correlation Coefficient: {corr_coefficient:.4f}, RMSE: {RMSE}')
plt.legend()
plt.grid(True)
plt.show()


In [None]:
# Plot training and validation losses per epoch
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Val Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss per Epoch')
plt.legend()
plt.grid(True)
plt.show()