<a href="https://colab.research.google.com/github/GoncalSousa20/NAS_IPL/blob/main/ConvNeXtXLarge_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

import os
# Define the paths to your three folders
train_path = '/content/drive/MyDrive/ProjetoCurso/train'
validation_path = '/content/drive/MyDrive/ProjetoCurso/validation'
test_path = '/content/drive/MyDrive/ProjetoCurso/test'

from keras.preprocessing.image import ImageDataGenerator
# Augment data
batch_size = 16
train_input_shape = (224, 224, 3)

train_datagen = ImageDataGenerator(rescale=1./255.,
                                   #rotation_range=45,
                                   #width_shift_range=0.5,
                                   #height_shift_range=0.5,
                                   shear_range=5,
                                   #zoom_range=0.7,
                                   horizontal_flip=True,
                                   vertical_flip=True,
                                  )

train_generator = train_datagen.flow_from_directory(directory='/content/drive/MyDrive/ProjetoCurso/train',
                                                    class_mode='categorical',
                                                    target_size=train_input_shape[0:2],
                                                    batch_size=batch_size,
                                                    shuffle=True,
                                                   )

valid_generator = train_datagen.flow_from_directory(directory='/content/drive/MyDrive/ProjetoCurso/validation',
                                                    class_mode='categorical',
                                                    target_size=train_input_shape[0:2],
                                                    batch_size=batch_size,
                                                    shuffle=True,
                                                   )

test_generator = train_datagen.flow_from_directory(directory='/content/drive/MyDrive/ProjetoCurso/test',
                                                    class_mode='categorical',
                                                    target_size=train_input_shape[0:2],
                                                    batch_size=1,
                                                    shuffle=False,
                                                   )

STEP_SIZE_TRAIN = train_generator.n//train_generator.batch_size
STEP_SIZE_VALID = valid_generator.n//valid_generator.batch_size
STEP_SIZE_TEST = test_generator.n//test_generator.batch_size
print("Total number of batches =", STEP_SIZE_TRAIN, "and", STEP_SIZE_VALID, "and", STEP_SIZE_TEST)

from keras.applications import ConvNeXtXLarge
base_model = ConvNeXtXLarge(weights='imagenet', include_top=False, input_shape=train_input_shape)
for layer in base_model.layers:
    layer.trainable = False

from keras.layers import Dense, Flatten, BatchNormalization, Activation, Dropout
from keras.models import Model

n_classes = 11

# Add layers at the end
X = base_model.output
X = Flatten()(X)

# First dense block
X = Dense(512, kernel_initializer='he_uniform')(X)
X = BatchNormalization()(X)
X = Activation('relu')(X)
# Optionally add dropout for regularization
# X = Dropout(0.5)(X)

# Second dense block
X = Dense(256, kernel_initializer='he_uniform')(X)
X = BatchNormalization()(X)
X = Activation('relu')(X)
# Optionally add dropout for regularization
# X = Dropout(0.5)(X)

# Third dense block
X = Dense(128, kernel_initializer='he_uniform')(X)
X = BatchNormalization()(X)
X = Activation('relu')(X)
# Optionally add dropout for regularization
# X = Dropout(0.5)(X)

# Fourth dense block
X = Dense(64, kernel_initializer='he_uniform')(X)
X = BatchNormalization()(X)
X = Activation('relu')(X)
# Optionally add dropout for regularization
# X = Dropout(0.5)(X)

# Fifth dense block
X = Dense(32, kernel_initializer='he_uniform')(X)
X = BatchNormalization()(X)
X = Activation('relu')(X)
# Optionally add dropout for regularization
# X = Dropout(0.5)(X)

# Final dense layer before output
X = Dense(16, kernel_initializer='he_uniform')(X)
X = BatchNormalization()(X)
X = Activation('relu')(X)
# Optionally add dropout for regularization
# X = Dropout(0.5)(X)

# Output layer
output = Dense(n_classes, activation='softmax')(X)

# Create the model
model = Model(inputs=base_model.input, outputs=output)

