<a href="https://colab.research.google.com/github/andreslaroa/DiagnosIA/blob/main/PruebaRealDiagnosIA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Este cuaderno es una prueba simulada de cómo se utilizaría en la práctica el modelo generado con un paciente real.

In [11]:
# Dividimos el dataset en train y test

import pandas as pd
import numpy as np
from scipy.stats import chi2_contingency
from sklearn.feature_selection import mutual_info_classif
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from google.colab import drive

# Mount Google Drive so the dataset stored there is reachable from Colab
drive.mount('/content/drive')

# Path to the file inside "Colab Notebooks"
dataset_path = '/content/drive/MyDrive/Colab Notebooks/Dataset_Completo.csv'

# Load the dataset
df = pd.read_csv(dataset_path)

# Sanity check. A bare `df.head()` in the middle of a cell is evaluated and
# discarded (only the LAST expression of a cell is displayed), so print it.
print(df.head())


# 2) Detect the response column automatically: use 'prognosis' when present,
#    otherwise fall back to the last column of the file
columna_respuesta = 'prognosis' if 'prognosis' in df.columns else df.columns[-1]

# 3) Split into train (70 %) and test (30 %), stratified by the response column;
#    random_state is fixed so the split is reproducible
df_train, df_test = train_test_split(
    df,
    test_size=0.3,
    stratify=df[columna_respuesta],
    random_state=42
)

# 4) From here on, `df` refers ONLY to the training set (used for symptom analysis)
df = df_train.copy()


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [12]:
# Se obtienen los nuevos data frames

from sklearn.model_selection import train_test_split
import pandas as pd
from sklearn.preprocessing import LabelEncoder

# Symptom columns = every column except the response column. The column name
# detected earlier (`columna_respuesta`) is used instead of a hard-coded
# 'prognosis', so this cell agrees with the split and with the model cell.
df_sin_enfermedades = df.drop(columns=[columna_respuesta])
todos_sintomas = df_sin_enfermedades.columns.tolist()

# Fit a label encoder on the training labels to expose the class names
le = LabelEncoder()
le.fit(df[columna_respuesta])
class_name = le.classes_

# Feature matrices (cast to int) and label vectors for both partitions
x_train = df_train[todos_sintomas].astype(int).values
y_train = df_train[columna_respuesta].values
x_test = df_test[todos_sintomas].astype(int).values
y_test = df_test[columna_respuesta].values



# Confirm partition sizes
print(f'Tamaño del conjunto de entrenamiento: {x_train.shape[0]} pacientes')
print(f'Tamaño del conjunto de test: {x_test.shape[0]} pacientes')


Tamaño del conjunto de entrenamiento: 3473 pacientes
Tamaño del conjunto de test: 1489 pacientes


In [13]:
import numpy as np
import pandas as pd
from scipy.stats import entropy
import matplotlib.pyplot as plt

