In [20]:
import tensorflow as tf
import pandas as pd
import numpy as np
import os
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, SeparableConv2D, BatchNormalization, LeakyReLU
from tensorflow.keras.layers import MaxPooling2D, GlobalAveragePooling2D, Dense, SpatialDropout2D
from tensorflow.keras.preprocessing.image import load_img, img_to_array

def create_exact_matching_model(input_shape=(48, 48, 1)):
    """Build the separable-convolution classifier with fixed layer names.

    Every layer except the input is given an explicit name so that weights
    can later be matched by name against another model (see the lookup by
    ``layer.name`` in ``load_and_transfer_weights``).

    The network is four blocks of 3x3 separable convolutions (48, 96, 192
    and 384 filters), each block ending in 2x2 max pooling, spatial dropout
    (rate 0.1) and a LeakyReLU, followed by a 1x1 separable convolution with
    8 filters, global average pooling and a 7-way softmax.

    Args:
        input_shape: Shape of one input image, ``(48, 48, 1)`` by default.

    Returns:
        An uncompiled ``Model`` mapping the input tensor to the 7-way
        softmax output.
    """

    def conv_bn(tensor, filters, conv_name, bn_name):
        # 3x3 'same'-padded separable convolution followed by batch norm.
        tensor = SeparableConv2D(filters, (3, 3), padding='same', name=conv_name)(tensor)
        return BatchNormalization(name=bn_name)(tensor)

    def pool_drop_act(tensor, pool_name, drop_name, act_name):
        # Block tail: downsample, drop whole feature maps, then activate.
        tensor = MaxPooling2D(pool_size=(2, 2), name=pool_name)(tensor)
        tensor = SpatialDropout2D(0.1, name=drop_name)(tensor)
        return LeakyReLU(name=act_name)(tensor)

    inputs = Input(shape=input_shape)

    # Block 1: three 48-filter convolutions.
    x = conv_bn(inputs, 48, 'separable_conv2d_20', 'batch_normalization_18')
    x = LeakyReLU(name='leaky_re_lu_18')(x)
    x = conv_bn(x, 48, 'separable_conv2d_21', 'batch_normalization_19')
    x = LeakyReLU(name='leaky_re_lu_19')(x)
    x = conv_bn(x, 48, 'separable_conv2d_22', 'batch_normalization_20')
    x = pool_drop_act(x, 'max_pooling2d_8', 'spatial_dropout2d_8', 'leaky_re_lu_20')

    # Block 2: three 96-filter convolutions.
    x = conv_bn(x, 96, 'separable_conv2d_23', 'batch_normalization_21')
    x = LeakyReLU(name='leaky_re_lu_21')(x)
    x = conv_bn(x, 96, 'separable_conv2d_24', 'batch_normalization_22')
    x = LeakyReLU(name='leaky_re_lu_22')(x)
    x = conv_bn(x, 96, 'separable_conv2d_25', 'batch_normalization_23')
    x = pool_drop_act(x, 'max_pooling2d_9', 'spatial_dropout2d_9', 'leaky_re_lu_23')

    # Block 3: two 192-filter convolutions.
    x = conv_bn(x, 192, 'separable_conv2d_26', 'batch_normalization_24')
    x = LeakyReLU(name='leaky_re_lu_24')(x)
    x = conv_bn(x, 192, 'separable_conv2d_27', 'batch_normalization_25')
    x = pool_drop_act(x, 'max_pooling2d_10', 'spatial_dropout2d_10', 'leaky_re_lu_25')

    # Block 4: a single 384-filter convolution.
    x = conv_bn(x, 384, 'separable_conv2d_28', 'batch_normalization_26')
    x = pool_drop_act(x, 'max_pooling2d_11', 'spatial_dropout2d_11', 'leaky_re_lu_26')

    # Final layers: 1x1 separable convolution down to 8 channels. The
    # configuration is spelled out in full, exactly as in the original.
    head_config = dict(
        filters=8,
        kernel_size=(1, 1),
        strides=(1, 1),
        padding='same',
        data_format='channels_last',
        dilation_rate=(1, 1),
        depth_multiplier=1,
        activation='linear',
        use_bias=True,
        depthwise_initializer=tf.keras.initializers.GlorotUniform(),
        pointwise_initializer=tf.keras.initializers.GlorotUniform(),
        bias_initializer=tf.keras.initializers.Zeros(),
        name='separable_conv2d_29',
    )
    x = SeparableConv2D(**head_config)(x)

    pooled = GlobalAveragePooling2D(name='global_average_pooling2d_2')(x)
    outputs = Dense(7, activation='softmax', name='predictions')(pooled)

    return Model(inputs=inputs, outputs=outputs)

