# Imports and initialization

In [1]:
# Mount Google Drive at /content/drive; the project data paths configured below live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
import cv2
import tensorflow as tf
import numpy as np
import absl.logging
import random

In [3]:
# keep absl quiet, otherwise 'WARNING:absl:Found untraced functions' is printed
absl.logging.set_verbosity(absl.logging.ERROR)

# ---------------------------------------------------------------------------
# switches that control which parts of the notebook run
# ---------------------------------------------------------------------------
# build the augmented training / validation / testing folders from the raw dataset
create_new_dataset = False
# train the model
training = True
# name of a stored model (below 'trained_models/') to load, or None to load nothing
# e.g. 'model_vgg19_base_dense50relu_batchnorm_dense10softmax_4epoch_10labels_alt_data'
load_model = None
# evaluate the loaded OR trained model on the validation data
validation = False
# final evaluation of the loaded OR trained model on the testing data
testing = False
# evaluation on the single-label testing folder
single_label_testing = False

# ---------------------------------------------------------------------------
# model / training parameters
# ---------------------------------------------------------------------------
# name under which the model is saved (below 'trained_models/')
model_file_name = 'model_vgg19_base_dense50relu_batchnorm_dense10softmax_4epoch_10labels_benchmark_local_vs_colab'
# number of epochs used when training
number_of_epochs = 4
# batch_size = 32

# ---------------------------------------------------------------------------
# labels: one sub-directory per plant species
# ---------------------------------------------------------------------------
directory_names = [
    'Viburnum tinus',
    'Crataegus monogyna',
    'Hedera helix',
    'Ulmus minor',
    'Arbutus unedo',
    'Platanus x',
    'Robinia pseudoacacia',
    'Buxus sempervirens',
    'Betula pendula',
    'Corylus avellana',
]

# ---------------------------------------------------------------------------
# locations
# ---------------------------------------------------------------------------
filepath = '/content/drive/MyDrive/colab_project/'
dataset_folder = 'dataset'
training_folder = 'training_data'       # 70% -> 700 per label, but everything x3 for unfiltered data
validation_folder = 'validation_data'   # 20% -> 200 per label
testing_folder = 'testing_data'         # 10% -> 100 per label
single_label_testing_folder = 'single_label_testing_folder'
checkpoint_folder = 'model_checkpoints'

# utils

image_augmenter

In [4]:
# optionally letterboxes an image to `shape` and rotates it by a random angle
def rotate_image_random(image, shape=None):
    """Rotate ``image`` around its center by a random whole-degree angle.

    Args:
        image: image array as used by OpenCV (height, width[, channels]).
        shape: optional target size passed on to ``reshape_image`` before
            rotating; ``None`` keeps the input size.

    Returns:
        The rotated image with the same width and height as the (reshaped)
        input. Areas uncovered by the rotation are filled with white.
    """
    if shape is not None:
        image = reshape_image(image, shape)
    height, width = image.shape[:2]

    # 0 and 360 describe the same rotation, so draw from 0..359 to keep the
    # unrotated case from being twice as likely as every other angle
    angle = random.randrange(360)
    rotation_matrix = cv2.getRotationMatrix2D((width / 2, height / 2), angle, 1)

    # Fill the uncovered corners with white directly while warping. Recoloring
    # every pure-black pixel afterwards would also whiten genuinely black image
    # content and would leave dark, interpolated seams along the rotated edges.
    rotated_image = cv2.warpAffine(image,
                                   rotation_matrix,
                                   (width, height),
                                   borderMode=cv2.BORDER_CONSTANT,
                                   borderValue=(255, 255, 255))

    return rotated_image

