In [14]:
# =============================================================================
# 1. IMPORTS & CONFIGURATION
# =============================================================================
import os
import numpy as np
import cv2
import glob
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models, applications, optimizers, callbacks
from tensorflow_addons.optimizers import AdamW
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.regularizers import l2
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
import matplotlib.pyplot as plt
from vit_keras import vit  # Ensure you have installed vit-keras

# Global configuration
IMG_SIZE = 224         # Use 224x224 resolution for transfer learning
BATCH_SIZE = 32
NUM_CLASSES = 7        # Update if you merge datasets with a different number of emotions

In [15]:
# =============================================================================
# 2. DATA LOADING & PREPROCESSING FUNCTIONS
#    (Reworking the AffectNet and FER2013 pipelines)
# =============================================================================
def load_and_preprocess_image(path, target_size=(IMG_SIZE, IMG_SIZE)):
    """
    Loads an image from disk, converts grayscale images to RGB,
    resizes to target_size, and applies EfficientNet preprocessing.
    """
    img = cv2.imread(path)
    if img is None:
        raise ValueError(f"Unable to load image at: {path}")
    # If image is grayscale, convert to RGB
    if len(img.shape) == 2 or img.shape[2] == 1:
        img = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
    else:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, target_size)
    # Use the EfficientNet preprocessing (assumes model input is in range [-1,1])
    img = applications.efficientnet.preprocess_input(img)
    return img

def load_dataset_from_directory(root_dir, extensions=('.jpg', '.jpeg', '.png', '.bmp', '.tiff')):
    """
    Loads images from a directory whose subdirectories are emotion labels.
    Converts images to 224x224, 3-channel format.
    Returns NumPy arrays for images and integer labels.
    """
    X, y = [], []
    # Assume each subdirectory is named with the emotion label
    classes = sorted([d for d in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, d))])
    class_to_idx = {cls: idx for idx, cls in enumerate(classes)}
    for cls in classes:
        cls_dir = os.path.join(root_dir, cls)
        for file in os.listdir(cls_dir):
            if file.lower().endswith(extensions):
                try:
                    img = load_and_preprocess_image(os.path.join(cls_dir, file))
                    X.append(img)
                    y.append(class_to_idx[cls])
                except Exception as e:
                    print(f"Error processing {file}: {e}")
    return np.array(X), np.array(y)

In [16]:
# =============================================================================
# 3. LOAD & PREPROCESS JAFFE DATASET
#    (Assumes the dataset is organized into subdirectories named after the 7 emotions)
# =============================================================================

# Global configuration for image processing
IMG_SIZE = 224  # Target size for model input

# Define the emotion classes (as expected in the dataset folders)
CLASSES = ['anger', 'disgust', 'fear', 'happiness', 'neutral', 'sadness', 'surprise']
# Create a mapping from emotion (folder name) to label index
class_to_idx = {emotion: i for i, emotion in enumerate(CLASSES)}

# Path to the curated JAFFE dataset
jaffe_dir = "/home/natalyagrokh/img_datasets/jaffe_dataset"  # Adjust if necessary

X_list = []
y_list = []

# Iterate over each folder in the JAFFE dataset directory
for emotion in os.listdir(jaffe_dir):
    # Ensure the folder corresponds to one of the expected emotion classes
    if emotion.lower() in CLASSES:
        emotion_path = os.path.join(jaffe_dir, emotion)
        if os.path.isdir(emotion_path):
            for img_file in os.listdir(emotion_path):
                # Consider common image file extensions
                if img_file.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tiff')):
                    img_path = os.path.join(emotion_path, img_file)
                    # Read the image in grayscale (JAFFE images are typically grayscale)
                    img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
                    if img is None:
                        continue  # Skip if the image fails to load
                    # Resize the image to the target dimensions
                    img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
                    # Convert the single-channel image to 3 channels by duplicating the grayscale data
                    img = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
                    # Apply EfficientNet preprocessing (which scales pixel values appropriately)
                    img = applications.efficientnet.preprocess_input(img)
                    X_list.append(img)
                    y_list.append(class_to_idx[emotion.lower()])

