# Packages and initial setup

In [None]:
!pip install Catboost lightgbm optuna-integration rdkit

In [None]:
from catboost import CatBoostClassifier
from functools import partial
from lightgbm import LGBMClassifier
import joblib
import numpy as np
import optuna
import os
import pandas as pd
from pathlib import Path

from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import (
    MolFromSmiles,
    MolToSmiles,
    DataStructs,
    Descriptors,
    Mol,
    MACCSkeys
)
from rdkit.Chem.rdFingerprintGenerator import (
    GetMorganGenerator,
    GetRDKitFPGenerator,
    GetTopologicalTorsionGenerator
)
from rdkit.Chem import rdMolDescriptors
from rdkit.ML.Descriptors.MoleculeDescriptors import MolecularDescriptorCalculator

from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.impute import SimpleImputer
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler

import time
import warnings
from xgboost import XGBClassifier, XGBRegressor

# Seed handed to the cross-validation splitter further down.
RANDOM_STATE = 10

# Data is read from INPUT_DIR; WORKING_DIR is the output location.
INPUT_DIR = Path("/kaggle") / "input" / "euos25"
WORKING_DIR = Path("/kaggle") / "working"

# Show whole-valued floats without decimals, everything else with three.
pd.options.display.float_format = (
    lambda x: f"{x:.0f}" if x.is_integer() else f"{x:.3f}"
)

# Silence the non-critical LightGBM warning about missing feature names.
warnings.filterwarnings(
    action="ignore",
    message="X does not have valid feature names, but LGBM",
)

# Data

In [None]:
# File names of the four training tables; each one supplies one target column
# of the merged training frame built in the next cell.
t340_filename = "euos25_challenge_train_transmittance340.csv"
t450_679_avg_filename = "euos25_challenge_train_transmittance450.csv"
f340_450_filename = "euos25_challenge_train_fluorescence340_450.csv"
fmulti_filename = "euos25_challenge_train_fluorescence480.csv"

# Molecules to predict.
test_filename = "euos25_challenge_test.csv"

df_t340 = pd.read_csv(INPUT_DIR / t340_filename)
df_t450_679 = pd.read_csv(INPUT_DIR / t450_679_avg_filename)
df_f340_450 = pd.read_csv(INPUT_DIR / f340_450_filename)
df_fmulti = pd.read_csv(INPUT_DIR / fmulti_filename)

# "_extended" variants of the training tables plus the leaderboard table.
# NOTE(review): the *ext frames are loaded here but not referenced in the part
# of the notebook reviewed; confirm they are still needed.
t340ext_filename = ("euos25_challenge_train_transmittance340_extended.csv")
t450_679_avgext_filename = ("euos25_challenge_train_transmittance450_extended.csv")
f340_450ext_filename = ("euos25_challenge_train_fluorescence340_450_extended.csv")
fmultiext_filename = ("euos25_challenge_train_fluorescence480_extended.csv")
leaderboard_filename = ("euos_challenge_2025_leaderboard.csv")

df_t340ext = pd.read_csv(INPUT_DIR / t340ext_filename)
df_t450_679ext = pd.read_csv(INPUT_DIR / t450_679_avgext_filename)
df_f340_450ext = pd.read_csv(INPUT_DIR / f340_450ext_filename)
df_fmultiext = pd.read_csv(INPUT_DIR / fmultiext_filename)
# The leaderboard rows are appended to the training rows in the next cell.
df_leaderboard = pd.read_csv(INPUT_DIR / leaderboard_filename)
df_test = pd.read_csv(INPUT_DIR / test_filename)

In [None]:
def _canonical_smiles(smiles: str) -> str:
    """Return RDKit's canonical SMILES for *smiles*.

    Raises:
        ValueError: If RDKit cannot parse *smiles*.
    """
    mol = MolFromSmiles(smiles)
    if mol is None:
        # MolFromSmiles signals a parse failure by returning None; handing that
        # to MolToSmiles fails with an error that does not name the input.
        raise ValueError(f"Could not parse SMILES: {smiles!r}")
    return MolToSmiles(mol)