def change_white_background(image, background_image=None, shape=None):
    """Replace the whitish background of a BGR image.

    Pixels with low saturation and a value of at least 100 (HSV) count as
    background. They are replaced either by ``background_image`` or, if none
    is given, by one random solid color.

    Args:
        image: BGR image whose background should be replaced.
        background_image: optional BGR image used as the new background.
        shape: size handed to ``reshape_image`` for ``background_image``;
            defaults to the (width, height) of ``image``.

    Returns:
        A new image with the same shape as ``image``.
    """
    # convert image to HSV color space
    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

    # bounds for "white": any hue (OpenCV's 8-bit hue range is 0..179),
    # low saturation, medium-to-high value
    lower_white = np.array([0, 0, 100])
    upper_white = np.array([179, 25, 255])

    # mask is 255 where the pixel counts as background
    mask = cv2.inRange(hsv, lower_white, upper_white)

    # closing fills small holes inside the background mask
    kernel = np.ones((3, 3), np.uint8)
    closed_mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)

    # keep only the foreground pixels of the original image
    foreground_mask = cv2.bitwise_not(closed_mask)
    result = cv2.bitwise_and(image, image, mask=foreground_mask)

    if background_image is not None:
        if shape is None:
            # reshape_image expects (width, height)
            shape = (image.shape[1], image.shape[0])
        new_background = reshape_image(background_image, shape)
    else:
        # one random BGR color, broadcast to the shape of the image
        random_color = np.random.randint(0, 256, size=3, dtype=np.uint8)
        new_background = np.full(image.shape, random_color, dtype=np.uint8)

    # keep the new background only where the original background was
    background = cv2.bitwise_and(new_background, new_background, mask=closed_mask)

    # foreground and background masks are disjoint, so adding merges them
    final_image = cv2.add(result, background)

    return final_image

def closing_operation(image, rgb_color):
    """Black out everything that has (almost) the color ``rgb_color``.

    A mask of all pixels within +-1 per channel of ``rgb_color`` is built and
    inverted, a 3x3 morphological closing is applied to the inverted
    (foreground) mask, and the image is masked with the result.

    Args:
        image: 3-channel 8-bit image.
        rgb_color: the color to remove, as a sequence or array of three
            channel values in the channel order of ``image``.

    Returns:
        A copy of ``image`` in which the pixels outside the closed foreground
        mask are set to 0.
    """
    # work on plain ints: `color - 1` would fail for a tuple and silently
    # wrap around (0 - 1 -> 255) for a uint8 array
    color = np.asarray(rgb_color, dtype=int)
    lower_bound = np.clip(color - 1, 0, 255)
    upper_bound = np.clip(color + 1, 0, 255)

    # mask is 255 for every pixel of the color rgb_color
    mask = cv2.inRange(image, lower_bound, upper_bound)

    # reverse mask, so the foreground is 255
    reversed_mask = cv2.bitwise_not(mask)

    kernel = np.ones((3, 3), np.uint8)
    closed_mask = cv2.morphologyEx(reversed_mask, cv2.MORPH_CLOSE, kernel)

    return cv2.bitwise_and(image, image, mask=closed_mask)

def reshape_image(image, shape):
    """Scale ``image`` to fit into ``shape`` and pad the remainder with white.

    The aspect ratio is kept; ``shape`` is read as (width, height). When the
    padding cannot be split evenly, the extra pixel goes to the top and to
    the right.
    """
    target_width, target_height = shape[0], shape[1]
    source_height, source_width = image.shape[:2]

    # largest scale factor at which both dimensions still fit
    scale = min(target_width / source_width, target_height / source_height)
    scaled_width = int(source_width * scale)
    scaled_height = int(source_height * scale)

    scaled = cv2.resize(image, (scaled_width, scaled_height), interpolation=cv2.INTER_AREA)

    # half of the missing pixels per side, plus the odd leftover pixel (0 or 1)
    pad_vertical, extra_top = divmod(target_height - scaled_height, 2)
    pad_horizontal, extra_right = divmod(target_width - scaled_width, 2)

    return cv2.copyMakeBorder(scaled,
                              pad_vertical + extra_top,
                              pad_vertical,
                              pad_horizontal,
                              pad_horizontal + extra_right,
                              cv2.BORDER_CONSTANT,
                              value=(255, 255, 255))

image_filter

In [5]:
def filter_by_white_background(images, filenames=None):
    """Keep only the images whose area is more than 50% white background.

    Returns:
        * without ``filenames``: the list of kept images, or ``None`` if no
          image was kept
        * with ``filenames``: ``(kept_images, kept_filenames)``, or
          ``(None, None)`` if no image was kept
    """
    # HSV bounds that define "white" (low saturation, value of at least 100)
    white_lower_bound = np.array([0, 0, 100])
    white_upper_bound = np.array([360, 25, 255])

    kept_images = []
    kept_filenames = []

    for index, image in enumerate(images):
        hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        white_mask = cv2.inRange(hsv_image, white_lower_bound, white_upper_bound)

        # share of pixels that were classified as white
        white_ratio = np.count_nonzero(white_mask == 255) / (white_mask.shape[0] * white_mask.shape[1])
        if white_ratio <= 0.5:
            continue

        kept_images.append(image)
        if filenames is not None:
            kept_filenames.append(filenames[index])

    if filenames is None:
        return kept_images if kept_images else None
    if kept_images:
        return kept_images, kept_filenames
    return None, None

