# QSAR notebook for antimicrobial peptidomimetics

## Data curation (Q1-ready, reproducible)
- Canonicalize SMILES with RDKit and remove invalid structures.
- Normalize salts/counter-ions by keeping the largest organic fragment.
- Resolve duplicates after standardization with configurable policy (`median`, `mean`, `drop_conflicts`).
- Save a transparent curation audit trail to `artifacts/curation_report.csv`.


In [None]:
# Optional install (uncomment if needed)
# Note: RDKit is published on PyPI as `rdkit`; `rdkit-pypi` is the legacy package name.
# !pip install rdkit pandas numpy scikit-learn matplotlib seaborn


In [None]:
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from rdkit import Chem, DataStructs
from rdkit.Chem import AllChem
from rdkit.Chem.MolStandardize import rdMolStandardize
from rdkit.ML.Descriptors import MoleculeDescriptors

from sklearn.model_selection import RepeatedKFold, cross_validate, KFold
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Ridge
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import make_scorer, mean_absolute_error, mean_squared_error, r2_score


In [None]:
# Notebook-wide settings: input table location and output folder for artifacts.
DATA_PATH = 'potok.csv'  # input CSV; required columns: smiles, activity
ARTIFACTS_DIR = Path('artifacts')
CURATION_REPORT_PATH = ARTIFACTS_DIR.joinpath('curation_report.csv')
# Create the artifacts folder up front so later cells can write into it.
ARTIFACTS_DIR.mkdir(parents=True, exist_ok=True)


## 1) Load raw data

In [None]:
# Read the raw table from DATA_PATH and show its dimensions and column names.
df_raw = pd.read_csv(DATA_PATH)
print('Raw shape:', df_raw.shape)
print('Columns:', df_raw.columns.tolist())
# Last expression of the cell: preview of the first rows.
df_raw.head()


## 2) Unified data curation block

In [None]:
def _standardize_smiles(smiles: str, lfc: rdMolStandardize.LargestFragmentChooser):
    """Convert raw SMILES to standardized canonical parent SMILES.

    Steps: parse -> keep the fragment selected by ``lfc`` -> canonicalize.

    Parameters
    ----------
    smiles : str
        Raw SMILES string. Non-string or blank values are reported as
        ``'invalid_smiles'`` instead of being passed to RDKit.
    lfc : rdMolStandardize.LargestFragmentChooser
        Fragment chooser used to pick the parent structure.

    Returns
    -------
    tuple
        ``(standardized_smiles_or_none, reason, salt_normalized_flag)`` where
        ``reason`` is one of ``'ok'``, ``'ok_salt_normalized'``,
        ``'invalid_smiles'`` or ``'standardization_failed'``.
    """
    # Guard before calling RDKit: a non-string cell (e.g. a number read from the
    # CSV) is not a valid argument for MolFromSmiles, and a blank string carries
    # no structure at all.
    if not isinstance(smiles, str) or not smiles.strip():
        return None, 'invalid_smiles', False

    mol = Chem.MolFromSmiles(smiles.strip())
    # An atom-less molecule would otherwise yield an empty standardized SMILES
    # that is reported as 'ok'.
    if mol is None or mol.GetNumAtoms() == 0:
        return None, 'invalid_smiles', False

    num_frags_before = len(Chem.GetMolFrags(mol))
    try:
        parent = lfc.choose(mol)
    except (RuntimeError, ValueError):
        # NOTE(review): assumes RDKit standardization errors surface as
        # RuntimeError/ValueError subclasses -- confirm for the installed version.
        return None, 'standardization_failed', False
    if parent is None or parent.GetNumAtoms() == 0:
        return None, 'standardization_failed', False

    std_smiles = Chem.MolToSmiles(parent, canonical=True)
    # More than one fragment in the input means something was stripped.
    salt_normalized = num_frags_before > 1
    reason = 'ok_salt_normalized' if salt_normalized else 'ok'
    return std_smiles, reason, salt_normalized


