In [1]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import optuna
from optuna.integration import TFKerasPruningCallback
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
import os
from PIL import Image
import cv2

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

In [17]:
# Loading data
labels = ['PNEUMONIA', 'NORMAL']
img_size = 150

def get_training_data(data_dir):
    data = []
    for label in labels:
        path = os.path.join(data_dir, label)
        class_num = labels.index(label)
        for img in os.listdir(path):
            try:
                img_path = os.path.join(path, img)
                img_arr = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
                if img_arr is None:
                    continue
                resized_arr = cv2.resize(img_arr, (img_size, img_size))
                data.append([resized_arr, class_num])
            except Exception as e:
                print(f"Error processing {img_path}: {e}")
    return np.array(data, dtype=object)

In [19]:
# Load data
train = get_training_data(r'C:\Users\danie\Leiden_Scripts\Datascience2025_A2\data\chest_xray\train')
test = get_training_data(r'C:\Users\danie\Leiden_Scripts\Datascience2025_A2\data\chest_xray\test')
val = get_training_data(r'C:\Users\danie\Leiden_Scripts\Datascience2025_A2\data\chest_xray\val')

# Prepare data for training
X_train = np.array([i[0] for i in train]).reshape(-1, img_size, img_size, 1)
y_train = np.array([i[1] for i in train])
X_test = np.array([i[0] for i in test]).reshape(-1, img_size, img_size, 1)
y_test = np.array([i[1] for i in test])
X_val = np.array([i[0] for i in val]).reshape(-1, img_size, img_size, 1)
y_val = np.array([i[1] for i in val])

In [21]:
def create_model(trial):
    """Create a CNN model with hyperparameters to be optimized"""
    # Hyperparameters to optimize
    n_layers = trial.suggest_int('n_layers', 2, 5)
    initial_filters = trial.suggest_int('initial_filters', 16, 64)
    dropout_rate = trial.suggest_float('dropout_rate', 0.1, 0.5)
    learning_rate = trial.suggest_float('learning_rate', 1e-4, 1e-2, log=True)
    batch_size = trial.suggest_categorical('batch_size', [16, 32, 64])
    
    model = keras.Sequential()
    
    # First layer
    model.add(layers.Conv2D(initial_filters, (3, 3), activation='relu', input_shape=(150, 150, 1)))
    model.add(layers.BatchNormalization())
    model.add(layers.MaxPooling2D((2, 2)))
    
    # Additional layers
    for i in range(n_layers - 1):
        filters = initial_filters * (2 ** (i + 1))
        model.add(layers.Conv2D(filters, (3, 3), activation='relu', padding='same'))
        model.add(layers.BatchNormalization())
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Dropout(dropout_rate))
    
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(dropout_rate))
    model.add(layers.Dense(1, activation='sigmoid'))
    
    optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer,
                 loss='binary_crossentropy',
                 metrics=['accuracy'])
    
    return model, batch_size

In [24]:
def objective(trial):
    """Objective function for Optuna optimization"""
    # Create model
    model, batch_size = create_model(trial)
    
    # Data augmentation
    datagen = ImageDataGenerator(
        rotation_range=trial.suggest_int('rotation_range', 0, 30),
        width_shift_range=trial.suggest_float('width_shift_range', 0, 0.2),
        height_shift_range=trial.suggest_float('height_shift_range', 0, 0.2),
        horizontal_flip=trial.suggest_categorical('horizontal_flip', [True, False]),
        fill_mode='nearest'
    )
    
    # Training
    history = model.fit(
        datagen.flow(X_train, y_train, batch_size=batch_size),
        epochs=10,
        validation_data=(X_val, y_val),
        callbacks=[
            TFKerasPruningCallback(trial, 'val_accuracy'),
            keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=3)
        ]
    )
    
    return history.history['val_accuracy'][-1]

In [25]:
# Create study and optimize
study = optuna.create_study(direction='maximize', pruner=optuna.pruners.MedianPruner())
study.optimize(objective, n_trials=50)

[I 2025-04-11 16:18:18,910] A new study created in memory with name: no-name-6cfb3385-41f8-4603-b2c3-627bd1aeb463
  super().__init__(activity_regularizer=activity_regularizer, **kwargs)
