## Pipeline for the Mixed-groups image classification

In [None]:
###### NB. This notebook can be run in one go or can be exported to a .py file and will also run ######
##### NB. This will copy the images and save them to a new dir where it then randomly samples and arranges them into train, val, test folders #####
#### NB. Produces model performance metrics (.xlsx reports plus a per-image predictions .csv), Grad-CAM images and saliency map images ####
### NB. The additional layers on top of VGG16 were tuned using 'keras-tuner' ###

In [None]:
# Standard library imports
import os
import random
import math
import shutil
from datetime import datetime

# Data science imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import cm
from sklearn.metrics import confusion_matrix, classification_report

# Data augmentation
import Augmentor

# TensorFlow and Keras imports
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, backend, optimizers, regularizers
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.callbacks import (
    ModelCheckpoint, 
    LearningRateScheduler, 
    TensorBoard, 
    EarlyStopping
)
from tensorflow.keras.layers import (
    Conv2D, 
    MaxPooling2D, 
    ZeroPadding2D, 
    Activation, 
    Flatten, 
    Dense, 
    Dropout, 
    GlobalAveragePooling2D, 
    BatchNormalization
)
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input

# Keras Tuner
import keras_tuner as kt

# Visualization tools
from tf_keras_vis.gradcam import Gradcam
from tf_keras_vis.saliency import Saliency
from tf_keras_vis.utils.model_modifiers import ReplaceToLinear

# Initialize timestamp (this string is embedded in the checkpoint and saved-model file names further down)
date = datetime.now().strftime('%Y_%m_%d-%I:%M_%S_%p')

In [None]:
# Ask TensorFlow's C++ backend to log errors only ('3' also hides INFO and WARNING messages).
# NOTE(review): TensorFlow has already been imported when this line runs, so messages emitted
# during the import itself are not suppressed; set the variable before `import tensorflow`
# if those should be hidden as well.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

# `tf.config.list_physical_devices` is the stable name of the former `experimental` endpoint
print("Number of GPUs Available: ", len(tf.config.list_physical_devices('GPU')))


In [None]:

# Single seed shared by every random number generator used in this pipeline
SEED = 666


def set_seeds(seed=SEED):
    """Seed NumPy, TensorFlow and Python's `random`, and export PYTHONHASHSEED.

    Args:
        seed: Integer seed handed to each of the generators listed above.
    """
    np.random.seed(seed)
    tf.random.set_seed(seed)
    random.seed(seed)
    # Exported as text because environment variables can only hold strings
    os.environ['PYTHONHASHSEED'] = str(seed)

# Function to ensure partial determinism
def set_partial_determinism(seed=SEED):
    """Seed all random number generators by delegating to `set_seeds`.

    Only the seeding is active; the stricter switches below are left disabled.

    Args:
        seed: Integer seed forwarded to `set_seeds`.
    """
    set_seeds(seed=seed)
    
    # Optional switch (currently disabled): uncomment the line below to request deterministic TensorFlow ops
    # os.environ['TF_DETERMINISTIC_OPS'] = '1'
    
    # Optional switch (currently disabled): uncomment the line below to hide all GPUs from TensorFlow
    # tf.config.set_visible_devices([], 'GPU')

# Seed every random number generator once, before any image is sampled or any model is built
set_partial_determinism(seed=SEED)

# Ensure XLA is disabled: switch off the graph optimizer's JIT compilation
tf.config.optimizer.set_jit(False)


In [None]:
from datetime import datetime

# Run timestamp embedded in the checkpoint and saved-model file names.
# The fields are joined with '_' and '-' only: a ':' is not allowed in file names on Windows.
date = datetime.now().strftime('%Y_%m_%d-%I_%M_%S_%p')

In [None]:


# Define the path to the root directory where the subdirectories are located
# (each subdirectory of root_dir is copied as one class folder further down)
root_dir = 'LS/ventral/' ##etc


# Define the path to the new directory to create; it receives the copies of the class subdirectories
new_dir = "../save_offs/LS_ventral_MIXED/pt1"



In [None]:
##### NB. Copies every class subdirectory holding at least MIN_IMAGES_PER_CLASS entries to new_dir. Raise the threshold (e.g. to 150) to prune small classes #####

# Minimum number of directory entries a class needs in order to be copied.
# 1 keeps every non-empty class, which is what this pipeline currently does.
MIN_IMAGES_PER_CLASS = 1

# Create the new directory if it does not exist
os.makedirs(new_dir, exist_ok=True)

# Iterate through the subdirectories in the root directory
for subdir in os.listdir(root_dir):
    # Construct the full path to the subdirectory
    subdir_path = os.path.join(root_dir, subdir)

    # Stray files next to the class folders (e.g. '.DS_Store') are not classes,
    # and calling os.listdir() on them would raise NotADirectoryError
    if not os.path.isdir(subdir_path):
        continue

    # Copy the subdirectory and its contents when it is large enough.
    # shutil.copytree raises FileExistsError if the destination already exists,
    # so a previous copy has to be removed before re-running this cell.
    if len(os.listdir(subdir_path)) >= MIN_IMAGES_PER_CLASS:
        shutil.copytree(subdir_path, os.path.join(new_dir, subdir))


