In [1]:
# Import libraries
import cv2
import csv
from datetime import datetime
import mediapipe as mp
import numpy as np
from scipy.stats import zscore



In [None]:
# Initialize MediaPipe Face Mesh
mp_face_mesh = mp.solutions.face_mesh
mp_drawing = mp.solutions.drawing_utils

# Initialize video capture
cap = cv2.VideoCapture(0)

# Set the FPS
cap.set(cv2.CAP_PROP_FPS, 60)

# Indices for the nose tip and chin landmarks
NOSE_TIP_INDEX = 1
CHIN_INDEX = 152

# Set drawing specifications
landmark_specs = mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2)

# Create a CSV file with a timestamp in the filename
now = datetime.now()
timestamp = now.strftime("%Y%m%d_%H%M%S_%f")
initial_timestamp_in_microseconds = (
    now.hour * 3_600_000_000 +   # Hours to microseconds
    now.minute * 60_000_000 +    # Minutes to microseconds
    now.second * 1_000_000 +     # Seconds to microseconds
    now.microsecond              # Already in microseconds
)
csv_filename = f"data/csv/face_landmarks_{timestamp}.csv"

# Peak detection variables
distances = []
peak_count = 0
interval_count = 0
Z_THRESHOLD = 2.22  # Z-score threshold for peak detection

# Total number of landmarks in MediaPipe Face Mesh
TOTAL_LANDMARKS = 468

with open(csv_filename, 'w', newline='') as csvfile:
    csv_writer = csv.writer(csvfile)

    # Generate header
    header = ['Frame_Count', 'Timestamp', 'Peak_Count', 'Distance']
    for i in range(1, TOTAL_LANDMARKS + 1):
        header.extend([f'{i}_X', f'{i}_Y'])
    csv_writer.writerow(header)    
    
    # Create Face Mesh object with specific settings
    with mp_face_mesh.FaceMesh(
        max_num_faces=1,
        refine_landmarks=True,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5
    ) as face_mesh:

        # Check if the video source is available
        if not cap.isOpened():
            print("Error: Could not open video source.")
        else:
            frame_count = 0
            # Process each video frame
            while cap.isOpened():
                success, image = cap.read()

                if not success:
                    print("Ignoring empty camera frame.")
                    continue

                # Convert the image to RGB for face mesh processing
                image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                results = face_mesh.process(image_rgb)

                # If face landmarks are detected
                if results.multi_face_landmarks:
                    for face_landmarks in results.multi_face_landmarks:

                        # Get image dimensions
                        h, w, _ = image.shape

                        # Extract the landmark coordinates
                        landmark_coords = []
                        for idx, landmark in enumerate(face_landmarks.landmark):
                            x = int(landmark.x * w)
                            y = int(landmark.y * h)
                            landmark_coords.append((x, y))

                        # Extract nose tip and chin coordinates
                        nose_tip_coords = landmark_coords[NOSE_TIP_INDEX]
                        chin_coords = landmark_coords[CHIN_INDEX]

                        # Calculate the Euclidean distance
                        distance = round(
                            ((nose_tip_coords[0] - chin_coords[0])**2 + 
                             (nose_tip_coords[1] - chin_coords[1])**2)**0.5, 10
                        )
                        
                        # Add distance to the list
                        distances.append(distance)
                        
                        # Get timestamp in microseconds
                        now = datetime.now()
                        timestamp_in_microseconds = (
                            now.hour * 3_600_000_000 +   # Hours to microseconds
                            now.minute * 60_000_000 +    # Minutes to microseconds
                            now.second * 1_000_000 +     # Seconds to microseconds
                            now.microsecond              # Already in microseconds
                        )
                        if initial_timestamp_in_microseconds >= timestamp_in_microseconds:
                            timestamp_in_microseconds = timestamp_in_microseconds + 3_600_000_000 * 24 - initial_timestamp_in_microseconds
                        else:
                            timestamp_in_microseconds -= initial_timestamp_in_microseconds
                        
                        # Real-time Z-Score calculation for peak detection
                        if len(distances) > 10:  # Start detecting after 10 frames
                            z_scores = zscore(distances[-10:])  # Calculate Z-score for the last 10 distances

                            if interval_count > 0:
                                interval_count -= 1
                            
                            elif abs(z_scores[-1]) > Z_THRESHOLD:
                                peak_count += 1
                                interval_count = 8
                                print(f"Peak detected! Total peaks: {peak_count}")

                        # Prepare data row
                        data_row = [frame_count, timestamp_in_microseconds, peak_count, distance]
                        for x, y in landmark_coords:
                            data_row.extend([x, y])
                            
                        # Write coordinates to CSV
                        csv_writer.writerow(data_row)
                            
                        # Draw the landmark points on the image
                        for coords in [nose_tip_coords, chin_coords]:
                            cv2.circle(image, coords, 5, (0, 255, 0), -1)

                        # Draw lines between the points
                        cv2.line(image, nose_tip_coords, chin_coords, (255, 0, 0), 2)

                # Display the output
                cv2.imshow("Face Landmarks Detection", cv2.flip(image, 1))

                frame_count += 1

                # Break the loop if 'q' is pressed
                if cv2.waitKey(16) == ord('q'):
                    break

# Release the capture and close windows
cap.release()
cv2.destroyAllWindows()
cv2.waitKey(1)

print(f"Landmarks data saved to {csv_filename}")

Peak detected! Total peaks: 1
Peak detected! Total peaks: 2
Peak detected! Total peaks: 3
Peak detected! Total peaks: 4
Peak detected! Total peaks: 5
Peak detected! Total peaks: 6
Peak detected! Total peaks: 7
Peak detected! Total peaks: 8
Peak detected! Total peaks: 9
Peak detected! Total peaks: 10
Peak detected! Total peaks: 11
Peak detected! Total peaks: 12
Peak detected! Total peaks: 13
Peak detected! Total peaks: 14
Peak detected! Total peaks: 15
Peak detected! Total peaks: 16
Peak detected! Total peaks: 17
Landmarks data saved to data/csv/face_landmarks_20241112_134404_543579.csv
