In [1]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("paultimothymooney/chest-xray-pneumonia")

print("Path to dataset files:", path)

Path to dataset files: /kaggle/input/chest-xray-pneumonia


In [2]:
import os
os.environ['KAGGLE_CONFIG_DIR'] = "/kaggle/working"

# Verify Kaggle API
!kaggle datasets list


Traceback (most recent call last):
  File "/usr/local/bin/kaggle", line 5, in <module>
    from kaggle.cli import main
  File "/usr/local/lib/python3.10/dist-packages/kaggle/__init__.py", line 7, in <module>
    api.authenticate()
  File "/usr/local/lib/python3.10/dist-packages/kaggle/api/kaggle_api_extended.py", line 407, in authenticate
    raise IOError('Could not find {}. Make sure it\'s located in'
OSError: Could not find kaggle.json. Make sure it's located in /kaggle/working. Or use the environment method. See setup instructions at https://github.com/Kaggle/kaggle-api/


In [3]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import cv2

# Deep Learning and ML Libraries
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import ResNet50V2, DenseNet121
from tensorflow.keras.layers import (
    Dense, GlobalAveragePooling2D, Input, 
    Dropout, Flatten, Concatenate
)
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

# Additional Libraries
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    confusion_matrix, 
    classification_report, 
    roc_curve, 
    roc_auc_score,
    precision_recall_curve,
    average_precision_score
)
from sklearn.preprocessing import LabelEncoder

# Memory Management and Augmentation
from tensorflow.keras.preprocessing.image import load_img, img_to_array