def curate_data(
    df: pd.DataFrame,
    smiles_col: str = 'smiles',
    activity_col: str = 'activity',
    duplicate_policy: str = 'median',
    tolerance: float = 1e-6,
    report_path: Path = Path('artifacts/curation_report.csv')
):
    """Perform reproducible QSAR data curation.

    Rows with a missing SMILES or a non-numeric/missing activity are dropped,
    every remaining SMILES is standardized, and records that share the same
    standardized SMILES are collapsed according to ``duplicate_policy``.
    The row-level report is written to ``report_path`` as CSV.

    Parameters
    ----------
    df : pd.DataFrame
        Raw table; must contain ``smiles_col`` and ``activity_col``.
    smiles_col : str
        Name of the column holding raw SMILES.
    activity_col : str
        Name of the column holding activity values (coerced to numeric).
    duplicate_policy : {'median', 'mean', 'drop_conflicts'}
        How to aggregate activity for duplicated standardized structures.
    tolerance : float
        Tiny tolerance to treat activity values as effectively equal.
    report_path : Path
        Path for row-level curation report. Missing parent directories are
        created.

    Returns
    -------
    curated_df : pd.DataFrame
        Curated table with ``standardized_smiles`` and ``activity`` columns
        (both present even when no row survives).
    summary : dict
        Summary counts for transparent logging.
    report_df : pd.DataFrame
        Detailed report of removed/collapsed records.

    Raises
    ------
    ValueError
        If ``duplicate_policy`` is unknown or a required column is missing.
    """
    allowed = {'median', 'mean', 'drop_conflicts'}
    if duplicate_policy not in allowed:
        raise ValueError(f'duplicate_policy must be one of {allowed}')

    required = {smiles_col, activity_col}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f'Missing required columns: {missing}')

    std_columns = [
        'original_smiles', 'standardized_smiles', 'original_activity',
        'reason', 'salt_normalized',
    ]
    report_columns = ['original_smiles', 'standardized_smiles', 'reason', 'original_activity']

    work = df[[smiles_col, activity_col]].copy()
    work[activity_col] = pd.to_numeric(work[activity_col], errors='coerce')
    work = work.dropna(subset=[smiles_col, activity_col]).reset_index(drop=True)

    # preferOrganic=True so that the chooser matches the documented policy of
    # keeping the largest *organic* fragment (the constructor default is False).
    lfc = rdMolStandardize.LargestFragmentChooser(preferOrganic=True)

    rows = []
    for original_smiles, raw_activity in zip(work[smiles_col], work[activity_col]):
        std_smiles, reason, salt_flag = _standardize_smiles(original_smiles, lfc)
        rows.append({
            'original_smiles': original_smiles,
            'standardized_smiles': std_smiles,
            'original_activity': float(raw_activity),
            'reason': reason,
            'salt_normalized': salt_flag,
        })

    # Explicit columns keep the lookups below valid when no row survived.
    std_df = pd.DataFrame(rows, columns=std_columns)

    invalid_mask = std_df['standardized_smiles'].isna()
    invalid_count = int(invalid_mask.sum())
    salt_normalized_count = int(std_df['salt_normalized'].sum())

    valid_df = std_df.loc[~invalid_mask].copy()

    report_rows = []
    if invalid_count > 0:
        report_rows.append(std_df.loc[invalid_mask, report_columns])

    grouped_rows = []
    duplicate_groups = 0
    conflicts_found = 0
    duplicate_collapsed = 0

    for ssmiles, grp in valid_df.groupby('standardized_smiles', sort=False):
        acts = grp['original_activity'].to_numpy()
        n = len(grp)
        if n == 1:
            # Unique structure: passes through untouched and is not reported.
            grouped_rows.append({'standardized_smiles': ssmiles, 'activity': float(acts[0])})
            continue

        duplicate_groups += 1
        duplicate_collapsed += (n - 1)

        final_activity, action, keep_row, conflict = _resolve_duplicate_group(
            acts, duplicate_policy, tolerance
        )
        if conflict:
            conflicts_found += 1

        # Every member of a duplicate group is listed in the report.
        report_rows.append(pd.DataFrame({
            'original_smiles': grp['original_smiles'].values,
            'standardized_smiles': grp['standardized_smiles'].values,
            'reason': [action] * n,
            'original_activity': grp['original_activity'].values,
        }))

        if keep_row:
            grouped_rows.append({'standardized_smiles': ssmiles, 'activity': final_activity})

    # Explicit columns so callers can index the result even when it is empty.
    curated_df = pd.DataFrame(grouped_rows, columns=['standardized_smiles', 'activity'])

    if report_rows:
        report_df = pd.concat(report_rows, ignore_index=True)
    else:
        report_df = pd.DataFrame(columns=report_columns)

    # Make sure the target folder exists; to_csv does not create directories.
    report_path = Path(report_path)
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_df.to_csv(report_path, index=False)

    summary = {
        'n_input_rows': int(len(df)),
        'n_after_numeric_and_notna': int(len(work)),
        'n_invalid_removed': invalid_count,
        'n_salt_normalized': salt_normalized_count,
        'n_duplicate_groups': int(duplicate_groups),
        'n_duplicates_collapsed': int(duplicate_collapsed),
        'n_activity_conflicts': int(conflicts_found),
        'duplicate_policy': duplicate_policy,
        'tolerance': tolerance,
        'n_output_rows': int(len(curated_df)),
        'report_path': str(report_path),
    }

    return curated_df, summary, report_df


