In [None]:
import numpy as np
import cv2
import mediapipe as mp
from mediapipe.python.solutions.face_mesh_connections import FACEMESH_TESSELATION
import json
import networkx as nx
import matplotlib.pyplot as plt
import math
import scipy.sparse
import pathlib, os


In [2]:
# Initialize MediaPipe
mp_face_mesh = mp.solutions.face_mesh
mp_drawing = mp.solutions.drawing_utils
drawing_spec = mp_drawing.DrawingSpec(thickness=1, circle_radius=1)
face_mesh = mp_face_mesh.FaceMesh(
    # Every input is an independent still image, so run face detection on each
    # one instead of tracking landmarks from the previously processed image
    # (the default video mode).
    static_image_mode=True,
    max_num_faces=1,
    refine_landmarks=False,  # Set to False to get 468 landmarks without iris
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5
)

def preprocess_frame(frame):
    """
    Run the face-mesh model on a BGR frame and convert landmarks to pixels.

    Returns a ``(results_mesh, mesh_points, scale_factor)`` tuple, or
    ``(None, None, None)`` when the model reports no face. ``mesh_points``
    is an integer array with one ``[x, y]`` row per landmark of the first
    face; ``scale_factor`` is the distance between landmark 4 (nose tip) and
    landmark 151 (forehead), replaced by 1e-6 when it is numerically zero.
    """
    height, width, _channels = frame.shape
    detection = face_mesh.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

    # Nothing to extract when the model found no face
    if not detection.multi_face_landmarks:
        return None, None, None

    # Landmark coordinates are scaled by the frame size and truncated to ints
    landmarks = detection.multi_face_landmarks[0].landmark
    normalized = np.array(
        [[point.x, point.y] for point in landmarks], dtype=float
    ).reshape(-1, 2)
    mesh_points = (normalized * [width, height]).astype(int)

    # Face size proxy: forehead-to-nose-tip distance in pixels
    scale_factor = np.linalg.norm(mesh_points[151] - mesh_points[4])
    if np.isclose(scale_factor, 0):
        scale_factor = 1e-6

    return detection, mesh_points, scale_factor

def gera_grafos(results_mesh, mesh_points, scale_factor):
    """
    Build the face-mesh graph for one image.

    Each landmark index becomes a node carrying its coordinates in the
    ``pos`` attribute; edges are the MediaPipe tessellation connections.
    ``results_mesh`` and ``scale_factor`` are accepted but not used here.
    """
    graph = nx.Graph()

    # One node per landmark, keyed by its index in mesh_points
    graph.add_nodes_from(
        (index, {'pos': point}) for index, point in enumerate(mesh_points)
    )

    # Connectivity is taken as-is from MediaPipe's (start, end) index pairs
    graph.add_edges_from(FACEMESH_TESSELATION)

    return graph

def plot_graph(graph, mesh_points):
    """
    Draw the mesh graph with each node placed at its landmark coordinates.

    The y axis is inverted so the face appears upright (image coordinates
    grow downwards) and the aspect ratio is kept equal.
    """
    plt.figure(figsize=(10, 10))

    # networkx expects a node -> position mapping
    layout = dict(enumerate(mesh_points))
    nx.draw_networkx(
        graph,
        pos=layout,
        with_labels=False,
        node_size=10,
        node_color='black',
        edge_color='gray',
        width=0.5,
    )

    axes = plt.gca()
    axes.invert_yaxis()
    axes.set_title("Facial Mesh Graph")
    axes.axis('equal')
    plt.show()

def get_matrix_adj(graph):
    """Return the graph's adjacency matrix as a SciPy CSR sparse matrix."""
    return scipy.sparse.csr_matrix(nx.adjacency_matrix(graph))

def save_adjacency_matrix(adjacency_matrix, filename):
    """
    Write a sparse adjacency matrix next to the image's emotion folder.

    For an image ``<root>/<prefix>_<emotion>/<stem>.<ext>`` the matrix is
    stored as ``<root>/<emotion>_adj/<stem>.npz``; the output folder is
    created when missing. Raises ``IndexError`` when the image's folder name
    contains no underscore.
    """
    image_path = pathlib.Path(filename)
    class_dir = image_path.parent.absolute()

    # The emotion label is the second '_'-separated token of the folder name
    emotion = class_dir.name.split('_')[1]
    target_dir = class_dir.parent.absolute() / f'{emotion}_adj'
    target_dir.mkdir(parents=True, exist_ok=True)

    target = target_dir / f'{image_path.absolute().stem}.npz'
    scipy.sparse.save_npz(target, adjacency_matrix)

