In [None]:
import kagglehub

# Download the latest version of the "subhaditya/fer2013plus" dataset via kagglehub.
# `path` holds whatever location dataset_download() returns for the dataset files.
path = kagglehub.dataset_download("subhaditya/fer2013plus")

# Show where the dataset files ended up so the location can be checked by eye.
print("Path to dataset files:", path)
import os
import cv2
import h5py
import numpy as np
import h5py
import datetime
import numpy as np
import tensorflow as tf
from tensorflow import keras
from sklearn.utils import shuffle
from sklearn.utils.class_weight import compute_class_weight

def load_data(
    path_prefix,
    dataset_name,
    splits=('train', 'test'),
):
    """Load an image-folder dataset into per-split NumPy arrays.

    The expected layout is ``<path_prefix>/<dataset_name>/<split>/<class>/<image>``.
    Each readable image is read as 3-channel colour, converted from BGR to RGB
    and resized to a square of 224 px when ``'RAFDB'`` occurs in
    ``dataset_name`` and 120 px otherwise.

    Args:
        path_prefix: Directory that contains the ``dataset_name`` folder.
        dataset_name: Name of the dataset folder. It also selects the image
            size and the list of class names (7 classes for RAFDB, 8 otherwise).
        splits: Names of the split sub-folders to load. Ignored when
            ``'RAFDB'`` occurs in ``dataset_name`` (train/test is forced).

    Returns:
        A tuple ``(X, y)`` of dicts keyed by split name. ``X[split]`` is the
        array of stacked images and ``y[split]`` the array of integer labels,
        where a label is the index of the class folder name in the class list.

    Raises:
        ValueError: If a split contains a (non-hidden) folder whose name is
            not one of the expected class names.
    """
    is_rafdb = 'RAFDB' in dataset_name
    IMG_SIZE = 224 if is_rafdb else 120
    # Copy into a fresh list so the caller's sequence is never shared or mutated.
    splits = ['train', 'test'] if is_rafdb else list(splits)
    if is_rafdb:
        classNames = ['anger', 'disgust', 'fear', 'happiness', 'neutral', 'sadness', 'surprise']
    else:
        classNames = ['anger', 'contempt', 'disgust', 'fear', 'happiness', 'neutral', 'sadness', 'surprise']

    X, y = {}, {}
    skipped = 0  # files that OpenCV could not decode

    for split in splits:
        PATH = os.path.join(path_prefix, dataset_name, split)
        X[split], y[split] = [], []
        # sorted(): os.listdir order is arbitrary; sorting makes the output reproducible.
        for classes in sorted(os.listdir(PATH)):
            class_path = os.path.join(PATH, classes)
            # Ignore stray files and hidden folders (e.g. notebook checkpoints).
            if classes.startswith('.') or not os.path.isdir(class_path):
                continue
            if classes not in classNames:
                raise ValueError(
                    f"Unexpected class folder {classes!r} in {PATH}; "
                    f"expected one of {classNames}"
                )
            class_numeric = classNames.index(classes)
            for sample in sorted(os.listdir(class_path)):
                sample_path = os.path.join(class_path, sample)
                image = cv2.imread(sample_path, cv2.IMREAD_COLOR)
                # cv2.imread returns None instead of raising when a file cannot be read.
                if image is None:
                    skipped += 1
                    continue
                image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                image = cv2.resize(image, (IMG_SIZE, IMG_SIZE))
                X[split].append(image)
                y[split].append(class_numeric)

    if skipped:
        print(f"load_data: skipped {skipped} unreadable file(s)")

    # Convert to numpy arrays
    for split in splits:
        X[split] = np.array(X[split])
        y[split] = np.array(y[split])

    return X, y

# Prefer the location kagglehub reported for the download; fall back to the
# original hard-coded Kaggle input path when that layout is not present.
dataset_root = os.path.join(path, 'fer2013plus')
if not os.path.isdir(dataset_root):
    dataset_root = '../kaggle/input/fer2013plus/fer2013plus'

X, y = load_data(dataset_root, dataset_name='fer2013')

# Cache every split as X_<split> / y_<split> datasets in a single HDF5 file.
with h5py.File('ferp.h5', 'w') as dataset:
    for split in X:
        dataset.create_dataset(f'X_{split}', data=X[split])
        dataset.create_dataset(f'y_{split}', data=y[split])

# Drop the in-memory copies; the arrays are read back from ferp.h5 later on.
del X, y

