In [27]:
import json
import pandas as pd
import ast
import csv

In [None]:

from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader
import torch
from transformers import BertTokenizer, BertForSequenceClassification, AdamW
from transformers import get_scheduler
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score

In [28]:
def parse_evidences1(evidences, json_data):
    """
    Parse evidences into meaningful text using JSON mappings.

    Args:
        evidences: Either a string containing a Python list literal of
            evidence codes (e.g. "['E_1', 'E_2_@_V_3']") or an already-parsed
            list/tuple of such codes. A code of the form "<code>_@_<value>"
            carries an answer value; a bare code is treated as answered "Y".
        json_data: Mapping from evidence code to a metadata dict. Only the
            'question_en' and 'value_meaning' entries are read here;
            'value_meaning' maps a value code to a dict with an 'en' entry.

    Returns:
        list[str]: One "<question> - <answer>" string per evidence, in input
        order. Unknown codes use the question text 'Unknown question'; values
        without a mapping are emitted verbatim.

    Raises:
        ValueError, SyntaxError: If `evidences` is a string that is not a
            plain Python literal.
    """
    # literal_eval only accepts literals, so a crafted CSV cell cannot execute
    # code the way it could with eval().
    items = ast.literal_eval(evidences) if isinstance(evidences, str) else evidences

    parsed = []
    for evidence in items:
        if "_@_" in evidence:
            # Split on the first separator only, so a value that itself
            # contains "_@_" does not break the two-way unpacking.
            code, value = evidence.split("_@_", 1)
            info = json_data.get(code, {})
            question = info.get('question_en', 'Unknown question')
            value_meaning = info.get('value_meaning', {}).get(value, {}).get('en', value)
            parsed.append(f"{question} - {value_meaning}")
        else:
            question = json_data.get(evidence, {}).get('question_en', 'Unknown question')
            parsed.append(f"{question} - Y")
    return parsed

def parse_evidences2(evidences, json_data):
    """
    Parse evidences into meaningful text using JSON mappings and combine repeated questions' answers with &.

    Args:
        evidences: Either a string containing a Python list literal of
            evidence codes (e.g. "['E_1', 'E_2_@_V_3']") or an already-parsed
            list/tuple of such codes. A code of the form "<code>_@_<value>"
            carries an answer value; a bare code is treated as answered "Y".
        json_data: Mapping from evidence code to a metadata dict. Only the
            'question_en' and 'value_meaning' entries are read here;
            'value_meaning' maps a value code to a dict with an 'en' entry.

    Returns:
        list[str]: One "<question> - <answers>" string per distinct question
        text, ordered by first appearance. Multiple answers to the same
        question are joined with " & ". Codes missing from `json_data` all
        share the question text 'Unknown question' and are therefore merged.

    Raises:
        ValueError, SyntaxError: If `evidences` is a string that is not a
            plain Python literal.
    """
    # literal_eval only accepts literals, so a crafted CSV cell cannot execute
    # code the way it could with eval().
    items = ast.literal_eval(evidences) if isinstance(evidences, str) else evidences

    # question text -> list of answers; dicts keep insertion order, which
    # gives the "first appearance" ordering of the output.
    answers_by_question = {}
    for evidence in items:
        if "_@_" in evidence:
            # Split on the first separator only, so a value that itself
            # contains "_@_" does not break the two-way unpacking.
            code, value = evidence.split("_@_", 1)
            info = json_data.get(code, {})
            question = info.get('question_en', 'Unknown question')
            answer = info.get('value_meaning', {}).get(value, {}).get('en', value)
        else:
            question = json_data.get(evidence, {}).get('question_en', 'Unknown question')
            answer = "Y"
        # str() keeps the join safe even if a mapped meaning is not a string.
        answers_by_question.setdefault(question, []).append(str(answer))

    return [f"{q} - {' & '.join(a)}" for q, a in answers_by_question.items()]