class PneumoniaClassificationPipeline:
    def __init__(self, dataset_path):
        """
        Initialize the pipeline with dataset paths and configuration
        
        Args:
            dataset_path (str): Root path of the chest X-ray dataset
        """
        self.dataset_path = dataset_path
        self.train_path = os.path.join(dataset_path, 'train')
        self.test_path = os.path.join(dataset_path, 'test')
        self.val_path = os.path.join(dataset_path, 'val')
        
        # Configuration parameters
        self.img_height = 224
        self.img_width = 224
        self.batch_size = 16  # Reduced batch size for memory efficiency
        self.epochs = 30  # Reduced epochs
        
    def load_and_preprocess_data(self, directory, is_test=False):
        """
        Load and preprocess chest X-ray images with memory-efficient approach
        
        Args:
            directory (str): Directory to load images from
            is_test (bool): Whether this is test data (no augmentation)
        
        Returns:
            tuple: Preprocessed image data and labels
        """
        images = []
        labels = []
        
        # Data augmentation for training
        if not is_test:
            datagen = ImageDataGenerator(
                rotation_range=20,
                width_shift_range=0.2,
                height_shift_range=0.2,
                horizontal_flip=True,
                zoom_range=0.2,
                shear_range=0.2,
                fill_mode='nearest'
            )
        
        for label in ['NORMAL', 'PNEUMONIA']:
            class_path = os.path.join(directory, label)
            class_label = 1 if label == 'PNEUMONIA' else 0
            
            for img_name in os.listdir(class_path):
                img_path = os.path.join(class_path, img_name)
                try:
                    # Load and preprocess image
                    img = load_img(img_path, target_size=(self.img_height, self.img_width))
                    img_array = img_to_array(img) / 255.0  # Normalize
                    
                    # Apply augmentation only for training
                    if not is_test:
                        # Generate additional augmented images
                        for _ in range(2):  # Generate 2 additional augmented images
                            augmented = datagen.random_transform(img_array)
                            images.append(augmented)
                            labels.append(class_label)
                    
                    # Original image
                    images.append(img_array)
                    labels.append(class_label)
                    
                except Exception as e:
                    print(f"Error loading {img_path}: {e}")
        
        return np.array(images), np.array(labels)
    
    def visualize_class_distribution(self, y, title):
        """
        Visualize class distribution
        
        Args:
            y (array): Labels
            title (str): Plot title
        """
        plt.figure(figsize=(8, 4))
        unique, counts = np.unique(y, return_counts=True)
        plt.bar(['NORMAL', 'PNEUMONIA'], counts)
        plt.title(title)
        plt.ylabel('Number of Samples')
        plt.tight_layout()
        plt.savefig(f'{title.lower().replace(" ", "_")}.png')
        plt.close()
        
        # Print distribution details
        for label, count in zip(['NORMAL', 'PNEUMONIA'], counts):
            percentage = count / len(y) * 100
            print(f"{label}: {count} samples ({percentage:.2f}%)")
    
    def build_resnet_model(self):
        """
        Build ResNet50V2 model for pneumonia classification
        
        Returns:
            Model: Compiled ResNet model
        """
        # Base ResNet model
        base_model = ResNet50V2(
            weights='imagenet', 
            include_top=False, 
            input_shape=(self.img_height, self.img_width, 3)
        )
        
        # Freeze base model layers
        base_model.trainable = False
        
        # Add custom layers
        model = Sequential([
            base_model,
            GlobalAveragePooling2D(),
            Dense(512, activation='relu'),
            Dropout(0.5),
            Dense(1, activation='sigmoid')
        ])
        
        # Compile model
        model.compile(
            optimizer=Adam(learning_rate=0.0001),
            loss='binary_crossentropy',
            metrics=['accuracy']
        )
        
        return model
    
    def build_densenet_model(self):
        """
        Build DenseNet121 model for pneumonia classification
        
        Returns:
            Model: Compiled DenseNet model
        """
        # Base DenseNet model
        base_model = DenseNet121(
            weights='imagenet', 
            include_top=False, 
            input_shape=(self.img_height, self.img_width, 3)
        )
        
        # Freeze base model layers
        base_model.trainable = False
        
        # Add custom layers
        model = Sequential([
            base_model,
            GlobalAveragePooling2D(),
            Dense(512, activation='relu'),
            Dropout(0.5),
            Dense(1, activation='sigmoid')
        ])
        
        # Compile model
        model.compile(
            optimizer=Adam(learning_rate=0.0001),
            loss='binary_crossentropy',
            metrics=['accuracy']
        )
        
        return model
    
    def evaluate_model(self, model, X_test, y_test):
        """
        Comprehensive model evaluation
        
        Args:
            model (Model): Trained model
            X_test (array): Test images
            y_test (array): Test labels
        
        Returns:
            dict: Detailed model performance metrics
        """
        # Predict probabilities
        y_pred_proba = model.predict(X_test).ravel()
        y_pred = (y_pred_proba > 0.5).astype(int)
        
        # Compute metrics
        results = {
            'confusion_matrix': confusion_matrix(y_test, y_pred),
            'classification_report': classification_report(y_test, y_pred),
            'roc_auc': roc_auc_score(y_test, y_pred_proba),
            'precision_recall_auc': average_precision_score(y_test, y_pred_proba)
        }
        
        return results
    
    def predict_pneumonia(self, model, image_path):
        """
        Predict pneumonia for a single X-ray image
        
        Args:
            model (Model): Trained model
            image_path (str): Path to the X-ray image
        
        Returns:
            tuple: Prediction probability and label
        """
        # Load and preprocess image
        img = load_img(image_path, target_size=(self.img_height, self.img_width))
        img_array = img_to_array(img) / 255.0
        img_array = np.expand_dims(img_array, axis=0)
        
        # Predict
        prediction_proba = model.predict(img_array)[0][0]
        prediction_label = "PNEUMONIA" if prediction_proba > 0.5 else "NORMAL"
        
        return prediction_proba, prediction_label
    
    def run_pipeline(self):
        """
        Run the complete machine learning pipeline
        """
        # Load and preprocess training data
        X_train, y_train = self.load_and_preprocess_data(self.train_path)
        
        # Visualize original training data distribution
        self.visualize_class_distribution(y_train, "Original Training Data Distribution")
        
        # Load and preprocess test data
        X_test, y_test = self.load_and_preprocess_data(self.test_path, is_test=True)
        
        # Visualize test data distribution
        self.visualize_class_distribution(y_test, "Test Data Distribution")
        
        # Prepare models
        resnet_model = self.build_resnet_model()
        densenet_model = self.build_densenet_model()
        
        # Model checkpoints
        resnet_checkpoint = ModelCheckpoint(
            'best_resnet_model.h5', 
            monitor='val_accuracy', 
            save_best_only=True
        )
        densenet_checkpoint = ModelCheckpoint(
            'best_densenet_model.h5', 
            monitor='val_accuracy', 
            save_best_only=True
        )
        
        # Early stopping
        early_stopping = EarlyStopping(
            monitor='val_loss', 
            patience=10, 
            restore_best_weights=True
        )
        
        # Train ResNet model
        print("\nTraining ResNet Model...")
        resnet_history = resnet_model.fit(
            X_train, y_train,
            validation_split=0.2,
            epochs=self.epochs,
            batch_size=self.batch_size,
            callbacks=[resnet_checkpoint, early_stopping]
        )
        
        # Train DenseNet model
        print("\nTraining DenseNet Model...")
        densenet_history = densenet_model.fit(
            X_train, y_train,
            validation_split=0.2,
            epochs=self.epochs,
            batch_size=self.batch_size,
            callbacks=[densenet_checkpoint, early_stopping]
        )
        
        # Evaluate models
        print("\nEvaluating ResNet Model:")
        resnet_results = self.evaluate_model(resnet_model, X_test, y_test)
        print_model_results(resnet_results, "ResNet")
        
        print("\nEvaluating DenseNet Model:")
        densenet_results = self.evaluate_model(densenet_model, X_test, y_test)
        print_model_results(densenet_results, "DenseNet")
        
        return resnet_model, densenet_model