def _add_canonical_smiles(df: pd.DataFrame) -> None:
    """Add a "canonical_smiles" column to *df* in place and report duplicates."""
    df["canonical_smiles"] = df["SMILES"].apply(_canonical_smiles)

    if df["canonical_smiles"].duplicated().any():
        print("Duplicated SMILES detected after canonicalization.")
    else:
        print("No duplicated SMILES detected after canonicalization.")


_add_canonical_smiles(df_t340)

# All files contain the same molecules
# -> merge
# NOTE(review): the columns are combined on the frames' row index, so this
# assumes the four training files list the molecules in the same order —
# TODO confirm.
df_train = pd.DataFrame(
    {
        "id": df_t340["N"],
        "canonical_smiles": df_t340["canonical_smiles"],
        "t340": df_t340["Transmittance (qualitative)"],
        "t450_679": df_t450_679["Transmittance"],
        "f340_450": df_f340_450["Fluorescence"],
        "fmulti": df_fmulti["Fluorescence"]
    }
)

_add_canonical_smiles(df_leaderboard)

# Bring the leaderboard table to the same column names as df_train.
df_leaderboard = df_leaderboard.drop(columns=["SMILES"])
df_leaderboard = df_leaderboard.rename(
    columns={
        "ID": "id",
        "Transmittance(340)": "t340",
        "Transmittance(450)": "t450_679",
        "Fluorescence(340/450)": "f340_450",
        "Fluorescence(>480)": "fmulti",
    }
)

# Training pool = original training rows + leaderboard rows. "source" and
# "source_index" record where each row came from.
df_train_full = pd.concat(
    [
        df_train.assign(
            source="train",
            source_index=df_train.index,
        ),
        df_leaderboard.assign(
            source="leaderboard",
            source_index=df_leaderboard.index,
        )
    ],
    ignore_index=True,
)

_add_canonical_smiles(df_test)

df_test = df_test.rename(columns={"ID": "id"})

# Feature matrices keyed by feature-set name; filled by the cells below.
X_train_features = {}
X_test_features = {}

# Features

In [None]:
def featurize_fp(
        smiles_list: np.ndarray,
        fp_type: str,
        file_path: Path | None
) -> np.ndarray:
    if file_path is not None and file_path.exists():
        print(f"Loading fingerprints from {file_path}...")
        return np.load(file_path)

    fps = []

    fp_type = fp_type.lower()

    if fp_type == "maccskeys":
        n_bits = 167
        generator = None
    else:
        n_bits = 2048

        if fp_type == "morgan":
            generator = GetMorganGenerator(radius=2, fpSize=n_bits)

        elif fp_type == "rdkit":
            generator = GetRDKitFPGenerator(fpSize=n_bits, minPath=1, maxPath=7)

        elif fp_type == "atompair":
            generator = GetAtomPairGenerator(fpSize=n_bits)

        elif fp_type == "torsion":
            generator = GetTopologicalTorsionGenerator(fpSize=n_bits)

        else:
            raise ValueError(f"Unknown fingerprint type: {fp_type}")

    for smiles in smiles_list:
        mol = MolFromSmiles(smiles)
        if mol is None:
            print("SMILES conversion failed.")
            fps.append(np.zeros(n_bits, dtype=int))
            continue

        try:
            if fp_type == "maccskeys":
                fp = MACCSkeys.GenMACCSKeys(mol)
            else:
                fp = generator.GetFingerprint(mol)
        except Exception as e:
            print(f"Fingerprint generation failed.")
            fps.append(np.zeros(n_bits, dtype=int))
            continue

        arr = np.zeros(n_bits, dtype=int)
        DataStructs.ConvertToNumpyArray(fp, arr)
        fps.append(arr)

    if file_path is not None:
        np.save(file_path, fps)
        print(f"Saved fingerprints to {file_path}.")

    return np.vstack(fps)

