In [None]:
# ## Cell 1: Import Libraries and Define Paths

import os
import pickle

import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Embedding, Flatten, Dense, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer

# File locations used by the rest of the notebook, relative to the working
# directory (adjust as needed).
DATA_PATH = "DiseaseAndSymptoms.csv"  # input CSV passed to load_and_preprocess_data
MODEL_PATH = "diagnosis_model.keras"  # destination of model.save in the last cell
TOKENIZER_PATH = "symptom_tokenizer.pkl"  # pickle of the tokenizer fitted on the symptom text
LABEL_ENCODER_PATH = "label_encoder.pkl"  # pickle of the LabelEncoder fitted on disease names

print("Libraries imported and paths defined.")

In [None]:
# ## Cell 2: Load and Preprocess the Dataset

def load_and_preprocess_data(data_path, test_size=0.2, val_size=0.2, random_state=42):
    """
    Load the dataset, preprocess symptoms, and split into train/val/test sets.

    Each row's symptom columns are merged into a single space-separated
    "Symptoms" string (unique values, sorted). Underscores are replaced by
    spaces beforehand, so multi-word symptoms are not delimited inside that
    string; downstream code therefore sees individual words.

    Args:
        data_path: Path to a CSV file containing a "Disease" column and one or
            more columns whose name contains "Symptom".
        test_size: Fraction of the deduplicated rows held out as the test set.
        val_size: Fraction of the remaining (train+val) rows used for validation.
        random_state: Seed passed to both stratified splits.

    Returns:
        Tuple (train, val, test) of DataFrames restricted to the "Disease" and
        "Symptoms" columns.

    Raises:
        ValueError: If no column name contains "Symptom".
    """
    data = pd.read_csv(data_path)

    # Correct two misspelled disease names before any other normalisation.
    data["Disease"] = data["Disease"].replace({
        "Peptic ulcer diseae": "Peptic ulcer disease",
        "Dimorphic hemmorhoids(piles)": "Dimorphic hemorrhoids (piles)",
    })

    def is_text(column):
        # Text columns may be stored as plain object or as a dedicated pandas
        # string dtype; both need the underscore replacement.
        return column.dtype == "object" or isinstance(column.dtype, pd.StringDtype)

    # Replace underscores with spaces in the headers and in every text cell.
    data.columns = [col.replace("_", " ") for col in data.columns]
    data = data.apply(
        lambda col: col.str.replace("_", " ", regex=False) if is_text(col) else col
    )

    symptom_cols = [col for col in data.columns if "Symptom" in col]
    if not symptom_cols:
        raise ValueError(f"No symptom columns found in {data_path!r}")

    def combine_symptoms(row):
        # Unique, stripped, non-empty symptom values of one row in sorted order.
        cleaned = {str(value).strip() for value in row if pd.notna(value)}
        cleaned.discard("")  # blank cells must not add an empty token
        return " ".join(sorted(cleaned))

    data["Symptoms"] = data[symptom_cols].apply(combine_symptoms, axis=1)

    # Drop rows that ended up with no symptoms at all.
    data = data[data["Symptoms"].str.strip() != ""]

    # Deduplicate the dataset
    data = data.drop_duplicates(subset=["Symptoms", "Disease"])
    print(f"Dataset size after deduplication: {len(data)}")

    # Split into train+val and test sets (stratified)
    train_val_data, test_data = train_test_split(
        data, test_size=test_size, random_state=random_state, stratify=data["Disease"]
    )

    # Split train+val into train and validation sets
    train_data, val_data = train_test_split(
        train_val_data, test_size=val_size, random_state=random_state,
        stratify=train_val_data["Disease"]
    )

    # Check for overlap: identical symptom strings present in both train and test.
    overlap = set(train_data["Symptoms"]) & set(test_data["Symptoms"])
    print(f"Number of overlapping symptom strings between train and test: {len(overlap)}")
    if overlap:
        print("Sample overlapping symptoms:", list(overlap)[:5])

    # Check class distribution
    print("Test set class distribution:\n", test_data["Disease"].value_counts())

    output_cols = ["Disease", "Symptoms"]
    return train_data[output_cols], val_data[output_cols], test_data[output_cols]

# Run the preprocessing pipeline on DATA_PATH and report the size of each split.
print("Loading and preprocessing data...")
train_data, val_data, test_data = load_and_preprocess_data(DATA_PATH)
print(f"Training dataset size: {len(train_data)}, Validation dataset size: {len(val_data)}, Test dataset size: {len(test_data)}")

In [None]:
# ## Cell 3: Augment the Training Dataset