class BayesianSequentialDiagnostic:
    """
    Sequential Bayesian diagnostic model.

    At every step the model asks about the not-yet-asked symptom with the
    highest expected information gain, updates the disease posterior with
    Bayes' rule (symptoms are treated as conditionally independent given the
    disease), and stops once a disease reaches the confidence threshold or
    the question budget is exhausted.
    """

    def __init__(self, X_train, y_train, symptom_names):
        """
        Estimate priors and per-disease symptom probabilities from training data.

        Args:
            X_train (np.ndarray): Symptom matrix (0/1), shape [n_patients, n_symptoms].
            y_train (array-like): Disease labels, length n_patients.
            symptom_names (list of str): Symptom names, in the same order as the
                columns of X_train.

        Raises:
            ValueError: If the number of symptom names differs from the number
                of columns in X_train.
        """
        self.X_train = np.asarray(X_train)
        self.y_train = np.asarray(y_train)
        self.symptom_names = list(symptom_names)
        self.n_symptoms = self.X_train.shape[1]

        if len(self.symptom_names) != self.n_symptoms:
            raise ValueError(
                f"symptom_names has {len(self.symptom_names)} entries but "
                f"X_train has {self.n_symptoms} columns"
            )

        # Disease labels and label <-> index mappings
        self.disease_labels = np.unique(self.y_train)
        self.label_to_index = {label: i for i, label in enumerate(self.disease_labels)}
        self.index_to_label = {i: label for label, i in self.label_to_index.items()}

        # Priors P(disease): relative frequency of each label in y_train
        self.disease_priors = {d: np.mean(self.y_train == d) for d in self.disease_labels}

        # Conditionals P(symptom | disease) with Laplace smoothing (+1 / +2),
        # which keeps every probability strictly between 0 and 1
        self.conditional_probs = {}
        for d in self.disease_labels:
            cases = self.X_train[self.y_train == d]
            self.conditional_probs[d] = (cases.sum(axis=0) + 1) / (len(cases) + 2)

    def calculate_information_gain(self, symptom_idx, remaining_symptoms, current_probs):
        """
        Expected information gain of asking about one symptom.

        Args:
            symptom_idx (int): Column index of the symptom.
            remaining_symptoms: Unused; kept so existing callers keep working.
            current_probs (dict): Current P(disease), keyed by disease label.

        Returns:
            float: Entropy of the current distribution minus the expected
            entropy after observing the answer.
        """
        p = np.array([current_probs[d] for d in self.disease_labels])
        current_entropy = entropy(p)

        # Marginal probability of a positive / negative answer
        prob_pos = sum(current_probs[d] * self.conditional_probs[d][symptom_idx]
                       for d in self.disease_labels)
        prob_neg = 1 - prob_pos

        # Posterior over diseases for each possible answer
        pos_probs = {d: (self.conditional_probs[d][symptom_idx] * current_probs[d] / prob_pos
                         if prob_pos > 0 else 0) for d in self.disease_labels}
        neg_probs = {d: ((1 - self.conditional_probs[d][symptom_idx]) * current_probs[d] / prob_neg
                         if prob_neg > 0 else 0) for d in self.disease_labels}

        # Re-normalize each posterior when it has any mass
        pos_total = sum(pos_probs.values())
        if pos_total > 0:
            pos_probs = {d: v / pos_total for d, v in pos_probs.items()}
        neg_total = sum(neg_probs.values())
        if neg_total > 0:
            neg_probs = {d: v / neg_total for d, v in neg_probs.items()}

        e_pos = entropy(list(pos_probs.values())) if prob_pos > 0 else 0
        e_neg = entropy(list(neg_probs.values())) if prob_neg > 0 else 0
        cond_entropy = prob_pos * e_pos + prob_neg * e_neg

        return current_entropy - cond_entropy

    def select_next_symptom(self, asked_indices, current_probs):
        """
        Pick the not-yet-asked symptom with the highest information gain.

        Args:
            asked_indices (iterable of int): Column indices already asked.
            current_probs (dict): Current P(disease), keyed by disease label.

        Returns:
            int or None: Index of the best candidate (first one on ties), or
            None when every symptom has already been asked.
        """
        already_asked = set(asked_indices)  # O(1) membership tests
        candidates = [i for i in range(self.n_symptoms) if i not in already_asked]
        if not candidates:
            return None
        gains = {i: self.calculate_information_gain(i, candidates, current_probs) for i in candidates}
        return max(gains, key=gains.get)

    def update_probabilities(self, current_probs, symptom_idx, symptom_value):
        """
        Apply Bayes' rule for one observed symptom.

        Args:
            current_probs (dict): Current P(disease), keyed by disease label.
            symptom_idx (int): Column index of the observed symptom.
            symptom_value: 1 means present; any other value is treated as absent.

        Returns:
            dict: Updated distribution, normalized when its total mass is > 0.
        """
        updated = {}
        for d in self.disease_labels:
            likelihood = (self.conditional_probs[d][symptom_idx] if symptom_value == 1
                          else 1 - self.conditional_probs[d][symptom_idx])
            updated[d] = likelihood * current_probs[d]
        total = sum(updated.values())
        if total > 0:
            updated = {d: v / total for d, v in updated.items()}
        return updated

    def diagnose(self, patient_symptoms, max_questions=20, threshold=0.9):
        """
        Diagnose one patient by asking symptoms sequentially.

        Args:
            patient_symptoms: Indexable of symptom values, one per symptom column.
            max_questions (int): Maximum number of symptoms to ask about.
            threshold (float): Stop as soon as the top posterior reaches this value.

        Returns:
            tuple: (predicted disease, its posterior probability, list of the
            names of the symptoms asked, in order).
        """
        current_probs = self.disease_priors.copy()
        # Track column indices directly. Recovering them from names via
        # list.index() maps duplicated names to the first column, which makes
        # the model ask the same symptom again and again.
        asked_indices = []

        for _ in range(max_questions):
            idx = self.select_next_symptom(asked_indices, current_probs)
            if idx is None:
                break
            current_probs = self.update_probabilities(current_probs, idx, patient_symptoms[idx])
            asked_indices.append(idx)
            if max(current_probs.values()) >= threshold:
                break

        disease = max(current_probs, key=current_probs.get)
        confidence = current_probs[disease]
        asked_names = [self.symptom_names[i] for i in asked_indices]
        return disease, confidence, asked_names

    def evaluate(self, X_test, y_test, max_questions=20, threshold=0.9):
        """
        Run `diagnose` on every test patient.

        Returns:
            tuple: (accuracy, average number of questions, per-patient results).
            Each result is a dict with keys 'true', 'pred', 'confidence' and
            'asked_symptoms'. An empty test set yields (0.0, 0.0, []).
        """
        n_patients = len(y_test)
        if n_patients == 0:
            # Nothing to evaluate; avoid dividing by zero below
            return 0.0, 0.0, []

        results, correct, total_q = [], 0, 0
        for x, true in zip(X_test, y_test):
            pred, conf, asked = self.diagnose(x, max_questions, threshold)
            correct += int(pred == true)
            total_q += len(asked)
            results.append({'true': true, 'pred': pred, 'confidence': conf,
                             'asked_symptoms': asked})
        accuracy = correct / n_patients
        avg_questions = total_q / n_patients
        return accuracy, avg_questions, results

    def analyze_results(self, results):
        """
        Summarize `evaluate` results per true disease.

        Returns:
            pd.DataFrame: Indexed by 'true', with columns 'accuracy' (fraction
            of correct predictions) and 'avg_questions' (mean questions asked).
        """
        df = pd.DataFrame(results)
        # Explicit per-row columns make the aggregation a plain mean
        scored = df.assign(
            _correct=df['pred'] == df['true'],
            _n_questions=df['asked_symptoms'].map(len),
        )
        stats = scored.groupby('true').agg(accuracy=('_correct', 'mean'),
                                           avg_questions=('_n_questions', 'mean'))
        return stats

    def plot_symptom_importance(self, top_n=20):
        """Bar chart of the `top_n` symptoms ranked by information gain under the priors."""
        gains = [self.calculate_information_gain(i, list(range(self.n_symptoms)),
                                               self.disease_priors) for i in range(self.n_symptoms)]
        df = pd.DataFrame({'symptom': self.symptom_names, 'gain': gains})
        df_top = df.sort_values('gain', ascending=False).head(top_n)
        plt.figure(figsize=(10, 6))
        # Reversed so the most informative symptom is drawn at the top
        plt.barh(df_top['symptom'][::-1], df_top['gain'][::-1])
        plt.xlabel('Ganancia de información')
        plt.title(f'Top {top_n} síntomas')
        plt.tight_layout()
        plt.show()

    def plot_confusion_matrix(self, results):
        """Heat map of the row-normalized confusion matrix built from `evaluate` results."""
        df = pd.DataFrame(results)
        cm = pd.crosstab(df['true'], df['pred'], normalize='index')
        plt.figure(figsize=(8, 6))
        plt.imshow(cm, interpolation='nearest', aspect='auto')
        plt.colorbar()
        plt.xticks(range(len(cm.columns)), cm.columns, rotation=45)
        plt.yticks(range(len(cm.index)), cm.index)
        plt.xlabel('Predicción')
        plt.ylabel('Verdadero')
        plt.title('Matriz de confusión')
        plt.tight_layout()
        plt.show()

    def compare_num_questions(self, X_test, y_test, max_questions_list=None, threshold=0.8):
        """
        Evaluate the model for several question budgets and plot the trade-off.

        Args:
            X_test, y_test: Test data passed through to `evaluate`.
            max_questions_list (list of int, optional): Budgets to try;
                defaults to [5, 10, 15, 20, 25, 30].
            threshold (float): Confidence threshold passed to `evaluate`.

        Returns:
            pd.DataFrame: One row per budget with columns 'max_questions',
            'accuracy' and 'avg_questions'.
        """
        if max_questions_list is None:
            max_questions_list = [5, 10, 15, 20, 25, 30]
        records = []
        for mq in max_questions_list:
            acc, avg_q, _ = self.evaluate(X_test, y_test, mq, threshold)
            records.append({'max_questions': mq, 'accuracy': acc, 'avg_questions': avg_q})
        df_res = pd.DataFrame(records)
        fig, ax1 = plt.subplots(figsize=(10, 6))
        ax1.plot(df_res['max_questions'], df_res['accuracy'], marker='o')
        ax1.set_xlabel('Máximo de preguntas')
        ax1.set_ylabel('Precisión')
        # Second y-axis so both curves share the x-axis
        ax2 = ax1.twinx()
        ax2.plot(df_res['max_questions'], df_res['avg_questions'], marker='s')
        ax2.set_ylabel('Preguntas promedio')
        plt.title('Rendimiento vs. nº de preguntas')
        plt.tight_layout()
        plt.show()
        return df_res

