In [19]:
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
import shutil
from pathlib import Path

In [20]:
# Set random seed for reproducibility
tf.random.set_seed(42)

# Configuration parameters
IMG_HEIGHT = 224
IMG_WIDTH = 224
BATCH_SIZE = 32
EPOCHS = 50
NUM_CLASSES = 2

In [21]:
# Source directories
main_dir = '../data/Corona'  # Replace with your directory path
covid_dir = os.path.join(main_dir, 'covid')
normal_dir = os.path.join(main_dir, 'normal')

In [22]:
# Function to create train/val/test splits
def create_data_splits(base_dir):
    # Create directories for splits
    splits = ['train', 'validation', 'test']
    classes = ['covid', 'normal']
    
    for split in splits:
        for cls in classes:
            os.makedirs(os.path.join(base_dir, split, cls), exist_ok=True)

In [23]:
def prepare_dataset():
    # Create temporary directory for splits
    temp_dir = 'temp_dataset'
    os.makedirs(temp_dir, exist_ok=True)
    create_data_splits(temp_dir)
    
    # Function to split and copy files
    def process_class_files(class_dir, class_name):
        files = [f for f in os.listdir(class_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
        
        # First split: train + remaining
        train_files, temp_files = train_test_split(files, test_size=0.3, random_state=42)
        # Second split: validation + test
        val_files, test_files = train_test_split(temp_files, test_size=0.5, random_state=42)
        
        # Copy files to respective directories
        for file, split in zip([train_files, val_files, test_files], ['train', 'validation', 'test']):
            for f in file:
                src = os.path.join(class_dir, f)
                dst = os.path.join(temp_dir, split, class_name, f)
                shutil.copy2(src, dst)
                
        return len(train_files), len(val_files), len(test_files)
    # Process both classes
    covid_counts = process_class_files(covid_dir, 'covid')
    normal_counts = process_class_files(normal_dir, 'normal')
    
    print("Dataset split complete:")
    print(f"COVID images - Train: {covid_counts[0]}, Validation: {covid_counts[1]}, Test: {covid_counts[2]}")
    print(f"Normal images - Train: {normal_counts[0]}, Validation: {normal_counts[1]}, Test: {normal_counts[2]}")
    
    return temp_dir

In [24]:
# Prepare the dataset
dataset_dir = prepare_dataset()

Dataset split complete:
COVID images - Train: 48, Validation: 10, Test: 11
Normal images - Train: 17, Validation: 4, Test: 4


In [25]:
# Data Augmentation for training
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

In [26]:
# Only rescaling for validation and test
val_datagen = ImageDataGenerator(rescale=1./255)
test_datagen = ImageDataGenerator(rescale=1./255)

In [27]:
# Create data generators
train_generator = train_datagen.flow_from_directory(
    os.path.join(dataset_dir, 'train'),
    target_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=BATCH_SIZE,
    class_mode='binary'
)

Found 65 images belonging to 2 classes.


In [28]:
validation_generator = val_datagen.flow_from_directory(
    os.path.join(dataset_dir, 'validation'),
    target_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=BATCH_SIZE,
    class_mode='binary'
)

Found 14 images belonging to 2 classes.


In [29]:
test_generator = test_datagen.flow_from_directory(
    os.path.join(dataset_dir, 'test'),
    target_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=BATCH_SIZE,
    class_mode='binary'
)

Found 15 images belonging to 2 classes.


In [30]:
def build_model():
    # Base model - Using EfficientNetB0 as feature extractor
    base_model = tf.keras.applications.EfficientNetB0(
        include_top=False,
        weights='imagenet',
        input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)
    )
    
    # Freeze the base model
    base_model.trainable = False

    # Create the model
    model = models.Sequential([
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        layers.Dense(256, activation='relu'),
        layers.BatchNormalization(),
        layers.Dropout(0.3),
        layers.Dense(128, activation='relu'),
        layers.BatchNormalization(),
        layers.Dropout(0.2),
        layers.Dense(1, activation='sigmoid')
    ])
    
    return model

In [31]:
# Create model
model = build_model()

In [32]:
model.compile(
    optimizer=optimizers.Adam(learning_rate=0.001),
    loss='binary_crossentropy',
    metrics=['accuracy', tf.keras.metrics.AUC(), tf.keras.metrics.Precision(), tf.keras.metrics.Recall()]
)

In [33]:
# Callbacks
checkpoint = ModelCheckpoint(
    'best_model.h5',
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    verbose=1
)

In [34]:
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
    verbose=1
)

In [35]:
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.2,
    patience=5,
    min_lr=1e-6,
    verbose=1
)

In [36]:
# Train the model
history = model.fit(
    train_generator,
    epochs=EPOCHS,
    validation_data=validation_generator,
    callbacks=[checkpoint, early_stopping, reduce_lr]
)

Epoch 1/50
Epoch 1: val_loss improved from inf to 0.70916, saving model to best_model.h5


