In [2]:
import os, json, hashlib, random, warnings
import numpy as np
import pandas as pd

In [3]:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report, precision_recall_fscore_support
from sklearn.utils.class_weight import compute_class_weight
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (
    Input, Conv2D, MaxPooling2D, GlobalAveragePooling2D,
    Dense, Dropout, BatchNormalization, ReLU
)
from tensorflow.keras.callbacks import (
    ReduceLROnPlateau, ModelCheckpoint, CSVLogger, EarlyStopping
)

In [4]:
SEED = 42
np.random.seed(SEED); random.seed(SEED); tf.random.set_seed(SEED)
os.environ["TF_DETERMINISTIC_OPS"] = "1"
os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "2")
warnings.filterwarnings("ignore")

In [5]:
EMO_MODE = "7"
CSV_PATH = "masked_face_dataset.csv"
USE_GENDER = True
USE_MBV2   = True
MBV2_UNFREEZE_FROM = -20

In [6]:
IMG_SIZE = 48
IMG_CH   = 1
EPOCHS_FULL    = 60
EPOCHS_STAGE_A = 40
BATCH_SIZE_CNN  = 64
BATCH_SIZE_MBV2 = 64
MBV2_SIZE = 128

In [7]:
os.makedirs("reports", exist_ok=True)
os.makedirs("figures", exist_ok=True)

In [8]:
PIXELS_EXPECTED = IMG_SIZE * IMG_SIZE
EMO7 = ["angry","disgust","fear","happy","neutral","sad","surprise"]
EMO5 = ["angry","fear","happy","sad","surprise"]

In [9]:
def parse_pixels(pixels: str):
    try:
        arr = np.array([int(p) for p in str(pixels).strip().split()], dtype='uint8')
        if arr.size != PIXELS_EXPECTED: return np.nan
        return arr
    except Exception:
        return np.nan

In [10]:
def normalize_gender(x):
    if pd.isna(x): return np.nan
    s = str(x).strip().lower()
    if s in {"m","male","man","0","male(0)","mask_m"} or s.startswith("m"): return 0
    if s in {"f","female","woman","1","female(1)","mask_f","w","woman(f)"} or s.startswith(("f","w")): return 1
    return np.nan

In [11]:
def normalize_ethnicity(x):
    if pd.isna(x): return np.nan
    s = str(x).strip().lower().replace("-", "_").replace(" ", "_")
    if "asian" in s: return "asian"
    if "black" in s or "african" in s: return "black"
    if "latino" in s or "hispanic" in s: return "latino_hispanic"
    if ("middle" in s and "east" in s) or "middle_eastern" in s: return "middle_eastern"
    if "white" in s: return "white"
    return np.nan

In [12]:
def map_to_7_emotions(x):
    if pd.isna(x): return np.nan
    s = str(x).strip().lower()
    return s if s in EMO7 else np.nan

In [13]:
def map_to_5_emotions(x):
    if pd.isna(x): return np.nan
    s = str(x).strip().lower()
    if s == "disgust": return "angry"
    if s == "neutral": return np.nan
    if s in EMO5: return s
    return np.nan

In [14]:
def report_counts(name, series):
    vc = series.value_counts(dropna=False)
    print(f"\n{name} distribution:"); print(vc.to_string()); return vc

In [15]:
def bar_chart(series, title, out_png, rotate_xticks=True):
    plt.figure(figsize=(7,5)); s = series.sort_values(ascending=False)
    plt.bar(s.index.astype(str), s.values); plt.title(title); plt.ylabel("Count")
    if rotate_xticks: plt.xticks(rotation=30, ha='right')
    plt.grid(axis='y', alpha=0.3); plt.tight_layout()
    plt.savefig(os.path.join("figures", out_png), dpi=160); plt.close()

In [16]:
df_raw = pd.read_csv(CSV_PATH)
print("\nCSV dtypes:"); print(df_raw.dtypes.to_string())
print("\nFirst 5 rows:"); print(df_raw.head(5).to_string())
print(f"\nRows (raw CSV): {len(df_raw):,}")


CSV dtypes:
pixels               object
emotion               int64
emotion_label        object
age                   int64
age_range             int64
gender               object
ethnicity            object
filename             object
split                object
augmentation_type     int64
original_filename    object

First 5 rows:
                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        

In [17]:
required_cols = ['pixels', 'emotion_label', 'gender', 'age', 'ethnicity']
missing = [c for c in required_cols if c not in df_raw.columns]
if missing: raise ValueError(f"Missing required columns: {missing}")

In [18]:
df = df_raw.copy()
if EMO_MODE == "7":
    df['emotion_used'] = df['emotion_label'].apply(map_to_7_emotions)
    EMOS = EMO7
else:
    df['emotion_used'] = df['emotion_label'].apply(map_to_5_emotions)
    EMOS = EMO5

In [19]:
dropped = df['emotion_used'].isna().sum()
print(f"\nDropped rows due to neutral/unknown mapping policy: {dropped}")
df = df.dropna(subset=['emotion_used'])


Dropped rows due to neutral/unknown mapping policy: 0