image_handler

In [6]:
def load_images_from_folder(folder_path, start_index=0, amount=-1):
    """Load images from ``folder_path`` in a deterministic order.

    Files whose name (without extension) is an integer come first, sorted
    numerically; all other entries follow, sorted by name. Entries that
    OpenCV cannot read as an image are skipped.

    Args:
        folder_path: directory to read from.
        start_index: index (in the sorted listing) of the first file to load.
        amount: number of files to try to load; ``-1`` or a value larger
            than the number of files means "all".

    Returns:
        ``(images, filenames)`` - two lists of equal length in which
        ``filenames[i]`` is the file that ``images[i]`` was read from.
    """
    def numeric_names_first(name):
        stem = os.path.splitext(name)[0]
        try:
            return (0, int(stem), name)
        except ValueError:
            # stray non-numeric files must not abort loading
            return (1, 0, name)

    filenames = sorted(os.listdir(folder_path), key=numeric_names_first)

    # if wanted amount is bigger than actual amount of files existing in the directory
    if amount == -1 or amount > len(filenames):
        amount = len(filenames)

    images = []
    loaded_filenames = []

    # only load the files we want (from start_index to start_index + amount)
    for filename in filenames[start_index:start_index + amount]:
        img = cv2.imread(os.path.join(folder_path, filename))
        if img is None:
            continue
        images.append(img)
        # record the name only for images that were actually loaded, so that
        # both lists stay index-aligned for the callers
        loaded_filenames.append(filename)

    return images, loaded_filenames

# shows the given images one after another; ESC aborts, space (or any other key) shows the next image
def display_images(images):
    escape_key = 27
    window_name = 'image'

    for current_image in images:
        cv2.namedWindow(window_name)
        cv2.moveWindow(window_name, 50, 50)
        cv2.imshow(window_name, current_image)

        # block until a key is pressed
        pressed_key = cv2.waitKey(0)
        if pressed_key == escape_key:
            cv2.destroyAllWindows()
            break

    cv2.destroyAllWindows()

# shows a single image until any key is pressed
def display_image(img, window_title=None):
    # fall back to the default window name if no title was given
    title = 'image' if window_title is None else window_title

    cv2.namedWindow(title)
    cv2.moveWindow(title, 50, 50)
    cv2.imshow(title, img)

    cv2.waitKey()
    cv2.destroyAllWindows()

# builds the filename of an augmented image: '<stem><suffix><extension>'
def _augmented_filename(filename, suffix):
    stem, extension = os.path.splitext(filename)
    return f'{stem}{suffix}{extension}'


# creates rotated (and optionally recolored) copies of the images of one label
def _augment_images(images, filenames, image_shape, multiplier, limit, use_originals,
                    recolor_background=False, background_images=None):
    augmented_images = []
    augmented_filenames = []

    # nothing to augment (also avoids a division by zero below)
    if not images:
        return augmented_images, augmented_filenames

    if limit is not None:
        # the existing images count towards the limit; the missing ones are
        # spread evenly and the first `rest` images get one extra copy each
        multiplier, rest = divmod(limit - len(images), len(images))
    else:
        rest = 0

    for i, image in enumerate(images):
        copies = multiplier + 1 if i < rest else multiplier

        if use_originals:
            augmented_images.append(reshape_image(image, image_shape))
            augmented_filenames.append(_augmented_filename(filenames[i], '_original'))

        for j in range(copies):
            # changes the images dimensions (given shape) and rotates the image
            augmented_image = rotate_image_random(image, image_shape)

            if recolor_background:
                # random background image if available, random color otherwise
                background_image = random.choice(background_images) if background_images else None
                augmented_image = change_white_background(augmented_image,
                                                           background_image=background_image,
                                                           shape=image_shape)

            augmented_images.append(augmented_image)

            # different image names are needed for every image, but we still
            # want to know where they were derived from
            augmented_filenames.append(_augmented_filename(filenames[i], '_v' + str(j)))

    return augmented_images, augmented_filenames


