In [38]:
%load_ext autoreload
%autoreload 2

import sys
sys.path.append('../..')

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## 1. Imports

### 1.1 External

In [39]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, global_mean_pool
from torch_geometric.loader import DataLoader
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch_geometric.data import Data


from rdkit import Chem
from rdkit.Chem import Descriptors
from sklearn.preprocessing import StandardScaler

import numpy as np


### 1.2 Internal

In [40]:
# Modified version of the dataset
from src import dataset_small

data = dataset_small.load_dataset(
    encoding='smiles',
    stratified=False,
    subset=None,
    fold=2
)

In [45]:

possible_atom_list = ['S', 'Si', 'F', 'O',
                      'C', 'I', 'P', 'Cl', 'Br', 'N', 'Unknown']

def one_of_k_encoding(x, allowable_set):
    if x not in allowable_set:
        raise Exception("input {0} not in allowable set{1}:".format(
            x, allowable_set))
    return list(map(lambda s: x == s, allowable_set))

def one_of_k_encoding_unk(x, allowable_set):
    """Maps inputs not in the allowable set to the last element."""
    if x not in allowable_set:
        x = allowable_set[-1]
    return list(map(lambda s: x == s, allowable_set))

import torch
from torch_geometric.data import Data
from torch_geometric.loader import DataLoader
from rdkit import Chem
import numpy as np
from pathlib import Path
import os

# Utility functions to process atom and bond features
possible_atom_list = ['S', 'Si', 'F', 'O', 'C', 'I', 'P', 'Cl', 'Br', 'N', 'Unknown']

def one_of_k_encoding_unk(x, allowable_set):
    """Maps inputs not in the allowable set to the last element."""
    if x not in allowable_set:
        x = allowable_set[-1]
    return list(map(lambda s: x == s, allowable_set))

def atom_features(atom):
    return one_of_k_encoding_unk(atom.GetSymbol(), possible_atom_list) + \
           one_of_k_encoding(atom.GetDegree(), list(range(11))) + \
           one_of_k_encoding_unk(atom.GetImplicitValence(), list(range(7))) + \
           [atom.GetFormalCharge(), atom.GetNumRadicalElectrons()] + \
           one_of_k_encoding_unk(atom.GetHybridization(), [
               Chem.rdchem.HybridizationType.SP,
               Chem.rdchem.HybridizationType.SP2,
               Chem.rdchem.HybridizationType.SP3,
               Chem.rdchem.HybridizationType.SP3D,
               Chem.rdchem.HybridizationType.SP3D2
           ]) + [atom.GetIsAromatic()]

def bond_features(bond):
    bt = bond.GetBondType()
    bond_feats = [
        bt == Chem.rdchem.BondType.SINGLE, bt == Chem.rdchem.BondType.DOUBLE,
        bt == Chem.rdchem.BondType.TRIPLE, bt == Chem.rdchem.BondType.AROMATIC,
        bond.GetIsConjugated(), bond.IsInRing()
    ]
    return bond_feats

def get_bond_pair(mol):
    bonds = mol.GetBonds()
    res = [[], []]
    for bond in bonds:
        res[0] += [bond.GetBeginAtomIdx(), bond.GetEndAtomIdx()]
        res[1] += [bond.GetEndAtomIdx(), bond.GetBeginAtomIdx()]
    return res

def mol2torchdata(mol):
    """
    Converts RDKit Mol object to a PyTorch Geometric Data object.
    Nodes = Atoms, Edges = Bonds
    """
    # Node features (atom features)
    atoms = mol.GetAtoms()
    node_f = [atom_features(atom) for atom in atoms]
    
    # Edge indices and edge attributes (bond features)
    edge_index = get_bond_pair(mol)
    bonds = mol.GetBonds()
    edge_attr = [bond_features(bond) for bond in bonds]

    # Create a PyTorch Geometric Data object
    data = Data(
        x=torch.tensor(node_f, dtype=torch.float),          # Node feature matrix (atoms)
        edge_index=torch.tensor(edge_index, dtype=torch.long),  # Edge index matrix (bonds)
        edge_attr=torch.tensor(edge_attr, dtype=torch.float)    # Edge attribute matrix (bond features)
    )
    return data

def create_data_loader(graphs, batch_size):
    """
    Creates a PyTorch Geometric DataLoader from a list of graph data objects.
    
    Parameters:
    - graphs: list of PyTorch Geometric Data objects (representing molecular graphs)
    - batch_size: int, the batch size for the DataLoader

    Returns:
    - data_loader: PyTorch Geometric DataLoader object for graph data
    """
    data_loader = DataLoader(graphs, batch_size=batch_size, shuffle=True, drop_last=True)
    return data_loader

