In [None]:
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
import seaborn as sns
import cv2
import os

import tensorflow as tf
from tensorflow import keras as k
from tensorflow.keras import layers
from tensorflow.keras import optimizers
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.utils import array_to_img
from tensorflow.keras.layers import Input, Dense, Flatten, Dropout, Conv2D, MaxPooling2D, GlobalAveragePooling2D, Activation, concatenate
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau
import tensorflow_addons as tfa
import keras_tuner

# Restrict CUDA-based libraries in this process to GPU index 0.
# NOTE(review): this is assigned after TensorFlow has already been imported; presumably it
# still takes effect because no GPU work has started yet — confirm, or move it above the imports.
os.environ["CUDA_VISIBLE_DEVICES"]="0"

In [None]:
#Variables
# Global configuration read by the helper functions and cells below.
image_size = 256  # images are resized to image_size x image_size; also the model input height/width
in_channel_tool = 3  # channel count of the tool images and of the model input
num_classes = 3  # number of units in the final softmax Dense layer
learning_rate = 0.001  # AdamW learning rate
weight_decay = 0.0001  # AdamW weight decay
batch_size = 40  # batch size applied in generate_datasets
num_epochs = 100  # epochs for both the tuner search and the final training run

In [None]:
#Functions
#Multimodal implementation of Trockman, A., & Kolter, J. Z. (2022). Patches are all you need?.
#https://github.com/locuslab/convmixer
#Convert images to numpy array

def read_tools(file_paths, image_size, channels):
  """Load image files into one normalized float32 array.

  Each file is read with ``cv2.imread`` (default flags), resized to
  ``image_size x image_size`` with bicubic interpolation, stacked, and divided
  by the maximum pixel value found across the whole stack.

  Args:
    file_paths: Iterable of image file paths.
    image_size: Target height and width in pixels.
    channels: Number of channels the loaded images are expected to have; the
      final reshape fails if the decoded images do not match it.

  Returns:
    A float32 array of shape ``(len(file_paths), image_size, image_size, channels)``.
    An empty ``file_paths`` yields an array with a leading dimension of 0.

  Raises:
    OSError: If ``cv2.imread`` cannot read one of the files.
  """
  images = []

  for file_path in file_paths:
    img = cv2.imread(file_path)
    if img is None:
      # cv2.imread reports a missing/undecodable file by returning None instead of
      # raising; fail here with the offending path rather than inside cv2.resize.
      raise OSError(f"cv2.imread could not read image (missing or unreadable): {file_path}")
    res = cv2.resize(img, dsize=(image_size, image_size), interpolation=cv2.INTER_CUBIC)
    images.append(res)

  if not images:
    # np.max raises on an empty array, so return a correctly shaped empty batch.
    return np.empty((0, image_size, image_size, channels), dtype=np.float32)

  images = np.asarray(images, dtype=np.float32)

  # normalize by the global maximum; skip when everything is zero to avoid 0/0 -> NaN
  peak = np.max(images)
  if peak > 0:
    images = images / peak

  # reshape to match Keras expectations
  images = images.reshape(images.shape[0], image_size, image_size, channels)

  return images

#Dataset pipeline: shuffle, batch, augment (training only) and prefetch

def generate_datasets(images, is_train=False):
    """Batch and prefetch a dataset, shuffling and augmenting it when training.

    Relies on the module-level ``batch_size``, ``auto`` and ``data_augmentation``.

    Args:
        images: Dataset object exposing ``shuffle``, ``batch``, ``map`` and
            ``prefetch``; for training its elements must be ``(x, y)`` pairs.
        is_train: When true, shuffle with a buffer of ``batch_size * 10`` before
            batching and apply ``data_augmentation`` to each batch of inputs.

    Returns:
        The batched dataset with prefetching enabled.
    """
    if not is_train:
        # Evaluation data: batching and prefetching only.
        return images.batch(batch_size).prefetch(auto)

    shuffled = images.shuffle(batch_size * 10)
    batched = shuffled.batch(batch_size)
    # Augmentation runs on whole batches; labels pass through untouched.
    augmented = batched.map(
        lambda x, y: (data_augmentation(x), y), num_parallel_calls=auto
    )
    return augmented.prefetch(auto)

#Model creation methods

def activation_module(x):
    """Apply a GELU activation followed by batch normalization to ``x``."""
    gelu = layers.Activation("gelu")
    batch_norm = layers.BatchNormalization()
    return batch_norm(gelu(x))


def base_module(x, filters, patch_size):
    """Embed patches with a strided convolution, then GELU + batch norm.

    Kernel size and stride are both ``patch_size``, so each output position is
    computed from one non-overlapping ``patch_size x patch_size`` window.
    """
    patch_embedding = layers.Conv2D(
        filters, kernel_size=patch_size, strides=patch_size
    )
    return activation_module(patch_embedding(x))


