### Imports

In [None]:
# pip install Augmentor scikit-learn gputil

In [None]:
import os
import re
import csv
import time
import GPUtil
import random
import shutil
import Augmentor
import numpy as np
import pandas as pd
from PIL import Image
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator, img_to_array, load_img 
from tensorflow.keras.applications import MobileNetV2, MobileNetV3Small, MobileNetV3Large, VGG16, InceptionV3, ResNet50, ResNet101, ResNet152
from tensorflow.keras import layers, models
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input as preprocess_input_mobilenet_v2
from tensorflow.keras.applications.mobilenet_v3 import preprocess_input as preprocess_input_mobilenet_v3
from keras.applications.vgg16 import preprocess_input as preprocess_input_vgg16
from keras.applications.inception_v3 import preprocess_input as preprocess_input_inceptionv3
from tensorflow.keras.applications.resnet50 import preprocess_input as preprocess_input_resnet50
from tensorflow.keras.applications.resnet import preprocess_input as preprocess_input_resnet101
from tensorflow.keras.applications.resnet_v2 import preprocess_input as preprocess_input_resnet152

### GPU Check

In [None]:
import tensorflow as tf
# Report which TensorFlow release is installed in this environment.
print("TensorFlow version:", tf.__version__)
# Prints whether this TensorFlow build was compiled with CUDA support.
# NOTE(review): this describes the build only; it does not by itself show that
# a GPU is visible at runtime -- the GPUtil listing in the next cell covers that.
print(tf.test.is_built_with_cuda())

In [None]:
# Get GPU information
# Ask GPUtil for the GPUs it can see and print one summary line per device.
gpus = GPUtil.getGPUs()
for gpu in gpus:
    # gpu.load is multiplied by 100 so that it is printed as a percentage.
    load_percent = gpu.load * 100
    print(f"GPU {gpu.id}: {gpu.name}, GPU Load: {load_percent}%")

### 6:2:2 split of each folder -> create csv files -> check for duplicates -> check for leakage -> remove duplicates and leakage (if any) -> oversample train to 3000 using augmentations -> creating final csv files [all done only once, hence, commented.]

In [None]:
### Functions

### Split by 6:2:2, creating separate train, val and test folders

def split_data(input_folder, output_folder, seed=None):
    """Copy every class folder of ``input_folder`` into 60/20/20 train/val/test splits.

    For each sub-directory ``<input_folder>/<class>`` the files are shuffled and
    copied (the originals are left in place) into
    ``<output_folder>/train/<class>``, ``<output_folder>/val/<class>`` and
    ``<output_folder>/test/<class>``.

    Args:
        input_folder: Directory containing one sub-directory per class.
        output_folder: Directory in which ``train``, ``val`` and ``test`` are created.
        seed: Optional seed for the shuffle. When given, the split is
            reproducible across runs; when ``None`` (default) the global
            ``random`` state is used, as before.

    Returns:
        dict mapping each class name to a dict with the keys ``total``,
        ``train``, ``val`` and ``test`` holding the number of files per split.
    """
    split_roots = {
        split: os.path.join(output_folder, split)
        for split in ('train', 'val', 'test')
    }
    for root in split_roots.values():
        os.makedirs(root, exist_ok=True)

    # Dictionary to store counts for each class
    class_counts = {}

    for class_folder in os.listdir(input_folder):
        class_path = os.path.join(input_folder, class_folder)

        # Loose files next to the class folders are not classes.
        if not os.path.isdir(class_path):
            continue

        class_dirs = {
            split: os.path.join(root, class_folder)
            for split, root in split_roots.items()
        }
        for directory in class_dirs.values():
            os.makedirs(directory, exist_ok=True)

        # Only regular files can be copied with shutil.copy; a nested directory
        # would raise. Sorting first makes the shuffle independent of the
        # arbitrary order os.listdir returns.
        images = sorted(
            entry for entry in os.listdir(class_path)
            if os.path.isfile(os.path.join(class_path, entry))
        )
        if seed is None:
            random.shuffle(images)
        else:
            random.Random(seed).shuffle(images)

        # Train and val sizes are truncated; test receives the remainder.
        total_images = len(images)
        train_split = int(0.6 * total_images)
        val_split = int(0.2 * total_images)

        partitions = {
            'train': images[:train_split],
            'val': images[train_split:train_split + val_split],
            'test': images[train_split + val_split:],
        }
        for split, members in partitions.items():
            for image in members:
                shutil.copy(
                    os.path.join(class_path, image),
                    os.path.join(class_dirs[split], image),
                )

        class_counts[class_folder] = {
            'total': total_images,
            'train': train_split,
            'val': val_split,
            'test': total_images - train_split - val_split
        }

    return class_counts