# used to load a dataset into a dict. structure is like this: dict{'label' : (images_for_label, filenames_for_images)}
def load_dataset(filepath, label_names, image_shape=(299, 299), multiplier=1, limit=None, use_originals=False, filter_white_background=False, augment_filtered_dataset=False, backgrounds_foldername=None, only_resize_and_rotate=False):
    """Load (and optionally filter / augment) the images of several labels.

    Args:
        filepath: directory that contains one sub-directory per label.
        label_names: names of the label sub-directories to load.
        image_shape: size handed to ``reshape_image`` / ``rotate_image_random``
            for augmented images.
        multiplier: number of augmented copies per image; ignored when
            ``limit`` is given.
        limit: if given, the number of copies per image is derived from
            ``limit - number_of_images``, so that originals plus copies add
            up to ``limit`` when ``use_originals`` is set.
        use_originals: also store a reshaped version of every source image.
        filter_white_background: keep only images accepted by
            ``filter_by_white_background``.
        augment_filtered_dataset: rotate the filtered images and replace
            their white background (needs ``filter_white_background``).
        backgrounds_foldername: folder (joined onto ``filepath``) with images
            used as replacement backgrounds; ``None`` uses random colors.
        only_resize_and_rotate: only reshape and rotate the images; switches
            filtering and background replacement off.

    Returns:
        dict mapping each label name to ``(images, filenames)``. A label for
        which no image is left maps to two empty lists.
    """
    if only_resize_and_rotate:
        augment_filtered_dataset = False
        filter_white_background = False

    # the backgrounds are the same for every label: load them once, and only
    # if the background replacement will actually run
    background_images = None
    if backgrounds_foldername is not None and filter_white_background and augment_filtered_dataset:
        background_images, _ = load_images_from_folder(os.path.join(filepath, backgrounds_foldername))

    # dict of 'latin name' : (images, filenames)
    dataset = {}

    # load every image for each wanted label
    for label_name in label_names:
        print(f'now loading: {filepath}/{label_name}')
        images, filenames = load_images_from_folder(f'{filepath}/{label_name}')

        if images is None or filenames is None:
            print('ERROR: length of list of images and length of list of images filenames are different')
            continue

        if filter_white_background:
            images, filenames = filter_by_white_background(images, filenames)
            # the filter reports "nothing kept" as (None, None)
            if images is None:
                images, filenames = [], []

            if augment_filtered_dataset:
                dataset[label_name] = _augment_images(images, filenames, image_shape, multiplier, limit,
                                                      use_originals,
                                                      recolor_background=True,
                                                      background_images=background_images)
            else:
                dataset[label_name] = (images, filenames)

        elif only_resize_and_rotate:
            dataset[label_name] = _augment_images(images, filenames, image_shape, multiplier, limit,
                                                  use_originals)

        else:
            dataset[label_name] = (images, filenames)

    return dataset

# converts our dataset dict to two parallel lists of images and labels
def convert_dataset_to_arrays(old_dataset):
    """Flatten a ``{label: (images, filenames)}`` dict.

    Args:
        old_dataset: dict as produced by ``load_dataset``.

    Returns:
        ``(images, labels)`` - two lists of equal length in which
        ``labels[i]`` is the dict key that ``images[i]`` was stored under.
        Labels without images (``None`` or empty) are skipped with a warning.
    """
    images = []
    labels = []

    for label, entry in old_dataset.items():
        label_images = entry[0]

        # check before calling len(): a label whose images are None must
        # produce the warning instead of a TypeError
        if label_images is None or len(label_images) == 0:
            print(f"WARNING: label '{label}' has no images in the dataset")
            continue

        images.extend(label_images)
        labels.extend([label] * len(label_images))

    return images, labels

def augment_and_save_dataset(filepath_to_data, directory_names, dataset_name, limit, random_backgrounds=None, only_resize_and_rotate=False):
    """Load one dataset split, augment it up to ``limit`` and save it.

    Args:
        filepath_to_data: directory that contains the split folders.
        directory_names: label sub-directories to process.
        dataset_name: name of the split folder below ``filepath_to_data``;
            also used as the output folder name for ``save_dataset``.
        limit: passed to ``load_dataset`` as the per-label image limit.
        random_backgrounds: passed to ``load_dataset`` as
            ``backgrounds_foldername``.
        only_resize_and_rotate: passed to ``load_dataset``; skips filtering
            and background replacement.
    """
    # join with a path separator; plain concatenation glued the split name
    # onto the last path component (e.g. '.../datasettraining_data')
    filepath_to_data = os.path.join(filepath_to_data, dataset_name)

    my_dataset = load_dataset(filepath_to_data,
                              directory_names,
                              image_shape=(299, 299),
                              use_originals=True,
                              limit=limit,
                              filter_white_background=True,
                              augment_filtered_dataset=True,
                              only_resize_and_rotate=only_resize_and_rotate,
                              backgrounds_foldername=random_backgrounds)
    save_dataset(my_dataset, folder_name=dataset_name)