def print_model_results(results, model_name):
    """
    Print detailed model evaluation results
    
    Args:
        results (dict): Model performance metrics
        model_name (str): Name of the model
    """
    print(f"\n{model_name} Model Performance:")
    print("Confusion Matrix:")
    print(results['confusion_matrix'])
    print("\nClassification Report:")
    print(results['classification_report'])
    print(f"ROC AUC: {results['roc_auc']:.4f}")
    print(f"Precision-Recall AUC: {results['precision_recall_auc']:.4f}")

# Main execution
if __name__ == "__main__":
    # Dataset path
    dataset_path = "/kaggle/input/chest-xray-pneumonia/chest_xray"
    
    # Initialize and run pipeline
    pipeline = PneumoniaClassificationPipeline(dataset_path)
    resnet_model, densenet_model = pipeline.run_pipeline()
    
    # Example prediction (replace with actual X-ray image path)
    sample_pneumonia_image = "/path/to/sample/pneumonia/image.jpg"
    sample_normal_image = "/path/to/sample/normal/image.jpg"
    
    # Predict using ResNet
    print("\nResNet Model Predictions:")
    resnet_pneumonia_pred = pipeline.predict_pneumonia(resnet_model, sample_pneumonia_image)
    resnet_normal_pred = pipeline.predict_pneumonia(resnet_model, sample_normal_image)
    print(f"Pneumonia Image - Probability: {resnet_pneumonia_pred[0]:.4f}, Label: {resnet_pneumonia_pred[1]}")
    print(f"Normal Image - Probability: {resnet_normal_pred[0]:.4f}, Label: {resnet_normal_pred[1]}")
    
    # Predict using DenseNet
    print("\nDenseNet Model Predictions:")
    densenet_pneumonia_pred = pipeline.predict_pneumonia(densenet_model, sample_pneumonia_image)
    densenet_normal_pred = pipeline.predict_pneumonia(densenet_model, sample_normal_image)
    print(f"Pneumonia Image - Probability: {densenet_pneumonia_pred[0]:.4f}, Label: {densenet_pneumonia_pred[1]}")
    print(f"Normal Image - Probability: {densenet_normal_pred[0]:.4f}, Label: {densenet_normal_pred[1]}")

# Requirements (to be installed):
# tensorflow
# scikit-learn
# matplotlib
# opencv-python

