In [None]:
"""
CIE10 Code Classifier - All-in-One Implementation con Validación

Estructura de datos requerida:
- train.csv y test.csv deben tener columnas:
  * text: Texto descriptivo del diagnóstico
  * labels: Lista de códigos CIE10 asociados (formato: "['cod1', 'cod2', ...]")
- cie10_codes.csv debe tener columnas:
  * codigo: Código CIE10
  * descripcion: Descripción oficial del código
"""

!pip install pandas spacy sentence-transformers scikit-learn transformers deep-translator torch numpy joblib transformers[torch]

#!python -m spacy download es_core_news_lg

import ast
import multiprocessing
from multiprocessing import Pool, cpu_count

import joblib
import numpy as np
import pandas as pd
import torch
from deep_translator import GoogleTranslator
from sentence_transformers import SentenceTransformer, InputExample, losses, evaluation
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import f1_score, accuracy_score
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import train_test_split
from sklearn.multiclass import OneVsRestClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MultiLabelBinarizer
from torch.utils.data import Dataset, DataLoader
from tqdm.auto import tqdm  # Barra de progreso mejorada
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments

# Register tqdm with pandas so that Series.progress_apply (used below) is available.
tqdm.pandas()

# Global configuration: CSV locations shared by the cells of this notebook.
CIE10_DATA_PATH = '../csv_import_scripts/cie10-es-diagnoses.csv'
TRAIN_DATA_PATH = 'codiesp_csvs/codiesp_D_source_train.csv'
TEST_DATA_PATH = 'codiesp_csvs/codiesp_D_source_test.csv'
VALIDATION_DATA_PATH = 'codiesp_csvs/codiesp_D_source_validation.csv'


# Load the train/test splits and the code catalogue into module-level frames.
print("Cargando datos...")
train_df = pd.read_csv(TRAIN_DATA_PATH)
test_df = pd.read_csv(TEST_DATA_PATH)

cie10_df = pd.read_csv(CIE10_DATA_PATH)

# Turn the `labels` column from its string form into real Python lists.
# ast.literal_eval only accepts literals, so arbitrary code in the CSV is not executed.
# NOTE: this overwrites the column in place, so re-running the cell on the already
# converted frames will fail; reload the CSVs first.
print("\nPreprocesando etiquetas...")
train_df['labels'] = train_df['labels'].progress_apply(ast.literal_eval)
test_df['labels'] = test_df['labels'].progress_apply(ast.literal_eval)

from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

class MultiLabelClassificationEvaluator:
    """Evaluate a sentence encoder as a similarity-threshold multi-label classifier.

    A code is predicted for a text when the cosine similarity between the text
    embedding and that code's embedding reaches ``threshold``. Predictions and
    gold labels are then binarised per document so the metrics compare aligned
    indicator matrices (flattening both label lists, as done previously, yields
    sequences of different length and makes scikit-learn raise).
    """

    def __init__(self, texts, true_labels, model, label_embeddings, threshold=0.5):
        """
        Args:
            texts (list): Input texts to evaluate on.
            true_labels (list): One list of gold codes per text.
            model (SentenceTransformer): Kept for reference; ``__call__`` scores
                the model it is given.
            label_embeddings (dict): Maps each code to its embedding vector. The
                dict is read at call time, so in-place updates are picked up.
            threshold (float): Minimum cosine similarity to assign a code.
        """
        self.texts = texts
        self.true_labels = true_labels
        self.model = model
        self.label_embeddings = label_embeddings
        self.threshold = threshold

    def __call__(self, model, **kwargs):
        """Score ``model`` on the stored texts.

        Returns:
            dict: ``accuracy`` (exact-match / subset accuracy) plus micro-averaged
            ``precision``, ``recall`` and ``f1``. All zeros when there is nothing
            to evaluate.
        """
        empty_result = {'accuracy': 0.0, 'precision': 0.0, 'recall': 0.0, 'f1': 0.0}
        codes = list(self.label_embeddings.keys())
        if len(self.texts) == 0 or not codes:
            return empty_result

        text_embeddings = np.asarray(model.encode(self.texts))
        code_matrix = np.asarray([self.label_embeddings[code] for code in codes])

        # One matrix product instead of one scikit-learn call per (text, code) pair.
        similarity = cosine_similarity(text_embeddings, code_matrix)

        # Codes are compared case-insensitively: the training code upper-cases
        # dataset labels before looking them up in the catalogue.
        normalised_codes = [str(code).upper() for code in codes]
        predicted_labels = [
            [normalised_codes[j] for j in np.flatnonzero(row >= self.threshold)]
            for row in similarity
        ]
        gold_labels = [[str(label).upper() for label in labels] for labels in self.true_labels]

        # Fit on the union so every gold or predicted code gets a column.
        binarizer = MultiLabelBinarizer()
        binarizer.fit(gold_labels + predicted_labels)
        if len(binarizer.classes_) == 0:
            return empty_result
        y_true = binarizer.transform(gold_labels)
        y_pred = binarizer.transform(predicted_labels)

        print("calculando scores")
        return {
            'accuracy': accuracy_score(y_true, y_pred),
            'precision': precision_score(y_true, y_pred, average='micro', zero_division=0),
            'recall': recall_score(y_true, y_pred, average='micro', zero_division=0),
            'f1': f1_score(y_true, y_pred, average='micro', zero_division=0)
        }

In [None]:
# Opción 1: Sentence Transformers con Pairwise Training
# ==================================================================