In [None]:
##### NB. The following code prunes images from specific species-location combinations so that each class subdirectory keeps no more than 100 images per combination #####

# Maximum number of images kept per species-location combination within one subdirectory
MAX_IMAGES_PER_COMBINATION = 100

# Lower-case file extensions that count as images
IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp', '.tiff')

pruned_dir = "../save_offs/LS_ventral_MIXED/pt2/pruned_directory/"
if not os.path.exists(pruned_dir):
    os.makedirs(pruned_dir)

# Species and their location codes for pruning
prune_specs = {
    'fv': ['pa'],
    'lc': ['ba', 'pa', 'sj'],
    'ls': ['pdc', 'pm']
}

# Iterate through the subdirectories in the new directory
for subdir in os.listdir(new_dir):
    subdir_path = os.path.join(new_dir, subdir)

    # Ensure the path is a directory
    if not os.path.isdir(subdir_path):
        continue

    # Image paths of this subdirectory, grouped by (species_code, location_code)
    species_images = {}

    # os.listdir() returns entries in arbitrary order; sorting them makes the seeded
    # random.sample() below pick the same files on every run and on every machine
    for filename in sorted(os.listdir(subdir_path)):
        if not filename.lower().endswith(IMAGE_EXTENSIONS):
            continue

        # The first two '_'-separated fields of the file name are read as the
        # species code and the location code; shorter names are ignored
        parts = filename.split('_')
        if len(parts) < 3:
            continue
        species_code, location_code = parts[0], parts[1]

        # Only combinations listed in prune_specs are candidates for pruning
        if location_code in prune_specs.get(species_code, ()):
            species_images.setdefault((species_code, location_code), []).append(
                os.path.join(subdir_path, filename)
            )

    # Move everything beyond the per-combination limit into pruned_dir/<species>/<location>
    for (species_code, location_code), images in species_images.items():
        excess = len(images) - MAX_IMAGES_PER_COMBINATION
        if excess <= 0:
            continue

        # Randomly select the images to move to the pruned directory
        images_to_move = random.sample(images, excess)

        save_dir = os.path.join(pruned_dir, species_code, location_code)
        os.makedirs(save_dir, exist_ok=True)

        for image in images_to_move:
            shutil.move(image, save_dir)

In [None]:
# Define source directory and destination directory paths
src_dir = "../save_offs/LS_ventral_MIXED/pt1"  # Update to the correct source directory path
dst_dir = "../save_offs/LS_ventral_MIXED/pt2"

# Check if the source directory exists
if not os.path.exists(src_dir):
    raise FileNotFoundError(f"Source directory '{src_dir}' does not exist. Please check the path and try again.")

# Configurable number of training and validation samples
num_train_samples_per_class = 120  # Changeable parameter for training samples
num_val_samples_per_class = 30     # Changeable parameter for validation samples

# Create train, validation, and test directories if they do not exist
train_dir = os.path.join(dst_dir, "train")
val_dir = os.path.join(dst_dir, "validation")
test_dir = os.path.join(dst_dir, "test")
for d in [train_dir, val_dir, test_dir]:
    os.makedirs(d, exist_ok=True)

# Get the list of classes (subdirectories) in the source directory.
# os.listdir() order is arbitrary, so the names are sorted to keep the seeded
# shuffling below reproducible across runs and machines.
classes = sorted(subdir for subdir in os.listdir(src_dir) if os.path.isdir(os.path.join(src_dir, subdir)))
num_classes = len(classes)
if num_classes == 0:
    # Without this check the min() below fails with an unhelpful "empty sequence" message
    raise ValueError(f"No class subdirectories found in source directory '{src_dir}'.")

# Create new mixed class directories in train and validation folders (test remains original)
mixed_class_names = [f"mixed_class_{i+1}" for i in range(num_classes)]
for mixed_class in mixed_class_names:
    for base_dir in [train_dir, val_dir]:
        os.makedirs(os.path.join(base_dir, mixed_class), exist_ok=True)

# Collect the .jpg file names of every class (sorted, for the same reproducibility reason)
class_images = {}
for class_name in classes:
    class_path = os.path.join(src_dir, class_name)
    class_images[class_name] = sorted(f for f in os.listdir(class_path) if f.endswith(".jpg"))

# Determine the minimum number of images available across all classes
min_images_per_class = min(len(images) for images in class_images.values())

# Every class contributes the same number of images to each set, capped by the smallest class;
# whatever is left of that cap after train and validation goes to the test set
num_train = min(num_train_samples_per_class, min_images_per_class)
num_val = min(num_val_samples_per_class, min_images_per_class - num_train)
num_test = min_images_per_class - num_train - num_val


def _copy_images(filenames, class_name, target_dir):
    """Copy the given image files of `class_name` from src_dir into `target_dir`."""
    for img in filenames:
        shutil.copy(os.path.join(src_dir, class_name, img), os.path.join(target_dir, img))


