In [4]:
import pandas as pd
import numpy as np
import glob
import os
from collections import defaultdict
from rdkit import Chem, RDLogger
from rdkit.Chem import AllChem, MACCSkeys
from sklearn.ensemble import RandomForestRegressor
from sklearn.svm import SVR
from sklearn.linear_model import Ridge
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from map4 import MAP4
import matplotlib.pyplot as plt
from functools import lru_cache
from rdkit.Chem.rdFingerprintGenerator import GetMorganGenerator

In [5]:
class ActiveLearningLoop:
    def __init__(self, csv_path, ligand_folder="ligands_out", fingerprint_type='morgan',
                 fraction=0.01, max_iterations=5, random_state=42, test_size=0.2):
        self.csv_path = csv_path
        self.ligand_folder = ligand_folder
        self.fraction = fraction
        self.max_iterations = max_iterations
        self.random_state = random_state
        self.fingerprint_type = fingerprint_type
        self.test_size = test_size
        self.map4_calc = MAP4(dimensions=1024)
        self.data = None
        self.smiles_dict = {}
        self.fingerprint_cache = {}
        self.evaluated_ids = set()
        self.all_ids = []
        self.results_log = []
        self.best_models = {}

        self.morgan_generator = GetMorganGenerator(radius=3, fpSize=2048)

        self.fingerprint_dispatch = {
            'morgan': lambda mol: self.morgan_generator.GetFingerprint(mol),
            'maccs': lambda mol: MACCSkeys.GenMACCSKeys(mol),
            'map4': lambda smiles: self.map4_calc.calculate(smiles)
        }

        self.fp_length = {
            'morgan': 2048,
            'maccs': 167,
            'map4': 1024
        }[fingerprint_type]

        self.final_model = None
        self.final_model_type = None

    def load_data(self):
        data = pd.read_csv(self.csv_path)
        data.insert(0, 'ID', range(len(data)))
        valid_ids = []

        for idx in data["ID"]:
            matches = glob.glob(f"{self.ligand_folder}/{idx}_*.pdbqt")
            if any(self.get_affinity(m) is not None for m in matches):
                valid_ids.append(idx)

        self.data = data[data["ID"].isin(valid_ids)].reset_index(drop=True)
        if self.data.empty:
            print("Advertencia: No hay compuestos válidos con afinidad.")
        else:
            print(f"Base filtrada: {len(self.data)} compuestos con afinidad disponible.")

        self.smiles_dict = dict(zip(self.data['ID'], self.data['smiles']))
        self.all_ids = self.data["ID"].tolist()

    @lru_cache(maxsize=None)
    def get_affinity(self, pdbqt_file):
        try:
            with open(pdbqt_file, "r") as f:
                for line in f:
                    if line.startswith("REMARK VINA RESULT:"):
                        return float(line.strip().split()[3])
        except Exception as e:
            print(f"Error leyendo {pdbqt_file}: {e}")
        return None

    def compute_fingerprints(self):
        for idx, smiles in self.smiles_dict.items():
            self.fingerprint_cache[idx] = self.smiles_to_fingerprint(smiles)

    def smiles_to_fingerprint(self, smiles):
        try:
            if self.fingerprint_type == 'map4':
                return np.array(self.fingerprint_dispatch['map4'](smiles))
            else:
                mol = Chem.MolFromSmiles(smiles)
                if mol is None:
                    return np.zeros(self.fp_length)
                return np.array(self.fingerprint_dispatch[self.fingerprint_type](mol))
        except:
            return np.zeros(self.fp_length)

    def train_and_evaluate_model(self, X_tr, y_tr, X_val, y_val, model_type):
        if model_type == 'RF':
            model = RandomForestRegressor(n_estimators=500, max_depth=8, min_samples_leaf=3,
                                          random_state=self.random_state)
        elif model_type == 'SVR':
            model = SVR(kernel='rbf', C=10.0, epsilon=0.1, gamma='scale')
        elif model_type == 'MLR':
            model = Ridge(alpha=1.0)
        else:
            raise ValueError("Modelo desconocido")

        model.fit(X_tr, y_tr)
        y_pred = model.predict(X_val)
        r2 = r2_score(y_val, y_pred)
        rmse = np.sqrt(mean_squared_error(y_val, y_pred))
        mae = mean_absolute_error(y_val, y_pred)
        return model, r2, rmse, mae

    def get_top_affinities(self, top_percent):
        all_affinities = []
        for idx in self.all_ids:
            matches = glob.glob(f"{self.ligand_folder}/{idx}_*.pdbqt")
            affs = []
            for m in matches:
                aff = self.get_affinity(m)
                if aff is not None:
                    affs.append(aff)
            if affs:
                best_aff = min(affs)
                all_affinities.append((idx, best_aff))
        all_affinities.sort(key=lambda x: x[1])
        top_n = int(top_percent * len(all_affinities))
        return set(x[0] for x in all_affinities[:top_n]), top_n

    def run(self):
        rng = np.random.RandomState(self.random_state)
        n_initial = int(self.fraction * len(self.all_ids))
        selection_size = n_initial

        self.evaluated_ids = set(rng.choice(self.all_ids, n_initial, replace=False))
        self.compute_fingerprints()

        for iteration in range(self.max_iterations):
            print(f"\nIteración {iteration+1}/{self.max_iterations}")
            X_train, y_train = [], []

            for idx in self.evaluated_ids:
                fps = self.fingerprint_cache[idx]
                matches = glob.glob(f"{self.ligand_folder}/{idx}_*.pdbqt")
                affinities = [self.get_affinity(m) for m in matches if self.get_affinity(m) is not None]
                if affinities:
                    X_train.append(fps)
                    y_train.append(min(affinities))

            X_train, y_train = np.array(X_train), np.array(y_train)

            print(f"X_train shape: {X_train.shape}")
            print(f"y_train stats: min={y_train.min():.3f}, max={y_train.max():.3f}, std={y_train.std():.3f}")

            X_tr, X_val, y_tr, y_val = train_test_split(X_train, y_train, test_size=self.test_size, random_state=self.random_state)

            results = []
            for model_type in ['RF', 'SVR', 'MLR']:
                model, r2, rmse, mae = self.train_and_evaluate_model(X_tr, y_tr, X_val, y_val, model_type)
                results.append((model_type, model, r2, rmse, mae))
                print(f"[{self.fingerprint_type}] Modelo {model_type} - R2: {r2:.3f} RMSE: {rmse:.3f} MAE: {mae:.3f}")
                self.results_log.append({
                    'iteration': iteration + 1,
                    'model': model_type,
                    'fingerprint': self.fingerprint_type,
                    'r2': r2,
                    'rmse': rmse,
                    'mae': mae
                })

            best_model_type, best_model, best_r2, *_ = max(results, key=lambda x: x[2])
            self.best_models[self.fingerprint_type] = {
                'model_type': best_model_type,
                'r2': best_r2,
                'params': {mt: mdl.get_params() for mt, mdl, *_ in results}
            }
            self.final_model = best_model
            self.final_model_type = best_model_type

            print(f"Usando modelo {best_model_type} para predicción de candidatos.")

            candidates = list(set(self.all_ids) - self.evaluated_ids)
            if not candidates:
                print("No quedan candidatos para evaluar. Proceso terminado.")
                break

            X_candidates = [self.fingerprint_cache[c] for c in candidates]
            predictions = best_model.predict(X_candidates)

            selection = np.argsort(predictions)[:selection_size]
            selected_ids = [candidates[i] for i in selection]
            self.evaluated_ids.update(selected_ids)

            print(f"Seleccionados {len(selected_ids)} nuevos candidatos para la siguiente iteración.")

        top_ids, top_n = self.get_top_affinities(top_percent=self.fraction)
        captured = self.evaluated_ids.intersection(top_ids)
        newly_found = captured - set(list(self.evaluated_ids)[:n_initial])
        percent_discovery = 100 * len(newly_found) / top_n if top_n > 0 else 0

        print(f"Descubrimiento activo en el top {self.fraction*100:.1f}%:")
        print(f"- Total top compuestos: {top_n}")
        print(f"- Encontrados después del set inicial: {len(newly_found)}")
        print(f"- Porcentaje de descubrimiento activo: {percent_discovery:.2f}%\n")

        print(" Mejores combinaciones de hiperparámetros:")
        for model_name, model_info in self.best_models[self.fingerprint_type]['params'].items():
            print(f"- {model_name}: {model_info}")