In [None]:
# --- Data / model dimensions ---
NUM_CLASSES = 8  # number of units in the softmax classification head
IMG_SHAPE = (120, 120, 3)  # shape of the model's input layer (before the in-model resize)
BATCH_SIZE = 8  # batch size passed to both fit() calls

# --- Initial training of the head ---
TRAIN_EPOCH = 100  # maximum number of epochs
TRAIN_LR = 1e-3  # Adam learning rate
TRAIN_ES_PATIENCE = 5  # EarlyStopping patience
TRAIN_LR_PATIENCE = 3  # ReduceLROnPlateau patience
TRAIN_MIN_LR = 1e-6  # ReduceLROnPlateau lower bound for the learning rate
TRAIN_DROPOUT = 0.1  # dropout rate applied after global average pooling

# --- Fine-tuning ---
FT_EPOCH = 500  # value passed as `epochs` to the fine-tuning fit() call
FT_LR = 1e-5  # Adam learning rate (also the InverseTimeDecay initial rate)
FT_LR_DECAY_STEP = 80.0  # InverseTimeDecay decay_steps
FT_LR_DECAY_RATE = 1  # InverseTimeDecay decay_rate
FT_ES_PATIENCE = 20  # EarlyStopping patience
FT_DROPOUT = 0.2  # rate for the spatial and regular dropout layers of the fine-tuning model

# min_delta on val_accuracy: used by EarlyStopping in both phases and by the
# ReduceLROnPlateau callback of the initial training.
ES_LR_MIN_DELTA = 0.003

# Load your data here, PAtt-Lite was trained with h5py for shorter loading time
with h5py.File('ferp.h5', 'r') as dataset:
    X_train = dataset['X_train'][:]
    y_train = dataset['y_train'][:]
    X_test = dataset['X_test'][:]
    y_test = dataset['y_test'][:]

# Seed the shuffle: fit(validation_split=...) takes the tail of the training
# arrays, so an unseeded shuffle would change the train/validation split on
# every run.
SHUFFLE_SEED = 42
X_train, y_train = shuffle(X_train, y_train, random_state=SHUFFLE_SEED)

print("Shape of train_sample: {}".format(X_train.shape))
print("Shape of train_label: {}".format(y_train.shape))
print("Shape of test_sample: {}".format(X_test.shape))
print("Shape of test_label: {}".format(y_test.shape))

# Key each weight by the label it was computed for. enumerate() would only be
# correct if the labels present in y_train were exactly 0..K-1 with no gaps.
train_labels = np.unique(y_train)
balanced_weights = compute_class_weight('balanced', classes=train_labels, y=y_train)
class_weights = {int(label): float(weight) for label, weight in zip(train_labels, balanced_weights)}

# PAtt-Lite building blocks shared by every model assembled in this notebook.
input_layer = tf.keras.Input(name='universal_input', shape=IMG_SHAPE)
sample_resizing = tf.keras.layers.Resizing(height=224, width=224, name="resize")
data_augmentation = tf.keras.Sequential(
    layers=[
        tf.keras.layers.RandomFlip(mode='horizontal'),
        tf.keras.layers.RandomContrast(factor=0.3),
    ],
    name="augmentation",
)
preprocess_input = tf.keras.applications.mobilenet.preprocess_input

# ImageNet-pretrained MobileNet without its classifier, frozen, and cut off at
# the output of the layer found 29 positions from the end.
backbone = tf.keras.applications.mobilenet.MobileNet(
    weights='imagenet',
    include_top=False,
    input_shape=(224, 224, 3),
)
backbone.trainable = False
base_model = tf.keras.Model(
    inputs=backbone.input,
    outputs=backbone.layers[-29].output,
    name='base_model',
)

# Stand-alone attention layer (not referenced by create_patt_lite_model, which
# builds its own SpatialAttentionLayer).
self_attention = tf.keras.layers.Attention(name='attention', use_scale=True)

# Three convolutions applied to the backbone's feature map.
patch_extraction = tf.keras.Sequential(
    layers=[
        tf.keras.layers.SeparableConv2D(
            filters=256, kernel_size=4, strides=4, padding='same', activation='relu'
        ),
        tf.keras.layers.SeparableConv2D(
            filters=256, kernel_size=2, strides=2, padding='valid', activation='relu'
        ),
        tf.keras.layers.Conv2D(
            filters=256, kernel_size=1, strides=1, padding='valid', activation='relu'
        ),
    ],
    name='patch_extraction',
)