In [None]:
def featurize_desc2D(
        smiles_list: np.ndarray,
        file_path: Path = None
) -> np.ndarray:
    if file_path is not None and file_path.exists():
        print(f"Loading 2D descriptors from {file_path}...")
        return np.load(file_path)

    desc_names = [desc[0] for desc in Descriptors._descList]
    n_descs = len(desc_names)
    desc_data = []

    for smiles in smiles_list:
        mol = Chem.MolFromSmiles(smiles)
        if mol is None:
            print("SMILES conversion failed.")
            desc_data.append(np.zeros(n_descs, dtype=float))
            continue

        try:
            descs_dict = Descriptors.CalcMolDescriptors(mol)
            # Extract only values in the consistent order
            descs_values = [descs_dict[name] for name in desc_names]
        except Exception as e:
            print("2D descriptor calculation failed.")
            desc_data.append(np.zeros(n_descs, dtype=float))
            continue

        desc_data.append(descs_values)

        imputer = SimpleImputer(strategy="mean")
        desc_data_imputed = imputer.fit_transform(desc_data)

    return desc_data_imputed

In [None]:
# Morgan fingerprints for the training pool, then for the test set. The same
# variable is reused, so X_morganfp ends up holding the test-set matrix.
for _split, _frame, _store in (
    ("train", df_train_full, X_train_features),
    ("test", df_test, X_test_features),
):
    X_morganfp = featurize_fp(
        _frame["canonical_smiles"].values,
        "morgan",
        INPUT_DIR / f"X_{_split}_morganfp.npy",
    )
    _store["morganfp"] = X_morganfp

In [None]:
# RDKit fingerprints for the training pool, then for the test set. The same
# variable is reused, so X_rdkitfp ends up holding the test-set matrix.
for _split, _frame, _store in (
    ("train", df_train_full, X_train_features),
    ("test", df_test, X_test_features),
):
    X_rdkitfp = featurize_fp(
        _frame["canonical_smiles"].values,
        "rdkit",
        INPUT_DIR / f"X_{_split}_rdkitfp.npy",
    )
    _store["rdkitfp"] = X_rdkitfp

In [None]:
# Topological-torsion fingerprints for the training pool, then for the test
# set. The same variable is reused, so X_torsionfp ends up holding the
# test-set matrix.
for _split, _frame, _store in (
    ("train", df_train_full, X_train_features),
    ("test", df_test, X_test_features),
):
    X_torsionfp = featurize_fp(
        _frame["canonical_smiles"].values,
        "torsion",
        INPUT_DIR / f"X_{_split}_torsionfp.npy",
    )
    _store["torsionfp"] = X_torsionfp

In [None]:
# MACCS keys for the training pool, then for the test set. The same variable
# is reused, so X_maccskeys ends up holding the test-set matrix.
for _split, _frame, _store in (
    ("train", df_train_full, X_train_features),
    ("test", df_test, X_test_features),
):
    X_maccskeys = featurize_fp(
        _frame["canonical_smiles"].values,
        "maccskeys",
        INPUT_DIR / f"X_{_split}_maccskeys.npy",
    )
    _store["maccskeys"] = X_maccskeys

In [None]:
# 2D descriptors for the training pool, then for the test set. The same
# variable is reused, so X_desc2D ends up holding the test-set matrix.
for _split, _frame, _store in (
    ("train", df_train_full, X_train_features),
    ("test", df_test, X_test_features),
):
    X_desc2D = featurize_desc2D(
        _frame["canonical_smiles"].values,
        INPUT_DIR / f"X_{_split}_desc2D.npy",
    )
    _store["desc2D"] = X_desc2D

# Model

