In [None]:
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

import keras
from keras.models import Sequential
from keras.layers import Conv2D, Lambda, MaxPooling2D, Rescaling # convolution layers
from keras.layers import Dense, Dropout, Flatten # core layers
from keras.utils.np_utils import to_categorical
from keras.preprocessing.image import ImageDataGenerator

import tensorflow as tf
from keras.layers import BatchNormalization

from sklearn.metrics import f1_score
from sklearn.preprocessing import LabelEncoder

import json

from utils import plot_cm


In [None]:
# Spatial size every image is resized to when loaded from disk; the model's
# input layer is built from the same tuple.
input_shape = (58, 58)

# Training-strategy switch:
#   True  -> Adam, with early stopping and learning-rate reduction driven by val_accuracy
#   False -> Adadelta with decay, trained for a fixed number of epochs (no validation split)
adaptive_based_on_val = True

In [None]:
# Build the training dataset (and, for the validation-driven strategy, a
# held-out validation dataset) from the class sub-directories of images2/train.
if adaptive_based_on_val:
    # image_dataset_from_directory applies the seeded shuffle to the file list
    # *before* carving out the split. Both subsets must therefore use the same
    # `seed` AND the same `shuffle` setting; with shuffle disabled on only one
    # of them, validation is cut from a differently ordered file list, which
    # overlaps the training subset and is skewed towards the last classes.
    train_ds = tf.keras.utils.image_dataset_from_directory(
        "images2/train",
        validation_split=0.1,
        subset="training",
        seed=20,
        color_mode='rgb',
        image_size=input_shape,
        label_mode="categorical",
        shuffle=True,
    )

    val_ds = tf.keras.utils.image_dataset_from_directory(
        "images2/train",
        validation_split=0.1,
        subset="validation",
        seed=20,
        color_mode='rgb',
        image_size=input_shape,
        batch_size=64,
        label_mode="categorical",
        shuffle=True,  # must match the training subset so the two splits are complementary
    )
else:
    # Fixed-schedule strategy: every image in images2/train is used for training.
    train_ds = tf.keras.utils.image_dataset_from_directory(
        "images2/train",
        seed=42,
        color_mode='rgb',
        image_size=input_shape,
        label_mode="categorical",
        shuffle=True,
        batch_size=1000,
    )

In [None]:
# CNN classifier: three convolutional stages (each ending in max-pooling and
# batch normalisation) followed by a dense head with a softmax output.
model = Sequential([
    # Scale pixels by 1/127.5 and shift by -1 (maps 0..255 onto -1..1).
    Rescaling(1./127.5, offset=-1, input_shape=(input_shape[0], input_shape[1], 3)),

    # Stage 1: the strided 6x6 convolution downsamples right at the input.
    Conv2D(filters=64, kernel_size=(6, 6), activation="relu", strides=(2, 2)),
    Conv2D(filters=64, kernel_size=(3, 3), activation="relu"),
    MaxPooling2D(pool_size=(2, 2)),
    BatchNormalization(),

    # Stage 2
    Conv2D(filters=128, kernel_size=(3, 3), activation="relu"),
    Conv2D(filters=128, kernel_size=(3, 3), activation="relu"),
    MaxPooling2D(pool_size=(2, 2)),
    BatchNormalization(),

    # Stage 3
    Conv2D(filters=256, kernel_size=(3, 3), activation="relu"),
    MaxPooling2D(pool_size=(2, 2)),
    BatchNormalization(),

    # Classification head.
    Flatten(),
    Dense(512, activation="relu"),
    # NOTE(review): 9 output units are hard-coded; assumes the dataset has
    # exactly 9 class directories -- confirm against images2/train.
    Dense(9, activation="softmax"),
])

if adaptive_based_on_val:
    # Adam with default settings; the learning rate is adjusted by callbacks at fit time.
    model.compile(loss="categorical_crossentropy", optimizer='adam', metrics=["accuracy"])
else:
    # `learning_rate` replaces the deprecated `lr` alias; the value is unchanged.
    ada_delta_ = keras.optimizers.Adadelta(learning_rate=1.0, rho=0.95, epsilon=1e-08, decay=0.03)
    model.compile(loss="categorical_crossentropy", optimizer=ada_delta_, metrics=["accuracy"])

In [None]:
# Display Keras' summary of the model defined above.
model.summary()

