In [1]:
import os
import zipfile as zf
from tensorflow.python.framework.convert_to_constants import convert_variables_to_constants_v2

# Path to the repaired ZIP file
zip_file_path = "PartB_DFU_dataset - Copy.zip"
extract_path = "DFU_dataset"

if os.path.exists(zip_file_path):
    try:
        with zf.ZipFile(zip_file_path, 'r') as files:
            files.extractall(extract_path)
        print(f"Extraction completed successfully to '{extract_path}'")
    except zf.BadZipFile:
        print("Error: The ZIP file is corrupted.")
    except OSError as e:
        print(f"OS error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
else:
    print(f"Error: The file '{zip_file_path}' does not exist.")


2024-10-09 14:18:24.758642: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-10-09 14:18:24.776402: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-10-09 14:18:24.781854: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-10-09 14:18:24.795637: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


Extraction completed successfully to 'DFU_dataset'


In [2]:
import numpy as np # linear algebra
import os
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import numpy as np
from PIL import Image
# Define the root directory where your image folders are located
root_directory = "DFU_dataset/PartB_DFU_dataset - Copy"

# Initialize lists to store image paths and corresponding class labels for both datasets
image_paths_ischaemia = []
categories_ischaemia = []
image_paths_infection = []
categories_infection = []

# Iterate over each class and its subdirectories
for class_name in ["Infection", "Ischaemia"]:
    for augmentation_type in ["Aug-Negative", "Aug-Positive"]:
        folder_path = os.path.join(root_directory, class_name, augmentation_type)
        category = f"{class_name.lower()}{'pov' if 'Positive' in augmentation_type else 'neg'}"
        
        # Iterate over image files in the current directory
        for file_name in os.listdir(folder_path):
            if file_name.endswith(".jpg"):  # Assuming images are jpg format
                image_path = os.path.join(folder_path, file_name)
                if class_name == "Ischaemia":
                    image_paths_ischaemia.append(image_path)
                    categories_ischaemia.append("ischemia" if "Positive" in augmentation_type else "non-ischemia")
                elif class_name == "Infection":
                    image_paths_infection.append(image_path)
                    categories_infection.append("infection" if "Positive" in augmentation_type else "non-infection")

# Create DataFrames for each dataset
df_ischaemia = pd.DataFrame({"category": categories_ischaemia, "image_path": image_paths_ischaemia})
df_infection = pd.DataFrame({"category": categories_infection, "image_path": image_paths_infection})

# Label encoding for Ischaemia dataset
label_encoder_ischaemia = LabelEncoder()
df_ischaemia['Class_Label'] = label_encoder_ischaemia.fit_transform(df_ischaemia['category'])
print("Ischaemia Class Mapping:")
for class_label, numerical_label in zip(df_ischaemia['category'].unique(), df_ischaemia['Class_Label'].unique()):
    print(f"{class_label}: {numerical_label}")

# Label encoding for Infection dataset
label_encoder_infection = LabelEncoder()
df_infection['Class_Label'] = label_encoder_infection.fit_transform(df_infection['category'])
print("Infection Class Mapping:")
for class_label, numerical_label in zip(df_infection['category'].unique(), df_infection['Class_Label'].unique()):
    print(f"{class_label}: {numerical_label}")

# Shuffle both DataFrames
df_ischaemia = df_ischaemia.sample(frac=1).reset_index(drop=True)
df_infection = df_infection.sample(frac=1).reset_index(drop=True)

# Helper function to load and process images
def load_images(df):
    images = []
    target_labels = []   
    for index, row in df.iterrows():
        image = Image.open(row['image_path'])
        image_array = np.array(image.resize((224, 224)))  # Resize image to fit MobileNet input size
        images.append(image_array)
        target_labels.append(row['Class_Label'])
    return np.array(images), np.array(target_labels)

# Load images for both datasets
images_ischaemia, target_labels_ischaemia = load_images(df_ischaemia)
images_infection, target_labels_infection = load_images(df_infection)

print("Shape of Ischaemia images array:", images_ischaemia.shape)
print("Shape of Ischaemia target labels array:", target_labels_ischaemia.shape)
print("Shape of Infection images array:", images_infection.shape)
print("Shape of Infection target labels array:", target_labels_infection.shape)

# Split the Ischaemia dataset
X_train_ischaemia, X_test_ischaemia, y_train_ischaemia, y_test_ischaemia = train_test_split(
    images_ischaemia, target_labels_ischaemia, test_size=0.3, random_state=42)
X_val_ischaemia, X_test_ischaemia, y_val_ischaemia, y_test_ischaemia = train_test_split(
    X_test_ischaemia, y_test_ischaemia, test_size=0.25, random_state=42)  # 0.25 * 0.3 = 0.075

# Split the Infection dataset
X_train_infection, X_test_infection, y_train_infection, y_test_infection = train_test_split(
    images_infection, target_labels_infection, test_size=0.3, random_state=42)
X_val_infection, X_test_infection, y_val_infection, y_test_infection = train_test_split(
    X_test_infection, y_test_infection, test_size=0.25, random_state=42)  # 0.25 * 0.3 = 0.075

print("Ischaemia Training set shape:", X_train_ischaemia.shape, y_train_ischaemia.shape)
print("Ischaemia Validation set shape:", X_val_ischaemia.shape, y_val_ischaemia.shape)
print("Ischaemia Test set shape:", X_test_ischaemia.shape, y_test_ischaemia.shape)
print("Infection Training set shape:", X_train_infection.shape, y_train_infection.shape)
print("Infection Validation set shape:", X_val_infection.shape, y_val_infection.shape)
print("Infection Test set shape:", X_test_infection.shape, y_test_infection.shape)

Ischaemia Class Mapping:
non-ischemia: 1
ischemia: 0
Infection Class Mapping:
non-infection: 1
infection: 0
Shape of Ischaemia images array: (9870, 224, 224, 3)
Shape of Ischaemia target labels array: (9870,)
Shape of Infection images array: (5890, 224, 224, 3)
Shape of Infection target labels array: (5890,)
Ischaemia Training set shape: (6909, 224, 224, 3) (6909,)
Ischaemia Validation set shape: (2220, 224, 224, 3) (2220,)
Ischaemia Test set shape: (741, 224, 224, 3) (741,)
Infection Training set shape: (4123, 224, 224, 3) (4123,)
Infection Validation set shape: (1325, 224, 224, 3) (1325,)
Infection Test set shape: (442, 224, 224, 3) (442,)


In [38]:
import numpy as np
from kerastuner import HyperModel, HyperParameters
from kerastuner.tuners import BayesianOptimization
from tensorflow.keras.applications import MobileNet
from tensorflow.keras.layers import Dropout, TimeDistributed, Flatten, LSTM, Dense, BatchNormalization
from tensorflow.keras.models import Sequential
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
import pandas as pd
import time

# Define the model-building function
def build_model(hp):
    base_model = MobileNet(input_shape=(224, 224, 3), include_top=False, weights='imagenet')
    for layer in base_model.layers:
        layer.trainable = False
    
    model = Sequential([
        base_model,
        TimeDistributed(Flatten()),
        LSTM(hp.Int('lstm_units_1', min_value=60, max_value=80, step=30), 
             dropout=hp.Float('dropout_1', min_value=0.2, max_value=0.3, step=0.1), 
             return_sequences=True),
        LSTM(hp.Int('lstm_units_2', min_value=30, max_value=60, step=10), 
             dropout=hp.Float('dropout_2', min_value=0.2, max_value=0.3, step=0.1), 
             return_sequences=True),
        LSTM(hp.Int('lstm_units_3', min_value=10, max_value=30, step=10), 
             dropout=0.2, 
             return_sequences=False),
        Dense(84, activation='relu'),
        Dropout(0.3),
        BatchNormalization(),
        Dense(32, activation='relu'),
        Dropout(0.2),
        Dense(3, activation='softmax')
    ])

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.005),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