# Sample images from each class and distribute them into mixed classes for training and
# validation, and into the original classes for test
for class_name, images in class_images.items():
    random.shuffle(images)
    train_samples = images[:num_train]
    val_samples = images[num_train:num_train + num_val]
    test_samples = images[num_train + num_val:num_train + num_val + num_test]

    # Striding by num_classes deals this class's images out evenly over the mixed classes
    for i, mixed_class in enumerate(mixed_class_names):
        _copy_images(train_samples[i::num_classes], class_name, os.path.join(train_dir, mixed_class))
        _copy_images(val_samples[i::num_classes], class_name, os.path.join(val_dir, mixed_class))

    # Test images keep their original class directory
    test_dst_dir = os.path.join(test_dir, class_name)
    os.makedirs(test_dst_dir, exist_ok=True)
    _copy_images(test_samples, class_name, test_dst_dir)


In [None]:


# Resolve the "test" folder and its sibling "test_even" inside the destination directory,
# creating "test_even" when it is missing
test_dir, test_even_dir = (os.path.join(dst_dir, folder) for folder in ("test", "test_even"))
if not os.path.exists(test_even_dir):
    os.makedirs(test_even_dir)




In [None]:

# Locations of the pruned images and of the per-class test set
pruned_dir = "../save_offs/LS_ventral_MIXED/pt2/pruned_directory/"
test_dir = "../save_offs/LS_ventral_MIXED/pt2/test/"

# Nothing can be moved if the pruned directory is missing
if not os.path.exists(pruned_dir):
    raise FileNotFoundError(f"Pruned directory does not exist: {pruned_dir}")

# Take the first test class directory whose name contains 'south' (case-insensitive)
south_subdir = None
for entry in os.listdir(test_dir):
    if 'south' in entry.lower() and os.path.isdir(os.path.join(test_dir, entry)):
        south_subdir = entry
        break

if not south_subdir:
    raise FileNotFoundError("No target subdirectory found in the test directory with 'south' in the name.")

# Debug: show which directory was picked
print(f"Identified target subdirectory for moving images: {south_subdir}")

# Walk the whole pruned tree and move every image file into the chosen test folder
for root, dirs, files in os.walk(pruned_dir):
    for filename in files:
        if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tiff')):
            continue

        src_path = os.path.join(root, filename)
        dst_path = os.path.join(test_dir, south_subdir, filename)

        # A failed move is reported and the loop carries on with the next file
        try:
            print(f"Attempting to move {src_path} to {dst_path}")
            shutil.move(src_path, dst_path)
            print(f"Moved {src_path} to {dst_path}")
        except PermissionError as pe:
            print(f"Permission error while moving {src_path} to {dst_path}: {pe}")
        except FileNotFoundError as fnfe:
            print(f"File not found error while moving {src_path} to {dst_path}: {fnfe}")
        except Exception as e:
            print(f"Failed to move {src_path} to {dst_path}: {e}")

# Report, folder by folder, whether any files are left under the pruned directory
def verify_pruned_directory_empty(pruned_dir):
    """Print, for every directory under `pruned_dir`, whether it still contains files.

    Args:
        pruned_dir: Root of the directory tree that is walked with `os.walk`.
    """
    for folder, _subfolders, leftovers in os.walk(pruned_dir):
        if not leftovers:
            print(f"No files remaining in pruned directory: {folder}")
            continue

        print(f"Files still remaining in pruned directory: {folder}")
        for leftover in leftovers:
            print(f" - {leftover}")

# Report what (if anything) is still left under the pruned directory after the move above
verify_pruned_directory_empty(pruned_dir)


In [None]:
# Build the "test_even" set: for each test class, copy at most 20 randomly chosen .jpg images
for subdir in os.listdir(test_dir):
    subdir_path = os.path.join(test_dir, subdir)
    if not os.path.isdir(subdir_path):
        continue

    # Mirror the class folder inside test_even
    test_even_subdir = os.path.join(test_even_dir, subdir)
    if not os.path.exists(test_even_subdir):
        os.makedirs(test_even_subdir)

    # Classes with fewer than 20 images contribute all of them
    image_files = [f for f in os.listdir(subdir_path) if f.endswith(".jpg")]
    num_images = len(image_files)
    num_samples = min(num_images, 20)
    sample_files = random.sample(image_files, num_samples)

    for sample_file in sample_files:
        src_path = os.path.join(subdir_path, sample_file)
        dst_path = os.path.join(test_even_subdir, sample_file)
        shutil.copy(src_path, dst_path)

In [None]:

# Folder whose class subdirectories are augmented.
# (Deliberately not called `dir`, which would shadow the built-in dir() for the rest of the session.)
train_aug_dir = '../save_offs/LS_ventral_MIXED/pt2/train/'

# Number of files each training class folder should hold once augmentation is done
TRAIN_IMAGES_PER_CLASS = 2400

# Get a list of all subdirectories within the main directory
subdirs = [d for d in os.listdir(train_aug_dir) if os.path.isdir(os.path.join(train_aug_dir, d))]