### -------------------------------------------------------------------------

### Create csv for a folder

def create_csv(input_folder, output_csv):
    """Write an ``Image_Path,Label`` CSV listing every entry of every class folder.

    Each sub-directory of ``input_folder`` is taken as a label and each entry
    inside it becomes one row holding the joined path and that label.
    Non-directory entries directly under ``input_folder`` are ignored.

    Args:
        input_folder: Directory containing one sub-directory per label.
        output_csv: Path of the CSV file to create (overwritten if present).

    Returns:
        Number of data rows written (the header is not counted).
    """
    with open(output_csv, 'w', newline='') as handle:
        writer = csv.writer(handle)
        writer.writerow(['Image_Path', 'Label'])

        written = 0
        for label in os.listdir(input_folder):
            label_dir = os.path.join(input_folder, label)
            if os.path.isdir(label_dir):
                # One row per entry in the label directory, in listing order.
                rows = [
                    [os.path.join(label_dir, entry), label]
                    for entry in os.listdir(label_dir)
                ]
                writer.writerows(rows)
                written += len(rows)

    return written

### -------------------------------------------------------------------------

### Oversample to n using augmentations

def oversample_aug(main_folder, output_folder, n=3000):
    """Run an Augmentor pipeline over every class folder and sample ``n`` images each.

    For each sub-directory of ``main_folder`` that holds at least one image
    file, an Augmentor pipeline (two rotations plus horizontal and vertical
    flips) is built with ``<output_folder>/<subfolder>`` as its output
    directory and asked for ``n`` samples. Sub-directories without image files
    are reported and skipped.

    Args:
        main_folder: Directory containing one sub-directory per class.
        output_folder: Directory under which per-class output folders are placed.
        n: Number of samples requested from each pipeline.
    """
    image_suffixes = ('.png', '.jpg', '.jpeg', '.bmp', '.tif')

    for subfolder in os.listdir(main_folder):
        source_dir = os.path.join(main_folder, subfolder)

        # Only directories can be class folders.
        if not os.path.isdir(source_dir):
            continue

        contains_image = any(
            name.lower().endswith(image_suffixes) for name in os.listdir(source_dir)
        )
        if not contains_image:
            print(f"No images found in {source_dir}. Skipping.")
            continue

        target_dir = os.path.join(output_folder, subfolder)
        pipeline = Augmentor.Pipeline(source_dir, output_directory=target_dir)

        # Always apply a small rotation, sometimes a larger one, and flip each
        # axis half of the time.
        pipeline.rotate(probability=1, max_left_rotation=5, max_right_rotation=5)
        pipeline.rotate(probability=0.7, max_left_rotation=10, max_right_rotation=10)
        pipeline.flip_left_right(probability=0.5)
        pipeline.flip_top_bottom(probability=0.5)

        pipeline.sample(n)
            
### -------------------------------------------------------------------------

### Check for duplicates in a csv file, on the column Image_Path
            
def check_duplicates(file, base_dir='/uoa/home/s04bs3/data/dasa'):
    """Print and return the rows of ``<base_dir>/<file>.csv`` that repeat an ``Image_Path``.

    Args:
        file: CSV file name without the ``.csv`` extension.
        base_dir: Directory holding the CSV. Defaults to the project data
            directory, so existing ``check_duplicates(name)`` calls are unchanged.

    Returns:
        DataFrame with every row whose ``Image_Path`` already appeared in an
        earlier row (the first occurrence is not included). Empty when the
        column has no duplicates.
    """
    df = pd.read_csv(os.path.join(base_dir, f'{file}.csv'))
    duplicates_specific_column = df[df.duplicated(subset=['Image_Path'])]
    print(f"\nDuplicates in {file} based on Image_Path:")
    print(duplicates_specific_column)
    # Returned as well as printed so callers can act on the result.
    return duplicates_specific_column
    