In [20]:
df['pixels'] = df['pixels'].apply(parse_pixels)
bad_pix = df['pixels'].isna().sum()
if bad_pix: print(f"⚠️ Dropping {bad_pix} rows with malformed pixel strings (not {PIXELS_EXPECTED}).")
df = df.dropna(subset=['pixels'])

In [21]:
df['gender'] = df['gender'].apply(normalize_gender)
df['ethnicity_norm'] = df['ethnicity'].apply(normalize_ethnicity)
valid_eth = {'asian','black','latino_hispanic','middle_eastern','white'}
df = df[df['ethnicity_norm'].isin(valid_eth)].copy()

In [22]:
df['age'] = pd.to_numeric(df['age'], errors='coerce')
df = df.dropna(subset=['age']); df['age'] = df['age'].astype(int)
age_bins = [0, 25, 40, 200]
df['age_range3'] = pd.cut(df['age'], bins=age_bins, labels=[0,1,2], right=False)
df = df.dropna(subset=['age_range3']); df['age_range3'] = df['age_range3'].astype(int)

In [23]:
if USE_GENDER:
    before = len(df)
    df = df.dropna(subset=['gender'])
    print(f"Dropped {before - len(df)} rows with unknown gender (USE_GENDER=True).")

Dropped 0 rows with unknown gender (USE_GENDER=True).


In [24]:
raw_emotion_counts = df_raw['emotion_label'].value_counts(dropna=False)
bar_chart(raw_emotion_counts, "Emotion Count (CSV raw)", "csv_raw_emotion_count.png")

In [25]:
print("\nAfter cleaning:")
report_counts("Emotion_used", df['emotion_used'])
report_counts("Gender (0/1)", df['gender'])
report_counts("Ethnicity (5)", df['ethnicity_norm'])


After cleaning:

Emotion_used distribution:
emotion_used
angry       3000
sad         3000
fear        3000
surprise    3000
happy       3000
neutral     3000
disgust     3000

Gender (0/1) distribution:
gender
0    16892
1     4108

Ethnicity (5) distribution:
ethnicity_norm
white              15856
middle_eastern      3066
asian               1323
latino_hispanic      523
black                232


ethnicity_norm
white              15856
middle_eastern      3066
asian               1323
latino_hispanic      523
black                232
Name: count, dtype: int64

In [26]:
emo_to_idx = {e:i for i,e in enumerate(EMOS)}
df['emotion_idx'] = df['emotion_used'].map(emo_to_idx)

In [27]:
from sklearn.preprocessing import LabelEncoder
eth_le = LabelEncoder()
df['ethnicity_label'] = eth_le.fit_transform(df['ethnicity_norm'])

In [28]:
emo_counts = df['emotion_used'].value_counts().reindex(EMOS, fill_value=0)
bar_chart(emo_counts, f"Emotion Count (Processed, {len(EMOS)} classes)", "processed_emotion_count.png")
gender_counts = df['gender'].value_counts()
bar_chart(gender_counts, "Gender Count (Processed)", "processed_gender_count.png", rotate_xticks=False)
age_counts = df['age_range3'].value_counts().sort_index(); age_counts.index = ["0-24","25-39","40+"]
bar_chart(age_counts, "Ethnicity Count (Processed)", "processed_ethnicity_count.png")
eth_counts = df['ethnicity_label'].value_counts().sort_index(); eth_counts.index = eth_le.inverse_transform(eth_counts.index)
bar_chart(eth_counts, "Ethnicity Count (Processed)", "processed_ethnicity_count.png")

In [29]:
print("\n🔒 Group-by-pixel split (hashing pixel bytes)…")
df['pix_hash'] = df['pixels'].apply(lambda a: hashlib.md5(a.tobytes()).hexdigest())
g = (df.groupby('pix_hash')['emotion_idx']
       .agg(lambda xs: pd.Series(xs).value_counts().idxmax())
       .reset_index(name='majority'))


🔒 Group-by-pixel split (hashing pixel bytes)…


In [30]:
g_train, g_tmp = train_test_split(
    g, test_size=0.40, random_state=SEED, stratify=g['majority']
)
g_val, g_test = train_test_split(
    g_tmp, test_size=0.50, random_state=SEED, stratify=g_tmp['majority']
)

In [31]:
H_train, H_val, H_test = set(g_train['pix_hash']), set(g_val['pix_hash']), set(g_test['pix_hash'])
assert not (H_train & H_val or H_train & H_test or H_val & H_test), "Hash leakage across splits!"

In [32]:
m_tr  = df['pix_hash'].isin(H_train).to_numpy()
m_val = df['pix_hash'].isin(H_val).to_numpy()
m_te  = df['pix_hash'].isin(H_test).to_numpy()
print("Sizes (grouped, no leakage):", m_tr.sum(), m_val.sum(), m_te.sum())

Sizes (grouped, no leakage): 12635 4198 4167


In [33]:
X_all  = np.stack(df['pixels'].values).reshape(-1, IMG_SIZE, IMG_SIZE, IMG_CH).astype('float32') / 255.0
yE_int_all = df['emotion_idx'].astype(int).to_numpy()
yE_all = to_categorical(yE_int_all, num_classes=len(EMOS))
yA_all = to_categorical(df['age_range3'])
yT_all = to_categorical(df['ethnicity_label'])
if USE_GENDER: yG_all = df['gender'].astype(int).values

