In [1]:
import os
import json
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import numpy as np
from torch.utils.data import Dataset, DataLoader, random_split

# CSV file that IRDataset reads; it must provide a 'spectrum' and a 'labels' column
CSV_PATH = 'spectra.csv'

# Functional group names. The position of a name in this list is the index of
# that group in every multi hot label vector and in the model output.
label_map = ['phenol', 'aldehyde', 'arene']
num_classes = len(label_map)

def labels_to_multi_hot_vector(labels):
    """Encode the labels of one sample as a fixed-length multi hot vector.

    Torch expects every label entry to have the same length, but samples can
    carry a different number of labels. The result v therefore always has
    len(label_map) entries, with v_i = 1 if label_map[i] is found in `labels`
    and v_i = 0 otherwise.

    NOTE(review): membership is tested with `in`. If `labels` is the raw string
    read from the CSV (presumably it is - confirm), this is a substring test.

    Returns:
        A float32 tensor of length len(label_map).
    """
    flags = [1 if group_name in labels else 0 for group_name in label_map]
    return torch.tensor(flags, dtype=torch.float32)

class IRDataset(Dataset):
    """Dataset of spectra with multi hot functional group labels, loaded from a CSV file.

    The CSV must contain a 'spectrum' column holding JSON encoded spectra and a
    'labels' column that labels_to_multi_hot_vector can encode.
    """

    def __init__(self, csv_path, transform=None, target_transform=None):
        """Read `csv_path` and decode both columns once, up front.

        Args:
            csv_path: path of the CSV file to load.
            transform: optional callable applied to each spectrum tensor.
            target_transform: optional callable applied to each label tensor.
        """
        frame = pd.read_csv(csv_path)
        # Every spectrum is stored as a JSON string, decode it to a Python object
        frame['spectrum'] = frame['spectrum'].apply(json.loads)
        # Replace the raw labels by their multi hot encoding
        frame['labels'] = frame['labels'].apply(labels_to_multi_hot_vector)

        self.df = frame
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Number of samples, i.e. number of rows read from the CSV."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return the (spectrum, labels) pair stored at position `idx`."""
        raw_spectrum = self.df['spectrum'].iloc[idx]
        # Torch needs a tensor here, the decoded JSON is a plain list
        spectrum = torch.tensor(raw_spectrum, dtype=torch.float32)
        labels = self.df['labels'].iloc[idx]

        if self.transform:
            spectrum = self.transform(spectrum)
        if self.target_transform:
            labels = self.target_transform(labels)
        return spectrum, labels
    
dataset = IRDataset(CSV_PATH)

# Seed of the train/test split. Without a fixed seed every kernel restart draws
# a different split, so scores of different runs cannot be compared.
SPLIT_SEED = 42

# TODO: make this split more fair by ensuring groups are evenly split between train/test
# (also add validation set?)
# 80% of the samples are used for training, the remainder for testing
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_set, test_set = random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED)
)

# Only the training batches are shuffled, the test order does not matter
train_loader = DataLoader(train_set, batch_size=64, shuffle=True)
test_loader = DataLoader(test_set, batch_size=64)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def precision(predictions, true_labels, label_index):
    """Precision of one label: TP / (TP + FP), or 0.0 if there is no true positive.

    Args:
        predictions: indexable rows of predicted label values (1 = predicted).
        true_labels: rows of actual label values, same layout as `predictions`.
        label_index: column of the label to score.
    """
    hits = 0
    false_alarms = 0
    for row in range(len(predictions)):
        # Only samples predicted as positive contribute to precision
        if not (predictions[row][label_index] == 1).item():
            continue
        actual = true_labels[row][label_index]
        if (actual == 1).item():
            hits += 1
        elif (actual == 0).item():
            false_alarms += 1

    # Without any true positive the ratio is 0 (and possibly 0 / 0)
    if hits == 0:
        return 0.0
    return hits / (hits + false_alarms)

