In [1]:
# Standard imports - DO NOT CHANGE
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.metrics import roc_curve, auc, precision_recall_curve
import warnings
warnings.filterwarnings('ignore')

In [2]:
class LeukemiaDetectionModel:
    """Tabular classification pipeline comparing Logistic Regression and Random Forest.

    Intended call order (every step except the last returns ``self`` so calls
    can be chained):

        load_data -> explore_data (optional) -> preprocess_data
                  -> train_models -> evaluate_models -> predict_unlabeled_data

    Attributes set along the way:
        df: cleaned training frame (after ``load_data``).
        numeric_fill_values: per-column means used to impute numeric gaps.
        target_encoder: ``LabelEncoder`` for a non-numeric target, else ``None``.
        numerical_features / categorical_features: detected feature columns.
        label_encoders: ``{feature: LabelEncoder}`` for categorical features.
        feature_columns: ordered feature names the scaler/models were fit on.
        trained_models: ``{name: fitted estimator}`` (``None`` until trained).
        results: per-model evaluation metrics (after ``evaluate_models``).
    """

    def __init__(self, target_column, id_column=None):
        """Initialize model with flexible column names.

        Args:
            target_column: name of the label column in the training data.
            id_column: optional identifier column that is excluded from the
                feature set.
        """
        self.target_column = target_column
        self.id_column = id_column
        self.label_encoders = {}
        self.target_encoder = None
        self.feature_columns = None
        self.numeric_fill_values = None
        self.scaler = StandardScaler()
        self.trained_models = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _load_data_from_path(self, data_path):
        """Helper method to load data from a file path.

        The reader is chosen from the file extension (case-insensitive).

        Raises:
            ValueError: if the extension is not .csv, .xlsx or .json.
        """
        suffix_source = str(data_path).lower()
        if suffix_source.endswith('.csv'):
            return pd.read_csv(data_path)
        elif suffix_source.endswith('.xlsx'):
            return pd.read_excel(data_path)
        elif suffix_source.endswith('.json'):
            return pd.read_json(data_path)
        else:
            raise ValueError("Unsupported file format. Please provide a CSV, Excel, or JSON file.")

    @staticmethod
    def _fill_missing(frame, numeric_fill=None):
        """Return ``frame`` with missing values imputed.

        Numeric gaps are filled from ``numeric_fill`` (a Series indexed by
        column name; defaults to the frame's own column means) and anything
        still missing afterwards is replaced by the string ``'Unknown'``.
        The input frame is returned untouched when it has no missing values.
        """
        if not frame.isnull().values.any():
            return frame
        if numeric_fill is None:
            numeric_fill = frame.mean(numeric_only=True)
        return frame.fillna(numeric_fill).fillna('Unknown')

    @staticmethod
    def _is_categorical(dtype):
        """True for object, string and pandas ``category`` dtypes."""
        return (
            isinstance(dtype, pd.CategoricalDtype)
            or pd.api.types.is_object_dtype(dtype)
            or pd.api.types.is_string_dtype(dtype)
        )

    def _encode_target(self, verbose=False):
        """Label-encode a non-numeric target column in ``self.df`` (idempotent).

        The fitted encoder is kept on ``self.target_encoder`` so predictions
        can be mapped back with ``inverse_transform``. A target that is
        already numeric is left unchanged.
        """
        target = self.df[self.target_column]
        if pd.api.types.is_numeric_dtype(target.dtype):
            return
        self.target_encoder = LabelEncoder()
        self.df[self.target_column] = self.target_encoder.fit_transform(target)
        if verbose:
            print("\nConverted target values:")
            for i, label in enumerate(self.target_encoder.classes_):
                print(f"{label} -> {i}")

    def _detect_feature_types(self):
        """Populate ``numerical_features`` / ``categorical_features`` from ``self.df``.

        The target and ID columns are never reported as features.
        """
        excluded = {self.target_column, self.id_column}
        self.numerical_features = []
        self.categorical_features = []
        for column, dtype in self.df.dtypes.items():
            if column in excluded:
                continue
            if pd.api.types.is_numeric_dtype(dtype):
                self.numerical_features.append(column)
            elif self._is_categorical(dtype):
                self.categorical_features.append(column)

    # ------------------------------------------------------------------
    # Pipeline steps
    # ------------------------------------------------------------------
    def load_data(self, data_path=None, dataframe=None):
        """Load data from either a file (CSV/Excel/JSON) or an existing dataframe.

        Rows whose target value is missing are dropped (an imputed label would
        corrupt the classes). Remaining numeric gaps are filled with the
        column mean and all other gaps with ``'Unknown'``. A passed-in
        dataframe is copied, never modified.

        NOTE: the imputation means are computed on the full dataset, i.e.
        before the train/test split performed in ``preprocess_data``.

        Args:
            data_path: path to a .csv, .xlsx or .json file; takes precedence
                over ``dataframe`` when both are given.
            dataframe: an in-memory ``pandas.DataFrame``.

        Returns:
            self

        Raises:
            ValueError: if neither source is given, the file format is
                unsupported, or the target column is absent.
        """
        if data_path is not None:
            frame = self._load_data_from_path(data_path)
        elif dataframe is not None:
            frame = dataframe.copy()
        else:
            raise ValueError("Either data_path or dataframe must be provided")

        # Verify target column exists
        if self.target_column not in frame.columns:
            raise ValueError(f"Target column '{self.target_column}' not found in data")

        # A row without a label cannot be used for supervised training.
        missing_labels = frame[self.target_column].isnull()
        if missing_labels.any():
            print(f"Dropping {int(missing_labels.sum())} row(s) with a missing "
                  f"'{self.target_column}' value")
            frame = frame.loc[~missing_labels].copy()

        # Remember the means so prediction-time data is imputed consistently.
        self.numeric_fill_values = frame.mean(numeric_only=True)
        self.df = self._fill_missing(frame, self.numeric_fill_values)

        return self

    def explore_data(self, max_plots=6):
        """Explore data with automatic feature type detection.

        Prints the class distribution, label-encodes a non-numeric target
        (in ``self.df``), detects numerical/categorical features, then shows
        histograms for up to ``max_plots`` numerical features and a
        correlation heatmap.

        Args:
            max_plots: maximum number of feature histograms to draw; the
                subplot grid grows to fit (3 per row).

        Returns:
            self
        """
        print("\nClass Distribution for", self.target_column)
        print(self.df[self.target_column].value_counts(normalize=True))

        self._encode_target(verbose=True)
        self._detect_feature_types()

        print("\nNumerical features:", self.numerical_features)
        print("Categorical features:", self.categorical_features)

        if len(self.numerical_features) > 0:
            # Plot distributions of numerical features
            plotted = self.numerical_features[:max(max_plots, 0)]
            if plotted:
                n_cols = 3
                n_rows = int(np.ceil(len(plotted) / n_cols))
                plt.figure(figsize=(15, 5 * n_rows))
                for i, feature in enumerate(plotted, 1):
                    plt.subplot(n_rows, n_cols, i)
                    sns.histplot(data=self.df, x=feature, hue=self.target_column, multiple="stack")
                    plt.title(f'Distribution of {feature}')
                plt.tight_layout()
                plt.show()

            # Correlation matrix
            numerical_df = self.df[self.numerical_features + [self.target_column]]
            plt.figure(figsize=(12, 8))
            sns.heatmap(numerical_df.corr(), annot=True, cmap='coolwarm', center=0)
            plt.title('Correlation Matrix')
            plt.show()

        return self

    def preprocess_data(self, test_size=0.2, random_state=42, stratify=False):
        """Preprocess data with automatic handling of different data types.

        Encodes the target (if non-numeric) and every categorical feature,
        splits into train/test sets and standardizes the features. Does not
        require ``explore_data`` to have been called first.

        Args:
            test_size: fraction (or count) of rows held out for testing.
            random_state: seed for the split.
            stratify: when True, preserve class proportions in both splits.

        Returns:
            self
        """
        self._encode_target()
        self._detect_feature_types()

        df_processed = self.df.copy()

        # Encode categorical variables (fresh encoders on every call)
        self.label_encoders = {}
        for feature in self.categorical_features:
            encoder = LabelEncoder()
            df_processed[feature] = encoder.fit_transform(df_processed[feature])
            self.label_encoders[feature] = encoder

        # Prepare features and target
        exclude_cols = [
            col for col in (self.target_column, self.id_column)
            if col is not None and col in df_processed.columns
        ]
        X = df_processed.drop(columns=exclude_cols)
        y = df_processed[self.target_column]
        # Column order the scaler and models are fit on; reused at prediction time.
        self.feature_columns = X.columns.tolist()

        # Split the data
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y,
            test_size=test_size,
            random_state=random_state,
            stratify=y if stratify else None,
        )

        # Scale the features (statistics come from the training split only)
        self.X_train_scaled = self.scaler.fit_transform(self.X_train)
        self.X_test_scaled = self.scaler.transform(self.X_test)

        return self

    def train_models(self):
        """Train multiple models with progress updates.

        Fits a Logistic Regression and a Random Forest on the scaled training
        split and prints each model's training accuracy.

        Returns:
            self

        Raises:
            ValueError: if ``preprocess_data`` has not been run.
        """
        if getattr(self, 'X_train_scaled', None) is None:
            raise ValueError("No training data available. Call preprocess_data() before train_models().")

        self.models = {
            'Logistic Regression': LogisticRegression(max_iter=1000),
            'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42)
        }

        self.trained_models = {}
        for name, model in self.models.items():
            print(f"\nTraining {name}...")
            model.fit(self.X_train_scaled, self.y_train)
            train_score = model.score(self.X_train_scaled, self.y_train)
            print(f"{name} training accuracy: {train_score:.4f}")
            self.trained_models[name] = model

        return self

    def evaluate_models(self):
        """Evaluate models with detailed metrics.

        Stores, per model, in ``self.results``: ``accuracy``, ``fpr``, ``tpr``,
        ``auc``, ``predictions`` and ``probabilities``. For a binary target
        ``probabilities`` is the score of the second class and the ROC fields
        are populated; otherwise ``probabilities`` is the full class-probability
        matrix and the ROC fields are ``None``. Also plots ROC curves (binary
        only), prints classification reports and plots an accuracy comparison.

        Returns:
            self

        Raises:
            ValueError: if no models have been trained.
        """
        if not self.trained_models:
            raise ValueError("No trained models available. Call train_models() before evaluate_models().")

        self.results = {}

        for name, model in self.trained_models.items():
            print(f"\nEvaluating {name}...")
            y_pred = model.predict(self.X_test_scaled)
            class_probabilities = model.predict_proba(self.X_test_scaled)

            accuracy = accuracy_score(self.y_test, y_pred)
            if len(model.classes_) == 2:
                y_pred_proba = class_probabilities[:, 1]
                # Name the positive class explicitly so labels other than
                # {0, 1} / {-1, 1} are accepted by roc_curve.
                fpr, tpr, _ = roc_curve(self.y_test, y_pred_proba,
                                        pos_label=model.classes_[1])
                roc_auc = auc(fpr, tpr)
            else:
                y_pred_proba = class_probabilities
                fpr = tpr = roc_auc = None
                print(f"Skipping ROC for {name}: target is not binary")

            self.results[name] = {
                'accuracy': accuracy,
                'fpr': fpr,
                'tpr': tpr,
                'auc': roc_auc,
                'predictions': y_pred,
                'probabilities': y_pred_proba
            }

        # Plot ROC curves
        roc_results = {name: metrics for name, metrics in self.results.items()
                       if metrics['auc'] is not None}
        if roc_results:
            plt.figure(figsize=(10, 8))
            for name, metrics in roc_results.items():
                plt.plot(metrics['fpr'], metrics['tpr'],
                         label=f'{name} (AUC = {metrics["auc"]:.2f})')
            plt.plot([0, 1], [0, 1], 'k--')
            plt.xlabel('False Positive Rate')
            plt.ylabel('True Positive Rate')
            plt.title('ROC Curves for All Models')
            plt.legend()
            plt.show()

        # Print detailed classification reports
        for name, metrics in self.results.items():
            print(f"\nClassification Report for {name}:")
            print(classification_report(self.y_test, metrics['predictions']))

        # Compare model accuracies
        accuracies = {name: metrics['accuracy']
                      for name, metrics in self.results.items()}
        plt.figure(figsize=(10, 6))
        plt.bar(accuracies.keys(), accuracies.values())
        plt.title('Model Accuracy Comparison')
        plt.ylabel('Accuracy')
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()

        return self

    def predict_unlabeled_data(self, data_path=None, dataframe=None, save_to=None):
        """Predict leukemia status for unlabeled data.

        The input is reduced to the training feature columns (in training
        order), imputed the same way as the training data, encoded with the
        fitted label encoders and scaled with the fitted scaler.

        Args:
            data_path: path to a .csv, .xlsx or .json file.
            dataframe: an in-memory ``pandas.DataFrame`` (copied, not modified).
            save_to: optional CSV path; one column of predictions per model.

        Returns:
            dict mapping model name to its array of predicted (encoded)
            labels. When the target was label-encoded,
            ``self.target_encoder.inverse_transform`` recovers the original
            labels.

        Raises:
            ValueError: if no data source is given, no models are trained,
                a training feature column is missing, or a categorical
                feature holds a value that was not present during training.
        """
        if data_path is None and dataframe is None:
            raise ValueError("Either data_path or dataframe must be provided")
        if not self.trained_models:
            raise ValueError("No trained models available. Call train_models() before predicting.")

        # Load data from file or dataframe
        if data_path is not None:
            df_processed = self._load_data_from_path(data_path)
        else:
            df_processed = dataframe.copy()

        # Keep exactly the training features, in training order.
        feature_columns = getattr(self, 'feature_columns', None)
        if feature_columns is not None:
            missing = [col for col in feature_columns if col not in df_processed.columns]
            if missing:
                raise ValueError(
                    f"Unlabeled data is missing feature column(s) used in training: {missing}")
            X_unlabeled = df_processed[feature_columns].copy()
        else:
            # Column list unknown (state was set up manually): drop target/ID only.
            droppable = [col for col in (self.target_column, self.id_column) if col is not None]
            X_unlabeled = df_processed.drop(columns=droppable, errors='ignore')

        # Impute with the training means when available.
        X_unlabeled = self._fill_missing(
            X_unlabeled, getattr(self, 'numeric_fill_values', None))

        # Encode categorical variables
        for feature, encoder in self.label_encoders.items():
            if feature not in X_unlabeled.columns:
                continue
            unseen = set(X_unlabeled[feature].unique()) - set(encoder.classes_)
            if unseen:
                raise ValueError(
                    f"Feature '{feature}' contains values not seen during training: "
                    f"{sorted(unseen, key=str)}")
            X_unlabeled[feature] = encoder.transform(X_unlabeled[feature])

        # Scale features
        X_unlabeled_scaled = self.scaler.transform(X_unlabeled)

        # Use the trained models to predict
        predictions = {
            name: model.predict(X_unlabeled_scaled)
            for name, model in self.trained_models.items()
        }

        # Save predictions to a file if specified
        if save_to:
            pd.DataFrame(predictions).to_csv(save_to, index=False)

        return predictions

In [None]:
# Build the detector, then run the whole pipeline as one chain
# (every step returns the model itself).
model = LeukemiaDetectionModel(target_column='Leukemia_Status', id_column='Patient_ID')

(
    model
    .load_data(data_path='biased_leukemia_dataset.csv')
    .explore_data()
    .preprocess_data(test_size=0.2)
    .train_models()
    .evaluate_models()
)

In [None]:
# Score a fresh, unlabeled dataset with every trained model
new_data_path = 'unlabeled_leukemia_dataset.csv'  # location of the unlabeled data
predictions = model.predict_unlabeled_data(data_path=new_data_path)

# Preview the first 10 predictions of each model
for model_name, preds in predictions.items():
    print(f"Predictions from {model_name}:", preds[:10], sep="\n")