def multi_mixer_module(x, filters, kernel_size):
    """One mixer block: residual depthwise convolution, then pointwise convolution.

    Args:
        x: Input feature map tensor.
        filters: Number of output channels of the pointwise (1x1) convolution.
        kernel_size: Kernel size of the depthwise convolution ("same" padding).

    Returns:
        The transformed feature map.
    """
    skip = x

    # Spatial mixing: depthwise conv -> GELU -> batch norm, added back onto the input.
    spatial = layers.DepthwiseConv2D(kernel_size=kernel_size, padding="same")(skip)
    spatial = activation_module(spatial)
    mixed = layers.Add()([spatial, skip])

    # Channel mixing: 1x1 conv -> GELU -> batch norm.
    channel = layers.Conv2D(filters, kernel_size=1)(mixed)
    return activation_module(channel)


def load_multi_mixer(hp):
    """Build and compile a ConvMixer-style classifier from tuner hyperparameters.

    Hyperparameters registered on ``hp`` (all integers):
        filters:     128..512 in steps of 128 (channel width of every block)
        depth:       4..8 in steps of 4 (number of mixer blocks)
        kernel_size: 3..9 in steps of 1 (depthwise convolution kernel)
        patch_size:  4..32 in steps of 4 (patch embedding kernel and stride)

    Relies on the module-level ``image_size``, ``in_channel_tool``,
    ``num_classes``, ``learning_rate`` and ``weight_decay``.

    Args:
        hp: Hyperparameter container exposing ``Int(name, min_value, max_value, step)``,
            e.g. a ``keras_tuner.HyperParameters`` instance.

    Returns:
        A compiled ``k.Model`` mapping ``(image_size, image_size, in_channel_tool)``
        images to ``num_classes`` softmax probabilities.
    """
    filters = hp.Int("filters", min_value=128, max_value=512, step=128)
    depth = hp.Int("depth", min_value=4, max_value=8, step=4)
    kernel_size = hp.Int("kernel_size", min_value=3, max_value=9, step=1)
    patch_size = hp.Int("patch_size", min_value=4, max_value=32, step=4)

    inputs = k.Input((image_size, image_size, in_channel_tool))

    # read_tools() already divides the images by their maximum, so they arrive in [0, 1].
    # A further Rescaling(1/255) here would squash them to roughly [0, 0.004], so the
    # inputs are passed to the patch embedding unchanged.

    # Extract patch embeddings.
    x = base_module(inputs, filters, patch_size)

    # Multi_mixer module.
    for _ in range(depth):
        x = multi_mixer_module(x, filters, kernel_size)

    # Classification block.
    x = layers.GlobalAvgPool2D()(x)
    outputs = layers.Dense(num_classes, activation="softmax")(x)

    model = k.Model(inputs, outputs)

    optimizer = tfa.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay
    )

    # Sparse loss: labels are integer class indices, not one-hot vectors.
    model.compile(
        optimizer=optimizer,
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )

    return model

#Run experiment

def launch_experiment(model):
    """Compile, train and evaluate ``model`` on the notebook's global datasets.

    The model is (re)compiled with a fresh AdamW optimizer, trained on
    ``new_train_dataset`` for ``num_epochs`` epochs while validating on
    ``new_val_dataset``, and the weights with the best ``val_accuracy`` are
    written to ``/tmp/checkpoint``. Those weights are then restored and the
    model is evaluated on ``new_test_dataset``; the test accuracy is printed.

    Args:
        model: Keras model to train.

    Returns:
        ``(history, model)``: the object returned by ``model.fit`` and the model
        with the best checkpointed weights loaded.
    """
    model.compile(
        optimizer=tfa.optimizers.AdamW(
            learning_rate=learning_rate, weight_decay=weight_decay
        ),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )

    # Keep only the weights of the epoch with the highest validation accuracy.
    checkpoint_filepath = "/tmp/checkpoint"
    best_weights_saver = k.callbacks.ModelCheckpoint(
        checkpoint_filepath,
        monitor="val_accuracy",
        save_best_only=True,
        save_weights_only=True,
    )

    # NOTE(review): shuffle=True presumably has no effect for dataset inputs — confirm.
    history = model.fit(
        new_train_dataset,
        validation_data=new_val_dataset,
        epochs=num_epochs,
        shuffle=True,
        callbacks=[best_weights_saver],
    )

    # Evaluate the best checkpoint rather than the last-epoch weights.
    model.load_weights(checkpoint_filepath)
    _test_loss, accuracy = model.evaluate(new_test_dataset)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    return history, model

In [None]:
#Dataset acquisition
LABELS_DIR = '../Data/Labels'
TOOL_IMAGE_DIR = '../Data/Datasets/tool'


def load_split(csv_name):
    """Read one label CSV (first column as index) and add a 'tool' column of image paths."""
    split_df = pd.read_csv(f'{LABELS_DIR}/{csv_name}', index_col=0)
    split_df['tool'] = split_df.index.map(lambda image_id: f'{TOOL_IMAGE_DIR}/{image_id}.jpg')
    return split_df


train_df = load_split('train.csv')
test_df = load_split('test.csv')