def _resolve_duplicate_group(acts, duplicate_policy, tolerance):
    """Decide how one group of duplicated standardized structures is handled.

    Returns ``(final_activity, action, keep_row, conflict)``; ``conflict`` is
    True when the spread of ``acts`` exceeds ``tolerance``.
    """
    conflict = float(np.max(acts) - np.min(acts)) > tolerance
    if not conflict:
        return float(np.median(acts)), 'duplicates_equal_keep_one', True, False
    if duplicate_policy == 'median':
        return float(np.median(acts)), 'duplicates_conflict_aggregated_median', True, True
    if duplicate_policy == 'mean':
        return float(np.mean(acts)), 'duplicates_conflict_aggregated_mean', True, True
    # Remaining policy is 'drop_conflicts': the whole group is discarded.
    return np.nan, 'duplicates_conflict_dropped', False, True


In [None]:
# Run the curation step on the raw table; the row-level report goes to
# CURATION_REPORT_PATH.
curated_df, curation_summary, curation_report = curate_data(
    df=df_raw,
    smiles_col='smiles',
    activity_col='activity',
    duplicate_policy='median',
    tolerance=1e-6,
    report_path=CURATION_REPORT_PATH,
)

# Echo every summary entry as "key: value".
print('=== Data curation summary ===')
for key, value in curation_summary.items():
    print(f'{key}: {value}')

curated_df.head()


In [None]:
# Sanity checks on the curated table before any modeling.
# Emptiness is checked first: an empty table may have no columns at all, and
# the column lookups below would then fail with an unrelated KeyError.
# Explicit raises are used instead of assert so the checks also run under -O.
if len(curated_df) == 0:
    raise ValueError('Curated dataset is empty.')
if curated_df['standardized_smiles'].isna().any():
    raise ValueError('NaN in standardized_smiles after curation.')
if (curated_df['activity'] <= 0).any():
    raise ValueError('Found activity <= 0; cannot safely apply log10 transform for target.')
# NaN/inf compare as "not <= 0" (NaN) or positive (inf) and would slip through
# the check above, yet they produce an unusable log10 target.
if not np.isfinite(curated_df['activity']).all():
    raise ValueError('Found non-finite activity; cannot safely apply log10 transform for target.')

print('Sanity checks passed.')


## 3) Feature generation (uses standardized_smiles only)

In [None]:
# Build the modeling table: parse each standardized SMILES into a molecule,
# keep only rows where parsing succeeded, and add the log10 target column.
df_model = curated_df.copy()
df_model['mol'] = df_model['standardized_smiles'].map(Chem.MolFromSmiles)
df_model = df_model.loc[df_model['mol'].notna()].reset_index(drop=True)
df_model = df_model.assign(logACT=np.log10(df_model['activity']))

print('Modeling shape:', df_model.shape)
df_model.head()


In [None]:
# RDKit descriptor names fed to the calculator; the order defines the column
# order of the descriptor matrix.
descriptor_names = [
    'MolWt',
    'MolLogP',
    'NumHDonors',
    'NumHAcceptors',
    'NumHeteroatoms',
    'NumRotatableBonds',
    'TPSA',
    'NumAromaticRings',
    'RingCount',
    'FractionCSP3',
]

calc = MoleculeDescriptors.MolecularDescriptorCalculator(descriptor_names)


def calc_descriptor_frame(mols):
    """Return a DataFrame with one row per molecule and one column per name in ``descriptor_names``."""
    descriptor_rows = list(map(calc.CalcDescriptors, mols))
    return pd.DataFrame(descriptor_rows, columns=descriptor_names)


X_desc = calc_descriptor_frame(df_model['mol'])
y = df_model['logACT'].to_numpy()

print('Descriptor matrix:', X_desc.shape)


In [None]:
def morgan_fp_array(mol, radius=2, n_bits=2048):
    """Return the Morgan bit-vector fingerprint of ``mol`` as an int8 numpy array of length ``n_bits``."""
    bit_vect = AllChem.GetMorganFingerprintAsBitVect(mol, radius, nBits=n_bits)
    dense = np.zeros(n_bits, dtype=np.int8)
    # ConvertToNumpyArray writes into the array passed as its second argument.
    DataStructs.ConvertToNumpyArray(bit_vect, dense)
    return dense


# One fingerprint row per molecule, using the function's default radius and size.
X_fp = np.vstack(list(map(morgan_fp_array, df_model['mol'])))
print('Fingerprint matrix:', X_fp.shape)


## 4) Modeling and metrics (kept intact, minimal adaptation)

In [None]:
def rmse(y_true, y_pred):
    """Return the square root of the mean squared error between ``y_true`` and ``y_pred``."""
    mse = mean_squared_error(y_true, y_pred)
    return np.sqrt(mse)


# MAE and RMSE are registered with greater_is_better=False; summarize_cv flips
# their sign back when reporting.
scoring = {
    'MAE': make_scorer(mean_absolute_error, greater_is_better=False),
    'RMSE': make_scorer(rmse, greater_is_better=False),
    'R2': make_scorer(r2_score),
}