In [None]:
# Custom layer for reshaping spatial features for attention
class SpatialAttentionLayer(tf.keras.layers.Layer):
    """Self-attention across the spatial positions of a 4-D feature map.

    The input ``(batch, height, width, channels)`` is flattened to a sequence
    of ``height * width`` positions, passed as both query and value to
    ``tf.keras.layers.Attention``, and reshaped back to the input's shape.

    Args:
        use_scale: Forwarded to ``tf.keras.layers.Attention``.
        name: Optional layer name.
        **kwargs: Standard ``Layer`` keyword arguments (e.g. ``trainable``,
            ``dtype``). Accepting them lets ``from_config`` rebuild the layer
            from a saved model.

    When loading a saved model that contains this layer, pass
    ``custom_objects={'SpatialAttentionLayer': SpatialAttentionLayer}``.
    """

    def __init__(self, use_scale=True, name=None, **kwargs):
        super().__init__(name=name, **kwargs)
        # Kept so get_config() can report it.
        self.use_scale = use_scale
        self.attention = tf.keras.layers.Attention(use_scale=use_scale)

    def call(self, inputs):
        """Apply attention over flattened spatial positions; output shape equals input shape."""
        # inputs shape: (batch, height, width, channels)
        dynamic_shape = tf.shape(inputs)
        batch_size = dynamic_shape[0]
        height = dynamic_shape[1]
        width = dynamic_shape[2]
        # Use the static channel count when it is known so the last dimension
        # stays defined for the layers that follow; otherwise use the dynamic one.
        channels = inputs.shape[-1]
        if channels is None:
            channels = dynamic_shape[3]

        # Reshape to (batch, height*width, channels)
        x_reshaped = tf.reshape(inputs, (batch_size, height * width, channels))

        # Apply attention with the same tensor as query and value
        x_attention = self.attention([x_reshaped, x_reshaped])

        # Reshape back to (batch, height, width, channels)
        x_output = tf.reshape(x_attention, (batch_size, height, width, channels))

        return x_output

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update({'use_scale': self.use_scale})
        return config

# Modified approach: attention is applied to the spatial feature map before pooling
def create_patt_lite_model(input_layer, training_phase=True):
    """Assemble a PAtt-Lite classifier on top of ``input_layer``.

    Pipeline: resize -> augmentation -> MobileNet preprocessing -> truncated
    backbone (called with ``training=False``) -> patch extraction -> spatial
    attention -> pooling and dropout -> dense head with softmax output.

    Args:
        input_layer: Keras input tensor the model starts from.
        training_phase: When truthy, builds the initial-training variant
            (global average pooling followed by ``Dropout(TRAIN_DROPOUT)``).
            Otherwise builds the fine-tuning variant (``SpatialDropout2D``
            before pooling, ``Dropout(FT_DROPOUT)`` after pooling and again
            before the classifier).

    Returns:
        A ``tf.keras.Model`` mapping ``input_layer`` to ``NUM_CLASSES``
        softmax probabilities.

    Note:
        The resizing, augmentation, backbone and patch-extraction layers are
        the module-level instances and are therefore shared between calls.
        The attention, dense, batch-norm and classification layers are created
        anew on every call.
    """
    layers = tf.keras.layers

    # Shared feature extractor
    features = sample_resizing(input_layer)
    features = data_augmentation(features)
    features = preprocess_input(features)
    features = base_model(features, training=False)
    features = patch_extraction(features)

    # Spatial attention via the custom layer
    attended = SpatialAttentionLayer(use_scale=True, name='spatial_attention')(features)

    # Pooling / dropout differ between the two phases
    if not training_phase:
        dropped = layers.SpatialDropout2D(FT_DROPOUT)(attended)
        pooled = layers.GlobalAveragePooling2D(name='gap_ft')(dropped)
        pooled = layers.Dropout(FT_DROPOUT)(pooled)
    else:
        pooled = layers.GlobalAveragePooling2D(name='gap')(attended)
        pooled = layers.Dropout(TRAIN_DROPOUT)(pooled)

    # Pre-classification block
    hidden = layers.Dense(32, activation='relu', name='pre_dense')(pooled)
    hidden = layers.BatchNormalization(name='pre_bn')(hidden)
    if not training_phase:
        hidden = layers.Dropout(FT_DROPOUT)(hidden)

    # Softmax classifier
    outputs = layers.Dense(NUM_CLASSES, activation="softmax", name='classification_head')(hidden)

    return tf.keras.Model(input_layer, outputs)