class WarmupScheduler(tf.keras.callbacks.Callback):
    """Linearly ramp the optimizer learning rate during the first epochs.

    At the start of epoch ``e`` (0-based) with ``e < warmup_epochs`` the
    learning rate is set to
    ``initial_lr + (target_lr - initial_lr) * e / warmup_epochs``.
    At ``e == warmup_epochs`` it is set to ``target_lr`` exactly, so the
    ramp actually ends on the target value. Later epochs are left alone.

    Args:
        warmup_epochs: Number of ramp epochs before ``target_lr`` is applied.
            A value <= 0 disables the callback entirely.
        initial_lr: Learning rate used for epoch 0.
        target_lr: Learning rate applied once the ramp has finished.
    """

    def __init__(self, warmup_epochs=5, initial_lr=1e-6, target_lr=1e-4):
        super().__init__()
        self.warmup_epochs = warmup_epochs
        self.initial_lr = initial_lr
        self.target_lr = target_lr

    def on_epoch_begin(self, epoch, logs=None):
        """Set the learning rate for ``epoch`` while the ramp is active."""
        # Outside the ramp (or with the ramp disabled) leave the optimizer
        # untouched so other schedulers stay in control.
        if self.warmup_epochs <= 0 or epoch > self.warmup_epochs:
            return

        if epoch == self.warmup_epochs:
            # Without this step the ramp would stop at
            # (warmup_epochs - 1) / warmup_epochs of the way and never
            # reach target_lr.
            lr = self.target_lr
        else:
            progress = epoch / self.warmup_epochs
            lr = self.initial_lr + (self.target_lr - self.initial_lr) * progress

        # NOTE: this overwrites whatever learning rate another callback set
        # at the end of the previous epoch.
        tf.keras.backend.set_value(self.model.optimizer.learning_rate, lr)

def custom_normalization(image):
    """Rescale pixel values from the 0-255 range to [-1, 1] as float32.

    The original author describes this as the normalization used for FER+
    training.

    Args:
        image: Array-like object exposing ``astype`` (e.g. a numpy array).

    Returns:
        A new float32 array of the same shape; the input is not modified.
    """
    unit_scaled = image.astype('float32') / 255.0  # 0..255 -> 0..1
    centered = unit_scaled - 0.5                    # 0..1 -> -0.5..0.5
    return centered * 2.0                           # -0.5..0.5 -> -1..1

def load_and_transfer_weights(base_model_path):
    """Build a fresh model and copy weights from a saved model by layer name.

    The saved model at ``base_model_path`` is loaded, a new model is created
    with ``create_exact_matching_model()``, and for every layer of the new
    model that has a same-named layer in the saved model the weights are
    copied across. Layers without a namesake keep their initial weights.
    A failed copy is reported on stdout and does not abort the transfer.

    Args:
        base_model_path: Path passed to ``tf.keras.models.load_model``.

    Returns:
        The new, uncompiled model with the transferred weights.
    """
    base_model = tf.keras.models.load_model(base_model_path)
    new_model = create_exact_matching_model()

    # Index the base layers once instead of scanning them for every target
    # layer. setdefault keeps the first layer if a name occurs twice, which
    # is the layer a first-match scan would pick.
    base_layers = {}
    for base_layer in base_model.layers:
        base_layers.setdefault(base_layer.name, base_layer)

    for layer in new_model.layers:
        source = base_layers.get(layer.name)
        if source is None:
            continue
        try:
            layer.set_weights(source.get_weights())
        except Exception as exc:
            # Best-effort transfer: report the reason and continue. Unlike a
            # bare ``except:``, this lets KeyboardInterrupt and SystemExit
            # propagate.
            print(f"Could not transfer weights for layer: {layer.name} ({exc})")
        else:
            print(f"Transferred weights for layer: {layer.name}")

    return new_model