def recall(predictions, true_labels, label_index):
    """Recall of one label: TP / (TP + FN), or 0.0 if there is no true positive.

    Args:
        predictions: indexable rows of predicted label values (1 = predicted).
        true_labels: rows of actual label values, same layout as `predictions`.
        label_index: column of the label to score.
    """
    found = 0
    missed = 0
    for row in range(len(predictions)):
        predicted = predictions[row][label_index]
        if (predicted == 1).item():
            if (true_labels[row][label_index] == 1).item():
                found += 1
        elif (predicted == 0).item():
            # The label is present but was not predicted: a false negative
            if (true_labels[row][label_index] == 1).item():
                missed += 1

    # Without any true positive the ratio is 0 (and possibly 0 / 0)
    if found == 0:
        return 0.0
    return found / (found + missed)
    
def f1_score(predictions, true_labels, label_index):
    """F1 score of one label: the harmonic mean of its precision and recall.

    Returns 0.0 when both precision and recall are 0, which would otherwise
    be a division by zero.
    """
    p = precision(predictions, true_labels, label_index)
    r = recall(predictions, true_labels, label_index)
    if not (p != 0 or r != 0):
        return 0.0
    numerator = 2 * p * r
    return numerator / (p + r)

def EMR(predictions, true_labels):
    """Exact match ratio: the fraction of rows whose labels are all predicted correctly."""
    label_matches = predictions == true_labels
    # A row only counts when every single label in it matches
    row_is_exact = label_matches.all(axis=1)
    return row_is_exact.mean()
                
def evaluate(model, data_loader, batch_transform, device = 'cpu'):
    """Score `model` on every batch of `data_loader`.

    The model outputs are treated as logits: they go through a sigmoid and a
    label counts as predicted when its probability is above 0.5.

    Args:
        model: the network to evaluate.
        data_loader: iterable yielding (inputs, multi hot labels) batches.
        batch_transform: callable applied to every input batch before it is
            moved to `device` and passed to the model.
        device: device the batches are moved to. The model is not moved, so
            this has to be the device the model already lives on.

    Returns:
        A dict with one entry per name in label_map holding 'precision',
        'recall' and 'f1_score', plus a 'macro' entry holding the mean
        'f1_score' over all labels and the exact match ratio 'EMR'.
    """
    # Remember the mode the caller was in. Evaluation needs eval mode, but a model
    # that is evaluated between two training epochs must go back to train mode,
    # otherwise the caller silently keeps training in eval mode.
    was_training = model.training
    model.eval()
    
    all_true_labels = []
    all_predictions = []
    try:
        with torch.no_grad():
            for batch_inputs, batch_labels in data_loader:
                batch_inputs = batch_transform(batch_inputs).to(device)
                batch_labels = batch_labels.to(device)
                outputs = model(batch_inputs)
                probabilities = torch.sigmoid(outputs)
                predictions = (probabilities > 0.5).float()
            
                all_true_labels.append(batch_labels)
                all_predictions.append(predictions)
    finally:
        # Restore the mode even if a batch raised
        model.train(was_training)
    
    # The metric helpers work on host side arrays
    y_true = torch.cat(all_true_labels).cpu().numpy()
    y_pred = torch.cat(all_predictions).cpu().numpy()

    results = {}
    f1_scores = []
    for i in range(len(label_map)):
        results[label_map[i]] = {}
        results[label_map[i]]['precision'] = precision(y_pred, y_true, i)
        results[label_map[i]]['recall'] = recall(y_pred, y_true, i)
        f1 = f1_score(y_pred, y_true, i)
        results[label_map[i]]['f1_score'] = f1
        f1_scores.append(f1)
    results['macro'] = {}
    # Macro F1 is the unweighted mean of the per label F1 scores
    results['macro']['f1_score'] = np.mean(f1_scores)
    results['macro']['EMR'] = EMR(y_pred, y_true)
    
    return results

In [7]:
import optuna 

# Number of points per spectrum, i.e. the length of the input of the first conv layer
INPUT_LENGTH = 3600

# Train on the GPU when one is available, fall back to the CPU otherwise
print(torch.cuda.is_available())
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print('Using {}'.format(device))

