# Start


In [None]:
import os
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from keras.layers import (
    Activation,
    BatchNormalization,
    Conv2D,
    Dense,
    Dropout,
    Flatten,
    InputLayer,
    MaxPooling2D,
    Rescaling,
)
from keras.models import Sequential

In [None]:
# Seed every random source used in this notebook (Python, NumPy, TensorFlow)
# with the same value so that runs are repeatable.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)
# Ask TensorFlow to use deterministic op implementations.
tf.config.experimental.enable_op_determinism()
tf.random.set_global_generator(tf.random.Generator.from_seed(SEED))
# NOTE(review): this variable is set after TensorFlow has been imported and
# after enable_op_determinism(); it is presumably redundant here - confirm.
os.environ["TF_DETERMINISTIC_OPS"] = "1"

## Definitions


### Main


In [None]:
# Number of images per batch when datasets are loaded from disk.
BATCH_SIZE = 64
# Maximum number of epochs requested from every model.fit call.
EPOCHS = 100


def get_dataset(size, color_mode, clearing, rgb_skip):
    """Load an image dataset from disk and split it into train/val/test parts.

    Images are read from
    ``../data/processed/{clearing}/{variant}/{size}x{size}``, where ``variant``
    is ``"{color_mode}_skip"`` when ``rgb_skip`` is truthy and ``color_mode``
    is ``"rgb"``, and plain ``color_mode`` otherwise.

    Args:
        size: Edge length in pixels; images are loaded as ``(size, size)``.
        color_mode: Colour mode forwarded to ``image_dataset_from_directory``
            and used as part of the directory name.
        clearing: Name of the sub-directory below ``../data/processed``.
        rgb_skip: Selects the ``_skip`` directory variant; only honoured when
            ``color_mode`` is ``"rgb"``.

    Returns:
        A tuple ``(train_ds, val_ds, test_ds)`` of batched datasets holding
        the first 70 % of the batches, the next 20 %, and the remainder.
        ``train_ds`` is cached, shuffled and prefetched, ``val_ds`` is cached
        and prefetched, ``test_ds`` is returned as is.
    """
    # Both directory layouts share the same loader call; only one path
    # segment differs.
    variant = f"{color_mode}_skip" if rgb_skip and color_mode == "rgb" else color_mode
    ds = tf.keras.preprocessing.image_dataset_from_directory(
        f"../data/processed/{clearing}/{variant}/{size}x{size}",
        shuffle=True,
        image_size=(size, size),
        batch_size=BATCH_SIZE,
        color_mode=color_mode,
    )

    # The dataset is already batched, so the split is done in whole batches.
    data_size = len(ds)
    train_size = int(0.7 * data_size)
    val_size = int(0.2 * data_size)

    # NOTE(review): ds is created with shuffle=True. If its order changes
    # between iterations, the take/skip splits below are not guaranteed to
    # stay disjoint from one pass to the next - confirm.
    train_ds = ds.take(train_size)
    val_ds = ds.skip(train_size).take(val_size)
    test_ds = ds.skip(train_size + val_size)  # whatever is left (about 10 %)

    # Buffer sized in images although the elements are batches, so it always
    # covers the whole training split.
    shuffle_buffer = train_size * BATCH_SIZE
    autotune = tf.data.AUTOTUNE
    train_ds = train_ds.cache().shuffle(shuffle_buffer, seed=SEED).prefetch(buffer_size=autotune)
    val_ds = val_ds.cache().prefetch(buffer_size=autotune)

    return train_ds, val_ds, test_ds


def get_input_shape(train_ds):
    """Return the shape of the first image batch yielded by ``train_ds``.

    Only one element is requested via ``take(1)``; each element is unpacked
    as a pair whose first item exposes ``.shape``. ``None`` is returned when
    the dataset yields nothing.
    """
    first_element = train_ds.take(1)
    for images, _labels in first_element:
        return images.shape
    return None

