In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import pandas as pd
import pickle
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
from pathlib import Path
from rdkit import Chem
from rdkit import RDLogger
from scipy.interpolate import interp1d
from torch.utils.data import DataLoader, TensorDataset

# Disable RDLogger warnings
RDLogger.DisableLog('rdApp.*')
import os

os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
functional_groups = {
    'Acid anhydride': Chem.MolFromSmarts('[CX3](=[OX1])[OX2][CX3](=[OX1])'),
    'Acyl halide': Chem.MolFromSmarts('[CX3](=[OX1])[F,Cl,Br,I]'),
    'Alcohol': Chem.MolFromSmarts('[#6][OX2H]'),
    'Aldehyde': Chem.MolFromSmarts('[CX3H1](=O)[#6,H]'),
    'Alkane': Chem.MolFromSmarts('[CX4;H3,H2]'),
    'Alkene': Chem.MolFromSmarts('[CX3]=[CX3]'),
    'Alkyne': Chem.MolFromSmarts('[CX2]#[CX2]'),
    'Amide': Chem.MolFromSmarts('[NX3][CX3](=[OX1])[#6]'),
    'Amine': Chem.MolFromSmarts('[NX3;H2,H1,H0;!$(NC=O)]'),
    'Arene': Chem.MolFromSmarts('[cX3]1[cX3][cX3][cX3][cX3][cX3]1'),
    'Azo compound': Chem.MolFromSmarts('[#6][NX2]=[NX2][#6]'),
    'Carbamate': Chem.MolFromSmarts('[NX3][CX3](=[OX1])[OX2H0]'),
    'Carboxylic acid': Chem.MolFromSmarts('[CX3](=O)[OX2H]'),
    'Enamine': Chem.MolFromSmarts('[NX3][CX3]=[CX3]'),
    'Enol': Chem.MolFromSmarts('[OX2H][#6X3]=[#6]'),
    'Ester': Chem.MolFromSmarts('[#6][CX3](=O)[OX2H0][#6]'),
    'Ether': Chem.MolFromSmarts('[OD2]([#6])[#6]'),
    'Haloalkane': Chem.MolFromSmarts('[#6][F,Cl,Br,I]'),
    'Hydrazine': Chem.MolFromSmarts('[NX3][NX3]'),
    'Hydrazone': Chem.MolFromSmarts('[NX3][NX2]=[#6]'),
    'Imide': Chem.MolFromSmarts('[CX3](=[OX1])[NX3][CX3](=[OX1])'),
    'Imine': Chem.MolFromSmarts('[$([CX3]([#6])[#6]),$([CX3H][#6])]=[$([NX2][#6]),$([NX2H])]'),
    'Isocyanate': Chem.MolFromSmarts('[NX2]=[C]=[O]'),
    'Isothiocyanate': Chem.MolFromSmarts('[NX2]=[C]=[S]'),
    'Ketone': Chem.MolFromSmarts('[#6][CX3](=O)[#6]'),
    'Nitrile': Chem.MolFromSmarts('[NX1]#[CX2]'),
    'Phenol': Chem.MolFromSmarts('[OX2H][cX3]:[c]'),
    'Phosphine': Chem.MolFromSmarts('[PX3]'),
    'Sulfide': Chem.MolFromSmarts('[#16X2H0]'),
    'Sulfonamide': Chem.MolFromSmarts('[#16X4]([NX3])(=[OX1])(=[OX1])[#6]'),
    'Sulfonate': Chem.MolFromSmarts('[#16X4](=[OX1])(=[OX1])([#6])[OX2H0]'),
    'Sulfone': Chem.MolFromSmarts('[#16X4](=[OX1])(=[OX1])([#6])[#6]'),
    'Sulfonic acid': Chem.MolFromSmarts('[#16X4](=[OX1])(=[OX1])([#6])[OX2H]'),
    'Sulfoxide': Chem.MolFromSmarts('[#16X3]=[OX1]'),
    'Thial': Chem.MolFromSmarts('[CX3H1](=S)[#6,H]'),
    'Thioamide': Chem.MolFromSmarts('[NX3][CX3]=[SX1]'),
    'Thiol': Chem.MolFromSmarts('[#16X2H]')
}
def match_group(mol: Chem.Mol, func_group) -> int:
    if type(func_group) == Chem.Mol:
        n = len(mol.GetSubstructMatches(func_group))
    else:
        n = func_group(mol)
    return 0 if n == 0 else 1