NORMAL: 4023 samples (25.71%)
PNEUMONIA: 11625 samples (74.29%)
NORMAL: 234 samples (37.50%)
PNEUMONIA: 390 samples (62.50%)
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50v2_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m94668760/94668760[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/densenet/densenet121_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m29084464/29084464[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


ValueError: The filepath provided must end in `.keras` (Keras model format). Received: filepath=best_resnet_model.h5

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import cv2

# Deep Learning and ML Libraries
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import ResNet50V2, DenseNet121
from tensorflow.keras.layers import (
    Dense, GlobalAveragePooling2D, Input, 
    Dropout, Flatten, Concatenate
)
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

# Additional Libraries
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    confusion_matrix, 
    classification_report, 
    roc_curve, 
    roc_auc_score,
    precision_recall_curve,
    average_precision_score
)
from sklearn.preprocessing import LabelEncoder

# Memory Management and Augmentation
from tensorflow.keras.preprocessing.image import load_img, img_to_array

class PneumoniaClassificationPipeline:
    def __init__(self, dataset_path):
        """
        Initialize the pipeline with dataset paths and configuration
        
        Args:
            dataset_path (str): Root path of the chest X-ray dataset
        """
        self.dataset_path = dataset_path
        self.train_path = os.path.join(dataset_path, 'train')
        self.test_path = os.path.join(dataset_path, 'test')
        self.val_path = os.path.join(dataset_path, 'val')
        
        # Configuration parameters
        self.img_height = 224
        self.img_width = 224
        self.batch_size = 16  # Reduced batch size for memory efficiency
        self.epochs = 30  # Reduced epochs
        
    def load_and_preprocess_data(self, directory, is_test=False):
        """
        Load and preprocess chest X-ray images with memory-efficient approach
        
        Args:
            directory (str): Directory to load images from
            is_test (bool): Whether this is test data (no augmentation)
        
        Returns:
            tuple: Preprocessed image data and labels
        """
        images = []
        labels = []
        
        # Data augmentation for training
        if not is_test:
            datagen = ImageDataGenerator(
                rotation_range=20,
                width_shift_range=0.2,
                height_shift_range=0.2,
                horizontal_flip=True,
                zoom_range=0.2,
                shear_range=0.2,
                fill_mode='nearest'
            )
        
        for label in ['NORMAL', 'PNEUMONIA']:
            class_path = os.path.join(directory, label)
            class_label = 1 if label == 'PNEUMONIA' else 0
            
            for img_name in os.listdir(class_path):
                img_path = os.path.join(class_path, img_name)
                try:
                    # Load and preprocess image
                    img = load_img(img_path, target_size=(self.img_height, self.img_width))
                    img_array = img_to_array(img) / 255.0  # Normalize
                    
                    # Apply augmentation only for training
                    if not is_test:
                        # Generate additional augmented images
                        for _ in range(2):  # Generate 2 additional augmented images
                            augmented = datagen.random_transform(img_array)
                            images.append(augmented)
                            labels.append(class_label)
                    
                    # Original image
                    images.append(img_array)
                    labels.append(class_label)
                    
                except Exception as e:
                    print(f"Error loading {img_path}: {e}")
        
        return np.array(images), np.array(labels)
    
    def visualize_class_distribution(self, y, title):
        """
        Visualize class distribution
        
        Args:
            y (array): Labels
            title (str): Plot title
        """
        plt.figure(figsize=(8, 4))
        unique, counts = np.unique(y, return_counts=True)
        plt.bar(['NORMAL', 'PNEUMONIA'], counts)
        plt.title(title)
        plt.ylabel('Number of Samples')
        plt.tight_layout()
        plt.savefig(f'{title.lower().replace(" ", "_")}.png')
        plt.close()
        
        # Print distribution details
        for label, count in zip(['NORMAL', 'PNEUMONIA'], counts):
            percentage = count / len(y) * 100
            print(f"{label}: {count} samples ({percentage:.2f}%)")
    
    def build_resnet_model(self):
        """
        Build ResNet50V2 model for pneumonia classification
        
        Returns:
            Model: Compiled ResNet model
        """
        # Base ResNet model
        base_model = ResNet50V2(
            weights='imagenet', 
            include_top=False, 
            input_shape=(self.img_height, self.img_width, 3)
        )
        
        # Freeze base model layers
        base_model.trainable = False
        
        # Add custom layers
        model = Sequential([
            base_model,
            GlobalAveragePooling2D(),
            Dense(512, activation='relu'),
            Dropout(0.5),
            Dense(1, activation='sigmoid')
        ])
        
        # Compile model
        model.compile(
            optimizer=Adam(learning_rate=0.0001),
            loss='binary_crossentropy',
            metrics=['accuracy']
        )
        
        return model
    
    def build_densenet_model(self):
        """
        Build DenseNet121 model for pneumonia classification
        
        Returns:
            Model: Compiled DenseNet model
        """
        # Base DenseNet model
        base_model = DenseNet121(
            weights='imagenet', 
            include_top=False, 
            input_shape=(self.img_height, self.img_width, 3)
        )
        
        # Freeze base model layers
        base_model.trainable = False
        
        # Add custom layers
        model = Sequential([
            base_model,
            GlobalAveragePooling2D(),
            Dense(512, activation='relu'),
            Dropout(0.5),
            Dense(1, activation='sigmoid')
        ])
        
        # Compile model
        model.compile(
            optimizer=Adam(learning_rate=0.0001),
            loss='binary_crossentropy',
            metrics=['accuracy']
        )
        
        return model
    
    def evaluate_model(self, model, X_test, y_test):
        """
        Comprehensive model evaluation
        
        Args:
            model (Model): Trained model
            X_test (array): Test images
            y_test (array): Test labels
        
        Returns:
            dict: Detailed model performance metrics
        """
        # Predict probabilities
        y_pred_proba = model.predict(X_test).ravel()
        y_pred = (y_pred_proba > 0.5).astype(int)
        
        # Compute metrics
        results = {
            'confusion_matrix': confusion_matrix(y_test, y_pred),
            'classification_report': classification_report(y_test, y_pred),
            'roc_auc': roc_auc_score(y_test, y_pred_proba),
            'precision_recall_auc': average_precision_score(y_test, y_pred_proba)
        }
        
        return results
    
    def predict_pneumonia(self, model, image_path):
        """
        Predict pneumonia for a single X-ray image
        
        Args:
            model (Model): Trained model
            image_path (str): Path to the X-ray image
        
        Returns:
            tuple: Prediction probability and label
        """
        # Load and preprocess image
        img = load_img(image_path, target_size=(self.img_height, self.img_width))
        img_array = img_to_array(img) / 255.0
        img_array = np.expand_dims(img_array, axis=0)
        
        # Predict
        prediction_proba = model.predict(img_array)[0][0]
        prediction_label = "PNEUMONIA" if prediction_proba > 0.5 else "NORMAL"
        
        return prediction_proba, prediction_label
    
    def run_pipeline(self):
        """
        Run the complete machine learning pipeline
        """
        # Load and preprocess training data
        X_train, y_train = self.load_and_preprocess_data(self.train_path)
        
        # Visualize original training data distribution
        self.visualize_class_distribution(y_train, "Original Training Data Distribution")
        
        # Load and preprocess test data
        X_test, y_test = self.load_and_preprocess_data(self.test_path, is_test=True)
        
        # Visualize test data distribution
        self.visualize_class_distribution(y_test, "Test Data Distribution")
        
        # Prepare models
        resnet_model = self.build_resnet_model()
        densenet_model = self.build_densenet_model()
        
        # Model checkpoints (updated to .keras extension)
        resnet_checkpoint = ModelCheckpoint(
            'best_resnet_model.keras', 
            monitor='val_accuracy', 
            save_best_only=True
        )
        densenet_checkpoint = ModelCheckpoint(
            'best_densenet_model.keras', 
            monitor='val_accuracy', 
            save_best_only=True
        )
        
        # Early stopping
        early_stopping = EarlyStopping(
            monitor='val_loss', 
            patience=10, 
            restore_best_weights=True
        )
        
        # Train ResNet model
        print("\nTraining ResNet Model...")
        resnet_history = resnet_model.fit(
            X_train, y_train,
            validation_split=0.2,
            epochs=self.epochs,
            batch_size=self.batch_size,
            callbacks=[resnet_checkpoint, early_stopping]
        )
        
        # Train DenseNet model
        print("\nTraining DenseNet Model...")
        densenet_history = densenet_model.fit(
            X_train, y_train,
            validation_split=0.2,
            epochs=self.epochs,
            batch_size=self.batch_size,
            callbacks=[densenet_checkpoint, early_stopping]
        )
        
        # Evaluate models
        print("\nEvaluating ResNet Model:")
        resnet_results = self.evaluate_model(resnet_model, X_test, y_test)
        print_model_results(resnet_results, "ResNet")
        
        print("\nEvaluating DenseNet Model:")
        densenet_results = self.evaluate_model(densenet_model, X_test, y_test)
        print_model_results(densenet_results, "DenseNet")
        
        return resnet_model, densenet_model

def print_model_results(results, model_name):
    """
    Print detailed model evaluation results
    
    Args:
        results (dict): Model performance metrics
        model_name (str): Name of the model
    """
    print(f"\n{model_name} Model Performance:")
    print("Confusion Matrix:")
    print(results['confusion_matrix'])
    print("\nClassification Report:")
    print(results['classification_report'])
    print(f"ROC AUC: {results['roc_auc']:.4f}")
    print(f"Precision-Recall AUC: {results['precision_recall_auc']:.4f}")

# Main execution
if __name__ == "__main__":
    # Dataset path (you'll need to replace this with your actual path)
    dataset_path = "/path/to/chest_xray"
    
    # Initialize and run pipeline
    pipeline = PneumoniaClassificationPipeline(dataset_path)
    resnet_model, densenet_model = pipeline.run_pipeline()
    
    # Example prediction (replace with actual X-ray image paths)
    sample_pneumonia_image = "/kaggle/input/chest-xray-pneumonia/chest_xray/test/PNEUMONIA/person100_bacteria_477.jpeg"
    sample_normal_image = "/kaggle/input/chest-xray-pneumonia/chest_xray/test/NORMAL/IM-0006-0001.jpeg"
    
    # Predict using ResNet
    print("\nResNet Model Predictions:")
    resnet_pneumonia_pred = pipeline.predict_pneumonia(resnet_model, sample_pneumonia_image)
    resnet_normal_pred = pipeline.predict_pneumonia(resnet_model, sample_normal_image)
    print(f"Pneumonia Image - Probability: {resnet_pneumonia_pred[0]:.4f}, Label: {resnet_pneumonia_pred[1]}")
    print(f"Normal Image - Probability: {resnet_normal_pred[0]:.4f}, Label: {resnet_normal_pred[1]}")
    
    # Predict using DenseNet
    print("\nDenseNet Model Predictions:")
    densenet_pneumonia_pred = pipeline.predict_pneumonia(densenet_model, sample_pneumonia_image)
    densenet_normal_pred = pipeline.predict_pneumonia(densenet_model, sample_normal_image)
    print(f"Pneumonia Image - Probability: {densenet_pneumonia_pred[0]:.4f}, Label: {densenet_pneumonia_pred[1]}")
    print(f"Normal Image - Probability: {densenet_normal_pred[0]:.4f}, Label: {densenet_normal_pred[1]}")

# Requirements (to be installed):
# tensorflow
# scikit-learn
# matplotlib
# opencv-python

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import cv2

# Deep Learning and ML Libraries
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import ResNet50V2, DenseNet121
from tensorflow.keras.layers import (
    Dense, GlobalAveragePooling2D, Input, 
    Dropout, Flatten, Conv2D, 
    MaxPooling2D, BatchNormalization
)
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

# Additional Libraries
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    confusion_matrix, 
    classification_report, 
    roc_auc_score
)

