In [1]:
# Install keras-cv using the interpreter that runs this kernel (sys.executable),
# so the package is installed for the same Python that executes the notebook.
# NOTE(review): the version is not pinned — consider pinning for reproducibility.
import sys
!{sys.executable} -m pip install keras-cv



In [None]:
# 2. IMPORT LIBRARIES
import os

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns

from tensorflow import keras

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, GlobalAveragePooling2D
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.optimizers import AdamW

from sklearn.metrics import classification_report, confusion_matrix


from src.config import train_dir, test_dir, val_dir
print("✅ Libraries installed and imported.")

In [None]:
# Input settings shared by the train/val/test dataset loaders.
transfer_img_height = 224  # image height passed to the loaders' image_size
transfer_img_width = 224  # image width passed to the loaders' image_size
transfer_color_mode = 'rgb'  # colour mode passed to the loaders
batch_size = 32  # batch size passed to the loaders

In [None]:
# Keyword arguments that are identical for the train, validation and test loaders.
_shared_loader_kwargs = dict(
    labels='inferred',
    label_mode='categorical',
    image_size=(transfer_img_height, transfer_img_width),
    color_mode=transfer_color_mode,
    batch_size=batch_size,
)

# Training split: shuffled with a fixed seed.
train_ds = tf.keras.utils.image_dataset_from_directory(
    train_dir, shuffle=True, seed=42, **_shared_loader_kwargs
)

# Validation split: not shuffled.
val_ds = tf.keras.utils.image_dataset_from_directory(
    val_dir, shuffle=False, **_shared_loader_kwargs
)

# Test split: not shuffled (evaluate_models reads predictions and labels in
# two separate passes over this dataset).
test_ds = tf.keras.utils.image_dataset_from_directory(
    test_dir, shuffle=False, **_shared_loader_kwargs
)

In [None]:
from keras_cv.layers import RandomBrightness, RandomContrast, RandomSaturation, RandomHue, RandomFlip, RandomRotation, RandomZoom

# Augmentation pipeline: contrast jitter, slight zoom, horizontal flip and a
# small rotation, applied in that order.
_augmentation_layers = [
    RandomContrast(value_range=(0, 255), factor=0.2),
    RandomZoom(0.05),
    RandomFlip("horizontal"),
    RandomRotation(0.05),
]
data_augmentation = Sequential(_augmentation_layers, name="keras_cv_augmentation")


def _augment_batch(images, labels):
    """Run the augmentation pipeline on the images; labels pass through unchanged."""
    return data_augmentation(images, training=True), labels


train_ds_augmented = train_ds.map(_augment_batch, num_parallel_calls=tf.data.AUTOTUNE)

In [None]:
# ResNet50's own input preprocessing function from keras.applications.
preprocess_input_resnet50 = tf.keras.applications.resnet50.preprocess_input


def _to_resnet50_inputs(images, labels):
    """Apply the ResNet50 preprocessing to the images; labels pass through unchanged."""
    return preprocess_input_resnet50(images), labels


# The training pipeline is augmented first, then preprocessed; validation and
# test data are only preprocessed.
train_ds_final = train_ds_augmented.map(_to_resnet50_inputs, num_parallel_calls=tf.data.AUTOTUNE)
val_ds_final = val_ds.map(_to_resnet50_inputs, num_parallel_calls=tf.data.AUTOTUNE)
test_ds_final = test_ds.map(_to_resnet50_inputs, num_parallel_calls=tf.data.AUTOTUNE)
print("✅ Datasets for transfer learning are ready.")

