# Dataset Preprocessing (Descriptors + Fingerprints)


In [16]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from rdkit import Chem, RDLogger, DataStructs
from rdkit.Chem import Descriptors, rdMolDescriptors, AllChem, MACCSkeys
from rdkit.Chem.MolStandardize import rdMolStandardize
from rdkit.ML.Descriptors import MoleculeDescriptors as md
from morfeus import XTB

from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import (
    train_test_split,
    StratifiedKFold,
    RepeatedStratifiedKFold,
    cross_val_score,
    cross_validate,
    RandomizedSearchCV,
)
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
    roc_curve,
    classification_report,
    confusion_matrix,
)

from sklearn.svm import SVC
import shap
import optuna
from pyscf import gto, scf

## Molecule, Descriptor + Fingerprints and Outlier Functions


In [17]:
def molecule_from_smiles(smiles):
    lg = RDLogger.logger()
    # Temporarily silence RDKit logs
    lg.setLevel(RDLogger.CRITICAL)
    try:
        # Extract molecule
        molecule = Chem.MolFromSmiles(smiles, sanitize=True)
        if molecule is None:
            return None, "failed"

        # Remove salts
        clean_molecule = rdMolStandardize.LargestFragmentChooser()
        molecule = clean_molecule.choose(molecule)

        # Sanitize molecule again to reflect changes
        Chem.SanitizeMol(molecule)
        return molecule, "succeed"
    except Exception as e:
        return None, f"error: {e}"
    finally:
        # Re-enable logging afterward
        lg.setLevel(RDLogger.INFO)


def calculate_descriptors(molecule):
    # Get all descriptors (1D/2D)
    descriptor_names = []
    for descriptor, _ in Descriptors._descList:
        descriptor_names.append(descriptor)

    # Use descriptors to calculate values
    calculator = md.MolecularDescriptorCalculator(descriptor_names)
    descriptor_values = calculator.CalcDescriptors(molecule)

    # Create dictionary
    descriptors = dict(zip(descriptor_names, descriptor_values))
    return descriptors


def compute_iqr_limits(df, factor=1.5):
    # Calculate IQR limits
    limits = {}
    for col in df.columns:
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        iqr = q3 - q1

        # If IQR is 0 - column is too flat - skip
        if iqr == 0 or np.isnan(iqr):
            continue

        lower = q1 - factor * iqr
        upper = q3 + factor * iqr
        limits[col] = (lower, upper)
    return limits


def apply_iqr_limits(df, limits):
    # Apply the limits
    df_clipped = df.copy()
    for col, (lower, upper) in limits.items():
        df_clipped[col] = df_clipped[col].clip(lower, upper)
    return df_clipped


def bitvect_to_dict(fp, prefix):
    # Convert bit vector to dictionary (create features)
    n_bits = fp.GetNumBits()
    arr = np.zeros((n_bits,), dtype=int)
    DataStructs.ConvertToNumpyArray(fp, arr)
    features = {}
    for i, v in enumerate(arr):
        features[f"{prefix}_{i}"] = int(v)
    return features


def calculate_fingerprints(mol):
    RDLogger.DisableLog("rdApp.*")
    # Calculate Morgan, RDKit, MACCS, AtomPair and Topological Torsion fingerprint
    feats = {}
    if mol is None:
        return feats

    # Morgan (ECFP) fingerprint
    morgan_bits = 2048
    morgan_radius = 2
    fp_morgan = rdMolDescriptors.GetMorganFingerprintAsBitVect(
        mol, radius=morgan_radius, nBits=morgan_bits
    )
    feats.update(bitvect_to_dict(fp_morgan, f"Morgan{morgan_radius}_{morgan_bits}"))

    # RDKit topological fingerprint
    rdk_bits = 2048
    fp_rdk = Chem.RDKFingerprint(mol, fpSize=rdk_bits)
    feats.update(bitvect_to_dict(fp_rdk, f"RDK_{rdk_bits}"))

    # MACCS keys (167 bits)
    fp_maccs = MACCSkeys.GenMACCSKeys(mol)
    feats.update(bitvect_to_dict(fp_maccs, "MACCS"))

    # AtomPair fingerprint
    ap_bits = 2048
    fp_ap = rdMolDescriptors.GetHashedAtomPairFingerprintAsBitVect(mol, nBits=ap_bits)
    feats.update(bitvect_to_dict(fp_ap, f"AtomPair_{ap_bits}"))

    # Topological torsion fingerprint
    tt_bits = 2048
    fp_tt = rdMolDescriptors.GetHashedTopologicalTorsionFingerprintAsBitVect(
        mol, nBits=tt_bits
    )
    feats.update(bitvect_to_dict(fp_tt, f"Torsion_{tt_bits}"))

    RDLogger.EnableLog("rdApp.*")

    return feats


def prepare_3d_molecule(mol):
    # Create 3D molecule
    if mol is None:
        return None

    mol3d = Chem.AddHs(mol)

    try:
        # Calculate 3D coordinates and energy
        AllChem.EmbedMolecule(mol3d, AllChem.ETKDG())
        AllChem.UFFOptimizeMolecule(mol3d, maxIters=200)
    except Exception:
        return None

    return mol3d


