## Install **Dependencies**

In [1]:
## Download packages ##
# -q means "quiet", which suppresses output
%pip install -q opencv-python # Img. proc and video
%pip install -q tensorflow    # Machine learning library
%pip install -q pandas        # Dataframe/tables
%pip install -q matplotlib    # Plotting
%pip install -q openpyxl      # Excel support

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


Import Packages

In [2]:
## Machine leraning imports ##

# Language of Gods
import tensorflow as tf
# high-level API within TensorFlow that makes it easier to define and work with neural networks
import tensorflow.keras as keras # type: ignore
# This provides tools for creating different types of neural network models, like Sequential and Model
from tensorflow.keras.models import Sequential, Model # type: ignore
# Contains functions for preprocessing images, such as resizing, data augmentation, etc
from tensorflow.keras.preprocessing.image import ImageDataGenerator # type: ignore
# Building blocks for creating layers within neural networks
from tensorflow.keras.layers import Concatenate, LSTM, GlobalAveragePooling2D, MaxPooling2D, BatchNormalization, Conv2D, Conv3D, Dense, Dropout, Flatten, TimeDistributed # type: ignore
# Provides algorithms for optimizing the training process of a model
from tensorflow.keras.optimizers import Adam # type: ignore
# Includes functions to evaluate model performance
from tensorflow.keras.metrics import categorical_crossentropy # type: ignore
# Provides tools to prevent overfitting during training
from tensorflow.keras import regularizers, layers, Input, Model # type: ignore
# Slap categories from codes onto data
from tensorflow.keras.utils import to_categorical # type: ignore
# Optimize learning
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping # type: ignore
# This provides tools for splitting data into training and testing sets.
from sklearn.model_selection import train_test_split


## Misc imports ##
# Image loading or something
import glob
# Importing images from local directory
from pathlib import Path
# Creating directories or checking file paths.cnn
import os
# Number operations
import numpy as np
# Image read helper. OpenCV could also be used.
from PIL import Image
# Plot within the Colab notebook
%matplotlib inline
# Carl-related stuffs
import cv2 as cv
# Dataframes for adding classification labels and reading Excel (.csv) datasets
import pandas as pd

print("All Imports Successful")

2025-04-23 22:12:20.154513: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-04-23 22:12:20.156652: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2025-04-23 22:12:20.188038: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2025-04-23 22:12:20.188983: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI AVX512_BF16 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


All Imports Successful


## Import **Datasets**

**Datasets used**
1. MMEW, CASME II, SAMM (in progres of obtaining)
2. SAMMv2 (ripped from Kaggle, stored in mt9485/Datasets)