class SentenceTransformerCIE10:
    """Bi-encoder that assigns CIE10 codes by text/description cosine similarity.

    The catalogue at ``CIE10_DATA_PATH`` is read with columns ``code`` and
    ``description``; every description is embedded once and compared against the
    embedding of the text to classify.
    """

    def __init__(self, model_name='paraphrase-multilingual-MiniLM-L12-v2'):
        self.model = SentenceTransformer(model_name)
        self.cie10_df, self.code_embeddings = self._prepare_code_embeddings()
        self.mlb = MultiLabelBinarizer()

    def _embed_catalogue(self, cie10_df, batch_size=32):
        """Return ``{code: embedding}`` for every row, using the current model weights."""
        descriptions = cie10_df['clean_desc'].tolist()
        embeddings = []
        for start in tqdm(range(0, len(descriptions), batch_size), desc="Generating embeddings"):
            embeddings.extend(self.model.encode(descriptions[start:start + batch_size]))
        return dict(zip(cie10_df['code'], embeddings))

    def _prepare_code_embeddings(self):
        """Load the catalogue, clean its descriptions and embed them.

        Returns:
            tuple: ``(catalogue DataFrame with a 'clean_desc' column, {code: embedding})``.
        """
        print("Prepara embeddings para los códigos CIE10")
        cie10_df = pd.read_csv(CIE10_DATA_PATH)
        # Missing descriptions become '' so the encoder never receives NaN.
        cie10_df['clean_desc'] = (
            cie10_df['description'].fillna('').str.replace('NEOM', '', regex=False).str.strip()
        )
        return cie10_df, self._embed_catalogue(cie10_df)

    def _refresh_code_embeddings(self):
        """Re-embed the catalogue after the model weights changed.

        The dict is updated in place so objects holding a reference to
        ``self.code_embeddings`` (e.g. the validation evaluator) see fresh vectors.
        """
        fresh = self._embed_catalogue(self.cie10_df)
        self.code_embeddings.clear()
        self.code_embeddings.update(fresh)

    def _generate_triplets(self, texts, labels):
        """Build (text, description) training pairs: label 1.0 for each gold code,
        label 0.0 for one randomly drawn non-gold code per gold code.

        Gold codes missing from the catalogue are skipped (and reported once)
        instead of raising ``IndexError``.
        """
        print("Genera tripletas (anchor, positive, negative)")
        # Upper-cased code -> description, built once instead of filtering the
        # DataFrame for every label. setdefault keeps the first row on duplicates.
        desc_by_code = {}
        for code, desc in zip(self.cie10_df['code'], self.cie10_df['clean_desc']):
            desc_by_code.setdefault(str(code).upper(), desc)
        all_codes = list(desc_by_code)

        triplets = []
        unknown_codes = set()
        for text, codes in zip(texts, labels):
            gold = {str(code).upper() for code in codes}
            # Invariant for the whole document, so computed once per text.
            negative_codes = [code for code in all_codes if code not in gold]

            for raw_code in codes:
                pos_code = str(raw_code).upper()
                pos_desc = desc_by_code.get(pos_code)
                if pos_desc is None:
                    unknown_codes.add(pos_code)
                    continue
                triplets.append(InputExample(texts=[text, pos_desc], label=1.0))

                if negative_codes:
                    neg_code = negative_codes[np.random.randint(len(negative_codes))]
                    triplets.append(InputExample(texts=[text, desc_by_code[neg_code]], label=0.0))

        if unknown_codes:
            print(f"Códigos sin descripción en el catálogo (omitidos): {len(unknown_codes)}")
        return triplets

    def train(self, train_df, val_df=None, epochs=5, batch_size=32):
        """Fine-tune the encoder and keep the checkpoint with the best validation F1.

        Args:
            train_df (DataFrame): Needs columns ``text`` and ``labels`` (list of codes).
            val_df (DataFrame, optional): Validation split; when omitted, 20% of
                ``train_df`` is held out.
            epochs (int): Number of single-epoch ``fit`` rounds.
            batch_size (int): Pair batch size.
        """
        print("Entrena el modelo con validación")
        self.mlb.fit(train_df['labels'])

        if val_df is None:
            train_df, val_df = train_test_split(train_df, test_size=0.2, random_state=42)

        train_triplets = self._generate_triplets(train_df['text'].tolist(), train_df['labels'].tolist())

        val_texts = val_df['text'].tolist()
        val_true_labels = val_df['labels'].tolist()
        evaluator = MultiLabelClassificationEvaluator(val_texts, val_true_labels, self.model, self.code_embeddings)

        train_loader = DataLoader(train_triplets, shuffle=True, batch_size=batch_size)
        train_loss = losses.CosineSimilarityLoss(self.model)

        # Start below any reachable score so the first epoch is always saved and
        # the reload after the loop cannot hit a missing directory.
        best_score = float('-inf')
        checkpoint_saved = False
        for epoch in range(epochs):
            print(f"\nEpoch {epoch+1}/{epochs}")
            # NOTE(review): each call builds a fresh optimizer/scheduler, so the
            # 100 warmup steps are repeated every epoch — confirm this is intended.
            self.model.fit(
                train_objectives=[(train_loader, train_loss)],
                epochs=1,
                warmup_steps=100,
                show_progress_bar=True
            )

            # Text and code vectors must come from the same weights to be comparable.
            self._refresh_code_embeddings()

            metrics = evaluator(self.model)
            # The evaluator returns a dict of metrics; micro-F1 drives model selection.
            score = metrics['f1'] if isinstance(metrics, dict) else float(metrics)
            print(f"Validation Score: {score:.4f}")

            if score > best_score:
                best_score = score
                self.model.save('./best_sbert_model')
                checkpoint_saved = True

        if checkpoint_saved:
            self.model = SentenceTransformer('./best_sbert_model')
            self._refresh_code_embeddings()

    def predict(self, text, threshold=0.4, top_k=5):
        """Return up to ``top_k`` codes whose similarity to ``text`` is >= ``threshold``.

        Returns:
            list[dict]: Items with keys ``codigo``, ``descripcion`` and ``similitud``,
            sorted by decreasing similarity.
        """
        print("Predice códigos para un texto dado")
        if not self.code_embeddings:
            return []

        codes = list(self.code_embeddings)
        code_matrix = np.asarray([self.code_embeddings[code] for code in codes], dtype=float)
        # encode([text]) returns a (1, dim) array; flatten it to a plain vector.
        text_vector = np.asarray(self.model.encode([text]), dtype=float).reshape(-1)

        norms = np.linalg.norm(code_matrix, axis=1) * np.linalg.norm(text_vector)
        similarities = np.divide(
            code_matrix @ text_vector,
            norms,
            out=np.zeros(len(codes), dtype=float),
            where=norms > 0,  # zero vectors get similarity 0 instead of NaN
        )

        ranking = np.argsort(-similarities, kind='stable')
        results = []
        for idx in ranking[:max(top_k, 0)]:
            sim = similarities[idx]
            if sim < threshold:
                break  # ranking is descending, nothing further can qualify
            code = codes[idx]
            results.append({
                'codigo': code,
                'descripcion': self.cie10_df[self.cie10_df['code'] == code]['description'].values[0],
                'similitud': float(sim)
            })
        return results

In [None]:
# Option 1 driver: build the similarity model and fine-tune it on the training frame.
# No val_df is passed, so train() holds out part of train_df for validation.
st_model = SentenceTransformerCIE10()
st_model.train(train_df, epochs=5)

# Prediction on a sample sentence. `test_text` is reused by the option 2-5 cells below.
test_text = "Paciente con fiebre alta y dolor articular"
resultados = st_model.predict(test_text)
for res in resultados:
    print(f"Código: {res['codigo']} - Similitud: {res['similitud']:.2f}")
    print(f"Descripción: {res['descripcion']}\n")