In [None]:
def calculate_weights_from_dir(directory, image_extensions=(".bmp", ".gif", ".jpeg", ".jpg", ".png")):
    """
    Calculates 'balanced' class weights by counting image files per class folder.

    Class folders are the non-hidden subdirectories of ``directory``, sorted
    alphabetically so that index ``i`` refers to the i-th folder name. Only
    files whose name ends with one of ``image_extensions`` (case-insensitive)
    are counted, including files in nested subfolders. Stray entries such as
    ``.DS_Store``, text files or ``.ipynb_checkpoints`` therefore neither
    inflate a class count nor show up as an extra class.

    Args:
        directory (str): The path to the directory containing class subfolders
                         (e.g., your TRAIN_DIR).
        image_extensions (tuple[str, ...]): File extensions counted as images.
            NOTE(review): the default is meant to mirror the formats accepted by
            ``image_dataset_from_directory`` — confirm against the installed
            Keras version.

    Returns:
        dict: A dictionary mapping class index (int) to its weight (float),
              computed as total_samples / (num_classes * class_count).
              A class folder with no images gets weight ``inf`` and a warning
              is printed.

    Raises:
        ValueError: If ``directory`` has no class subfolders, or no image
                    files are found in any of them.
    """
    print(f"Calculating class weights from directory: {directory}")

    # Sorted, non-hidden subfolders define the class indices.
    class_names = sorted(
        entry for entry in os.listdir(directory)
        if not entry.startswith(".") and os.path.isdir(os.path.join(directory, entry))
    )
    if not class_names:
        raise ValueError(f"No class subfolders found in '{directory}'.")

    num_classes = len(class_names)
    valid_extensions = tuple(ext.lower() for ext in image_extensions)
    class_counts = []

    # Count image files (recursively) in each class subfolder.
    for class_name in class_names:
        class_path = os.path.join(directory, class_name)
        count = sum(
            1
            for _, _, filenames in os.walk(class_path)
            for filename in filenames
            if filename.lower().endswith(valid_extensions)
        )
        class_counts.append(count)
        print(f"- Found {count} images for class '{class_name}'")

    class_counts = np.array(class_counts)
    total_samples = np.sum(class_counts)
    if total_samples == 0:
        raise ValueError(f"No image files found under '{directory}'.")

    empty_classes = [name for name, count in zip(class_names, class_counts) if count == 0]
    if empty_classes:
        print(f"WARNING: no images found for classes {empty_classes}; their weight will be inf.")

    # Calculate weights using the 'balanced' formula:
    # weight = total_samples / (num_classes * count_for_that_class)
    # Division by zero for empty classes is reported above, so silence NumPy here.
    with np.errstate(divide="ignore"):
        class_weights_array = total_samples / (num_classes * class_counts)

    # Create the dictionary mapping class index to weight (plain Python floats).
    class_weights_dict = {i: float(weight) for i, weight in enumerate(class_weights_array)}

    print("\nCalculated Class Weights:", class_weights_dict)
    return class_weights_dict

In [None]:
# Per-class weights derived from the training directory; passed to fit() as
# class_weight in the training cells.
class_weights = calculate_weights_from_dir(train_dir)

# Show the resulting mapping (the helper also prints it while computing).
print(class_weights)

In [None]:
def _plot_history_metric(history, metric, legend_loc):
    """Plot the train and validation curves of one metric from a fit() history."""
    plt.plot(history.history[metric])
    plt.plot(history.history[f'val_{metric}'])
    plt.title(f'model {metric}')
    plt.ylabel(metric)
    plt.xlabel('epoch')
    plt.legend(['train', 'val'], loc=legend_loc)
    plt.show()


def evaluate_models(history, model):
    """
    Plot training curves, then evaluate a saved model on train/val/test data.

    Reads the notebook-level datasets ``train_ds_final``, ``val_ds_final`` and
    ``test_ds_final``, and takes the class names from ``train_ds``.

    Args:
        history: Object returned by ``model.fit``; its ``history`` dict must
            contain 'loss', 'val_loss', 'accuracy' and 'val_accuracy'.
        model (str): Path of the saved model file, loaded with
            ``tf.keras.models.load_model``.

    Returns:
        None. Results are shown as plots and printed text.
    """
    _plot_history_metric(history, 'loss', 'upper right')
    _plot_history_metric(history, 'accuracy', 'upper left')

    best_model = tf.keras.models.load_model(model)

    # NOTE: train_ds_final is the shuffled, randomly augmented pipeline, so the
    # train metrics are measured on augmented images.
    splits = (
        ("Train", train_ds_final),
        ("Val", val_ds_final),
        ("Test", test_ds_final),
    )
    for split_name, dataset in splits:
        print(f"\nEvaluating model performance on the {split_name.lower()} set...\n")
        results = best_model.evaluate(dataset, verbose=1)

        # results[0] is the loss, results[1] the first compiled metric (accuracy).
        print(f"Final {split_name} Loss: {results[0]:.4f}\n")
        print(f"Final {split_name} Accuracy: {results[1] * 100:.2f}%\n")

    # Resolve class names once, before the label loop, so they are defined even
    # when the test dataset yields no batches.
    image_classes = train_ds.class_names
    class_indices = list(range(len(image_classes)))

    y_pred_probs = best_model.predict(test_ds_final)
    y_pred = np.argmax(y_pred_probs, axis=1)

    # Labels are collected in a second pass; this lines up with y_pred because
    # the test dataset is created with shuffle=False.
    y_true = []
    for _, labels in test_ds_final:
        y_true.extend(np.argmax(labels.numpy(), axis=1))

    # Pin the label set so the matrix and report always cover every class, even
    # one that is absent from both y_true and y_pred.
    conf_matrix = confusion_matrix(y_true, y_pred, labels=class_indices)

    plt.figure(figsize=(8, 8))
    sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=image_classes, yticklabels=image_classes)
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.show()

    report = classification_report(y_true, y_pred, labels=class_indices, target_names=image_classes)
    print(f"\n\nClassification Report:\n{report}")

In [None]:
# Number of output units: taken from the training dataset so the softmax width
# always matches the one-hot labels produced by the loader (was hard-coded to 7).
num_classes = len(train_ds.class_names)
# Three channels, matching transfer_color_mode = 'rgb'.
input_shape = (transfer_img_height, transfer_img_width, 3)