# Convert the lists to NumPy arrays
X_jaffe = np.array(X_list)
y_jaffe = np.array(y_list)

print(f"JAFFE dataset: Loaded {X_jaffe.shape[0]} samples with {len(np.unique(y_jaffe))} unique classes.")

JAFFE dataset: Loaded 213 samples with 7 unique classes.


In [17]:
# =============================================================================
# 4. DATA AUGMENTATION
# =============================================================================

# Split the JAFFE dataset into training and validation sets (80% train, 20% validation)
X_train, X_val, y_train, y_val = train_test_split(
    X_jaffe, 
    y_jaffe, 
    test_size=0.2, 
    random_state=42, 
    stratify=y_jaffe
)

# Create a training data generator with augmentation
train_datagen = ImageDataGenerator(
    rotation_range=25,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    brightness_range=[0.7, 1.3]
)

# Create the training generator. We convert the labels to one-hot encoding.
train_generator = train_datagen.flow(
    X_train,
    tf.keras.utils.to_categorical(y_train, NUM_CLASSES),
    batch_size=BATCH_SIZE
)

# (Optional) Create a validation generator without augmentation (only preprocessing)
val_datagen = ImageDataGenerator()  # No augmentation here
val_generator = val_datagen.flow(
    X_val,
    tf.keras.utils.to_categorical(y_val, NUM_CLASSES),
    batch_size=BATCH_SIZE
)

print(f"Training samples: {X_train.shape[0]}, Validation samples: {X_val.shape[0]}")

Training samples: 170, Validation samples: 43


In [18]:
# =============================================================================
# 5. MODEL ARCHITECTURE & IMPROVEMENTS
#    (Based on your LATEST CODE with ensemble & teacherâ€“student distillation)
# =============================================================================
# --- Base Model (EfficientNetB3) ---
# Set global constants (ensure these are defined in your notebook)
IMG_SIZE = 224         # e.g., 224
NUM_CLASSES = 7        # e.g., 7 emotion classes

# --- Base Model (EfficientNetB3) ---
base_model = applications.EfficientNetB3(
    weights='imagenet',
    include_top=False,
    input_shape=(IMG_SIZE, IMG_SIZE, 3)
)
# Freeze early layers
for layer in base_model.layers[:150]:
    layer.trainable = False

# Build classification head on top of base_model
x = base_model.output
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dense(512, activation='swish', kernel_regularizer=l2(0.001))(x)
x = layers.Dropout(0.5)(x)
x = layers.BatchNormalization()(x)
outputs = layers.Dense(NUM_CLASSES, activation='softmax')(x)
model = models.Model(inputs=base_model.input, outputs=outputs)

# --- Focal Loss Function ---
def focal_loss(gamma=2., alpha=0.25):
    def loss_fn(y_true, y_pred):
        ce = tf.keras.losses.categorical_crossentropy(y_true, y_pred)
        pt = tf.exp(-ce)
        return tf.reduce_mean(alpha * (1 - pt) ** gamma * ce)
    return loss_fn

# --- Compile Base Model with Focal Loss and AdamW Optimizer ---
optimizer = AdamW(learning_rate=1e-3)
model.compile(optimizer=optimizer, loss=focal_loss(), metrics=['accuracy'])
lr_schedule = optimizers.schedules.ExponentialDecay(
    initial_learning_rate=1e-3, decay_steps=1000, decay_rate=0.9
)

# --- Load Pre-Trained Models for Ensemble (Teacher) ---
# (Assuming you have saved models with ~0.600 and ~0.622 accuracy)
model1 = tf.keras.models.load_model('/home/natalyagrokh/img_expressions/pre-trained models/final_affectnet_model.keras', compile=False)
model2 = tf.keras.models.load_model('/home/natalyagrokh/img_expressions/final_efficientnet_trained_model', compile=False)
ensemble_input = tf.keras.Input(shape=(IMG_SIZE, IMG_SIZE, 3))
out1 = model1(ensemble_input)
out2 = model2(ensemble_input)
ensemble_outputs = layers.Average()([out1, out2])
ensemble_model = models.Model(ensemble_input, ensemble_outputs)
teacher_model = ensemble_model  # Teacher for distillation