In [3]:
# Create class holding dataset
class Dataset:
    def __init__(self, root_dir, label_file, target_size=(128, 128),
                 sequence_length=50, skiprows=0, dataframe=None, augment=False):
        self.root_dir = root_dir
        self.target_size = target_size
        self.sequence_length = sequence_length
        self.augment = augment

        # Load and clean the label file
        if dataframe is not None:
            self.labels_df = dataframe.reset_index(drop=True)
        else:
            # Read from Excel
            self.labels_df = pd.read_excel(label_file, skiprows=skiprows)
            self.labels_df = self.labels_df.dropna(subset=["Filename", "Onset Frame", "Offset Frame", "Objective Classes"])
            self.labels_df["Subject"] = self.labels_df["Subject"].astype(str).str.zfill(3)

    def __len__(self):
        return len(self.labels_df)

    def _load_sequence(self, subject_id, clip_name, onset, offset):
        clip_path = os.path.join(self.root_dir, subject_id, clip_name)

        if not os.path.isdir(clip_path):
            raise ValueError(f"Clip folder does not exist: {clip_path}")

        frames = []

        for frame_num in range(onset, offset + 1):
            frame_str = str(frame_num).zfill(5)
            
            # Match any file ending in the frame number (4 or 5 digits)
            pattern_1 = os.path.join(clip_path, f"{clip_name}_*{frame_num}.jpg")
            pattern_2 = os.path.join(clip_path, f"{subject_id}_*{frame_num}.jpg")
            
            matches = glob.glob(pattern_1) + glob.glob(pattern_2)
            if not matches:
                continue  # No match found for this frame

            # Use first matching file (usually only one)
            img_path = matches[0]

            try:
                img = Image.open(img_path).convert("L")
                img = img.resize(self.target_size)
                img = np.array(img).astype(np.uint8)

                if self.augment:
                    img = self.apply_augmentation(img)

                img = img.astype('float32') / 255.0
                
                frames.append(img)
            except Exception as e:
                print(f"[!] Error loading image {img_path}: {e}")
                continue

        if not frames:
            raise ValueError(f"No valid frames in: {clip_path} for range {onset}-{offset}")

        sequence = np.stack(frames, axis=0)
        sequence = self._pad_or_truncate(sequence)

        return sequence[..., np.newaxis]  # (frames, H, W, 1)


    def _pad_or_truncate(self, sequence):
        num_frames = sequence.shape[0]
        if num_frames == self.sequence_length:
            return sequence
        elif num_frames < self.sequence_length:
            pad_len = self.sequence_length - num_frames
            pad = np.zeros((pad_len, *sequence.shape[1:]), dtype=sequence.dtype)
            return np.concatenate([sequence, pad], axis=0)
        else:
            return sequence[:self.sequence_length]

    def get_dataset(self):
        sequences = []
        labels = []

        for idx, row in self.labels_df.iterrows():
            try:
                subject = row["Subject"]
                filename = row["Filename"]
                onset = int(row["Onset Frame"])
                offset = int(row["Offset Frame"])
                label = label = int(row["Objective Classes"]) - 1  # Shift 1–7 → 0–6

                sequence = self._load_sequence(subject, filename, onset, offset)
                sequences.append(sequence)
                labels.append(label)
            except Exception as e:
                print(f"[!] Skipped row {idx} due to error: {e}")

        x = np.array(sequences)
        y = to_categorical(labels, num_classes=7)
        
        # print(f"Min label: {min(labels)}, Max label: {max(labels)}")

        return x, y
    
    @staticmethod
    def split_face_regions(sequence_batch):
        """
        Takes in a batch of image sequences 
        and returns two batches:
        - eye_seq: top half (N, T, 64, 128, 1)
        - mouth_seq: bottom half (N, T, 64, 128, 1)
        - N: Number of sequences
        - T: Number of frame in each sequence
        - H: Original height of each frame
        - W: Original width of each frame
        - C: Number of channels
        """
        top_seqs = []
        bottom_seqs = []

        for seq in sequence_batch:
            top_seq = []
            bottom_seq = []

            for frame in seq:
                frame = frame.squeeze()  # (H, W)
                h, w = frame.shape
                
                # Ensure height is even before splitting
                if h % 2 != 0:
                    frame = frame[:h-1, :]  # Crop one row if height is odd
                    h -= 1

                top = frame[:h//2, :]      # (H/2, W)
                bottom = frame[h//2:, :]   # (H/2, W)

                top_seq.append(top)
                bottom_seq.append(bottom)

            # Add channel dim back
            top_seq = np.expand_dims(np.array(top_seq), -1)
            bottom_seq = np.expand_dims(np.array(bottom_seq), -1)

            top_seqs.append(top_seq)
            bottom_seqs.append(bottom_seq)

        return np.array(top_seqs), np.array(bottom_seqs)
    
    @staticmethod
    def apply_augmentation(img):
        """Apply random augmentations to a grayscale image"""
        # Horizontal flip (50%)
        if np.random.rand() < 0.5:
            img = np.flip(img, axis=1)

        # Random brightness & contrast
        alpha = np.random.uniform(0.9, 1.1)
        beta = np.random.uniform(-10, 10)
        img = np.clip(alpha * img + beta, 0, 255)

        # Gaussian noise (very light)
        if np.random.rand() < 0.25:
            noise = np.random.normal(0, 3, img.shape)
            img = np.clip(img + noise, 0, 255)

        # Slight zoom (random crop & resize)
        if np.random.rand() < 0.25:
            h, w = img.shape
            zoom_factor = np.random.uniform(1.0, 1.05)
            zh, zw = int(h / zoom_factor), int(w / zoom_factor)
            top = (h - zh) // 2
            left = (w - zw) // 2
            cropped = img[top:top+zh, left:left+zw]
            img = cv.resize(cropped, (w, h))

        return img.astype(np.uint8)


In [4]:
## Load SAMM Dataset ##

# Load the full label file
full_df = pd.read_excel("./SAMM_Micro_FACS_Codes_v2.xlsx", skiprows=13)
full_df = full_df.dropna(subset=["Filename", "Onset Frame", "Offset Frame", "Objective Classes"])
full_df["Subject"] = full_df["Subject"].astype(str).str.zfill(3)

# Split training and val into separate pieces; one to augment, other not
train_df, val_df = train_test_split(
    full_df,
    test_size=0.2,
    stratify=full_df["Objective Classes"],
    random_state=42
)

# Local dataset path
local_dataset_path = "./SAMMv2"

# Instantiate dataset 
train_dataset = Dataset(
    root_dir=local_dataset_path,
    label_file="./SAMM_Micro_FACS_Codes_v2.xlsx",  # Still needed in case dataframe is None
    target_size=(128, 128),
    sequence_length=50,
    dataframe=train_df,
    augment=True
)

val_dataset = Dataset(
    root_dir=local_dataset_path,
    label_file="./SAMM_Micro_FACS_Codes_v2.xlsx",
    target_size=(128, 128),
    sequence_length=50,
    dataframe=val_df,
    augment=False
)

# Load the data (80% train, 20% val)
SAMMv2_x_train, SAMMv2_y_train = train_dataset.get_dataset()
SAMMv2_x_val, SAMMv2_y_val = val_dataset.get_dataset()

# For your model
SAMMv2_input_shape = (50, 128, 128, 1)
SAMMv2_num_classes = 7


## Build Model **Architecture**

In [5]:
# Inception Block
class InceptionBlock(tf.keras.layers.Layer):
    def __init__(self, num_channels=64):
        super().__init__()

        self.conv1x3 = Conv2D(num_channels, kernel_size=(1,3), padding='same', activation='relu')
        self.conv3x1 = Conv2D(num_channels, kernel_size=(3,1), padding='same', activation='relu')
        self.conv3x3 = Conv2D(num_channels, kernel_size=(3), padding='same', activation='relu')
        self.conv1x1 = Conv2D(num_channels, kernel_size=(1), padding='same', activation='relu')
        self.maxpool = MaxPooling2D(pool_size=(3,3), strides=1, padding='same')


    def call(self, x):
      # path a
        a = self.conv1x1(x)
        a = self.conv3x3(a)
      ## path a_1
        a_1 = self.conv1x3(a)
      ## path a_2
        a_2 = self.conv3x1(a)

      # path b
        b = self.conv1x1(x)
      ## path b_1
        b_1 = self.conv1x3(b)
      ## path b_2
        b_2 = self.conv3x1(b)

      # path c
        c = self.maxpool(x)
        c = self.conv1x1(c)

      # path d
        d = self.conv1x1(x)

        return tf.keras.layers.concatenate([a, a_1, a_2, b, b_1, b_2, c, d])
    

In [15]:
class CNNBlock(tf.keras.layers.Layer):
    def __init__(self, num_channels=64):
        super().__init__()
        self.model = tf.keras.Sequential([
            InceptionBlock(num_channels),
            Conv2D(num_channels, kernel_size=3, padding='same', activation='relu', kernel_regularizer=tf.keras.regularizers.l2(1e-4)),
            BatchNormalization(),
            Dropout(0.2),

            Conv2D(num_channels, kernel_size=3, padding='same', activation='relu', kernel_regularizer=tf.keras.regularizers.l2(1e-4)),
            BatchNormalization(),
            Dropout(0.1),
        ])

    def call(self, x):
        return self.model(x)


In [34]:
# Top and Bottom half branches. To Process eye and mouth data separately.
class CNN_LSTM_Branch(tf.keras.Model):
    def __init__(self, num_channels=64, lstm_units=128, name="cnn_lstm_branch"):
        super().__init__(name=name)
        
        # TimeDistributed applies CNNBlock to each frame
        self.frames = TimeDistributed(CNNBlock(num_channels))
        
        # Flatten output for dense layer
        self.flatten = Flatten()
        
        # Apply LSTM
        self.lstm = tf.keras.layers.ConvLSTM2D(lstm_units, kernel_size=(3), padding='same')

        
    def call(self, x):
        x = self.frames(x)  # Process frames
        x = self.lstm(x) # Flatten output
        x = self.flatten(x)    # Apply LSTSM
        return x

Finalize layers

In [35]:
## Instantiate and combine branches ##
# Inputs (#frames, img_width, img_height, channels)
eye_input = tf.keras.Input(shape=(50, 64, 128, 1), name="eye_input")
mouth_input = tf.keras.Input(shape=(50, 64, 128, 1), name="mouth_input")

# Top and Bottom branch processes eyes (top) and mouth (bottom), halves of the image
#  Need to be named uniquely, or else gets angry
eye_branch = CNN_LSTM_Branch(name="eye_branch")
mouth_branch = CNN_LSTM_Branch(name="mouth_branch")

# Call the model on the inputs — this builds the graph
eye_output = eye_branch(eye_input)
mouth_output = mouth_branch(mouth_input)



## Dense layers for classification ##
concatination = tf.keras.layers.Concatenate()([eye_output, mouth_output])

x = Flatten()(concatination)

x = Dense(128, activation='relu')(concatination)
x = Dropout(0.4)(x)
x = Dense(64, activation='relu')(x)
x = Dropout(0.4)(x)
x = Dense(32, activation='relu')(x)

# Classification - 7 classes = 7 emotions
cnnlstm_output = tf.keras.layers.Dense(7, activation='softmax')(x)


## **Construct and Test** Model Architectures

In [9]:
## Split the data ## 
SAMMv2_eye_train, SAMMv2_mouth_train = Dataset.split_face_regions(SAMMv2_x_train)
SAMMv2_eye_val, SAMMv2_mouth_val = Dataset.split_face_regions(SAMMv2_x_val)


In [36]:
# Store model into a variable
cnnlstm = tf.keras.Model(inputs=[eye_input,mouth_input], outputs=cnnlstm_output)

cnnlstm.compile(
    optimizer='adam',
    loss='categorical_crossentropy',   # not 'sparse' bcs using one-hot encoding
    metrics=['accuracy']
)

callbacks_cnnlstm = [
    # Stops if the loss function has 3 consequtive dips in performance
    EarlyStopping(patience=3, restore_best_weights=True),
    # Saves the best model after training
    ModelCheckpoint("cnnlstm_best_model.h5", save_best_only=True)
]

cnnlstm.summary()

print(SAMMv2_eye_train.shape)

Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 eye_input (InputLayer)         [(None, 50, 64, 128  0           []                               
                                , 1)]                                                             
                                                                                                  
 mouth_input (InputLayer)       [(None, 50, 64, 128  0           []                               
                                , 1)]                                                             
                                                                                                  
 eye_branch (CNN_LSTM_Branch)   (None, 1048576)      1279424     ['eye_input[0][0]']              
                                                                                            

In [None]:
## Training the Model ##
cnnlstm.fit(
    [SAMMv2_eye_train, SAMMv2_mouth_train], SAMMv2_y_train,
    validation_data=([SAMMv2_eye_val, SAMMv2_mouth_val], SAMMv2_y_val),
    epochs=50,
    batch_size=16,
    callbacks=callbacks_cnnlstm
)


Epoch 1/50


Epoch 2/50
Epoch 3/50
1/8 [==>...........................] - ETA: 2:39 - loss: 126.2550 - accuracy: 0.2500