In [None]:
# ==================================================================
# Opción 2: BERT Clasificador Multi-Etiqueta
# ==================================================================
class BERTMultiLabelCIE10:
    """Multi-label sequence classifier with one output per catalogue code."""

    def __init__(self, model_name='dccuchile/bert-base-spanish-wwm-cased'):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.mlb = MultiLabelBinarizer()
        self.model = None  # created by train()
        self._prepare_labels()

    @staticmethod
    def _first_present(df, candidates):
        """Return the first column name from ``candidates`` that exists in ``df``."""
        for name in candidates:
            if name in df.columns:
                return name
        raise KeyError(f"None of the columns {candidates} found in {list(df.columns)}")

    def _prepare_labels(self):
        """Fit the label binarizer on every code of the catalogue."""
        cie10_df = pd.read_csv(CIE10_DATA_PATH)
        # The file header documents 'codigo' while SentenceTransformerCIE10 reads
        # 'code' from the same CSV; accept either spelling.
        code_col = self._first_present(cie10_df, ('codigo', 'code'))
        # Codes are upper-cased on both sides so dataset labels match the catalogue.
        self.mlb.fit([[str(code).upper() for code in cie10_df[code_col]]])

    def train(self, train_path=TRAIN_DATA_PATH, epochs=3, batch_size=16):
        """Fine-tune the classifier on the CSV at ``train_path``.

        The CSV must provide the texts in ``text`` and the stringified code lists
        in ``labels`` (the legacy names ``descripcion``/``codigos`` are accepted).
        """
        train_df = pd.read_csv(train_path)
        label_col = self._first_present(train_df, ('labels', 'codigos'))
        text_col = self._first_present(train_df, ('text', 'descripcion'))
        label_lists = [
            [str(code).upper() for code in ast.literal_eval(raw)]
            for raw in train_df[label_col]
        ]
        y_train = self.mlb.transform(label_lists)

        class Cie10Dataset(Dataset):
            """Tokenises one text per item and pairs it with its indicator vector."""

            def __init__(self, texts, labels, tokenizer):
                self.texts = texts
                self.labels = labels
                self.tokenizer = tokenizer

            def __len__(self): return len(self.texts)

            def __getitem__(self, idx):
                encoding = self.tokenizer(
                    self.texts[idx],
                    max_length=128,
                    padding='max_length',
                    truncation=True,
                    return_tensors='pt'
                )
                return {
                    'input_ids': encoding['input_ids'].flatten(),
                    'attention_mask': encoding['attention_mask'].flatten(),
                    'labels': torch.FloatTensor(self.labels[idx])
                }

        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.tokenizer.name_or_path,
            num_labels=len(self.mlb.classes_),
            problem_type="multi_label_classification"
        )

        train_dataset = Cie10Dataset(train_df[text_col].tolist(), y_train, self.tokenizer)

        training_args = TrainingArguments(
            output_dir='./bert_results',
            num_train_epochs=epochs,
            per_device_train_batch_size=batch_size,
            save_strategy='epoch'
        )

        Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset
        ).train()

    def predict(self, text, threshold=0.3):
        """Return the tuple of codes whose sigmoid probability exceeds ``threshold``.

        Raises:
            RuntimeError: If called before ``train()``.
        """
        if self.model is None:
            raise RuntimeError("BERTMultiLabelCIE10.predict() called before train()")

        inputs = self.tokenizer(
            text,
            max_length=128,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )
        # Trainer may have moved the model to an accelerator; inputs must follow it.
        inputs = {name: tensor.to(self.model.device) for name, tensor in inputs.items()}

        self.model.eval()
        with torch.no_grad():
            outputs = self.model(**inputs)
        probs = torch.sigmoid(outputs.logits)
        # inverse_transform needs a 2-D array (it reads .shape), not a nested list.
        predicted_labels = (probs > threshold).int().cpu().numpy()
        return self.mlb.inverse_transform(predicted_labels)[0]

In [None]:
# ==================================================================
# Opción 3: TF-IDF + Random Forest
# ==================================================================
class TfidfRFCIE10:
    """TF-IDF features fed to one random forest per code (one-vs-rest)."""

    def __init__(self):
        self.pipeline = None  # built by train() or restored by load()
        self.mlb = MultiLabelBinarizer()

    @staticmethod
    def _first_present(df, candidates):
        """Return the first column name from ``candidates`` that exists in ``df``."""
        for name in candidates:
            if name in df.columns:
                return name
        raise KeyError(f"None of the columns {candidates} found in {list(df.columns)}")

    def train(self, train_path=TRAIN_DATA_PATH):
        """Fit the TF-IDF + random-forest pipeline on the CSV at ``train_path``.

        Texts are read from ``text`` and stringified code lists from ``labels``
        (the legacy names ``descripcion``/``codigos`` are accepted).
        """
        train_df = pd.read_csv(train_path)
        label_col = self._first_present(train_df, ('labels', 'codigos'))
        text_col = self._first_present(train_df, ('text', 'descripcion'))
        y_train = self.mlb.fit_transform(train_df[label_col].apply(ast.literal_eval))

        # Pipeline comes from sklearn.pipeline (imported at the top of the notebook).
        self.pipeline = Pipeline([
            ('tfidf', TfidfVectorizer(max_features=5000, ngram_range=(1,2))),
            ('clf', OneVsRestClassifier(RandomForestClassifier(n_estimators=100)))
        ])

        self.pipeline.fit(train_df[text_col], y_train)

    def save(self, path):
        """Persist the fitted pipeline and label binarizer with joblib."""
        joblib.dump({'pipeline': self.pipeline, 'mlb': self.mlb}, path)

    def load(self, path):
        """Restore a pipeline/binarizer pair written by ``save``.

        joblib uses pickle underneath: only load files from a trusted source.
        """
        data = joblib.load(path)
        self.pipeline = data['pipeline']
        self.mlb = data['mlb']

    def predict(self, text, threshold=0.2):
        """Return the tuple of codes whose predicted probability exceeds ``threshold``.

        Raises:
            RuntimeError: If no pipeline has been trained or loaded.
        """
        if self.pipeline is None:
            raise RuntimeError("TfidfRFCIE10.predict() called before train() or load()")
        probs = self.pipeline.predict_proba([text])
        return self.mlb.inverse_transform((probs > threshold).astype(int))[0]