class NeuralNetwork(nn.Module):
    """1D CNN: a stack of conv blocks followed by a stack of fully connected layers.

    Every conv block is Conv1d -> BatchNorm1d -> ReLU -> MaxPool1d(2). Every
    hidden fully connected block is Linear -> ReLU -> Dropout. The final Linear
    layer has num_classes outputs and no activation, so the network returns logits.
    """

    def __init__(self, conv_config, fc_config):
        """Build the network.

        Args:
            conv_config: one dict per conv block with the keys
                'conv_outchannels' and 'conv_kernelsize'.
            fc_config: one dict per hidden fully connected layer with the keys
                'fc_size' and 'dropout'.
        """
        super().__init__()

        conv_modules = []
        channels = 1
        # Length of the signal, tracked through every block so that the size
        # of the first Linear layer is known
        signal_len = INPUT_LENGTH
        for block in conv_config:
            out_channels = block['conv_outchannels']
            kernel_size = block['conv_kernelsize']
            conv_modules.extend([
                nn.Conv1d(in_channels=channels, out_channels=out_channels, kernel_size=kernel_size),
                nn.BatchNorm1d(out_channels),
                nn.ReLU(),
                nn.MaxPool1d(2),
            ])
            channels = out_channels
            # The unpadded conv removes kernel_size - 1 points, the pooling halves the rest
            signal_len = (signal_len - (kernel_size - 1)) // 2
        self.conv_stack = nn.Sequential(*conv_modules)

        dense_modules = [nn.Flatten()]
        features = signal_len * channels
        for layer in fc_config:
            width = layer['fc_size']
            dense_modules.extend([
                nn.Linear(features, width),
                nn.ReLU(),
                nn.Dropout(layer['dropout']),
            ])
            features = width
        dense_modules.append(nn.Linear(features, num_classes))
        self.fc_stack = nn.Sequential(*dense_modules)

    def forward(self, x):
        """Run `x` through the conv stack and then through the fully connected stack."""
        return self.fc_stack(self.conv_stack(x))
    
def batch_to_conv_input(batch_input):
    """Insert a new axis of size 1 at position 1 of `batch_input`.

    This is the channel axis the conv layers expect, so a batch of shape
    [batch_size, input_length] becomes [batch_size, 1, input_length].
    """
    return torch.unsqueeze(batch_input, dim=1)