# Phase 1: build, compile and train the initial-training variant of the model.
model = create_patt_lite_model(input_layer, training_phase=True)
model._name = 'train-head'

model.compile(
    optimizer=keras.optimizers.Adam(global_clipnorm=3.0, learning_rate=TRAIN_LR),
    metrics=['accuracy'],
    loss='sparse_categorical_crossentropy',
)

print("Model summary:")
model.summary()

# Both callbacks watch validation accuracy with the same minimum improvement.
early_stopping_callback = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy',
    min_delta=ES_LR_MIN_DELTA,
    patience=TRAIN_ES_PATIENCE,
    restore_best_weights=True,
)
learning_rate_callback = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_accuracy',
    min_delta=ES_LR_MIN_DELTA,
    patience=TRAIN_LR_PATIENCE,
    min_lr=TRAIN_MIN_LR,
    verbose=0,
)

# Fit with class weighting; the last 20% of the training arrays is held out
# for validation.
print("Starting initial training...")
history = model.fit(
    x=X_train,
    y=y_train,
    batch_size=BATCH_SIZE,
    epochs=TRAIN_EPOCH,
    validation_split=0.2,
    class_weight=class_weights,
    callbacks=[early_stopping_callback, learning_rate_callback],
    verbose=1,
)

# Report accuracy on the test split.
test_loss, test_acc = model.evaluate(X_test, y_test)
print(f"Initial training - Test accuracy: {test_acc:.4f}")

In [None]:
# Model Finetuning
print("\nFinetuning ...")

# Unfreeze the last `unfreeze` layers of the truncated backbone, but keep every
# BatchNormalization layer among them frozen.
unfreeze = 59
base_model.trainable = True
fine_tune_from = len(base_model.layers) - unfreeze
for layer in base_model.layers[:fine_tune_from]:
    layer.trainable = False
for layer in base_model.layers[fine_tune_from:]:
    if isinstance(layer, tf.keras.layers.BatchNormalization):
        layer.trainable = False

# Create finetuning model
model_ft = create_patt_lite_model(input_layer, training_phase=False)
model_ft._name = 'finetune-backbone'

# create_patt_lite_model() builds new attention/head layers on every call, so
# without this step fine-tuning would start from a randomly initialised head.
# Copy the weights learned during the initial training into the layers that
# carry the same names in both models.
for shared_name in ('spatial_attention', 'pre_dense', 'pre_bn', 'classification_head'):
    model_ft.get_layer(shared_name).set_weights(model.get_layer(shared_name).get_weights())

model_ft.compile(optimizer=keras.optimizers.Adam(learning_rate=FT_LR, global_clipnorm=3.0),
                 loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Finetuning callbacks
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)
early_stopping_callback = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', min_delta=ES_LR_MIN_DELTA,
                                                           patience=FT_ES_PATIENCE, restore_best_weights=True)

# NOTE: `scheduler` and `scheduler_callback` are constructed but deliberately
# NOT passed to fit() below; ReduceLROnPlateau is used in their place.
scheduler = keras.optimizers.schedules.InverseTimeDecay(initial_learning_rate=FT_LR,
                                                       decay_steps=FT_LR_DECAY_STEP,
                                                       decay_rate=FT_LR_DECAY_RATE)
scheduler_callback = tf.keras.callbacks.LearningRateScheduler(schedule=scheduler)

# Fix: Use ReduceLROnPlateau instead of InverseTimeDecay to avoid float conversion issues
reduce_lr_callback = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_accuracy',
                                                          factor=0.5,
                                                          patience=5,
                                                          min_lr=1e-7,
                                                          verbose=1)

# Finetuning training: epoch numbering continues from where the initial
# training stopped (len() of an empty epoch list is already 0).
print("Starting finetuning...")
history_finetune = model_ft.fit(X_train, y_train, epochs=FT_EPOCH, batch_size=BATCH_SIZE, verbose=1,
                                validation_split=0.2,
                                initial_epoch=len(history.epoch),
                                callbacks=[early_stopping_callback, reduce_lr_callback, tensorboard_callback])

test_loss, test_acc = model_ft.evaluate(X_test, y_test)
print(f"Final test accuracy: {test_acc:.4f}")

# Persist the fine-tuned model in both formats. The second save previously
# wrote `model` (the phase-1 model) instead of the fine-tuned one.
model_ft.save('model.h5')
# Saving in the native .keras format is reportedly the better option
model_ft.save('model.keras')