In [None]:
# ==================================================================
# Opción 4: Ensemble Híbrido
# ==================================================================
class EnsembleCIE10:
    """Random forest over concatenated TF-IDF and sentence-embedding features."""

    def __init__(self):
        self.tfidf = TfidfVectorizer(max_features=2000)
        self.embedder = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
        self.model = RandomForestClassifier(n_estimators=100)
        self.mlb = MultiLabelBinarizer()

    @staticmethod
    def _first_present(df, candidates):
        """Return the first column name from ``candidates`` that exists in ``df``."""
        for name in candidates:
            if name in df.columns:
                return name
        raise KeyError(f"None of the columns {candidates} found in {list(df.columns)}")

    def train(self, train_path=TRAIN_DATA_PATH):
        """Fit the vectoriser and the forest on the CSV at ``train_path``.

        Texts are read from ``text`` and stringified code lists from ``labels``
        (the legacy names ``descripcion``/``codigos`` are accepted).
        """
        train_df = pd.read_csv(train_path)
        label_col = self._first_present(train_df, ('labels', 'codigos'))
        text_col = self._first_present(train_df, ('text', 'descripcion'))
        y_train = self.mlb.fit_transform(train_df[label_col].apply(ast.literal_eval))

        texts = train_df[text_col].tolist()  # plain list: the encoder indexes positionally
        X_tfidf = self.tfidf.fit_transform(texts).toarray()
        X_semantic = self.embedder.encode(texts)
        X_combined = np.concatenate([X_tfidf, X_semantic], axis=1)

        self.model.fit(X_combined, y_train)

    def _positive_probabilities(self, features):
        """Return an ``(n_samples, n_labels)`` array with P(label == 1).

        A multi-output forest returns a *list* with one ``(n_samples, n_seen_classes)``
        array per label, which cannot be compared against a threshold directly.
        """
        per_label = self.model.predict_proba(features)
        classes_per_label = self.model.classes_
        if not isinstance(per_label, list):
            # Single-label fit: one array and one classes_ vector.
            per_label, classes_per_label = [per_label], [classes_per_label]

        positive = np.zeros((features.shape[0], len(per_label)))
        for j, (proba, classes) in enumerate(zip(per_label, classes_per_label)):
            hits = np.flatnonzero(np.asarray(classes) == 1)
            if hits.size:  # a label never seen as positive keeps probability 0
                positive[:, j] = proba[:, hits[0]]
        return positive

    def predict(self, text, threshold=0.25):
        """Return the tuple of codes whose predicted probability exceeds ``threshold``."""
        tfidf_feat = self.tfidf.transform([text]).toarray()
        semantic_feat = self.embedder.encode([text])
        combined = np.concatenate([tfidf_feat, semantic_feat], axis=1)
        probs = self._positive_probabilities(combined)
        return self.mlb.inverse_transform((probs > threshold).astype(int))[0]


In [None]:
# ==================================================================
# Opción 5: Modelo Biomédico con Traducción
# ==================================================================
class BioTranslatedCIE10:
    """Translate Spanish text to English and classify it with Bio_ClinicalBERT."""

    def __init__(self):
        self.translator = GoogleTranslator(source='es', target='en')
        self.tokenizer = AutoTokenizer.from_pretrained("emilyalsentzer/Bio_ClinicalBERT")
        self.mlb = MultiLabelBinarizer()
        self.model = None  # created by train()
        catalogue = pd.read_csv(CIE10_DATA_PATH)
        # The file header documents 'codigo'/'descripcion' while SentenceTransformerCIE10
        # reads 'code'/'description' from the same CSV; accept either spelling.
        code_col = self._first_present(catalogue, ('codigo', 'code'))
        desc_col = self._first_present(catalogue, ('descripcion', 'description'))
        self.code_descs = catalogue.set_index(code_col)[desc_col].to_dict()

    @staticmethod
    def _first_present(df, candidates):
        """Return the first column name from ``candidates`` that exists in ``df``."""
        for name in candidates:
            if name in df.columns:
                return name
        raise KeyError(f"None of the columns {candidates} found in {list(df.columns)}")

    def _translate(self, text):
        """Translate Spanish text to English, returning the input unchanged on failure.

        The fallback is deliberate best-effort, but only ``Exception`` is caught so
        that KeyboardInterrupt/SystemExit still stop a long translation run.
        """
        try:
            return self.translator.translate(text)
        except Exception:
            return text  # Fallback: keep the untranslated text

    def train(self, train_path=TRAIN_DATA_PATH, epochs=3):
        """Fine-tune on translated texts from the CSV at ``train_path``.

        Texts are read from ``text`` and stringified code lists from ``labels``
        (the legacy names ``descripcion``/``codigos`` are accepted).
        """
        train_df = pd.read_csv(train_path)
        label_col = self._first_present(train_df, ('labels', 'codigos'))
        text_col = self._first_present(train_df, ('text', 'descripcion'))
        label_lists = train_df[label_col].apply(ast.literal_eval)
        texts_en = train_df[text_col].apply(self._translate).tolist()

        self.mlb.fit(label_lists)
        y_train = self.mlb.transform(label_lists)

        class MedicalDataset(Dataset):
            """Tokenises one translated text per item with its indicator vector."""

            def __init__(self, texts, labels, tokenizer):
                self.texts = texts
                self.labels = labels
                self.tokenizer = tokenizer

            def __len__(self): return len(self.texts)

            def __getitem__(self, idx):
                encoding = self.tokenizer(
                    self.texts[idx],
                    max_length=128,
                    padding='max_length',
                    truncation=True,
                    return_tensors='pt'
                )
                return {
                    'input_ids': encoding['input_ids'].flatten(),
                    'attention_mask': encoding['attention_mask'].flatten(),
                    'labels': torch.FloatTensor(self.labels[idx])
                }

        self.model = AutoModelForSequenceClassification.from_pretrained(
            "emilyalsentzer/Bio_ClinicalBERT",
            num_labels=len(self.mlb.classes_),
            problem_type="multi_label_classification"
        )

        trainer = Trainer(
            model=self.model,
            args=TrainingArguments(
                output_dir='./biobert',
                num_train_epochs=epochs,
                per_device_train_batch_size=16,
                # Mixed precision needs a CUDA device; requesting it on CPU raises.
                fp16=torch.cuda.is_available()
            ),
            train_dataset=MedicalDataset(texts_en, y_train, self.tokenizer)
        )
        trainer.train()

    def predict(self, text, threshold=0.3):
        """Translate ``text`` and return the codes whose probability exceeds ``threshold``.

        Raises:
            RuntimeError: If called before ``train()``.
        """
        if self.model is None:
            raise RuntimeError("BioTranslatedCIE10.predict() called before train()")

        translated = self._translate(text)
        inputs = self.tokenizer(
            translated,
            max_length=128,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )
        # Trainer may have moved the model to an accelerator; inputs must follow it.
        inputs = {name: tensor.to(self.model.device) for name, tensor in inputs.items()}

        self.model.eval()
        with torch.no_grad():
            outputs = self.model(**inputs)
        probs = torch.sigmoid(outputs.logits)
        # .cpu() is required before .numpy() when the model lives on a GPU.
        return self.mlb.inverse_transform((probs > threshold).int().cpu().numpy())[0]