In [None]:
def get_params(
        model_type: str,
        scale_pos_weight: float | None = None
    ) -> dict:
    if model_type == "cb":
        params = {
            "thread_count": -1,
            "logging_level": "Silent",
            "auto_class_weights": "Balanced",
            "iterations": 1000,
            "learning_rate": 0.05,
            "depth": 6,
            "rsm": 0.8,
            "eval_metric": "AUC",
        }

    elif model_type == "hgb":
        params = {
            "max_iter": 500,
            "max_leaf_nodes": 31,
            "learning_rate": 0.05,
            "early_stopping": True,
        }

    elif model_type == "xgb":
        params = {
            "n_jobs": -1,
            "scale_pos_weight": scale_pos_weight,
            "n_estimators": 500,
            "max_depth": 5,
            "learning_rate": 0.05,
            "subsample": 0.8,
            "colsample_bytree": 0.8,
            "eval_metric": "auc",
        }

    elif model_type == "lgbm":
        params = {
            "scale_pos_weight": scale_pos_weight,
            "objective": "binary",
            "metric": "auc",
            "n_estimators": 500,
            "num_leaves": 31,
            "min_data_in_leaf": 20,
            "learning_rate": 0.05,
            "feature_fraction": 0.8,
            "n_jobs": -1,
            "verbose": -1,
        }

    else:
        raise ValueError(f"Unknown model type: {model_type}")

    return params


def get_model(model_type: str, params: dict):
    if model_type == "cb":
        return CatBoostClassifier(
            **params
        )

    elif model_type == "hgb":
        return HistGradientBoostingClassifier(
            **params
        )

    elif model_type == "xgb":
        return XGBClassifier(
            **params
        )

    elif model_type == "lgbm":
        return LGBMClassifier(
            **params
        )

    else:
        raise ValueError(f"Unknown model type: {model_type}")


def fit_model(
    X_train: np.ndarray,
    y_train: np.ndarray,
    model: object
) -> object:
    """Fit *model* on the training data and return it.

    LightGBM is fitted with ``eval_metric="auc"``, XGBoost and CatBoost with
    ``verbose=False``; every other model gets a plain ``fit(X, y)`` call.
    """
    if isinstance(model, LGBMClassifier):
        fit_kwargs = {"eval_metric": "auc"}
    elif isinstance(model, (XGBClassifier, CatBoostClassifier)):
        fit_kwargs = {"verbose": False}
    else:
        # HistGradientBoostingClassifier and anything unrecognised.
        fit_kwargs = {}

    model.fit(X_train, y_train, **fit_kwargs)
    return model