# 5 folds repeated 10 times with a fixed seed.
cv = RepeatedKFold(n_splits=5, n_repeats=10, random_state=42)

models = {
    'Ridge_descriptors': Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
        ('model', Ridge(alpha=1.0)),
    ]),
    'RF_descriptors': Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('model', RandomForestRegressor(
            n_estimators=500,
            random_state=42,
            min_samples_leaf=2,
            n_jobs=-1,
        )),
    ]),
    'Ridge_fingerprint': Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('scaler', StandardScaler(with_mean=False)),
        ('model', Ridge(alpha=2.0)),
    ]),
}


def summarize_cv(name, model, X, y):
    """Cross-validate ``model`` on ``(X, y)`` and return mean MAE, RMSE and R2 in a dict."""
    out = cross_validate(model, X, y, cv=cv, scoring=scoring, n_jobs=-1)
    summary = {'model': name}
    summary['MAE_mean'] = -out['test_MAE'].mean()
    summary['RMSE_mean'] = -out['test_RMSE'].mean()
    summary['R2_mean'] = out['test_R2'].mean()
    return summary


# Each model is evaluated on the feature matrix it was designed for.
results = [
    summarize_cv(name, models[name], X, y)
    for name, X in (
        ('Ridge_descriptors', X_desc),
        ('RF_descriptors', X_desc),
        ('Ridge_fingerprint', X_fp),
    )
]

# Baseline: predict the training-fold mean for every test sample, using the
# same cv object as the models above.
b_mae, b_rmse, b_r2 = [], [], []
for train_idx, test_idx in cv.split(X_desc):
    y_train, y_test = y[train_idx], y[test_idx]
    pred = np.full_like(y_test, y_train.mean())
    for bucket, metric in ((b_mae, mean_absolute_error), (b_rmse, rmse), (b_r2, r2_score)):
        bucket.append(metric(y_test, pred))

results.append({
    'model': 'Baseline_mean',
    'MAE_mean': float(np.mean(b_mae)),
    'RMSE_mean': float(np.mean(b_rmse)),
    'R2_mean': float(np.mean(b_r2)),
})

pd.DataFrame(results).sort_values(by='RMSE_mean')


In [None]:
def tanimoto_to_train(test_mol, train_mols, radius=2, n_bits=2048, train_fps=None):
    """Return the maximum Tanimoto similarity between ``test_mol`` and the training set.

    Parameters
    ----------
    test_mol : rdkit molecule
        Query molecule.
    train_mols : sequence of rdkit molecules
        Training molecules; only used when ``train_fps`` is not given.
    radius, n_bits : int
        Morgan fingerprint settings for the query (and for the training set
        when its fingerprints are computed here).
    train_fps : sequence of fingerprints, optional
        Precomputed training fingerprints. They must have been generated with
        the same ``radius`` and ``n_bits``. Passing them avoids recomputing the
        whole training set for every query molecule.

    Returns
    -------
    float
        Largest similarity found, or 0.0 when there is nothing to compare to.
    """
    fp_test = AllChem.GetMorganFingerprintAsBitVect(test_mol, radius, nBits=n_bits)
    if train_fps is None:
        train_fps = [AllChem.GetMorganFingerprintAsBitVect(m, radius, nBits=n_bits) for m in train_mols]
    sims = DataStructs.BulkTanimotoSimilarity(fp_test, train_fps)
    return max(sims) if sims else 0.0


kf = KFold(n_splits=5, shuffle=True, random_state=42)
train_idx, test_idx = next(kf.split(df_model))

# KFold yields positional indices, so select rows by position (.iloc) rather
# than by label; this stays correct even if df_model's index is not 0..n-1.
train_mols = df_model['mol'].iloc[train_idx].tolist()
test_mols = df_model['mol'].iloc[test_idx].tolist()

# Fingerprint the training set once instead of once per test molecule.
ad_radius, ad_n_bits = 2, 2048
train_fps = [
    AllChem.GetMorganFingerprintAsBitVect(m, ad_radius, nBits=ad_n_bits)
    for m in train_mols
]

tanimoto_max = [
    tanimoto_to_train(m, train_mols, radius=ad_radius, n_bits=ad_n_bits, train_fps=train_fps)
    for m in test_mols
]

ad_demo = pd.DataFrame({
    'standardized_smiles': df_model['standardized_smiles'].iloc[test_idx].values,
    'logACT': df_model['logACT'].iloc[test_idx].values,
    'max_train_tanimoto': tanimoto_max,
})

plt.figure(figsize=(6, 4))
sns.histplot(ad_demo['max_train_tanimoto'], bins=15)
plt.title('Applicability Domain proxy: max train Tanimoto')
plt.xlabel('Max Tanimoto to train set')
plt.grid(alpha=0.3)
plt.show()
