<a href="https://colab.research.google.com/github/dovkess/FGCVBreedDetection/blob/main/dog_functions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Import cell. this is a cell run once to avoid having import problems later on.
!pip install tensorflow
from google.colab import drive
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.applications import VGG19, InceptionResNetV2, InceptionV3, NASNetLarge
from tensorflow.keras.applications import ConvNeXtXLarge, ResNet152V2, Xception, DenseNet201, EfficientNetV2L
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential
from keras.optimizers import Adam
from keras.layers import Dense, Dropout, Flatten, BatchNormalization, GlobalAveragePooling2D
from keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.preprocessing import image
import time
from sklearn.metrics import confusion_matrix
import seaborn as sns
import pickle
import threading
import numpy as np
from tensorflow.keras.models import load_model
import os
import glob
import shutil
import random
import matplotlib.pyplot as plt
from PIL import Image
%pip install lime
import lime
from lime import lime_image
from skimage.segmentation import mark_boundaries
from sklearn.model_selection import KFold, train_test_split
!pip install ultralytics
from ultralytics import YOLO
import io
import base64

# Mount Google Drive
drive.mount('/content/drive')



In [None]:
def copy_data_from_drive(drive_loc, local_loc='Images'):
    '''
    Copy data from drive to local machine
    Expecting the data to be folder

    Args:
        drive_loc (str): path to the folder in drive
        local_loc (str): path to the folder in local machine
    '''
    print(time.strftime("%Y-%m-%d_%H:%M:%S"))
    base_path = '/content/{}'.format(local_loc)
    %cp -r {drive_loc} {base_path}
    print(time.strftime("%Y-%m-%d_%H:%M:%S"))


In [None]:
def copy_file_thread(source, target):
    '''
    Copy file from source to target in a separate thread

    Args:
        source (str): path to the source file
        target (str): path to the target file
    '''
    shutil.copy(source, target)

def copy_images_by_list(file_names, drive_location, local_loc='Images'):
    '''
    Copy images from drive to local machine
    Expecting the data to be in a folder

    Args:
        file_names (list): list of file names to copy
        drive_location (str): path to the folder in drive
        local_loc (str): path to the folder in local machine
    '''
    local_dir = '/content/'+local_loc
    folders = set([f.split('/')[0] for f in file_names])
    # create sub directories
    for folder in folders:
        folder_path = os.path.join(local_dir, folder)
        if not os.path.isdir(folder_path):
            os.makedirs(folder_path)

    print(time.strftime("%Y-%m-%d_%H:%M:%S"))
    # Copy the images
    for e, f in enumerate(file_names):
        drive_path = os.path.join(drive_location, f)
        local_path = os.path.join(local_dir, f)
        threading.Thread(target=copy_file_thread, args=(drive_path, local_path)).start()
        # Sleep to allow the threads to complete the work and avoid threshing.
        if e % 500 == 0:
            time.sleep(5)
    print(time.strftime("%Y-%m-%d_%H:%M:%S"))

In [None]:
# ======================= CREATE DATA SETS =========================== #
def create_data_sets(base_path, batch_size, validation_split, classes, train=True, input_size=[331, 331]):
    '''
    Create data sets for training and testing
    Args:
        base_path (str): path to the folder containing the images
        batch_size (int): batch size for each iteration
        validation_split (float): percentage of the data to use for validation
        classes (list): list of classes
        train (bool): whether to create a training set or not
        input_size (list): size of the input images

    Returns:
        train_generator (ImageDataGenerator): training data generator
        test_generator (ImageDataGenerator): testing data generator
    '''
    train_datagen = ImageDataGenerator(
        rescale=1./255,
        shear_range=0.0,
        zoom_range=0.0,
        horizontal_flip=True,
        validation_split=validation_split
    )

    train_generator = train_datagen.flow_from_directory(
        base_path,
        target_size=(input_size[0], input_size[1]),
        batch_size=batch_size,
        subset='training',
        classes=classes,
        shuffle = True if train else False
    )
    test_generator = None
    if train:
        test_datagen = ImageDataGenerator(rescale=1./255, validation_split=validation_split)

        test_generator = test_datagen.flow_from_directory(
            base_path,
            target_size=(input_size[0], input_size[1]),
            batch_size=64,
            subset='validation',
            classes=classes,
            shuffle = False
        )
    return train_generator, test_generator

In [None]:
# ======================= CREATE SINGLE DATA SET =========================== #