# Imbalance Handling
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import TomekLinks
from imblearn.pipeline import Pipeline as ImbPipeline

# SHAP for Interpretability
import shap

class AdvancedPneumoniaClassificationPipeline:
    def __init__(self, dataset_path):
        """
        Initialize the advanced pipeline with memory-efficient configurations
        
        Args:
            dataset_path (str): Root path of the chest X-ray dataset
        """
        self.dataset_path = dataset_path
        self.train_path = os.path.join(dataset_path, 'train')
        self.test_path = os.path.join(dataset_path, 'test')
        
        # Reduced configuration parameters to save memory
        self.img_height = 224
        self.img_width = 224
        self.batch_size = 16  # Reduced batch size
        self.epochs = 30  # Reduced epochs
        self.num_classes = 2
        
        # Configure GPU memory growth
        gpus = tf.config.experimental.list_physical_devices('GPU')
        if gpus:
            try:
                for gpu in gpus:
                    tf.config.experimental.set_memory_growth(gpu, True)
            except RuntimeError as e:
                print(e)
    
    def load_and_preprocess_images(self, directory, is_test=False, max_images=None):
        """
        Memory-efficient image loading and preprocessing
        
        Args:
            directory (str): Directory to load images from
            is_test (bool): Whether this is test data (no augmentation)
            max_images (int, optional): Limit number of images to load
        
        Returns:
            tuple: Preprocessed image data and labels
        """
        images = []
        labels = []
        
        # Simplified data augmentation
        if not is_test:
            datagen = ImageDataGenerator(
                rotation_range=20,
                width_shift_range=0.1,
                height_shift_range=0.1,
                horizontal_flip=True,
                zoom_range=0.1
            )
        
        label_map = {'NORMAL': 0, 'PNEUMONIA': 1}
        
        for label, class_index in label_map.items():
            class_path = os.path.join(directory, label)
            image_files = os.listdir(class_path)
            
            # Limit images if specified
            if max_images:
                image_files = image_files[:max_images]
            
            for img_name in image_files:
                img_path = os.path.join(class_path, img_name)
                try:
                    # More memory-efficient image loading
                    img = cv2.imread(img_path, cv2.IMREAD_COLOR)
                    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                    img = cv2.resize(img, (self.img_height, self.img_width))
                    img = img / 255.0  # Normalize
                    
                    # Apply augmentation only for training
                    if not is_test:
                        # Generate fewer augmented images
                        for _ in range(1):  # Generate 1 additional augmented image
                            augmented = datagen.random_transform(img)
                            images.append(augmented)
                            labels.append(class_index)
                    
                    # Original image
                    images.append(img)
                    labels.append(class_index)
                    
                except Exception as e:
                    print(f"Error loading {img_path}: {e}")
        
        return np.array(images), np.array(labels)
    
    def apply_smote(self, X, y):
        """
        Apply SMOTE and Tomek Links for balanced dataset with memory efficiency
        
        Args:
            X (array): Input features
            y (array): Input labels
        
        Returns:
            tuple: Balanced dataset
        """
        # Flatten the image for SMOTE
        X_flat = X.reshape(X.shape[0], -1)
        
        # Create SMOTE-Tomek Links pipeline
        smote_tomek = ImbPipeline([
            ('smote', SMOTE(sampling_strategy='auto', random_state=42)),
            ('tomek', TomekLinks())
        ])
        
        # Apply SMOTE-Tomek
        X_resampled, y_resampled = smote_tomek.fit_resample(X_flat, y)
        
        # Reshape back to image dimensions
        X_resampled = X_resampled.reshape(
            X_resampled.shape[0], 
            self.img_height, 
            self.img_width, 
            3
        )
        
        return X_resampled, y_resampled
    
    def build_ensemble_models(self):
        """
        Build multiple deep learning models for ensemble with memory-efficient design
        
        Returns:
            list: Compiled models
        """
        # ResNet50V2 Model
        resnet_base = ResNet50V2(
            weights='imagenet', 
            include_top=False, 
            input_shape=(self.img_height, self.img_width, 3)
        )
        resnet_base.trainable = False
        
        resnet_model = Sequential([
            resnet_base,
            GlobalAveragePooling2D(),
            Dense(256, activation='relu'),  # Reduced dense layer size
            Dropout(0.3),  # Adjusted dropout
            Dense(1, activation='sigmoid')
        ])
        
        # DenseNet121 Model
        densenet_base = DenseNet121(
            weights='imagenet', 
            include_top=False, 
            input_shape=(self.img_height, self.img_width, 3)
        )
        densenet_base.trainable = False
        
        densenet_model = Sequential([
            densenet_base,
            GlobalAveragePooling2D(),
            Dense(256, activation='relu'),  # Reduced dense layer size
            Dropout(0.3),  # Adjusted dropout
            Dense(1, activation='sigmoid')
        ])
        
        # Custom Lightweight CNN Model
        inputs = Input(shape=(self.img_height, self.img_width, 3))
        x = Conv2D(32, (3, 3), activation='relu', padding='same')(inputs)
        x = BatchNormalization()(x)
        x = MaxPooling2D()(x)
        x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
        x = BatchNormalization()(x)
        x = MaxPooling2D()(x)
        x = Flatten()(x)
        x = Dense(128, activation='relu')(x)
        x = Dropout(0.3)(x)
        outputs = Dense(1, activation='sigmoid')(x)
        
        lightweight_model = Model(inputs=inputs, outputs=outputs)
        
        # Compile models
        models = [resnet_model, densenet_model, lightweight_model]
        
        for model in models:
            model.compile(
                optimizer=Adam(learning_rate=0.0001),
                loss='binary_crossentropy',
                metrics=['accuracy']
            )
        
        return models
    
    def ensemble_predict(self, models, X):
        """
        Ensemble prediction using soft voting
        
        Args:
            models (list): List of trained models
            X (array): Input images
        
        Returns:
            array: Ensemble predictions
        """
        predictions = [model.predict(X, verbose=0) for model in models]
        ensemble_pred = np.mean(predictions, axis=0)
        return ensemble_pred
    
    def run_advanced_pipeline(self):
        """
        Run the complete advanced machine learning pipeline with memory optimization
        """
        # Load and preprocess training data with image limit
        X_train, y_train = self.load_and_preprocess_images(self.train_path, max_images=1000)
        
        # Visualize original training data distribution
        self.visualize_class_distribution(y_train, "Original Training Data Distribution", 'before')
        
        # Apply SMOTE to balance dataset
        X_train_resampled, y_train_resampled = self.apply_smote(X_train, y_train)
        
        # Visualize balanced training data distribution
        self.visualize_class_distribution(y_train_resampled, "Balanced Training Data Distribution", 'after')
        
        # Load test data
        X_test, y_test = self.load_and_preprocess_images(self.test_path, is_test=True, max_images=500)
        
        # Prepare ensemble models
        models = self.build_ensemble_models()
        
        # Train models with early stopping and model checkpointing
        trained_models = []
        for i, model in enumerate(models, 1):
            print(f"\nTraining Model {i}")
            checkpoint = ModelCheckpoint(
                f'best_model_{i}.keras', 
                monitor='val_accuracy', 
                save_best_only=True
            )
            early_stopping = EarlyStopping(
                monitor='val_loss', 
                patience=5, 
                restore_best_weights=True
            )
            
            history = model.fit(
                X_train_resampled, y_train_resampled,
                validation_split=0.2,
                epochs=self.epochs,
                batch_size=self.batch_size,
                callbacks=[checkpoint, early_stopping],
                verbose=1
            )
            
            trained_models.append(model)
        
        # Ensemble prediction
        ensemble_pred_proba = self.ensemble_predict(trained_models, X_test)
        ensemble_pred = (ensemble_pred_proba > 0.5).astype(int)
        
        # Evaluate ensemble performance
        print("\nEnsemble Model Performance:")
        print("Confusion Matrix:")
        print(confusion_matrix(y_test, ensemble_pred))
        print("\nClassification Report:")
        print(classification_report(y_test, ensemble_pred))
        print(f"ROC AUC: {roc_auc_score(y_test, ensemble_pred_proba):.4f}")
        
        return trained_models, ensemble_pred_proba
    
    def visualize_class_distribution(self, y, title, before_after='before'):
        """
        Visualize class distribution with enhanced plotting
        
        Args:
            y (array): Labels
            title (str): Plot title
            before_after (str): Indicate before or after SMOTE
        """
        plt.figure(figsize=(10, 5))
        unique, counts = np.unique(y, return_counts=True)
        plt.bar(['NORMAL', 'PNEUMONIA'], counts)
        plt.title(f'{title} - {before_after.capitalize()} SMOTE')
        plt.ylabel('Number of Samples')
        plt.tight_layout()
        plt.savefig(f'{title.lower().replace(" ", "_")}_{before_after}_smote.png')
        plt.close()
        
        # Print distribution details
        for label, count in zip(['NORMAL', 'PNEUMONIA'], counts):
            percentage = count / len(y) * 100
            print(f"{label}: {count} samples ({percentage:.2f}%)")
    
    def predict_pneumonia(self, models, image_path):
        """
        Predict pneumonia for a single X-ray image using ensemble
        
        Args:
            models (list): Trained models
            image_path (str): Path to the X-ray image
        
        Returns:
            tuple: Prediction probability and label
        """
        # Load and preprocess image
        img = cv2.imread(image_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (self.img_height, self.img_width))
        img = img / 255.0
        img = np.expand_dims(img, axis=0)
        
        # Ensemble prediction
        prediction_proba = self.ensemble_predict(models, img)[0][0]
        prediction_label = "PNEUMONIA" if prediction_proba > 0.5 else "NORMAL"
        
        return prediction_proba, prediction_label

