Libraries

In [2]:
import json
import os
import pathlib

import numpy as np
import pandas as pd
import scipy.sparse
import tensorflow as tf

from keras import Input, Model
from keras.callbacks import EarlyStopping
from keras.layers import Dense, Flatten, BatchNormalization, LSTM
from keras.layers import Dropout
from keras.optimizers import Adam
from keras.regularizers import l2, l1, l1_l2

from mediapipe.python.solutions.face_mesh_connections import FACEMESH_TESSELATION
from sklearn.model_selection import train_test_split

# from spektral.layers import GraphConv
# GRaphConv is deprecated, use GCNConv or GCSConv instead
from spektral.utils.sparse import sp_matrix_to_sp_tensor
from spektral.utils import normalized_laplacian
from spektral.layers import GCSConv  # as GraphConv
from spektral.layers import GINConv # as GraphConv
from spektral.layers import GCNConv  # as GraphConv

from spektral.utils.convolution import gcn_filter  # For GCNConv
from spektral.utils.convolution import normalized_adjacency  # For GCSConv

from utils import *

# Re-imported after the star import so `np` is guaranteed to be NumPy.
import numpy as np

ModuleNotFoundError: No module named 'utils'

Parameters

In [3]:
# Parameters
# Hyperparameters and seeds shared by the cells below.
l2_reg = 5e-4         # L2 penalty on the kernels of the graph-conv and hidden dense layers
learning_rate = 1e-3  # Adam learning rate
batch_size = 32       # samples per gradient update in model.fit
epochs = 5            # base epoch count; the training cell runs epochs * 5 epochs

# Early-stopping patience: epochs without improvement before training stops.
# It must stay below the number of epochs actually trained (epochs * 5 = 25);
# the previous value of 200 could never be reached, so EarlyStopping never fired.
es_patience = 10

RANDOM_SEED = 42      # shared by NumPy, TensorFlow and train_test_split

np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)

Functions

In [None]:
def get_mediapipe_adjacency_matrix(n_nodes=468, connections=None, self_loops=True):
    """Build a dense, symmetric adjacency matrix for a landmark graph.

    Called without arguments it reproduces the original behavior: a 468-node
    graph built from ``FACEMESH_TESSELATION`` with self-loops.

    Args:
        n_nodes: Number of graph nodes (landmarks). Defaults to 468.
        connections: Iterable of ``(i, j)`` index pairs, one per edge. When
            ``None``, the imported ``FACEMESH_TESSELATION`` set is used.
        self_loops: If True (default), every diagonal entry is set to 1.0.

    Returns:
        ``np.ndarray`` of shape ``(n_nodes, n_nodes)`` and dtype ``float32``
        holding 1.0 where an edge (or self-loop) exists and 0.0 elsewhere.
    """
    if connections is None:
        connections = FACEMESH_TESSELATION

    adj_matrix = np.zeros((n_nodes, n_nodes), dtype=np.float32)

    for connection in connections:
        i, j = connection[0], connection[1]
        # Skip edges touching a node outside the graph. The lower bound matters
        # as well: a negative index would silently wrap around to another node.
        if 0 <= i < n_nodes and 0 <= j < n_nodes:
            adj_matrix[i, j] = 1.0
            adj_matrix[j, i] = 1.0  # undirected graph: mirror the edge

    if self_loops:
        np.fill_diagonal(adj_matrix, 1.0)

    return adj_matrix

def normalize_mesh_points(mesh_points):
    """Center a point set and rescale it by its overall standard deviation.

    The input is converted to a float32 array, the mean along axis 0 is
    subtracted, and the result is divided by the standard deviation taken
    over all of its elements. If that deviation is not positive, the
    centered points are returned without scaling.
    """
    points = np.array(mesh_points, dtype=np.float32)
    centered = points - points.mean(axis=0)

    spread = centered.std()
    # A non-positive spread (e.g. all points identical) cannot be divided by.
    return centered / spread if spread > 0 else centered