def load_tensorflow_dataset_from_folder(filepath=None, folder_name=None, num_classes=10):
    """Load an image folder as a TensorFlow dataset with one-hot labels.

    Exactly one of ``filepath`` and ``folder_name`` has to be given.

    Args:
        filepath: directory that contains one sub-directory per label.
        folder_name: name of a folder below ``<current working dir>/data``.
        num_classes: length of the one-hot label vectors.

    Returns:
        The dataset created by ``image_dataset_from_directory`` with the
        integer labels replaced by one-hot vectors.

    Raises:
        ValueError: if both or neither of ``filepath`` and ``folder_name``
            are set.
    """
    # catch unwanted behavior
    if filepath is not None and folder_name is not None:
        raise ValueError('Set only one of the two parameters')
    if filepath is not None:
        filepath_to_data = filepath
    elif folder_name is not None:
        filepath_to_data = os.path.join(os.getcwd(), 'data', folder_name)
    else:
        raise ValueError('Set one of the two parameters')

    # labels are inferred from the names of the sub-directories
    dataset = tf.keras.utils.image_dataset_from_directory(filepath_to_data, labels='inferred')

    # convert labels to one-hot encoded vectors
    return dataset.map(lambda x, y: (x, tf.one_hot(y, depth=num_classes)))

def save_dataset(data, folder_name):
    """Write a dataset dict to ``<current working dir>/data/<folder_name>``.

    One sub-folder is created per label and every image is written into it
    under its filename. The current working directory is not changed.

    Args:
        data: dict ``{label: (images, filenames)}``.
        folder_name: name of the output folder below ``data``.
    """
    # absolute target paths instead of os.chdir(): a failure while saving
    # can then no longer leave the process in a different working directory
    target_dir = os.path.join(os.getcwd(), 'data', folder_name)
    os.makedirs(target_dir, exist_ok=True)

    # iterate over classes of our data
    for label, entry in data.items():
        # create folder for class if it did not exist already
        label_dir = os.path.join(target_dir, label)
        os.makedirs(label_dir, exist_ok=True)

        images, filenames = entry[0], entry[1]
        if images is None or filenames is None:
            # label without images: keep the (empty) folder
            continue

        for image, filename in zip(images, filenames):
            image_path = os.path.join(label_dir, filename)
            # imwrite reports failure through its return value
            if not cv2.imwrite(image_path, image):
                print(f'WARNING: could not write image {image_path}')

# Dataset

In [7]:
# build our own dataset: one augmented folder per split
if create_new_dataset:
    dataset_filepath = filepath + dataset_folder

    # (split folder, number of images per label); add only_resize_and_rotate=True to skip filtering
    for split_folder, images_per_label in ((training_folder, 700),
                                           (validation_folder, 200),
                                           (testing_folder, 100)):
        augment_and_save_dataset(dataset_filepath, directory_names, split_folder, limit=images_per_label)


In [8]:
# load the training images from the project folder; the helper infers the
# labels from the sub-directory names and one-hot encodes them (depth 10)
image_path = filepath + training_folder
my_tensorflow_dataset = load_tensorflow_dataset_from_folder(filepath=image_path)

Found 3000 files belonging to 3 classes.


# CNN Model

In [9]:
# Build the classifier: a pre-trained, frozen convolutional base plus a small trainable head.
#
# Alternative bases that can be swapped in for VGG19 (same arguments):
# inception_v3.InceptionV3(weights='imagenet', include_top=False, input_shape=(256, 256, 3))
# vgg16.VGG16(weights='imagenet', include_top=False, input_shape=(256, 256, 3))
# vgg19.VGG19(weights='imagenet', include_top=False, input_shape=(256, 256, 3))
# nasnet.NASNetMobile(weights='imagenet', include_top=False, input_shape=(256, 256, 3))
# nasnet.NASNetLarge(weights='imagenet', include_top=False, input_shape=(256, 256, 3))  !!! ATTENTION !!! MEMORY ERROR !!!