# Function to map SMILES to functional groups (no change)
def get_functional_groups(smiles: str) -> dict:
    smiles = smiles.strip().replace(' ', '')
    mol = Chem.MolFromSmiles(smiles)
    if mol is None: 
        return None
    func_groups = [match_group(mol, smarts) for smarts in functional_groups.values()]
    return func_groups

def interpolate_to_600(spec):
    old_x = np.arange(len(spec))
    new_x = np.linspace(min(old_x), max(old_x), 600)
    interp = interp1d(old_x, spec)
    return interp(new_x)

def make_msms_spectrum(spectrum):
    msms_spectrum = np.zeros(10000)
    for peak in spectrum:
        peak_pos = int(peak[0]*10)
        peak_pos = min(peak_pos, 9999)
        msms_spectrum[peak_pos] = peak[1]
    return msms_spectrum

# Define CNN Model in PyTorch
class CNNModel(nn.Module):
    def __init__(self, num_fgs):
        super(CNNModel, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=31, kernel_size=11, padding='same')
        self.conv2 = nn.Conv1d(in_channels=31, out_channels=62, kernel_size=11, padding='same')
        self.fc1 = nn.Linear(62 * 150, 4927)
        self.fc2 = nn.Linear(4927, 2785)
        self.fc3 = nn.Linear(2785, 1574)
        self.fc4 = nn.Linear(1574, num_fgs)
        self.dropout = nn.Dropout(0.48599073736368)
        self.batch_norm1 = nn.BatchNorm1d(31)
        self.batch_norm2 = nn.BatchNorm1d(62)

    def forward(self, x):
        x = F.relu(self.batch_norm1(self.conv1(x)))
        x = F.max_pool1d(x, 2)
        x = F.relu(self.batch_norm2(self.conv2(x)))
        x = F.max_pool1d(x, 2)
        x = x.view(x.size(0), -1)  # Flatten
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = F.relu(self.fc3(x))
        x = self.dropout(x)
        x = torch.sigmoid(self.fc4(x))
        return x




In [2]:
# Training function in PyTorch
def train_model(X_train, y_train, X_test, num_fgs, weighted=False, batch_size=41, epochs=41):
    device = torch.device('cuda:2' if torch.cuda.is_available() else 'cpu')
    model = CNNModel(num_fgs).to(device)
    
    # Define optimizer and loss
    optimizer = optim.Adam(model.parameters())
    
    if weighted:
        class_weights = calculate_class_weights(y_train)
        criterion = WeightedBinaryCrossEntropyLoss(class_weights).to(device)
    else:
        criterion = nn.BCELoss().to(device)

    # Create DataLoader
    train_data = TensorDataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.float32))
    test_data = TensorDataset(torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.float32))
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

    # Train the model
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            
            optimizer.zero_grad()
            outputs = model(inputs.unsqueeze(1))  # Add channel dimension
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()

        print(f'Epoch {epoch+1}/{epochs}, Loss: {running_loss / len(train_loader)}')

    # Evaluate the model
    model.eval()
    predictions = []
    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs = inputs.to(device)
            outputs = model(inputs.unsqueeze(1))
            predictions.append(outputs.cpu().numpy())

    predictions = np.concatenate(predictions)
    return (predictions > 0.5).astype(int)

# Custom loss function with class weights
class WeightedBinaryCrossEntropyLoss(nn.Module):
    def __init__(self, class_weights):
        super(WeightedBinaryCrossEntropyLoss, self).__init__()
        self.class_weights = class_weights

    def forward(self, y_pred, y_true):
        loss = self.class_weights[0] * (1 - y_true) * torch.log(1 - y_pred + 1e-15) + \
               self.class_weights[1] * y_true * torch.log(y_pred + 1e-15)
        return -loss.mean()

# Calculate class weights
def calculate_class_weights(y_true):
    num_samples = y_true.shape[0]
    class_weights = np.zeros((2, y_true.shape[1]))
    for i in range(y_true.shape[1]):
        weights_n = num_samples / (2 * (y_true[:, i] == 0).sum())
        weights_p = num_samples / (2 * (y_true[:, i] == 1).sum())
        class_weights[0, i] = weights_n
        class_weights[1, i] = weights_p
    return torch.tensor(class_weights.T, dtype=torch.float32)



In [3]:
# Loading data (no change)
analytical_data = Path("/data/zjh2/multimodal-spectroscopic-dataset-main/data/multimodal_spectroscopic_dataset")
out_path = Path("/home/dwj/icml_guangpu/multimodal-spectroscopic-dataset-main/runs/runs_f_groups/h_nmr")
column = "h_nmr_spectra"
seed = 3245