def infer_charge_and_unpaired(mol):
    """
    Infer total charge and number of unpaired electrons from RDKit.
    Good enough for typical organic molecules.
    """
    total_charge = sum(a.GetFormalCharge() for a in mol.GetAtoms())
    n_unpaired = sum(a.GetNumRadicalElectrons() for a in mol.GetAtoms())
    return total_charge, n_unpaired


def compute_homo_lumo_xtb(mol):
    """
    Fast HOMO/LUMO computation using GFN2-xTB via morfeus.XTB.

    Returns the same keys as the old PySCF version:
        - HOMO_eV
        - LUMO_eV
        - HL_Gap_eV
    """
    feats = {
        "HOMO_eV": np.nan,
        "LUMO_eV": np.nan,
        "HL_Gap_eV": np.nan,
    }

    if mol is None:
        return feats

    # Create 3D molecule (reuses your existing pipeline)
    mol3d = prepare_3d_molecule(mol)
    if mol3d is None:
        return feats

    # Extract elements and coordinates
    conf = mol3d.GetConformer()
    elements = []
    coords = []
    for atom in mol3d.GetAtoms():
        pos = conf.GetAtomPosition(atom.GetIdx())
        elements.append(atom.GetSymbol())
        coords.append([pos.x, pos.y, pos.z])
    coords = np.array(coords, dtype=float)  # Ã…

    charge, n_unpaired = infer_charge_and_unpaired(mol3d)

    try:
        # method=2 -> GFN2-xTB
        xtb_calc = XTB(
            elements=elements,
            coordinates=coords,
            method=2,
            charge=charge,
            n_unpaired=n_unpaired,
        )

        feats["HOMO_eV"] = xtb_calc.get_homo(unit="eV")
        feats["LUMO_eV"] = xtb_calc.get_lumo(unit="eV")
        feats["HL_Gap_eV"] = xtb_calc.get_homo_lumo_gap(unit="eV")

    except Exception:
        # keep NaNs if xtb fails
        pass

    return feats

## Dataset Processing


In [18]:
# Configuration
ORIG_DATASET = "in_vitro_3T3_in_chemico_dataset.xlsx"
SKIP_ROWS = 1
SMILES_COL = "SMILES code"
TARGET_COL = "Phototoxicity"
FULL_OUTPUT_DATASET = (
    "processed_datasets/in_vitro_3T3_in_chemico_dataset_homolumo_processed.xlsx"
)
# Outputs
TRAIN_X_CSV = "processed_datasets/in_vitro_3T3_in_chemico_homolumo_x_train.csv"
TEST_X_CSV = "processed_datasets/in_vitro_3T3_in_chemico_homolumo_x_test.csv"
TRAIN_Y_CSV = "processed_datasets/in_vitro_3T3_in_chemico_homolumo_y_train.csv"
TEST_Y_CSV = "processed_datasets/in_vitro_3T3_in_chemico_homolumo_y_test.csv"

# Near constant threshold - tolerance
SIMILARITY_THRESHOLD = 0.7
# Correlation threshold
CORRELATION_THRESHOLD = 0.65

# Load dataset and skip first row (Header)
dataset = pd.read_excel(ORIG_DATASET, engine="openpyxl", skiprows=SKIP_ROWS)

descriptor_rows = []
state_molecules = []
molecules = []

for smiles in dataset[SMILES_COL].astype(str):
    molecule, state = molecule_from_smiles(smiles)
    state_molecules.append(state)
    molecules.append(molecule)

    if molecule is None:
        descriptor_rows.append({})
        continue

    # 1D/2D RDKit descriptors
    desc_feats = calculate_descriptors(molecule)

    # Fingerprints
    fp_feats = calculate_fingerprints(molecule)

    # HOMO / LUMO / gap from PySCF
    homo_lumo_feats = compute_homo_lumo_xtb(molecule)

    # Merge all feature dicts
    # all_feats = {**desc_feats, **fp_feats, **homo_lumo_feats}
    all_feats = {**homo_lumo_feats}
    descriptor_rows.append(all_feats)

# Convert list of dictionaries into dataframe
descriptor_data_all = pd.DataFrame(descriptor_rows)

# Keep everything + status
output = pd.concat(
    [dataset.reset_index(drop=True), descriptor_data_all.reset_index(drop=True)], axis=1
)
output["MoleculeStatus"] = state_molecules

# Output whole dataset with descriptors and state
with pd.ExcelWriter(FULL_OUTPUT_DATASET, engine="openpyxl") as writer:
    output.to_excel(writer, index=False, sheet_name="Descriptors")

print(f"Full - Rows: {len(output)}/Columns: {output.shape[1]}")
print(output.head().to_string(index=False))

# # Drop failed molecules - boolean array
# molecules_right = []
# for molecule in molecules:
#     if molecule is not None:
#         molecules_right.append(True)
#     else:
#         molecules_right.append(False)
# if not any(molecules_right):
#     raise ValueError("No valid molecules after SMILES parsing.")