# The validation split used to be read from test.csv, which makes model selection
# (tuner objective and best-checkpoint choice) see the test data and inflates the
# reported test accuracy. Prefer a dedicated validation file when one is present.
# NOTE(review): 'val.csv' is an assumed file name — adjust to the project's actual file.
val_csv_name = 'val.csv' if os.path.exists(f'{LABELS_DIR}/val.csv') else 'test.csv'
if val_csv_name == 'test.csv':
    print('WARNING: no val.csv found; validating on test.csv, so test accuracy will be optimistic.')
val_df = load_split(val_csv_name)

In [None]:
#Plot dataset distribution
# sns.displot is a figure-level function that creates its own figure, so a preceding
# plt.figure(figsize=...) only leaves an extra empty figure behind. The 6x6 inch size
# is requested through height/aspect instead.
sns.displot(train_df['tool_label'], height=6, aspect=1)
plt.title('Label Distribution')
plt.show()

In [None]:
#Read tool images and convert them to NumPy arrays (train, test, val — in that order)
x_train_tool, x_test_tool, x_val_tool = (
    read_tools(split_df.tool.values, image_size, in_channel_tool)
    for split_df in (train_df, test_df, val_df)
)
#Shift the labels down by one
#(presumably 1-based in the CSVs, 0-based for the sparse cross-entropy loss — confirm)
labels_train, labels_test, labels_val = (
    split_df.tool_label.values - 1
    for split_df in (train_df, test_df, val_df)
)

In [None]:
#Create tf.data datasets and build the batched input pipelines (training data is also augmented)
auto = tf.data.AUTOTUNE

# Training examples are shuffled once over the full set with a fixed seed;
# generate_datasets shuffles them again with a smaller buffer.
train_dataset = tf.data.Dataset.from_tensor_slices(
    (x_train_tool, labels_train)
).shuffle(x_train_tool.shape[0], seed=777)
test_dataset = tf.data.Dataset.from_tensor_slices((x_test_tool, labels_test))
val_dataset = tf.data.Dataset.from_tensor_slices((x_val_tool, labels_val))

# Augmentation applied by generate_datasets to training batches only.
data_augmentation = k.Sequential(
    [
        layers.RandomCrop(image_size, image_size),
        layers.RandomFlip("horizontal"),
    ],
    name="data_augmentation",
)

new_train_dataset = generate_datasets(train_dataset, is_train=True)
new_val_dataset = generate_datasets(val_dataset)
new_test_dataset = generate_datasets(test_dataset)

In [None]:
#Multi_mixer model architecture optimization
# NOTE: hp is not handed to the tuner below (hyperparameters=None); the search space
# comes from the hp.Int calls inside load_multi_mixer.
hp = keras_tuner.HyperParameters()

# Random search over load_multi_mixer's hyperparameters, ranked by validation accuracy.
random_search_options = dict(
    hypermodel=load_multi_mixer,
    objective="val_accuracy",
    max_trials=100,
    seed=None,
    hyperparameters=None,
    tune_new_entries=True,
    allow_new_entries=True,
    overwrite=True,
    project_name="tool_opt",
)
tuner = keras_tuner.RandomSearch(**random_search_options)

# Each trial trains for num_epochs epochs on the training pipeline.
# NOTE(review): tuner.search presumably returns None, so history_opt carries no history — confirm.
history_opt = tuner.search(
    new_train_dataset,
    validation_data=new_val_dataset,
    epochs=num_epochs,
    shuffle=True,
)

In [None]:
#Optimization summary #1: the hyperparameter search space registered with the tuner
tuner.search_space_summary()

In [None]:
#Optimization summary #2: results of the trials run by the search
tuner.results_summary()

In [None]:
#Choose the best model based on architecture optimization
# Two models are requested from the tuner but only the first one is used below.
models = tuner.get_best_models(num_models=2)
best_model = models[0]
# NOTE(review): the shape passed to build() has no batch dimension, and the model returned
# by the tuner is presumably already built — confirm whether this call is needed.
best_model.build(input_shape=(image_size, image_size, in_channel_tool))
best_model.summary()

In [None]:
#Choose the best hyperparameters
# Five hyperparameter sets are requested; only the first is used
# (presumably the best-scoring one — confirm the ordering).
best_hps = tuner.get_best_hyperparameters(5)
# Build a new compiled model from those hyperparameters; it is trained in the next cell.
model = load_multi_mixer(best_hps[0])

In [None]:
#Train optimized architecture
# launch_experiment compiles, fits and evaluates the model, returning the fit history
# and the model with its best checkpointed weights restored.
history, conv_mixer_model = launch_experiment(model)

In [None]:
#Save model
# Persist the trained model under ../models (the .h5 extension suggests HDF5 format).
conv_mixer_model.save("../models/tool_aug_tool_opt.h5")

In [None]:
#Plot accuracy history
# One curve per split: training accuracy first, then validation accuracy.
for history_key in ('accuracy', 'val_accuracy'):
    plt.plot(history.history[history_key])
plt.title('model accuracy')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend(['train', 'val'], loc='upper left')
plt.show()

In [None]:
#Plot loss history
# Plots the loss curves; this cell previously re-plotted accuracy under an accuracy title.
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()