In [None]:
def transform_data(csv_path, json_path, output_path):
    """
    Transforms the CSV and JSON data into BioBERT-friendly format.

    Reads the evidence metadata from `json_path` and the patient rows from
    `csv_path`, converts every row into a dict and writes the list of dicts
    to `output_path` as indented JSON.

    Args:
        csv_path: Path of the CSV file. The columns read are 'AGE', 'SEX',
            'EVIDENCES' and 'DIFFERENTIAL_DIAGNOSIS'.
        json_path: Path of the JSON file mapping evidence codes to metadata
            (passed unchanged to `parse_evidences2`).
        output_path: Path of the JSON file to write.

    Returns:
        None. The result is written to `output_path`; each record has the
        keys "Age", "Sex", "Antecedents", "Symptoms" and
        "Differential Diagnosis".

    Raises:
        ValueError, SyntaxError: If a 'DIFFERENTIAL_DIAGNOSIS' cell is not a
            plain Python literal.
    """
    # JSON is UTF-8 by specification; do not depend on the platform default.
    with open(json_path, 'r', encoding='utf-8') as file:
        json_data = json.load(file)

    csv_data = pd.read_csv(csv_path)

    # Process each row in the CSV
    formatted_data = []

    for _, row in csv_data.iterrows():
        antecedents = []
        symptoms = []

        # Parse evidences
        for evidence in parse_evidences2(row['EVIDENCES'], json_data):
            # Example categorization logic: an evidence counts as an
            # antecedent only when its rendered text contains "Antecedent".
            # NOTE(review): this depends on the wording of the question text —
            # confirm it matches the evidence metadata actually in use.
            if "Antecedent" in evidence:
                antecedents.append(evidence)
            else:
                symptoms.append(evidence)

        # Parse differential diagnosis (exclude probabilities): keep only the
        # first element of every entry. literal_eval is used instead of eval
        # so that CSV content can never be executed as code.
        diagnoses = [diag[0] for diag in ast.literal_eval(row['DIFFERENTIAL_DIAGNOSIS'])]

        formatted_data.append({
            "Age": row['AGE'],
            "Sex": row['SEX'],
            "Antecedents": antecedents,
            "Symptoms": symptoms,
            "Differential Diagnosis": diagnoses,
        })

    # Save the formatted data to a JSON file
    with open(output_path, 'w') as output_file:
        json.dump(formatted_data, output_file, indent=4)

task = "train"

# Size and seed of the working sample. The seed makes "Restart & Run All"
# regenerate exactly the same sample file on every run.
SAMPLE_SIZE = 2000
SAMPLE_SEED = 42

data = pd.read_csv(f'dataset/{task}.csv')
# Cap the request at the number of available rows so that a split smaller
# than SAMPLE_SIZE is kept whole instead of raising inside DataFrame.sample.
# NOTE: the "_sample50" suffix is only a file name; the row count is SAMPLE_SIZE.
data.sample(n=min(SAMPLE_SIZE, len(data)), random_state=SAMPLE_SEED).to_csv(
    f'dataset/samples/{task}_sample50.csv', index=False
)

# Paths to input files and output location
csv_path = f'dataset/samples/{task}_sample50.csv'  # Sampled CSV written just above
json_path = 'release_evidences_cleaned.json'  # Evidence-code metadata (JSON)
output_path = f'dataset_processed/{task}_sample50.json'  # Desired output file name

# Transform the data
transform_data(csv_path, json_path, output_path)


In [None]:
class BioBERTDataset(Dataset):
    """Map-style dataset that pairs each text with an integer class label.

    Tokenization is done lazily, one sample at a time, inside ``__getitem__``.
    """

    def __init__(self, texts, labels, tokenizer, max_length):
        """Store the samples together with the tokenizer settings.

        Args:
            texts: Indexable collection of input strings.
            labels: Indexable collection of labels, aligned with `texts`.
            tokenizer: Callable invoked as ``tokenizer(text, max_length=...,
                padding=..., truncation=..., return_tensors=...)``.
            max_length: Value forwarded to the tokenizer as `max_length`.
        """
        self.texts, self.labels = texts, labels
        self.tokenizer, self.max_length = tokenizer, max_length

    def __len__(self):
        """Number of samples, taken from `texts`."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the tokenized sample at `idx` as a dict of tensors."""
        sample_text, sample_label = self.texts[idx], self.labels[idx]

        encoded = self.tokenizer(
            sample_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )

        # squeeze(0) removes the leading axis when it has size 1, so that the
        # DataLoader can stack the samples into a batch itself.
        item = {key: encoded[key].squeeze(0) for key in ("input_ids", "attention_mask")}
        item["label"] = torch.tensor(sample_label, dtype=torch.long)
        return item