def create_single_data_set(base_path, batch_size, classes, shuffle=False, input_size=[331, 331]):
    '''
    Create data sets for training or testing.

    Args:
        base_path (str): path to the folder containing the images
        batch_size (int): batch size for each iteration
        classes (list): list of classes
        shuffle (bool): whether to shuffle the data or not
        input_size (list): size of the input images

    Returns:
        data_gen (ImageDataGenerator): data generator
    '''
    img_datagen = ImageDataGenerator(
        rescale=1./255,
    )

    data_gen = img_datagen.flow_from_directory(
        base_path,
        target_size=(input_size[0], input_size[1]),
        batch_size=batch_size,
        classes=classes,
        shuffle = shuffle
    )
    return data_gen

In [None]:
def define_nasnetlarge_net(class_num, lr):
    '''
    Define NASNet params for transfer learning.

    Args:
        class_num (int): number of classes
        lr (float): learning rate

    Returns:
        model (Sequential): model
    '''
    base_model = NASNetLarge(weights='imagenet', include_top=False, input_shape=(331, 331, 3))

    # Freeze base model layers
    base_model.trainable = False
    model = Sequential()
    model.add(base_model)
    model.add(GlobalAveragePooling2D())
    prediction = Dense(class_num, activation='softmax')
    model.add(prediction)

    # Compile the model
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    model.summary()
    return model


def define_inseption_net(class_num, lr):
    # ======================= DEFINE NET InceptionV3 V3 =========================== #
    '''
    Define InceptionV3 params for transfer learning.

    Args:
        class_num (int): number of classes
        lr (float): learning rate

    Returns:
        model
    '''
    base_model_v3 = InceptionV3(weights='imagenet', include_top=False, input_shape=(229, 229, 3))

    # Freeze base model layers
    base_model_v3.trainable = False
    model_v3 = Sequential()
    model_v3.add(base_model_v3)
    model_v3.add(GlobalAveragePooling2D())
    prediction_v3 = Dense(class_num, activation='softmax')
    model_v3.add(prediction_v3)

    # Compile the model
    model_v3.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    model_v3.summary()
    return model_v3

def define_ConvNeXtXLarge_net(class_num, lr):
    #===================== DEFINE NET ConvNeXtXLarge =============================== #
    '''
    Define ConvNeXtXLarge params for transfer learning.

    Args:
        class_num (int): number of classes
        lr (float): learning rate

    Returns:
        model
    '''
    base_model_convnext = ConvNeXtXLarge(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

    # Freeze base model layers
    base_model_convnext.trainable = False
    model_convnext = Sequential()
    model_convnext.add(base_model_convnext)
    model_convnext.add(GlobalAveragePooling2D())
    prediction_convnext = Dense(class_num, activation='softmax')
    model_convnext.add(prediction_convnext)

    # Compile model
    model_convnext.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    model_convnext.summary()
    return model_convnext


def define_vgg19_net(class_num, lr):
    '''
    Define vgg19 params for transfer learning.

    Args:
        class_num (int): number of classes
        lr (float): learning rate

    Returns:
        model
    '''
    # This net showed performance at the very low range for this problem for both cropped and un cropped (cropped was better)
    base_model = VGG19(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
    base_model.trainable = False
    model = Sequential()
    model.add(base_model)
    model.add(GlobalAveragePooling2D())
    model.add(Dense(class_num, activation='softmax'))
    opt = Adam(learning_rate=lr)
    model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])
    model.summary()
    return model

def define_resnet152_net(class_num, lr):
    '''
    Define resnet params for transfer learning.

    Args:
        class_num (int): number of classes
        lr (float): learning rate

    Returns:
        model
    '''
    base_model = ResNet152V2(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
    base_model.trainable = False
    model = Sequential()
    model.add(base_model)
    model.add(GlobalAveragePooling2D())
    model.add(Dense(class_num, activation='softmax'))
    opt = Adam(learning_rate=lr)
    model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])
    model.summary()
    return model

def define_xception_net(class_num, lr):
    '''
    Define xception params for transfer learning.

    Args:
        class_num (int): number of classes
        lr (float): learning rate

    Returns:
        model
    '''
    base_model = Xception(weights='imagenet', include_top=False, input_shape=(299, 299, 3))
    base_model.trainable = False
    model = Sequential()
    model.add(base_model)
    model.add(GlobalAveragePooling2D())
    model.add(Dense(class_num, activation='softmax'))
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    model.summary()
    return model

