<a href="https://colab.research.google.com/github/dongddaoist/APPRET/blob/main/GNN_MP_prediction_test_Mw_hyperopt.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

!conda install -c conda-forge rdkit
!pip install torch torch-geometric

# Install rdkit (should only be installed via conda)

!pip install --quiet rdkit-pypi > /dev/null
!pip install optuna

In [None]:
import pandas as pd
from google.colab import drive
drive.mount('/content/drive/')
ifile= '/content/drive/My Drive/machine-learning/Data/pubchem/mp_std_500.csv'
# Load your CSV file
df = pd.read_csv(ifile)


Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [None]:
df

Unnamed: 0,SMILES,MP
0,CC(CN)O,274.5389
1,C1=CC(=C(C=C1[N+](=O)[O-])[N+](=O)[O-])Cl,326.1500
2,C(CCl)Cl,237.8722
3,C1=CC(=C(C=C1Cl)Cl)Cl,290.3722
4,C(C=O)Cl,257.0389
...,...,...
7867,CC1CC=CC2C3C(O3)(C(C4C2(C(=O)NC4CC5=CC=CC=C5)O...,480.1500
7868,CC=C1CC(C(C(=O)OCC2=CC[N+]3(C2C(CC3)OC1=O)[O-]...,411.1500
7869,C(C1(C2C3C(N=C(NC34C(C1OC(C4O)(O2)O)O)N)O)O)O,498.1500
7870,CCCCCCC(C1C(C2=C(CC(CC3=C1C(=O)OC3=O)C(C4CC=CC...,442.1500


In [None]:
df['MP']

0       274.5389
1       326.1500
2       237.8722
3       290.3722
4       257.0389
          ...   
7867    480.1500
7868    411.1500
7869    498.1500
7870    442.1500
7871    419.1500
Name: MP, Length: 7872, dtype: float64

In [None]:
#!conda install -c conda-forge rdkit
#!pip install torch torch-geometric

# Install rdkit (should only be installed via conda)
#conda install -y --quiet -c conda-forge rdki > /dev/null
#!conda install conda-forge::rdkit > /dev/null
#!pip install --quiet rdkit-pypi > /dev/null

import torch
import pandas as pd
import numpy as np

#from google.colab import drive
from rdkit import Chem
from rdkit.Chem import Descriptors

from torch_geometric.data import Data
from torch_geometric.data import InMemoryDataset, DataLoader, Batch
from torch.utils.data import random_split
from torch.nn import Linear
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, global_mean_pool as gap, global_max_pool as gmp

from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR

import matplotlib.pyplot as plt

import optuna  # Import Optuna
# Check if GPU is available and set the device accordingly
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device}')







Using device: cuda


In [None]:
def get_atom_features(atom):
    """ Return a list of atom features. """
    return [
        atom.GetAtomicNum(),         # Atomic number
        atom.GetDegree(),            # Degree, number of neighbors
        atom.GetTotalDegree(),       # Total connections including Hydrogens
        atom.GetImplicitValence(),   # Implicit valence
        atom.GetFormalCharge(),      # Formal charge
        atom.IsInRing(),             # 1 if the atom is in a ring, else 0
        atom.GetHybridization().real # Hybridization state as an integer
    ]

def get_bond_features(bond):
    """ Return a list of bond features. """
    return [
        bond.GetBondTypeAsDouble(),  # Single=1, Double=2, Triple=3, Aromatic=1.5
        bond.IsInRing(),             # 1 if the bond is in a ring, else 0
        bond.GetIsConjugated()       # 1 if the bond is conjugated, else 0
    ]


def molecule_to_graph(smiles, label):
    try:
        mol = Chem.MolFromSmiles(smiles)
        if mol is None:
            print("Failed to parse SMILES:", smiles)
            return None  # Make sure to exit the function if mol is None
        else:
            print("SMILES parsed successfully:", smiles)
    except Exception as e:
        print("Error processing SMILES:", smiles, e)
        return None  # Exit the function on exception

    x = torch.tensor([get_atom_features(atom) for atom in mol.GetAtoms()], dtype=torch.float)
    edge_index = []
    edge_attr = []

    for bond in mol.GetBonds():
        start, end = bond.GetBeginAtomIdx(), bond.GetEndAtomIdx()
        features = get_bond_features(bond)
        edge_index.extend([[start, end], [end, start]])
        edge_attr.extend([features, features])

    edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()
    edge_attr = torch.tensor(edge_attr, dtype=torch.float)
    y = torch.tensor([label], dtype=torch.float)
    molecular_weight = torch.tensor([Descriptors.MolWt(mol)], dtype=torch.float).view(1, -1)

    data = Data(x=x, edge_index=edge_index, edge_attr=edge_attr, y=y)
    data.molecular_weight = molecular_weight
    print("Molecular weight assigned:", data.molecular_weight)
    return data

def custom_collate(batch):
    batch_with_mw = [item for item in batch if hasattr(item, 'molecular_weight') and item.molecular_weight is not None]
    if not batch_with_mw:
        print("No valid entries with molecular weight found in the batch.")
        return Batch()  # Return an empty batch object or handle appropriately

    batch_data = Batch.from_data_list(batch_with_mw)
    molecular_weights = torch.cat([item.molecular_weight for item in batch_with_mw], dim=0)
    batch_data.molecular_weight = molecular_weights
    return batch_data


class CustomMoleculeDataset(InMemoryDataset):
    def __init__(self, root, filename, transform=None, pre_transform=None):
        self.filename = filename  # Store the filename
        super(CustomMoleculeDataset, self).__init__(root, transform, pre_transform)
        self.data, self.slices = torch.load(self.processed_paths[0])

    @property
    def raw_file_names(self):
        # This should return the list of raw file names expected in the raw directory.
        # Ensure these files actually exist in the raw directory.
        return [self.filename]

    @property
    def processed_file_names(self):
        # This should return the list of files that result from processing the raw data.
        return ['data.pt']

    def download(self):
        # Implement download logic if your files are to be downloaded.
        # Otherwise, ensure files are manually placed in the correct directory.
        pass

    def process(self):
        print("Processing data...")
        df = pd.read_csv(self.raw_paths[0])
        data_list = []
        for index, row in df.iterrows():
            graph = molecule_to_graph(row['SMILES'], row['MP'])
            if graph is not None and hasattr(graph, 'molecular_weight'):
                data_list.append(graph)
            else:
                print(f"Skipping entry at index {index} due to missing molecular weight.")
        if not data_list:
            raise ValueError("No valid data could be processed.")
        print(f"Processed {len(data_list)} graphs with molecular weight.")
        data, slices = self.collate(data_list)
        torch.save((data, slices), self.processed_paths[0])



# Initialize dataset
dataset_root = 'pubchem-mp'
dataset = CustomMoleculeDataset(root=dataset_root, filename=ifile)
# Setup DataLoader
train_size = int(len(dataset) * 0.8)
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Assuming train_dataset and test_dataset are subsets created from random_split
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=custom_collate)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=custom_collate)

# Example to check if molecular_weight is being passed correctly:
for batch in train_loader:
    if hasattr(batch, 'molecular_weight'):
        print("Molecular weight is present in the batch.")
        print(batch.molecular_weight.shape)
    else:
        print("Molecular weight is missing from the batch.")
    break  # Checking only the first batch for demonstration
# Initialize the model and move it to the device (GPU or CPU)
#model = GCN(dataset.num_node_features).to(device)


In [None]:

# Initialize dataset
dataset_root = 'pubchem-mp'
dataset = CustomMoleculeDataset(root=dataset_root, filename=ifile)
# Setup DataLoader
train_size = int(len(dataset) * 0.8)
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Assuming train_dataset and test_dataset are subsets created from random_split
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=custom_collate)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=custom_collate)

# Example to check if molecular_weight is being passed correctly:
for batch in train_loader:
    if hasattr(batch, 'molecular_weight'):
        print("Molecular weight is present in the batch.")
        print(batch.molecular_weight.shape)
    else:
        print("Molecular weight is missing from the batch.")
    break  # Checking only the first batch for demonstration
# Initialize the model and move it to the device (GPU or CPU)
#model = GCN(dataset.num_node_features).to(device)


Molecular weight is present in the batch.
torch.Size([32, 1])


In [None]:
dataset[0].molecular_weight

tensor([[75.1110]])

In [None]:
# Initialize dataset
dataset_root = 'pubchem-mp'
dataset = CustomMoleculeDataset(root=dataset_root, filename=ifile)
# Setup DataLoader
train_size = int(len(dataset) * 0.8)
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Assuming train_dataset and test_dataset are subsets created from random_split
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=custom_collate)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=custom_collate)

In [None]:
dataset.num_node_features

7

In [None]:
for batch in train_loader:
    if hasattr(batch, 'molecular_weight'):
        print("Molecular weight is present in the batch.")
        print(batch.molecular_weight.shape)
    else:
        print("Molecular weight is missing from the batch.")
    break  # Checking only the first batch for demonstration

Molecular weight is present in the batch.
torch.Size([32, 1])


In [None]:
#building GNN
class GCN(torch.nn.Module):
    def __init__(self, num_features, hidden_channels, num_layers, dropout_rate):
        super(GCN, self).__init__()
        torch.manual_seed(123)

        # Adjust layers sizes based on input parameters
        sizes = [num_features] + [hidden_channels * 2 ** i for i in range(num_layers)]

        self.layers = torch.nn.ModuleList()
        self.bns = torch.nn.ModuleList()

        # Creating the GCN layers based on the sizes list
        for i in range(len(sizes) - 1):
            self.layers.append(GCNConv(sizes[i], sizes[i+1]))
            if i < len(sizes) - 2:  # No batch norm after last GCN layer before output
                self.bns.append(torch.nn.BatchNorm1d(sizes[i+1]))

        self.dropout = torch.nn.Dropout(dropout_rate)

        # Assuming the last layer's output is adjusted as required
        final_embedding_size = sizes[-1]  # This would be the size of the last GCN layer's output
        self.out = Linear(final_embedding_size * 2 + 1, 1)  # Adjusting for concatenated pooling features

    def forward(self, data):
        x, edge_index, batch = data.x, data.edge_index, data.batch
        molecular_weight = data.molecular_weight  # Assumes it's already included and correctly formatted

        for i, layer in enumerate(self.layers):
            x = layer(x, edge_index)
            if i < len(self.bns):
                x = self.bns[i](x)
                x = F.relu(x)
                x = self.dropout(x)

        # Pooling (example with both global max and mean pooling)
        x = torch.cat([gmp(x, batch), gap(x, batch)], dim=1)

        # Concatenate molecular weight
        x = torch.cat([x, molecular_weight], dim=1)  # Ensure correct concatenation

        # Output layer
        x = self.out(x)
        return x.squeeze(-1)



In [None]:
# Initialize dataset
dataset_root = 'pubchem-mp'
dataset = CustomMoleculeDataset(root=dataset_root, filename=ifile)

# Setup DataLoader
train_size = int(len(dataset) * 0.8)
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])