from keras.optimizers import Adam
optimizer = Adam(lr=0.0001)
model.compile(loss='categorical_crossentropy',
              optimizer=optimizer,
              metrics=['accuracy'])

from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    matthews_corrcoef, cohen_kappa_score, hamming_loss, zero_one_loss, roc_auc_score,
    average_precision_score, brier_score_loss, classification_report
)
import pickle
import numpy as np
import tensorflow as tf
seed = 42
tf.random.set_seed(seed)
np.random.seed(seed)
# Additional custom metrics
def fbeta_score(precision, recall, beta=1.0):
    if precision + recall == 0:
        return 0.0
    return (1 + beta*2) * (precision * recall) / (beta*2 * precision + recall)

def g_mean(precision, recall):
    return np.sqrt(precision * recall)

# Example usage
dictionary_metrics = {
    'train_acc': [],
    'train_loss': [],
    'precision': [],
    'recall': [],
    'val_loss': [],
    'test_acc': [],
    'test_loss': [],
    'val_accuracy': [],
    'f1_score': [],
    'mcc': [],
    'cohen_kappa': [],
    'hamming_loss': [],
    'zero_one_loss': [],
    'auc_roc': [],
    'brier_score': [],
    'mAP': [],
    'f1_beta': [],
    'g_mean': []
}

# Function to get predictions and labels in batches
def get_predictions_and_labels(generator, model, steps):
    y_true = []
    y_pred = []
    y_pred_prob = []

    for i in range(steps):
        (X, y) = next(generator)
        y_pred_prob_batch = model.predict(X)
        y_pred_batch = np.argmax(y_pred_prob_batch, axis=1)

        y_pred.append(y_pred_batch)
        y_pred_prob.append(y_pred_prob_batch)
        y_true.append(y)

    # Flatten the lists
    y_pred = np.concatenate(y_pred, axis=0)
    y_pred_prob = np.concatenate(y_pred_prob, axis=0)
    y_true = np.concatenate(y_true, axis=0)

    # Update Truth vector based on argmax if y is one-hot encoded
    if y_true.ndim > 1 and y_true.shape[1] > 1:
        y_true = np.argmax(y_true, axis=1)

    # Convert to numpy arrays
    y_pred = np.asarray(y_pred).ravel()
    y_pred_prob = np.asarray(y_pred_prob)

    return y_true, y_pred,y_pred_prob

Mounted at /content/drive
Found 3862 images belonging to 11 classes.
Found 216 images belonging to 11 classes.
Found 219 images belonging to 11 classes.
Total number of batches = 241 and 13 and 219
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/convnext/convnext_xlarge_notop.h5