[W 2025-04-11 16:18:19,154] Trial 0 failed with parameters: {'n_layers': 4, 'initial_filters': 52, 'dropout_rate': 0.16658576928283475, 'learning_rate': 0.00020426893120736829, 'batch_size': 16} because of the following error: NameError("name 'ImageDataGenerator' is not defined").
Traceback (most recent call last):
  File "c:\Users\danie\Leiden_Scripts\Datascience2025_A2\datascience_env\Lib\site-packages\optuna\study\_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
                      ^^^^^^^^^^^
  File "C:\Users\danie\AppData\Local\Temp\ipykernel_12080\3338140986.py", line 7, in objective
    datagen = ImageDataGenerator(
              ^^^^^^^^^^^^^^^^^^
NameError: name 'ImageDataGenerator' is not defined
[W 2025-04-11 16:18:19,155] Trial 0 failed with value None.


NameError: name 'ImageDataGenerator' is not defined

In [None]:
# Print best parameters
print("Best trial:")
trial = study.best_trial
print("  Value: ", trial.value)
print("  Params: ")
for key, value in trial.params.items():
    print(f"    {key}: {value}")

In [None]:
# Visualize optimization history
optuna.visualization.plot_optimization_history(study)
optuna.visualization.plot_param_importances(study)
optuna.visualization.plot_parallel_coordinate(study)

In [None]:
# Train final model with best parameters
best_params = study.best_params
model, batch_size = create_model(study.best_trial)

# Load all data
(x_train, y_train), (x_val, y_val), (x_test, y_test) = get_training_data()

# Data augmentation with best parameters
datagen = keras.preprocessing.image.ImageDataGenerator(
    rotation_range=best_params['rotation_range'],
    width_shift_range=best_params['width_shift_range'],
    height_shift_range=best_params['height_shift_range'],
    horizontal_flip=best_params['horizontal_flip'],
    fill_mode='nearest'
)

# Train final model
history = model.fit(
    datagen.flow(x_train, y_train, batch_size=batch_size),
    epochs=20,
    validation_data=(x_val, y_val),
    callbacks=[
        keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=5),
        keras.callbacks.ReduceLROnPlateau(monitor='val_accuracy', patience=2, factor=0.3)
    ]
)

# Evaluate on test set
test_loss, test_accuracy = model.evaluate(x_test, y_test)
print(f"Test accuracy: {test_accuracy*100:.2f}%")

In [None]:
# Import additional metrics
from sklearn.metrics import confusion_matrix, classification_report, precision_recall_fscore_support
import seaborn as sns

# Function to evaluate and plot metrics
def evaluate_model(model, x_data, y_data, title_prefix):
    # Get predictions
    y_pred = (model.predict(x_data) > 0.5).astype(int)
    
    # Calculate metrics
    precision, recall, f1, _ = precision_recall_fscore_support(y_data, y_pred, average='binary')
    
    # Create confusion matrix
    cm = confusion_matrix(y_data, y_pred)
    
    # Plot confusion matrix
    plt.figure(figsize=(10, 4))
    
    plt.subplot(1, 2, 1)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title(f'{title_prefix} Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    
    # Plot metrics
    plt.subplot(1, 2, 2)
    metrics = ['Precision', 'Recall', 'F1 Score']
    values = [precision, recall, f1]
    plt.bar(metrics, values)
    plt.title(f'{title_prefix} Metrics')
    plt.ylim(0, 1)
    for i, v in enumerate(values):
        plt.text(i, v + 0.02, f'{v:.3f}', ha='center')
    
    plt.tight_layout()
    plt.show()
    
    # Print detailed classification report
    print(f"\n{title_prefix} Classification Report:")
    print(classification_report(y_data, y_pred))
    
    return precision, recall, f1

# Evaluate CNN model on train and test sets
print("\nEvaluating CNN Model:")
train_metrics = evaluate_model(model, x_train, y_train, "Train")
test_metrics = evaluate_model(model, x_test, y_test, "Test")

# Load and evaluate the alternative ML model (Random Forest)
from sklearn.ensemble import RandomForestClassifier
import joblib

# Load the saved Random Forest model
rf_model = joblib.load('../models/random_forest_model.joblib')

# Reshape data for Random Forest (flatten images)
x_train_flat = x_train.reshape(x_train.shape[0], -1)
x_test_flat = x_test.reshape(x_test.shape[0], -1)

# Evaluate Random Forest model
print("\nEvaluating Random Forest Model:")
rf_train_metrics = evaluate_model(rf_model, x_train_flat, y_train, "Train")
rf_test_metrics = evaluate_model(rf_model, x_test_flat, y_test, "Test")

# Compare models
print("\nModel Comparison Summary:")
print("CNN Model:")
print(f"Train - Precision: {train_metrics[0]:.3f}, Recall: {train_metrics[1]:.3f}, F1: {train_metrics[2]:.3f}")
print(f"Test  - Precision: {test_metrics[0]:.3f}, Recall: {test_metrics[1]:.3f}, F1: {test_metrics[2]:.3f}")
print("\nRandom Forest Model:")
print(f"Train - Precision: {rf_train_metrics[0]:.3f}, Recall: {rf_train_metrics[1]:.3f}, F1: {rf_train_metrics[2]:.3f}")
print(f"Test  - Precision: {rf_test_metrics[0]:.3f}, Recall: {rf_test_metrics[1]:.3f}, F1: {rf_test_metrics[2]:.3f}")

In [None]:
# Plot training history
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='Training')
plt.plot(history.history['val_accuracy'], label='Validation')
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='Training')
plt.plot(history.history['val_loss'], label='Validation')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.tight_layout()
plt.show()