In [None]:
    # Option 2: train the multi-label BERT classifier and print its codes for
    # `test_text`, which is defined in the option 1 prediction cell.
    bert_model = BERTMultiLabelCIE10()
    bert_model.train()
    print("Opción 2:", bert_model.predict(test_text))



In [None]:
    # Option 3: train the TF-IDF + random-forest pipeline and print its codes for
    # `test_text`, which is defined in the option 1 prediction cell.
    tfidf_model = TfidfRFCIE10()
    tfidf_model.train()
    print("Opción 3:", tfidf_model.predict(test_text))



In [None]:
    # Option 4: train the hybrid TF-IDF + embedding forest and print its codes for
    # `test_text`, which is defined in the option 1 prediction cell.
    ensemble_model = EnsembleCIE10()
    ensemble_model.train()
    print("Opción 4:", ensemble_model.predict(test_text))



In [None]:
    # Option 5: translate-then-classify model, evaluated on `test_text` from the
    # option 1 prediction cell.
    # Original note: "requires a Google Translate API key".
    # NOTE(review): BioTranslatedCIE10 builds GoogleTranslator(source='es', target='en')
    # without passing any key — confirm whether a key is actually needed.
    bio_model = BioTranslatedCIE10()
    bio_model.train()
    print("Opción 5:", bio_model.predict(test_text))

In [None]:
# Ultima opción?


import pandas as pd
import torch
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, classification_report
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.linear_model import LogisticRegression
from sentence_transformers import SentenceTransformer, InputExample, losses, evaluation
from torch.utils.data import DataLoader

# Load the three splits from disk. This rebinds train_df/test_df from the first
# cell, so `labels` is back to its raw string form here.
train_df = pd.read_csv(TRAIN_DATA_PATH)
val_df = pd.read_csv(VALIDATION_DATA_PATH)
test_df = pd.read_csv(TEST_DATA_PATH)

# Convertir las etiquetas alfanuméricas a listas
def convert_labels_to_list(label_str):
    """Parse one cell of the ``labels`` column into a list of code strings.

    The CSVs store labels as a Python list literal (``"['cod1', 'cod2']"``), which
    a plain comma split would turn into fragments such as ``"['cod1'"``. Literal
    lists are therefore parsed with ``ast.literal_eval``; a bare comma-separated
    string is still accepted and split, with surrounding whitespace removed.

    Args:
        label_str: Raw cell value. Already-parsed sequences are passed through, so
            re-running the cell on converted data is harmless.

    Returns:
        list[str]: The codes; empty for blank or missing (NaN/None) cells.
    """
    if isinstance(label_str, (list, tuple, set)):
        return [str(item).strip() for item in label_str]
    if not isinstance(label_str, str):
        return []  # NaN / None from an empty CSV cell

    text = label_str.strip()
    if not text:
        return []

    if text[0] in '[(':
        try:
            parsed = ast.literal_eval(text)
        except (ValueError, SyntaxError):
            parsed = None  # malformed literal: fall back to the comma split
        if isinstance(parsed, (list, tuple, set)):
            return [str(item).strip() for item in parsed]

    return [part.strip() for part in text.split(",") if part.strip()]

# Replace the raw `labels` strings with lists, in place, on all three splits.
train_df['labels'] = train_df['labels'].apply(convert_labels_to_list)
val_df['labels'] = val_df['labels'].apply(convert_labels_to_list)
test_df['labels'] = test_df['labels'].apply(convert_labels_to_list)

# One-hot (multi-label indicator) encoding. The binarizer is fitted on the training
# split only and then applied to validation and test.
mlb = MultiLabelBinarizer()
train_labels = mlb.fit_transform(train_df['labels'])
val_labels = mlb.transform(val_df['labels'])
test_labels = mlb.transform(test_df['labels'])

# Load the Sentence Transformers encoder (a multilingual checkpoint, per its name).
model_name = "paraphrase-multilingual-MiniLM-L12-v2"  # multilingual model, used here for Spanish text
model = SentenceTransformer(model_name)

# Preparar los datos para Pairwise Training
def create_pairwise_examples(df, labels):
    """Build label-aware sentence pairs for ``CosineSimilarityLoss``.

    For every text, one positive pair (another text sharing at least one label,
    target 1.0) and one negative pair (another text sharing no label, target 0.0)
    are sampled. Pairing a text with itself, as done previously, gives a cosine of
    exactly 1 and therefore no training signal, and random negatives could share
    labels with the anchor.

    Args:
        df (DataFrame): Must contain a ``text`` column.
        labels (array-like): Multi-label indicator matrix, one row per row of ``df``.

    Returns:
        list[InputExample]: Up to two examples per text; a pair is skipped when no
        suitable partner exists.
    """
    texts = df['text'].tolist()
    label_matrix = np.asarray(labels)
    positions = np.arange(len(texts))
    examples = []

    for i, text in enumerate(texts):
        # shares_label[j] is True when text j has at least one label in common with text i.
        shares_label = (label_matrix @ label_matrix[i]) > 0
        is_other = positions != i
        positive_pool = np.flatnonzero(shares_label & is_other)
        negative_pool = np.flatnonzero(~shares_label & is_other)

        if positive_pool.size:
            pick = positive_pool[torch.randint(0, positive_pool.size, (1,)).item()]
            examples.append(InputExample(texts=[text, texts[pick]], label=1.0))
        if negative_pool.size:
            pick = negative_pool[torch.randint(0, negative_pool.size, (1,)).item()]
            examples.append(InputExample(texts=[text, texts[pick]], label=0.0))
    return examples

# Build the sentence pairs for both splits.
train_examples = create_pairwise_examples(train_df, train_labels)
val_examples = create_pairwise_examples(val_df, val_labels)

# DataLoaders over the pairs; only the training one is shuffled.
# NOTE(review): val_dataloader is not referenced again in the visible cells.
train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=16)
val_dataloader = DataLoader(val_examples, shuffle=False, batch_size=16)

# Loss for pairwise training: CosineSimilarityLoss bound to the encoder.
train_loss = losses.CosineSimilarityLoss(model)