# Loop over each subdirectory and apply augmentations
for subdir in subdirs:
    subdir_path = os.path.join(train_aug_dir, subdir)
    # NOTE(review): with output_directory='' the generated images presumably land in the
    # class folder itself rather than in an 'output' subfolder - confirm against Augmentor docs
    p = Augmentor.Pipeline(subdir_path, output_directory='')
    p.flip_left_right(probability=0.5)
    p.flip_top_bottom(probability=0.5)
    p.rotate90(probability=0.5)
    p.rotate180(probability=0.5)
    p.rotate270(probability=0.5)
    p.random_brightness(probability=0.5, min_factor=0.5, max_factor=1.5)
    p.random_contrast(probability=0.5, min_factor=0.5, max_factor=1.5)

    # Count what is already in the folder and only generate the shortfall
    num_files = len(os.listdir(subdir_path))
    num_samples = TRAIN_IMAGES_PER_CLASS - num_files
    if num_samples > 0:
        p.sample(num_samples)


In [None]:
# Folder whose class subdirectories are augmented.
# (Deliberately not called `dir`, which would shadow the built-in dir() for the rest of the session.)
val_aug_dir = '../save_offs/LS_ventral_MIXED/pt2/validation/'

# Number of files each validation class folder should hold once augmentation is done
VAL_IMAGES_PER_CLASS = 600

# Get a list of all subdirectories within the main directory
subdirs = [d for d in os.listdir(val_aug_dir) if os.path.isdir(os.path.join(val_aug_dir, d))]

# Loop over each subdirectory and apply augmentations
for subdir in subdirs:
    subdir_path = os.path.join(val_aug_dir, subdir)
    # NOTE(review): with output_directory='' the generated images presumably land in the
    # class folder itself rather than in an 'output' subfolder - confirm against Augmentor docs
    p = Augmentor.Pipeline(subdir_path, output_directory='')
    p.flip_left_right(probability=0.5)
    p.flip_top_bottom(probability=0.5)
    p.rotate90(probability=0.5)
    p.rotate180(probability=0.5)
    p.rotate270(probability=0.5)
    p.random_brightness(probability=0.5, min_factor=0.5, max_factor=1.5)
    p.random_contrast(probability=0.5, min_factor=0.5, max_factor=1.5)

    # Count what is already in the folder and only generate the shortfall
    num_files = len(os.listdir(subdir_path))
    num_samples = VAL_IMAGES_PER_CLASS - num_files
    if num_samples > 0:
        p.sample(num_samples)

In [None]:
# Define the paths to your data directories
train_dir = '../save_offs/LS_ventral_MIXED/pt2/train/'
validation_dir = '../save_offs/LS_ventral_MIXED/pt2/validation/'
test_dir = '../save_offs/LS_ventral_MIXED/pt2/test_even/'  # subset with at most 20 images per class
test_full_dir = '../save_offs/LS_ventral_MIXED/pt2/test/'  # complete test set


# Image size requested from the data generators, and their mini-batch size
img_width, img_height = 224, 224
BATCHSIZE = 16

# Preprocessing applied to every image before it reaches the network
def preprocess_image(image):
    """Resize `image` to (img_width, img_height) and divide its values by 255.

    Args:
        image: Image array/tensor accepted by `tf.image.resize`.

    Returns:
        The resized image with every value divided by 255.0.
    """
    resized = tf.image.resize(image, (img_width, img_height))
    return resized / 255.0


In [None]:
# Image generators for the training and validation sets; both only apply `preprocess_image`
train_datagen = ImageDataGenerator(preprocessing_function=preprocess_image)
val_datagen = ImageDataGenerator(preprocessing_function=preprocess_image)

In [None]:
# Options shared by all four directory iterators
flow_options = dict(
    target_size=(img_width, img_height),
    batch_size=BATCHSIZE,
    class_mode='categorical',
)

# Training data is shuffled; validation and test data keep directory order so that
# predictions line up with the generators' `classes` attribute
train_generator = train_datagen.flow_from_directory(train_dir, shuffle=True, **flow_options)
validation_generator = val_datagen.flow_from_directory(validation_dir, shuffle=False, **flow_options)

# Test subset ("test_even")
test_datagen = ImageDataGenerator(preprocessing_function=preprocess_image)
test_generator = test_datagen.flow_from_directory(test_dir, shuffle=False, **flow_options)

# Complete test set
test_full_datagen = ImageDataGenerator(preprocessing_function=preprocess_image)
test_full_generator = test_full_datagen.flow_from_directory(test_full_dir, shuffle=False, **flow_options)

# Class names as seen by the training generator
class_names = list(train_generator.class_indices)
n_classes = len(class_names)
print(f'Class names: {class_names}')
print('Num of classes:', n_classes)

# Number of images in each set
train_set_size, validation_set_size = train_generator.n, validation_generator.n
test_set_size, test_full_set_size = test_generator.n, test_full_generator.n

print("Train set size:", train_set_size)
print("Validation set size:", validation_set_size)
print("Test set size:", test_set_size)
print('test_full_size:', test_full_set_size)

In [None]:
# Convolutional base: VGG16 with ImageNet weights and without its fully connected top
model = VGG16(input_shape=(224, 224, 3), weights='imagenet', include_top=False)