def train_model(texts, labels, model_name="dmis-lab/biobert-v1.1", max_length=128, batch_size=16, epochs=3, lr=2e-5):
    """
    Fine-tune a BERT sequence classifier on (text, label) pairs.

    Args:
        texts: Indexable collection of input strings.
        labels: Raw labels aligned with `texts`; they are converted to
            integer ids with a freshly fitted `LabelEncoder`.
        model_name: Name or path given to `from_pretrained` for both the
            tokenizer and the model.
        max_length: Tokenizer `max_length` used for padding/truncation.
        batch_size: Batch size of the (shuffled) training DataLoader.
        epochs: Number of passes over the data.
        lr: Learning rate given to the optimizer.

    Returns:
        tuple: (model, tokenizer, label_encoder) — the fine-tuned model, the
        tokenizer that was loaded, and the fitted label encoder whose
        `classes_` give the id -> label order.
    """
    # Resolve the device once instead of on every tensor transfer.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Encode labels (kept under a separate name so that the `labels`
    # argument is never rebound inside the training loop).
    label_encoder = LabelEncoder()
    encoded_labels = label_encoder.fit_transform(labels)

    # Load tokenizer and dataset
    tokenizer = BertTokenizer.from_pretrained(model_name)
    dataset = BioBERTDataset(texts, encoded_labels, tokenizer, max_length)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Load model with one output per distinct label
    model = BertForSequenceClassification.from_pretrained(model_name, num_labels=len(label_encoder.classes_))
    model = model.to(device)

    # Optimizer and scheduler: linear schedule without warm-up over all
    # optimizer steps of the run.
    optimizer = AdamW(model.parameters(), lr=lr)
    num_training_steps = len(dataloader) * epochs
    lr_scheduler = get_scheduler("linear", optimizer=optimizer, num_warmup_steps=0, num_training_steps=num_training_steps)

    # Training loop
    model.train()
    for epoch in range(epochs):
        # Label the bar up front so it is named from the first batch on.
        loop = tqdm(dataloader, leave=True, desc=f"Epoch {epoch}")
        for batch in loop:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            batch_labels = batch["label"].to(device)

            # Passing labels makes the model return the loss with its output.
            outputs = model(input_ids, attention_mask=attention_mask, labels=batch_labels)
            loss = outputs.loss

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            lr_scheduler.step()

            loop.set_postfix(loss=loss.item())

    return model, tokenizer, label_encoder

# Read the preprocessed patient records from disk
with open("transformed_data.json", "r") as data_file:
    data = json.load(data_file)

# Model input: the symptom strings of each record joined by single spaces
texts = list(map(" ".join, (record["Symptoms"] for record in data)))
# Target: the first entry of each record's differential diagnosis list
labels = [record["Differential Diagnosis"][0] for record in data]

# Fine-tune BioBERT on the (text, label) pairs
model, tokenizer, label_encoder = train_model(texts, labels)

# Store model and tokenizer side by side in the same directory
model.save_pretrained("biobert_diagnosis_model")
tokenizer.save_pretrained("biobert_diagnosis_model")

# Store the label order (index -> class name) as a JSON list
with open("label_encoder.json", "w") as le_file:
    le_file.write(json.dumps(label_encoder.classes_.tolist()))