train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=custom_collate)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=custom_collate)
model = GCN(dataset.num_node_features).to(device)
for data in train_loader:
    if hasattr(data, 'molecular_weight'):
        output = model(data.to(device))
    else:
        print("Molecular weight missing from the batch data")
        continue  # Skip this batch or handle it according to your requirements


# Initialize the model and move it to the device (GPU or CPU)



optimizer = Adam(model.parameters(), lr=0.01)
scheduler = StepLR(optimizer, step_size=10, gamma=0.1)
loss_function = torch.nn.MSELoss()

# Test data loading and model forward pass

for batch in train_loader:
    batch.to(device)
    out = model(batch)
    print("Output from the model:", out.shape)


print(model)
print("Number of parameters: ", sum(p.numel() for p in model.parameters()))

In [None]:
def train(model, loader, device, optimizer, loss_function):
    model.train()
    total_loss = 0
    total_mae = 0
    total_mse = 0
    predictions = []
    labels = []
    for data in loader:
        data = data.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = loss_function(output, data.y.to(device))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

        # Calculate errors
        mae = F.l1_loss(output, data.y.to(device), reduction='mean')
        mse = F.mse_loss(output, data.y.to(device), reduction='mean')
        total_mae += mae.item()
        total_mse += mse.item()

        # Move data to CPU for numpy conversion
        predictions.extend(output.detach().cpu().numpy())
        labels.extend(data.y.cpu().numpy())

    num_samples = len(loader.dataset)
    average_loss = total_loss / len(loader)
    average_mae = total_mae / num_samples
    average_mse = total_mse / num_samples
    return average_loss, average_mae, average_mse, predictions, labels