def save_meshpoints(mesh_points, filename):
    """
    Dump landmark coordinates as JSON next to the image's emotion folder.

    For an image ``<root>/<prefix>_<emotion>/<stem>.<ext>`` the points are
    written to ``<root>/<emotion>_meshpoints/<stem>.json`` as nested lists
    (``mesh_points.tolist()``); the output folder is created when missing.
    Raises ``IndexError`` when the image's folder name contains no underscore.
    """
    image_path = pathlib.Path(filename)
    class_dir = image_path.parent.absolute()

    # The emotion label is the second '_'-separated token of the folder name
    emotion = class_dir.name.split('_')[1]
    target_dir = class_dir.parent.absolute() / f'{emotion}_meshpoints'
    target_dir.mkdir(parents=True, exist_ok=True)

    target = target_dir / f'{image_path.absolute().stem}.json'
    with open(target, 'w') as handle:
        json.dump(mesh_points.tolist(), handle)

def pipeline(img_path, plot=False, verbose=False):
    """
    Process one image: detect landmarks, build the mesh graph, save outputs.

    Args:
        img_path: Path of the image file, passed to ``cv2.imread`` and used
            by the save helpers to derive the output locations.
        plot: When True, draw the resulting graph with ``plot_graph``.
        verbose: When True, print the reason for a failure.

    Returns:
        True when the adjacency matrix and mesh points were saved; False when
        the image could not be read, no face was detected, or any step raised.
        Exceptions are never propagated to the caller.
    """
    try:
        # cv2.imread signals an unreadable file by returning None
        frame = cv2.imread(img_path)
        if frame is None:
            if verbose:
                print(f"Could not read image: {img_path}")
            return False

        results_mesh, mesh_points, scale_factor = preprocess_frame(frame)
        if results_mesh is None:
            if verbose:
                print(f"No face detected in: {img_path}")
            return False

        graph = gera_grafos(results_mesh, mesh_points, scale_factor)

        # The save helpers derive folder and file names from img_path themselves
        save_adjacency_matrix(get_matrix_adj(graph), img_path)
        save_meshpoints(mesh_points, img_path)

        if plot:
            plot_graph(graph, mesh_points)

        return True

    except Exception as e:
        # Best-effort batch processing: report (if asked) and signal failure
        if verbose:
            print(f"Error processing {img_path}: {str(e)}")
        return False