In [34]:
X_tr, X_val, X_te = X_all[m_tr], X_all[m_val], X_all[m_te]
yE_tr, yE_val, yE_te = yE_all[m_tr], yE_all[m_val], yE_all[m_te]
yE_tr_int, yE_val_int, yE_te_int = yE_int_all[m_tr], yE_int_all[m_val], yE_int_all[m_te]
yA_tr, yA_val, yA_te = yA_all[m_tr], yA_all[m_val], yA_all[m_te]
yT_tr, yT_val, yT_te = yT_all[m_tr], yT_all[m_val], yT_all[m_te]
if USE_GENDER: yG_tr, yG_val, yG_te = yG_all[m_tr], yG_all[m_val], yG_all[m_te]

In [35]:
print("\nTensors ready with shapes:")
print("  X_tr:", X_tr.shape, "| X_val:", X_val.shape, "| X_te:", X_te.shape)


Tensors ready with shapes:
  X_tr: (12635, 48, 48, 1) | X_val: (4198, 48, 48, 1) | X_te: (4167, 48, 48, 1)


In [36]:
cw_age     = compute_class_weight("balanced", classes=np.arange(yA_tr.shape[1]), y=np.argmax(yA_tr,1)).astype('float32')
if USE_GENDER:
    cw_gender = compute_class_weight("balanced", classes=np.arange(2), y=yG_tr).astype('float32')

In [37]:
sw_train = {
    'emotion_output':   np.ones(len(yE_tr), dtype='float32'),
    'age_output':       cw_age[np.argmax(yA_tr, axis=1)],
    'ethnicity_output': np.ones(len(yT_tr), dtype='float32'),
}
sw_val  = {k: np.ones(len(X_val), dtype='float32') for k in sw_train.keys()}
sw_test = {k: np.ones(len(X_te),  dtype='float32') for k in sw_train.keys()}
if USE_GENDER:
    sw_train['gender_output'] = cw_gender[yG_tr]
    sw_val['gender_output']   = np.ones(len(X_val), dtype='float32')
    sw_test['gender_output']  = np.ones(len(X_te),  dtype='float32')

In [38]:
AUG = tf.keras.Sequential([
    tf.keras.layers.RandomFlip("horizontal"),
    tf.keras.layers.RandomRotation(0.08),
    tf.keras.layers.RandomZoom(0.12),
    tf.keras.layers.RandomContrast(0.12),
])

In [39]:
def make_ds(X, yE, yA, yT, yG=None, sw=None, batch=64, shuffle=False):
    n = len(X)
    def gen():
        for i in range(n):
            x = X[i]
            y = {
                'emotion_output':   yE[i].astype('float32'),
                'age_output':       yA[i].astype('float32'),
                'ethnicity_output': yT[i].astype('float32'),
            }
            if yG is not None: y['gender_output'] = np.asarray([yG[i]], dtype=np.float32)
            if sw is None: yield x.astype('float32'), y
            else:
                w = {k: np.float32(sw[k][i]) for k in sw.keys()}
                yield x.astype('float32'), y, w

    sig_x = tf.TensorSpec(shape=(IMG_SIZE, IMG_SIZE, IMG_CH), dtype=tf.float32)
    sig_y = {
        'emotion_output':   tf.TensorSpec(shape=(yE.shape[1],), dtype=tf.float32),
        'age_output':       tf.TensorSpec(shape=(yA.shape[1],), dtype=tf.float32),
        'ethnicity_output': tf.TensorSpec(shape=(yT.shape[1],), dtype=tf.float32),
    }
    if yG is not None: sig_y['gender_output'] = tf.TensorSpec(shape=(1,), dtype=tf.float32)
    if sw is None:
        output_signature = (sig_x, sig_y)
    else:
        sig_w = {k: tf.TensorSpec(shape=(), dtype=tf.float32) for k in sw.keys()}
        output_signature = (sig_x, sig_y, sig_w)
    ds = tf.data.Dataset.from_generator(gen, output_signature=output_signature)
    if shuffle:
        ds = ds.shuffle(buffer_size=min(len(X), 8192), seed=SEED, reshuffle_each_iteration=True)
    ds = ds.batch(batch).prefetch(tf.data.AUTOTUNE)
    if shuffle:
        def _aug(x, *rest):
            return (AUG(x, training=True),) + rest
        ds = ds.map(_aug, num_parallel_calls=tf.data.AUTOTUNE)
    return ds

In [40]:
train_ds = make_ds(X_tr, yE_tr, yA_tr, yT_tr, yG_tr if USE_GENDER else None,
                   sw=sw_train, batch=BATCH_SIZE_CNN, shuffle=True)
val_ds   = make_ds(X_val, yE_val, yA_val, yT_val, yG_val if USE_GENDER else None,
                   sw=sw_val,   batch=BATCH_SIZE_CNN)
test_ds  = make_ds(X_te, yE_te, yA_te, yT_te, yG_te if USE_GENDER else None,
                   sw=sw_test,  batch=BATCH_SIZE_CNN)