# Instantiate the tuner
tuner = BayesianOptimization(
    build_model,
    objective='val_accuracy',
    max_trials=5,
    directory='LSTM_mobileNet_tuning',
    project_name='Bay_ischaemia_tuning'  # Updated project name
)

# Callbacks
early_stopping = EarlyStopping(monitor='val_loss', patience=5)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=2, min_lr=1e-6)

# List to store trial results
trial_results = []

# Custom callback to log each trial with hyperparameters in separate columns
class TrialLogger(tf.keras.callbacks.Callback):
    def on_train_begin(self, logs=None):
        self.start_time = time.time()

    def on_train_end(self, logs=None):
        elapsed_time = time.time() - self.start_time
        
        # Get the current trial's hyperparameters
        trial_hyperparams = {key: value for key, value in self.params.items()}
        
        # Create a trial info dictionary with hyperparameters as separate columns
        trial_info = {
            'trial_number': len(trial_results) + 1,
            'accuracy': logs['accuracy'],
            'loss': logs['loss'],
            'val_accuracy': logs['val_accuracy'],
            'val_loss': logs['val_loss'],
            'training_time': elapsed_time,
        }

        # Add hyperparameters to the trial_info
        for key, value in trial_hyperparams.items():
            trial_info[key] = value  # Each hyperparameter gets its own column

        trial_results.append(trial_info)