def process_dataset(verbose=False, show_progress=True, dataset_root=None,
                    emotion_folders=None):
    """
    Run ``pipeline`` on every image of every emotion folder and print a summary.

    Args:
        verbose: Forwarded to ``pipeline``; also enables per-image
            success/failure messages.
        show_progress: Print a progress line for the first 10 images of each
            folder and then every 100th image.
        dataset_root: Directory containing the emotion folders. Defaults to
            the parent of the current working directory.
        emotion_folders: Folder names to process. Defaults to the six
            ``face_<emotion>`` folders.

    Returns:
        None. Progress and totals are printed to stdout.
    """
    current_path = pathlib.Path().absolute()
    print("Current path:", current_path)
    if dataset_root is None:
        parent_path = current_path.parent
    else:
        parent_path = pathlib.Path(dataset_root)

    if emotion_folders is None:
        emotion_folders = [
            'face_angry', 'face_disgusted', 'face_happy',
            'face_neutral', 'face_sad', 'face_surprised'
        ]

    path_list = [parent_path / folder for folder in emotion_folders]
    image_suffixes = ('.jpg', '.jpeg', '.png', '.bmp')

    total_processed = 0
    total_errors = 0

    for emotion_path in path_list:
        if not emotion_path.exists():
            print(f"Path does not exist: {emotion_path}")
            continue

        print(f"\nProcessing emotion: {emotion_path.name}")

        error_count = 0
        img_count = 0

        try:
            # Sorted so progress output is reproducible across runs/platforms
            image_files = sorted(
                f for f in os.listdir(emotion_path)
                if f.lower().endswith(image_suffixes)
            )

            print(f"Found {len(image_files)} image files")

            for img in image_files:
                if show_progress and (img_count % 100 == 0 or img_count < 10):
                    print(f"Processing {img_count+1}/{len(image_files)}: {img}")

                try:
                    success = pipeline(str(emotion_path / img), plot=False, verbose=verbose)
                except Exception as e:
                    if verbose:
                        print(f"❌ Error processing {img}: {str(e)}")
                    success = False
                else:
                    if verbose:
                        if success and img_count < 5:  # Only show first few if verbose
                            print(f"✅ Successfully processed: {img}")
                        elif not success:
                            print(f"❌ Failed to process: {img}")

                if success:
                    total_processed += 1
                else:
                    error_count += 1
                    total_errors += 1

                # Count every attempted image, including those that raised,
                # so the per-folder total matches successes + errors.
                img_count += 1

            print(f"Completed {emotion_path.name}: {img_count} total, {error_count} errors")

        except Exception as e:
            print(f"Error accessing folder {emotion_path}: {str(e)}")

    print("\n=== FINAL SUMMARY ===")
    print(f"Total images processed successfully: {total_processed}")
    print(f"Total errors: {total_errors}")
    if (total_processed + total_errors) > 0:
        print(f"Success rate: {total_processed/(total_processed + total_errors)*100:.1f}%")

# Entry point. In a notebook kernel __name__ is "__main__", so executing this
# cell starts the full dataset run (see the captured output below).
if __name__ == "__main__":
    process_dataset()

Current path: d:\hask\GNN

Processing emotion: face_angry
Found 7978 image files
Processing 1/7978: 0.jpg




Processing 2/7978: 1.jpg
Processing 3/7978: 10.jpg
Processing 4/7978: 10002.jpg
Processing 5/7978: 10016.jpg
Processing 6/7978: 10038.jpg
Processing 7/7978: 10052.jpg
Processing 8/7978: 10063.jpg
Processing 9/7978: 10065.jpg
Processing 10/7978: 10069.jpg
Processing 101/7978: 10743.jpg
Processing 201/7978: 1158.jpg
Processing 301/7978: 12404.jpg
Processing 401/7978: 13208.jpg
Processing 501/7978: 13942.jpg
Processing 601/7978: 14719.jpg
Processing 701/7978: 15620.jpg
Processing 801/7978: 16312.jpg
Processing 901/7978: 17140.jpg
Processing 1001/7978: 17856.jpg
Processing 1101/7978: 18492.jpg
Processing 1201/7978: 19067.jpg
Processing 1301/7978: 19765.jpg
Processing 1401/7978: 2067.jpg
Processing 1501/7978: 21371.jpg
Processing 1601/7978: 22177.jpg
Processing 1701/7978: 23.jpg
Processing 1801/7978: 23797.jpg
Processing 1901/7978: 24660.jpg
Processing 2001/7978: 25421.jpg
Processing 2101/7978: 26175.jpg
Processing 2201/7978: 26894.jpg
Processing 2301/7978: 27511.jpg
Processing 2401/7978: 2

In [3]:
# NOTE: this cell rebinds the module-level `face_mesh` created in the earlier
# setup cell, here with refine_landmarks=True (the earlier cell uses False to
# get 468 landmarks without iris).
mp_face_mes = mp.solutions.face_mesh
mp_drawing = mp.solutions.drawing_utils
drawing_spec = mp_drawing.DrawingSpec(thickness=1, circle_radius=1)
face_mesh = mp_face_mes.FaceMesh(
    # Inputs are independent still images: detect on every image instead of
    # tracking landmarks from the previously processed one.
    static_image_mode=True,
    max_num_faces=1,
    refine_landmarks=True,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
)