def prepare_dataset(labels_path, dataset_path, num_classes=7):
    """Load labelled 48x48 grayscale images and their metadata from disk.

    The CSV at ``labels_path`` is read row by row using the columns
    ``label``, ``image``, ``Gender`` and ``Age_Group``. Each image is read
    from ``<dataset_path>/<label>/<image>``, resized to 48x48 grayscale and
    passed through ``custom_normalization``. Rows that cannot be processed
    are reported on stdout and skipped as a whole.

    Args:
        labels_path: Path of the labels CSV.
        dataset_path: Root directory containing one sub-directory per label.
        num_classes: Width of the one-hot label encoding. Defaults to 7 to
            match the 7-unit output layer of the model in this file.

    Returns:
        A tuple ``(X, y, demographic_data)``: the stacked image arrays, the
        one-hot labels with ``num_classes`` columns, and an array of
        ``[Gender, Age_Group]`` pairs, row-aligned with each other.
    """
    labels_df = pd.read_csv(labels_path)
    images = []
    labels = []
    demographic_data = []

    for _, row in labels_df.iterrows():
        img_path = os.path.join(dataset_path, str(row['label']), row['image'])
        try:
            img = load_img(img_path, color_mode='grayscale', target_size=(48, 48))
            img_array = custom_normalization(img_to_array(img))
            label = row['label'] - 1  # Zero-based indexing
            demographics = [row['Gender'], row['Age_Group']]
        except Exception as e:
            print(f"Error processing {img_path}: {str(e)}")
            continue

        # Append only after everything for the row succeeded, so the three
        # lists can never end up with different lengths.
        images.append(img_array)
        labels.append(label)
        demographic_data.append(demographics)

    X = np.array(images)
    # Fix the encoding width explicitly: inferring it from the data would
    # produce too few columns whenever the highest class has no samples.
    y = tf.keras.utils.to_categorical(labels, num_classes=num_classes)
    demographic_data = np.array(demographic_data)

    return X, y, demographic_data

def train_model(base_model_path, labels_path, dataset_path, output_path, seed=None):
    """Fine-tune the weight-transferred model and save it.

    Loads the dataset with ``prepare_dataset``, builds the model with
    ``load_and_transfer_weights``, holds out roughly 10% of the samples
    (at least one) for validation, trains with learning-rate warmup, early
    stopping and plateau-based learning-rate reduction, then saves the
    model to ``output_path``.

    Args:
        base_model_path: Saved model whose weights are transferred.
        labels_path: Labels CSV passed to ``prepare_dataset``.
        dataset_path: Image root directory passed to ``prepare_dataset``.
        output_path: Destination passed to ``model.save``.
        seed: Optional seed for the train/validation split. ``None`` (the
            default) uses numpy's global random state. The seed does not
            affect the per-epoch shuffling done by ``model.fit``.

    Returns:
        A tuple ``(model, history)`` with the trained model and the object
        returned by ``model.fit``.

    Raises:
        ValueError: If fewer than two samples could be loaded, since both a
            training and a validation sample are required.
    """
    # Load and prepare data (the demographic metadata is not used here).
    X, y, _ = prepare_dataset(labels_path, dataset_path)

    n_samples = len(X)
    if n_samples < 2:
        raise ValueError(
            f"Need at least 2 loadable samples to train and validate, got {n_samples}"
        )

    # Create and compile model
    model = load_and_transfer_weights(base_model_path)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-6),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    # Callbacks
    callbacks = [
        WarmupScheduler(),
        tf.keras.callbacks.EarlyStopping(
            monitor='val_accuracy',
            patience=10,
            restore_best_weights=True
        ),
        tf.keras.callbacks.ReduceLROnPlateau(
            monitor='val_accuracy',
            factor=0.5,
            patience=5,
            min_lr=1e-6
        )
    ]

    # Create train/val split
    indices = np.arange(n_samples)
    rng = np.random if seed is None else np.random.RandomState(seed)
    rng.shuffle(indices)
    # Keep at least one validation sample; otherwise 'val_accuracy', which
    # both monitoring callbacks depend on, would never be produced.
    val_size = max(1, int(0.1 * n_samples))
    val_indices = indices[:val_size]
    train_indices = indices[val_size:]

    X_train, X_val = X[train_indices], X[val_indices]
    y_train, y_val = y[train_indices], y[val_indices]

    # Train
    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=1000,  # upper bound; EarlyStopping normally ends training earlier
        batch_size=16,  # Reduced batch size
        callbacks=callbacks,
        shuffle=True
    )

    # Save model
    model.save(output_path)
    return model, history