training_data = None
for i, parquet_file in enumerate(analytical_data.glob("*.parquet")):
    data = pd.read_parquet(parquet_file, columns=[column, 'smiles'])
    data[column] = data[column].map(interpolate_to_600)
    data['func_group'] = data.smiles.map(get_functional_groups)
    print("Loaded Data: ", i)
    if training_data is None:
        training_data = data
    else:
        training_data = pd.concat((training_data, data))



Loaded Data:  0
Loaded Data:  1
Loaded Data:  2
Loaded Data:  3
Loaded Data:  4
Loaded Data:  5
Loaded Data:  6
Loaded Data:  7
Loaded Data:  8
Loaded Data:  9
Loaded Data:  10
Loaded Data:  11
Loaded Data:  12
Loaded Data:  13
Loaded Data:  14
Loaded Data:  15
Loaded Data:  16
Loaded Data:  17
Loaded Data:  18
Loaded Data:  19
Loaded Data:  20
Loaded Data:  21
Loaded Data:  22
Loaded Data:  23
Loaded Data:  24
Loaded Data:  25
Loaded Data:  26
Loaded Data:  27
Loaded Data:  28
Loaded Data:  29
Loaded Data:  30
Loaded Data:  31
Loaded Data:  32
Loaded Data:  33
Loaded Data:  34
Loaded Data:  35
Loaded Data:  36
Loaded Data:  37
Loaded Data:  38
Loaded Data:  39
Loaded Data:  40
Loaded Data:  41
Loaded Data:  42
Loaded Data:  43
Loaded Data:  44
Loaded Data:  45
Loaded Data:  46
Loaded Data:  47
Loaded Data:  48
Loaded Data:  49
Loaded Data:  50
Loaded Data:  51
Loaded Data:  52
Loaded Data:  53
Loaded Data:  54
Loaded Data:  55
Loaded Data:  56
Loaded Data:  57
Loaded Data:  58
Loaded 

In [4]:
train, test = train_test_split(training_data, test_size=0.1, random_state=seed)

X_train = np.stack(train[column].to_list())
y_train = np.stack(train['func_group'].to_list())
X_test = np.stack(test[column].to_list())
y_test = np.stack(test['func_group'].to_list())



In [5]:
# Train extended model
predictions = train_model(X_train, y_train, X_test, num_fgs=37, weighted=False)

# Evaluate the model
f1 = f1_score(y_test, predictions, average='micro')
print(f'F1 Score: {f1}')

# Save results
with open(out_path / "results.pickle", "wb") as file:
    pickle.dump({'pred': predictions, 'tgt': y_test}, file)

Epoch 1/41, Loss: 0.1723814986826042
Epoch 2/41, Loss: 0.15927543758489826
Epoch 3/41, Loss: 0.15386978669757587
Epoch 4/41, Loss: 0.14995749643682046
Epoch 5/41, Loss: 0.14675061757517105
Epoch 6/41, Loss: 0.1441214064207603
Epoch 7/41, Loss: 0.14170353545302256
Epoch 8/41, Loss: 0.13988476863268817
Epoch 9/41, Loss: 0.13809043581604075
Epoch 10/41, Loss: 0.13647553762488837
Epoch 11/41, Loss: 0.1355010384033768
Epoch 12/41, Loss: 0.13379026336107405
Epoch 13/41, Loss: 0.13267997772132142
Epoch 14/41, Loss: 0.13156943159749648
Epoch 15/41, Loss: 0.13057929950787786
Epoch 16/41, Loss: 0.1295157467738239
Epoch 17/41, Loss: 0.12873149593185823
Epoch 18/41, Loss: 0.12840971173323912
Epoch 19/41, Loss: 0.12767608644741651
Epoch 20/41, Loss: 0.12651016189430456
Epoch 21/41, Loss: 0.12614036233225945
Epoch 22/41, Loss: 0.1254055682456311
Epoch 23/41, Loss: 0.1244882598390727
Epoch 24/41, Loss: 0.12421700988265857
Epoch 25/41, Loss: 0.12337275618056351
Epoch 26/41, Loss: 0.12310836691499283
E

In [None]:
train, test = train_test_split(training_data, test_size=0.1, random_state=seed)

X_train = np.stack(train[column].to_list())
y_train = np.stack(train['func_group'].to_list())
X_test = np.stack(test[column].to_list())
y_test = np.stack(test['func_group'].to_list())