In [None]:
# Random image transformations; the training cells apply this pipeline to
# the training batches via Dataset.map.
data_augmentation = tf.keras.Sequential(
    layers=[
        tf.keras.layers.RandomFlip(mode="horizontal"),
        tf.keras.layers.RandomRotation(factor=0.1),
        tf.keras.layers.RandomZoom(height_factor=0.1),
        tf.keras.layers.RandomContrast(factor=0.1),
        tf.keras.layers.RandomBrightness(factor=0.1),
    ]
)

# Stop training after 5 epochs without val_loss improvement and restore the
# best weights seen so far.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=5,
    restore_best_weights=True,
)

In [None]:
def show_accuracy(history):
    """Plot training and validation accuracy per epoch.

    Args:
        history: Object whose ``history`` attribute maps ``"accuracy"`` and
            ``"val_accuracy"`` to per-epoch values (as returned by ``fit``).
    """
    curves = (
        ("accuracy", "Training Accuracy"),
        ("val_accuracy", "Validation Accuracy"),
    )
    for key, curve_label in curves:
        plt.plot(history.history[key], label=curve_label)

    plt.title("Model accuracy")
    plt.ylabel("Accuracy")
    plt.xlabel("Epoch")
    # The explicit legend entries take precedence over the per-curve labels.
    plt.legend(["Train", "Validation"], loc="upper left")
    plt.show()


def show_loss(history):
    """Plot training and validation loss per epoch.

    Args:
        history: Object whose ``history`` attribute maps ``"loss"`` and
            ``"val_loss"`` to per-epoch values (as returned by ``fit``).
    """
    curves = (
        ("loss", "Training Loss"),
        ("val_loss", "Validation Loss"),
    )
    for key, curve_label in curves:
        plt.plot(history.history[key], label=curve_label)

    plt.title("Model loss")
    plt.ylabel("Loss")
    plt.xlabel("Epoch")
    # The explicit legend entries take precedence over the per-curve labels.
    plt.legend(["Train", "Validation"], loc="upper left")
    plt.show()


def show_evaluation(model, val_ds, history):
    """Evaluate ``model`` on ``val_ds``, print the result and plot the curves.

    Args:
        model: Compiled model whose ``evaluate`` returns ``(loss, accuracy)``.
        val_ds: Batched dataset to evaluate on.
        history: Training history passed on to ``show_accuracy`` and
            ``show_loss``.
    """
    # val_ds is already batched, so no batch_size is passed: Keras documents
    # that batch_size must not be specified for dataset inputs.
    loss, acc = model.evaluate(val_ds)
    print(f"Loss: {round(loss, 3)}, Acc: {round(acc*100, 3)}%")
    show_accuracy(history)
    show_loss(history)


def show_evaluation_df(history):
    """Plot accuracy and loss curves from a history table.

    Args:
        history: Mapping-like object (e.g. a DataFrame loaded from a saved
            history CSV) indexable by ``"accuracy"``, ``"val_accuracy"``,
            ``"loss"`` and ``"val_loss"``.
    """
    # One row per figure:
    # (train key, val key, train label, val label, title, y-axis label)
    figures = (
        ("accuracy", "val_accuracy", "Training Accuracy", "Validation Accuracy", "Model accuracy", "Accuracy"),
        ("loss", "val_loss", "Training Loss", "Validation Loss", "Model loss", "Loss"),
    )
    for train_key, val_key, train_label, val_label, title, y_label in figures:
        plt.plot(history[train_key], label=train_label)
        plt.plot(history[val_key], label=val_label)
        plt.title(title)
        plt.ylabel(y_label)
        plt.xlabel("Epoch")
        plt.legend(["Train", "Validation"], loc="upper left")
        plt.show()

### Testing