### -------------------------------------------------------------------------

### Functions to deal with leakage

def extract_pattern(path):
    """Return the acquisition key embedded in ``path``, or ``None`` if absent.

    The key is the first substring shaped like
    ``DD-DD-DDDD-DD-DD-DD.hvdfrm<digits>(<digits>,<digits>)-Z<digits>.<digits>``
    (``D`` being a digit). It is used to recognise the same source image under
    different file names.
    """
    found = re.search(
        r'(\d{2}-\d{2}-\d{4}-\d{2}-\d{2}-\d{2}\.hvdfrm\d+\(\d+,\d+\)-Z\d+\.\d+)',
        path,
    )
    if found is None:
        return None
    return found.group()

def get_paths_with_pattern(df):
    """Return the set of acquisition keys found in ``df['Image_Path']``.

    Paths from which ``extract_pattern`` cannot extract a key yield ``None``;
    those are dropped. Otherwise ``None`` would end up in the set, show up as a
    spurious "overlap" whenever two splits each contain an unmatched path, and
    make an ``isin``-based filter discard every unmatched row.
    """
    keys = df['Image_Path'].apply(extract_pattern)
    return set(keys.dropna())

def get_full_path(pattern, df):
    """Return the first ``Image_Path`` in ``df`` that contains ``pattern``.

    ``pattern`` is matched as a plain substring. When several rows match, only
    the first one is returned; when none match, the result is ``None``.
    """
    contains_key = df['Image_Path'].apply(lambda candidate: pattern in candidate)
    hits = df[contains_key]

    if hits.empty:
        return None
    return hits['Image_Path'].iloc[0]

def check_leakage(train_path, val_path, test_path):
    """Load the three split CSVs and report acquisition keys shared between them.

    Args:
        train_path: Path to the training CSV.
        val_path: Path to the validation CSV.
        test_path: Path to the test CSV.

    Returns:
        Tuple ``(train_df, val_df, test_df, overlap_train_val,
        overlap_train_test, overlap_val_test)``: the loaded frames followed by
        the pairwise intersections of their key sets.
    """
    train_df, val_df, test_df = (
        pd.read_csv(csv_path) for csv_path in (train_path, val_path, test_path)
    )

    train_keys, val_keys, test_keys = (
        get_paths_with_pattern(frame) for frame in (train_df, val_df, test_df)
    )

    # Keys present in two splits at once mean the same source image leaked.
    overlap_train_val = train_keys & val_keys
    overlap_train_test = train_keys & test_keys
    overlap_val_test = val_keys & test_keys

    for title, shared in (
        ("Overlap between train and val:", overlap_train_val),
        ("Overlap between train and test:", overlap_train_test),
        ("Overlap between val and test:", overlap_val_test),
    ):
        print(title, shared)
    print("\n")

    return train_df, val_df, test_df, overlap_train_val, overlap_train_test, overlap_val_test

def remove_leakage(train_df, val_df, overlap_train_val, overlap_train_test):
    """Drop leaking rows from the train and validation frames.

    Args:
        train_df: Training frame with an ``Image_Path`` column.
        val_df: Validation frame with an ``Image_Path`` column.
        overlap_train_val: Keys shared by train and validation.
        overlap_train_test: Keys shared by train and test.

    Returns:
        Tuple ``(train_df_no_overlap, val_df_no_overlap)``. Train loses every
        row whose key is in either overlap set; validation loses the rows whose
        key is in ``overlap_train_val``.
    """
    train_blocked = overlap_train_val.union(overlap_train_test)

    train_keys = train_df['Image_Path'].apply(extract_pattern)
    train_leaks = train_keys.isin(train_blocked)
    train_df_no_overlap = train_df[~train_leaks]

    val_keys = val_df['Image_Path'].apply(extract_pattern)
    val_leaks = val_keys.isin(overlap_train_val)
    val_df_no_overlap = val_df[~val_leaks]

    return train_df_no_overlap, val_df_no_overlap