In [41]:
def conv_block(x, filters):
    x = Conv2D(filters, (3,3), padding="same", use_bias=False)(x)
    x = BatchNormalization()(x); x = ReLU()(x)
    x = Conv2D(filters, (3,3), padding="same", use_bias=False)(x)
    x = BatchNormalization()(x); x = ReLU()(x)
    x = MaxPooling2D(pool_size=2)(x)
    x = Dropout(0.25)(x)
    return x

In [42]:
inputs = Input(shape=(IMG_SIZE, IMG_SIZE, IMG_CH), name="input_layer")
x = conv_block(inputs, 64)
x = conv_block(x, 128)
x = conv_block(x, 256)

In [43]:
gap = tf.keras.layers.GlobalAveragePooling2D()(x)
ch  = tf.keras.layers.Dense(256, activation="relu")(gap)
ch  = tf.keras.layers.Dense(int(x.shape[-1]), activation="sigmoid")(ch)
ch  = tf.keras.layers.Reshape((1,1,int(x.shape[-1])))(ch)
x   = tf.keras.layers.Multiply()([x, ch])

In [44]:
x = GlobalAveragePooling2D()(x)
x = Dropout(0.30)(x)
shared = Dense(512, activation="relu")(x)
shared = BatchNormalization()(shared)
shared = Dropout(0.30)(shared)

In [45]:
emotion_output   = Dense(len(EMOS),               activation='softmax', name='emotion_output')(shared)
age_output       = Dense(3,                        activation='softmax', name='age_output')(shared)
ethnicity_output = Dense(len(eth_le.classes_),     activation='softmax', name='ethnicity_output')(shared)
outputs = {'emotion_output': emotion_output, 'age_output': age_output, 'ethnicity_output': ethnicity_output}
if USE_GENDER:
    gender_output = Dense(1, activation='sigmoid', name='gender_output')(shared)
    outputs['gender_output'] = gender_output

In [46]:
cnn_model = Model(inputs=inputs, outputs=outputs, name="CNN_MTL")

In [47]:
emo_loss = tf.keras.losses.CategoricalCrossentropy(from_logits=False, label_smoothing=0.0)
age_loss = tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.05)
eth_loss = tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.05)

In [48]:
callbacks_A = [
    ModelCheckpoint("masked_face_multitask_model.keras",
                    monitor='val_emotion_output_accuracy', mode='max',
                    save_best_only=True, verbose=1),
    ReduceLROnPlateau(monitor='val_emotion_output_accuracy', mode='max',
                      factor=0.5, patience=4, min_lr=1e-6, verbose=1),
    CSVLogger("training_log.csv", append=False)
]
callbacks_B = [
    ModelCheckpoint("masked_face_multitask_model.keras",
                    monitor='val_emotion_output_accuracy', mode='max',
                    save_best_only=True, verbose=1),
    ReduceLROnPlateau(monitor='val_emotion_output_accuracy', mode='max',
                      factor=0.5, patience=4, min_lr=1e-6, verbose=1),
    EarlyStopping(monitor='val_emotion_output_accuracy', mode='max',
                  patience=10, min_delta=1e-3, restore_best_weights=True, verbose=1),
    CSVLogger("training_log.csv", append=True)
]

In [49]:
print("\n🔁 CNN — Stage A (emotion-only, 40 epochs)…")
losses = {'emotion_output': emo_loss, 'age_output': age_loss, 'ethnicity_output': eth_loss}
metrics= {'emotion_output': 'accuracy','age_output': 'accuracy','ethnicity_output': 'accuracy'}
loss_w = {'emotion_output': 1.0, 'age_output': 0.0, 'ethnicity_output': 0.0}
if USE_GENDER:
    losses['gender_output'] = 'binary_crossentropy'
    metrics['gender_output'] = 'accuracy'
    loss_w['gender_output'] = 0.0


🔁 CNN — Stage A (emotion-only, 40 epochs)…


In [50]:
cnn_model.compile(optimizer=tf.keras.optimizers.Adam(1e-3),
                  loss=losses, loss_weights=loss_w, metrics=metrics)

In [51]:
history_A = cnn_model.fit(
    train_ds, validation_data=val_ds,
    epochs=EPOCHS_STAGE_A, callbacks=callbacks_A, verbose=1
)