# Classification head stacked on the output of the base's last layer.
# Additional layers can be added and changed in this list.
head_layers = [
    GlobalAveragePooling2D(),
    BatchNormalization(axis=-1, momentum=0.99, epsilon=0.001),
    Dropout(0.25),
    Dense(288, activation='relu'),
    BatchNormalization(),
    Dropout(0.2),
]

X = model.layers[-1].output
for head_layer in head_layers:
    X = head_layer(X)

# One softmax output per class found by the training generator
predictions = Dense(n_classes, activation="softmax")(X)

model_final = Model(model.input, predictions)

In [None]:
# Freeze every layer of the VGG16 base except its last one.
# The loop variable must not be called `layers`: that name is the imported
# `tensorflow.keras.layers` module and would be overwritten by a layer object.
for base_layer in model.layers[:-1]:
    print(base_layer)
    base_layer.trainable = False

# BatchNormalization layers of the complete model are frozen as well
for layer in model_final.layers:
    if isinstance(layer, keras.layers.BatchNormalization):
        layer.trainable = False

In [None]:
# List each layer position of the full model together with its trainable flag
row_template = "Layer: {}, Trainable: {}"
for index, layer in enumerate(model_final.layers):
    print(row_template.format(index, layer.trainable))


In [None]:
# Print the layer-by-layer summary of the assembled model
model_final.summary()

In [None]:
# File that receives the best model seen so far; the run timestamp is part of its name
checkpoint_path = '../save_offs/LS_ventral_MIXED/limpet_aug_mk1'+date+'.keras'
checkpoint_dir = os.path.dirname(checkpoint_path)

# Save the complete model whenever validation accuracy improves
cp_callback = ModelCheckpoint(
    filepath=checkpoint_path,
    monitor='val_accuracy',
    save_best_only=True,
    save_weights_only=False,
    verbose=1,
)

# Stop once validation accuracy has not improved by at least 0.01 for 2 epochs
early = EarlyStopping(
    monitor='val_accuracy',
    mode='max',
    min_delta=0.01,
    patience=2,
    verbose=4,
)

# Learning rate of the first training phase
base_learning_rate = 0.0005

In [None]:


# Compile for the first training phase: Adam, categorical cross-entropy, accuracy metric
model_final.compile(
    loss=tf.keras.losses.categorical_crossentropy,
    optimizer=tf.keras.optimizers.Adam(learning_rate=base_learning_rate),
    metrics=['accuracy'],
)


In [None]:
# Number of epochs of the first training phase (passed as `epochs` to the first fit call)
initial_epochs = 3

In [None]:
def lr_exp_decay(initial_epochs, lr):
    """Return `base_learning_rate` scaled by exp(-0.01 * initial_epochs).

    Args:
        initial_epochs: Multiplier of the decay exponent; presumably the epoch
            index when used as a learning-rate schedule - TODO confirm, no
            caller is visible here.
        lr: Unused; the result is always derived from the global
            `base_learning_rate`.

    Returns:
        The decayed learning rate as a float.
    """
    decay_rate = 0.01
    return base_learning_rate * math.exp(-decay_rate * initial_epochs)

In [None]:
# First training phase, with checkpointing and early stopping
history = model_final.fit(
    train_generator,
    validation_data=validation_generator,
    epochs=initial_epochs,
    callbacks=[cp_callback, early],
)


In [None]:
# Loss and accuracy on the test generator after the first training phase
test_loss, test_acc = model_final.evaluate(test_generator)

In [None]:
# Make every layer trainable except the BatchNormalization layers, which stay frozen
for layer in model_final.layers:
    layer.trainable = not isinstance(layer, keras.layers.BatchNormalization)

# Show the resulting flags
for index, layer in enumerate(model_final.layers):
    print("Layer: {}, Trainable: {}".format(index, layer.trainable))

In [None]:
# Report how many layers the full model has
print("Number of layers in the base model: ", len(model_final.layers))

# Layers from this position onwards remain open for fine-tuning
fine_tune_at = 8

# Freeze everything before `fine_tune_at`, and every BatchNormalization layer wherever it sits
for position, layer in enumerate(model_final.layers):
    if position < fine_tune_at or isinstance(layer, keras.layers.BatchNormalization):
        layer.trainable = False


In [None]:
# Print the model summary again now that the trainable flags have been changed
model_final.summary()

In [None]:
# List each layer position of the full model together with its trainable flag
row_template = "Layer: {}, Trainable: {}"
for index, layer in enumerate(model_final.layers):
    print(row_template.format(index, layer.trainable))

In [None]:
# The learning rate is lowered because more of the model has been opened for training;
# this should limit overfitting
TL_learningRate = base_learning_rate/10

# Recompile for the fine-tuning phase: RMSprop at a tenth of the base learning rate
model_final.compile(
    loss=tf.keras.losses.categorical_crossentropy,
    metrics=['accuracy'],
    optimizer=tf.keras.optimizers.RMSprop(learning_rate=TL_learningRate),
)

In [None]:
# Upper bound on the number of additional epochs spent fine-tuning
fine_tune_epochs = 200

total_epochs = initial_epochs + fine_tune_epochs