In [None]:
# With validation set
class BioBERTDataset(Dataset):
    """Labelled text dataset that tokenizes each sample on access."""

    def __init__(self, texts, labels, tokenizer, max_length):
        """Keep references to the samples, the tokenizer and its length limit."""
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return how many texts the dataset holds."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Tokenize sample `idx` and return its tensors plus the label tensor."""
        text = self.texts[idx]
        label = self.labels[idx]

        # Fixed-length encoding: pad up to, and truncate at, max_length.
        tokenizer_options = dict(
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        encoding = self.tokenizer(text, **tokenizer_options)

        # Drop the leading size-1 axis; batching is left to the DataLoader.
        input_ids = encoding["input_ids"].squeeze(0)
        attention_mask = encoding["attention_mask"].squeeze(0)
        label_tensor = torch.tensor(label, dtype=torch.long)

        return dict(input_ids=input_ids, attention_mask=attention_mask, label=label_tensor)

def train_model(train_texts, train_labels, val_texts, val_labels, model_path="dmis-lab/biobert-v1.1", max_length=128, batch_size=16, epochs=3, lr=2e-5, save_dir="best_biobert_model"):
    """
    Fine-tune a BERT sequence classifier and validate it after every epoch.

    After each epoch the average training loss and the validation loss,
    accuracy and weighted F1 are printed. Whenever the validation loss
    improves, the model and tokenizer are saved to `save_dir`.

    Args:
        train_texts: Indexable collection of training strings.
        train_labels: Raw labels aligned with `train_texts`.
        val_texts: Indexable collection of validation strings.
        val_labels: Raw labels aligned with `val_texts`.
        model_path: Name or path given to `from_pretrained` for both the
            tokenizer and the model.
        max_length: Tokenizer `max_length` used for padding/truncation.
        batch_size: Batch size of both DataLoaders.
        epochs: Number of passes over the training data.
        lr: Learning rate given to the optimizer.
        save_dir: Directory that receives the best checkpoint. Defaults to
            the previously hard-coded "best_biobert_model".

    Returns:
        tuple: (model, tokenizer, label_to_idx). `model` is in its state
        after the last epoch, which is not necessarily the checkpoint stored
        in `save_dir`. `label_to_idx` maps each raw label to its class index.
    """
    # Resolve the device once instead of on every tensor transfer.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Load tokenizer
    tokenizer = BertTokenizer.from_pretrained(model_path)

    # Encode labels: the sorted union of both splits gives a deterministic
    # index per label. A set union also works for non-list sequences.
    unique_labels = sorted(set(train_labels) | set(val_labels))
    label_to_idx = {label: idx for idx, label in enumerate(unique_labels)}
    train_label_ids = [label_to_idx[label] for label in train_labels]
    val_label_ids = [label_to_idx[label] for label in val_labels]

    # Prepare datasets and dataloaders (only the training data is shuffled)
    train_dataset = BioBERTDataset(train_texts, train_label_ids, tokenizer, max_length)
    val_dataset = BioBERTDataset(val_texts, val_label_ids, tokenizer, max_length)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # Load model. The label names are stored in the model config so that a
    # saved checkpoint carries its own index <-> label mapping.
    model = BertForSequenceClassification.from_pretrained(
        model_path,
        num_labels=len(unique_labels),
        id2label=dict(enumerate(unique_labels)),
        label2id=dict(label_to_idx),
    )
    model = model.to(device)

    # Optimizer and scheduler: linear schedule without warm-up over all
    # optimizer steps of the run.
    optimizer = AdamW(model.parameters(), lr=lr)
    num_training_steps = len(train_loader) * epochs
    lr_scheduler = get_scheduler("linear", optimizer=optimizer, num_warmup_steps=0, num_training_steps=num_training_steps)

    # Training and validation loop
    best_val_loss = float("inf")
    for epoch in range(epochs):
        # Training phase (validation below switches the model to eval mode,
        # so train mode has to be restored at the start of every epoch).
        model.train()
        train_loss = 0
        for batch in tqdm(train_loader, desc=f"Training Epoch {epoch + 1}"):
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            batch_labels = batch["label"].to(device)

            # Passing labels makes the model return the loss with its output.
            outputs = model(input_ids, attention_mask=attention_mask, labels=batch_labels)
            loss = outputs.loss
            train_loss += loss.item()

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            lr_scheduler.step()

        avg_train_loss = train_loss / len(train_loader)

        # Validation phase
        model.eval()
        val_loss = 0
        val_predictions = []
        val_true_labels = []
        with torch.no_grad():
            for batch in tqdm(val_loader, desc="Validating"):
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                batch_labels = batch["label"].to(device)

                outputs = model(input_ids, attention_mask=attention_mask, labels=batch_labels)
                val_loss += outputs.loss.item()

                # Predicted class = index of the largest logit per sample.
                predictions = torch.argmax(outputs.logits, dim=1).cpu().numpy()
                val_predictions.extend(predictions)
                val_true_labels.extend(batch_labels.cpu().numpy())

        avg_val_loss = val_loss / len(val_loader)
        val_accuracy = accuracy_score(val_true_labels, val_predictions)
        val_f1 = f1_score(val_true_labels, val_predictions, average="weighted")

        print(f"Epoch {epoch + 1}/{epochs}")
        print(f"Train Loss: {avg_train_loss:.4f}")
        print(f"Validation Loss: {avg_val_loss:.4f}, Accuracy: {val_accuracy:.4f}, F1 Score: {val_f1:.4f}")

        # Save best model (lowest validation loss seen so far)
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            model.save_pretrained(save_dir)
            tokenizer.save_pretrained(save_dir)
            print("Best model saved!")

    return model, tokenizer, label_to_idx

# Read the training split and the validation split from their JSON files
with open("train_data.json", "r") as train_file:
    train_data = json.load(train_file)
with open("val_data.json", "r") as val_file:
    val_data = json.load(val_file)

# Inputs: symptom strings joined by single spaces.
# Targets: first entry of each record's differential diagnosis list.
train_texts = list(map(" ".join, (record["Symptoms"] for record in train_data)))
train_labels = [record["Differential Diagnosis"][0] for record in train_data]
val_texts = list(map(" ".join, (record["Symptoms"] for record in val_data)))
val_labels = [record["Differential Diagnosis"][0] for record in val_data]

# Fine-tune with per-epoch validation
train_model(train_texts, train_labels, val_texts=val_texts, val_labels=val_labels)


In [None]:
class BioBERTDataset(Dataset):
    """Unlabelled text dataset for inference; each sample is tokenized on access."""

    def __init__(self, texts, tokenizer, max_length):
        """Keep the texts, the tokenizer and the length limit it is called with."""
        self.texts, self.tokenizer, self.max_length = texts, tokenizer, max_length

    def __len__(self):
        """Return how many texts the dataset holds."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the `input_ids` and `attention_mask` tensors of sample `idx`."""
        # Fixed-length encoding: pad up to, and truncate at, max_length.
        tokens = self.tokenizer(
            self.texts[idx],
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )

        # Drop the leading size-1 axis; batching is left to the DataLoader.
        return {
            field: tokens[field].squeeze(0)
            for field in ("input_ids", "attention_mask")
        }

def evaluate_model(test_texts, model_path="biobert_diagnosis_model", max_length=128, batch_size=16):
    """
    Run a saved sequence classifier over `test_texts` and return class indices.

    Args:
        test_texts: Indexable collection of input strings.
        model_path: Directory or name given to `from_pretrained` for both
            the tokenizer and the model.
        max_length: Tokenizer `max_length` used for padding/truncation.
        batch_size: Batch size of the (unshuffled) DataLoader.

    Returns:
        list: One predicted class index per input text, in input order.
    """
    # Restore tokenizer and model, then move the model to the device in use
    tokenizer = BertTokenizer.from_pretrained(model_path)
    model = BertForSequenceClassification.from_pretrained(model_path)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()

    # shuffle=False keeps the predictions aligned with test_texts
    loader = DataLoader(
        BioBERTDataset(test_texts, tokenizer, max_length),
        batch_size=batch_size,
        shuffle=False,
    )

    predictions = []

    # Inference only: gradients are not needed
    with torch.no_grad():
        for batch in tqdm(loader, desc="Evaluating"):
            ids = batch["input_ids"].to(device)
            mask = batch["attention_mask"].to(device)

            # Predicted class = index of the largest logit per sample
            logits = model(ids, attention_mask=mask).logits
            predictions.extend(torch.argmax(logits, dim=1).cpu().numpy())

    return predictions

# Read the held-out records from disk
with open("test_data.json", "r") as test_file:
    test_data = json.load(test_file)

# Model input: the symptom strings of each record joined by single spaces
test_texts = list(map(" ".join, (record["Symptoms"] for record in test_data)))

def calculate_metrics(predictions, true_labels):
    """
    Score predictions against the ground truth.

    Args:
        predictions: Predicted class indices.
        true_labels: True class indices, aligned with `predictions`.

    Returns:
        tuple: (accuracy, weighted F1 score).
    """
    return (
        accuracy_score(true_labels, predictions),
        f1_score(true_labels, predictions, average="weighted"),
    )

# Read the saved class list; a label's position in it is its class index
with open("label_encoder.json", "r") as le_file:
    label_classes = json.load(le_file)

# Assuming true_labels.json contains ground truth labels for the test set
with open("true_labels.json", "r") as true_file:
    true_labels_raw = json.load(true_file)

# Convert each ground-truth label to its class index
true_labels = list(map(label_classes.index, true_labels_raw))

# Predict a class index for every test text
predictions = evaluate_model(test_texts)

# Compare the predictions with the ground truth
accuracy, f1 = calculate_metrics(predictions, true_labels)

print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1:.4f}")