In [None]:
def get_model(input_shape, learning_rate=0.001, num_classes=3):
    """Build and compile the CNN classifier used by the experiments.

    The network rescales inputs by 1/255, applies three conv blocks
    (Conv2D 3x3 -> MaxPooling 2x2 -> Dropout 0.1) with 32, 64 and 128 filters,
    then dense layers of 256, 64 and 16 units (each followed by Dropout 0.1)
    and a softmax output layer.

    Args:
        input_shape: Shape of one batch including the leading batch
            dimension; ``input_shape[1:]`` is used for the input layer.
        learning_rate: Learning rate of the Adam optimizer.
        num_classes: Number of units of the softmax output layer. Defaults to
            3, the value that was previously hard-coded.

    Returns:
        The compiled ``Sequential`` model (sparse categorical cross-entropy
        loss, accuracy metric).
    """
    # Layers are created in the same order as they appear in the network.
    layers = [
        # Preprocessing layers
        InputLayer(shape=input_shape[1:]),
        Rescaling(1.0 / 255),
    ]

    # Convolutional blocks
    for filters in (32, 64, 128):
        layers += [
            Conv2D(filters, (3, 3), activation="relu"),
            MaxPooling2D((2, 2)),
            Dropout(0.1),
        ]

    # Flatten and dense layers
    layers.append(Flatten())
    for units in (256, 64, 16):
        layers += [Dense(units, activation="relu"), Dropout(0.1)]
    layers.append(Dense(num_classes, activation="softmax"))

    model = Sequential(layers)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )

    return model

In [None]:
# Make sure the output directory for history CSV files exists. exist_ok=True
# replaces the separate existence check and tolerates an existing directory.
os.makedirs("../model_history", exist_ok=True)

In [None]:
def test_model(clearing, color, size, rgb_skip):
    """Train the default model on one dataset variant and save its history.

    The history is written to
    ``../model_history/history_{clearing}_{color}[_skip]_{size}.csv``. When
    that file already exists nothing is trained, so finished experiments are
    not repeated.

    Args:
        clearing: Dataset sub-directory name, forwarded to ``get_dataset``.
        color: Colour mode, forwarded to ``get_dataset``.
        size: Image edge length, forwarded to ``get_dataset``.
        rgb_skip: Forwarded to ``get_dataset``; also adds ``_skip`` to the
            history file name.
    """
    variant = f"{color}_skip" if rgb_skip else color
    history_out = f"../model_history/history_{clearing}_{variant}_{size}.csv"
    print(f"Testing model for {clearing}, {color}, {size}, {rgb_skip}")
    if os.path.exists(history_out):
        return

    # The test split is not used in this experiment.
    train_ds, val_ds, _ = get_dataset(size, color, clearing, rgb_skip)
    train_ds = train_ds.map(lambda x, y: (data_augmentation(x, training=True), y))
    input_shape = get_input_shape(train_ds)

    model = get_model(input_shape)

    # Both datasets are already batched, so no batch_size or
    # validation_batch_size is passed: Keras documents that these must not be
    # specified for dataset inputs.
    history = model.fit(
        train_ds,
        epochs=EPOCHS,
        validation_data=val_ds,
        callbacks=[early_stopping],
    )

    pd.DataFrame(history.history).to_csv(history_out, index=False)

