<a href="https://www.kaggle.com/code/andyc97/chemberta-finetuning-with-downstream-xgboost?scriptVersionId=272629735" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# RDKit: Open-Source Cheminformatics Software
https://www.rdkit.org/

%%capture
!pip install rdkit

# Libraries

In [2]:
import os
from itertools import product

# Basic data manipulation
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# RDKit for cheminformatics
from rdkit import Chem
from rdkit.Chem import Descriptors
from rdkit.Chem.rdMolDescriptors import GetMorganFingerprintAsBitVect, GetMACCSKeysFingerprint

# XG boost
import xgboost as xgb

# Sklearn for downstream prediction
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split

# Pytorch for finetuning BERT model
import torch
import torch.nn as nn
from torch.optim import AdamW
from torch.utils.data import Dataset, DataLoader

# Load transformer model
from transformers import AutoModelForMaskedLM, AutoTokenizer, AutoModel, AutoModelForSequenceClassification

# Dataset

## Load Data
* The training dataset `train.csv` is loaded to `df_train`.
* The test dataset `test.csv` is loaded to `df_test`.
* The augmented dataset `smiles_melting_point.csv` is loaded to `df_aug` (https://www.kaggle.com/datasets/seddiktrk/melting-point-smiles).

In [4]:
df_train = pd.read_csv('/kaggle/input/melting-point/train.csv')
df_test = pd.read_csv('/kaggle/input/melting-point/test.csv')
df_aug = pd.read_csv('/kaggle/input/melting-point-smiles/smiles_melting_point.csv') # external dataset

## Handle Data Leakage

+ Convert smiles strings in the test set to its caninical form.

In [18]:
# Helper function
def make_canonical(smile):
    mol = Chem.MolFromSmiles(smile)
    if mol is not None:
        canonical_smile = Chem.MolToSmiles(mol)
        return canonical_smile
    else:
        return np.nan

# The list of smile string to be exluded
black_list = df_test['SMILES'].apply(make_canonical).to_list()

# Canonicalize smile string in the augmented dataset 
df_aug['SMILES_C'] = df_aug['SMILES'].apply(make_canonical)

[13:10:44] Explicit valence for atom # 13 Cl, 7, is greater than permitted
[13:10:44] Explicit valence for atom # 32 Cl, 5, is greater than permitted


In [33]:
df_train_aug = df_aug[(~df_aug['SMILES_C'].isin(black_list)) & (df_aug['UNIT {Melting Point}.1'] == 'K')]
df_train_aug = df_train_aug[['SMILES', 'Melting Point {measured, converted}']]

# Rename column to align with competition dataset
df_train_aug.columns = ['SMILES', 'Tm']

## Data Augmentation
+ Combining both datasets we have 276,510 training points.

In [40]:
df_train = pd.concat([df_train[['SMILES', 'Tm']], df_train_aug], ignore_index=True)
df_train.shape

(276510, 2)

## Train-Test Split
+ The dataset is split into 80/20 for training and validation.

In [5]:
seed = 20251017
torch.manual_seed(seed)
df_train, df_val = train_test_split(df_train, test_size=0.2, random_state=seed)

+ Standardization $(y_i-\bar{y})/s$ is applied to the target, melting points `Tm`.
+ The standardized target values are denoted by `TmS`.

In [None]:
scaler = StandardScaler()
scaler.fit(df_train[['Tm']]) # pass in 2D array
std = scaler.var_[0]**0.5 # estimate of s

df_train['TmS'] = scaler.transform(df_train[['Tm']]).flatten() # convert back to 1D array 
df_val['TmS'] = scaler.transform(df_val[['Tm']]).flatten()

+ Define the data handler for fine-tuning ChemBerta with pytorch.

In [None]:
class ChemDataset(Dataset):  
    def __init__(self, df, tokenizer, max_length=128):  
        self.smiles = df['SMILES'].tolist()  
        self.labels = df['TmS'].tolist()  
        self.tokenizer = tokenizer  
        self.max_length = max_length  
  
    def __len__(self):  
        return len(self.labels)  
  
    def __getitem__(self, idx):  
        encoding = self.tokenizer(  
            self.smiles[idx],  
            truncation=True,  
            padding='max_length',  
            max_length=self.max_length,  
            return_tensors='pt'  
        )  
        item = {key: val.squeeze(0) for key, val in encoding.items()}  
        item['labels'] = torch.tensor(self.labels[idx], dtype=torch.float)  
        return item  

+ Define the transformer model ChemBerta.
+ A regression head is added on top of the transformer model.

In [None]:
# Model retrieved from https://www.kaggle.com/code/michaelrowen/opp2025-chemberta-pre-trained-base
class BERTEmbedder:
    def __init__(self, model_name):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(  
            model_name,
            num_labels=1,  
            problem_type='regression' # Regression task  
        )  
        self.model.eval()

# Finetuning ChemBERTa

+ The transformer model `ChemBerta` is available in Kaggle (https://www.kaggle.com/code/michaelrowen/opp2025-chemberta-pre-trained-base).
+ Load the transfomer model and define data handlers for training and validation sets, respectively.

In [None]:
# Training configuration
chemberta_model = '/kaggle/input/c/transformers/default/1/ChemBERTa-77M-MLM'
chemberta = BERTEmbedder(model_name=chemberta_model)
optimizer = AdamW(chemberta.model.parameters(), lr=1e-4) # lr set by experiment
loss_fn = nn.L1Loss() # Since reduce = mean, L1Loss measures MAE
n_epochs = 30

# Data handler
dataset_train = ChemDataset(df_train, chemberta.tokenizer)
dataset_val = ChemDataset(df_val, chemberta.tokenizer)
dataloader_train = DataLoader(dataset_train, batch_size=16, shuffle=True) # batch_size set by experiment
dataloader_val = DataLoader(dataset_val, batch_size=16, shuffle=True) # batch_size set by experiment

+ Move the model to GPU on Kaggle platform and start training.

In [None]:
if not os.path.exists('/kaggle/working/weights.pth'):
    # Enable GPU if available
    device = 'cuda' if torch.cuda.is_available() else 'cpu'  
    chemberta.model.to(device)
    
    train_loss_list = []
    val_loss_list = []
    # Training cycle
    for epoch in range(n_epochs):
        chemberta.model.train()
        epoch_train_size = 0
        epoch_val_size = 0
        epoch_train_loss = 0
        epoch_val_loss = 0
    
        # evaluate training set
        for batch in dataloader_train:
            # forward pass
            inputs = {k: v.to(device) for k, v in batch.items() if k != "labels"}  
            labels = batch['labels'].to(device).unsqueeze(1)  # shape [B,1]  
            outputs = chemberta.model(**inputs).logits  # shape [B,1] 
            loss = loss_fn(outputs, labels)  
    
            # backward pass
            optimizer.zero_grad()  
            loss.backward()  
            optimizer.step()  
    
            # update epoch loss
            epoch_train_loss += loss.item() * dataloader_train.batch_size * std
            epoch_train_size += dataloader_train.batch_size
        
        # evaluate validation set
        with torch.no_grad():
            for batch in dataloader_val:
                # forward pass
                inputs = {k: v.to(device) for k, v in batch.items() if k != "labels"}  
                labels = batch['labels'].to(device).unsqueeze(1)  # shape [B,1]  
                outputs = chemberta.model(**inputs).logits  # shape [B,1] 
                loss = loss_fn(outputs, labels)  
    
                # update epoch loss
                epoch_val_loss += loss.item() * dataloader_val.batch_size * std
                epoch_val_size += dataloader_val.batch_size
    
        # compute loss per epoch
        avg_train_loss = epoch_train_loss/epoch_train_size
        avg_val_loss = epoch_val_loss/epoch_val_size
    
        # save model with the lowest validation loss
        if len(val_loss_list) > 0 and avg_val_loss < np.min(val_loss_list):
            torch.save(chemberta.model.state_dict(), '/kaggle/working/weights.pth')
            
        train_loss_list.append(avg_train_loss)
        val_loss_list.append(avg_val_loss)
        print(f"Epoch {epoch + 1} done, training loss: {avg_train_loss:.4f}, validation loss: {avg_val_loss:.4f}") 
        
    # Evaluate in CPU
    chemberta.model.eval()
    chemberta.model.to('cpu')

In [None]:
# Load the best model
chemberta.model.load_state_dict(torch.load('/kaggle/working/weights.pth', map_location=torch.device('cpu')))

+ The validation loss seems to be converged after 15 epochs. 

In [None]:
plt.plot([i + 1 for i in range(n_epochs)], train_loss_list, label ='training loss')
plt.plot([i + 1 for i in range(n_epochs)], val_loss_list, '-.', label ='validation loss')

plt.xlabel("X-axis data")
plt.ylabel("Y-axis data")
plt.legend()
plt.title('Finetuning ChemBerta with Regression layer')
plt.show()

# Generate Features

In [None]:
def extract_chembert_embeddings(smiles_list, embedder, n_data):
    n_latent = 384
    embeddings = np.zeros((n_data, n_latent))
    
    for i, smiles in enumerate(smiles_list):
        with torch.no_grad():
            # Getting the model output
            encoded_input = embedder.tokenizer(smiles, return_tensors='pt', padding=True, truncation=True)
            model_output = embedder.model(**encoded_input, output_hidden_states=True)
            # embeddings[i, :] = model_output.logits.numpy()
            
            # Getting the CLS token from model output
            embedding = model_output.hidden_states[3][:, 0, :]
            embeddings[i, :] = embedding.numpy()
    
    return pd.DataFrame(embeddings, columns=[f'embedding_{i+1}' for i in range(n_latent)])

In [None]:
# Code retrieved from https://www.kaggle.com/code/aliffaagnur/single-lightgbm-optuna/notebook
def extract_all_descriptors(smiles_list):

    # GET ALL DESCRIPTORS
    descriptor_list = Descriptors._descList    # --> THESE WILL RETURN LIST OF TUPLE
    descriptors = [desc[0] for desc in descriptor_list]

    print(f'There Are {len(descriptor_list)} Descriptor Features')

    # EXTRACT ALL DESCRIPTORS FROM SMILES FEATURES
    result = []
    for smiles in smiles_list:

        mol = Chem.MolFromSmiles(smiles)

        # IF MOLECOLE IS INVALID
        if mol is None:
            row = {name : None for name, func in descriptor_list}
        else:
            # CREATE DESCRIPTORS FEATURES
            row = {name: func(mol) for name, func in descriptor_list}

        result.append(row)

    # MERGE DATA WITH EXTRACTED FEATURES
    return pd.DataFrame(result)

In [None]:
def extract_fingerprints(smiles_list):
    """
    Generates all features for a list of SMILES strings using RDKit.
    Returns a list of feature dictionaries.
    """
    features = []
    for smiles in smiles_list:
        mol = Chem.MolFromSmiles(smiles)
        
        feature_dict = {}
        if mol is not None:
            # Descriptors
            for name, func in Descriptors.descList:
                try:
                    feature_dict[name] = func(mol)
                except Exception:
                    feature_dict[name] = np.nan
            
            # Morgan Fingerprint 
            fp_morgan = GetMorganFingerprintAsBitVect(mol, radius=2, nBits=1024)
            for i in range(fp_morgan.GetNumBits()):
                feature_dict[f"Morgan_{i}"] = int(fp_morgan.GetBit(i))
            
            # MACCS Keys
            fp_maccs = GetMACCSKeysFingerprint(mol)
            for i in range(fp_maccs.GetNumBits()):
                feature_dict[f"MACCS_{i}"] = int(fp_maccs.GetBit(i))

        features.append(feature_dict)
    return pd.DataFrame(features)

In [None]:
%%capture
# Generate features for training set
embeddings_train = extract_chembert_embeddings(df_train['SMILES'], chemberta, df_train.shape[0])
molecular_features_train = extract_all_descriptors(df_train['SMILES'])
molecular_fingerprint_train = extract_fingerprints(df_train['SMILES'])

# Generate features for validation set
embeddings_val = extract_chembert_embeddings(df_val['SMILES'], chemberta, df_val.shape[0])
molecular_features_val = extract_all_descriptors(df_val['SMILES'])
molecular_fingerprint_val = extract_fingerprints(df_val['SMILES'])

In [None]:
# Dataset size
print('Training set')
print(embeddings_train.shape, type(embeddings_train))
print(molecular_features_train.shape, type(molecular_features_train))
print(molecular_fingerprint_train.shape, type(molecular_fingerprint_train))

print('Valiodation set')
print(embeddings_val.shape, type(embeddings_val))
print(molecular_features_val.shape, type(molecular_features_val))
print(molecular_fingerprint_val.shape, type(molecular_fingerprint_val))

# Downstream Prediction Task

In [None]:
# Dataset for sklearn - Training set
df_ttl_train = pd.concat([
    df_train.reset_index(drop=True), 
    embeddings_train, 
    # molecular_features_train,
    molecular_fingerprint_train
], axis=1)
y_train = df_ttl_train['TmS']
X_train = df_ttl_train.drop(df_train.columns, axis=1)
#X_train = df_ttl_train.drop(['id', 'Tm', 'TmS', 'SMILES'], axis=1)
X_train.columns = [str(colname) for colname in X_train.columns]

# Dataset for sklearn - Validation set
df_ttl_val = pd.concat([
    df_val.reset_index(drop=True), 
    embeddings_val, 
    # molecular_features_val,
    molecular_fingerprint_val
], axis=1)
y_val = df_ttl_val['TmS']
X_val = df_ttl_val.drop(df_val.columns, axis=1)
#X_val = df_ttl_val.drop(['id', 'Tm', 'TmS', 'SMILES'], axis=1)
X_val.columns = [str(colname) for colname in X_val.columns]

## Extreme Gradient Boosting

In [None]:
# Define hyperparameter grid for optimization  
param_grid = {
    'n_estimators' : [10000],
    'max_depth': [7, 9, 12],
    'eta': [0.02, 0.05, 0.1],
    'subsample': [0.5, 0.7],
    'colsample_bytree': [0.5, 0.7],
    'reg_alpha': [0, 0.5, 0.7],
    'reg_lambda': [0, 0.5, 0.7]
}
param_list = product(*[param_grid[key] for key in param_grid.keys()])
param_names = [*param_grid.keys()]

# Define Dmatrix for GPU training
# dtrain = xgb.DMatrix(X_train, label=y_train)
# dval = xgb.DMatrix(X_val, label=y_val)

In [None]:
mae_scores = []
for i, param in enumerate(param_list):
    param_dict = dict(zip(param_names, param))
    model_xgb = xgb.XGBRegressor(random_state=seed, objective='reg:squarederror', eval_metric='mae', early_stopping_rounds=100, device='cuda', **param_dict)
    model_xgb.fit(X_train, y_train, eval_set=[(X_train, y_train), (X_val, y_val)], verbose=False)
    result = model_xgb.evals_result()
    train_loss = result['validation_0']['mae'][-1] * std
    val_loss = result['validation_1']['mae'][-1] * std
    print(f'Model {i}', '\t', train_loss, '\t', val_loss)
    mae_scores.append(list(param) + [val_loss])
    
df_downstream = pd.DataFrame(mae_scores, columns= param_names + ['mae_score']) 

In [None]:
df_downstream = pd.DataFrame(mae_scores, columns= param_names + ['mae_score']) 

In [None]:
df_downstream.sort_values(by='mae_score', inplace=True)
df_downstream.head()

In [None]:
# Retrieve the best model parameters
best_model_config = df_downstream.nsmallest(1, 'mae_score').\
    drop('mae_score', axis=1).\
    iloc[0, :].to_dict()

# Type correction
best_model_config['max_depth'] = int(best_model_config['max_depth'])
best_model_config['n_estimators'] = int(best_model_config['n_estimators'])

# Train the downstream model with the best parameters
best_model = xgb.XGBRegressor(random_state=seed, **best_model_config)
best_model.fit(X_train, y_train)

# Submission

In [None]:
# Generate features for the test set
embeddings_test = extract_chembert_embeddings(df_test['SMILES'], chemberta, df_test.shape[0])
molecular_features_test = extract_all_descriptors(df_test['SMILES'])

# Prepare dataset for downstream regression tasks
df_ttl_test = pd.concat([
    df_test.reset_index(drop=True), 
    embeddings_test, 
    molecular_features_test
], axis=1)


X_test = df_ttl_test.drop(df_test.columns, axis=1)
X_test.columns = [str(colname) for colname in X_test.columns]

y_pred = scaler.inverse_transform( # back transform to raw target
    np.expand_dims(best_model.predict(X_test), axis=1) # convert to 2D array by adding an extra dimension
)

In [None]:
df_out = pd.DataFrame({'id': df_ttl_test['id'],'Tm': y_pred.flatten()})
df_out.to_csv('./submission.csv', index=False)