# dataset_ok = dataset.loc[molecules_right].reset_index(drop=True)
# descriptor_ok = descriptor_data_all.loc[molecules_right].reset_index(drop=True)

# # Target
# y_full = dataset_ok[TARGET_COL].astype(int)

# # Take only numeric descriptor columns
# X_full = descriptor_ok.select_dtypes(include=[np.number]).copy()
# for column in X_full.columns:
#     X_full[column] = X_full[column].replace([np.inf, -np.inf], np.nan)

# # Drop columns that are entirely NaN
# all_nan_cols = X_full.columns[X_full.isna().all()].tolist()
# if all_nan_cols:
#     print(f"Dropping {len(all_nan_cols)} NaN columns.")
#     X_full = X_full.drop(columns=all_nan_cols)

# # Split dataset - train and test
# X_train, X_test, y_train, y_test = train_test_split(
#     X_full, y_full, test_size=0.2, random_state=42, stratify=y_full
# )

# # Calculate medians for each column in train only
# train_medians = X_train.median(numeric_only=True)

# # Fill missing values in both train and test using those medians
# X_train = X_train.fillna(train_medians)
# X_test = X_test.fillna(train_medians)

# # Compute constants on train only
# constant_cols = []
# for col in X_train.columns:
#     top_freq = X_train[col].value_counts(normalize=True, dropna=False).max()
#     if top_freq >= SIMILARITY_THRESHOLD:
#         constant_cols.append(col)

# # Drop from train and apply same drop to test
# if constant_cols:
#     X_train = X_train.drop(columns=constant_cols)
#     X_test = X_test.drop(columns=constant_cols)
#     print(f"Dropped {len(constant_cols)} constant/almost-constant columns.")

# # Compute absolute correlation matrix on training data
# corr_matrix = X_train.corr().abs()
# # Keep only upper triangle of the matrix
# upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
# # Find columns to drop - correlation
# high_corr_cols = []
# for col in upper.columns:
#     if any(upper[col] > CORRELATION_THRESHOLD):
#         high_corr_cols.append(col)

# # Drop from both train and test
# if high_corr_cols:
#     X_train = X_train.drop(columns=high_corr_cols)
#     X_test = X_test.drop(columns=high_corr_cols)
#     print(f"Dropped {len(high_corr_cols)} highly correlated columns.")

# # Compute IQR limits on training data
# iqr_limits = compute_iqr_limits(X_train, factor=1.5)

# # Apply limits to both train and test sets
# X_train = apply_iqr_limits(X_train, iqr_limits)
# X_test = apply_iqr_limits(X_test, iqr_limits)

# # Scaling not necessary for all models
# # Splitting the dataset to Fingerprint and Descriptor sets
# FP_PREFIXES = (
#     "Morgan",
#     "RDK_",
#     "MACCS",
#     "AtomPair_",
#     "Torsion_",
# )  # Identify fingerprint columns by prefix
# fp_cols = [c for c in X_train.columns if c.startswith(FP_PREFIXES)]
# desc_cols = [c for c in X_train.columns if c not in fp_cols]

# print(f"Descriptor columns: {len(desc_cols)}, fingerprint columns: {len(fp_cols)}")

# scaler = StandardScaler()
# if desc_cols:
#     X_train_desc_scaled = scaler.fit_transform(X_train[desc_cols])
#     X_test_desc_scaled = scaler.transform(X_test[desc_cols])

#     # Start from copies so we keep original indexing and all columns
#     X_train_scaled = X_train.copy()
#     X_test_scaled = X_test.copy()

#     # Overwrite only descriptor columns with scaled values
#     X_train_scaled[desc_cols] = X_train_desc_scaled
#     X_test_scaled[desc_cols] = X_test_desc_scaled
# else:
#     X_train_scaled = X_train.copy()
#     X_test_scaled = X_test.copy()

# X_train = X_train_scaled
# X_test = X_test_scaled

# # Save processed datasets
# X_train.to_csv(TRAIN_X_CSV, index=False)
# X_test.to_csv(TEST_X_CSV, index=False)
# y_train.to_csv(TRAIN_Y_CSV, index=False, header=[TARGET_COL])
# y_test.to_csv(TEST_Y_CSV, index=False, header=[TARGET_COL])

# print(f"Train - Rows: {len(X_train)}/Columns: {X_train.shape[1]}")
# print("First rows of train x:")
# print(X_train.head().to_string(index=False))
# print(f"Test - Rows: {len(X_test)}/Columns: {X_test.shape[1]}")
# print("First rows of train y:")
# print(y_train.head().to_string(index=False))
# X_train.describe()

# print("\nTrain set class counts:")
# print(y_train.value_counts())
# print("\nTrain set class ratio:")
# print(y_train.value_counts(normalize=True))

# print("\nTest set class counts:")
# print(y_test.value_counts())
# print("\nTest set class ratio:")
# print(y_test.value_counts(normalize=True))

FileNotFoundError: [Errno 2] No such file or directory: 'in_vitro_3T3_in_chemico_dataset.xlsx'