# Run the search and log trial results
tuner.search(X_train_ischaemia, y_train_ischaemia, epochs=10,  # Updated variable names
             validation_data=(X_val_ischaemia, y_val_ischaemia),  # Updated variable names
             callbacks=[reduce_lr, early_stopping, TrialLogger()])

# Convert results to a DataFrame for better visualization
trial_df = pd.DataFrame(trial_results)

# Print the results for each trial
print(trial_df)


Trial 5 Complete [00h 01m 03s]
val_accuracy: 0.8090090155601501

Best val_accuracy So Far: 0.8135135173797607
Total elapsed time: 00h 05m 20s
   trial_number  accuracy      loss  val_accuracy  val_loss  training_time  \
0             1  0.816471  0.410784      0.804955  0.429463      53.363787   
1             2  0.814734  0.413955      0.794595  0.430587      53.402954   
2             3  0.798668  0.435290      0.776577  0.488355      53.380224   
3             4  0.816905  0.410006      0.813514  0.409536      53.417816   
4             5  0.822261  0.405351      0.809009  0.411479      53.130682   

  verbose  epochs  steps  
0    auto      10    216  
1    auto      10    216  
2    auto      10    216  
3    auto      10    216  
4    auto      10    216  


In [None]:
# Get the best hyperparameters and build the best model
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
best_model = tuner.hypermodel.build(best_hps)

# Fit the best model
history = best_model.fit(X_train_ischaemia, y_train_ischaemia, epochs=20, batch_size=32, 
                          validation_data=(X_val_ischaemia, y_val_ischaemia), 
                          callbacks=[early_stopping, reduce_lr])

# Evaluate the best model
test_loss, test_accuracy = best_model.evaluate(X_test_ischaemia, y_test_ischaemia)

# Predictions and classification metrics
y_pred = best_model.predict(X_test_ischaemia)
y_pred_classes = np.argmax(y_pred, axis=1)

# Calculate precision, recall, and F1 score
precision = precision_score(y_test_ischaemia, y_pred_classes, average='macro')
recall = recall_score(y_test_ischaemia, y_pred_classes, average='macro')
f1 = f1_score(y_test_ischaemia, y_pred_classes, average='macro')


Epoch 1/20
[1m216/216[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 27ms/step - accuracy: 0.6303 - loss: 0.7123 - val_accuracy: 0.6734 - val_loss: 0.5887 - learning_rate: 0.0050
Epoch 2/20
[1m216/216[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 21ms/step - accuracy: 0.7226 - loss: 0.5610 - val_accuracy: 0.7378 - val_loss: 0.5525 - learning_rate: 0.0050
Epoch 3/20
[1m216/216[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 21ms/step - accuracy: 0.7461 - loss: 0.5263 - val_accuracy: 0.7608 - val_loss: 0.4976 - learning_rate: 0.0050
Epoch 4/20
[1m216/216[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 21ms/step - accuracy: 0.7672 - loss: 0.4957 - val_accuracy: 0.7482 - val_loss: 0.5156 - learning_rate: 0.0050
Epoch 5/20
[1m216/216[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 22ms/step - accuracy: 0.7590 - loss: 0.5070 - val_accuracy: 0.7626 - val_loss: 0.4757 - learning_rate: 0.0050
Epoch 6/20
[1m216/216[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0

In [None]:
# Store best trial results
best_trial = {
    'Accuracy (%)': test_accuracy * 100,
    'Loss (%)': test_loss * 100,
    'Training Time (s)': trial_df['training_time'].iloc[trial_df['val_accuracy'].idxmax()],
    'Precision': precision,
    'Recall': recall,
    'F1 Score': f1,
}

# Display the best trial metrics
print("\nBest Trial Results:")
print(best_trial)