In [None]:
# Train the model using the strategy selected by `adaptive_based_on_val`.
if adaptive_based_on_val:
    # Stop once val_accuracy has not improved for 10 epochs and roll back to
    # the best weights seen.
    es = keras.callbacks.EarlyStopping(
        monitor="val_accuracy",
        patience=10,
        verbose=1,
        mode="max",  # higher accuracy is better
        restore_best_weights=True,
    )

    # Multiply the learning rate by 0.2 after 3 epochs without val_accuracy
    # improvement, never going below 1e-5.
    rp = keras.callbacks.ReduceLROnPlateau(
        monitor="val_accuracy",
        factor=0.2,
        patience=3,
        verbose=1,
        mode="max",
        min_lr=0.00001,
    )

    # Main training run; `h` keeps its history (including the val_* metrics).
    h = model.fit(train_ds, validation_data=val_ds, epochs=200, callbacks=[rp, es])

    # Fine-tune for 5 epochs at a low learning rate ON THE VALIDATION SPLIT.
    # Re-compiling swaps in a fresh Adam optimizer; the model weights are kept.
    # NOTE(review): presumably this is meant to use the held-out images before
    # the final test evaluation -- confirm that fitting on val_ds is intended.
    model.compile(loss="categorical_crossentropy", optimizer=keras.optimizers.Adam(learning_rate=1e-5), metrics=["accuracy"])
    # Stored separately so the main training history in `h` is not overwritten.
    h_finetune = model.fit(val_ds, epochs=5)
else:
    # Fixed-length run with the Adadelta optimizer; no validation data.
    h = model.fit(train_ds, epochs=140)

In [None]:
# Load the test images in a fixed (unshuffled) order so that the labels
# collected here line up row-for-row with a later model.predict(test_ds).
test_ds = tf.keras.utils.image_dataset_from_directory(
    "images2/test",
    validation_split=None,
    seed=42,
    image_size=input_shape,
    batch_size=2000,
    label_mode="categorical",
    shuffle=False,
    color_mode='rgb',
)

# Gather the one-hot labels of EVERY batch. Keeping only the labels of the
# last batch would silently truncate y_test_true whenever the test set holds
# more images than one batch (2000).
y_test_true = np.concatenate(
    [np.asarray(labels_batch) for _, labels_batch in test_ds],
    axis=0,
)

In [None]:
# One row of class probabilities per test image,
# e.g. class 2 => [0.1, 0, 0.9, 0, 0, 0, 0, 0, 0]
y_pred = model.predict(test_ds)

# Collapse each row to the index of its largest entry.
Y_pred = np.argmax(y_pred, axis=1)            # predicted class indices
Y_test_treu = np.argmax(y_test_true, axis=1)  # true class indices (decoded from one-hot)

In [None]:
# Confusion matrix of the test predictions: rows are true classes, columns
# are predicted classes.
mat = confusion_matrix(Y_test_treu, Y_pred)

# Draw the counts as an annotated heatmap on an explicit 10x10-inch figure.
fig, ax = plt.subplots(figsize=(10, 10))
sns.heatmap(
    mat,
    square=True,
    annot=True,
    cbar=False,
    cmap=plt.cm.Blues,
    fmt='.0f',
    ax=ax,
)
ax.set_xlabel('Predicted Values')
ax.set_ylabel('True Values')
plt.show()

In [None]:
# Macro-averaged F1 on the test set (unweighted mean of the per-class F1 scores).
f1_score(y_true=Y_test_treu, y_pred=Y_pred, average='macro')

In [None]:
# Read the saved index -> class-name mapping (JSON object keyed by "0".."8").
with open('data/le_name_mapping.json', 'r') as f:
    mapping = json.load(f)

# Class names ordered by their integer index, stored back under 'classes'.
mapping['classes'] = [mapping[key] for key in map(str, range(9))]

# Rebuild a LabelEncoder without fitting by assigning its classes directly.
le = LabelEncoder()
le.classes_ = np.array(mapping['classes'])

In [None]:
import importlib
import utils

# Re-execute the local `utils` module so edits made since it was first
# imported take effect, then rebind `plot_cm` to the refreshed function.
utils = importlib.reload(utils)
plot_cm = utils.plot_cm

In [None]:
from importlib.resources import path  # NOTE(review): appears unused in this cell -- confirm before removing


# Draw and save the labelled confusion matrix using the project helper.
plot_cm(Y_test_treu, Y_pred, le, save=1, title='Confusion Matrix: CNN Model', figname='cnn_cm', save_path='figures/CNN/')

# Accuracy is the fraction of test images whose predicted class equals the
# true class. The value previously printed under this label was the macro F1
# score, so both metrics are now reported under their own names.
proportion_correct = float(np.mean(np.asarray(Y_test_treu) == np.asarray(Y_pred)))
macro_f1 = f1_score(Y_test_treu, Y_pred, average='macro')
print('Test Accuracy: {}'.format(proportion_correct))
print('Test Macro F1: {}'.format(macro_f1))