# Execute training
# Runs one fine-tuning pass with the hard-coded paths below and keeps the
# trained model and the fit history in `model` / `history`.
if __name__ == "__main__":
    # Saved model whose weights are copied by layer name before training.
    BASE_MODEL_PATH = 'ferplus_model.h5'
    # NOTE(review): both paths below name a "test" split, and the evaluation
    # cell later in this file also reads archive/test_labels.csv and
    # archive/DATASET/test. If these are the same files, the reported
    # accuracies are measured on data the model was trained on - confirm.
    LABELS_PATH = 'archive/test_labels.csv'
    DATASET_PATH = 'archive/DATASET/test'
    # Destination handed to model.save() at the end of train_model().
    OUTPUT_PATH = '2/DenseConvNet.h5'
    
    model, history = train_model(BASE_MODEL_PATH, LABELS_PATH, DATASET_PATH, OUTPUT_PATH)

Transferred weights for layer: separable_conv2d_20
Transferred weights for layer: batch_normalization_18
Transferred weights for layer: leaky_re_lu_18
Transferred weights for layer: separable_conv2d_21
Transferred weights for layer: batch_normalization_19
Transferred weights for layer: leaky_re_lu_19
Transferred weights for layer: separable_conv2d_22
Transferred weights for layer: batch_normalization_20
Transferred weights for layer: max_pooling2d_8
Transferred weights for layer: spatial_dropout2d_8
Transferred weights for layer: leaky_re_lu_20
Transferred weights for layer: separable_conv2d_23
Transferred weights for layer: batch_normalization_21
Transferred weights for layer: leaky_re_lu_21
Transferred weights for layer: separable_conv2d_24
Transferred weights for layer: batch_normalization_22
Transferred weights for layer: leaky_re_lu_22
Transferred weights for layer: separable_conv2d_25
Transferred weights for layer: batch_normalization_23
Transferred weights for layer: max_pooling

  saving_api.save_model(


In [7]:
import tensorflow as tf
import numpy as np
import pandas as pd
import os
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from sklearn.metrics import accuracy_score

def custom_normalization(image):
    """Map 0-255 pixel values to float32 values in [-1, 1].

    Intended to match the normalization applied at training time: cast to
    float32, divide by 255, subtract 0.5 and double.

    Args:
        image: Array-like object exposing ``astype`` (e.g. a numpy array).

    Returns:
        A new float32 array with the same shape as ``image``.
    """
    return ((image.astype('float32') / 255.0) - 0.5) * 2.0

def evaluate_model(model_path, test_labels_path, test_data_path):
    """Evaluate a saved model overall, per emotion and per demographic group.

    The CSV at ``test_labels_path`` is read using the columns ``label``,
    ``image``, ``Gender`` and ``Age_Group``. Each image is loaded from
    ``<test_data_path>/<label>/<image>`` as 48x48 grayscale and normalized
    with ``custom_normalization``. Rows that cannot be processed are
    reported on stdout and skipped as a whole.

    Args:
        model_path: Path passed to ``tf.keras.models.load_model``.
        test_labels_path: Path of the labels CSV.
        test_data_path: Root directory containing one sub-directory per label.

    Returns:
        A tuple ``(overall_acc, emotion_accs, gender_accs, age_accs)``. The
        three dicts map a class index (0-6), a gender code (0, 1) or an age
        group (1-5) to the accuracy on that subset. Groups with no samples
        are omitted from their dict.

    Raises:
        ValueError: If no test image could be loaded.
    """
    # Load model
    model = tf.keras.models.load_model(model_path)

    # Load test data with matching normalization
    test_df = pd.read_csv(test_labels_path)
    X_test = []
    y_test = []
    demo_data = []

    for _, row in test_df.iterrows():
        img_path = os.path.join(test_data_path, str(row['label']), row['image'])
        try:
            img = load_img(img_path, color_mode='grayscale', target_size=(48, 48))
            img_array = custom_normalization(img_to_array(img))
            label = row['label'] - 1  # Zero-based indexing
            demographics = [row['Gender'], row['Age_Group']]
        except Exception as e:
            print(f"Error processing {img_path}: {str(e)}")
            continue

        # Append only once the whole row succeeded so the three lists stay
        # row-aligned; the boolean masks below rely on that alignment.
        X_test.append(img_array)
        y_test.append(label)
        demo_data.append(demographics)

    if not X_test:
        raise ValueError(f"No test images could be loaded from {test_data_path}")

    X_test = np.array(X_test)
    y_test = np.array(y_test)
    demo_data = np.array(demo_data)

    # Get predictions
    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)

    # Calculate metrics
    overall_acc = accuracy_score(y_test, y_pred_classes)

    def subgroup_accuracies(group_values, candidates):
        # Accuracy per candidate value, skipping values with no samples so
        # accuracy_score is never called on an empty selection.
        accs = {}
        for value in candidates:
            mask = group_values == value
            if np.any(mask):
                accs[value] = accuracy_score(y_test[mask], y_pred_classes[mask])
        return accs

    # Per-emotion accuracy
    emotion_accs = subgroup_accuracies(y_test, range(7))

    # Demographic fairness
    gender_accs = subgroup_accuracies(demo_data[:, 0], [0, 1])
    age_accs = subgroup_accuracies(demo_data[:, 1], range(1, 6))

    return overall_acc, emotion_accs, gender_accs, age_accs

