In [48]:
import numpy as np
import pandas as pd
import os
import cv2
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, Flatten, Dense, Dropout, concatenate, add
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.callbacks import ModelCheckpoint
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import load_model
from tensorflow.keras.callbacks import EarlyStopping

In [2]:
# Load OpenCV Haar Cascade classifier for face detection
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

In [3]:
# Data Loading and Preprocessing from Excel
base_dataset_path = "E:/FacialMicroExpression/data"
excel_file_path = "Section A.xls"
output_size = (112, 112)
motion_threshold = 1e-3

# Read the Excel file to get the emotion labels for each sub folder
def load_data_from_excel(excel_file_path):
    """Reads the Excel file containing filenames and labels."""
    data = pd.read_excel(excel_file_path)
    return data
print("Excel columns:", load_data_from_excel(excel_file_path).columns)

Excel columns: Index(['Subject', 'Filename', 'Unnamed: 2', 'OnsetF', 'ApexF1', 'ApexF2',
       'OffsetF', 'Unnamed: 7', 'Onset', 'Total', 'AU', 'Emotion'],
      dtype='object')


In [4]:
# Data cleaning and normalization
# Path to the Excel file
excel_file_path = "Section A.xls"

# Load the Excel file
data = pd.read_excel(excel_file_path)

# Display the original DataFrame for reference
print("Original Data:")
print(data.head())

# Data cleaning and normalization
# Normalize columns: Subject, Filename, and Emotion
data['Subject'] = data['Subject'].astype(str).str.strip().str.lower()  # Normalize Subject column
data['Filename'] = data['Filename'].astype(str).str.strip().str.lower()  # Normalize Filename column
data['Emotion'] = data['Emotion'].astype(str).str.strip().str.lower()  # Normalize Emotion column

# Handle missing values in critical columns
data.dropna(subset=['Subject', 'Filename', 'Emotion'], inplace=True)  # Drop rows with missing values in these columns

# Remove duplicates if any (considering all columns for uniqueness)
data.drop_duplicates(inplace=True)

# Display the cleaned DataFrame
print("\nCleaned Data:")
print(data.head())

# Display unique emotions for verification
print("\nUnique Emotions:")
print(data['Emotion'].unique())

# Display the count of unique subjects and filenames for debugging
print("\nUnique Subjects:")
print(data['Subject'].nunique())
print("\nUnique Filenames:")
print(data['Filename'].nunique())
data = data.drop(columns=['Unnamed: 2', 'Unnamed: 7'], errors='ignore')
data.head()

Original Data:
   Subject Filename  Unnamed: 2  OnsetF  ApexF1 ApexF2 OffsetF  Unnamed: 7  \
0        1  EP01_12         NaN      73      81      \      91         NaN   
1        1  EP01_12         NaN     163     169      \     177         NaN   
2        1   EP01_5         NaN     113     121    125     133         NaN   
3        1   EP01_8         NaN      67      75      \      81         NaN   
4        1   EP03_1         NaN      79      91     95     105         NaN   

        Onset       Total  AU     Emotion  
0  150.000000  316.666667   4       tense  
1  116.666667         250   4       tense  
2  150.000000         350  12   happiness  
3  150.000000         250  14  repression  
4  216.666667         450  17  repression  

Cleaned Data:
  Subject Filename  Unnamed: 2  OnsetF  ApexF1 ApexF2 OffsetF  Unnamed: 7  \
0       1  ep01_12         NaN      73      81      \      91         NaN   
1       1  ep01_12         NaN     163     169      \     177         NaN   
2     

Unnamed: 0,Subject,Filename,OnsetF,ApexF1,ApexF2,OffsetF,Onset,Total,AU,Emotion
0,1,ep01_12,73,81,\,91,150.0,316.666667,4,tense
1,1,ep01_12,163,169,\,177,116.666667,250.0,4,tense
2,1,ep01_5,113,121,125,133,150.0,350.0,12,happiness
3,1,ep01_8,67,75,\,81,150.0,250.0,14,repression
4,1,ep03_1,79,91,95,105,216.666667,450.0,17,repression


