## test

In [None]:
# Setting the seed value for random number generation
seed_value = 42

# Setting seed for the random module in Python
import random

random.seed(seed_value)

# Setting seed for random number generation in NumPy
import numpy as np

np.random.seed(seed_value)

# Setting seed for TensorFlow's random number generation
import tensorflow as tf

tf.random.set_seed(seed_value)

# Setting seed for random number generation in Keras (TensorFlow's high-level API)
tf.keras.utils.set_random_seed(seed_value)

In [None]:
import os

# Set the environment variable to restrict TensorFlow to use only the first GPU (index 0)
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

import tensorflow as tf

# Print the TensorFlow version
print(tf.__version__)

# List available physical GPU devices for TensorFlow
print(tf.config.list_physical_devices('GPU'))

## import

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing import image
from tensorflow.keras.layers import Input, Dense, Dropout, GlobalAveragePooling2D, Convolution2D, Flatten, Resizing
from tensorflow.keras.layers import MaxPooling2D, AveragePooling2D, BatchNormalization, Conv2DTranspose, concatenate
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.models import load_model, Model
from tensorflow.keras.utils import to_categorical
from livelossplot.inputs.tf_keras import PlotLossesCallback

## lable preparation

In [None]:
# Read CSV files using Pandas (file names)
tr_df = pd.read_csv('F:/poorya/datasets/keypoint dataset/new pretext dataset/train_aug.csv')
vl_df = pd.read_csv('F:/poorya/datasets/keypoint dataset/new pretext dataset/val.csv')
ts_df = pd.read_csv('F:/poorya/datasets/keypoint dataset/new pretext dataset/test.csv')

# Load NumPy arrays using numpy.load() (labels)
tr_ary = np.load('F:/poorya/datasets/keypoint dataset/new pretext dataset/train_aug.npy')
ts_ary = np.load('F:/poorya/datasets/keypoint dataset/new pretext dataset/test.npy')
vl_ary = np.load('F:/poorya/datasets/keypoint dataset/new pretext dataset/val.npy')

## functions

In [None]:
sub = []  # Initialize an empty list to store subject identifiers
my_subjects = {}  # Initialize an empty dictionary to map subject IDs to a counter
counter = 0  # Initialize a counter

# List files in the specified directory
files = os.listdir('F:/poorya/datasets/keypoint dataset/new pretext dataset/train_aug/')

# Extract three-digit subject identifiers from file names and store them in the 'sub' list
for i in files:
    sub.append(i[-7:-4])

# Iterate through a range of numbers up to 95
for i in range(95):
    # Check if the zero-padded string representation of the number exists in the 'sub' list
    if str(i).zfill(3) in sub:
        # Map the subject identifier to a counter value in the 'my_subjects' dictionary
        my_subjects[str(i).zfill(3)] = counter
        counter = counter + 1  # Increment the counter for the next subject

In [None]:
class DataGenerator(tf.keras.utils.Sequence):
    '''Generates data for the model'''

    def __init__(self, folder, df, ary, batch_size, my_subjects):
        '''Initialization'''
        self.folder = folder
        self.n = df['images'].to_list()
        self.ary = ary
        self.batch_size = batch_size
        self.my_subjects = my_subjects
        self.on_epoch_end()

    def __len__(self):
        '''Denotes the number of batches per epoch'''
        return int(np.floor(len(self.n) / self.batch_size))

    def __getitem__(self, index):
        '''Generate one batch of data'''
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        list_IDs_temp = [self.n[k] for k in indexes]
        x, y = self.__data_generation(list_IDs_temp, indexes)
        return x, y

    def on_epoch_end(self):
        '''Updates indexes after each epoch'''
        self.indexes = np.arange(len(self.n))

    def __data_generation(self, list_IDs_temp, myindexes):
        '''Generates data containing batch_size samples'''
        images = np.empty((self.batch_size, 768, 768, 1))
        rotation = np.empty((self.batch_size), dtype=int)
        subject = np.empty((self.batch_size), dtype=int)
        keypoint = np.empty((self.batch_size, 10), dtype=float)

        for i, ID in enumerate(list_IDs_temp):
            train_img_2 = image.img_to_array(image.load_img(self.folder + ID, color_mode="grayscale"))
            train_img_2 /= 255.0
            images[i,] = train_img_2

            rotation[i] = int(ID[-9])
            subject[i] = int(self.my_subjects[ID[-7:-4]])
            keypoint[i] = self.ary[int(myindexes[i]), :] / 768

        return images, [
            to_categorical(subject, num_classes=89),
            to_categorical(rotation, num_classes=4),
            keypoint
        ]


bs = 8  # Define batch size