def define_densenet201_net(class_num, lr):
    '''
    Define densenet params for transfer learning.

    Args:
        class_num (int): number of classes
        lr (float): learning rate

    Returns:
        model
    '''
    base_model = DenseNet201(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
    base_model.trainable = False
    model = Sequential()
    model.add(base_model)
    model.add(GlobalAveragePooling2D())
    model.add(Dense(class_num, activation='softmax'))
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    model.summary()
    return model

def define_efficientnetv2l_net(class_num, lr):
    '''
    Define efficientnet params for transfer learning.

    Args:
        class_num (int): number of classes
        lr (float): learning rate

    Returns:
        model
    '''
    # This net showed performance at the very low range for this problem both
    # for cropped and uncropped images (cropped was better)
    base_model = EfficientNetV2L(weights='imagenet', include_top=False, input_shape=(384, 384, 3))
    base_model.trainable = False
    model = Sequential()
    model.add(base_model)
    model.add(GlobalAveragePooling2D())
    model.add(Dense(class_num, activation='softmax'))
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    model.summary()
    return model


In [None]:
def train_net(model, train_generator, test_generator, snapshot_dir='/content/drive/MyDrive/DogProject/snapshots/', epochs=20):
    # ======================= TRAIN THE NET =========================== #
    '''
    Run transfer learning on a net.

    Args:
        model (Sequential): model to train
        train_generator (ImageDataGenerator): training data generator
        test_generator (ImageDataGenerator): testing data generator
        snapshot_dir (str): directory to save snapshots
        epochs (int): number of epochs to train

    '''
    checkpoint_filepath = '{}/check{}.keras'.format(snapshot_dir, time.strftime("%Y-%m-%d_%H:%M:%S"))
    model_checkpoint_callback = keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_filepath,
        monitor='val_accuracy',
        mode='max',
        save_best_only=True)

    # Train the model
    # Pass the generators directly to model.fit
    h = model.fit(train_generator, epochs=epochs, validation_data=test_generator, callbacks=[model_checkpoint_callback]) # Added [] around callback
    t = time.strftime("%Y-%m-%d_%H:%M:%S")
    # Save the model
    if not os.path.isdir(snapshot_dir):
        os.mkdir(snapshot_dir)
    # Save the model in the native Keras format
    model.save(os.path.join(snapshot_dir, 'dog_breed_classifier_{}.keras'.format(t)))
    # save test file names
    names = test_generator.image_filenames if hasattr(test_generator, 'image_filenames') else test_generator.filenames
    pickle.dump(names, open('/content/drive/MyDrive/DogProject/test_filenames_{}.pkl'.format(t), 'wb'))
    # pickle.dump(test_generator.class_indices, open('/content/drive/MyDrive/MyDrive/DogProject/class_indices_{}.pkl'.format(t), 'wb'))

In [None]:
def predict_image(model, image_path, class_indices, size=[331, 331], show_five=False, plot_image=False):
    '''
    Predict the breed of a dog from an image.

    Args:
        model (Sequential): model to use for prediction
        image_path (str): path to the image
        class_indices (dict): dictionary mapping class indices to class names
        size (list): size of the input image
        show_five (bool): whether to show the top 5 predictions
        plot_image (bool): whether to draw the image

    Returns:
        preds (list): list of predictions
    '''
    resize_and_rescale = tf.keras.Sequential([
        keras.layers.Resizing(size[0], size[1]),
        keras.layers.Rescaling(1./255)
        ])
    xs = image.img_to_array(image_path)
    xs = resize_and_rescale(xs)
    xs = np.expand_dims(xs, axis=0)

    # Make predictions
    preds = model.predict(xs)
    predicted_class_index = np.argmax(preds)

    # class_inds = pickle.load(open('/content/drive/MyDrive/DogProject/class_indices.pkl', 'rb'))
    class_labels = list(class_indices.keys())
    predicted_class = class_labels[predicted_class_index]
    print(f"Predicted dog breed: {predicted_class}")
    if show_five:
        top_5_indices = np.argsort(preds[0])[::-1][:5]  # Get indices of top 5 predictions
        top_5_probabilities = preds[0][top_5_indices]
        top_5_classes = [class_labels[i] for i in top_5_indices]

        print("Top 5 predictions:")
        for i in range(5):
            print(f"{top_5_classes[i]}: {top_5_probabilities[i]:.4f}")

    if plot_image:
        # Display the image
        plt.imshow(xs[0])
        plt.title(f"Predicted breed: {predicted_class}")
        plt.axis('off')
        plt.show()

    return preds


In [None]:
def create_conf_mat(model, test_generator):
    '''
    Create a confusion matrix for a model.

    Args:
        model (Sequential): model to use for prediction
        test_generator (ImageDataGenerator): testing data generator

    '''
    pred = model.predict(test_generator)
    y_pred = pred.argmax(axis=1)
    y_true = test_generator.classes
    cm = confusion_matrix(y_true, y_pred)

    # Plot the confusion matrix
    plt.figure(figsize=(100, 80))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=list(test_generator.class_indices.keys()),
                yticklabels=list(test_generator.class_indices.keys()))
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.show()