In [5]:
def load_images_from_EP_folders(base_dataset_path, data):
    image_data = []
    image_labels = []
    unmatched_folders = []

    for subfolder in os.listdir(base_dataset_path):
        subfolder_path = os.path.join(base_dataset_path, subfolder)

        # Format the subject (e.g., 1 -> sub01, 2 -> sub02)
        subject_id = subfolder.strip().lower().replace("sub", "")  # Remove 'sub' and match
        matching_subjects = data[data['Subject'].astype(str).str.zfill(2) == subject_id]

        if not matching_subjects.empty:
            for ep_folder in os.listdir(subfolder_path):
                ep_folder_path = os.path.join(subfolder_path, ep_folder)

                # Match ep_folder with Filename column in Excel (normalize case)
                matching_filenames = matching_subjects[matching_subjects['Filename'].str.strip().str.lower() == ep_folder.strip().lower()]

                if not matching_filenames.empty and os.path.isdir(ep_folder_path):
                    for file_name in os.listdir(ep_folder_path):
                        if file_name.endswith(('.jpg', '.jpeg', '.png')):
                            image_path = os.path.join(ep_folder_path, file_name)

                            image = cv2.imread(image_path)
                            if image is not None:
                                resized_image = cv2.resize(image, (112, 112))
                                image_data.append(resized_image)

                                # Extract emotion label from matching row in Excel
                                label = matching_filenames['Emotion'].values[0]
                                image_labels.append(label)
                else:
                    unmatched_folders.append(ep_folder)
        else:
            unmatched_folders.append(subfolder)

    
    print(f"Unique labels: {set(image_labels)}")

    return image_data, image_labels

In [6]:
image_data, image_labels = load_images_from_EP_folders(base_dataset_path, data)
for idx, image in enumerate(image_data[:5]):  # Check first 5 images
    print(f"Image {idx + 1} shape: {image.shape}")
unique_labels = data['Emotion'].unique()
print(f"Unique labels in the dataset: {unique_labels}")

Unique labels: {'sadness', 'surprise', 'comtempt', 'disgust', 'tense', 'happiness', 'fear', 'repression'}
Image 1 shape: (112, 112, 3)
Image 2 shape: (112, 112, 3)
Image 3 shape: (112, 112, 3)
Image 4 shape: (112, 112, 3)
Image 5 shape: (112, 112, 3)
Unique labels in the dataset: ['tense' 'happiness' 'repression' 'disgust' 'surprise' 'comtempt' 'fear'
 'sadness']


In [7]:
for i, (image, label) in enumerate(zip(image_data[:5], image_labels[:5])):  # First 5 pairs
    print(f"Image {i + 1}: Label = {label}")

Image 1: Label = tense
Image 2: Label = tense
Image 3: Label = tense
Image 4: Label = tense
Image 5: Label = tense


In [13]:
def get_dynamic_image(frames, normalized=True):
    """ Takes a list of frames and returns either a raw or normalized dynamic image."""
    num_channels = frames[0].shape[2]
    channel_frames = _get_channel_frames(frames, num_channels)
    channel_dynamic_images = [_compute_dynamic_image(channel) for channel in channel_frames]

    dynamic_image = cv2.merge(tuple(channel_dynamic_images))
    if normalized:
        dynamic_image = cv2.normalize(dynamic_image, None, 0, 255, norm_type=cv2.NORM_MINMAX)
        dynamic_image = dynamic_image.astype('uint8')

    return dynamic_image


def _get_channel_frames(iter_frames, num_channels):
    """ Takes a list of frames and returns a list of frame lists split by channel. """
    frames = [[] for channel in range(num_channels)]

    for frame in iter_frames:
        for channel_frames, channel in zip(frames, cv2.split(frame)):
            channel_frames.append(channel.reshape((*channel.shape[0:2], 1)))
    for i in range(len(frames)):
        frames[i] = np.array(frames[i])
    return frames


def _compute_dynamic_image(frames):
    num_frames, h, w, depth = frames.shape

    y = np.zeros((num_frames, h, w, depth))

    ids = np.ones(num_frames)

    fw = np.zeros(num_frames)
    for n in range(num_frames):
        cumulative_indices = np.array(range(n, num_frames)) + 1
        fw[n] = np.sum(((2*cumulative_indices) - num_frames) / (cumulative_indices+ 1e-8))

    for v in range(int(np.max(ids))):
        indv = np.array(np.where(ids == v+1))

        a1 = frames[indv, :, :, :]
        a2 = np.reshape(fw, (indv.shape[1], 1, 1, 1))
        a3 = a1 * a2
        print(indv.shape[1])

        y = np.sum(a3[0], axis=0)
        print(y.shape)

    return y

