In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint


In [None]:
# Set paths
BASE_DIR = Path("/app/data/processed/FC211002_Hirunika")
TRAIN_DIR = BASE_DIR / "train"
VAL_DIR = BASE_DIR / "test"
MODEL_SAVE_DIR = Path("/app/model/FC211002_Hirunika")
MODEL_SAVE_DIR.mkdir(parents=True, exist_ok=True)

In [None]:
# Define Constants
IMG_SIZE = (224, 224)
BATCH_SIZE = 32
EPOCHS = 20
NUM_CLASSES = 5
CLASS_NAMES = ['angry', 'happy', 'sad', 'stressed', 'neutral']

In [None]:
# Compute Class Weights (for imbalance)
class_counts = {
    'angry': 3995 + 436,
    'stressed': 4097 + 3171,
    'happy': 7215,
    'neutral': 4965,
    'sad': 4830
}
total_images = sum(class_counts.values())

class_weights = {
    i: total_images / (NUM_CLASSES * count)
    for i, count in enumerate([class_counts[cls] for cls in TARGET_CLASSES])
}

In [None]:
# Data Generators
train_datagen = ImageDataGenerator(
    rotation_range=10,
    zoom_range=0.1,
    width_shift_range=0.1,
    height_shift_range=0.1,
    horizontal_flip=True
)

val_datagen = ImageDataGenerator()

train_generator = train_datagen.flow_from_directory(
    directory=str(TRAIN_DIR),
    target_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    class_mode='categorical',
    shuffle=True
)

val_generator = val_datagen.flow_from_directory(
    directory=str(VAL_DIR),
    target_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    class_mode='categorical',
    shuffle=False
)

In [None]:
# EfficientNetB0 from Scratch
# def build_efficientnetb0(input_shape=INPUT_SHAPE, num_classes=NUM_CLASSES):
#     inputs = layers.Input(shape=input_shape)

#     # Stem
#     x = layers.Conv2D(32, 3, strides=2, padding='same', use_bias=False)(inputs)
#     x = layers.BatchNormalization()(x)
#     x = layers.Activation('swish')(x)

#     # MBConv block
#     def mbconv_block(x, filters, kernel_size, strides, expansion):
#         shortcut = x
#         x = layers.Conv2D(filters * expansion, 1, padding='same', use_bias=False)(x)
#         x = layers.BatchNormalization()(x)
#         x = layers.Activation('swish')(x)

#         x = layers.DepthwiseConv2D(kernel_size, strides=strides, padding='same', use_bias=False)(x)
#         x = layers.BatchNormalization()(x)
#         x = layers.Activation('swish')(x)

#         x = layers.Conv2D(filters, 1, padding='same', use_bias=False)(x)
#         x = layers.BatchNormalization()(x)

#         if strides == 1 and shortcut.shape[-1] == filters:
#             x = layers.Add()([x, shortcut])
#         return x

#     # EfficientNetB0 structure
#     x = mbconv_block(x, 16, 3, 1, 1)
#     x = mbconv_block(x, 24, 3, 2, 6)
#     x = mbconv_block(x, 24, 3, 1, 6)
#     x = mbconv_block(x, 40, 5, 2, 6)
#     x = mbconv_block(x, 40, 5, 1, 6)
#     x = mbconv_block(x, 80, 3, 2, 6)
#     x = mbconv_block(x, 80, 3, 1, 6)
#     x = mbconv_block(x, 112, 5, 1, 6)
#     x = mbconv_block(x, 112, 5, 1, 6)
#     x = mbconv_block(x, 192, 3, 2, 6)
#     x = mbconv_block(x, 192, 3, 1, 6)
#     x = mbconv_block(x, 320, 3, 1, 6)

    # # Head
    # x = layers.Conv2D(1280, 1, padding='same', use_bias=False)(x)
    # x = layers.BatchNormalization()(x)
    # x = layers.Activation('swish')(x)
    # x = layers.GlobalAveragePooling2D()(x)
    # x = layers.Dropout(0.5)(x)
    # outputs = layers.Dense(num_classes, activation='softmax')(x)

    # return models.Model(inputs, outputs)


In [None]:
# Build EfficientNetB0 Model (from scratch)

# Create input layer
inputs = Input(shape=(224, 224, 3))

# Build EfficientNetB0 base (no weights from ImageNet)
base_model = EfficientNetB0(include_top=False, weights=None, input_tensor=inputs)

# Add custom classification layers
x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dropout(0.4)(x)
outputs = Dense(NUM_CLASSES, activation='softmax')(x)

model = Model(inputs, outputs)

# Compile
model.compile(optimizer=Adam(learning_rate=0.001),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

model.summary()

In [None]:
# Compile Model
# model = build_efficientnetb0()
# model.compile(
#     optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
#     loss='categorical_crossentropy',
#     metrics=['accuracy']
# )
# model.summary()

model.compile(optimizer=Adam(learning_rate=0.001),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

model.summary()


In [None]:

# Callbacks
checkpoint = ModelCheckpoint(
    filepath=str(MODEL_SAVE_DIR / "efficientnetb0_model.h5"),
    monitor='val_accuracy',
    save_best_only=True,
    verbose=1
)

early_stop = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
    verbose=1
)

# checkpoint_cb = tf.keras.callbacks.ModelCheckpoint(
#     str(CHECKPOINT_DIR / "efficientnetb0_best.h5"),
#     save_best_only=True,
#     monitor='val_loss',
#     mode='min',
#     verbose=1
# )

# lr_scheduler_cb = tf.keras.callbacks.ReduceLROnPlateau(
#     monitor='val_loss',
#     factor=0.5,
#     patience=5,
#     min_lr=1e-6,
#     verbose=1
# )

# early_stop_cb = tf.keras.callbacks.EarlyStopping(
#     monitor='val_loss',
#     patience=10,
#     restore_best_weights=True,
#     verbose=1
# )

In [None]:
# Train Model
# history = model.fit(
#     train_generator,
#     steps_per_epoch=train_generator.samples // BATCH_SIZE,
#     validation_data=val_generator,
#     validation_steps=val_generator.samples // BATCH_SIZE,
#     epochs=EPOCHS,
#     class_weight=class_weights,
#     callbacks=[checkpoint_cb, lr_scheduler_cb, early_stop_cb],
#     verbose=1
# )

history = model.fit(
    train_generator,
    epochs=EPOCHS,
    validation_data=val_generator,
    callbacks=[checkpoint, early_stop]
)

In [None]:
# Visualize training performance
def plot_history(hist):
    plt.figure(figsize=(12, 5))

    # Accuracy
    plt.subplot(1, 2, 1)
    plt.plot(hist.history['accuracy'], label='Train Accuracy')
    plt.plot(hist.history['val_accuracy'], label='Val Accuracy')
    plt.title('Accuracy over Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()

    # Loss
    plt.subplot(1, 2, 2)
    plt.plot(hist.history['loss'], label='Train Loss')
    plt.plot(hist.history['val_loss'], label='Val Loss')
    plt.title('Loss over Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    plt.tight_layout()
    plt.show()

plot_history(history)

In [None]:
# Save model architecture & class indices
# Save class index mapping
import json
with open(MODEL_SAVE_DIR / "class_indices.json", "w") as f:
    json.dump(train_generator.class_indices, f)

# Save model config
model_json = model.to_json()
with open(MODEL_SAVE_DIR / "efficientnetb0_model.json", "w") as f:
    f.write(model_json)

print(f" Model training complete and saved to: {MODEL_SAVE_DIR}")