In [None]:
def get_worst_pred(cm, class_ind, thresh=6):
    '''
    Get the classes with the worst predictions in a confusion matrix.

    Args:
        cm (np.array): confusion matrix
        class_ind (list): list of class indices
        thresh (int): threshold for the number of bad predictions in confusion matrix

    Returns:
        classes (list): list of classes with the worst predictions
    '''
    classes = []
    for i in range(len(class_ind)):
        for j in range(len(class_ind)):
            if i != j and (cm[i][j] > thresh or cm[j][i] > thresh):
                print(i, j, cm[i][j], cm[j][i])
                print('Between: {} and {}'.format(class_ind[i], class_ind[j]))
                classes.append((i, j))
    return classes

In [None]:
def lime_explain(model, class_labels, image):
    '''
    Explain the prediction of a model using LIME.

    Args:
        model (Sequential): model to use for prediction
        class_labels (list): list of class labels
        image (np.array): image to explain
    '''
    fig, ax = plt.subplots(2, 6, sharex='col', sharey='row')
    fig.set_figwidth(20)
    fig.set_figheight(16)

    explainer = lime_image.LimeImageExplainer(random_state=42)
    explanation = explainer.explain_instance(image[0], model.predict, top_labels=5, num_samples=1000, random_seed=42)
    ax[0, 0].imshow(image[0])
    ax[0, 0].set_title(class_labels[explanation.top_labels[0]])
    for ind, i in enumerate(explanation.top_labels):
        temp, mask = explanation.get_image_and_mask(i, positive_only=True, num_features=5, hide_rest=True)
        ax[0,ind+1].imshow(mark_boundaries(temp / 2 + 0.5, mask))
        ax[0,ind+1].set_title('{}'.format(class_labels[i]))



In [None]:
def show_image(img_path):
    '''
    Show an image.

    Args:
        img_path (str): path to the image
    '''
    img_path = Image.open(img_path).convert("RGB")
    display(img_path)

In [None]:
def filter_images(img_dir, filter_list, filtered_dir=None):
    '''
    Filter images in a directory.

    Args:
        img_dir (str): path to the directory containing the images
        filter_list (list): list of images to filter out
        filtered_dir (str): path to the directory to save the filtered images
    '''
    if filtered_dir is None:
        for img in filter_list:
            if os.path.isfile(img):
                os.remove(img)
    else:
        for img in filter_list:
            if os.path.isfile(img):
                shutil.move(img, filtered_dir)

==== Functions below are for the future work of concat net =====

In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.utils import Sequence
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import ImageDataGenerator

class DualInputDataGenerator(Sequence):
    def __init__(self, directory1, directory2, image_filenames, labels, batch_size, target_size=(331, 331), shuffle=True):
        self.directory1 = directory1
        self.directory2 = directory2
        self.image_filenames = image_filenames
        self.labels = labels
        self.batch_size = batch_size
        self.target_size = target_size
        self.shuffle = shuffle
        self.indices = np.arange(len(self.image_filenames))
        if self.shuffle:
            np.random.shuffle(self.indices)

    def __len__(self):
        return int(np.ceil(len(self.image_filenames) / self.batch_size))

    def __getitem__(self, index):
        batch_indices = self.indices[index * self.batch_size:(index + 1) * self.batch_size]
        batch_filenames = [self.image_filenames[i] for i in batch_indices]
        batch_labels = [self.labels[i] for i in batch_indices]

        images1 = []
        images2 = []
        for filename in batch_filenames:
            img_path1 = os.path.join(self.directory1, filename)
            img_path2 = os.path.join(self.directory2, filename)

            img1 = image.load_img(img_path1, target_size=self.target_size)
            img1 = image.img_to_array(img1)
            img1 /= 255.0  # Rescale

            img2 = image.load_img(img_path2, target_size=self.target_size)
            img2 = image.img_to_array(img2)
            img2 /= 255.0  # Rescale

            images1.append(img1)
            images2.append(img2)

        # Convert to TensorFlow tensors
        input_tensors = [tf.constant(np.array(images1), dtype=tf.float32), tf.constant(np.array(images2), dtype=tf.float32)]
        label_tensor = tf.constant(np.array(batch_labels), dtype=tf.float32) # Labels are already one-hot, typically float

        return input_tensors, label_tensor

    @property
    def output_signature(self):
        # Define the expected output signature (inputs and labels)
        input_signature = [
            tf.TensorSpec(shape=(None, self.target_size[0], self.target_size[1], 3), dtype=tf.float32),
            tf.TensorSpec(shape=(None, self.target_size[0], self.target_size[1], 3), dtype=tf.float32)
        ]
        label_signature = tf.TensorSpec(shape=(None, self.labels.shape[1]), dtype=tf.float32) # Assuming labels are one-hot encoded

        return input_signature, label_signature


    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

