In [16]:
import tensorflow as tf
from tensorflow.keras.applications import DenseNet121  # Changed to DenseNet121 for better medical image analysis
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout, BatchNormalization
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
import os
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns

In [17]:
# Constants
IMAGE_SIZE = 224  # Images are resized to IMAGE_SIZE x IMAGE_SIZE before entering the network
BATCH_SIZE = 16   # Smaller batch size for better generalization
EPOCHS = 100      # Upper bound for the initial training phase; early stopping may end it sooner
# Dataset root containing the 'train', 'val' and 'test' folders. Set the
# CHEST_XRAY_DIR environment variable to point the notebook at another copy of
# the data without editing this cell; the original location is the default.
BASE_PATH = os.environ.get("CHEST_XRAY_DIR") or "D:/LALA/chest_xray"

In [18]:
def create_data_generators():
    """Create the train, validation and test directory iterators.

    Training and validation images both come from ``BASE_PATH/train`` and are
    separated by a 10% validation split. Only the training subset is
    augmented; validation and test images are merely rescaled to [0, 1] so
    the reported metrics describe unmodified images.

    Returns:
        tuple: ``(train_generator, validation_generator, test_generator)``.
    """
    validation_split = 0.1
    class_names = ['Not Infected', 'Infected']  # index 0 = negative, 1 = positive

    train_datagen = ImageDataGenerator(
        rescale=1./255,
        rotation_range=10,        # Limited rotation for medical images
        width_shift_range=0.1,    # Subtle shifts
        height_shift_range=0.1,
        zoom_range=0.1,          # Subtle zoom
        horizontal_flip=True,     # Only horizontal flip is medically valid
        fill_mode='constant',
        validation_split=validation_split,
        brightness_range=[0.9, 1.1]  # Subtle brightness adjustment
    )

    # Same split fraction as the training generator, but no augmentation.
    # flow_from_directory partitions the file list by position, so two
    # generators configured with the same fraction pick complementary subsets.
    validation_datagen = ImageDataGenerator(
        rescale=1./255,
        validation_split=validation_split
    )

    test_datagen = ImageDataGenerator(rescale=1./255)

    print("Creating generators...")
    train_generator = train_datagen.flow_from_directory(
        os.path.join(BASE_PATH, 'train'),
        target_size=(IMAGE_SIZE, IMAGE_SIZE),
        batch_size=BATCH_SIZE,
        class_mode='binary',
        classes=class_names,
        shuffle=True,
        subset='training'
    )

    validation_generator = validation_datagen.flow_from_directory(
        os.path.join(BASE_PATH, 'train'),
        target_size=(IMAGE_SIZE, IMAGE_SIZE),
        batch_size=BATCH_SIZE,
        class_mode='binary',
        classes=class_names,
        shuffle=False,  # Evaluation data needs no shuffling; keeps order aligned with .classes
        subset='validation'
    )

    test_generator = test_datagen.flow_from_directory(
        os.path.join(BASE_PATH, 'test'),
        target_size=(IMAGE_SIZE, IMAGE_SIZE),
        batch_size=1,  # Batch size 1 for precise evaluation
        class_mode='binary',
        classes=class_names,
        shuffle=False
    )

    # The caller unpacks exactly these three iterators, in this order.
    return train_generator, validation_generator, test_generator


In [4]:
# 3. Create the model
def create_model():
    """Build the binary classifier: a frozen DenseNet121 base plus a dense head.

    The base network is loaded with ImageNet weights and without its
    classification top, then frozen so only the new head is trained at first.
    The base is kept as the first layer of the Sequential model.

    Returns:
        An uncompiled ``Sequential`` model ending in a single sigmoid unit.
    """
    # Use DenseNet121, the backbone this notebook imports. The earlier
    # EfficientNetB0 reference was never imported and failed with NameError.
    base_model = DenseNet121(
        weights='imagenet',
        include_top=False,
        input_shape=(IMAGE_SIZE, IMAGE_SIZE, 3)
    )

    # Freeze the base model layers
    base_model.trainable = False

    # Classification head: three shrinking dense stages, each followed by
    # batch normalization and progressively lighter dropout.
    model = Sequential([
        base_model,
        GlobalAveragePooling2D(),
        BatchNormalization(),
        Dense(512, activation='relu'),
        BatchNormalization(),
        Dropout(0.4),
        Dense(256, activation='relu'),
        BatchNormalization(),
        Dropout(0.3),
        Dense(128, activation='relu'),
        BatchNormalization(),
        Dropout(0.2),
        Dense(1, activation='sigmoid')
    ])

    return model

In [5]:
# 4. Calculate class weights
def calculate_class_weights(train_generator):
    """Compute balanced class weights for ``model.fit(class_weight=...)``.

    Class ``i`` receives ``total_samples / (n_classes * count_i)``, so rarer
    classes get proportionally larger weights.

    Args:
        train_generator: Object exposing ``samples`` (total number of images)
            and ``classes`` (one integer label per image). If it also has a
            ``class_indices`` mapping, its length sets the number of classes.

    Returns:
        dict: Maps each integer class index to its float weight.

    Raises:
        ValueError: If any class has no samples, since its weight would be
            undefined.
    """
    # Per-class counts are derived from the per-sample label array; Keras
    # directory iterators provide `classes` but no ready-made count attribute.
    labels = np.asarray(train_generator.classes, dtype=np.int64).ravel()

    class_indices = getattr(train_generator, 'class_indices', None)
    if class_indices:
        expected_classes = len(class_indices)
    else:
        expected_classes = int(labels.max()) + 1 if labels.size else 0

    n_samples_per_class = np.bincount(labels, minlength=expected_classes)
    n_classes = len(n_samples_per_class)
    total_samples = train_generator.samples

    class_weights = {}
    for class_index, count in enumerate(n_samples_per_class):
        if count == 0:
            raise ValueError(
                f"Class index {class_index} has no training samples; "
                "cannot compute its class weight."
            )
        class_weights[class_index] = float(total_samples / (n_classes * count))

    return class_weights