# Fine-tuning phase: continue training where the first phase stopped.
# `history.epoch` lists the zero-based indices of the epochs that were actually run, so the
# next epoch to run is len(history.epoch). Using history.epoch[-1] would start one epoch too
# early and repeat the index of the last completed epoch.
history_fine = model_final.fit(
    train_generator,
    epochs=total_epochs,
    initial_epoch=len(history.epoch),
    validation_data=validation_generator,
    callbacks=[cp_callback, early],
)


In [None]:
# Loss and accuracy on the test generator after fine-tuning
test_loss, test_acc = model_final.evaluate(test_generator)

In [None]:
# Test accuracy as a percentage with two decimals, used as part of the file name
accuracy_percent = round(test_acc * 100, 2)
acc_str = str(accuracy_percent)

# Save the full model under a name carrying the accuracy and the run timestamp
model_name = f'../save_offs/LS_ventral_MIXED/fvDorsal_nVSs{acc_str}_{date}.keras'
model_final.save(model_name)

In [None]:

# Make predictions on the test set
predictions = model_final.predict(test_generator)

# Convert predictions to class labels
predicted_class_indices = np.argmax(predictions, axis=1)
predicted_classes = [class_names[idx] for idx in predicted_class_indices]

# Get true class labels
true_classes = test_generator.classes
class_labels = list(test_generator.class_indices.keys())

# Pass the full label set explicitly. Otherwise scikit-learn infers it from the data, and a
# class that occurs neither in the true nor in the predicted labels would make the matrix
# smaller than `class_labels` and break the report and the DataFrame below.
label_ids = list(range(len(class_labels)))

# Compute confusion matrix and classification report
confusion_mtx = confusion_matrix(true_classes, predicted_class_indices, labels=label_ids)
class_report = classification_report(
    true_classes,
    predicted_class_indices,
    labels=label_ids,
    target_names=class_labels,
    output_dict=True,
)

# Convert confusion matrix and classification report to DataFrame
confusion_mtx_df = pd.DataFrame(confusion_mtx, index=class_labels, columns=class_labels)
class_report_df = pd.DataFrame(class_report).transpose()

# Save confusion matrix and classification report to Excel
with pd.ExcelWriter('../save_offs/LS_ventral_MIXED/classification_results_test.xlsx') as writer:
    confusion_mtx_df.to_excel(writer, sheet_name='Confusion Matrix')
    class_report_df.to_excel(writer, sheet_name='Classification Report')


In [None]:

# Make predictions on the full test set
predictions = model_final.predict(test_full_generator)

# Convert predictions to class labels
predicted_class_indices = np.argmax(predictions, axis=1)
predicted_classes = [class_names[idx] for idx in predicted_class_indices]

# Get true class labels
true_classes = test_full_generator.classes
class_labels = list(test_full_generator.class_indices.keys())

# Pass the full label set explicitly. Otherwise scikit-learn infers it from the data, and a
# class that occurs neither in the true nor in the predicted labels would make the matrix
# smaller than `class_labels` and break the report and the DataFrame below.
label_ids = list(range(len(class_labels)))

# Compute confusion matrix and classification report
confusion_mtx = confusion_matrix(true_classes, predicted_class_indices, labels=label_ids)
class_report = classification_report(
    true_classes,
    predicted_class_indices,
    labels=label_ids,
    target_names=class_labels,
    output_dict=True,
)

# Convert confusion matrix and classification report to DataFrame
confusion_mtx_df = pd.DataFrame(confusion_mtx, index=class_labels, columns=class_labels)
class_report_df = pd.DataFrame(class_report).transpose()

# Save confusion matrix and classification report to Excel
with pd.ExcelWriter('../save_offs/LS_ventral_MIXED/classification_results_test_full.xlsx') as writer:
    confusion_mtx_df.to_excel(writer, sheet_name='Confusion Matrix')
    class_report_df.to_excel(writer, sheet_name='Classification Report')

In [None]:

# Set the path to the folder containing the images
img_folder_path = '../save_offs/LS_ventral_MIXED/pt2/test_even/'

# Set the path to the folder where the predicted images will be saved
output_folder_path = '../save_offs/LS_ventral_MIXED/predicted_images'

# The results CSV is written into this folder, so it must exist even if no image is processed
os.makedirs(output_folder_path, exist_ok=True)

# This generator is only used for its folder-name -> class-index mapping (class_indices)
datagen = ImageDataGenerator()
generator = datagen.flow_from_directory(img_folder_path, target_size=(img_width, img_height), batch_size=1, shuffle=False, class_mode='categorical')

# Initialize an empty list to store the results
results_list = []