In [None]:
for epoch in range(30):
  print(f"Starting Epoch {epoch+1}")
  history = model.fit(
      train_generator,
      validation_data=valid_generator,
      epochs=1
  )
  print(history.history)
  print(f"Finished Epoch {epoch+1}")

  # Extract validation labels
  val_labels = []
  for i in range(len(valid_generator)):
      _, batch_labels = next(valid_generator)
      val_labels.extend(batch_labels)

  val_labels = np.array(val_labels)
  dictionary_metrics['train_acc'].append(history.history["accuracy"])
  dictionary_metrics['train_loss'].append(history.history["loss"])

  # Calculate metrics for validation and test sets
  val_loss, val_accuracy = model.evaluate(valid_generator)
  dictionary_metrics['val_accuracy'].append(val_accuracy)
  dictionary_metrics['val_loss'].append(val_loss)
  test_loss, test_acc = model.evaluate(test_generator)
  dictionary_metrics['test_loss'].append(test_loss)
  dictionary_metrics['test_acc'].append(test_acc)

  test_labels, test_pred_labels, test_pred_prob = get_predictions_and_labels(test_generator, model, steps=STEP_SIZE_TEST)

  num_classes = len(np.unique(test_labels))

  test_classification_report = classification_report(test_labels, test_pred_labels, output_dict=True)
  test_recall_list = []
  test_precision_list = []
  test_f1_score_list = []
  test_f1_beta_list = []
  test_g_mean_list = []

  for class_label in range(num_classes):
      recall = test_classification_report[str(class_label)]['recall']
      precision = test_classification_report[str(class_label)]['precision']
      f1 = test_classification_report[str(class_label)]['f1-score']
      test_recall_list.append(recall)
      test_precision_list.append(precision)
      test_f1_score_list.append(f1)
      test_f1_beta_list.append(fbeta_score(precision, recall, beta=2.0))  # Using beta=2 for F1-beta score
      test_g_mean_list.append(g_mean(precision, recall))

  dictionary_metrics['recall'].append(test_recall_list)
  dictionary_metrics['precision'].append(test_precision_list)
  dictionary_metrics['f1_score'].append(test_f1_score_list)
  dictionary_metrics['f1_beta'].append(test_f1_beta_list)
  dictionary_metrics['g_mean'].append(test_g_mean_list)
  dictionary_metrics['mcc'].append(matthews_corrcoef(test_labels, test_pred_labels))
  dictionary_metrics['cohen_kappa'].append(cohen_kappa_score(test_labels, test_pred_labels))
  dictionary_metrics['hamming_loss'].append(hamming_loss(test_labels, test_pred_labels))
  dictionary_metrics['zero_one_loss'].append(zero_one_loss(test_labels, test_pred_labels, normalize=True))

  try:
      test_roc_auc = roc_auc_score(test_labels, test_pred_prob, multi_class='ovr')
      dictionary_metrics['auc_roc'].append(test_roc_auc)
  except ValueError:
      print("Error in calculating ROC AUC for test set")

  try:
      avg_precision = []
      for i in range(num_classes):
          avg_precision.append(average_precision_score((test_labels == i).astype(int), test_pred_prob[:, i]))
      avg_precision = np.mean(avg_precision)
      dictionary_metrics['mAP'].append(avg_precision)
  except ValueError as e:
      print(f"Error in calculating Average Precision: {e}")

  # Calculate Brier score for each class and average them
  brier_scores = []
  for i in range(num_classes):
      true_binary = (test_labels == i).astype(float)
      probas = test_pred_prob[:, i]
      brier_score = brier_score_loss(true_binary, probas)
      brier_scores.append(brier_score)

  average_brier_score = np.mean(brier_scores)
  dictionary_metrics['brier_score'].append(average_brier_score)

with open('/content/drive/MyDrive/ProjetoCurso/metrics_ConvNeXtXLarge.pkl', 'wb') as fp:
    pickle.dump(dictionary_metrics,fp)

Starting Epoch 1
{'loss': [2.2320728302001953], 'accuracy': [0.2206110805273056], 'val_loss': [2.172376871109009], 'val_accuracy': [0.24074074625968933]}
Finished Epoch 1
Starting Epoch 2


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


{'loss': [1.9696239233016968], 'accuracy': [0.3441222310066223], 'val_loss': [1.950013279914856], 'val_accuracy': [0.35648149251937866]}
Finished Epoch 2
Starting Epoch 3


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


{'loss': [1.8097660541534424], 'accuracy': [0.3995339274406433], 'val_loss': [1.8689196109771729], 'val_accuracy': [0.37962964177131653]}
Finished Epoch 3
Starting Epoch 4


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


{'loss': [1.6551166772842407], 'accuracy': [0.45028483867645264], 'val_loss': [1.6975497007369995], 'val_accuracy': [0.3888888955116272]}
Finished Epoch 4
Starting Epoch 5


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


{'loss': [1.5358167886734009], 'accuracy': [0.49870532751083374], 'val_loss': [1.766494631767273], 'val_accuracy': [0.4305555522441864]}
Finished Epoch 5
Starting Epoch 6


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


{'loss': [1.4321595430374146], 'accuracy': [0.5300362706184387], 'val_loss': [1.6378564834594727], 'val_accuracy': [0.45370370149612427]}
Finished Epoch 6
Starting Epoch 7


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