In [14]:
def load_videos(base_dataset_path, data, output_size=(112, 112)):
    video_data = []
    video_labels = []

    # Iterate through subject folders (sub01, sub02, ...)
    for subject_folder in os.listdir(base_dataset_path):
        subject_path = os.path.join(base_dataset_path, subject_folder)

        # Check if it's a valid directory (e.g., sub01, sub02)
        if os.path.isdir(subject_path):
            # Match subject with Excel data (e.g., sub01 with Subject 1)
            matching_subjects = data[
                data['Subject'].astype(str).str.strip().str.lower() == subject_folder.strip().lower().replace('sub', '')
            ]
            label = matching_subjects['Emotion'].values[0] if not matching_subjects.empty else None

            # Process video files inside the subject folder
            for file_name in os.listdir(subject_path):
                if file_name.endswith('.avi'):  # Look for .avi video files
                    video_filename = file_name.split('.')[0]  # Get EPXXXX part of the filename (e.g., EP01)

                    # Match video filename with Excel 'Filename' column
                    matching_video = matching_subjects[matching_subjects['Filename'].str.strip().str.lower() == video_filename.strip().lower()]
                    if not matching_video.empty:
                        video_path = os.path.join(subject_path, file_name)
                        cap = cv2.VideoCapture(video_path)
                        frames = []

                        # Extract frames from the video
                        while cap.isOpened():
                            ret, frame = cap.read()
                            if not ret:
                                break
                            resized_frame = cv2.resize(frame, output_size)  # Resize the frame
                            frames.append(resized_frame)
                        cap.release()

                        # Generate dynamic image from frames (optional)
                        if len(frames) > 0:
                            dynamic_image = get_dynamic_image(frames)
                            video_data.append(dynamic_image)
                            video_labels.append(label)

    return video_data, video_labels

In [15]:
video_data, video_labels = load_videos(base_dataset_path, data)
print(f"Unique labels: {set(video_labels)}")

168
(112, 112, 1)
168
(112, 112, 1)
168
(112, 112, 1)
94
(112, 112, 1)
94
(112, 112, 1)
94
(112, 112, 1)
93
(112, 112, 1)
93
(112, 112, 1)
93
(112, 112, 1)
76
(112, 112, 1)
76
(112, 112, 1)
76
(112, 112, 1)
141
(112, 112, 1)
141
(112, 112, 1)
141
(112, 112, 1)
172
(112, 112, 1)
172
(112, 112, 1)
172
(112, 112, 1)
131
(112, 112, 1)
131
(112, 112, 1)
131
(112, 112, 1)
204
(112, 112, 1)
204
(112, 112, 1)
204
(112, 112, 1)
141
(112, 112, 1)
141
(112, 112, 1)
141
(112, 112, 1)
178
(112, 112, 1)
178
(112, 112, 1)
178
(112, 112, 1)
295
(112, 112, 1)
295
(112, 112, 1)
295
(112, 112, 1)
255
(112, 112, 1)
255
(112, 112, 1)
255
(112, 112, 1)
128
(112, 112, 1)
128
(112, 112, 1)
128
(112, 112, 1)
199
(112, 112, 1)
199
(112, 112, 1)
199
(112, 112, 1)
348
(112, 112, 1)
348
(112, 112, 1)
348
(112, 112, 1)
174
(112, 112, 1)
174
(112, 112, 1)
174
(112, 112, 1)
164
(112, 112, 1)
164
(112, 112, 1)
164
(112, 112, 1)
302
(112, 112, 1)
302
(112, 112, 1)
302
(112, 112, 1)
447
(112, 112, 1)
447
(112, 112, 1)
4

In [16]:
# Preprocess Data
def preprocess_data(X, y):
    # Convert list of images to numpy array
    X = np.array(X, dtype='float32')
    
    # Normalize pixel values to be between 0 and 1
    X = X / 255.0
    
    # Resize images if needed (for example, to 112 x 112)
    output_size = (112, 112)  # Example target size
    X = X.reshape(-1, output_size[0], output_size[1], 3)  # 3 channels for RGB
    
    # Encode labels
    encoder = LabelEncoder()
    y = encoder.fit_transform(y)  # Transform the labels into numeric values
    
    return X, y
# Preprocess Data
X_images, y_images = preprocess_data(image_data, image_labels)
X_videos, y_videos = preprocess_data(video_data, video_labels)
# Combine image and video data
X = np.concatenate([X_images, X_videos])
y = np.concatenate([y_images, y_videos])

In [17]:
# Split into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Ensure labels are one-hot encoded
num_classes = len(set(y))  # Dynamically infer number of classes
y_train_onehot = to_categorical(y_train, num_classes=num_classes)
y_test_onehot = to_categorical(y_test, num_classes=num_classes)