def delete_images(image_paths):
    """Delete every file in ``image_paths``, reporting each success or failure.

    Deletion is best-effort: a path that cannot be removed (missing file,
    permission problem, or a ``None`` entry) is reported and the loop continues
    with the next path.

    Args:
        image_paths: Iterable of file paths to remove.
    """
    # The module-level ``os`` is used on purpose. A function-local ``import os``
    # would turn ``os`` into a local name, so the first ``os.remove`` would fail
    # with UnboundLocalError and the first path would never be deleted.
    for path in image_paths:
        try:
            os.remove(path)
        except (OSError, TypeError) as e:
            print(f"Error deleting {path}: {e}")
        else:
            print(f"Deleted: {path}")

In [None]:
# ### 6:2:2 split

# # Provide the input and output folder paths
# input_folder_path = "/uoa/home/s04bs3/data/dasa/ALL/"
# output_folder_path = "/uoa/home/s04bs3/data/dasa/"

# counts = split_data(input_folder_path, output_folder_path)

# # Display counts for each class
# for class_name, count_info in counts.items():
#     print(f"Class: {class_name}")
#     print(f"Total: {count_info['total']} | Train: {count_info['train']} | Val: {count_info['val']} | Test: {count_info['test']}")
#     print("---")

In [None]:
# import os
# from PIL import Image
# import shutil

# def convert_folder_to_rgb(input_folder, output_folder):
#     # Create output folder if it doesn't exist
#     if not os.path.exists(output_folder):
#         os.makedirs(output_folder)

#     # Initialize counts for LA and RGB images
#     la_count = 0
#     rgb_count = 0

#     # Iterate through each folder in the input directory
#     for folder_name in os.listdir(input_folder):
#         folder_path = os.path.join(input_folder, folder_name)
#         output_folder_path = os.path.join(output_folder, folder_name)

#         # Create output folder for this folder if it doesn't exist
#         if not os.path.exists(output_folder_path):
#             os.makedirs(output_folder_path)

#         # Iterate through each image file in the folder
#         for image_name in os.listdir(folder_path):
#             image_path = os.path.join(folder_path, image_name)
#             output_image_path = os.path.join(output_folder_path, image_name)

#             # Check if the image is a PNG or BMP file
#             if image_name.lower().endswith(('.png', '.bmp')):
#                 # Check if the image is in LA format
#                 if is_la_image(image_path):
#                     # Convert LA image to RGB and save it in the output folder
#                     la_to_rgb(image_path, output_image_path)
#                     la_count += 1
#                 else:
#                     # If the image is already in RGB format, copy it to the output folder
#                     shutil.copy(image_path, output_image_path)
#                     rgb_count += 1

#     print(f"LA images found: {la_count}")
#     print(f"RGB images found: {rgb_count}")

# def is_la_image(image_path):
#     # Check if the image has an alpha channel (indicating LA format)
#     with Image.open(image_path) as img:
#         return img.mode == "LA"

# def la_to_rgb(input_path, output_path):
#     with Image.open(input_path) as img:
#         # Convert LA image to RGB
#         img_rgb = img.convert("RGB")
#         # Save the converted image
#         img_rgb.save(output_path)

In [None]:
# input_folder = "/uoa/home/s04bs3/data/dasa/train/"
# output_folder = "/uoa/home/s04bs3/data/dasa/train_RGB/"

# convert_folder_to_rgb(input_folder, output_folder)

In [None]:
# ### create csv -- train.csv, val.csv, test.csv & check for duplicates

# for n in ['train_RGB', 'val_RGB', 'test_RGB']:
#     input_folder_path = f"/uoa/home/s04bs3/data/dasa/{n}/"
#     output_csv_path = f"/uoa/home/s04bs3/data/dasa/{n}.csv"
#     total_rows = create_csv(input_folder_path, output_csv_path)
#     print(f'\nTotal rows created: {total_rows}')
#     check_duplicates(n)

In [None]:
# # Example usage:
# train_path = '/uoa/home/s04bs3/data/dasa/train_RGB.csv'
# val_path = '/uoa/home/s04bs3/data/dasa/val_RGB.csv'
# test_path = '/uoa/home/s04bs3/data/dasa/test_RGB.csv'

# train_df, val_df, test_df, overlap_train_val, overlap_train_test, overlap_val_test = check_leakage(train_path, val_path, test_path)
# train_df_no_overlap, val_df_no_overlap = remove_leakage(train_df, val_df, overlap_train_val, overlap_train_test)