# Main execution
if __name__ == "__main__":
    # Updated dataset path
    dataset_path = "/kaggle/input/chest-xray-pneumonia/chest_xray"
    
    # Initialize and run advanced pipeline
    pipeline = AdvancedPneumoniaClassificationPipeline(dataset_path)
    ensemble_models, predictions = pipeline.run_advanced_pipeline()
    
    # Example predictions
    sample_pneumonia_image = os.path.join(dataset_path, "test/PNEUMONIA/person100_bacteria_477.jpeg")
    sample_normal_image = os.path.join(dataset_path, "test/NORMAL/IM-0006-0001.jpeg")
    
    # Predict using ensemble
    print("\nEnsemble Model Predictions:")
    pneumonia_pred = pipeline.predict_pneumonia(ensemble_models, sample_pneumonia_image)
    normal_pred = pipeline.predict_pneumonia(ensemble_models, sample_normal_image)
    
    print(f"Pneumonia Image - Probability: {pneumonia_pred[0]:.4f}, Label: {pneumonia_pred[1]}")
    print(f"Normal Image - Probability: {normal_pred[0]:.4f}, Label: {normal_pred[1]}")

# Requirements (to be installed):
# tensorflow
# scikit-learn
# imbalanced-learn
# matplotlib
# opencv-python
# shap