Epoch 1/40
    198/Unknown [1m82s[0m 389ms/step - age_output_accuracy: 0.3336 - age_output_loss: 1.4828 - emotion_output_accuracy: 0.1623 - emotion_output_loss: 2.3862 - ethnicity_output_accuracy: 0.2823 - ethnicity_output_loss: 2.4462 - gender_output_accuracy: 0.4455 - gender_output_loss: 0.9316 - loss: 2.3862
Epoch 1: val_emotion_output_accuracy improved from None to 0.13435, saving model to masked_face_multitask_model.keras
[1m198/198[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m88s[0m 418ms/step - age_output_accuracy: 0.3430 - age_output_loss: 1.4746 - emotion_output_accuracy: 0.1618 - emotion_output_loss: 2.2697 - ethnicity_output_accuracy: 0.2864 - ethnicity_output_loss: 2.5160 - gender_output_accuracy: 0.4393 - gender_output_loss: 0.8842 - loss: 2.2811 - val_age_output_accuracy: 0.2477 - val_age_output_loss: 1.5866 - val_emotion_output_accuracy: 0.1343 - val_emotion_output_loss: 1.9796 - val_ethnicity_output_accuracy: 0.7494 - val_ethnicity_output_loss: 1.2368 - val_gender

In [52]:
print("\n🔁 CNN — Stage B (very light multitask, emotion-dominant)…")
loss_w = {'emotion_output': 1.8, 'age_output': 0.05, 'ethnicity_output': 0.10}
if USE_GENDER: loss_w['gender_output'] = 0.05


🔁 CNN — Stage B (very light multitask, emotion-dominant)…


In [53]:
cnn_model.compile(optimizer=tf.keras.optimizers.Adam(2e-4),
                  loss=losses, loss_weights=loss_w, metrics=metrics)

In [54]:
history_B = cnn_model.fit(
    train_ds, validation_data=val_ds,
    epochs=EPOCHS_FULL - EPOCHS_STAGE_A,  # 20
    callbacks=callbacks_B, verbose=1
)

Epoch 1/20
    198/Unknown [1m47s[0m 216ms/step - age_output_accuracy: 0.3589 - age_output_loss: 1.3116 - emotion_output_accuracy: 0.5610 - emotion_output_loss: 1.1905 - ethnicity_output_accuracy: 0.3587 - ethnicity_output_loss: 1.7180 - gender_output_accuracy: 0.5230 - gender_output_loss: 0.8185 - loss: 2.4212
Epoch 1: val_emotion_output_accuracy improved from None to 0.47308, saving model to masked_face_multitask_model.keras
[1m198/198[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m50s[0m 233ms/step - age_output_accuracy: 0.3729 - age_output_loss: 1.2310 - emotion_output_accuracy: 0.5702 - emotion_output_loss: 1.1614 - ethnicity_output_accuracy: 0.4551 - ethnicity_output_loss: 1.4801 - gender_output_accuracy: 0.5433 - gender_output_loss: 0.7571 - loss: 2.3503 - val_age_output_accuracy: 0.3328 - val_age_output_loss: 1.1751 - val_emotion_output_accuracy: 0.4731 - val_emotion_output_loss: 1.4990 - val_ethnicity_output_accuracy: 0.5319 - val_ethnicity_output_loss: 1.3211 - val_gender

In [55]:
cnn_model.save("masked_face_multitask_model_last.keras")

In [56]:
mb_model = None
if USE_MBV2:
    from tensorflow.keras.applications import MobileNetV2
    from tensorflow.keras.applications.mobilenet_v2 import preprocess_input

    def make_mbv2_ds(X, yE_int, batch=BATCH_SIZE_MBV2, shuffle=False):
        n = len(X)
        def gen():
            for i in range(n):
                yield np.asarray(X[i], dtype=np.float32), np.int32(yE_int[i])

        sig_x = tf.TensorSpec(shape=(IMG_SIZE, IMG_SIZE, 1), dtype=tf.float32)
        sig_y = tf.TensorSpec(shape=(), dtype=tf.int32)
        ds = tf.data.Dataset.from_generator(gen, output_signature=(sig_x, sig_y))
        if shuffle:
            ds = ds.shuffle(buffer_size=min(n, 4096), seed=SEED, reshuffle_each_iteration=True)

        def prep(x, y):
            x = tf.image.grayscale_to_rgb(x)
            x = tf.image.resize(x, [MBV2_SIZE, MBV2_SIZE])
            x = x * 255.0
            x = preprocess_input(x)
            return x, y

        ds = ds.map(prep, num_parallel_calls=tf.data.AUTOTUNE)
        ds = ds.batch(batch).prefetch(tf.data.AUTOTUNE)
        return ds

    def build_mbv2_model(n_emotions, unfreeze_from=MBV2_UNFREEZE_FROM, input_size=MBV2_SIZE):
        base = MobileNetV2(include_top=False, input_shape=(input_size, input_size, 3), weights='imagenet')
        for lyr in base.layers:
            lyr.trainable = False
        if isinstance(unfreeze_from, int) and unfreeze_from < 0:
            for lyr in base.layers[unfreeze_from:]:
                if not isinstance(lyr, tf.keras.layers.BatchNormalization):
                    lyr.trainable = True

        inp = Input(shape=(input_size, input_size, 3), name="mbv2_input")
        aug = tf.keras.Sequential([
            tf.keras.layers.RandomFlip("horizontal"),
            tf.keras.layers.RandomRotation(0.04),
            tf.keras.layers.RandomZoom(0.10),
            tf.keras.layers.RandomContrast(0.10),
        ])(inp)
        x = base(aug, training=False)
        x = GlobalAveragePooling2D()(x)
        x = Dropout(0.30)(x)
        out = Dense(n_emotions, activation='softmax', name='emotion_output')(x)
        return Model(inputs=inp, outputs=out, name="MobileNetV2_Emotion")

    train_ds_mbv2 = make_mbv2_ds(X_tr,  yE_tr_int,  shuffle=True)
    val_ds_mbv2   = make_mbv2_ds(X_val, yE_val_int)
    test_ds_mbv2  = make_mbv2_ds(X_te,  yE_te_int)

    mb_model = build_mbv2_model(len(EMOS))
    mb_model.compile(optimizer=tf.keras.optimizers.Adam(1e-4),
                     loss='sparse_categorical_crossentropy', metrics=['accuracy'])

    emo_classes = np.arange(len(EMOS))
    emo_weights = compute_class_weight('balanced', classes=emo_classes, y=yE_tr_int)
    emo_weights = {int(c): float(w) for c, w in zip(emo_classes, emo_weights)}

    mb_callbacks = [
        ModelCheckpoint("mbv2_best.keras", monitor='val_accuracy', mode='max', save_best_only=True, verbose=1),
        ReduceLROnPlateau(monitor='val_accuracy', mode='max', factor=0.5, patience=4, min_lr=1e-6, verbose=1),
        EarlyStopping(monitor='val_accuracy', mode='max', patience=8, restore_best_weights=True, verbose=1, min_delta=1e-3),
        CSVLogger("mbv2_training_log.csv", append=False)
    ]

    print("\n🔁 MobileNetV2 — emotion training…")
    mb_history = mb_model.fit(train_ds_mbv2, validation_data=val_ds_mbv2,
                              class_weight=emo_weights,
                              epochs=EPOCHS_FULL, callbacks=mb_callbacks, verbose=1)
    mb_model.save("mbv2_last.keras")


🔁 MobileNetV2 — emotion training…
Epoch 1/60
    198/Unknown [1m37s[0m 162ms/step - accuracy: 0.2186 - loss: 1.9586
Epoch 1: val_accuracy improved from None to 0.28633, saving model to mbv2_best.keras
[1m198/198[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m47s[0m 216ms/step - accuracy: 0.2512 - loss: 1.8552 - val_accuracy: 0.2863 - val_loss: 1.8309 - learning_rate: 1.0000e-04
Epoch 2/60
[1m198/198[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 147ms/step - accuracy: 0.3165 - loss: 1.7267
Epoch 2: val_accuracy did not improve from 0.28633
[1m198/198[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 180ms/step - accuracy: 0.3373 - loss: 1.6889 - val_accuracy: 0.2549 - val_loss: 1.9387 - learning_rate: 1.0000e-04
Epoch 3/60
[1m198/198[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 142ms/step - accuracy: 0.3852 - loss: 1.5890
Epoch 3: val_accuracy improved from 0.28633 to 0.29967, saving model to mbv2_best.keras
[1m198/198[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37

In [57]:
sidecar = {
    "emotion":        EMOS,
    "age":            ["0–24","25–39","40+"],
    "ethnicity":      list(eth_le.classes_),
    "gender":         ["Male","Female"]
}
with open("masked_face_multitask_model.keras.labels.json","w") as f: json.dump(sidecar, f, indent=2)
with open("masked_face_multitask_model_last.keras.labels.json","w") as f: json.dump(sidecar, f, indent=2)

In [58]:
print("\n🎯 CNN Final Test Metrics:")
results = cnn_model.evaluate(test_ds, verbose=1)
for name, val in zip(cnn_model.metrics_names, results):
    print(f"{name}: {float(val):.4f}")


🎯 CNN Final Test Metrics:
[1m66/66[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 53ms/step - age_output_accuracy: 0.4795 - age_output_loss: 0.9222 - emotion_output_accuracy: 0.5520 - emotion_output_loss: 1.2458 - ethnicity_output_accuracy: 0.7773 - ethnicity_output_loss: 0.7782 - gender_output_accuracy: 0.8195 - gender_output_loss: 0.4103 - loss: 2.4315
loss: 2.4315
compile_metrics: 1.2458
emotion_output_loss: 0.9222
age_output_loss: 0.7782
ethnicity_output_loss: 0.4103
gender_output_loss: 0.4795


In [78]:
# ---------- PREDICT (head-specific, x-only) ----------
x_only_test = test_ds.map(lambda x, *rest: x)

In [80]:
import tensorflow as tf

# --- Ensure cnn_model is loaded ---
try:
    _ = cnn_model.input  # check if model exists
except Exception:
    cnn_model = tf.keras.models.load_model("masked_face_multitask_model.keras", compile=False)

# --- Create separate heads ---
emotion_head   = tf.keras.Model(cnn_model.input, cnn_model.get_layer('emotion_output').output)
age_head       = tf.keras.Model(cnn_model.input, cnn_model.get_layer('age_output').output)
ethnicity_head = tf.keras.Model(cnn_model.input, cnn_model.get_layer('ethnicity_output').output)
gender_head    = (
    tf.keras.Model(cnn_model.input, cnn_model.get_layer('gender_output').output)
    if USE_GENDER else None
)


In [81]:
emotion_logits   = emotion_head.predict(x_only_test, verbose=0)
age_logits       = age_head.predict(x_only_test, verbose=0)
ethnicity_logits = ethnicity_head.predict(x_only_test, verbose=0)
gender_logits    = gender_head.predict(x_only_test, verbose=0) if gender_head is not None else None

In [62]:
print("logit shapes:",
      "emotion",   getattr(emotion_logits,   "shape", None),
      "age",       getattr(age_logits,       "shape", None),
      "ethnicity", getattr(ethnicity_logits, "shape", None),
      "gender",    getattr(gender_logits,    "shape", None))

logit shapes: emotion (4167, 7) age (4167, 3) ethnicity (4167, 5) gender (4167, 1)


In [63]:
# =========================================================
# Reports — CNN (Emotion/Age/Ethnicity[/Gender])
# =========================================================
def save_cm(yt, yp, labels, title_prefix, out_prefix, normalize=None):
    cm = confusion_matrix(yt, yp, labels=list(range(len(labels))), normalize=normalize)
    plt.figure(figsize=(6,5))
    fmt = '.2f' if normalize else 'd'
    sns.heatmap(cm, annot=True, fmt=fmt, cmap='Blues',
                xticklabels=labels, yticklabels=labels, vmin=0 if normalize else None, vmax=1 if normalize else None)
    title = f"{title_prefix} — Confusion Matrix" + (" (Normalized)" if normalize else " (Counts)")
    plt.title(title); plt.xlabel("Predicted"); plt.ylabel("Actual")
    plt.tight_layout()
    suffix = "normalized" if normalize else "counts"
    plt.savefig(os.path.join("figures", f"{out_prefix}_cm_{suffix}.png"), dpi=160); plt.close()

In [64]:
# Emotion
y_true_emotion = yE_te_int
y_pred_emotion = np.argmax(emotion_logits, axis=1)
save_cm(y_true_emotion, y_pred_emotion, EMOS, "Emotion (CNN)", "emotion", normalize=None)
save_cm(y_true_emotion, y_pred_emotion, EMOS, "Emotion (CNN)", "emotion", normalize='true')
with open(os.path.join("reports", "emotion_classification_report.txt"), "w") as f:
    f.write(classification_report(y_true_emotion, y_pred_emotion, target_names=EMOS, digits=4, zero_division=0))

In [65]:
# Age
y_true_age = np.argmax(yA_te, axis=1)
y_pred_age = np.argmax(age_logits, axis=1)
age_labels = ["0-24","25-39","40+"]
save_cm(y_true_age, y_pred_age, age_labels, "Age (CNN)", "age", normalize=None)
save_cm(y_true_age, y_pred_age, age_labels, "Age (CNN)", "age", normalize='true')
with open(os.path.join("reports", "age_classification_report.txt"), "w") as f:
    f.write(classification_report(y_true_age, y_pred_age, target_names=age_labels, digits=4, zero_division=0))

In [66]:
# Ethnicity
y_true_eth = np.argmax(yT_te, axis=1)
y_pred_eth = np.argmax(ethnicity_logits, axis=1)
eth_labels = list(eth_le.classes_)
save_cm(y_true_eth, y_pred_eth, eth_labels, "Ethnicity (CNN)", "ethnicity", normalize=None)
save_cm(y_true_eth, y_pred_eth, eth_labels, "Ethnicity (CNN)", "ethnicity", normalize='true')
with open(os.path.join("reports", "ethnicity_classification_report.txt"), "w") as f:
    f.write(classification_report(y_true_eth, y_pred_eth, target_names=eth_labels, digits=4, zero_division=0))

In [67]:
# Gender
if USE_GENDER and (gender_logits is not None):
    y_true_gender = yG_te.astype(int)
    y_pred_gender = (gender_logits.ravel() >= 0.5).astype(int)
    save_cm(y_true_gender, y_pred_gender, ["male","female"], "Gender (CNN)", "gender", normalize=None)
    save_cm(y_true_gender, y_pred_gender, ["male","female"], "Gender (CNN)", "gender", normalize='true')
    with open(os.path.join("reports", "gender_classification_report.txt"), "w") as f:
        f.write(classification_report(y_true_gender, y_pred_gender, target_names=["male","female"], digits=4, zero_division=0))

In [68]:
# =========================================================
# Training curves
# =========================================================
def _merge_hist(hA, hB):
    hist = {}
    for k, v in hA.history.items():
        hist[k] = list(v) + list(hB.history.get(k, []))
    for k, v in hB.history.items():
        if k not in hist: hist[k] = list(v)
    return hist

In [69]:
history = _merge_hist(history_A, history_B)

In [70]:
if isinstance(history, dict) and len(history) > 0:
    plt.figure(figsize=(12,6))
    for key in ['emotion_output_accuracy','age_output_accuracy','ethnicity_output_accuracy'] + (['gender_output_accuracy'] if USE_GENDER else []):
        if key in history:
            plt.plot(history[key], label=f"{key.replace('_accuracy','')} Train")
            vkey = f"val_{key}"
            if vkey in history:
                plt.plot(history[vkey], label=f"{key.replace('_accuracy','')} Val", linestyle="--")
    plt.title(f"CNN Accuracy ({len(EMOS)}-class emotion)"); plt.xlabel("Epoch"); plt.ylabel("Accuracy")
    plt.grid(True, alpha=.3); plt.legend(); plt.tight_layout()
    plt.savefig(os.path.join("figures","training_accuracy_curves.png"), dpi=160); plt.close()

    plt.figure(figsize=(12,6))
    for key in ['loss','emotion_output_loss','age_output_loss','ethnicity_output_loss'] + (['gender_output_loss'] if USE_GENDER else []):
        if key in history:
            plt.plot(history[key], label=f"{key} Train")
            vkey = f"val_{key}"
            if vkey in history:
                plt.plot(history[vkey], label=f"{key} Val", linestyle="--")
    plt.title("CNN Loss"); plt.xlabel("Epoch"); plt.ylabel("Loss")
    plt.grid(True, alpha=.3); plt.legend(); plt.tight_layout()
    plt.savefig(os.path.join("figures","training_loss_curves.png"), dpi=160); plt.close()

In [71]:
# =========================================================
# Per-class metrics & MBV2 comparison (if trained)
# =========================================================
def perclass_metrics(y_true, y_pred, labels):
    prec, rec, f1, sup = precision_recall_fscore_support(y_true, y_pred,
                                                         labels=np.arange(len(labels)), average=None, zero_division=0)
    cm = confusion_matrix(y_true, y_pred, labels=np.arange(len(labels)))
    with np.errstate(divide='ignore', invalid='ignore'):
        row_sums = cm.sum(axis=1, keepdims=True)
        acc = np.divide(cm.diagonal()[:, None], row_sums, where=row_sums>0).flatten()
        acc = np.nan_to_num(acc, nan=0.0)
    dfm = pd.DataFrame({"class_id": np.arange(len(labels)),
                        "class_label": labels,
                        "precision": prec, "recall": rec, "f1": f1, "support": sup, "accuracy": acc})
    return dfm

In [72]:
df_cnn = perclass_metrics(y_true_emotion, y_pred_emotion, EMOS)
df_cnn.to_csv(os.path.join("figures","emotion_metrics_cnn.csv"), index=False)

In [73]:
if USE_MBV2 and 'test_ds_mbv2' in locals():
    print("\n MobileNetV2 Test Metrics:")
    mb_results = mb_model.evaluate(test_ds_mbv2, verbose=1)
    for name, val in zip(mb_model.metrics_names, mb_results):
        print(f"{name}: {float(val):.4f}")

    logits_mb = mb_model.predict(test_ds_mbv2, verbose=0)
    y_true_mb = np.array(list(test_ds_mbv2.unbatch().map(lambda x, y: y).as_numpy_iterator()), dtype=np.int32)
    y_pred_mb = np.argmax(logits_mb, axis=1)

    acc_cnn = (y_pred_emotion == y_true_emotion).mean()
    acc_mb  = (y_pred_mb      == y_true_mb).mean()

    x = np.arange(2); w = 0.6
    plt.figure(figsize=(6,4))
    plt.bar(["CNN","MBV2"], [acc_cnn, acc_mb], width=w)
    plt.ylim(0,1.0); plt.ylabel("Emotion Test Accuracy"); plt.title("CNN vs MobileNetV2")
    for i,v in enumerate([acc_cnn, acc_mb]): plt.text(i, min(0.98, v+0.02), f"{v:.3f}", ha='center')
    plt.tight_layout(); plt.savefig(os.path.join("figures","compare_test_accuracy.png"), dpi=160); plt.close()


 MobileNetV2 Test Metrics:
[1m66/66[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 123ms/step - accuracy: 0.3602 - loss: 1.7369
loss: 1.7369
compile_metrics: 0.3602


In [74]:
print("\n✅ Done.")
print("Saved:")
print(" • CNN best model: masked_face_multitask_model.keras")
print(" • CNN last model: masked_face_multitask_model_last.keras")
if USE_MBV2:
    print(" • MBV2 last model: mbv2_last.keras")
print(" • Sidecar labels: masked_face_multitask_model*.labels.json")
print(" • Reports: training_log.csv, reports/*_classification_report.txt")
print(" • Figures: training_accuracy_curves.png, training_loss_curves.png")
print("            csv_raw_emotion_count.png, processed_emotion_count.png")
print("            processed_gender_count.png, processed_age_count.png, processed_ethnicity_count.png")
print("            emotion/age/ethnicity[/gender] cm_*.png and normalized cm_*.png")
print("            compare_test_accuracy.png (if MBV2 trained)")


✅ Done.
Saved:
 • CNN best model: masked_face_multitask_model.keras
 • CNN last model: masked_face_multitask_model_last.keras
 • MBV2 last model: mbv2_last.keras
 • Sidecar labels: masked_face_multitask_model*.labels.json
 • Reports: training_log.csv, reports/*_classification_report.txt
 • Figures: training_accuracy_curves.png, training_loss_curves.png
            csv_raw_emotion_count.png, processed_emotion_count.png
            processed_gender_count.png, processed_age_count.png, processed_ethnicity_count.png
            emotion/age/ethnicity[/gender] cm_*.png and normalized cm_*.png
            compare_test_accuracy.png (if MBV2 trained)


In [75]:
# --- graceful teardown to avoid AtomicFunction.__del__ noise ---
try:
    del emotion_head, age_head, ethnicity_head
except Exception:
    pass
try:
    del gender_head
except Exception:
    pass
try:
    del cnn_model
except Exception:
    pass
try:
    del mb_model
except Exception:
    pass

In [76]:
import gc
tf.keras.backend.clear_session()
gc.collect()

0

In [77]:
import matplotlib.pyplot as plt
plt.close('all')