In [None]:
def test_model_learning(clearing, color, size, rgb_skip, learning_rate):
    """Train the model with a given learning rate and save its history.

    The history is written to
    ``../model_history/history_{clearing}_{color}[_skip]_{size}_learn{learning_rate}.csv``.
    When that file already exists nothing is trained.

    Args:
        clearing: Dataset sub-directory name, forwarded to ``get_dataset``.
        color: Colour mode, forwarded to ``get_dataset``.
        size: Image edge length, forwarded to ``get_dataset``.
        rgb_skip: Forwarded to ``get_dataset``; also adds ``_skip`` to the
            history file name.
        learning_rate: Learning rate passed to ``get_model``.
    """
    # The learning rate is part of the file name in every case. Without it,
    # runs with rgb_skip=False would share the file name used by test_model
    # and by every other learning rate.
    variant = f"{color}_skip" if rgb_skip else color
    history_out = f"../model_history/history_{clearing}_{variant}_{size}_learn{learning_rate}.csv"
    print(f"Testing model for {clearing}, {color}, {size}, {rgb_skip}, {learning_rate}")
    if os.path.exists(history_out):
        return

    # The test split is not used in this experiment.
    train_ds, val_ds, _ = get_dataset(size, color, clearing, rgb_skip)
    train_ds = train_ds.map(lambda x, y: (data_augmentation(x, training=True), y))
    input_shape = get_input_shape(train_ds)

    model = get_model(input_shape, learning_rate)

    # Both datasets are already batched, so no batch_size or
    # validation_batch_size is passed: Keras documents that these must not be
    # specified for dataset inputs.
    history = model.fit(
        train_ds,
        epochs=EPOCHS,
        validation_data=val_ds,
        callbacks=[early_stopping],
    )

    pd.DataFrame(history.history).to_csv(history_out, index=False)

# Testing


## Main Model


In [None]:
# Load the 64x64 grayscale "1_clearing" dataset for the main model.
train_ds, val_ds, test_ds = get_dataset(
    size=64,
    color_mode="grayscale",
    clearing="1_clearing",
    rgb_skip=False,
)
# Augment the training images only; labels pass through unchanged.
train_ds = train_ds.map(
    lambda images, labels: (data_augmentation(images, training=True), labels)
)
input_shape = get_input_shape(train_ds)

In [None]:
# Build and compile the main model with the shared helper instead of repeating
# its layer stack here. get_model creates the same architecture and compiles
# it with Adam, sparse categorical cross-entropy and the accuracy metric;
# 0.001 is the learning rate that was used for this model.
model = get_model(input_shape, learning_rate=0.001)

In [None]:
# Train the main model. train_ds and val_ds are already batched, so no
# batch_size or validation_batch_size is passed: Keras documents that these
# must not be specified for dataset inputs.
history = model.fit(
    train_ds,
    epochs=EPOCHS,
    validation_data=val_ds,
    callbacks=[early_stopping],
)

In [None]:
# Print validation loss/accuracy of the main model and plot its curves.
show_evaluation(model=model, val_ds=val_ds, history=history)

## Testing Dataset


### Baseline


In [None]:
# Baseline grid on the "1_clearing" data: every colour variant at every image
# size, in the order grayscale, rgb, rgb with skip.
for color, rgb_skip in (("grayscale", False), ("rgb", False), ("rgb", True)):
    for size in (64, 96, 128):
        test_model("1_clearing", color, size, rgb_skip)

### Increased kernel size


In [None]:
# Testing kernel mode 5,5
# NOTE(review): get_model as defined in this file uses (3, 3) kernels, so the
# kernel size was presumably edited by hand for this run - confirm.
# These calls use the same arguments as two of the baseline calls, so
# test_model resolves to the same history CSV paths and skips training when
# those files already exist.
test_model("1_clearing", "rgb", 128, False)
test_model("1_clearing", "rgb", 128, True)

### Cleaning comparison


In [None]:
# Compare the dataset variants on 64x64 rgb images with skip.
clearings = [
    "no_clearing",
    "1_clearing",
    "2_clearing",
    "3_clearing",
]
for clearing in clearings:
    test_model(clearing=clearing, color="rgb", size=64, rgb_skip=True)

### Learning rate


In [None]:
# Learning rates to compare on the 64x64 rgb-skip "1_clearing" dataset.
learning_rates = [0.01, 0.005, 0.002, 0.0015, 0.00075, 0.0005, 0.0001]

for learning_rate in learning_rates:
    test_model_learning(
        clearing="1_clearing",
        color="rgb",
        size=64,
        rgb_skip=True,
        learning_rate=learning_rate,
    )

In [None]:
# Same arguments as the last entry of the learning-rate sweep;
# test_model_learning skips training when its history CSV already exists.
test_model_learning("1_clearing", "rgb", 64, True, 0.0001)