In [11]:

# 5. Training function
def train_model():
    """Train the classifier in two phases and evaluate it on the test set.

    Phase 1 trains only the new head on top of the frozen base network.
    Phase 2 unfreezes the last 30 layers of the base and continues training
    at a much lower learning rate. The same callbacks and class weights are
    used in both phases.

    Side effects:
        Writes ``best_model.h5`` (checkpoint with the best validation
        accuracy) and ``final_model.h5`` (weights at the end of fine-tuning),
        and prints the test metrics.

    Returns:
        tuple: ``(model, history, history_fine)`` where the two history
        objects are the results of the initial and fine-tuning ``fit`` calls.
    """
    # Create generators
    train_generator, validation_generator, test_generator = create_data_generators()

    # Create model
    model = create_model()

    # Compile model
    _compile_model(model, learning_rate=1e-4)

    # Set up callbacks
    callbacks = [
        EarlyStopping(
            monitor='val_loss',
            patience=8,
            restore_best_weights=True,
            min_delta=0.001
        ),
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=4,
            min_lr=1e-7
        ),
        ModelCheckpoint(
            'best_model.h5',
            monitor='val_accuracy',
            save_best_only=True,
            mode='max'
        )
    ]

    # Calculate class weights
    class_weights = calculate_class_weights(train_generator)

    # Initial training
    print("Initial training...")
    history = model.fit(
        train_generator,
        epochs=EPOCHS,
        validation_data=validation_generator,
        callbacks=callbacks,
        class_weight=class_weights
    )

    # Fine-tuning
    print("Fine-tuning...")
    base_model = model.layers[0]  # create_model() puts the pre-trained base first
    base_model.trainable = True

    # Freeze early layers; only the last 30 layers of the base stay trainable
    for layer in base_model.layers[:-30]:
        layer.trainable = False

    # Recompile with lower learning rate so the trainable flags take effect
    _compile_model(model, learning_rate=1e-6)

    # Continue training
    history_fine = model.fit(
        train_generator,
        epochs=20,
        validation_data=validation_generator,
        callbacks=callbacks,
        class_weight=class_weights
    )

    # Evaluate on test set. return_dict=True pairs every value with its own
    # metric name, instead of relying on model.metrics_names lining up with
    # the returned list.
    test_results = model.evaluate(test_generator, return_dict=True)
    print("\nTest results:")
    for metric_name, value in test_results.items():
        print(f"{metric_name}: {value:.4f}")

    # Save the final model
    model.save('final_model.h5')
    return model, history, history_fine


def _compile_model(model, learning_rate):
    """Compile ``model`` for binary classification with the shared metric set."""
    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='binary_crossentropy',
        metrics=[
            'accuracy',
            tf.keras.metrics.Precision(),
            tf.keras.metrics.Recall(),
            tf.keras.metrics.AUC(),
        ]
    )

    

In [12]:
# 6. Plot training history
def plot_training_history(history, history_fine):
    """Plot training and validation curves for both training phases.

    One figure is shown per metric ('accuracy', then 'loss'). Each figure has
    the initial-training curves on the left and the fine-tuning curves on the
    right.

    Args:
        history: Result of the initial ``fit`` call; its ``history`` dict must
            contain each metric and its ``val_`` counterpart.
        history_fine: Same, for the fine-tuning ``fit`` call.
    """
    # (panel title prefix, history object), in left-to-right panel order
    phases = (
        ('Initial Training', history),
        ('Fine-tuning', history_fine),
    )

    def plot_metric(metric):
        plt.figure(figsize=(10, 4))

        for panel, (phase_label, phase_history) in enumerate(phases, start=1):
            plt.subplot(1, 2, panel)
            curves = phase_history.history
            plt.plot(curves[metric])
            plt.plot(curves[f'val_{metric}'])
            plt.title(f'{phase_label} - {metric}')
            plt.xlabel('Epoch')
            plt.ylabel(metric)
            plt.legend(['Train', 'Validation'])

        plt.tight_layout()
        plt.show()

    # Plot accuracy and loss
    for metric_name in ('accuracy', 'loss'):
        plot_metric(metric_name)

In [15]:
import os
from collections import Counter

def count_images_in_classes(folder_path):
    """Count the directory entries inside each class sub-folder.

    Args:
        folder_path: Directory whose immediate sub-directories are the classes.

    Returns:
        dict: Maps each sub-directory name to the number of entries it
        contains. Non-directory entries directly under ``folder_path`` are
        ignored.
    """
    candidates = (
        (entry, os.path.join(folder_path, entry))
        for entry in os.listdir(folder_path)
    )
    return {
        label: len(os.listdir(label_dir))
        for label, label_dir in candidates
        if os.path.isdir(label_dir)
    }

# Per-class entry counts for each dataset split. The paths are derived from
# BASE_PATH so this check inspects the same dataset root the generators read.
train_counts = count_images_in_classes(os.path.join(BASE_PATH, 'train'))
val_counts = count_images_in_classes(os.path.join(BASE_PATH, 'val'))
test_counts = count_images_in_classes(os.path.join(BASE_PATH, 'test'))

print("Train class distribution:", train_counts)
print("Validation class distribution:", val_counts)
print("Test class distribution:", test_counts)


Train class distribution: {'Infected': 3895, 'Not Infected': 1341}
Validation class distribution: {'Infected': 8, 'Not Infected': 8}
Test class distribution: {'Infected': 390, 'Not Infected': 234}