In [None]:
def train_eval_skfold(
    df_train_full: pd.DataFrame,
    df_test: pd.DataFrame,
    dataset_name: str,
    features_train: dict,
    features_test: dict,
    feature_names: list,
    model_type: str,
    random_state: int
) -> dict:
    """Train one model per stratified fold, evaluate it and predict the test set.

    The model name is ``<dataset_name>_<model_type>_<feature names joined by
    "_">``. Everything is written below ``./<model_name>/``: the fitted model
    and test predictions of every fold, the per-fold evaluation AUCs, the
    out-of-fold predictions and the fold-averaged test predictions.

    Args:
        df_train_full: Training rows; must contain "id" and the column named
            *dataset_name* (the label).
        df_test: Test rows; must contain "id".
        dataset_name: Name of the label column.
        features_train: Feature matrices for the training rows, by name.
        features_test: Feature matrices for the test rows, by name.
        feature_names: Feature sets to stack column-wise, in this order.
        model_type: Model family understood by ``get_params``/``get_model``.
        random_state: Seed of the 5-fold stratified splitter. It is not
            passed on to the models.

    Returns:
        Dict with the keys "df_eval_aucs", "df_eval_preds",
        "all_df_folds_test_preds" (one frame per fold),
        "df_ensemble_test_preds" (mean test probability per id) and "models".
    """
    model_name = dataset_name + "_" + model_type + "_" + "_".join(feature_names)

    print(
        f"{model_name} ({feature_names}, {dataset_name})"
    )

    # Every artefact goes into ./<model_name>/; create it here so the function
    # does not rely on the caller having done so.
    os.makedirs(model_name, exist_ok=True)

    X_train_full = np.hstack([features_train[feature_name] for feature_name in feature_names])
    y_train_full = df_train_full[dataset_name].values
    ids_train_full = df_train_full["id"].values

    X_test = np.hstack([features_test[feature_name] for feature_name in feature_names])
    ids_test = df_test["id"].values

    skf = StratifiedKFold(
        n_splits=5,
        shuffle=True,
        random_state=random_state
    )

    eval_aucs = []
    eval_preds = []
    all_df_folds_test_preds = []
    models = []

    for fold_nr, (train_indices, eval_indices) in enumerate(
        skf.split(X_train_full, y_train_full), start=1
    ):
        fold_name = model_name + "_kfold_" + str(fold_nr)

        X_train = X_train_full[train_indices]
        y_train = y_train_full[train_indices]

        # Negative/positive ratio of this fold's training part; max(..., 1)
        # avoids a division by zero when the fold has no positives.
        scale_pos_weight = (y_train == 0).sum() / max((y_train == 1).sum(), 1)
        params = get_params(model_type, scale_pos_weight)
        model = get_model(model_type, params)

        model = fit_model(
            X_train,
            y_train,
            model
        )
        models.append(model)

        X_eval = X_train_full[eval_indices]
        y_eval = y_train_full[eval_indices]

        y_eval_pred_proba = model.predict_proba(X_eval)[:, 1]
        y_eval_pred_class = model.predict(X_eval)

        # Kfold evaluation
        auc_eval = roc_auc_score(y_eval, y_eval_pred_proba)
        eval_aucs.append({
            "model": model_name,
            "fold": fold_nr,
            "auc": auc_eval
        })

        ids_eval = ids_train_full[eval_indices]
        eval_preds.extend(
            {
                "model": model_name,
                "fold": fold_nr,
                "id": id_eval,
                "y_proba": y_proba,
                "y_class": y_class
            }
            for id_eval, y_proba, y_class
            in zip(ids_eval, y_eval_pred_proba, y_eval_pred_class)
        )

        # Test set predictions of this fold's model. Only the probabilities
        # are stored, so no class prediction is computed here.
        y_test_pred_proba = model.predict_proba(X_test)[:, 1]

        df_folds_test_preds = pd.DataFrame(
            [
                {"id": id_test, "y_proba": y_proba}
                for id_test, y_proba in zip(ids_test, y_test_pred_proba)
            ]
        )
        df_folds_test_preds.to_csv(f"./{model_name}/{fold_name}_test_preds.csv", index=False)
        all_df_folds_test_preds.append(df_folds_test_preds)

        joblib.dump(model, f"./{model_name}/{fold_name}.joblib")

    df_eval_aucs = pd.DataFrame(eval_aucs)
    df_eval_aucs.to_csv(f"./{model_name}/{model_name}_kfolds_eval_aucs.csv", index=False)

    df_eval_preds = pd.DataFrame(eval_preds)
    df_eval_preds.to_csv(f"./{model_name}/{model_name}_kfolds_eval_preds.csv", index=False)

    # Ensemble predictions across all folds: mean probability per test id,
    # computed from the per-fold frames collected above.
    df_ensemble_test_preds = (
        pd.concat(all_df_folds_test_preds, ignore_index=True)
        .groupby("id", as_index=False)
        .agg(
            y_proba=("y_proba", "mean")
        )
    )
    df_ensemble_test_preds.to_csv(f"./{model_name}/{model_name}_kfolds_ensemble_test_preds.csv", index=False)

    return {
        "df_eval_aucs": df_eval_aucs,
        "df_eval_preds": df_eval_preds,
        "all_df_folds_test_preds": all_df_folds_test_preds,
        "df_ensemble_test_preds": df_ensemble_test_preds,
        "models": models
    }