# Usage
# Evaluate the saved model with the hard-coded paths below and print a report.
model_path = 'DenseConvNet.h5'
test_labels_path = '../archive/test_labels.csv'
test_data_path = '../archive/DATASET/test'

overall_acc, emotion_accs, gender_accs, age_accs = evaluate_model(
    model_path, test_labels_path, test_data_path)

# Print results
print("\nOverall Accuracy:", overall_acc)

print("\nEmotion-wise Accuracies:")
for emotion, acc in emotion_accs.items():
    print(f"Emotion {emotion}: {acc:.3f}")

print("\nGender Accuracies:")
for gender, acc in gender_accs.items():
    print(f"Gender {gender}: {acc:.3f}")

print("\nAge Group Accuracies:")
for age, acc in age_accs.items():
    print(f"Age Group {age}: {acc:.3f}")


# Calculate fairness scores
def _fairness_ratio(group_accs):
    """Return worst-group accuracy divided by best-group accuracy.

    The ratio is undefined when there are no groups or when the best
    accuracy is 0; NaN is returned in those cases instead of raising.
    """
    values = list(group_accs.values())
    if not values:
        return float('nan')
    best = max(values)
    if best == 0:
        return float('nan')
    return min(values) / best


gender_fairness = _fairness_ratio(gender_accs)
age_fairness = _fairness_ratio(age_accs)

print("\nFairness Scores:")
print(f"Gender Fairness: {gender_fairness:.3f}")
print(f"Age Fairness: {age_fairness:.3f}")


Overall Accuracy: 0.7900912646675359

Emotion-wise Accuracies:
Emotion 0: 0.742
Emotion 1: 0.257
Emotion 2: 0.231
Emotion 3: 0.931
Emotion 4: 0.713
Emotion 5: 0.599
Emotion 6: 0.857

Gender Accuracies:
Gender 0: 0.782
Gender 1: 0.798

Age Group Accuracies:
Age Group 1: 0.831
Age Group 2: 0.771
Age Group 3: 0.802
Age Group 4: 0.754
Age Group 5: 0.708

Fairness Scores:
Gender Fairness: 0.981
Age Fairness: 0.853