def prepare_gcn_data(x_data, y_data, num_node_features=34, batch_size=64):
    """
    Converts input SMILES strings into graph data (PyTorch Geometric format) with node features and edge attributes
    to be used for GCN regression training.

    Parameters:
    - x_data: list of SMILES strings representing the molecules
    - y_data: list of target values (for regression)
    - num_node_features: int, the number of features for each node (atom). Default is 34
    - batch_size: int, the batch size to use in DataLoader (default is 64)

    Returns:
    - train_loader: DataLoader for the prepared GCN graph data
    """
    # Convert SMILES strings to PyTorch Geometric Data objects (graphs)
    x_graphs = [mol2torchdata(Chem.MolFromSmiles(smile)) for smile in x_data]

    # Attach the targets (y) to the graph data (for regression, use float type)
    for i, data in enumerate(x_graphs):
        data.y = torch.tensor([y_data[i]], dtype=torch.float)

    # Create a DataLoader object from the graph data
    data_loader = create_data_loader(x_graphs, batch_size)

    return data_loader

### Dataset Loading Function
def load_dataset(encoding, fold, stratified, subset, path_prefix=''):
    """
    Loads a dataset from preprocessed `.npz` files for training and testing.
    
    Parameters:
    - encoding: type of encoding used (e.g., 'smiles')
    - fold: the fold number for cross-validation
    - stratified: boolean, whether the dataset is stratified
    - subset: optional string for the dataset subset (e.g., '_small')
    - path_prefix: optional prefix for the dataset path
    
    Returns:
    - train: tuple of (x_train, y_train, smiles_train)
    - test: tuple of (x_test, y_test, smiles_test)
    """
    base_path = os.getcwd() + path_prefix + '/../../data/preprocessed/{type}/fold{no}_'.format(
        type = 'random' if not stratified else 'stratified',
        no = fold
    )

    x_paths = base_path + '{encoding}' + '_{set}{subset}.npz'
    y_paths = base_path + '{set}{subset}.npz'
    
    subset = ('_' + subset) if subset is not None else ''

    # Paths for training data
    y_train = y_paths.format(set = 'y_train', subset = subset)
    smiles_train = y_paths.format(set = 'smiles_train', subset = subset)
    x_train = x_paths.format(encoding=encoding, set = 'x_train', subset = subset)

    print(x_train)
    print(y_train)

    if encoding == 'smiles':
        x_train = smiles_train

    x_train_load = np.load(x_train, allow_pickle=True)
    x_train_load = [x_train_load[f] for f in x_train_load.files]

    if len(x_train_load) == 1:
        x_train_load = x_train_load[0]

    # Slice first 10 rows of training data (for testing purposes)
    x_train_load = x_train_load[:10]
    y_train_load = np.load(y_train, allow_pickle=True)['y'][:10]
    smiles_train_load = np.load(smiles_train, allow_pickle=True)['smiles'][:10]

    # Paths for test data
    y_test = y_paths.format(set = 'y_test', subset = subset)
    smiles_test = y_paths.format(set = 'smiles_test', subset = subset)
    x_test = x_paths.format(encoding=encoding, set = 'x_test', subset = subset)

    if encoding == 'smiles':
        x_test = smiles_test

    x_test_load = np.load(x_test, allow_pickle=True)
    x_test_load = [x_test_load[f] for f in x_test_load.files]

    if len(x_test_load) == 1:
        x_test_load = x_test_load[0]

    # Slice first 10 rows of test data (for testing purposes)
    x_test_load = x_test_load[:10]
    y_test_load = np.load(y_test, allow_pickle=True)['y'][:10]
    smiles_test_load = np.load(smiles_test, allow_pickle=True)['smiles'][:10]

    # Train and test splits
    train = (x_train_load, y_train_load, smiles_train_load)
    test = (x_test_load, y_test_load, smiles_test_load)

    return train, test

## 2. GCN architechture