# Crear un evaluador personalizado
class CustomEvaluator:
    """Probe embedding quality with a one-vs-rest logistic regression.

    Args:
        texts (list[str]): Texts to score.
        labels (array-like): Multi-label indicator matrix for ``texts``.
        model (SentenceTransformer): Kept for reference; ``__call__`` scores the
            model it is given.
        mlb (MultiLabelBinarizer): Binarizer that produced ``labels``.
        train_texts (list[str], optional): Texts used to fit the probe.
        train_labels (array-like, optional): Indicator matrix for ``train_texts``.

    When ``train_texts``/``train_labels`` are omitted the probe is fitted and scored
    on the same data, so the reported numbers are optimistic (training-set metrics).
    """

    def __init__(self, texts, labels, model, mlb, train_texts=None, train_labels=None):
        self.texts = texts
        self.labels = labels
        self.model = model
        self.mlb = mlb
        self.train_texts = train_texts
        self.train_labels = train_labels

    def __call__(self, model, output_path=None, epoch=None, steps=None):
        """Embed the texts with ``model``, fit the probe, print metrics and return accuracy."""
        embeddings = model.encode(self.texts, convert_to_tensor=True)
        embeddings = embeddings.cpu().numpy()

        if self.train_texts is not None and self.train_labels is not None:
            fit_features = model.encode(self.train_texts, convert_to_tensor=True).cpu().numpy()
            fit_targets = self.train_labels
        else:
            fit_features, fit_targets = embeddings, self.labels

        # LogisticRegression only accepts a 1-D target; the indicator matrix needs
        # one binary classifier per label.
        classifier = OneVsRestClassifier(LogisticRegression(max_iter=1000))
        classifier.fit(fit_features, fit_targets)
        preds = classifier.predict(embeddings)

        accuracy = accuracy_score(self.labels, preds)
        f1 = f1_score(self.labels, preds, average="micro", zero_division=0)
        precision = precision_score(self.labels, preds, average="micro", zero_division=0)
        recall = recall_score(self.labels, preds, average="micro", zero_division=0)

        print(f"Epoch {epoch}, Steps {steps}:")
        print(f"Accuracy: {accuracy:.4f}, F1: {f1:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}")
        return accuracy

# Instantiate the evaluator on the validation texts and their indicator matrix.
evaluator = CustomEvaluator(val_df['text'].tolist(), val_labels, model, mlb)

# Fine-tune the encoder on the sentence pairs.
num_epochs = 3
warmup_steps = int(len(train_dataloader) * num_epochs * 0.1)  # 10% of all training steps
model.fit(
    train_objectives=[(train_dataloader, train_loss)],
    evaluator=evaluator,  # called during training
    epochs=num_epochs,
    warmup_steps=warmup_steps,
    output_path="./pairwise_model",
    evaluation_steps=100,  # evaluate every 100 steps
    show_progress_bar=True,  # show a progress bar
)

# Embed every split with the fine-tuned encoder.
train_embeddings = model.encode(train_df['text'].tolist(), convert_to_tensor=True)
val_embeddings = model.encode(val_df['text'].tolist(), convert_to_tensor=True)
test_embeddings = model.encode(test_df['text'].tolist(), convert_to_tensor=True)

# Train the classification head. LogisticRegression alone rejects a 2-D
# multi-label target, so one binary classifier is fitted per label.
classifier = OneVsRestClassifier(LogisticRegression(max_iter=1000))
classifier.fit(train_embeddings.cpu().numpy(), train_labels)

# Evaluate on the validation split. zero_division=0 silences the warnings raised
# for labels that never occur or are never predicted in a split.
val_preds = classifier.predict(val_embeddings.cpu().numpy())
print("Resultados en validación:")
print(classification_report(val_labels, val_preds, target_names=mlb.classes_, zero_division=0))

# Evaluate on the test split.
test_preds = classifier.predict(test_embeddings.cpu().numpy())
print("Resultados en prueba:")
print(classification_report(test_labels, test_preds, target_names=mlb.classes_, zero_division=0))

# Persist the encoder, the classification head and the label binarizer.
import joblib
model.save("./pairwise_model")
joblib.dump(classifier, "./classifier.pkl")
joblib.dump(mlb, "./mlb.pkl")  # needed to map predictions back to codes


In [None]:
import pandas as pd
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForSequenceClassification, get_linear_schedule_with_warmup
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import f1_score, classification_report
from tqdm.auto import tqdm
import re
from collections import defaultdict

# Initial configuration for the hierarchical BERT experiment.
# Alternative checkpoint (disabled): 'PlanTL-GOB-ES/bsc-bio-ehr-es-quasimodo'
MODEL_NAME = 'dccuchile/bert-base-spanish-wwm-cased'
TEST_BATCH_SIZE = 32
TRAIN_BATCH_SIZE = 2
VAL_BATCH_SIZE = 16
MAX_LENGTH = 512  # passed to the tokenizer as max_length by the dataset class
EPOCHS = 200
GRADIENT_ACCUMULATION_STEPS = 2  # batches accumulated per optimizer step in the training loop
HIERARCHICAL_WEIGHTS = {'parent': 1.5, 'child': 1.0}  # larger weight for parent categories
# Same paths as in the first cell, repeated here.
TRAIN_DATA_PATH = 'codiesp_csvs/codiesp_D_source_train.csv'
TEST_DATA_PATH = 'codiesp_csvs/codiesp_D_source_test.csv'
VALIDATION_DATA_PATH = 'codiesp_csvs/codiesp_D_source_validation.csv'

# Load the data. This rebinds train_df/val_df/test_df, so `labels` is the raw
# string column again.
train_df = pd.read_csv(TRAIN_DATA_PATH)
val_df = pd.read_csv(VALIDATION_DATA_PATH)
test_df = pd.read_csv(TEST_DATA_PATH)

# Preprocesamiento jerárquico de etiquetas
def extract_hierarchy(code):
    """Split a code into ``(parent, child)`` labels.

    ``parent`` is the leading character of the upper-cased code. ``child`` is that
    character followed by the digits (and trailing letters) that precede the dot,
    e.g. ``"n44.8" -> ("N", "N44")``. The child used to omit the leading character,
    which made codes from different groups collapse into one label
    ("A44" and "N44" both became "44"). As in the original, whatever follows the
    dot is not part of either label.

    Args:
        code: Raw code; non-string values yield ``('', '')``.

    Returns:
        tuple[str, str]: ``(parent, child)``. ``child`` is ``"*"`` when the code has
        no digits after its first character or does not match the pattern at all,
        so it is never ``None``.
    """
    if not isinstance(code, str):
        return '', ''  # Return empty strings for non-string inputs
    match = re.match(r"([A-Z0-9])[.]?(\d+[A-Z]*)?", code.strip().upper())
    if match:
        parent = match.group(1)
        tail = match.group(2)
        if tail is None:
            # Nothing below the parent level: use the same placeholder as the
            # no-match fallback instead of leaking None into the label set.
            return parent, "*"
        return parent, parent + tail
    return code, "*"  # Return the code as parent and child as *