In [35]:
def LearNet_Modelbuild(height=112, width=112, channels=3, classes=8):
    im = Input(shape=(height, width, channels))
    Conv_S = Conv2D(16, (3, 3), activation='relu', padding='same', strides=2, name='Conv_S')(im)

    Conv_1_1 = Conv2D(16, (1, 1), activation='relu', padding='same', strides=2, name='Conv_1_1')(Conv_S)
    Conv_1_2 = Conv2D(32, (3, 3), activation='relu', padding='same', strides=2, name='Conv_1_2')(Conv_1_1)
    Conv_1_3 = Conv2D(64, (5, 5), activation='relu', padding='same', strides=2, name='Conv_1_3')(Conv_1_2)

    Conv_2_1 = Conv2D(16, (1, 1), activation='relu', padding='same', strides=2, name='Conv_2_1')(Conv_S)
    add_2_1 = add([Conv_1_1, Conv_2_1])
    batch_r11 = BatchNormalization()(add_2_1)
    Conv_2_2 = Conv2D(32, (3, 3), activation='relu', padding='same', strides=2, name='Conv_2_2')(batch_r11)
    add_2_2 = add([Conv_1_2, Conv_2_2])
    batch_r12 = BatchNormalization()(add_2_2)
    Conv_x_2 = Conv2D(64, (5, 5), activation='relu', padding='same', strides=2, name='Conv_x_2')(batch_r12)

    Conv_3_1 = Conv2D(16, (1, 1), activation='relu', padding='same', strides=2, name='Conv_3_1')(Conv_S)
    Conv_3_2 = Conv2D(32, (3, 3), activation='relu', padding='same', strides=2, name='Conv_3_2')(Conv_3_1)
    Conv_3_3 = Conv2D(64, (5, 5), activation='relu', padding='same', strides=2, name='Conv_3_3')(Conv_3_2)

    Conv_4_1 = Conv2D(16, (1, 1), activation='relu', padding='same', strides=2, name='Conv_4_1')(Conv_S)
    add_4_1 = add([Conv_3_1, Conv_4_1])
    batch_r13 = BatchNormalization()(add_4_1)
    Conv_4_2 = Conv2D(32, (3, 3), activation='relu', padding='same', strides=2, name='Conv_4_2')(batch_r13)
    add_4_2 = add([Conv_3_2, Conv_4_2])
    batch_r14 = BatchNormalization()(add_4_2)
    Conv_x_4 = Conv2D(64, (5, 5), activation='relu', padding='same', strides=2, name='Conv_x_4')(batch_r14)

    concta1 = concatenate([Conv_1_3, Conv_x_2, Conv_3_3, Conv_x_4])
    batch_X = BatchNormalization()(concta1)

    Conv_5_1 = Conv2D(256, (3, 3), activation='relu', padding='same', strides=2, name='Conv_5_1')(batch_X)

    F1 = Flatten()(Conv_5_1)
    FC1 = Dense(256, activation='relu')(F1)
    drop = Dropout(0.5)(FC1)

    out = Dense(classes, activation='softmax')(drop)

    model = Model(inputs=[im], outputs=out)
    model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])
    return model

In [36]:
lear_net = LearNet_Modelbuild()
lear_net.summary()

In [37]:
# Extract intermediate features (output from "Conv_5_1")
feature_extractor = Model(inputs=lear_net.input, outputs=lear_net.get_layer('Conv_5_1').output)

In [38]:
def extract_image_features(image_path, feature_extractor):
    image = cv2.imread(image_path)
    image = cv2.resize(image, (112, 112)).astype('float32') / 255.0  # Resize and normalize
    image = np.expand_dims(image, axis=0)          # Add batch dimension
    features = feature_extractor.predict(np.expand_dims(image, axis=0))[0]   # Extract features
    return features.flatten()

In [39]:
def extract_video_features(video_path, feature_extractor):
    cap = cv2.VideoCapture(video_path)
    features = []
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, (112, 112)).astype('float32') / 255.0 # Resize and normalize
        frame = np.expand_dims(frame, axis=0)          # Add batch dimension
        frame_features = feature_extractor.predict(np.expand_dims(frame, axis=0))[0]
        features.append(frame_features.flatten())      # Store flattened features
    cap.release()
    return np.array(features)

In [40]:
datagen = ImageDataGenerator(
    rotation_range=10,
    width_shift_range=0.1,
    height_shift_range=0.1,
    shear_range=0.1,
    zoom_range=0.1,
    horizontal_flip=True,
    fill_mode='nearest'
)
#datagen.fit(X_train)
#train_generator = datagen.flow(X_train, y_train_onehot, batch_size=32)