# --- Student Model (A Smaller Architecture) ---
def build_smaller_model():
    base = applications.EfficientNetB0(
        weights='imagenet',
        include_top=False,
        input_shape=(IMG_SIZE, IMG_SIZE, 3)
    )
    for layer in base.layers:
        layer.trainable = False
    y_sm = layers.GlobalAveragePooling2D()(base.output)
    y_sm = layers.Dense(256, activation='relu')(y_sm)
    y_sm = layers.Dropout(0.5)(y_sm)
    outputs_sm = layers.Dense(NUM_CLASSES, activation='softmax')(y_sm)
    return models.Model(base.input, outputs_sm)

student_model = build_smaller_model()

# Compile the student model with a combined loss:
# 50% categorical crossentropy + 50% KL divergence from teacher predictions
student_model.compile(
    optimizer='adam',
    loss=lambda y_true, y_pred: 0.5 * tf.keras.losses.categorical_crossentropy(y_true, y_pred)
           + 0.5 * tf.keras.losses.KLDivergence()(teacher_model(tf.keras.backend.stop_gradient(y_true)), y_pred),
    metrics=['accuracy']
)

# --- (Optional) Integrate a Vision Transformer (ViT) Branch ---
vit_model = vit.vit_b16(
    image_size=IMG_SIZE,
    activation='softmax',
    pretrained=True,
    include_top=False
)
# For example, you might combine CNN and ViT features as follows:
x_vit = base_model.output
x_vit = vit_model(x_vit)
x_vit = layers.MultiHeadAttention(num_heads=4, key_dim=64)(x_vit, x_vit)
# (This branch is optional and can be merged with your primary branch as needed.)

TypeError: missing a required argument: 'weight_decay'

In [None]:
# =============================================================================
# 6. TRAINING & EVALUATION
# =============================================================================
# Define callbacks
early_stop = callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
reduce_lr = callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3)
checkpoint = callbacks.ModelCheckpoint('best_model.keras', monitor='val_accuracy', save_best_only=True)

# Train the base model (or student model) using the training generator and validation set
history = model.fit(
    train_generator,
    steps_per_epoch=len(X_train) // BATCH_SIZE,
    validation_data=(X_val, tf.keras.utils.to_categorical(y_val, NUM_CLASSES)),
    epochs=50,
    callbacks=[early_stop, reduce_lr, checkpoint],
    class_weight=class_weights
)

# Evaluate on test data
test_loss, test_acc = model.evaluate(X_test, tf.keras.utils.to_categorical(y_test, NUM_CLASSES))
print(f"Test Accuracy: {test_acc:.4f}")

In [None]:
# =============================================================================
# 7. OPTIONAL: VISUALIZATION (e.g., Confusion Matrix, CAM)
# =============================================================================
from sklearn.metrics import ConfusionMatrixDisplay

y_pred = model.predict(X_test)
ConfusionMatrixDisplay.from_predictions(np.argmax(tf.keras.utils.to_categorical(y_test, NUM_CLASSES), axis=1),
                                          np.argmax(y_pred, axis=1))
plt.show()

# For Class Activation Mapping (CAM), you can extract the penultimate features and dot with final dense weights.
gap_weights = model.layers[-1].get_weights()[0]
cam_model = models.Model(inputs=model.input, outputs=(model.layers[-3].output, model.output))
# For a given image (img_array should be shape (1, IMG_SIZE, IMG_SIZE, 3)):
# features, results = cam_model.predict(img_array)
# heatmap = np.dot(features, gap_weights)
# plt.imshow(heatmap, cmap='jet'); plt.show()