# Función interactiva corregida para usar nombres en lugar de índices
def diagnose_interactive(model, max_questions=15, threshold=0.90):
    """
    Run an interactive diagnosis session on the console, showing symptom names.

    Repeatedly asks the model for the next symptom, reads a 1/0 answer from
    `input()`, updates the disease probabilities and prints the provisional
    top 3, until the top probability reaches `threshold`, `max_questions`
    answers were given, or the model has no symptom left to ask.

    Args:
        model: Object exposing `disease_priors`, `symptom_names`,
            `select_next_symptom(asked_indices, probs)` and
            `update_probabilities(probs, idx, value)`.
        max_questions (int): Maximum number of questions to ask.
        threshold (float): Confidence at which the session stops early.

    Returns:
        tuple: (final disease, its probability, list of asked symptom names,
        dict mapping each asked symptom name to the 0/1 answer).
    """
    current_probs = model.disease_priors.copy()
    asked_indices = []
    asked_names = []
    patient_responses = {}

    print("Sistema de diagnóstico médico interactivo")
    print(f"Responda hasta {max_questions} preguntas (1=Sí, 0=No), umbral={threshold}")

    for i in range(max_questions):
        idx = model.select_next_symptom(asked_indices, current_probs)
        if idx is None:
            break
        name = model.symptom_names[idx]
        val = None
        while val not in (0, 1):
            try:
                val = int(input(f"Pregunta {i+1}: ¿Presenta el síntoma '{name}'? (1/0): "))
            except ValueError:
                val = None
            # Report every rejected answer, including integers other than 0/1,
            # so the user is never re-prompted without an explanation
            if val not in (0, 1):
                print("Respuesta inválida. Ingrese 1 o 0.")
        # Store the answer
        patient_responses[name] = val
        asked_indices.append(idx)
        asked_names.append(name)
        # Update probabilities
        current_probs = model.update_probabilities(current_probs, idx, val)
        # Show the provisional top 3
        print("Diagnósticos provisionales (top 3):")
        for d, p in sorted(current_probs.items(), key=lambda x: x[1], reverse=True)[:3]:
            print(f"- {d}: {p:.2f}")
        # Stop early once the most likely disease reaches the threshold
        if max(current_probs.values()) >= threshold:
            break

    # Final result
    disease = max(current_probs, key=current_probs.get)
    confidence = current_probs[disease]
    print("=" * 50)
    print(f"Diagnóstico final: {disease}")
    print(f"Confianza: {confidence:.4f}")
    print(f"Síntomas preguntados: {asked_names}")
    print("=" * 50)
    return disease, confidence, asked_names, patient_responses


Construimos el modelo

In [14]:
# 2. Initialize and fit the model: symptom names are every training column
#    except the response column
symptom_columns = df_train.drop(columns=[columna_respuesta]).columns
symptom_names = symptom_columns.tolist()
model = BayesianSequentialDiagnostic(
    X_train=x_train,
    y_train=y_train,
    symptom_names=symptom_names,
)

In [15]:
# Usage: start an interactive console session with the default question
# limit and threshold; the function blocks on input() until answers are given
diagnose_interactive(model)

Sistema de diagnóstico médico interactivo
Responda hasta 15 preguntas (1=Sí, 0=No), umbral=0.9


KeyboardInterrupt: Interrupted by user