def load_mesh_data(path_list, limit=1.0, n_landmarks=468):
    """Load, validate and normalize landmark meshes from per-class directories.

    Each directory in ``path_list`` is scanned for ``.json`` files; the
    directory's position in the list becomes the integer label of every mesh
    loaded from it. Missing directories are reported and skipped (their index
    is still reserved, so labels of later directories do not shift).

    Args:
        path_list: Sequence of directories (``pathlib.Path`` or ``str``), one
            per class.
        limit: Fraction of each directory's JSON files to load. The fraction
            is applied to the alphabetically sorted list of ``.json`` files,
            so the selected subset is the same on every run and platform.
        n_landmarks: Required ``len()`` of the object stored in each JSON
            file; files with a different length are reported and skipped.

    Returns:
        Tuple ``(meshes, labels)``: a float32 array built from the normalized
        meshes and an int32 array with one class index per mesh.
    """
    all_meshes = []
    all_labels = []

    for emotion_idx, path in enumerate(path_list):
        path = pathlib.Path(path)
        if not path.exists():
            print(f"Path does not exist: {path}")
            continue

        # Filter and sort BEFORE applying the limit. os.listdir returns names
        # in arbitrary order, and slicing the unfiltered listing would let
        # non-JSON files consume part of the quota.
        json_files = sorted(name for name in os.listdir(path) if name.endswith('.json'))
        files_to_process = int(len(json_files) * limit)

        emotion_meshes = []

        for file in json_files[:files_to_process]:
            file_path = path / file
            try:
                with open(file_path, 'r') as f:
                    data = json.load(f)

                # Ensure we have exactly the expected number of landmarks
                if len(data) != n_landmarks:
                    print(f"Skipping {file}: expected {n_landmarks} landmarks, got {len(data)}")
                    continue

                emotion_meshes.append(normalize_mesh_points(data))

            except Exception as e:
                # Best effort: one unreadable or malformed file must not abort the load.
                print(f"Error loading {file}: {e}")
                continue

        print(f"Loaded {len(emotion_meshes)} samples for emotion {emotion_idx}")

        # Add to overall lists
        all_meshes.extend(emotion_meshes)
        all_labels.extend([emotion_idx] * len(emotion_meshes))

    return np.array(all_meshes, dtype=np.float32), np.array(all_labels, dtype=np.int32)



Load Dataset

In [None]:
# Define paths
# The per-emotion mesh directories are expected next to the notebook's folder.
current_path = pathlib.Path().absolute()
parent_path = current_path.parent

emotion_names = ['angry', 'disgusted', 'happy', 'neutral', 'sad', 'surprised']

# One "<emotion>_meshpoints" directory per class, in the same order as
# emotion_names so that a label index always maps to the right name.
path_list = [parent_path / f'{name}_meshpoints' for name in emotion_names]

print("Loading mesh data...")
X_data, y_data = load_mesh_data(path_list, limit=1.0)

# Fail here with a clear message instead of deep inside train_test_split.
if len(X_data) == 0:
    raise FileNotFoundError(
        f"No mesh samples were loaded; expected '<emotion>_meshpoints' directories under {parent_path}"
    )

print(f"Total samples loaded: {len(X_data)}")
print(f"Data shape: {X_data.shape}")
print(f"Labels shape: {y_data.shape}")

# Print class distribution
unique, counts = np.unique(y_data, return_counts=True)
for emotion_idx, count in zip(unique, counts):
    print(f"{emotion_names[emotion_idx]}: {count} samples")

# Split the dataset: 80/20 into train+val / test, then 80/20 into train / val.
# Both splits are stratified so every subset keeps the class proportions.
X_train, X_test, y_train, y_test = train_test_split(
    X_data, y_data, test_size=0.2, shuffle=True, random_state=RANDOM_SEED, stratify=y_data
)

X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.2, shuffle=True, random_state=RANDOM_SEED, stratify=y_train
)