# Create instances of DataGenerator for training, validation, and testing data
train_gen = DataGenerator('F:/poorya/datasets/keypoint dataset/new pretext dataset/train_aug/', tr_df, tr_ary, bs, my_subjects)
val_gen = DataGenerator('F:/poorya/datasets/keypoint dataset/new pretext dataset/val/', vl_df, vl_ary, bs, my_subjects)
test_gen = DataGenerator('F:/poorya/datasets/keypoint dataset/new pretext dataset/test/', ts_df, ts_ary, bs, my_subjects)

## model structure

In [None]:
def vgg_block(layer_in, n_filters, n_conv):
    '''
    Create a VGG block with multiple convolutional layers followed by pooling and normalization.

    Arguments:
    layer_in: Input layer to the VGG block.
    n_filters: Number of filters for the convolutional layers.
    n_conv: Number of convolutional layers to be added.

    Returns:
    layer_in: Output of the VGG block.
    '''
    for _ in range(n_conv):
        layer_in = Convolution2D(n_filters, (3, 3), padding = 'same', activation = 'relu')(layer_in)
    layer_in = AveragePooling2D((2, 2), strides = (2, 2))(layer_in)
    layer_in = BatchNormalization()(layer_in)
    return layer_in


def get_model():
    '''
    Create a deep learning model for prediction.

    Returns:
    model: A Keras model with specified architecture for multi-task prediction.
    '''
    # Initialization section
    loss_list = {'subject': 'categorical_crossentropy', 'rotation': 'categorical_crossentropy', 'keypoint': 'mse'}
    test_metrics = {'subject': 'accuracy', 'rotation': 'accuracy', 'keypoint': tf.keras.metrics.MeanAbsolutePercentageError()}
    opt = tf.keras.optimizers.Adamax(learning_rate = 0.0001)
    drp = 0.0
    act = 'relu'
    eact = 'softmax'

    # Backbone of the model
    model_input = Input(shape = (768, 768, 1))

    # Build VGG-style blocks
    layer = vgg_block(model_input, 8, 2)
    layer = vgg_block(layer, 16, 2)
    layer = vgg_block(layer, 32, 2)
    layer = vgg_block(layer, 64, 2)
    layer = vgg_block(layer, 128, 2)
    layer = vgg_block(layer, 256, 2)
    layer = vgg_block(layer, 512, 2)

    # Flatten the features
    xx = Flatten()(layer)

    # Subject prediction
    x = Dense(512, activation = act)(xx)
    x = Dropout(drp)(x)
    y1 = Dense(256, activation = act)(x)
    y1 = Dropout(drp)(y1)
    y1 = Dense(89, activation = eact, name = 'subject')(y1)

    # Rotation prediction
    y2 = Dense(256, activation = act)(x)
    y2 = Dropout(drp)(y2)
    y2 = Dense(4, activation = eact, name = 'rotation')(y2)

    # Keypoint prediction
    y3 = Dense(1024, activation = act)(xx)
    y3 = Dropout(drp)(y3)
    # ... additional layers for keypoint prediction ...
    y3 = Dense(10, name = 'keypoint')(y3)

    # Create the model and compile it
    model = Model(inputs = model_input, outputs = [y1, y2, y3])
    model.compile(loss = loss_list, optimizer = opt, metrics = test_metrics)

    return model


# Create the model using the defined function
model = get_model()
# Display a summary of the model architecture
model.summary()

## train

In [None]:
def tr():
    '''
    Define callbacks for model training.

    Returns:
    list: A list containing instances of ModelCheckpoint, EarlyStopping, ReduceLROnPlateau, and PlotLossesCallback.
    '''
    main_chk = ModelCheckpoint(filepath = 'checkpoint', monitor = 'val_keypoint_loss', mode = 'min', verbose = 1, save_best_only = True)
    early_st = EarlyStopping(monitor = "val_loss", patience = 30, verbose = 1, mode = "min")
    rduce_lr = ReduceLROnPlateau(monitor = "val_loss", factor = 0.8, patience = 5, verbose = 1, mode = "min", min_lr = 0.00001)
    plot_tr = PlotLossesCallback()
    return [main_chk, rduce_lr, plot_tr]


# Get the list of callbacks
cll = tr()

# Fit the model using the defined callbacks and training data
history = model.fit(
    train_gen,
    validation_data = val_gen,
    batch_size = bs,
    epochs = 300,
    verbose = 1,
    callbacks = cll,
    steps_per_epoch = len(tr_df) // bs,
    validation_steps = len(vl_df) // bs
    )

## evaluation

In [None]:
# Create an empty image array of a specific size
img = np.zeros((1, 768, 768, 1)).astype('double')

