In [None]:
import numpy as np
import pandas as pd

import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import apply_affine_transform
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import RMSprop, Adam
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.callbacks import ReduceLROnPlateau

from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight


from PIL import Image
import matplotlib.pyplot as plt

from collections import Counter


# Load Dataset

In [None]:
# Location of the headerless CSV: each row is one sample (flattened image
# followed by five label columns, as sliced in the next cell).
CSV_FILE_NAME = './data/my_file.csv'
# CSV_FILE_NAME = './data/my_file_400.csv'

# header=None: the first row of the file is data, not column names.
arr = pd.read_csv(filepath_or_buffer=CSV_FILE_NAME, header=None)

In [None]:
# Column layout of `arr`: everything except the last five columns is image data,
# the fifth-from-last column is the class id, and the last four columns are kept
# as `y_reg` (presumably regression targets; not used in the split below).
X = arr.iloc[:, :-5].values
y_class = arr.iloc[:, -5:-4].values
y_reg = arr.iloc[:, -4:].values

# Restore the (samples, height, width, channels) layout expected by the CNN.
X = X.reshape(-1, 200, 200, 1)
# X = X.reshape(-1, 400, 400, 1)


# one hot encoding
y_class = tf.keras.utils.to_categorical(y_class, num_classes=17)

# dataset split (stratified so both splits keep the class proportions)
X_train, X_test, y_train_class, y_test_class = train_test_split(
    X, y_class, test_size=0.2, random_state=42, stratify=y_class)


# if no data augmentation
# Integer class ids of the training samples, recovered from the one-hot rows.
train_labels = y_train_class.argmax(axis=1)
present_classes = np.unique(train_labels)

# 'balanced' weights are returned in the same order as `present_classes`.
class_weights = class_weight.compute_class_weight(class_weight='balanced',
                                                  classes=present_classes,
                                                  y=train_labels)

# Key every weight by its real class id. Enumerating the weights would shift
# them onto the wrong classes whenever a class id is absent from the training
# labels. Absent classes get a neutral weight of 1.0 so the dict still covers
# every output unit (ids 0 .. num_classes - 1).
class_weights_dict = {class_id: 1.0 for class_id in range(y_train_class.shape[1])}
class_weights_dict.update(
    {int(class_id): float(weight) for class_id, weight in zip(present_classes, class_weights)}
)


# Data Augmentation

In [None]:
# def apply_augmentation(images):
#     augmented_images = []
#     for image in images:
#         augmented_image = image.copy()

#         # shear
#         shear_intensity = np.random.uniform(-0.1, 0.1)
#         augmented_image = apply_affine_transform(augmented_image, shear=shear_intensity)

#         # zoom
#         zoom_factor = np.random.uniform(0.9, 1.1)
#         augmented_image = apply_affine_transform(augmented_image, zx=zoom_factor, zy=zoom_factor)

#         # # shift
#         # width_shift = np.random.uniform(-0.1, 0.1)
#         # height_shift = np.random.uniform(-0.1, 0.1)
#         # augmented_image = apply_affine_transform(augmented_image, tx=width_shift, ty=height_shift)

#         # rotate
#         rotation_angle = np.random.uniform(-45, 45)
#         augmented_image = apply_affine_transform(augmented_image, theta=rotation_angle)

#         # # flip
#         # if np.random.rand() < 0.5:
#         #     augmented_image = np.fliplr(augmented_image)
#         # if np.random.rand() < 0.5:
#         #     augmented_image = np.flipud(augmented_image)

#         # brightness
#         brightness_factor = np.random.uniform(0.5, 1.5)
#         augmented_image *= brightness_factor

#         # make sure values are between 0 and 1
#         augmented_image = np.clip(augmented_image, 0.0, 1.0)

#         augmented_images.append(augmented_image)

#     return augmented_images


In [None]:
# # add augmented data to each class until they each have 100 samples

# y_train_integers = np.argmax(y_train_class, axis=1)
# max_samples = 100

# X_augmented, y_augmented = [], []

# for label in np.unique(y_train_integers):
#     curr_samples = sum(y_train_integers == label)

#     if curr_samples < max_samples:
#         num_augmented = max_samples - curr_samples

#         samples = X_train[y_train_integers == label]
#         indices = np.random.choice(len(samples), size=num_augmented, replace=True)
#         samples = samples[indices]

#         augmented_samples = apply_augmentation(samples)

#         X_augmented.extend(augmented_samples)

#         one_hot = np.zeros((1, 17))  # width must match num_classes used for y_train_class
#         one_hot[0, label] = 1
#         y_augmented.extend([one_hot] * num_augmented)


# X_augmented = np.array(X_augmented)
# y_augmented = np.array(y_augmented)

# X_balanced = np.concatenate((X_train, X_augmented))
# y_balanced = np.concatenate((y_train_class, y_augmented.reshape(-1, 17)))

In [None]:
# print ("number of training examples = " + str(X_balanced.shape[0]))
# print ("number of test examples = " + str(X_test.shape[0]))
# print ("X_train shape: " + str(X_balanced.shape))
# print ("Y_train shape: " + str(y_balanced.shape))
# print ("X_test shape: " + str(X_test.shape))
# print ("Y_test shape: " + str(y_test_class.shape))