def create_dual_input_generators(train_dir1, train_dir2, val_dir1, val_dir2, batch_size, input_size=[331, 331]):
    # Assuming the subdirectories in train_dir1 and val_dir1 (or dir2) correspond to classes
    # and the filenames within those subdirectories are the same for corresponding images

    class_names = sorted(os.listdir(train_dir1))
    class_indices = dict((name, index) for index, name in enumerate(class_names))

    train_filenames = []
    train_labels = []
    val_filenames = []
    val_labels = []

    # Process training data
    for class_name in class_names:
        class_dir1 = os.path.join(train_dir1, class_name)
        filenames = os.listdir(class_dir1)
        train_filenames.extend([os.path.join(class_name, f) for f in filenames])
        train_labels.extend([class_indices[class_name]] * len(filenames))

    # Process validation data
    for class_name in class_names:
        class_dir1 = os.path.join(val_dir1, class_name)
        filenames = os.listdir(class_dir1)
        val_filenames.extend([os.path.join(class_name, f) for f in filenames])
        val_labels.extend([class_indices[class_name]] * len(filenames))


    # Convert labels to one-hot encoding
    train_labels = tf.keras.utils.to_categorical(train_labels, num_classes=len(class_names))
    val_labels = tf.keras.utils.to_categorical(val_labels, num_classes=len(class_names))


    train_generator = DualInputDataGenerator(
        directory1=train_dir1,
        directory2=train_dir2,
        image_filenames=train_filenames,
        labels=train_labels,
        batch_size=batch_size,
        target_size=input_size,
        shuffle=True
    )

    test_generator = DualInputDataGenerator(
        directory1=val_dir1,
        directory2=val_dir2,
        image_filenames=val_filenames,
        labels=val_labels,
        batch_size=batch_size, # You might want a different batch size for validation
        target_size=input_size,
        shuffle=False # No need to shuffle validation data
    )

    return train_generator, test_generator, class_names

In [None]:
from tensorflow.keras.layers import Input, concatenate

def define_concat_nasnetlarge_net(class_num):
    # Define two separate input layers
    input_1 = Input(shape=(331, 331, 3), name='input_1')
    input_2 = Input(shape=(331, 331, 3), name='input_2')

    # Define two NASNetLarge base models, one for each input with unique names
    base_model_1 = NASNetLarge(weights='imagenet', include_top=False, input_shape=(331, 331, 3), name='nasnet_large_1')
    base_model_2 = NASNetLarge(weights='imagenet', include_top=False, input_shape=(331, 331, 3), name='nasnet_large_2')

    # Process each input through its respective base model
    x1 = base_model_1(input_1)
    x2 = base_model_2(input_2)

    # Add GlobalAveragePooling2D to each branch
    x1 = GlobalAveragePooling2D()(x1)
    x2 = GlobalAveragePooling2D()(x2)

    # Concatenate the outputs of the two branches
    merged = concatenate([x1, x2])

    # Add a dense layer for classification
    prediction = Dense(class_num, activation='softmax')(merged)

    # Create the model with two inputs and one output
    model = Model(inputs=[input_1, input_2], outputs=prediction)

    # Compile the model
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    model.summary()
    return model

In [None]:
def predict_accuracy(model, test_data):
    pred = model.predict(test_data)
    y_pred = pred.argmax(axis=1)
    y_true = test_data.classes
    acc = float(sum((y_pred == y_true))/len(y_pred))
    error_index = np.where(y_pred != y_true)[0]
    return acc, error_index

In [None]:

def prepare_5_fold(data_list):
    dict_by_breed = {}
    split_dict = {}
    for f in data_list:
        dict_by_breed.setdefault(os.path.dirname(f), []).append(f)

    for breed, fs in dict_by_breed.items():
        kf = KFold(n_splits=5, shuffle=True, random_state=42)
        for i, (train_index, test_index) in enumerate(kf.split(fs)):
            fs = np.array(fs)
            sub_dict = split_dict.setdefault(i, {})
            train, validation = train_test_split(fs[train_index], test_size=0.1, random_state=42)
            sub_dict.setdefault('test', []).extend(fs[test_index])
            sub_dict.setdefault('train', []).extend(train)
            sub_dict.setdefault('validation', []).extend(validation)
    return split_dict