# Iterate through the subdirectories in the folder
for sub_dir_name in os.listdir(img_folder_path):
    sub_dir_path = os.path.join(img_folder_path, sub_dir_name)
    if not os.path.isdir(sub_dir_path):
        continue

    # Create a corresponding subdirectory in the output folder
    output_sub_dir_path = os.path.join(output_folder_path, sub_dir_name)
    os.makedirs(output_sub_dir_path, exist_ok=True)

    # Iterate through the images in the subdirectory
    for img_file in os.listdir(sub_dir_path):
        if not img_file.endswith('.jpg'):  # Consider only JPG files
            continue

        # Read the image as RGB at the model's input size. `load_img` (already imported from
        # Keras) is used because the name `Image` is never imported in this file; its
        # target_size is given as (height, width).
        img_path = os.path.join(sub_dir_path, img_file)
        img = load_img(img_path, target_size=(img_height, img_width))

        # Make a prediction on the image, using the same preprocessing as the data generators
        img_array = np.asarray(img)
        img_array = preprocess_image(img_array)
        prediction = model_final.predict(np.expand_dims(img_array, axis=0))[0]
        predicted_class_idx = np.argmax(prediction)
        predicted_class = class_names[predicted_class_idx]
        confidence_score = prediction[predicted_class_idx]

        # Save a copy of the image under <actual class folder>/<predicted class index>/
        output_sub_dir_class_path = os.path.join(output_sub_dir_path, str(predicted_class_idx))
        os.makedirs(output_sub_dir_class_path, exist_ok=True)
        output_img_path = os.path.join(output_sub_dir_class_path, img_file)
        img.save(output_img_path)

        # Add the result to the list
        actual_class = generator.class_indices[sub_dir_name]
        results_list.append({
            'Image': img_file,
            'Actual_Class': actual_class,
            'Predicted_Class': predicted_class,
            'Confidence_Score': confidence_score
        })

# Create a DataFrame from the results list
results_df = pd.DataFrame(results_list)

# Save the results to a CSV file named 'predicted_results.csv'
results_df.to_csv('../save_offs/LS_ventral_MIXED/predicted_images/predicted_results.csv', index=False)


In [None]:
replace2linear = ReplaceToLinear()

# Grad-CAM works on a clone of the model whose final activation is replaced by a linear one
gradcam = Gradcam(model_final,
                  model_modifier=replace2linear,
                  clone=True)

def score_function(output):
    """Return the model output for the class currently being explained.

    Reads the module-level `class_index`, which the loop below sets per folder.
    """
    return output[0][class_index]

img_folder_path = '../save_offs/LS_ventral_MIXED/predicted_images'

# Create heatmap directory
heatmap_dir = '../save_offs/LS_ventral_MIXED/true_heatmaps/'
os.makedirs(heatmap_dir, exist_ok=True)

# One dict per processed image; turned into a DataFrame once at the end
# (growing a DataFrame with pd.concat inside the loop copies it on every image)
heatmap_rows = []

# Number of images processed between two memory clean-ups
BATCHSIZE = 32  # You can adjust this value based on your available memory

# Folder layout: <img_folder_path>/<actual class>/<predicted class index>/<image>.jpg
for sub_dir_name in os.listdir(img_folder_path):
    sub_dir_path = os.path.join(img_folder_path, sub_dir_name)
    if not os.path.isdir(sub_dir_path):
        continue

    for sub_sub_dir_name in os.listdir(sub_dir_path):
        sub_sub_dir_path = os.path.join(sub_dir_path, sub_sub_dir_name)
        if not os.path.isdir(sub_sub_dir_path):
            continue

        # The sub-subdirectory name is the class index to explain
        class_index = int(sub_sub_dir_name)
        print(f"Processing images in sub-subdirectory {sub_sub_dir_path}, using class index {class_index}")

        # Get the list of image files in the sub-sub-directory
        img_files = [img_file for img_file in os.listdir(sub_sub_dir_path) if img_file.endswith('.jpg')]

        # Process images in batches
        for i in range(0, len(img_files), BATCHSIZE):
            batch_img_files = img_files[i:i+BATCHSIZE]
            for img_file in batch_img_files:
                img_path = os.path.join(sub_sub_dir_path, img_file)
                img = load_img(img_path, target_size=(224, 224))

                # Feed the image exactly as during training and prediction (preprocess_image);
                # VGG16's own preprocess_input would present inputs on a scale the model never saw
                x = np.asarray(preprocess_image(img_to_array(img)))
                x = np.expand_dims(x, axis=0)

                cam = gradcam(score_function, x, penultimate_layer=-1)

                # Summarise the heatmap as the sum of its values
                heatmap_quantified = np.sum(cam[0])

                heatmap_rows.append({'Image_Name': img_file, 'Subdirectory': sub_dir_name, 'Class_Index': class_index, 'Heatmap_Intensity': heatmap_quantified})

                # Overlay the colour-mapped heatmap on the image
                heatmap = np.uint8(cm.jet(cam[0])[..., :3] * 255)
                plt.imshow(img)
                plt.imshow(heatmap, cmap='jet', alpha=0.5)
                plt.axis('off')
                plt.title(f"{img_file}, Heatmap intensity: {heatmap_quantified}")

                # Save the image
                heatmap_save_dir = os.path.join(heatmap_dir, sub_dir_name, sub_sub_dir_name)
                os.makedirs(heatmap_save_dir, exist_ok=True)
                save_path = os.path.join(heatmap_save_dir, f"{img_file}_heatmap.jpg")
                plt.savefig(save_path)
                plt.close()

            # After processing each batch, clear memory.
            # `backend` is the imported tensorflow.keras.backend; the name `K` is not defined in this file.
            backend.clear_session()