# # Get full paths for overlapping images
# full_paths_train = [get_full_path(pattern, train_df) for pattern in overlap_train_val.union(overlap_train_test) if pattern is not None]
# full_paths_val = [get_full_path(pattern, val_df) for pattern in overlap_val_test if pattern is not None]


# # Delete images
# delete_images(full_paths_train)
# delete_images(full_paths_val)

In [None]:
# ### oversample using augmentations to n=3000

# main_folder = "/uoa/home/s04bs3/data/dasa/train_RGB"
# output_folder = "/uoa/home/s04bs3/data/dasa/train_o_RGB/"

# oversample_aug(main_folder, output_folder, 3000)

In [None]:
# ### final csv files

# for n in ['train_o_RGB','val_RGB','test_RGB']:
#     input_folder_path = f"/uoa/home/s04bs3/data/dasa/{n}/"
#     output_csv_path = f"/uoa/home/s04bs3/data/dasa/{n}.csv"
#     total_rows = create_csv(input_folder_path, output_csv_path)
#     print(f'Total rows created: {total_rows}\n\n')

### Functions to train, evaluate and save results

In [None]:
def _select_preprocessing(model_name):
    """Return ``(preprocess_fn, forced_target_size)`` for a backbone name.

    The lookup is case-insensitive. ``forced_target_size`` is ``(299, 299)``
    for InceptionV3 and ``None`` for every other backbone, meaning the caller's
    own target size applies. Names not listed here fall back to the
    MobileNetV3 preprocessing.

    NOTE: the ResNet names used to be compared against mixed-case literals
    after ``lower()``, so they never matched and ResNets silently received the
    MobileNetV3 preprocessing. ResNet checkpoints trained that way should be
    retrained before being evaluated with this function.
    """
    key = model_name.lower()
    if key in ("mobilenetv2", "mobilenetv2-0.5", "mobilenetv2-0.75"):
        return preprocess_input_mobilenet_v2, None
    if key == "vgg16":
        return preprocess_input_vgg16, None
    if key == "inceptionv3":
        return preprocess_input_inceptionv3, (299, 299)
    if key == "resnet50":
        return preprocess_input_resnet50, None
    if key in ("resnet101", "resnet152"):
        # ResNet152 is built from the same (v1) ResNet family as ResNet101, so
        # it takes the ``applications.resnet`` preprocessing rather than the
        # ``resnet_v2`` one.
        return preprocess_input_resnet101, None
    return preprocess_input_mobilenet_v3, None


def _build_base_model(model_name, input_shape):
    """Instantiate the ImageNet-pretrained backbone for ``model_name`` without its top.

    Returns ``None`` when ``model_name`` is not one of the supported names
    (this lookup is case-sensitive).
    """
    variants = {
        "MobileNetV2": (MobileNetV2, {}),
        "MobileNetV2-0.75": (MobileNetV2, {"alpha": 0.75}),
        "MobileNetV2-0.5": (MobileNetV2, {"alpha": 0.5}),
        "MobileNetV3Small": (MobileNetV3Small, {}),
        "MobileNetV3Small-0.75": (MobileNetV3Small, {"alpha": 0.75}),
        "MobileNetV3Small-Min": (MobileNetV3Small, {"minimalistic": True}),
        "MobileNetV3Large": (MobileNetV3Large, {}),
        "MobileNetV3Large-0.75": (MobileNetV3Large, {"alpha": 0.75}),
        "MobileNetV3Large-Min": (MobileNetV3Large, {"minimalistic": True}),
        "VGG16": (VGG16, {}),
        "InceptionV3": (InceptionV3, {}),
        "ResNet50": (ResNet50, {}),
        "ResNet101": (ResNet101, {}),
        "ResNet152": (ResNet152, {}),
    }
    if model_name not in variants:
        return None
    constructor, extra_kwargs = variants[model_name]
    return constructor(input_shape=input_shape, include_top=False, weights="imagenet", **extra_kwargs)