# VGG19 with ImageNet weights and without its own dense top, for 256x256 images with 3 channels
model_base = tf.keras.applications.vgg19.VGG19(weights='imagenet', include_top=False, input_shape=(256, 256, 3))
# freeze the pre-trained layers so that only the layers added below are trained
for layer in model_base.layers:
    layer.trainable = False

# output of the base, flattened so it can be fed into dense layers
z = model_base.output
z = tf.keras.layers.Flatten()(z)

# add own, untrained layers
z = tf.keras.layers.Dense(units=50, activation='relu')(z)
# z = tf.keras.layers.Dense(units=100, activation='relu')(z)

# batch normalization after the dense layer
z = tf.keras.layers.BatchNormalization()(z)

# output layer with 10 softmax units, matching the one-hot label depth of 10 used for the datasets
predictions = tf.keras.layers.Dense(units=10, activation='softmax')(z)
model = tf.keras.models.Model(inputs=model_base.input, outputs=predictions)

# optional learning rate schedule (currently unused):
# lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(initial_learning_rate=0.1,
#                                                              decay_steps=100000,
#                                                              decay_rate=0.96,
#                                                              staircase=True)

# compile with optimizer and loss; categorical cross-entropy expects one-hot labels
model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['accuracy'])

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg19/vgg19_weights_tf_dim_ordering_tf_kernels_notop.h5


# Train the model

In [17]:
# weights of the best epoch are stored here
checkpoint_filepath = os.path.join(filepath, checkpoint_folder, 'model1.h5')
os.makedirs(os.path.dirname(checkpoint_filepath), exist_ok=True)

# The checkpoint keeps the epoch with the best 'val_accuracy'. That metric only
# exists when fit() receives validation data; without it the callback never
# saves anything.
checkpoint_validation_dataset = load_tensorflow_dataset_from_folder(filepath=filepath + validation_folder)

model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=True,
    monitor='val_accuracy',
    mode='max',
    save_best_only=True)

model.fit(my_tensorflow_dataset,
          validation_data=checkpoint_validation_dataset,
          epochs=10,
          callbacks=[model_checkpoint_callback])

In [None]:
# evaluates the current model on a folder below 'data/' and prints the result
def evaluate_on_folder(folder_name):
    """Evaluate ``model`` on ``data/<folder_name>``; return ``(loss, accuracy)``."""
    evaluation_dataset = load_tensorflow_dataset_from_folder(folder_name=folder_name)
    evaluation_loss, evaluation_accuracy = model.evaluate(evaluation_dataset)
    print('model accuracy:', evaluation_accuracy)
    print('model loss:', evaluation_loss)
    return evaluation_loss, evaluation_accuracy


if training:

    # own callback that appends the stats of every epoch to a text file
    class TrainingStats(tf.keras.callbacks.Callback):
        def __init__(self, stats_filepath):
            super().__init__()
            self.stats_filepath = stats_filepath

        def on_epoch_end(self, epoch, logs=None):
            if logs is None:
                logs = {}
            with open(self.stats_filepath, "a") as f:
                f.write(f"\nEpoch {epoch}: Loss={logs.get('loss')}, Accuracy={logs.get('accuracy')}\n")

    # callback for tensorboard; os.path.join instead of a hard-coded backslash,
    # which on Linux became part of the directory name
    tensorboard_training_log = tf.keras.callbacks.TensorBoard(log_dir=os.path.join('logs', str(model_file_name)))
    training_dataset = load_tensorflow_dataset_from_folder(folder_name=training_folder)
    model.fit(training_dataset,
              epochs=number_of_epochs,
              callbacks=[TrainingStats("training_stats.txt"),
                         tf.keras.callbacks.ModelCheckpoint("model.h5"),
                         # the tensorboard callback was created but never passed to fit()
                         tensorboard_training_log])

# save the current model under the configured name
if model_file_name is not None:
    model.save('trained_models/' + str(model_file_name))

# replace the current model by a stored one
if load_model is not None:
    load_model_name = 'trained_models/' + str(load_model)
    model = tf.keras.models.load_model(load_model_name)

# evaluation of the loaded OR trained model
if validation:
    loss, acc = evaluate_on_folder(validation_folder)

# final evaluation of the loaded OR trained model
if testing:
    loss, acc = evaluate_on_folder(testing_folder)

if single_label_testing:
    loss, acc = evaluate_on_folder(single_label_testing_folder)