# Build the results table with a fixed column order
df = pd.DataFrame(heatmap_rows, columns=['Image_Name', 'Subdirectory', 'Class_Index', 'Heatmap_Intensity'])

# Define the path where you want to save the Excel file
excel_file_path = "../save_offs/LS_ventral_MIXED/heatmap_results.xlsx"

# Save DataFrame to Excel
df.to_excel(excel_file_path, index=False)

In [None]:
replace2linear = ReplaceToLinear()

# Create the Saliency object (works on a clone of the model with a linear final activation)
saliency = Saliency(model_final,
                    model_modifier=replace2linear,
                    clone=True)

def score_function(output):
    """Return the model output for the class currently being explained.

    Reads the module-level `class_index`, which the loop below sets per folder.
    """
    return output[0][class_index]

img_folder_path = '../save_offs/LS_ventral_MIXED/predicted_images'

# Create heatmap directory
heatmap_dir = '../save_offs/LS_ventral_MIXED/true_saliency_maps/'
os.makedirs(heatmap_dir, exist_ok=True)

# One dict per processed image; turned into a DataFrame once at the end
# (growing a DataFrame with pd.concat inside the loop copies it on every image)
saliency_rows = []

# Number of images processed between two memory clean-ups
BATCHSIZE = 32  # You can adjust this value based on your available memory

# Folder layout: <img_folder_path>/<actual class>/<predicted class index>/<image>.jpg
for sub_dir_name in os.listdir(img_folder_path):
    sub_dir_path = os.path.join(img_folder_path, sub_dir_name)
    if not os.path.isdir(sub_dir_path):
        continue

    for sub_sub_dir_name in os.listdir(sub_dir_path):
        sub_sub_dir_path = os.path.join(sub_dir_path, sub_sub_dir_name)
        if not os.path.isdir(sub_sub_dir_path):
            continue

        # The sub-subdirectory name is the class index to explain
        class_index = int(sub_sub_dir_name)
        print(f"Processing images in sub-subdirectory {sub_sub_dir_path}, using class index {class_index}")

        # Get the list of image files in the sub-sub-directory
        img_files = [img_file for img_file in os.listdir(sub_sub_dir_path) if img_file.endswith('.jpg')]

        # Process images in batches
        for i in range(0, len(img_files), BATCHSIZE):
            batch_img_files = img_files[i:i+BATCHSIZE]
            for img_file in batch_img_files:
                img_path = os.path.join(sub_sub_dir_path, img_file)

                # Load the image at original size. `load_img` (already imported from Keras)
                # returns a PIL image; the name `Image` is never imported in this file.
                img = load_img(img_path)
                original_size = img.size

                # Resize to the model input size and apply the same preprocessing as during
                # training and prediction (preprocess_image). VGG16's own preprocess_input
                # would present inputs on a scale the model never saw.
                img_resized = img.resize((224, 224))
                x = np.asarray(preprocess_image(img_to_array(img_resized)))
                x = np.expand_dims(x, axis=0)

                # Generate the saliency map
                saliency_map = saliency(score_function,
                                        x,
                                        smooth_samples=20,  # Number of gradient iterations
                                        smooth_noise=0.20)  # Noise spread level

                # Calculate saliency map intensity as a sum
                saliency_intensity = np.sum(saliency_map[0])

                saliency_rows.append({'Image_Name': img_file, 'Subdirectory': sub_dir_name, 'Class_Index': class_index, 'Saliency_Intensity': saliency_intensity})

                # Colour-map the saliency values and resize the result to the original image size.
                # array_to_img(scale=False) wraps the uint8 RGB array in a PIL image unchanged.
                saliency_rgb = np.uint8(cm.jet(saliency_map[0])[..., :3] * 255)
                saliency_resized = keras.preprocessing.image.array_to_img(saliency_rgb, scale=False).resize(original_size)

                # Overlay saliency map on original image
                plt.imshow(img)
                plt.imshow(saliency_resized, cmap='jet', alpha=0.5)
                plt.axis('off')
                plt.title(f"{img_file}, Saliency intensity: {saliency_intensity}")

                # Save the image
                saliency_save_dir = os.path.join(heatmap_dir, sub_dir_name, sub_sub_dir_name)
                os.makedirs(saliency_save_dir, exist_ok=True)
                save_path = os.path.join(saliency_save_dir, f"{img_file}_saliency.jpg")
                plt.savefig(save_path)
                plt.close()

            # After processing each batch, clear memory.
            # `backend` is the imported tensorflow.keras.backend; the name `K` is not defined in this file.
            backend.clear_session()

# Build the results table with a fixed column order
df = pd.DataFrame(saliency_rows, columns=['Image_Name', 'Subdirectory', 'Class_Index', 'Saliency_Intensity'])

# Define the path where you want to save the Excel file
excel_file_path = "../save_offs/LS_ventral_MIXED/saliency_results.xlsx"

# Save DataFrame to Excel
df.to_excel(excel_file_path, index=False)