def train_model(model_name, train_data, val_data, epochs=10, batch_size=32):
    """Train a frozen-backbone classifier and checkpoint the best epoch.

    A pretrained backbone (chosen by ``model_name``) is frozen and topped with
    global average pooling, dropout, two dense layers and a softmax layer with
    one unit per class found in ``train_data``. Training uses Adam, early
    stopping on ``val_loss`` (patience 2) and a checkpoint written to
    ``/uoa/home/s04bs3/data/dasa/models_RGB/<model_name>.h5``.

    Args:
        model_name: One of the backbone names accepted by ``_build_base_model``.
        train_data: DataFrame with ``Image_Path`` and ``Label`` columns.
        val_data: DataFrame with ``Image_Path`` and ``Label`` columns.
        epochs: Maximum number of training epochs.
        batch_size: Batch size for both generators.

    Returns:
        ``(model, history)`` on success, or ``(None, None)`` when ``model_name``
        is not recognized.
    """
    prep_fn, forced_size = _select_preprocessing(model_name)
    target_size = forced_size if forced_size is not None else (224, 224)

    # Reject unknown names before spending time scanning the image files.
    base_model = _build_base_model(model_name, target_size + (3,))
    if base_model is None:
        print(f"Model {model_name} not recognized.")
        return None, None
    base_model.trainable = False

    # Set up data generators
    train_datagen = ImageDataGenerator(preprocessing_function=prep_fn)
    val_datagen = ImageDataGenerator(preprocessing_function=prep_fn)

    # Training batches are shuffled: the CSV lists images class by class, so an
    # unshuffled generator would feed batches containing a single class.
    train_generator = train_datagen.flow_from_dataframe(
        dataframe=train_data,
        x_col="Image_Path",
        y_col="Label",
        target_size=target_size,
        batch_size=batch_size,
        class_mode="categorical",
        shuffle=True
    )

    val_generator = val_datagen.flow_from_dataframe(
        dataframe=val_data,
        x_col="Image_Path",
        y_col="Label",
        target_size=target_size,
        batch_size=batch_size,
        class_mode="categorical",
        shuffle=False
    )

    # Size the output layer from the data instead of hard-coding 12 classes.
    num_classes = len(train_generator.class_indices)

    model = models.Sequential([
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.Dropout(0.5),
        layers.Dense(128, activation='relu'),
        layers.Dense(32, activation='relu'),
        layers.Dense(num_classes, activation='softmax')
    ])

    model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])

    # Define callbacks (EarlyStopping and best-only checkpointing)
    early_stopping = EarlyStopping(monitor='val_loss', patience=2, restore_best_weights=True)
    model_checkpoint = ModelCheckpoint(f"/uoa/home/s04bs3/data/dasa/models_RGB/{model_name}.h5", save_best_only=True)

    # Time the training run
    start_time = time.time()

    history = model.fit(
        train_generator,
        epochs=epochs,
        validation_data=val_generator,
        callbacks=[early_stopping, model_checkpoint]
    )

    end_time = time.time()

    training_time = round(end_time - start_time, 2)
    print(f"Training Time: {training_time} seconds")

    return model, history

##############################################################################################################

def evaluate_model(model_name, model_path, test_csv_path, target_size=(224, 224), batch_size=32):
    """Evaluate a saved model on the test CSV, then print and save the results.

    The test images are preprocessed with the function matching
    ``model_name`` (the same selection ``train_model`` uses), predictions are
    timed, and the confusion matrix, classification report and prediction time
    are printed and handed to ``save_results``.

    Args:
        model_name: Backbone name; selects the preprocessing and names the
            results file.
        model_path: Path of the saved model to load.
        test_csv_path: CSV with ``Image_Path`` and ``Label`` columns.
        target_size: Image size fed to the model; replaced by ``(299, 299)``
            for InceptionV3.
        batch_size: Batch size for the test generator.
    """
    # Load the test data
    test_data = pd.read_csv(test_csv_path)

    # Load the saved model
    model = load_model(model_path)

    prep_fn, forced_size = _select_preprocessing(model_name)
    if forced_size is not None:
        target_size = forced_size
    test_datagen = ImageDataGenerator(preprocessing_function=prep_fn)

    # shuffle stays False so that predictions line up with test_generator.classes.
    test_generator = test_datagen.flow_from_dataframe(
        dataframe=test_data,
        x_col="Image_Path",
        y_col="Label",
        target_size=target_size,
        batch_size=batch_size,
        class_mode="categorical",
        shuffle=False
    )

    # Time the prediction pass
    start_time = time.time()
    y_pred = model.predict(test_generator)
    end_time = time.time()

    testing_time = round(end_time - start_time, 2)
    print(f"Testing Time: {testing_time} seconds")

    # Highest-probability class per sample versus the generator's label indices
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = test_generator.classes

    conf_mat = confusion_matrix(y_true, y_pred_classes)
    print("Confusion Matrix:")
    print(conf_mat)

    # NOTE(review): label names come from the test generator; this assumes the
    # test CSV contains the same label set the model was trained on -- confirm.
    class_labels = list(test_generator.class_indices.keys())
    report = classification_report(y_true, y_pred_classes, target_names=class_labels)
    print("\nClassification Report:")
    print(report)

    save_results(model_name, conf_mat, report, testing_time)