In [None]:
def show_metrics_of_models(dataset_results: dict) -> None:
    """Display one row of metrics per model, best "eval_auc" first.

    Args:
        dataset_results: Maps a model name to its results dict. A results
            dict either carries a ready-made "metrics" dict (which must
            include "eval_auc") or a "df_eval_aucs" frame with an "auc"
            column, as returned by ``train_eval_skfold``; in the latter case
            "eval_auc" is the mean and "eval_auc_std" the standard deviation
            of the fold AUCs.
    """
    metrics_list = []
    for model_name, model_results in dataset_results.items():
        if "metrics" in model_results:
            metrics = model_results["metrics"]
        else:
            # Results produced by train_eval_skfold have no "metrics" entry;
            # summarise their per-fold AUCs instead.
            fold_aucs = model_results["df_eval_aucs"]["auc"]
            metrics = {
                "eval_auc": fold_aucs.mean(),
                "eval_auc_std": fold_aucs.std(),
            }
        metrics_list.append({"model": model_name, **metrics})

    if not metrics_list:
        # An empty frame has no "model" column to index on.
        print("No model results to show.")
        return

    df_metrics_list = (
        pd.DataFrame(metrics_list)
        .set_index("model")
        .sort_values(by="eval_auc", ascending=False)
    )

    # `display` is provided by the notebook environment.
    display(df_metrics_list)

# Results

In [None]:
# Collects the return value of every train_eval_skfold run, keyed by model name.
results = {}

# t340

In [None]:
# t340 target: XGBoost on the RDKit fingerprints.
dataset_name, model_type = "t340", "xgb"
feature_names = ["rdkitfp"]
model_name = f"{dataset_name}_{model_type}_{'_'.join(feature_names)}"
Path(model_name).mkdir(parents=True, exist_ok=True)

results[model_name] = train_eval_skfold(
    df_train_full=df_train_full,
    df_test=df_test,
    dataset_name=dataset_name,
    features_train=X_train_features,
    features_test=X_test_features,
    feature_names=feature_names,
    model_type=model_type,
    random_state=RANDOM_STATE,
)

In [None]:
# t340 target: HistGradientBoosting on RDKit fingerprints + 2D descriptors.
dataset_name, model_type = "t340", "hgb"
feature_names = ["rdkitfp", "desc2D"]
model_name = f"{dataset_name}_{model_type}_{'_'.join(feature_names)}"
Path(model_name).mkdir(parents=True, exist_ok=True)

results[model_name] = train_eval_skfold(
    df_train_full=df_train_full,
    df_test=df_test,
    dataset_name=dataset_name,
    features_train=X_train_features,
    features_test=X_test_features,
    feature_names=feature_names,
    model_type=model_type,
    random_state=RANDOM_STATE,
)

# t450_679

In [None]:
# t450_679 target: CatBoost on the 2D descriptors.
dataset_name, model_type = "t450_679", "cb"
feature_names = ["desc2D"]
model_name = f"{dataset_name}_{model_type}_{'_'.join(feature_names)}"
Path(model_name).mkdir(parents=True, exist_ok=True)

results[model_name] = train_eval_skfold(
    df_train_full=df_train_full,
    df_test=df_test,
    dataset_name=dataset_name,
    features_train=X_train_features,
    features_test=X_test_features,
    feature_names=feature_names,
    model_type=model_type,
    random_state=RANDOM_STATE,
)

In [None]:
# t450_679 target: CatBoost on the RDKit fingerprints.
dataset_name, model_type = "t450_679", "cb"
feature_names = ["rdkitfp"]
model_name = f"{dataset_name}_{model_type}_{'_'.join(feature_names)}"
Path(model_name).mkdir(parents=True, exist_ok=True)

results[model_name] = train_eval_skfold(
    df_train_full=df_train_full,
    df_test=df_test,
    dataset_name=dataset_name,
    features_train=X_train_features,
    features_test=X_test_features,
    feature_names=feature_names,
    model_type=model_type,
    random_state=RANDOM_STATE,
)

# f340_450