In [6]:
loop = ActiveLearningLoop(csv_path="Enamine.csv", fingerprint_type="maccs", fraction=0.02, max_iterations=5)
loop.load_data()
loop.run()

Base filtrada: 5280 compuestos con afinidad disponible.

Iteración 1/5
X_train shape: (105, 167)
y_train stats: min=-10.300, max=-6.600, std=0.811
[maccs] Modelo RF - R2: -0.089 RMSE: 0.841 MAE: 0.720
[maccs] Modelo SVR - R2: 0.143 RMSE: 0.747 MAE: 0.610
[maccs] Modelo MLR - R2: 0.193 RMSE: 0.724 MAE: 0.620
Usando modelo MLR para predicción de candidatos.
Seleccionados 105 nuevos candidatos para la siguiente iteración.

Iteración 2/5
X_train shape: (210, 167)
y_train stats: min=-10.700, max=-6.600, std=0.860
[maccs] Modelo RF - R2: 0.454 RMSE: 0.601 MAE: 0.505
[maccs] Modelo SVR - R2: 0.425 RMSE: 0.616 MAE: 0.488
[maccs] Modelo MLR - R2: 0.209 RMSE: 0.723 MAE: 0.567
Usando modelo RF para predicción de candidatos.
Seleccionados 105 nuevos candidatos para la siguiente iteración.

Iteración 3/5
X_train shape: (315, 167)
y_train stats: min=-10.700, max=-6.600, std=0.813
[maccs] Modelo RF - R2: 0.142 RMSE: 0.724 MAE: 0.601
[maccs] Modelo SVR - R2: 0.225 RMSE: 0.688 MAE: 0.537
[maccs] Modelo