# Kept in sync with the loader batch size instead of repeating the literal 32.
batch = batch_size
# Maximum number of epochs for the first (frozen-backbone) training phase.
epochs = 30
optimizer = AdamW(learning_rate=0.001, weight_decay=0.0001)

In [None]:
def build_transfer_model(input_shape, num_classes):
    """
    Build a ResNet50-based classifier with a frozen backbone.

    Args:
        input_shape: Input shape passed to ``tf.keras.applications.ResNet50``.
        num_classes (int): Number of units in the final softmax layer.

    Returns:
        An uncompiled ``Sequential`` model named "ResNet50_Transfer_Model"
        whose first layer is the ResNet50 backbone.
    """
    # ImageNet-pretrained backbone without its top layers, frozen.
    backbone = tf.keras.applications.ResNet50(
        weights='imagenet',
        include_top=False,
        input_shape=input_shape,
    )
    backbone.trainable = False

    # Classification head stacked on top of the backbone.
    classifier_head = [
        GlobalAveragePooling2D(),
        Dense(256, activation='relu'),
        Dropout(0.5),
        Dense(num_classes, activation='softmax'),
    ]

    return Sequential([backbone, *classifier_head], name="ResNet50_Transfer_Model")

In [None]:
# Instantiate the frozen-backbone ResNet50 classifier; it is compiled and
# trained in the following cells.
transfer_model = build_transfer_model(input_shape=input_shape, num_classes=num_classes)

In [None]:
# One path for both saving and loading the best phase-1 model. Previously the
# checkpoint was written to '/resnet_model_data_aug.keras' (filesystem root)
# while evaluation loaded the relative 'resnet_model_data_aug.keras', so the
# two did not refer to the same file.
resnet_checkpoint_path = 'resnet_model_data_aug.keras'

# NOTE(review): the checkpoint tracks val_loss while early stopping and the LR
# schedule track val_accuracy — confirm this mix of monitors is intended.
save_model = ModelCheckpoint(resnet_checkpoint_path, save_best_only=True, monitor='val_loss', mode='min', verbose=2)
early_stop = EarlyStopping(monitor='val_accuracy',  patience=10,  restore_best_weights=True)
lr_scheduler = ReduceLROnPlateau(monitor='val_accuracy', factor=0.5, patience=3, min_lr=1e-6, verbose=1)
transfer_model.compile(
    optimizer = optimizer,
    loss = 'categorical_crossentropy',
    metrics = ['accuracy']
)
# Phase 1: train only the classification head (the backbone is frozen).
history = transfer_model.fit(train_ds_final, epochs=epochs, validation_data=val_ds_final,
                    callbacks=[save_model, early_stop, lr_scheduler], class_weight=class_weights, verbose=1)

# Evaluate the best checkpoint written by save_model.
evaluate_models(history, model=resnet_checkpoint_path)

In [None]:
# Fine-tuning configuration.
fine_tune_at = -20  # backbone layers before this index stay frozen
fine_tune_epochs = 20
total_epochs = epochs + fine_tune_epochs
optimizer_finetune = AdamW(learning_rate=1e-5, weight_decay=0.0001)

# The ResNet50 backbone is the first layer of the Sequential model.
base_model = transfer_model.layers[0]

# Unfreeze the whole backbone, then re-freeze everything except its last
# 20 layers.
# NOTE(review): any BatchNormalization layers among the unfrozen ones become
# trainable too — confirm that is intended for fine-tuning.
base_model.trainable = True
for layer in base_model.layers[:fine_tune_at]:
    layer.trainable = False

# Recompile so the changed trainable flags and the lower learning rate apply.
transfer_model.compile(
    optimizer=optimizer_finetune,
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

transfer_model.summary()

In [None]:
# One path for both saving and loading the best fine-tuned model. Previously
# the checkpoint was written to '/finetuned_resnet_model_data_aug.keras'
# (filesystem root) while evaluation loaded the relative file name.
finetuned_checkpoint_path = 'finetuned_resnet_model_data_aug.keras'
save_model_finetune = ModelCheckpoint(finetuned_checkpoint_path, save_best_only=True, monitor='val_loss', mode='min', verbose=1)

# history.epoch holds the 0-based indices of the epochs that actually ran, so
# the next epoch to run is len(history.epoch). The old history.epoch[-1]
# repeated the last epoch number.
finetune_start_epoch = len(history.epoch)

# `epochs` in fit() is the index of the final epoch, not a count. Adding
# fine_tune_epochs to the start gives exactly that many fine-tuning epochs,
# even when phase 1 was cut short by early stopping.
history_finetune = transfer_model.fit(
    train_ds_final, epochs=finetune_start_epoch + fine_tune_epochs, initial_epoch=finetune_start_epoch,
    validation_data=val_ds_final, callbacks=[save_model_finetune, early_stop, lr_scheduler],
    class_weight=class_weights)

# Evaluate the best checkpoint written by save_model_finetune.
evaluate_models(history_finetune, model=finetuned_checkpoint_path)