def augment_data(data, samples_per_disease=20, random_state=None):
    """
    Augment the dataset by creating synthetic symptom combinations with co-occurrence patterns.

    For each disease, the whitespace-separated tokens of its "Symptoms" strings
    form a vocabulary. A synthetic sample starts from a random token and then
    repeatedly moves to a token that followed the current one in some original
    row (weighted by how often), falling back to a random unused token when the
    current one has no recorded followers. Tokens drawn more than once collapse,
    so a sample may contain fewer tokens than were drawn.

    Args:
        data: DataFrame with "Disease" and "Symptoms" columns.
        samples_per_disease: Number of synthetic rows to add per disease.
        random_state: Optional integer seed. When None, NumPy's global random
            state is used, as before this parameter existed.

    Returns:
        A new DataFrame holding the original rows followed by the synthetic
        rows, with a fresh 0..n-1 index. The input frame is not modified.
    """
    # The np.random module and RandomState share the randint/choice API.
    rng = np.random if random_state is None else np.random.RandomState(random_state)

    synthetic_rows = []
    for disease in data["Disease"].unique():
        disease_rows = data[data["Disease"] == disease]
        token_lists = [symptoms.split() for symptoms in disease_rows["Symptoms"]]

        # Sorted so sampling does not depend on set iteration order, which
        # varies between interpreter runs for strings.
        all_symptoms = sorted({token for tokens in token_lists for token in tokens})
        if not all_symptoms:
            continue  # nothing to sample from for this disease

        # Calculate symptom co-occurrence: how often s2 appears after s1 in a row.
        symptom_counts = {}
        for tokens in token_lists:
            for i, s1 in enumerate(tokens):
                followers = symptom_counts.setdefault(s1, {})
                for s2 in tokens[i + 1:]:
                    followers[s2] = followers.get(s2, 0) + 1

        # Normalize to get probabilities
        symptom_probs = {}
        for s1, followers in symptom_counts.items():
            total = sum(followers.values())
            if total > 0:
                symptom_probs[s1] = {s2: count / total for s2, count in followers.items()}

        # At most 5 tokens are drawn per sample, fewer if the vocabulary is smaller.
        max_symptoms = min(len(all_symptoms), 5)

        # Generate synthetic samples
        for _ in range(samples_per_disease):
            if max_symptoms < 2:
                # A single-token vocabulary cannot satisfy randint(2, 2).
                num_symptoms = max_symptoms
            else:
                num_symptoms = rng.randint(2, max_symptoms + 1)

            current_symptom = str(rng.choice(all_symptoms))
            selected_symptoms = [current_symptom]

            for _step in range(num_symptoms - 1):
                transitions = symptom_probs.get(current_symptom)
                if transitions:
                    next_symptom = rng.choice(
                        list(transitions.keys()),
                        p=list(transitions.values())
                    )
                else:
                    remaining_symptoms = [s for s in all_symptoms if s not in selected_symptoms]
                    if not remaining_symptoms:
                        break  # vocabulary exhausted; later steps could not add anything
                    next_symptom = rng.choice(remaining_symptoms)
                current_symptom = str(next_symptom)
                selected_symptoms.append(current_symptom)

            synthetic_rows.append({
                "Disease": disease,
                "Symptoms": " ".join(sorted(set(selected_symptoms))),
            })

    if not synthetic_rows:
        return data.copy()

    # One concat at the end instead of growing the frame row by row.
    return pd.concat([data, pd.DataFrame(synthetic_rows)], ignore_index=True)

# Seed NumPy's global RNG, which augment_data draws from, so every run of this
# cell starts the augmentation from the same random stream.
AUGMENTATION_SEED = 42
np.random.seed(AUGMENTATION_SEED)

print("Augmenting training data...")
augmented_train_data = augment_data(train_data, samples_per_disease=20)
print(f"Original training dataset size: {len(train_data)}, Augmented training dataset size: {len(augmented_train_data)}")

In [None]:
# ## Cell 4: Prepare Data for Training

def prepare_data(train_data, val_data, test_data, max_len=20):
    """
    Convert the three splits into padded token sequences and integer labels.

    The tokenizer and the label encoder are both fitted on the training split
    only and then applied to the validation and test splits.

    Args:
        train_data, val_data, test_data: DataFrames with "Symptoms" and
            "Disease" columns.
        max_len: Length every token sequence is padded to (padding is appended
            at the end).

    Returns:
        A 10-tuple: train/val/test sequences, train/val/test labels, the fitted
        tokenizer, the fitted label encoder, the vocabulary size
        (len(word_index) + 1), and a dict mapping class index to the square
        root of its "balanced" class weight.
    """
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(train_data["Symptoms"])

    def to_padded(frame):
        # Text -> integer ids -> fixed-length array, padded after the tokens.
        encoded = tokenizer.texts_to_sequences(frame["Symptoms"])
        return pad_sequences(encoded, maxlen=max_len, padding="post")

    train_sequences, val_sequences, test_sequences = (
        to_padded(split) for split in (train_data, val_data, test_data)
    )

    label_encoder = LabelEncoder()
    train_labels = label_encoder.fit_transform(train_data["Disease"])
    val_labels, test_labels = (
        label_encoder.transform(split["Disease"]) for split in (val_data, test_data)
    )

    balanced_weights = compute_class_weight(
        "balanced", classes=np.unique(train_labels), y=train_labels
    )
    # Square root softens the weights so rare classes are not over-emphasised.
    class_weight_dict = {
        class_index: np.sqrt(weight) for class_index, weight in enumerate(balanced_weights)
    }

    vocab_size = len(tokenizer.word_index) + 1
    return (
        train_sequences, val_sequences, test_sequences,
        train_labels, val_labels, test_labels,
        tokenizer, label_encoder, vocab_size, class_weight_dict,
    )