##############################################################################################################

def save_results(model_name, conf_mat, report, testing_time):
    """Write the evaluation results of one model to a single text file.

    The file ``/uoa/home/s04bs3/data/dasa/results_RGB/<model_name>_results.txt``
    is overwritten with the confusion matrix, the classification report and
    the testing time, in that order.

    Args:
        model_name: Used to build the output file name.
        conf_mat: Confusion matrix, rendered with ``np.array_str``.
        report: Classification report text.
        testing_time: Prediction time in seconds.
    """
    results_path = f"/uoa/home/s04bs3/data/dasa/results_RGB/{model_name}_results.txt"
    with open(results_path, 'w') as out:
        for chunk in (
            "Confusion Matrix:\n",
            np.array_str(conf_mat),
            "\n\nClassification Report:\n",
            report,
            f"\nTesting Time: {testing_time} seconds",
        ):
            out.write(chunk)

### Train & Test

In [None]:
# Backbones to train and evaluate, one after the other.
model_name = [
    "MobileNetV2", "MobileNetV2-0.75", "MobileNetV2-0.5",
    "MobileNetV3Small", "MobileNetV3Small-0.75", "MobileNetV3Small-Min",
    "MobileNetV3Large", "MobileNetV3Large-0.75", "MobileNetV3Large-Min",
    "VGG16", "InceptionV3", "ResNet50", "ResNet101", "ResNet152",
]

# Oversampled training split, validation split, and the test CSV path.
train_data = pd.read_csv("/uoa/home/s04bs3/data/dasa/train_o_RGB.csv")
val_data = pd.read_csv("/uoa/home/s04bs3/data/dasa/val_RGB.csv")
test_csv_path = '/uoa/home/s04bs3/data/dasa/test_RGB.csv'

for current_name in model_name:
    # Train; the checkpoint callback saves the best epoch to models_RGB/.
    model, history = train_model(current_name, train_data, val_data, epochs=30, batch_size=64)

    # Evaluate the checkpoint that training just wrote.
    model_path = f'/uoa/home/s04bs3/data/dasa/models_RGB/{current_name}.h5'
    evaluate_model(current_name, model_path, test_csv_path)


In [None]:
# Replace this with the path to your saved models directory
# Directory holding the saved .h5 models to inspect.
# NOTE(review): this points at models/ while train_model writes its
# checkpoints to models_RGB/ -- confirm which set is meant to be measured.
models_directory = '/uoa/home/s04bs3/data/dasa/models/'

# Names of the saved models (file name without the .h5 extension).
model_name = [
    "MobileNetV2", "MobileNetV2-0.75", "MobileNetV2-0.5",
    "MobileNetV3Small", "MobileNetV3Small-0.75", "MobileNetV3Small-Min",
    "MobileNetV3Large", "MobileNetV3Large-0.75", "MobileNetV3Large-Min",
    "VGG16", "InceptionV3", "ResNet50", "ResNet101", "ResNet152",
]

for model in model_name:
    model_path = os.path.join(models_directory, f'{model}.h5')
    loaded_model = load_model(model_path)

    # File size on disk, converted from bytes to MB.
    model_size_MB = os.path.getsize(model_path) / (1024 * 1024)

    # Parameter count, expressed in millions.
    num_params = loaded_model.count_params() / 1e6

    print(f"{model} - Model Size: {model_size_MB:.2f} MB | Number of Parameters: {num_params:.2f} million")