def test(model, loader, device, loss_function):
    model.eval()
    total_loss = 0
    total_mae = 0
    total_mse = 0
    predictions = []
    labels = []
    with torch.no_grad():
        for data in loader:
            data = data.to(device)
            output = model(data)
            loss = loss_function(output, data.y.to(device))
            total_loss += loss.item()

            # Calculate errors
            mae = F.l1_loss(output, data.y.to(device), reduction='mean')
            mse = F.mse_loss(output, data.y.to(device), reduction='mean')
            total_mae += mae.item()
            total_mse += mse.item()

            # Move data to CPU for numpy conversion
            predictions.extend(output.cpu().numpy())
            labels.extend(data.y.cpu().numpy())

    num_samples = len(loader.dataset)
    average_loss = total_loss / len(loader)
    average_mae = total_mae / num_samples
    average_mse = total_mse / num_samples
    return average_loss, average_mae, average_mse, predictions, labels

In [None]:
# save loss function
import optuna

def objective(trial):
    # Hyperparameters to tune
    lr = trial.suggest_float("lr", 1e-5, 1e-1, log=True)
    hidden_channels = trial.suggest_categorical("hidden_channels", [16, 32, 64, 128])
    num_layers = trial.suggest_int("num_layers", 1, 4)
    dropout_rate = trial.suggest_float("dropout_rate", 0.0, 0.5)

    # Model setup
    model = GCN(dataset.num_node_features, hidden_channels, num_layers, dropout_rate).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = StepLR(optimizer, step_size=10, gamma=0.1)

    # Training loop
    for epoch in range(50):  # Reduced number of epochs for faster trials
        model.train()
        for batch in train_loader:
            batch.to(device)
            optimizer.zero_grad()
            out = model(batch)
            loss = torch.nn.MSELoss()(out, batch.y.to(device))
            loss.backward()
            optimizer.step()
        scheduler.step()

    # Evaluation
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch in test_loader:
            batch.to(device)
            preds = model(batch)
            loss = torch.nn.MSELoss()(preds, batch.y.to(device))
            total_loss += loss.item()

    average_loss = total_loss / len(test_loader)
    return average_loss




In [None]:
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=100)

print("Best hyperparameters: ", study.best_trial.params)


45.31686943054199
3297.054553222656


In [None]:
!pip install optuna matplotlib plotly
from optuna.visualization import plot_optimization_history, plot_parallel_coordinate, plot_param_importances, plot_contour, plot_slice


In [None]:
# Optimization History: Shows the scores of all trials across each trial.
fig1 = plot_optimization_history(study)
fig1.write_image("optimization_history.png")

In [None]:
# Parameter Relationships: Visualizes the relationships between parameters.
fig2 = plot_parallel_coordinate(study)
fig2.write_image("optimization_P_relation.png")

In [None]:
# Parameter Importance: Shows which parameters were most influential in achieving the best scores.
fig3 = plot_param_importances(study)
fig3.write_image("optimization_params_imp.png")

In [None]:
# Slice Plot: Allows  to explore the impact of specific parameters across the range of values they took in the trials.
fig4 = plot_slice(study)
fig4.write_image("optimization_slice.png")

In [None]:
# Contour Plot: Visualizes the interaction between two parameters and how their values relate to the trial scores.
fig5 = plot_contour(study, params=['lr', 'hidden_channels'])
fig5.write_image("optimization_contour.png")