# Encode all three splits; the tokenizer and label encoder are fitted on the
# augmented training data, while validation and test data stay un-augmented.
print("Preparing data for training...")
X_train, X_val, X_test, y_train, y_val, y_test, tokenizer, label_encoder, vocab_size, class_weight_dict = prepare_data(augmented_train_data, val_data, test_data)
num_classes = len(label_encoder.classes_)  # number of distinct disease labels in the training data
print(f"Training samples: {len(X_train)}, Validation samples: {len(X_val)}, Test samples: {len(X_test)}")
print(f"Vocabulary size: {vocab_size}, Number of classes: {num_classes}")

In [None]:
# ## Cell 5: Build and Train the Model

def build_and_train_model(X_train, X_val, y_train, y_val, vocab_size, num_classes, class_weight_dict,
                          max_len=20, embedding_dim=50, epochs=50, batch_size=32,
                          learning_rate=0.0001, patience=10):
    """
    Build and train a simple dense model with a trainable embedding layer.

    Architecture: embedding -> flatten -> Dense(64) -> Dropout -> Dense(32) ->
    Dropout -> softmax over the classes. Both hidden layers use L2
    regularisation (0.01) and both dropout layers use a rate of 0.5.

    Args:
        X_train, X_val: Integer token arrays with max_len columns.
        y_train, y_val: Integer class labels (sparse, not one-hot).
        vocab_size: Size of the embedding's input vocabulary.
        num_classes: Number of output classes.
        class_weight_dict: Mapping of class index to loss weight, passed to fit.
        max_len: Sequence length expected by the model; must match the padding
            length used when the inputs were prepared.
        embedding_dim: Width of each embedding vector.
        epochs: Maximum number of training epochs.
        batch_size: Mini-batch size.
        learning_rate: Learning rate of the Adam optimizer.
        patience: Epochs without val_loss improvement before early stopping.

    Returns:
        Tuple (model, history): the trained model and the object returned by
        model.fit.
    """
    regularizer = tf.keras.regularizers.l2

    model = Sequential([
        # Explicit input spec instead of Embedding's deprecated input_length
        # argument; it also lets summary() show shapes before training.
        tf.keras.Input(shape=(max_len,), dtype="int32"),
        Embedding(vocab_size, embedding_dim, trainable=True),
        Flatten(),
        Dense(64, activation="relu", kernel_regularizer=regularizer(0.01)),
        Dropout(0.5),
        Dense(32, activation="relu", kernel_regularizer=regularizer(0.01)),
        Dropout(0.5),
        Dense(num_classes, activation="softmax"),
    ])

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    model.summary()

    # Stop once validation loss stalls and roll back to the best epoch's weights.
    early_stopping = EarlyStopping(monitor="val_loss", patience=patience, restore_best_weights=True)

    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[early_stopping],
        class_weight=class_weight_dict,
        verbose=1
    )

    return model, history

# Train with the function's default max_len and embedding_dim. The default
# max_len (20) matches the default padding length used by prepare_data.
print("Building and training model...")
model, history = build_and_train_model(
    X_train, X_val, y_train, y_val, vocab_size, num_classes, class_weight_dict
)

In [None]:
# ## Cell 6: Evaluate on Test Set

# Score the trained model on the held-out test split. evaluate returns the loss
# followed by the metric the model was compiled with (accuracy).
print("Evaluating on test set...")
test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=0)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")

In [None]:
# ## Cell 7: Save the Model, Tokenizer, and Label Encoder

# Write the trained model and the two fitted preprocessing objects to disk.
print("Saving model, tokenizer, and label encoder...")
model.save(MODEL_PATH)

# Both preprocessing objects are pickled the same way, so one loop handles them.
for pickle_path, fitted_object in ((TOKENIZER_PATH, tokenizer), (LABEL_ENCODER_PATH, label_encoder)):
    with open(pickle_path, "wb") as f:
        pickle.dump(fitted_object, f)

print("Training complete!")