def objective(trial):
    """Optuna objective: train one sampled network and return its macro F1 score.

    The trial samples the learning rate, the number of conv blocks with their
    channel counts and kernel sizes, and the number of fully connected layers
    with their sizes and dropout rates. The model is trained on train_loader
    and scored on test_loader after every epoch. That score is reported to
    Optuna so that bad trials can be pruned early.

    NOTE(review): the hyperparameters are selected on test_loader, so the best
    score is an optimistic estimate. A separate validation set would fix this.

    Args:
        trial: the optuna trial to sample hyperparameters from.

    Returns:
        The macro F1 score after the last epoch.

    Raises:
        optuna.exceptions.TrialPruned: when the pruner stops the trial early.
    """
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)

    conv_layers = trial.suggest_int('conv_layers', 2, 4)
    conv_config = []
    for i in range(conv_layers):
        conv_config.append({})
        conv_config[i]['conv_outchannels'] = trial.suggest_categorical('conv%i_outchannels' % (i), [8, 16, 32, 48, 64])
        conv_config[i]['conv_kernelsize'] = trial.suggest_categorical('conv%i_kernelsize' % (i), [5, 7, 9, 11, 13])

    fc_layers = trial.suggest_int('fc_layers', 1, 3)
    fc_config = []
    for i in range(fc_layers):
        fc_config.append({})
        fc_config[i]['fc_size'] = trial.suggest_categorical('fc%i_size' % (i), [64, 128, 256, 512])
        max_dropout = max(0.1, 0.5 - i * 0.2) # Give later layers a lower dropout
        fc_config[i]['dropout'] = trial.suggest_float('dropout%i' % (i), 0.0, max_dropout)
    
    model = NeuralNetwork(
        conv_config = conv_config,
        fc_config = fc_config
    ).to(device)
    
    # Increasing pos_weight punishes false negatives more heavily
    pos_weight = torch.full((num_classes,), 2.0, device=device) 
    criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    # min_lr is a lower bound of the learning rate. It was 1e6, which is above every
    # sampled learning rate and therefore kept the scheduler from ever lowering it.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3, min_lr=1e-6)
    
    # For hyperparameter optimization, 10 epochs will be enough to see if the model is good or bad
    num_epochs=10
    for epoch in range(num_epochs):
        # evaluate() runs the model in eval mode, so train mode has to be (re)enabled
        # at the start of every epoch. Otherwise only the first epoch would train
        # with dropout and with batch norm statistics being updated.
        model.train()

        epoch_loss = 0.0
        num_batches = 0
        for batch_inputs, batch_labels in train_loader:
            # The CNN expects batch_inputs to have a channel dimension (like [batch_size, 1, input_length]), but currently
            # batch_input has no channel and the dimensions of batch_inputs are [64, 3600].
            # unsqueeze adds a channel dimension in the middle, so batch_input's dimensions become [64, 1, 3600]
            batch_inputs = batch_to_conv_input(batch_inputs).to(device)
            batch_labels = batch_labels.to(device)
        
            optimizer.zero_grad()
            outputs = model(batch_inputs)
            loss = criterion(outputs, batch_labels)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()
            num_batches += 1

        # The plateau scheduler is stepped once per epoch on the mean loss. Stepping it
        # on every noisy batch loss would exhaust the patience within a few batches.
        if num_batches > 0:
            scheduler.step(epoch_loss / num_batches)
    
        # The batches have to go to the device the model is on, not to the default 'cpu'
        results = evaluate(model, test_loader, batch_to_conv_input, device=device)
        f1 = results['macro']['f1_score']
        
        # Print some additional metrics at the end of a run
        if (epoch == num_epochs - 1):
            for j in range(num_classes):
                print('%s, recall: %f, precision: %f' % (label_map[j], results[label_map[j]]['recall'], results[label_map[j]]['precision']))
            print('Macro, F1: %f, EMR: %f' % (results['macro']['f1_score'], results['macro']['EMR']))
        trial.report(f1, step=epoch)
        
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()
        
    return f1
    

# Search for the hyperparameters that maximize the value returned by objective.
# The median pruner is configured with two warm-up steps.
pruner = optuna.pruners.MedianPruner(n_warmup_steps=2)
study = optuna.create_study(pruner=pruner, direction='maximize')
study.optimize(objective, n_trials=25)

# Show the hyperparameters of the best trial, followed by its score
best_trial = study.best_trial
print(best_trial.params, best_trial.value, sep='\n')


[I 2025-06-04 12:48:07,817] A new study created in memory with name: no-name-2873b877-7d9e-4acb-82a2-c1620c22143e


False
Using cpu


[W 2025-06-04 12:48:09,874] Trial 0 failed with parameters: {'learning_rate': 0.00024884398041942127, 'conv_layers': 2, 'conv0_outchannels': 32, 'conv0_kernelsize': 11, 'conv1_outchannels': 32, 'conv1_kernelsize': 5, 'fc_layers': 3, 'fc0_size': 512, 'dropout0': 0.20286414651558832, 'fc1_size': 128, 'dropout1': 0.1711371185900922, 'fc2_size': 512, 'dropout2': 0.09787127195927298} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "C:\Users\20234238\AppData\Local\anaconda3\envs\cbl_ir\lib\site-packages\optuna\study\_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
  File "C:\Users\20234238\AppData\Local\Temp\ipykernel_21232\1854491737.py", line 87, in objective
    loss.backward()
  File "C:\Users\20234238\AppData\Local\anaconda3\envs\cbl_ir\lib\site-packages\torch\_tensor.py", line 489, in backward
    self, gradient, retain_graph, create_graph, inputs=inputs
  File "C:\Users\20234238\AppData\Local\anaconda3\envs\cbl_ir\

KeyboardInterrupt: 