# Load an image for testing
train_igg = image.load_img('F:/poorya/datasets/keypoint dataset/new pretext dataset/test/' + ts_df['images'][20], color_mode="grayscale")

# Convert the loaded image to an array and normalize the pixel values
train_img = image.img_to_array(train_igg)
train_img /= 255.0

# Assign the loaded and processed image to the empty image array
img[0, :, :, :] = train_img
img = np.array(img)

# Load the trained model for prediction
classifier = load_model('checkpoint')

# Perform prediction using the model on the test image
subj, rott, kpoint = classifier.predict(img)

# Extract x and y coordinates from the predicted keypoint
x = kpoint[0, 0:5] * 768
y = kpoint[0, 5:10] * 768

# Visualize the test image and predicted keypoints
plt.figure(figsize=(20, 10))
plt.imshow(img[0], cmap='gray')
plt.scatter(x, y)
plt.show()

In [None]:
# Load the trained model for testing
testmodel = load_model('checkpoint')

# Evaluate the model's performance on the test data
# This calculates the loss and accuracy of the model
tst_loss, tst_acc = testmodel.evaluate(test_gen, steps=len(ts_df) // bs)

# Classification parameters

In [None]:
# Initialize arrays to store image data, rotation, and subject labels
images = np.empty((len(ts_df), 768, 768, 1))
rotation = np.empty((len(ts_df)), dtype = int)
subject = np.empty((len(ts_df)), dtype = int)

# Loop through the test dataset
for i in range(len(ts_df)):
    # Load each test image, convert it to an array, and normalize pixel values
    train_igg = image.load_img('F:/poorya/datasets/keypoint dataset/new pretext dataset/test/' + ts_df['images'][
        i], color_mode = "grayscale")
    train_img = image.img_to_array(train_igg)
    train_img /= 255.0
    images[i, :, :, :] = train_img  # Store the preprocessed image

    # Extract rotation label from the image filename and convert it to an integer
    rotation[i] = int(ts_df['images'][i][-9])

    # Extract subject label from the image filename and convert it to an integer
    subject[i] = int(my_subjects[ts_df['images'][i][-7:-4]])

# Convert the 'images' array to x_test for prediction
x_test = np.array(images)

# Make predictions using the loaded test model
# This predicts rotation, subject, and keypoints
predictions = testmodel.predict(x_test, batch_size = 1, verbose = 1, steps = len(ts_df) // 1)

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, multilabel_confusion_matrix


def change(x):
    '''
    Determine the most probable class index for each sample in the prediction.

    Arguments:
    x (numpy.ndarray): Array containing the model's predicted probabilities for each class.

    Returns:
    numpy.ndarray: An array containing the index of the most probable class for each sample.
    '''
    answer = np.zeros((np.shape(x)[0]), dtype = int)
    for i in range(np.shape(x)[0]):
        max_value = max(x[i, :])
        max_index = list(x[i, :]).index(max_value)
        answer[i] = max_index
    return answer


# Define labels for rotation and subject classes
labels = [['0 degrees', '90 degrees', '180 degrees', '270 degrees'], [str(k + 1) for k in range(89)]]

# Visualize confusion matrix for 'subject' prediction
cm = confusion_matrix(subject, change(predictions[1]))
disp = ConfusionMatrixDisplay(confusion_matrix = cm, display_labels = labels[1])
disp.plot(cmap = plt.cm.Blues)
plt.show()

# Visualize confusion matrix for 'rotation' prediction
fig, ax = plt.subplots(figsize = (20, 20))
plt.rcParams.update({'font.size': 8})
cm = confusion_matrix(rotation, change(predictions[0]))
disp = ConfusionMatrixDisplay(confusion_matrix = cm, display_labels = labels[0])
disp.plot(cmap = plt.cm.Blues, ax = ax)
plt.show()

In [None]:
# Generate and visualize confusion matrix for 'rotation' and predicted 'subject'
cm = confusion_matrix(rotation, change(predictions[1]))
disp = ConfusionMatrixDisplay(confusion_matrix = cm, display_labels = labels[0])
disp.plot(cmap = plt.cm.Blues)
plt.show()

# Compare the true and predicted 'subject' classes with appended values
sub_pred = np.append(change(predictions[0]), [73, 78])
sub_true = np.append(subject, [73, 78])

# Plot the confusion matrix for 'subject' with appended values
fig, ax = plt.subplots(figsize = (20, 20))
plt.rcParams.update({'font.size': 8})
cm = confusion_matrix(sub_true, sub_pred)
disp = ConfusionMatrixDisplay(confusion_matrix = cm, display_labels = labels[1])
disp.plot(cmap = plt.cm.Blues, ax = ax)
plt.show()