# Collect every parent/child label that occurs in any split, plus the
# child -> parents relation used by the hierarchical loss.
all_labels = set()
hierarchy_map = defaultdict(set)

for df in [train_df, val_df, test_df]:
    for raw_labels in df['labels']:
        # ast.literal_eval only parses literals; eval would execute whatever
        # expression the CSV cell contains.
        labels = ast.literal_eval(raw_labels) if isinstance(raw_labels, str) else raw_labels
        for code in labels:
            parent, child = extract_hierarchy(code)
            if parent is None or child is None:
                # A missing level cannot be a label (and None breaks the sorted
                # class list built by the binarizer).
                continue
            all_labels.add(child)
            all_labels.add(parent)  # parents are labels in their own right
            hierarchy_map[child].add(parent)

# Label <-> column mapping over the full hierarchical label set.
mlb = MultiLabelBinarizer()
mlb.fit([all_labels])

# Dataset mejorado con jerarquía
class HierarchicalClinicalDataset(Dataset):
    """Torch dataset yielding tokenised texts with hierarchical indicator targets.

    Args:
        df (DataFrame): Needs ``text`` and ``labels`` (stringified or real lists).
        tokenizer: Callable tokenizer returning ``input_ids``/``attention_mask``.
        mlb (MultiLabelBinarizer): Already fitted on the hierarchical label set.
        max_length (int): Padding/truncation length.

    Attributes:
        labels: Per-document list of parent and child labels (not binarised).
        label_matrix: Float indicator matrix, one row per document.
    """

    def __init__(self, df, tokenizer, mlb, max_length):
        self.texts = df['text'].values.tolist()
        self.labels = [self._process_labels(self._parse_labels(l)) for l in df['labels']]
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.mlb = mlb
        # Binarise once here instead of calling mlb.transform for every item fetched.
        self.label_matrix = self.mlb.transform(self.labels).astype(float)

    @staticmethod
    def _parse_labels(raw):
        """Turn a stringified list into a list without executing code (no eval)."""
        return ast.literal_eval(raw) if isinstance(raw, str) else list(raw)

    def _process_labels(self, labels):
        """Expand each code into its parent and child labels (deduplicated)."""
        processed = set()
        for code in labels:
            parent, child = extract_hierarchy(code)
            # Skip missing levels so None never reaches the binarizer.
            processed.update(label for label in (child, parent) if label is not None)
        return list(processed)

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        encoding = self.tokenizer(
            self.texts[idx],
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(self.label_matrix[idx], dtype=torch.float32)
        }

# Initialise the tokenizer and the classifier. The head gets one output per
# binarizer class, and problem_type selects the multi-label setup.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_NAME,
    num_labels=len(mlb.classes_),
    problem_type="multi_label_classification"
)

# Función de pérdida con pesos jerárquicos
class HierarchicalBCELoss(torch.nn.Module):
    """BCE-with-logits plus a differentiable parent-consistency penalty.

    For every (child, parent) pair in ``hierarchy_map`` whose child is a gold label,
    the penalty adds ``parent_weight * (1 - sigmoid(parent_logit))``. The previous
    version thresholded the predictions and accumulated a Python float, so the
    penalty changed the reported loss but produced no gradient; it also looped over
    every (sample, class) element on the host.

    Args:
        hierarchy_map (Mapping[str, set[str]]): child label -> parent labels.
            It is read once, at construction time.
        mlb: Fitted binarizer; ``classes_`` defines the column order.
        weight (Tensor, optional): Forwarded to ``BCEWithLogitsLoss``.
        parent_weight (float, optional): Penalty scale; defaults to
            ``HIERARCHICAL_WEIGHTS['parent']``.
    """

    def __init__(self, hierarchy_map, mlb, weight=None, parent_weight=None):
        super().__init__()
        self.base_loss = torch.nn.BCEWithLogitsLoss(weight=weight)
        self.hierarchy_map = hierarchy_map
        self.classes = mlb.classes_
        self.class_index = {cls: idx for idx, cls in enumerate(self.classes)}
        self.parent_weight = float(
            HIERARCHICAL_WEIGHTS['parent'] if parent_weight is None else parent_weight
        )

        # Pre-compute aligned column indices for every known (child, parent) pair.
        child_columns, parent_columns = [], []
        for child, parents in hierarchy_map.items():
            child_col = self.class_index.get(child)
            if child_col is None:
                continue
            for parent in parents:
                parent_col = self.class_index.get(parent)
                if parent_col is not None:
                    child_columns.append(child_col)
                    parent_columns.append(parent_col)
        self.child_idx = torch.tensor(child_columns, dtype=torch.long)
        self.parent_idx = torch.tensor(parent_columns, dtype=torch.long)

    def forward(self, outputs, targets):
        """Return the scalar loss for ``outputs`` (logits) against ``targets`` (0/1 floats)."""
        base_loss = self.base_loss(outputs, targets)

        batch_size = targets.size(0)
        if self.child_idx.numel() == 0 or batch_size == 0:
            return base_loss

        # The module is not necessarily moved with the model, so follow the logits.
        child_idx = self.child_idx.to(outputs.device)
        parent_idx = self.parent_idx.to(outputs.device)

        child_is_gold = targets[:, child_idx]
        parent_missing = 1.0 - torch.sigmoid(outputs[:, parent_idx])
        hierarchical_loss = self.parent_weight * (child_is_gold * parent_missing).sum()

        return base_loss + hierarchical_loss / batch_size

# Build one dataset per split.
train_dataset = HierarchicalClinicalDataset(train_df, tokenizer, mlb, MAX_LENGTH)
val_dataset = HierarchicalClinicalDataset(val_df, tokenizer, mlb, MAX_LENGTH)
test_dataset = HierarchicalClinicalDataset(test_df, tokenizer, mlb, MAX_LENGTH)

# DataLoaders; only the training split is shuffled.
train_loader = DataLoader(train_dataset, batch_size=TRAIN_BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=VAL_BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=TEST_BATCH_SIZE)

# Device, model placement and optimizer.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)