In [None]:
# f340_450 target: XGBoost on the RDKit fingerprints.
dataset_name, model_type = "f340_450", "xgb"
feature_names = ["rdkitfp"]
model_name = f"{dataset_name}_{model_type}_{'_'.join(feature_names)}"
Path(model_name).mkdir(parents=True, exist_ok=True)

results[model_name] = train_eval_skfold(
    df_train_full=df_train_full,
    df_test=df_test,
    dataset_name=dataset_name,
    features_train=X_train_features,
    features_test=X_test_features,
    feature_names=feature_names,
    model_type=model_type,
    random_state=RANDOM_STATE,
)

In [None]:
# f340_450 target: HistGradientBoosting on the Morgan fingerprints.
dataset_name, model_type = "f340_450", "hgb"
feature_names = ["morganfp"]
model_name = f"{dataset_name}_{model_type}_{'_'.join(feature_names)}"
Path(model_name).mkdir(parents=True, exist_ok=True)

results[model_name] = train_eval_skfold(
    df_train_full=df_train_full,
    df_test=df_test,
    dataset_name=dataset_name,
    features_train=X_train_features,
    features_test=X_test_features,
    feature_names=feature_names,
    model_type=model_type,
    random_state=RANDOM_STATE,
)

In [None]:
# f340_450 target: XGBoost on RDKit fingerprints + 2D descriptors.
dataset_name, model_type = "f340_450", "xgb"
feature_names = ["rdkitfp", "desc2D"]
model_name = f"{dataset_name}_{model_type}_{'_'.join(feature_names)}"
Path(model_name).mkdir(parents=True, exist_ok=True)

results[model_name] = train_eval_skfold(
    df_train_full=df_train_full,
    df_test=df_test,
    dataset_name=dataset_name,
    features_train=X_train_features,
    features_test=X_test_features,
    feature_names=feature_names,
    model_type=model_type,
    random_state=RANDOM_STATE,
)

# fmulti

In [None]:
# fmulti target: CatBoost on RDKit fingerprints + 2D descriptors.
dataset_name, model_type = "fmulti", "cb"
feature_names = ["rdkitfp", "desc2D"]
model_name = f"{dataset_name}_{model_type}_{'_'.join(feature_names)}"
Path(model_name).mkdir(parents=True, exist_ok=True)

results[model_name] = train_eval_skfold(
    df_train_full=df_train_full,
    df_test=df_test,
    dataset_name=dataset_name,
    features_train=X_train_features,
    features_test=X_test_features,
    feature_names=feature_names,
    model_type=model_type,
    random_state=RANDOM_STATE,
)

In [None]:
# fmulti target: XGBoost on the MACCS keys.
dataset_name, model_type = "fmulti", "xgb"
feature_names = ["maccskeys"]
model_name = f"{dataset_name}_{model_type}_{'_'.join(feature_names)}"
Path(model_name).mkdir(parents=True, exist_ok=True)

results[model_name] = train_eval_skfold(
    df_train_full=df_train_full,
    df_test=df_test,
    dataset_name=dataset_name,
    features_train=X_train_features,
    features_test=X_test_features,
    feature_names=feature_names,
    model_type=model_type,
    random_state=RANDOM_STATE,
)

In [None]:
# fmulti target: CatBoost on Morgan fingerprints + 2D descriptors.
dataset_name, model_type = "fmulti", "cb"
feature_names = ["morganfp", "desc2D"]
model_name = f"{dataset_name}_{model_type}_{'_'.join(feature_names)}"
Path(model_name).mkdir(parents=True, exist_ok=True)

results[model_name] = train_eval_skfold(
    df_train_full=df_train_full,
    df_test=df_test,
    dataset_name=dataset_name,
    features_train=X_train_features,
    features_test=X_test_features,
    feature_names=feature_names,
    model_type=model_type,
    random_state=RANDOM_STATE,
)

In [None]:
!zip -r /kaggle/working/output.zip /kaggle/working/