In [None]:
# # display image
# index = 10
# print("label: ", y_train_class[index])
# img_array = X_train[index].reshape(400, 400)  # NOTE: use (200, 200) when the 200x200 dataset is loaded
# image = Image.fromarray((img_array * 255))

# plt.imshow(image, cmap='gray')
# plt.show()

# Model

In [None]:
# Small CNN classifier: two conv + max-pool stages, then a dropout-regularised
# dense head ending in a 17-way softmax.
cnn_layers = [
    layers.Input(shape=(200, 200, 1)),
    # layers.Input(shape=(400, 400, 1)),
    layers.Conv2D(filters=32, kernel_size=3, activation='relu'),
    # layers.Conv2D(32, 3, kernel_regularizer=l2(0.0001), activation='relu'),
    layers.MaxPooling2D(pool_size=2),
    layers.Conv2D(filters=64, kernel_size=3, activation='relu'),
    # layers.Conv2D(64, 3, kernel_regularizer=l2(0.0001), activation='relu'),
    layers.MaxPooling2D(pool_size=2),
    layers.Flatten(),
    layers.Dropout(rate=0.2),
    layers.Dense(units=64, activation='relu'),
    layers.Dropout(rate=0.2),
    layers.Dense(units=17, activation='softmax'),
]
model = models.Sequential(cnn_layers)


# Multiply the learning rate by sqrt(0.1) once val_loss has stopped improving
# for 5 epochs.
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=np.sqrt(0.1),
    patience=5,
)

# f1_metric = tf.metrics.F1Score(average='macro')
# if no data augmentation
f1_metric = tf.metrics.F1Score(average='weighted')


model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss=CategoricalCrossentropy(),
    metrics=['accuracy', f1_metric],
)


model.summary()

In [None]:
# history = model.fit(X_balanced, y_balanced, epochs=20, validation_data=(X_test, y_test_class))

# if no data augmentation: train on the raw split and compensate for class
# imbalance with per-class loss weights.
# NOTE(review): the test split doubles as validation data here, and
# ReduceLROnPlateau reacts to its val_loss.
history = model.fit(
    x=X_train,
    y=y_train_class,
    epochs=50,
    validation_data=(X_test, y_test_class),
    class_weight=class_weights_dict,
    callbacks=[reduce_lr],
)

In [None]:
# Convert the recorded training history to a DataFrame (one row per epoch).
# Read it from `history`, the object returned by the first fit, instead of
# `model.history`: `model` is rebound to a new network in a later cell, so
# going through `model` would plot a different run when this cell is re-executed.
df_loss_acc = pd.DataFrame(history.history)

# Select loss columns and rename (rename returns a new frame; no in-place mutation)
df_loss = df_loss_acc[['loss', 'val_loss']].rename(
    columns={'loss': 'train', 'val_loss': 'validation'})

# Select accuracy columns and rename
df_acc = df_loss_acc[['accuracy', 'val_accuracy']].rename(
    columns={'accuracy': 'train', 'val_accuracy': 'validation'})

# Plot model loss
df_loss.plot(title='Model loss', figsize=(12, 8)).set(xlabel='Epoch', ylabel='Loss')

# Plot model accuracy (trailing ';' hides the repr of the returned label objects)
df_acc.plot(title='Model Accuracy', figsize=(12, 8)).set(xlabel='Epoch', ylabel='Accuracy');


In [None]:
# Score the trained network on the held-out split. evaluate() returns the loss
# followed by the metrics passed to compile().
result = model.evaluate(x=X_test, y=y_test_class)
print("Result:", result)

In [None]:
# Second experiment: same architecture as the first model, rebuilt from scratch
# and trained with a much smaller fixed learning rate (1e-5). Note that this
# rebinds `model`.
cnn_layers = [
    layers.Input(shape=(200, 200, 1)),
    layers.Conv2D(filters=32, kernel_size=3, activation='relu'),
    layers.MaxPooling2D(pool_size=2),
    layers.Conv2D(filters=64, kernel_size=3, activation='relu'),
    layers.MaxPooling2D(pool_size=2),
    layers.Flatten(),
    layers.Dropout(rate=0.2),
    layers.Dense(units=64, activation='relu'),
    layers.Dropout(rate=0.2),
    layers.Dense(units=17, activation='softmax'),
]
model = models.Sequential(cnn_layers)


# f1_metric = tf.metrics.F1Score(average='macro')
# if no data augmentation
f1_metric = tf.metrics.F1Score(average='weighted')

model.compile(
    optimizer=Adam(learning_rate=0.00001),
    loss=CategoricalCrossentropy(),
    metrics=['accuracy', f1_metric],
)


model.summary()


In [None]:
# if no data augmentation: first 50 epochs of the low-learning-rate run,
# class-weighted, with no learning-rate schedule.
history2 = model.fit(
    x=X_train,
    y=y_train_class,
    epochs=50,
    validation_data=(X_test, y_test_class),
    class_weight=class_weights_dict,
)

In [None]:
# if no data augmentation: resume the same model. With initial_epoch=50 and
# epochs=100, training continues from epoch 50 up to epoch 100.
history2_continued = model.fit(
    x=X_train,
    y=y_train_class,
    epochs=100,
    initial_epoch=50,
    validation_data=(X_test, y_test_class),
    class_weight=class_weights_dict,
)