In [4]:
def preprocess_frame(frame):
    """
    Extract facial landmarks (in pixel coordinates) from a BGR frame.

    Returns:
        ``(results_mesh, mesh_points, scale_factor)`` where ``mesh_points`` is
        an integer array with one ``[x, y]`` row per landmark of the first
        face and ``scale_factor`` is the distance between landmark 4 (nose
        tip) and landmark 151 (forehead), or ``(None, None, None)`` when no
        face is detected.
    """
    H, W, _ = frame.shape
    rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results_mesh = face_mesh.process(rgb_image)

    # multi_face_landmarks is empty/None when the model finds no face;
    # indexing it would otherwise fail with an opaque TypeError.
    if not results_mesh.multi_face_landmarks:
        return None, None, None

    mesh_points = np.array([
        np.multiply([p.x, p.y], [W, H]).astype(int)
        for p in results_mesh.multi_face_landmarks[0].landmark
    ])

    nose_tip = mesh_points[4]
    forehead = mesh_points[151]
    scale_factor = np.linalg.norm(forehead - nose_tip)
    # Avoid a zero scale so callers can safely divide by it
    if np.isclose(scale_factor, 0):
        scale_factor = 1e-6
    return results_mesh, mesh_points, scale_factor

In [5]:
def gera_grafos(results_mesh, mesh_points, scale_factor):
    """
    Build the face-mesh graph for one image.

    Every landmark index becomes a node with its coordinates stored in the
    ``pos`` attribute, and the MediaPipe tessellation connections are added
    as edges so the graph (and its adjacency matrix) is not empty.
    ``results_mesh`` and ``scale_factor`` are accepted but not used here.
    """
    graph = nx.Graph()
    # Save all the nodes from mesh_points into the graph
    for i in range(len(mesh_points)):
        graph.add_node(i, pos=mesh_points[i])
    # Connect the landmarks following MediaPipe's face-mesh tessellation
    graph.add_edges_from(FACEMESH_TESSELATION)
    return graph

In [6]:
def plot_graph(graph, mesh_points):
    """
    Draw the mesh graph with each node placed at its landmark coordinates.

    The y axis is inverted so the face appears upright (image coordinates
    grow downwards) and an equal aspect ratio keeps the face from being
    stretched to the square figure.
    """
    plt.figure(figsize=(10, 10))
    # networkx documents `pos` as a node -> position mapping
    node_positions = {i: mesh_points[i] for i in range(len(mesh_points))}
    nx.draw_networkx(
        graph,
        pos=node_positions,
        node_size=10,
        node_color='black',
        edge_color='black',
        with_labels=False,
    )
    # Flip the image upside down
    plt.gca().invert_yaxis()
    plt.title("Facial Mesh Graph")
    plt.axis('equal')
    plt.show()

In [7]:
def get_matrix_adj(graph):
    """Convert the graph's adjacency matrix to SciPy CSR format and return it."""
    adjacency = nx.adjacency_matrix(graph)
    return scipy.sparse.csr_matrix(adjacency)

In [8]:
def save_adjacency_matrix(adjacency_matrix, filename):
    """
    Save a sparse adjacency matrix next to the image's emotion folder.

    For ``<root>/<prefix>_<emotion>/<stem>[.<ext>]`` the matrix is written to
    ``<root>/<emotion>_adj/<stem>.npz``. The image extension, if present, is
    dropped so the output is not named ``<stem>.<ext>.npz``. Raises
    ``IndexError`` when the folder name contains no underscore.
    """
    path_name = pathlib.Path(filename).parent.absolute()
    # The emotion label is the second '_'-separated token of the folder name
    emotion = path_name.name.split('_')[1]
    path_name = path_name.parent.absolute()
    path_name = path_name.joinpath(f'{emotion}_adj')
    # Separate the filename from the path
    filename = pathlib.Path(filename).absolute()
    # Create the folder if it doesn't exist
    path_name.mkdir(parents=True, exist_ok=True)
    # Save the npz file in path_name, named after the image stem
    file_out = path_name.joinpath(f'{filename.stem}.npz')
    scipy.sparse.save_npz(file_out, adjacency_matrix)

