In [41]:
import os
import cv2
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical

print('No. of GPUs:', len(tf.config.experimental.list_physical_devices('GPU')))

No. of GPUs: 0


In [42]:
def preprocess_image(image, target_size=(200, 200)):
    resized_image = cv2.resize(image, target_size, interpolation=cv2.INTER_AREA)
    normalized_image = np.array(resized_image, dtype=np.float32) / 255.0  # Normalize to [0, 1]
    return normalized_image

# Function to compute optical flow
def compute_optical_flow(frames):
    flows = []
    for i in range(1, len(frames)):
        prev_frame = frames[i-1]
        next_frame = frames[i]
        flow = cv2.calcOpticalFlowFarneback(prev_frame, next_frame, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        flows.append(flow)
    return flows

In [43]:
# Function to apply optical flow and preprocess images
def applyOpticalFlow(path, target_size=(200, 200)):
    file_names = sorted(os.listdir(path))
    spatial_features = []
    for fn in file_names:
        img = cv2.imread(f"{path}/{fn}", cv2.IMREAD_GRAYSCALE)
        if img is not None:
            preprocessed_img = preprocess_image(img, target_size)
            spatial_features.append(preprocessed_img)
    flows = compute_optical_flow(spatial_features)
    return spatial_features, flows

In [44]:
df = pd.read_csv('./casme/CASME2-coding-20140508.csv').iloc[:12,:]
rootPath = "./casme/Cropped/Cropped"
X = []
labels = []
for index, row in df.iterrows():
    X.append(f"{rootPath}/sub{'0' if row['Subject'] < 10 else ''}{row['Subject']}/{row['Filename']}")
    labels.append(row['Estimated Emotion'])

data = pd.DataFrame({'Path': X, 'Emotion': labels})
data

Unnamed: 0,Path,Emotion
0,./casme/Cropped/Cropped/sub01/EP02_01f,happiness
1,./casme/Cropped/Cropped/sub01/EP03_02,others
2,./casme/Cropped/Cropped/sub01/EP04_02,others
3,./casme/Cropped/Cropped/sub01/EP04_03,others
4,./casme/Cropped/Cropped/sub01/EP04_04,others
5,./casme/Cropped/Cropped/sub01/EP19_01,others
6,./casme/Cropped/Cropped/sub01/EP19_03f,others
7,./casme/Cropped/Cropped/sub01/EP19_05f,disgust
8,./casme/Cropped/Cropped/sub01/EP19_06f,disgust
9,./casme/Cropped/Cropped/sub02/EP01_11f,repression


In [45]:
# Preprocess data
spatial_features, motion_features = [], []
for idx, row in data.iterrows():
    spatial, motion = applyOpticalFlow(row['Path'])
    spatial_features.append(spatial)
    motion_features.append(motion)

spatial_features = [seq for seq in spatial_features]
motion_features = [seq for seq in motion_features]

# Ensure all sequences have the same length
max_length = max(len(seq) for seq in motion_features)
motion_features = np.array([np.pad(seq, ((0, max_length - len(seq)), (0, 0), (0, 0), (0, 0)), mode='constant') for seq in motion_features], dtype=np.float32)

print(len(labels))
# Encode labels
le = LabelEncoder()
labels = le.fit_transform(labels)
num_classes = 4
labels_one_hot = tf.keras.utils.to_categorical(labels, num_classes=num_classes)


12


In [46]:
# Split the data
X_train_spatial, X_test_spatial, X_train_motion, X_test_motion, y_train, y_test = train_test_split(
    spatial_features, motion_features, labels_one_hot, test_size=0.2, random_state=42
)

In [47]:
# Pad sequences to ensure they have the same shape
X_train_spatial_padded = tf.keras.preprocessing.sequence.pad_sequences(X_train_spatial, padding='post', dtype='float32')
X_train_motion_padded = tf.keras.preprocessing.sequence.pad_sequences(X_train_motion, padding='post', dtype='float32')
X_test_spatial_padded = tf.keras.preprocessing.sequence.pad_sequences(X_test_spatial, padding='post', dtype='float32')
X_test_motion_padded = tf.keras.preprocessing.sequence.pad_sequences(X_test_motion, padding='post', dtype='float32')

# Convert data to TensorFlow datasets
train_dataset = tf.data.Dataset.from_tensor_slices(((X_train_spatial_padded, X_train_motion_padded), y_train)).batch(32).prefetch(tf.data.AUTOTUNE)
test_dataset = tf.data.Dataset.from_tensor_slices(((X_test_spatial_padded, X_test_motion_padded), y_test)).batch(32).prefetch(tf.data.AUTOTUNE)

In [48]:
# Define the model
input_shape_motion = (max_length, 200, 200, 2)
input_motion = tf.keras.layers.Input(shape=input_shape_motion)

y = tf.keras.layers.TimeDistributed(tf.keras.layers.Conv2D(32, (3, 3), activation='relu'))(input_motion)
y = tf.keras.layers.TimeDistributed(tf.keras.layers.MaxPooling2D((2, 2)))(y)
y = tf.keras.layers.TimeDistributed(tf.keras.layers.Conv2D(64, (3, 3), activation='relu'))(y)
y = tf.keras.layers.TimeDistributed(tf.keras.layers.MaxPooling2D((2, 2)))(y)
y = tf.keras.layers.TimeDistributed(tf.keras.layers.Flatten())(y)

lstm_out = tf.keras.layers.LSTM(128)(y)

In [49]:
y = tf.keras.layers.Reshape((-1, y.shape[-1]))(y)  # Keep the batch dimension dynamic
lstm_out = tf.keras.layers.LSTM(128)(y)


In [51]:
# Adjust the input shapes to match the expected dimensions
input_shape_spatial = (56, 200, 200)
input_shape_motion = (max_length, 200, 200, 2)

input_spatial = tf.keras.layers.Input(shape=input_shape_spatial)
input_motion = tf.keras.layers.Input(shape=input_shape_motion)

# Define the CNN model for spatial features
cnn_out = tf.keras.layers.Conv2D(32, (3, 3), activation='relu')(input_spatial)
cnn_out = tf.keras.layers.MaxPooling2D((2, 2))(cnn_out)
cnn_out = tf.keras.layers.Conv2D(64, (3, 3), activation='relu')(cnn_out)
cnn_out = tf.keras.layers.MaxPooling2D((2, 2))(cnn_out)
cnn_out_flat = tf.keras.layers.Flatten()(cnn_out)

# Define the LSTM model for motion features
y = tf.keras.layers.TimeDistributed(tf.keras.layers.Conv2D(32, (3, 3), activation='relu'))(input_motion)
y = tf.keras.layers.TimeDistributed(tf.keras.layers.MaxPooling2D((2, 2)))(y)
y = tf.keras.layers.TimeDistributed(tf.keras.layers.Conv2D(64, (3, 3), activation='relu'))(y)
y = tf.keras.layers.TimeDistributed(tf.keras.layers.MaxPooling2D((2, 2)))(y)
y = tf.keras.layers.TimeDistributed(tf.keras.layers.Flatten())(y)
lstm_out = tf.keras.layers.LSTM(128)(y)

# Concatenate spatial and motion features
combined = tf.keras.layers.concatenate([cnn_out_flat, lstm_out])

# Output layer
output = tf.keras.layers.Dense(num_classes, activation='softmax')(combined)

model = tf.keras.models.Model(inputs=[input_spatial, input_motion], outputs=output)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Pad spatial features to ensure they have the same shape
X_train_spatial_padded = tf.keras.preprocessing.sequence.pad_sequences(X_train_spatial, maxlen=56, padding='post', dtype='float32')
X_test_spatial_padded = tf.keras.preprocessing.sequence.pad_sequences(X_test_spatial, maxlen=56, padding='post', dtype='float32')

# Convert data to TensorFlow datasets
train_dataset = tf.data.Dataset.from_tensor_slices(((X_train_spatial_padded, X_train_motion_padded), y_train)).batch(2).prefetch(tf.data.AUTOTUNE)
test_dataset = tf.data.Dataset.from_tensor_slices(((X_test_spatial_padded, X_test_motion_padded), y_test)).batch(2).prefetch(tf.data.AUTOTUNE)

print('No. of GPUs:', len(tf.config.experimental.list_physical_devices('GPU')))
# Train the model
history= model.fit(train_dataset, epochs=5, validation_data=test_dataset)
model.save('model.h5')

No. of GPUs: 0
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [52]:
history = history.history

In [58]:
print('Test Acc: ', max(history['accuracy'])*100, "%")
print('Test Loss: ', min(history['loss'])*100, "%")

Test Acc:  77.77777910232544 %
Test Loss:  48.71918857097626 %