TypeError: Unable to serialize [2.0896919 2.1128857 2.1081853] to JSON. Unrecognized type <class 'tensorflow.python.framework.ops.EagerTensor'>.

In [37]:
def plot_training_history(history):
    # Convert EagerTensor to numpy arrays if needed
    def convert_to_numpy(history_dict):
        numpy_history = {}
        for key, value in history_dict.items():
            numpy_history[key] = np.array(value)
        return numpy_history
    
    history_dict = convert_to_numpy(history.history)
    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
    
    # Plot accuracy
    ax1.plot(history_dict['accuracy'])
    ax1.plot(history_dict['val_accuracy'])
    ax1.set_title('Model Accuracy')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Accuracy')
    ax1.legend(['Train', 'Validation'])
    
    # Plot loss
    ax2.plot(history_dict['loss'])
    ax2.plot(history_dict['val_loss'])
    ax2.set_title('Model Loss')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Loss')
    ax2.legend(['Train', 'Validation'])
    
    plt.tight_layout()
    plt.show()

In [38]:
# Evaluate on test set
def evaluate_model(model, test_generator):
    # Get predictions
    predictions = []
    labels = []
    
    # Reset the generator
    test_generator.reset()
    
    # Predict in batches
    for i in range(len(test_generator)):
        x, y = test_generator[i]
        pred = model.predict(x)
        predictions.extend(pred)
        labels.extend(y)
        
        if len(labels) >= len(test_generator.labels):
            break
    
    predictions = np.array(predictions)
    labels = np.array(labels[:len(test_generator.labels)])
    
    # Convert predictions to binary
    y_pred = (predictions > 0.5).astype(int)
    y_true = labels
    
    # Calculate metrics
    test_loss, test_accuracy, test_auc, test_precision, test_recall = model.evaluate(test_generator)
    
    print("\nTest Results:")
    print(f"Loss: {test_loss:.4f}")
    print(f"Accuracy: {test_accuracy:.4f}")
    print(f"AUC: {test_auc:.4f}")
    print(f"Precision: {test_precision:.4f}")
    print(f"Recall: {test_recall:.4f}")
    
    # Plot confusion matrix
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.show()
    
    # Print classification report
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, target_names=['Normal', 'COVID']))
    
    return y_true, y_pred

In [39]:
# Save model function
def save_model_and_weights(model, model_name='covid19_detection_model'):
    # Save model architecture as JSON
    model_json = model.to_json()
    with open(f"{model_name}.json", "w") as json_file:
        json_file.write(model_json)
    
    # Save weights
    model.save_weights(f"{model_name}_weights.h5")
    
    print(f"Model saved as {model_name}.json and {model_name}_weights.h5")

In [40]:
# Fine-tuning
def fine_tune_model():
    # Unfreeze some layers of the base model
    base_model = model.layers[0]
    base_model.trainable = True
    
    # Freeze all layers except the last 30
    for layer in base_model.layers[:-30]:
        layer.trainable = False
    
    # Recompile model with a lower learning rate
    model.compile(
        optimizer=optimizers.Adam(learning_rate=1e-5),
        loss='binary_crossentropy',
        metrics=['accuracy', tf.keras.metrics.AUC(), tf.keras.metrics.Precision(), tf.keras.metrics.Recall()]
    )
    
    return model

In [41]:
# After training, use these functions:
try:
    # Plot training history
    plot_training_history(history)
    
    # Evaluate model
    y_true, y_pred = evaluate_model(model, test_generator)
    
    # Save model
    save_model_and_weights(model)
    
    # Fine-tune model
    fine_tuned_model = fine_tune_model()
    history_fine_tune = fine_tuned_model.fit(
        train_generator,
        epochs=20,
        validation_data=validation_generator,
        callbacks=[checkpoint, early_stopping, reduce_lr]
    )
    
    # Plot fine-tuning history
    plot_training_history(history_fine_tune)
    
    # Evaluate fine-tuned model
    print("\nFine-tuned Model Evaluation:")
    y_true_ft, y_pred_ft = evaluate_model(fine_tuned_model, test_generator)
    
    # Save fine-tuned model
    save_model_and_weights(fine_tuned_model, 'covid19_detection_model_finetuned')
    
except Exception as e:
    print(f"An error occurred: {str(e)}")
    
finally:
    # Clean up temporary directory
    try:
        shutil.rmtree(dataset_dir)
    except:
        print("Error while deleting temporary directory")

An error occurred: name 'history' is not defined


In [42]:
# Fine-tune the model
fine_tuned_model = fine_tune_model()
history_fine_tune = fine_tuned_model.fit(
    train_generator,
    epochs=20,
    validation_data=validation_generator,
    callbacks=[checkpoint, early_stopping, reduce_lr]
)

FileNotFoundError: [Errno 2] No such file or directory: 'temp_dataset\\train\\normal\\NORMAL2-IM-1142-0001-0001.jpeg'