In [42]:
class GCN(torch.nn.Module):
    def __init__(self, num_node_features):
        super(GCN, self).__init__()
        
        # Convolutional base size and number of layers
        conv_base_size = 64
        conv_n_layers = 5
        
        # GCN layers
        self.convs = nn.ModuleList()
        self.convs.append(GCNConv(num_node_features, conv_base_size))  # First GCN layer
        
        # Add additional convolutional layers
        for _ in range(1, conv_n_layers):
            self.convs.append(GCNConv(conv_base_size, conv_base_size))
        
        # Dropout layer
        self.dropout = nn.Dropout(p=0.153)
        
        # MLP layers for regression
        self.fc1 = nn.Linear(conv_base_size, 64)
        self.fc2 = nn.Linear(64, 1)  # Output is a single continuous value (for regression)
        
    def forward(self, data):
        x, edge_index, batch = data.x, data.edge_index, data.batch
        
        # Apply GCN layers with ReLU activation
        for conv in self.convs:
            x = F.relu(conv(x, edge_index))
        
        # Global mean pooling (pool over node embeddings)
        x = global_mean_pool(x, batch)
        
        # Apply MLP layers with dropout
        x = self.dropout(F.relu(self.fc1(x)))
        x = self.fc2(x)  # Output a single value for regression
        
        return x  # No softmax or activation for regression output

# Training function for regression
def train(model, loader, optimizer, scheduler, device):
    model.train()
    
    total_loss = 0
    for data in loader:
        data = data.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.mse_loss(output, data.y.view(-1, 1))  # Mean Squared Error loss
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    
    scheduler.step(total_loss)
    return total_loss / len(loader)

# Evaluation function for regression
def evaluate(model, loader, device):
    model.eval()
    
    total_loss = 0
    with torch.no_grad():
        for data in loader:
            data = data.to(device)
            output = model(data)
            loss = F.mse_loss(output, data.y.view(-1, 1))  # Mean Squared Error loss
            total_loss += loss.item()
    
    return total_loss / len(loader)

# Model training process for regression
def train_gcn_regression(x_train, y_train, num_node_features):
    # Hyperparameters
    learning_rate = 0.008
    batch_size = 64
    epochs = 100
    
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Prepare dataset (convert x_train to PyG data format)
    dataset = [mol2torchdata(Chem.MolFromSmiles(smile)) for smile in x_train]
    for data, y in zip(dataset, y_train):
        data.y = torch.tensor(y, dtype=torch.float)  # Ensure target is float type for regression
    
    # DataLoader
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    
    # Model, optimizer, and scheduler
    model = GCN(num_node_features=num_node_features).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=20, verbose=True)
    
    # Training loop
    for epoch in range(epochs):
        loss = train(model, loader, optimizer, scheduler, device)
        if epoch % 10 == 0:  # Print every 10 epochs
            val_loss = evaluate(model, loader, device)
            print(f"Epoch: {epoch}, Training Loss: {loss:.4f}, Validation Loss: {val_loss:.4f}")
    
    return model

In [47]:
train_data, test_data = load_dataset('graph', fold=1, stratified=False, subset=None)

c:\Users\xalvna\Desktop\Natalia\master\courses\Project course\rep\notebooks\benchmarks/../../data/preprocessed/random/fold1_graph_x_train.npz
c:\Users\xalvna\Desktop\Natalia\master\courses\Project course\rep\notebooks\benchmarks/../../data/preprocessed/random/fold1_y_train.npz


In [60]:
### Example of usage
# Loading the dataset
train_data, test_data = load_dataset('smiles', fold=0, stratified=False, subset=None)
x_train, y_train, smiles_train = train_data

# Step 1: Flatten x_train to get SMILES strings
x_train_flat = [item[0] for item in x_train]  # Flattening to get a list of SMILES

# Step 2: Flatten y_train to make it a 1D array
y_train_flat = y_train.flatten()  # Converts (10, 1) to (10,)

# Step 3: Convert SMILES strings to graph data objects
x_graphs = [mol2torchdata(Chem.MolFromSmiles(smile)) for smile in x_train_flat]

# Step 4: Attach the target values (y) to each graph data object
for i, data in enumerate(x_graphs):
    data.y = torch.tensor(y_train_flat[i], dtype=torch.float).view(1, -1)  # Reshape if needed

# Step 5: Create DataLoader for training
batch_size = 64
train_loader = DataLoader(x_graphs, batch_size=batch_size, shuffle=True)

# Now, train_loader can be used for training your GCN

#train_loader = prepare_gcn_data(x_train, y_train, num_node_features=34, batch_size=64)

# Train the GCN regression model
train_gcn_regression(train_loader)


c:\Users\xalvna\Desktop\Natalia\master\courses\Project course\rep\notebooks\benchmarks/../../data/preprocessed/random/fold0_smiles_x_train.npz
c:\Users\xalvna\Desktop\Natalia\master\courses\Project course\rep\notebooks\benchmarks/../../data/preprocessed/random/fold0_y_train.npz


TypeError: train_gcn_regression() missing 2 required positional arguments: 'y_train' and 'num_node_features'