print(f"Train samples: {len(X_train)}")
print(f"Validation samples: {len(X_val)}")
print(f"Test samples: {len(X_test)}")

# Convert to TensorFlow tensors
X_train_tensor = tf.constant(X_train, dtype=tf.float32)
y_train_tensor = tf.constant(y_train, dtype=tf.int32)
X_val_tensor = tf.constant(X_val, dtype=tf.float32)
y_val_tensor = tf.constant(y_val, dtype=tf.int32)
X_test_tensor = tf.constant(X_test, dtype=tf.float32)
y_test_tensor = tf.constant(y_test, dtype=tf.int32)

print(f"Train tensor shape: {X_train_tensor.shape}")
print(f"Train labels shape: {y_train_tensor.shape}")

# Model parameters
n_out = len(emotion_names)  # Number of emotion classes
N = X_train_tensor.shape[1]  # Number of nodes (landmarks per mesh)
# Feature dimensionality: coordinates stored per landmark in the JSON files
# (presumably 2 for x,y -- confirm against the data).
F = X_train_tensor.shape[2]

print(f"Number of nodes: {N}")
print(f"Number of features per node: {F}")

# Create adjacency matrix
print("Creating adjacency matrix...")
adj_matrix = get_mediapipe_adjacency_matrix()
print(f"Adjacency matrix shape: {adj_matrix.shape}")

# Each undirected edge fills two off-diagonal entries; the diagonal holds
# self-loops, which must not be counted as edges.
n_self_loops = int(np.count_nonzero(np.diag(adj_matrix)))
n_edges = (int(np.count_nonzero(adj_matrix)) - n_self_loops) // 2
print(f"Number of edges: {n_edges}")

# Convert to sparse tensor for efficiency
adj_sparse = scipy.sparse.csr_matrix(adj_matrix)
adj_tensor = sp_matrix_to_sp_tensor(adj_sparse)




Build Model

In [None]:
# Build the GNN model
# Architecture: two GINConv blocks over the shared face-mesh graph, a flatten
# over all node embeddings, two dense blocks, and a softmax classifier.
# Dropout is provided by keras.layers and must be imported in the imports cell.
print("Building model...")
X_in = Input(shape=(N, F), name='node_features')

# Graph convolution blocks: GINConv -> BatchNormalization -> Dropout.
# Every sample uses the same adjacency tensor.
x = X_in
for channels, drop_rate in [(64, 0.3), (32, 0.3)]:
    x = GINConv(channels, activation="relu", kernel_regularizer=l2(l2_reg))([x, adj_tensor])
    x = BatchNormalization()(x)
    x = Dropout(drop_rate)(x)

# Global pooling (flatten all node features)
x = Flatten()(x)

# Fully connected blocks: Dense -> BatchNormalization -> Dropout.
for units, drop_rate in [(256, 0.5), (128, 0.3)]:
    x = Dense(units, activation="relu", kernel_regularizer=l2(l2_reg))(x)
    x = BatchNormalization()(x)
    x = Dropout(drop_rate)(x)

# Output layer: one probability per emotion class
output = Dense(n_out, activation="softmax", name='emotion_output')(x)

# Create and compile model
# Labels are integer class indices, hence the sparse categorical loss.
model = Model(inputs=X_in, outputs=output)
optimizer = Adam(learning_rate=learning_rate)
model.compile(
    optimizer=optimizer,
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)

model.summary()


Train Model

In [None]:
# Train the model
print("Starting training...")

# Early stopping uses the configured patience and restores the best weights.
early_stopping = EarlyStopping(restore_best_weights=True, patience=es_patience)

# Settings for model.fit; training runs five times the base epoch count.
fit_options = dict(
    batch_size=batch_size,
    epochs=epochs*5,
    validation_data=(X_val_tensor, y_val_tensor),
    callbacks=[early_stopping],
    verbose=1,
)

history = model.fit(X_train_tensor, y_train_tensor, **fit_options)
print("Training completed!")