In [None]:
def save_meshpoints(mesh_points, filename):
    """
    Save landmark coordinates as JSON next to the image's emotion folder.

    For ``<root>/<prefix>_<emotion>/<stem>[.<ext>]`` the points are written
    to ``<root>/<emotion>_meshpoints/<stem>.json`` as nested lists. The image
    extension, if present, is dropped so the output is not named
    ``<stem>.<ext>.json``. Raises ``IndexError`` when the folder name
    contains no underscore.
    """
    path_name = pathlib.Path(filename).parent.absolute()
    # The emotion label is the second '_'-separated token of the folder name
    emotion = path_name.name.split('_')[1]
    path_name = path_name.parent.absolute()
    path_name = path_name.joinpath(f'{emotion}_meshpoints')
    # Separate the filename from the path
    filename = pathlib.Path(filename).absolute()
    # Create the folder if it doesn't exist
    path_name.mkdir(parents=True, exist_ok=True)
    # Save the JSON file in path_name, named after the image stem
    file_out = path_name.joinpath(f'{filename.stem}.json')
    with open(file_out, 'w') as outfile:
        json.dump(mesh_points.tolist(), outfile)

In [None]:
def pipeline(img, plot=False):
    """
    Extract and save the mesh points of one image, optionally plotting them.

    Args:
        img: Path of the image file as a string.
        plot: When True, draw the resulting graph with ``plot_graph``.

    Raises:
        ValueError: If the image cannot be read or no face is detected.
            Callers in this notebook count any exception as a failed image.
    """
    frame = cv2.imread(img)
    # cv2.imread signals an unreadable file by returning None
    if frame is None:
        raise ValueError(f"Could not read image: {img}")

    results_mesh, mesh_points, scale_factor = preprocess_frame(frame)
    if results_mesh is None:
        raise ValueError(f"No face detected in: {img}")

    graph = gera_grafos(results_mesh, mesh_points, scale_factor)

    # Strip only the file extension; splitting on the first '.' would truncate
    # the path whenever a directory name contains a dot.
    name = str(pathlib.Path(img).with_suffix(''))

    # Adjacency-matrix export was commented out in this cell; only the mesh
    # points are saved.
    save_meshpoints(mesh_points, name)
    if plot:
        plot_graph(graph, mesh_points)

## Applying the pipeline to the training set

In [None]:
# Show the current working directory
print(pathlib.Path.cwd())

In [None]:
# The emotion folders live next to the notebook's working directory
current_path = pathlib.Path.cwd()
path = current_path.parent

path_list = [
    path.joinpath(folder)
    for folder in (
        'face_angry',
        'face_disgusted',
        'face_happy',
        'face_neutral',
        'face_sad',
        'face_surprised',
    )
]

# Individual names kept for direct access to a single emotion folder
(angry_path, disgusted_path, happy_path,
 neutral_path, sad_path, surprised_path) = path_list



In [None]:
# Smoke test: run the pipeline with plotting on the first emotion folder only.
for emotion_path in path_list:
    count, p_c = 0, 0  # failures / successes seen so far
    print(emotion_path)
    emotion_files = os.listdir(emotion_path)
    for img in emotion_files:
        try:
            path_img = emotion_path.joinpath(img)
            print(path_img)
            pipeline(str(path_img), plot=True)
        except Exception as e:
            print(e)
            count += 1
            # Give up on the folder after three failures
            if count == 3:
                break
        else:
            # Stops once p_c has reached 10, i.e. after the 11th success
            if p_c == 10:
                break
            p_c += 1
    print(count)
    # Only the first folder in path_list is previewed
    break

In [None]:
# Full run: extract and save mesh points for every file of every emotion folder.
for emotion_path in path_list:
    count = 0      # files whose processing raised
    img_count = 0  # files processed successfully
    print(emotion_path)
    emotion_files = os.listdir(emotion_path)
    for img in emotion_files:
        try:
            path_img = emotion_path / img
            # plot=False: drawing one figure per file is not feasible when
            # iterating over whole dataset folders.
            pipeline(str(path_img), plot=False)
            img_count += 1
        except Exception as e:
            count += 1
            # Show the first few failures instead of swallowing them silently
            if count <= 3:
                print(f'{img}: {e}')
    print('processed', img_count)
    print('erros', count)

In [None]:
# Print how many entries each emotion input folder contains
for emotion_path in path_list:
    entries = os.listdir(emotion_path)
    print(len(entries))