[1;30;43mA saída de streaming foi truncada nas últimas 5000 linhas.[0m
Starting Epoch 9
{'loss': [1.216763973236084], 'accuracy': [0.6004660725593567], 'val_loss': [1.45369291305542], 'val_accuracy': [0.5277777910232544]}
Finished Epoch 9
Starting Epoch 10
{'loss': [1.1303728818893433], 'accuracy': [0.632055938243866], 'val_loss': [1.4929405450820923], 'val_accuracy': [0.5138888955116272]}
Finished Epoch 10
Starting Epoch 11
{'loss': [1.0853110551834106], 'accuracy': [0.646038293838501], 'val_loss': [1.4528231620788574], 'val_accuracy': [0.5416666865348816]}
Finished Epoch 11
Starting Epoch 12
{'loss': [1.0334734916687012], 'accuracy': [0.6693423390388489], 'val_loss': [1.756017804145813], 'val_accuracy': [0.45370370149612427]}
Finished Epoch 12
Starting Epoch 13
{'loss': [0.9913693070411682], 'accuracy': [0.6830657720565796], 'val_loss': [1.3440186977386475], 'val_accuracy': [0.5555555820465088]}
Finished Epoch 13
Starting Epoch 14
{'loss': [0.9723347425460815], 'accuracy': [0.68746

In [None]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
import pickle

# Function to print the metrics in an organized way
def print_metrics(metrics):
    print("Metrics:")
    print("--------")
    for key, value in metrics.items():
        if isinstance(value, list):  # Check if the value is a list (e.g., confusion matrix)
            print(f"{key}:")
            for row in value:
                print(f"  {row}")
        else:
            print(f"{key}: {value}")

# Reading the metrics from the pickle file
try:
    with open('/content/drive/MyDrive/ProjetoCurso/metrics_ConvNeXtXLarge.pkl', 'rb') as fp:
        loaded_metrics = pickle.load(fp)
    print("Metrics successfully loaded from metrics.pkl")
    print_metrics(loaded_metrics)
except EOFError:
    print("Error: Ran out of input - the file may be empty or improperly written.")
except FileNotFoundError:
    print("Error: The file does not exist.")
except pickle.UnpicklingError:
    print("Error: The file contents are not a valid pickle.")


Metrics successfully loaded from metrics.pkl
Metrics:
--------
train_acc:
  [0.2206110805273056]
  [0.3441222310066223]
  [0.3995339274406433]
  [0.45028483867645264]
  [0.49870532751083374]
  [0.5300362706184387]
  [0.555411696434021]
  [0.5965820550918579]
  [0.6004660725593567]
  [0.632055938243866]
  [0.646038293838501]
  [0.6693423390388489]
  [0.6830657720565796]
  [0.687467634677887]
  [0.715691328048706]
  [0.7234593629837036]
  [0.7296737432479858]
  [0.7314862608909607]
  [0.7522009611129761]
  [0.7591921091079712]
  [0.7566028237342834]
  [0.7698084115982056]
  [0.7856032848358154]
  [0.7928534150123596]
  [0.8026928901672363]
  [0.8042464852333069]
  [0.8089073300361633]
  [0.812532365322113]
  [0.8094251751899719]
  [0.8293630480766296]
train_loss:
  [2.2320728302001953]
  [1.9696239233016968]
  [1.8097660541534424]
  [1.6551166772842407]
  [1.5358167886734009]
  [1.4321595430374146]
  [1.3441238403320312]
  [1.2538204193115234]
  [1.216763973236084]
  [1.1303728818893433]

## Training graph

In [None]:
# Merge history1 and history2
history = {}
history['loss'] = history1.history['loss'] + history2.history['loss']
history['acc'] = history1.history['acc'] + history2.history['acc']
history['val_loss'] = history1.history['val_loss'] + history2.history['val_loss']
history['val_acc'] = history1.history['val_acc'] + history2.history['val_acc']
history['lr'] = history1.history['lr'] + history2.history['lr']

In [None]:
# Plot the training graph
def plot_training(history):
    acc = history['acc']
    val_acc = history['val_acc']
    loss = history['loss']
    val_loss = history['val_loss']
    epochs = range(len(acc))

    fig, axes = plt.subplots(1, 2, figsize=(15,5))

    axes[0].plot(epochs, acc, 'r-', label='Training Accuracy')
    axes[0].plot(epochs, val_acc, 'b--', label='Validation Accuracy')
    axes[0].set_title('Training and Validation Accuracy')
    axes[0].legend(loc='best')

    axes[1].plot(epochs, loss, 'r-', label='Training Loss')
    axes[1].plot(epochs, val_loss, 'b--', label='Validation Loss')
    axes[1].set_title('Training and Validation Loss')
    axes[1].legend(loc='best')

    plt.show()

plot_training(history)

## Evaluate performance

In [None]:
# Prediction accuracy on train data
score = model.evaluate_generator(train_generator, verbose=1)
print("Prediction accuracy on train data =", score[1])

In [None]:
# Prediction accuracy on CV data
score = model.evaluate_generator(valid_generator, verbose=1)
print("Prediction accuracy on CV data =", score[1])

In [None]:
# Classification report and confusion matrix
from sklearn.metrics import *
import seaborn as sns

tick_labels = artists_top_name.tolist()

def showClassficationReport_Generator(model, valid_generator, STEP_SIZE_VALID):
    # Loop on each generator batch and predict
    y_pred, y_true = [], []
    for i in range(STEP_SIZE_VALID):
        (X,y) = next(valid_generator)
        y_pred.append(model.predict(X))
        y_true.append(y)

    # Create a flat list for y_true and y_pred
    y_pred = [subresult for result in y_pred for subresult in result]
    y_true = [subresult for result in y_true for subresult in result]

    # Update Truth vector based on argmax
    y_true = np.argmax(y_true, axis=1)
    y_true = np.asarray(y_true).ravel()

    # Update Prediction vector based on argmax
    y_pred = np.argmax(y_pred, axis=1)
    y_pred = np.asarray(y_pred).ravel()

    # Confusion Matrix
    fig, ax = plt.subplots(figsize=(10,10))
    conf_matrix = confusion_matrix(y_true, y_pred, labels=np.arange(n_classes))
    conf_matrix = conf_matrix/np.sum(conf_matrix, axis=1)
    sns.heatmap(conf_matrix, annot=True, fmt=".2f", square=True, cbar=False,
                cmap=plt.cm.jet, xticklabels=tick_labels, yticklabels=tick_labels,
                ax=ax)
    ax.set_ylabel('Actual')
    ax.set_xlabel('Predicted')
    ax.set_title('Confusion Matrix')
    plt.show()

    print('Classification Report:')
    print(classification_report(y_true, y_pred, labels=np.arange(n_classes), target_names=artists_top_name.tolist()))

showClassficationReport_Generator(model, valid_generator, STEP_SIZE_VALID)

# Evaluate performance by predicting on random images from dataset

In [None]:
# Prediction
from keras.preprocessing import *

n = 5
fig, axes = plt.subplots(1, n, figsize=(25,10))

for i in range(n):
    random_artist = random.choice(artists_top_name)
    random_image = random.choice(os.listdir(os.path.join(images_dir, random_artist)))
    random_image_file = os.path.join(images_dir, random_artist, random_image)

    # Original image

    test_image = image.load_img(random_image_file, target_size=(train_input_shape[0:2]))

    # Predict artist
    test_image = image.img_to_array(test_image)
    test_image /= 255.
    test_image = np.expand_dims(test_image, axis=0)

    prediction = model.predict(test_image)
    prediction_probability = np.amax(prediction)
    prediction_idx = np.argmax(prediction)

    labels = train_generator.class_indices
    labels = dict((v,k) for k,v in labels.items())

    #print("Actual artist =", random_artist.replace('_', ' '))
    #print("Predicted artist =", labels[prediction_idx].replace('_', ' '))
    #print("Prediction probability =", prediction_probability*100, "%")

    title = "Actual artist = {}\nPredicted artist = {}\nPrediction probability = {:.2f} %" \
                .format(random_artist.replace('_', ' '), labels[prediction_idx].replace('_', ' '),
                        prediction_probability*100)

    # Print image
    axes[i].imshow(plt.imread(random_image_file))
    axes[i].set_title(title)
    axes[i].axis('off')

plt.show()