In [41]:
# Model Checkpoint Callback
checkpoint = ModelCheckpoint('lear_net.keras', monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')

In [51]:
# Early Stopping Callback - Stops training if validation loss doesn't improve for 5 epochs
early_stopping = EarlyStopping(
    monitor='val_loss',  # Monitor validation loss
    patience=10,  # Stop training if no improvement for 5 epochs
    restore_best_weights=True  # Restore best weights when stopping
)

In [52]:
# Training the Model
history = lear_net.fit(datagen.flow(X_train, y_train_onehot, batch_size=32),
                       epochs=35,
                       validation_data=(X_test, y_test_onehot),
                       callbacks=[checkpoint, early_stopping])

Epoch 1/35
[1m832/832[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 134ms/step - accuracy: 0.9301 - loss: 0.1525
Epoch 1: val_accuracy did not improve from 0.95703
[1m832/832[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m118s[0m 141ms/step - accuracy: 0.9301 - loss: 0.1525 - val_accuracy: 0.9183 - val_loss: 0.2088
Epoch 2/35
[1m832/832[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 132ms/step - accuracy: 0.9289 - loss: 0.1549
Epoch 2: val_accuracy did not improve from 0.95703
[1m832/832[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m117s[0m 141ms/step - accuracy: 0.9289 - loss: 0.1549 - val_accuracy: 0.8628 - val_loss: 0.5037
Epoch 3/35
[1m832/832[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 132ms/step - accuracy: 0.9278 - loss: 0.1616
Epoch 3: val_accuracy did not improve from 0.95703
[1m832/832[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m117s[0m 140ms/step - accuracy: 0.9278 - loss: 0.1616 - val_accuracy: 0.8430 - val_loss: 0.7516
Epoch 4/35
[1m8

In [53]:
# Function to save the model
def save_model(model, filename='lear_net.h5'):
    model.save(filename)
    print(f"Model saved as {filename}")

In [54]:
save_model(lear_net)



Model saved as lear_net.h5


In [55]:
# Load the saved model (after training)
def load_trained_model(filename='lear_net.h5'):
    model = load_model(filename)
    print(f"Model loaded from {filename}")
    return model

In [56]:
# Function to evaluate the model
def evaluate_model(model, X_test, y_test):
    loss, accuracy = model.evaluate(X_test, y_test)
    print(f"Test loss: {loss}")
    print(f"Test accuracy: {accuracy}")
    return loss, accuracy

In [57]:
# Function to predict the micro facial expression for a single image
def predict_expression(image_path, model, target_size=(112, 112)):
    # Load and preprocess the image
    image = cv2.imread(image_path)
    image = cv2.resize(image, target_size)  # Resize to match model input
    image = img_to_array(image) / 255.0  # Normalize the image
    image = np.expand_dims(image, axis=0)  # Add batch dimension

    # Predict using the model
    prediction = model.predict(image)

    # Get the predicted label (assuming the labels are one-hot encoded)
    predicted_label = np.argmax(prediction, axis=1)[0]
    return predicted_label

In [61]:
#Mapping Numerical Prediction to Emotion:
def get_emotion_label(predicted_label, label_encoder):
    emotion = label_encoder.classes_[predicted_label]
    return emotion

In [64]:
#Testing Prediction on a Single Image:
def test_single_prediction(image_path, model, label_encoder, target_size=(112, 112)):
    predicted_label = predict_expression(image_path, model, target_size)
    emotion = get_emotion_label(predicted_label, label_encoder)
    print(f"Predicted Micro Expression: {emotion}")

In [65]:
# Emotion labels (these are the 8 emotions you're interested in)
emotion_labels = ['tense', 'happiness', 'repression', 'disgust', 'surprise', 'contempt', 'fear', 'sadness']

In [66]:
label_encoder = LabelEncoder()
label_encoder.fit(emotion_labels)  # Fit on the 8 emotion labels

In [67]:
evaluate_model(lear_net, X_test, y_test_onehot)

[1m208/208[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 21ms/step - accuracy: 0.9490 - loss: 0.1192
Test loss: 0.1387115716934204
Test accuracy: 0.9507211446762085


(0.1387115716934204, 0.9507211446762085)

In [68]:
image_path ="data/sub08/EP12_2_1/EP12_2_1-006.jpg"
# Load the image
image = cv2.imread(image_path)

if image is None:
    print(f"Error: Image not found or unable to load at {image_path}")
else:
    print("Image loaded successfully.")

Image loaded successfully.


In [76]:
#Test on a Single Image:
#image_path = "FacialMicroExpression/data/sub01/EP01_5/EP01_5-2.jpg"
test_single_prediction(image_path, lear_net, label_encoder)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 39ms/step
Predicted Micro Expression: tense