# Schedule boundaries, expressed in *scheduler* steps. The training loop calls
# scheduler.step() once per GRADIENT_ACCUMULATION_STEPS batches, so counting raw
# batches would stretch every phase by that factor and the final decay phase
# would never be reached within EPOCHS.
steps_per_epoch = max(1, len(train_loader) // GRADIENT_ACCUMULATION_STEPS)
warmup_epochs = 100
boost_epochs = 25
warmup_steps = steps_per_epoch * warmup_epochs
boost_steps = steps_per_epoch * boost_epochs
total_steps = steps_per_epoch * EPOCHS

def lr_lambda(current_step):
    """Multiplicative LR factor for ``LambdaLR`` at scheduler step ``current_step``.

    Target learning rates (absolute):
      * steps ``[0, boost_steps)``: linear from 1.5e-4 down to 1e-4;
      * steps ``[boost_steps, warmup_steps)``: constant 5e-5;
      * afterwards: linear from 3e-5 down to 2e-5 at ``total_steps``, then held.

    ``LambdaLR`` multiplies the optimizer's initial learning rate by the returned
    value, so the target is divided by that initial rate. Returning the absolute
    rate, as before, gave an effective rate of roughly ``2e-5 * 1e-4``.
    """
    if current_step < boost_steps:
        lr = 10e-5 + 5e-5 * (1 - (current_step / boost_steps))
    elif current_step < warmup_steps:
        lr = 5e-5
    else:
        # Guard against a zero-length decay phase and clamp once it is finished.
        decay_span = max(1, total_steps - warmup_steps)
        progress = min(1.0, (current_step - warmup_steps) / decay_span)
        lr = 3e-5 - (1e-5 * progress)
    return lr / optimizer.defaults['lr']

# Scheduler driven by lr_lambda, and the loss built from the label hierarchy.
# loss_fn is also the loss that evaluate() reports.
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)
loss_fn = HierarchicalBCELoss(hierarchy_map, mlb)

# Evaluation helper
def evaluate(model, dataloader):
    """Run ``model`` over ``dataloader`` without gradients and summarise the results.

    Uses the module-level ``device`` and ``loss_fn``. Predictions are the sigmoid
    outputs thresholded at 0.5. The model is left in eval mode.

    Returns:
        dict: ``loss`` (mean per batch), ``f1_micro`` and ``f1_macro``.
    """
    model.eval()
    loss_sum = 0
    predicted_rows = []
    gold_rows = []

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluando"):
            token_ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            gold = batch['labels'].to(device)

            logits = model(token_ids, attention_mask=mask).logits
            loss_sum += loss_fn(logits, gold).item()

            probabilities = torch.sigmoid(logits).cpu().detach().numpy()
            predicted_rows.extend(probabilities > 0.5)
            gold_rows.extend(gold.cpu().numpy())

    f1_by_average = {
        average: f1_score(gold_rows, predicted_rows, average=average)
        for average in ('micro', 'macro')
    }
    return {
        'loss': loss_sum / len(dataloader),
        'f1_micro': f1_by_average['micro'],
        'f1_macro': f1_by_average['macro']
    }

# Training loop with progress bar and per-epoch validation.
# best_f1 starts below any reachable score so the first epoch always writes a checkpoint.
best_f1 = -1.0
for epoch in range(EPOCHS):
    model.train()
    optimizer.zero_grad()  # never carry gradients over from a previous epoch
    epoch_loss = 0
    num_batches = len(train_loader)
    progress_bar = tqdm(train_loader, desc=f"Entrenando época {epoch+1}")

    for step, batch in enumerate(progress_bar):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        outputs = model(input_ids, attention_mask=attention_mask)
        loss = loss_fn(outputs.logits, labels)
        # Only the gradient is scaled for accumulation; the reported loss stays
        # unscaled so it is comparable with the validation loss.
        (loss / GRADIENT_ACCUMULATION_STEPS).backward()

        accumulation_done = (step + 1) % GRADIENT_ACCUMULATION_STEPS == 0
        last_batch = (step + 1) == num_batches
        if accumulation_done or last_batch:
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            if accumulation_done:
                # A trailing partial group updates the weights but does not
                # advance the schedule, keeping one scheduler step per full group.
                scheduler.step()
            optimizer.zero_grad()

        epoch_loss += loss.item()
        progress_bar.set_postfix(loss=loss.item())

    # Validation after every epoch.
    val_metrics = evaluate(model, val_loader)
    print(f"\nÉpoca {epoch+1} - Loss: {epoch_loss / len(train_loader):.4f} - LR: {scheduler.get_last_lr()[0]}")
    print(f"Validación - Loss: {val_metrics['loss']:.4f}")
    print(f"F1 Micro: {val_metrics['f1_micro']:.4f}, F1 Macro: {val_metrics['f1_macro']:.4f}")

    # Keep the weights with the best validation micro-F1.
    if val_metrics['f1_micro'] > best_f1:
        best_f1 = val_metrics['f1_micro']
        print(f"Guardando mejor modelo... BEST ONE!!!!!! {val_metrics['f1_micro']:.4f}")
        torch.save(model.state_dict(), "best_model_epoch.bin")


# Final evaluation on the test split.
print("\nEvaluando en conjunto de test...")

# Score the checkpoint selected on validation rather than the last-epoch weights.
try:
    model.load_state_dict(torch.load("best_model_epoch.bin", map_location=device))
except FileNotFoundError:
    print("No se encontró best_model_epoch.bin; se evalúa el modelo de la última época.")

test_metrics = evaluate(model, test_loader)
print(f"Test - Loss: {test_metrics['loss']:.4f} - LR: {scheduler.get_last_lr()[0]}")
print(f"F1 Micro: {test_metrics['f1_micro']:.4f}, F1 Macro: {test_metrics['f1_macro']:.4f}")

# Per-label report. The transformers model has no predict() method, so predictions
# are collected with an explicit pass over the loader (not shuffled, so the rows
# line up with test_dataset.labels).
model.eval()
prediction_batches = []
with torch.no_grad():
    for batch in tqdm(test_loader, desc="Prediciendo"):
        logits = model(
            batch['input_ids'].to(device),
            attention_mask=batch['attention_mask'].to(device)
        ).logits
        prediction_batches.append((torch.sigmoid(logits) > 0.5).int().cpu().numpy())
y_pred = np.vstack(prediction_batches)

# test_dataset.labels holds lists of label names; the report needs indicator rows.
y_true = mlb.transform(test_dataset.labels)
print(classification_report(
    y_true,
    y_pred,
    target_names=[str(name) for name in mlb.classes_],
    zero_division=0
))