# Documentation for LSTM model for actions recognition

# 1: Annotation Files

To create the dataset, we first need annotations. Annotations created with **ELAN** can be exported as CSV files using the following steps:

1. Go to **File** -> **Export As** -> **Tab-delimited Text**.
2. Select the desired annotations.
3. Export the file, ensuring to choose the CSV format.

Once exported, you should add a header to the CSV file to make reading the various fields easier later on. The following code represents how to prepare annotations for one video.

In [39]:
# Add a header to a CSV file
import pandas as pd
def add_header(csv_in_path, csv_out_path):
    # Header to add to each annotation
    HEADER = ["Event", "Nan", "Start_time", "Start_seconds", "End_time", "End_seconds", "Duration", "Duration_seconds", "Tools/Speaker"]
    annotations_df = pd.read_csv(csv_in_path)
    # Add the header
    annotations_df.columns = HEADER
    # Overwrite the CSV file with the new header
    annotations_df.to_csv(csv_out_path, index=False)
    # Print the first few rows of the file
    print(annotations_df.head())

path_in = "Annotations_Untitled.csv"
path_out = "Annotations_with_header.csv"
add_header(path_in, path_out)


    Event  Nan    Start_time  Start_seconds      End_time  End_seconds  \
0  speech  NaN  00:00:01.347          1.347  00:00:09.667        9.667   
1  speech  NaN  00:00:11.472         11.472  00:00:15.320       15.320   
2  speech  NaN  00:00:26.272         26.272  00:00:32.009       32.009   
3  speech  NaN  00:00:38.135         38.135  00:00:39.147       39.147   
4  speech  NaN  00:00:56.680         56.680  00:00:58.823       58.823   

       Duration  Duration_seconds Tools/Speaker  
0  00:00:08.320             8.320           NaN  
1  00:00:03.848             3.848           NaN  
2  00:00:05.737             5.737           NaN  
3  00:00:01.012             1.012           NaN  
4  00:00:02.143             2.143           NaN  


## 1.1 (Optional): Creating "Speech" and "Other" Annotations

For the purpose of this study, two additional annotations were created:

- **Speech**: To identify when a person is speaking.
- **Other**: To categorize actions or states not covered by other annotations, helping the model's generalization.

To create these annotations, refer to sections *1.1.1* and *1.1.2*.


### 1.1.1 Creating the "Speech" Annotation

To detect and segment speakers in an audio file, the open-source **Speaker Diarization 3.1** model by Pyannote was used. This model enables accurate speaker diarization and segmentation.

The code and detailed instructions for downloading the model and generating the required access token can be found on the official repository:  
[Pyannote-Audio GitHub Repository](https://github.com/pyannote/pyannote-audio).


In [None]:
# Creating Speech Annotation
# Pyannote Speaker Diarization segments audio based on different speakers
import csv
from datetime import timedelta
from pyannote.audio import Pipeline
import torch

# Function to print speakers from an audio file
def print_speakers(audio_file):
    # Load speaker diarization pipeline
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1", 
        use_auth_token="Pippo") # TODO: Insert token here. To obtain the token, refer to the GitHub link mentioned above.

    # Send pipeline to GPU (when available)
    pipeline.to(torch.device("cuda"))
    diarization = pipeline(audio_file)  

    # Print the results
    print("Speaker Diarization:")
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        print(f"start={turn.start:.1f}s stop={turn.end:.1f}s speaker_{speaker}")
    return diarization

# Run diarization on the audio file and print the speakers
diarization = print_speakers("01.wav")

Speaker Diarization:
start=0.0s stop=0.1s speaker_SPEAKER_03
start=0.1s stop=1.0s speaker_SPEAKER_00
start=1.3s stop=2.6s speaker_SPEAKER_00
start=1.3s stop=9.7s speaker_SPEAKER_03
start=3.3s stop=5.5s speaker_SPEAKER_00
start=10.1s stop=10.1s speaker_SPEAKER_03
start=10.1s stop=10.2s speaker_SPEAKER_00
start=10.2s stop=10.4s speaker_SPEAKER_03
start=10.4s stop=10.9s speaker_SPEAKER_00
start=10.9s stop=11.0s speaker_SPEAKER_03
start=11.0s stop=11.5s speaker_SPEAKER_00
start=11.5s stop=15.3s speaker_SPEAKER_03
start=11.5s stop=13.2s speaker_SPEAKER_00
start=14.4s stop=18.0s speaker_SPEAKER_00
start=18.9s stop=21.1s speaker_SPEAKER_00
start=21.6s stop=22.1s speaker_SPEAKER_00
start=21.7s stop=23.9s speaker_SPEAKER_02
start=24.7s stop=25.0s speaker_SPEAKER_00
start=25.0s stop=25.5s speaker_SPEAKER_03
start=25.5s stop=25.9s speaker_SPEAKER_02
start=25.9s stop=25.9s speaker_SPEAKER_03
start=25.9s stop=25.9s speaker_SPEAKER_02
start=26.3s stop=26.3s speaker_SPEAKER_00
start=26.3s stop=32.0s 

### 1.1.1.1 Save the CSV file for the Selected Speaker
After printing the list of speakers, it is necessary to identify the speaker corresponding to the video under analysis. In this case, this step was performed manually by comparing the speaker annotations until they matched the speech of the subject in the video. This process was necessary because, in the dataset used, there are three individuals speaking within the same audio file.

Once the corresponding speaker is identified, the following code allows saving the speech annotations by requiring three parameters:
1. The path to save the CSV file.
2. The identified speaker whose annotations need to be saved.
3. A diarization object returned by the Pyannote model.

In [None]:
# Function to convert seconds into hh:mm:ss.sss format
def seconds_to_timestamp(seconds):
    t = timedelta(seconds=seconds)
    total_seconds = int(t.total_seconds())
    milliseconds = int((t.total_seconds() - total_seconds) * 1000)
    hours, remainder = divmod(total_seconds, 3600) # 3600 seconds = 1 hour
    minutes, seconds = divmod(remainder, 60) # 60 seconds = 1 minute
    return f"{hours:02}:{minutes:02}:{seconds:02}.{milliseconds:03}"

# Function to save the annotations of a specific speaker to a CSV file in ELAN format
def save_annotation_speech(file_csv,selected_speaker,diarization):
    # Open a CSV file to write the results
    with open(file_csv, "w", newline="") as csvfile:
        csvwriter = csv.writer(csvfile)
        # Write the header
        csvwriter.writerow(["Event", "Nan", "Start_time", "Start_seconds", "End_time", "End_seconds", "Duration", "Duration_seconds", "Tools/Speaker"])
        # Iterate through the diarization results
        for turn, _, speaker in diarization.itertracks(yield_label=True):
            start_time = seconds_to_timestamp(turn.start)
            end_time = seconds_to_timestamp(turn.end)
            duration = turn.end - turn.start
            
            if speaker == selected_speaker and duration > 1: 
                 # Write the row to the CSV file
                csvwriter.writerow([
                    "speech", "Nan",
                    start_time, f"{turn.start:.3f}",
                    end_time, f"{turn.end:.3f}",
                    seconds_to_timestamp(duration), f"{duration:.3f}",
                    f"{speaker}"
                ])

save_annotation_speech("Speech_annotations.csv", "SPEAKER_00", diarization)

### 1.1.1.2 Removal of Overlapping Speech Annotations
Finally, the speech annotations are compared with the existing ones, and any speech annotations overlapping with the existing ones are removed. Additionally, the annotations are merged and concatenated with the other existing annotations into a single CSV file.

In [None]:
# Add speech annotations to the file of existing annotations, removing speech annotations that overlap with the existing ones
import pandas as pd
from datetime import timedelta
#  Function to check overlapping between two intervals
def is_overlapping(start1, end1, start2, end2):
    return max(start1, start2) < min(end1, end2)                         

# Function to convert seconds into hh:mm:ss.sss format
def seconds_to_timestamp(seconds):
    t = timedelta(seconds=seconds)
    total_seconds = int(t.total_seconds())
    milliseconds = int((t.total_seconds() - total_seconds) * 1000)
    hours, remainder = divmod(total_seconds, 3600) # 3600 seconds = 1 hour
    minutes, seconds = divmod(remainder, 60) # 60 seconds = 1 minute
    return f"{hours:02}:{minutes:02}:{seconds:02}.{milliseconds:03}"

def add_speech_annotations(existing_annotation,speech_annotation, output_csv):
    # Load existing annotations
    existing_df = pd.read_csv(existing_annotation)
    existing_interval = []
    existing_df["Start_seconds"] = pd.to_numeric(existing_df["Start_seconds"], errors='coerce') # Convert to float
    existing_df["End_seconds"] = pd.to_numeric(existing_df["End_seconds"], errors='coerce')
    for _, row in existing_df.iterrows():
        existing_interval.append((row["Start_seconds"], row["End_seconds"]))

    # Load speech annotations
    speech_df = pd.read_csv(speech_annotation)
    speech_interval = []
    speech_df["Start_seconds"] = pd.to_numeric(speech_df["Start_seconds"], errors='coerce')
    speech_df["End_seconds"] = pd.to_numeric(speech_df["End_seconds"], errors='coerce')
    for _, row in speech_df.iterrows():
        speech_interval.append((row["Start_seconds"], row["End_seconds"]))

    # Filter speech annotations that do not overlap with existing annotations
    filtered_speech = []
    for start, end in speech_interval:
        overlap_found = False
        start_check = start
        end_check = end
        for existing_start, existing_end in existing_interval:
            if is_overlapping(start_check, end_check, existing_start, existing_end):
                overlap_found = True
                break
        if not overlap_found:
            filtered_speech.append((start, end))

    # Add filtered speech annotations to the file with existing annotations
    with open(output_csv, "w") as csvfile:
        csvfile.write("Event,Nan,Start_time,Start_seconds,End_time,End_seconds,Duration,Duration_seconds,Tools/Speaker\n") # Save header to CSV
        for start, end in filtered_speech:
            csvfile.write(f'speech,,{seconds_to_timestamp(start)},{start:.3f},{seconds_to_timestamp(end)},{end:.3f},{seconds_to_timestamp(end - start)},{(end-start):.3f},""\n')
        for _, row in existing_df.iterrows():
            csvfile.write(f'{row["Event"]},,{row["Start_time"]},{row["Start_seconds"]},{row["End_time"]},{row["End_seconds"]},{row["Duration"]},{row["Duration_seconds"]},{row["Tools/Speaker"]}\n')

add_speech_annotations("Annotations_video.csv", "Speech_annotations.csv", "Annotations_video_Speech.csv")

### 1.1.2 Creating the "Other" Annotation
- Creation of "**other**" annotations (Before doing this, ensure you have a CSV file containing all the video annotations; this way, the created annotations will not overwrite the existing ones).
- The "other" annotation is created 1 second after the end of the current annotation, but only if its duration is between **1 and 3 seconds**, with a maximum of **50** annotations for video. 
    - The maximum number of annotations to save can be adjusted based on the dataset distribution. To achieve a balanced dataset, it is ideal to avoid having too many annotations for a single class and instead aim for a similar number across all classes.

In [42]:
import pandas as pd
from datetime import timedelta

# Function to convert seconds into hh:mm:ss.sss format
def seconds_to_timestamp(seconds):
    t = timedelta(seconds=seconds)
    total_seconds = int(t.total_seconds())
    milliseconds = int((t.total_seconds() - total_seconds) * 1000)
    hours, remainder = divmod(total_seconds, 3600) # 3600 seconds = 1 hour
    minutes, seconds = divmod(remainder, 60) # 60 seconds = 1 minute
    return f"{hours:02}:{minutes:02}:{seconds:02}.{milliseconds:03}"

# Function to add "other" annotations
def add_other_annotations(annotations_file, output_csv):
    #  Read the annotations
    df = pd.read_csv(annotations_file)
    df["Start_seconds"] = pd.to_numeric(df["Start_seconds"], errors='coerce')
    df["End_seconds"] = pd.to_numeric(df["End_seconds"], errors='coerce')
    print(f"Annotations data loaded with shape: {df.shape}")

    other_annotations = []
    other_count = 0
    # Iterate through annotations to identify gaps between end and start times
    for i in range(len(df) - 1): # Does not consider the last annotation (in case it coincides with the end of the video)
        current_end = df.iloc[i]["End_seconds"]
        next_start = df.iloc[i + 1]["Start_seconds"]
        
       # Calculate the gap duration, considering 1 second after the end of the current annotation
        start_other = current_end + 1
        gap_duration = next_start - start_other 

        # If the gap is at least 1 second, create "other" annotations
        if gap_duration >= 1:
            other_duration  = min(gap_duration, 3)  # Limit the duration to a maximum of 3 seconds
            other_end = start_other + other_duration 
            
            for j in range(len(df)):
                existing_start = df.iloc[j]["Start_seconds"]
                existing_end = df.iloc[j]["End_seconds"]
                
                # Check if the new "other" annotation overlaps with an existing annotation
                if start_other < existing_end and other_end > existing_start:
                    other_end = min(other_end , existing_start) # Truncate "other" before the annotation
                    other_duration = other_end - start_other
                    break

            if other_duration  < 1: # Skip if the duration is less than 1 second
                continue

            # Create the new "other" annotation
            other_annotation = {
                'Event': 'other',
                'Nan': '',
                'Start_time': seconds_to_timestamp(start_other),
                'Start_seconds': round(start_other, 3),
                'End_time': seconds_to_timestamp(other_end),
                'End_seconds': round(other_end, 3),
                'Duration': seconds_to_timestamp(other_duration),
                'Duration_seconds': round(other_duration, 3),
                'Tools/Speaker': ''
            }
            if other_count < 50: # Limit the number of "other" annotations to 50. TODO: Change this value based on the balance of the dataset
                other_count += 1
                other_annotations.append(other_annotation)
            else:
                break

    # Add the new "other" annotations to the original dataframe and sort by start time
    other_df = pd.DataFrame(other_annotations)
    combined_df = pd.concat([df, other_df], ignore_index=True)
    combined_df = combined_df.sort_values(by="Start_seconds").reset_index(drop=True)
    combined_df.to_csv(output_csv, index=False)
    print(f"Saved extended annotations with 'other' annotations to {output_csv}")

# Aggiungi le annotazioni "other"
add_other_annotations("Annotations_video_Speech.csv","Annotations_video_Speech&Other.csv")

Annotations data loaded with shape: (310, 9)
Saved extended annotations with 'other' annotations to Annotations_video_Speech&Other.csv


# 2: Data Extraction from Videos

To prepare the input data for the model, it is necessary to extract features from videos while considering annotations. The extracted features are divided into two main categories:

## 2.1 Keypoints
Keypoints represent the 3D coordinates of specific body points, selected in this project from facial, hand, and body landmarks.

### Normalization
To eliminate influences caused by camera framing, video resolution, or subject size, the keypoints undergo a normalization process:
- **Anchor-based Normalization**:  
  - Facial keypoints are normalized relative to the nose as a fixed reference point.  
  - Hand keypoints are normalized using the wrists as reference points.
- **Body Size-based Normalization**:  
  - Keypoints are normalized by dividing their coordinates based on the ratio between head height and torso width.

## 2.2 Descriptors
Descriptors represent the distances between specific keypoints. These are carefully chosen to provide the model with relevant information for distinguishing actions.

---

## 2.3 Feature Extraction Process
Features are extracted within the time intervals defined by video annotations. Annotations are padded with additional frames at the end to ensure that the total frame count matches the sequence length required by the model.

### Validation Check
Data is saved only if the annotations have more than 50% of frames where keypoints are successfully extracted. If not, both the data and the corresponding annotation are discarded. This precaution addresses situations where MediaPipe fails to extract keypoints due to occluded body parts, resulting in null data. This approach ensures that only reliable data is used for model training.

---

## 2.4 Augmentation
Features are extracted twice for each video:
1. Using the original video.
2. Using a horizontally flipped version of the video.

This augmentation is necessary because most subjects in the dataset predominantly use their right hand. Without flipping, the model may struggle to classify actions performed with the left hand. However, this step can be disabled if not required.

In [None]:
import cv2
import mediapipe as mp
import csv
import pandas as pd
import numpy as np

# Function to normalize keypoints
def normalize_landmarks(landmarks, nose, height_width_ratio):
    # List of selected face keypoints to extract
    face_keypoint_indices = [1, 159, 386, 78, 95, 88, 178, 87, 14, 317, 402, 318, 324, 308, 191, 80, 81, 82, 13, 312, 311, 310, 415,
                         474, 475, 476, 477, 469, 470, 471, 472, 33, 133, 362, 61, 199, 263, 291]
    keypoints = []
    nose_x, nose_y, nose_z= nose.x, nose.y,nose.z
    keypoints.append([float(nose_x/height_width_ratio), float(nose_y/height_width_ratio),float(nose_z/height_width_ratio), 1]) 
    for idx in face_keypoint_indices:
        if idx == 1:
            continue
        keyp = landmarks[idx]
        x, y,z = keyp.x, keyp.y,keyp.z
        keypoints.append([float((x - nose_x)/height_width_ratio), float((y - nose_y)/height_width_ratio),float((z - nose_z)/height_width_ratio),idx])
    return keypoints

# Function to extract keypoints from face, hands and pose
def extract_keypoints(face_landmarks, hand_landmarks, pose_landmarks):
    
    face_keypoints = []
    height_width_ratio = 0
    hand_keypoints = {
            'left_hand': [],
            'right_hand': []
    }
    if face_landmarks and pose_landmarks: # Skip frame where there are no face and pose landmarks
        nose = face_landmarks[1]
        # Calculate the height/width ratio useful for normalization
        left_shoulder = pose_landmarks.landmark[11]  
        right_shoulder = pose_landmarks.landmark[12] 
        # Calculate the euclidean distance between the shoulders
        shoulder_width = np.linalg.norm(np.array([left_shoulder.x, left_shoulder.y,left_shoulder.z]) - np.array([right_shoulder.x, right_shoulder.y,right_shoulder.z])) 
        left_eye = face_landmarks[33]
        right_eye = face_landmarks[263]
        chin = face_landmarks[199]
        eye_middle = (np.array([left_eye.x, left_eye.y,left_eye.z]) + np.array([right_eye.x, right_eye.y,right_eye.z])) / 2
        face_height = np.linalg.norm(np.array([eye_middle[0], eye_middle[1],eye_middle[2]]) - np.array([chin.x, chin.y,chin.z]))
        height_width_ratio = face_height / shoulder_width
        # Normalize the keypoints
        face_keypoints = normalize_landmarks(face_landmarks, nose, height_width_ratio)

        # Extract the left and right wrist landmarks from the pose landmarks
        left_wrist = pose_landmarks.landmark[mp.solutions.pose.PoseLandmark.LEFT_WRIST]
        right_wrist = pose_landmarks.landmark[mp.solutions.pose.PoseLandmark.RIGHT_WRIST]

        # Extract the hand landmarks and normalize them
        hand_keypoints = {
            'left_hand': [[float((lm.x-left_wrist.x)/height_width_ratio), float((lm.y-left_wrist.y)/height_width_ratio),float((lm.z - left_wrist.z)/height_width_ratio),index] for index,lm in enumerate(hand_landmarks.get('left_hand', []))],
            'right_hand': [[float((lm.x-right_wrist.x)/height_width_ratio), float((lm.y-right_wrist.y)/height_width_ratio),float((lm.z-right_wrist.z)/height_width_ratio),index] for index,lm in enumerate(hand_landmarks.get('right_hand', []))]
        }
    return face_keypoints, hand_keypoints, height_width_ratio

# Function to process the video and extract keypoints
def process_video(video_file, output_csv, annotation_intervals, flip_bool, window_size):
    cap = cv2.VideoCapture(video_file)
    fps = cap.get(cv2.CAP_PROP_FPS)
    
    # Create the output CSV file
    with open(output_csv, mode='w', newline='') as file:
        writer = csv.writer(file)
        # Header for the CSV file
        writer.writerow(['timestamp', 'face_keypoints', 'left_hand_keypoints', 'right_hand_keypoints','left_wrist','right_wrist', 
                         'mouth_polseSX_dist','mouth_polseDX_dist', 'lip_up_down_dist','mouth_left_right_dist','height_width_ratio','dist_nose_mouth_left',
                         'dist_nose_mouth_right','nose_polseSX_dist','nose_polseDX_dist','dist_nose_mouth_left2','dist_nose_mouth_right2','dist_mouth_eye_center',
                         'count','label'])
        
        # Set the minimum detection and tracking confidence for MediaPipe solutions
        MIN_DETECTION_CONFIDENCE = 0.5
        MIN_TRACKING_CONFIDENCE = 0.5

        # Initialize MediaPipe solutions for face mesh, hands and pose
        with mp.solutions.face_mesh.FaceMesh(refine_landmarks=True, min_detection_confidence=MIN_DETECTION_CONFIDENCE, min_tracking_confidence=MIN_TRACKING_CONFIDENCE) as face_mesh, \
             mp.solutions.hands.Hands(static_image_mode=False, max_num_hands=2, min_detection_confidence=MIN_DETECTION_CONFIDENCE) as hands, \
             mp.solutions.pose.Pose(min_detection_confidence=MIN_DETECTION_CONFIDENCE, min_tracking_confidence=MIN_TRACKING_CONFIDENCE) as pose:
            
            for start_time, end_time, event in annotation_intervals:
                frame_csv = []
                count_frame_good = 0
                # Calculate the start and end frames
                start_frame = int(start_time * fps)
                end_frame = int(end_time * fps)
                
                # Set the video to the starting frame of the interval of the annotation
                cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
                # Calculate the number of frames in the interval
                duration = (end_frame - start_frame) + 1
                
                if duration < window_size: #Padding to have window_size frame
                    end_frame = end_frame + (window_size - duration)
                elif duration % window_size != 0: #Padding to have a multiple of window_size frames
                    end_frame = end_frame + (window_size - (duration % window_size))
                
                count_frame = 1
                for frame_number in range(start_frame, end_frame + 1):
                    ret, frame = cap.read()
                    if not ret:
                        break
                    
                    if flip_bool == True: # Flip the frame horizontally if specified
                        frame = cv2.flip(frame,1)
                    # Calculate the current timestamp
                    timestamp = frame_number / fps
                    # Process the image
                    image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                    # Extract face, hand and pose landmarks
                    face_mesh_results = face_mesh.process(image)
                    hands_results = hands.process(image)
                    pose_results = pose.process(image)

                    # Extract the face landmarks (if present)
                    face_landmarks = face_mesh_results.multi_face_landmarks[0].landmark \
                        if face_mesh_results.multi_face_landmarks else None

                    # Extract the hand landmarks (if present)
                    hand_landmarks = {}
                    if hands_results.multi_hand_landmarks:
                        for idx, hand_landmark in enumerate(hands_results.multi_hand_landmarks):
                            hand_type = 'left_hand' if hands_results.multi_handedness[idx].classification[0].label == 'Left' else 'right_hand'
                            hand_landmarks[hand_type] = hand_landmark.landmark

                    # Extract the pose landmarks (if present)  
                    pose_landmarks = pose_results.pose_landmarks if pose_results.pose_landmarks else None
                
                    # Extract normalized keypoints
                    face_keypoints, hand_keypoints, height_width_ratio = extract_keypoints(face_landmarks, hand_landmarks, pose_landmarks)

                    # Extract distances
                    mouth_polseSX_dist = 0
                    mouth_polseDX_dist = 0
                    lip_up_down_dist = 0
                    mouth_left_right_dist = 0
                    dist_nose_mouth_left = 0
                    dist_nose_mouth_right = 0
                    nose_polseSX_dist = 0
                    nose_polseDX_dist = 0
                    dist_nose_mouth_left2 = 0
                    dist_nose_mouth_right2 = 0
                    dist_mouth_eye_center = 0
                    left_wrist = None
                    right_wrist = None
                    
                    if pose_results.pose_landmarks:
                        # Get wrist landmarks (from Pose landmarks)
                        left_wrist = pose_results.pose_landmarks.landmark[mp.solutions.pose.PoseLandmark.LEFT_WRIST]
                        left_wrist_point = np.array([left_wrist.x, left_wrist.y,left_wrist.z])
                        right_wrist = pose_results.pose_landmarks.landmark[mp.solutions.pose.PoseLandmark.RIGHT_WRIST]
                        right_wrist_point = np.array([right_wrist.x, right_wrist.y,right_wrist.z])
                        
                        if face_mesh_results.multi_face_landmarks:
                            face_landmarks = face_mesh_results.multi_face_landmarks[0]
                            mouth_x = (face_landmarks.landmark[13].x + face_landmarks.landmark[14].x) / 2
                            mouth_y = (face_landmarks.landmark[13].y + face_landmarks.landmark[14].y) / 2
                            mouth_z = (face_landmarks.landmark[13].z + face_landmarks.landmark[14].z) / 2
                            mouth_point = np.array([mouth_x, mouth_y,mouth_z])
                            nose_point = np.array([face_landmarks.landmark[1].x , face_landmarks.landmark[1].y , face_landmarks.landmark[1].z ])
                            
                            # Distance between the mouth and wrists
                            mouth_polseSX_dist = np.linalg.norm(left_wrist_point - mouth_point)
                            mouth_polseDX_dist = np.linalg.norm(right_wrist_point - mouth_point)

                            # Distance between upper and lower lip
                            lip_up = np.array([face_landmarks.landmark[13].x, face_landmarks.landmark[13].y, face_landmarks.landmark[13].z])
                            lip_down = np.array([face_landmarks.landmark[14].x, face_landmarks.landmark[14].y, face_landmarks.landmark[14].z])
                            lip_up_down_dist = np.linalg.norm(lip_up - lip_down)

                            # Distance between the left and right corners of the mouth
                            mouth_left = np.array([face_landmarks.landmark[78].x, face_landmarks.landmark[78].y, face_landmarks.landmark[78].z])
                            mouth_right = np.array([face_landmarks.landmark[308].x, face_landmarks.landmark[308].y, face_landmarks.landmark[308].z])

                            # Distance between the nose and the corners of the mouth
                            dist_nose_mouth_left = np.linalg.norm(nose_point - mouth_left)
                            dist_nose_mouth_right = np.linalg.norm(nose_point - mouth_right)
                        
                            # Distance between the nose and the wrists
                            nose_polseSX_dist = np.linalg.norm(left_wrist_point - nose_point)
                            nose_polseDX_dist = np.linalg.norm(right_wrist_point - nose_point)
                            
                            # Set a specific point for the corners of the mouth (left and right)
                            mouth_left2 = np.array([face_landmarks.landmark[61].x, face_landmarks.landmark[61].y, face_landmarks.landmark[61].z])
                            mouth_right2 = np.array([face_landmarks.landmark[291].x, face_landmarks.landmark[291].y, face_landmarks.landmark[291].z])

                            # Distance between the nose and the specific corners of the mouth (left and right)
                            dist_nose_mouth_left2 = np.linalg.norm(nose_point - mouth_left2)
                            dist_nose_mouth_right2 = np.linalg.norm(nose_point - mouth_right2)
                    
                            # Distance between the specific corners of the mouth (left and right)
                            mouth_left_right_dist = np.linalg.norm(mouth_left2 - mouth_right2)

                            # Set a specific point for the eyes (left and right)
                            left_eye = np.array([face_landmarks.landmark[133].x , face_landmarks.landmark[133].y , face_landmarks.landmark[133].z ])
                            right_eye = np.array([face_landmarks.landmark[362].x , face_landmarks.landmark[362].y , face_landmarks.landmark[362].z])

                            # Distance vertical between the center of the eyes and the center of the mouth
                            center_eye = np.array((left_eye + right_eye)/2)
                            mouth_center = np.array((mouth_left2 + mouth_right2)/2)
                            dist_mouth_eye_center = np.linalg.norm(mouth_center - center_eye)
                            
                    if face_keypoints != []:
                        count_frame_good += 1

                    if height_width_ratio != 0:
                        left_wrist = [float(left_wrist.x/height_width_ratio), float(left_wrist.y/height_width_ratio),float(left_wrist.z/height_width_ratio)] if left_wrist else [0, 0, 0] 
                        right_wrist = [float(right_wrist.x/height_width_ratio), float(right_wrist.y/height_width_ratio),float(right_wrist.z/height_width_ratio)] if right_wrist else [0, 0, 0]
                    else:
                        left_wrist = [left_wrist.x, left_wrist.y,left_wrist.z] if left_wrist else [0, 0, 0] 
                        right_wrist = [right_wrist.x, right_wrist.y,right_wrist.z] if right_wrist else [0, 0, 0]
                            
                    frame_csv.append([timestamp, face_keypoints, hand_keypoints['left_hand'], hand_keypoints['right_hand'],left_wrist, right_wrist,mouth_polseSX_dist,mouth_polseDX_dist,lip_up_down_dist,mouth_left_right_dist,height_width_ratio,dist_nose_mouth_left,dist_nose_mouth_right,nose_polseSX_dist,nose_polseDX_dist,dist_nose_mouth_left2,dist_nose_mouth_right2,dist_mouth_eye_center,count_frame,event])
                    count_frame += 1
                # Save only if the majority of frames are good
                if count_frame_good >= (len(frame_csv) / 2): 
                    for frame in frame_csv:
                        writer.writerow(frame)
    cap.release()

# Main function to extract data from the video 
def extract_data(annotations_path, video_path, output_csv):
    window_size = 21 # TODO: Change in base of the sequence length of the model
    annotations_df = pd.read_csv(annotations_path)
    annotation_intervals = []
    speech_count = 0
    for _, row in annotations_df.iterrows():
        # Some conditions to filter the annotations, optional if the csv file of annotations is already filtered and cleaned 
        if row['Event'] == 'speech': 
            speech_count += 1
            if speech_count > 50: # TODO: Change this value based on the balance of the dataset
                continue
        if row['Event'] == 'mouth_open': # In this case, this annotation class is ignored
            continue
        annotation_intervals.append((row['Start_seconds'], row['End_seconds'], row['Event']))
    
    # Process the video
    print(f"Processing video: {video_path}")
    process_video(video_path, output_csv, annotation_intervals, False, window_size)

    # Process the video with flipped frames
    print(f"Processing video_flip: {video_path}")
    process_video(video_path, output_csv.replace(".csv", "_flip.csv"), annotation_intervals, True, window_size)

extract_data("Annotations_video_Speech&Other.csv", "01_1.mp4", "01_keypoints.csv")

Processing video: 01_1.mp4


# 3: LSTM Model

In this project, an LSTM model was used for sequential data processing. This model is ideal due to its ability to preserve information over time intervals, making it particularly suitable for the classification of actions that unfold over time.


In [55]:
# Import necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import cv2
import mediapipe as mp
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, LayerNormalization
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, f1_score, confusion_matrix
from tensorflow.keras.models import load_model
import keras_tuner as kt
from tensorflow.keras.optimizers import Adam

## 3.1: Load the Data to Provide as Input to the Model

First, the data previously extracted from the CSV files must be loaded to begin training the model.  
The code shown applies to a single video.

In [None]:
# Function to read the keypoints from the CSV file
def read_keypoints(file_path):
    df = pd.read_csv(file_path)    
    # Convert strings to lists for columns with non-empty values
    df["face_keypoints"] = df["face_keypoints"].apply(lambda x: eval(x) if isinstance(x, str) else []) 
    df["left_hand_keypoints"] = df["left_hand_keypoints"].apply(lambda x: eval(x) if isinstance(x, str) else [])
    df["right_hand_keypoints"] = df["right_hand_keypoints"].apply(lambda x: eval(x) if isinstance(x, str) else [])
    df["left_wrist"] = df["left_wrist"].apply(lambda x: eval(x) if isinstance(x, str) else [])
    df["right_wrist"] = df["right_wrist"].apply(lambda x: eval(x) if isinstance(x, str) else [])
    
    # Convert the columns to numeric format
    columns = ["timestamp", "count", "mouth_polseSX_dist", "mouth_polseDX_dist", "lip_up_down_dist", 
        "mouth_left_right_dist", "height_width_ratio", "dist_nose_mouth_left", 
        "dist_nose_mouth_right", "nose_polseSX_dist", "nose_polseDX_dist", 
        "dist_nose_mouth_left2", "dist_nose_mouth_right2", "dist_mouth_eye_center"]    
    for col in columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    print(f"Keypoints data loaded with shape: {df.shape}")
    return df

# Function to map annotations to keypoints extracted from the video
def assign_labels_to_keypoints(keypoints_df, window_size):
    sequences = []
    labels = []
    relevant_keypoints = []
    action_label=""
    face_keypoint_indices = [1, 159, 386, 78, 95, 88, 178, 87, 14, 317, 402, 318, 324, 308, 191, 80, 81, 82, 13, 312, 311, 310, 415, 
                             474, 475, 476, 477, 469, 470, 471, 472, 33, 133, 362, 61, 199, 263, 291]
    hand_keypoint_indices = list(range(21))  # 21 keypoints per hand
    
    for _, row in keypoints_df.iterrows():
        if (row["label"] == "mouth_open"): # Skip this annotation class, optional if the csv file of annotations is already filtered and cleaned
            continue

        # Add face keypoints
        face_keypoint=[]
        if len(row["face_keypoints"]) == 0: # If there are no face keypoints, add null coordinates (0,0,0) for each keypoint
            for idx in face_keypoint_indices:
                face_keypoint.append([0, 0,0])
        else: # If there are face keypoints, extract them, of some of them are missing, add null coordinates (0,0,0) 
            keyp_found=False
            for idx in face_keypoint_indices:
                for keypoint in row["face_keypoints"]:
                    if keypoint[3] == idx:
                        face_keypoint.append(keypoint[0:3])# 
                        keyp_found=True
                        break
                if not keyp_found:
                    face_keypoint.append([0, 0,0])
        
        # Flatten the face keypoints
        face_keypoints_flat = np.array(face_keypoint,dtype='float32').ravel()    

        left_hand=[]
        right_hand=[]
        
        # Add left hand and right hand keypoints, if there are no keypoints, add null coordinates (0,0,0) for each keypoint
        if len(row["left_hand_keypoints"]) == 0:
            for idx in hand_keypoint_indices:
                left_hand.append([0, 0,0])
        else:
            keyp_found=False
            for idx in hand_keypoint_indices:
                for keypoint in row["left_hand_keypoints"]:
                    if keypoint[3] == idx:
                        left_hand.append(keypoint[0:3])
                        keyp_found=True
                        break
                if not keyp_found:
                    left_hand.append([0, 0,0])

        if len(row["right_hand_keypoints"]) == 0:
            for idx in hand_keypoint_indices:
                right_hand.append([0, 0,0])
        else:
            keyp_found=False
            for idx in hand_keypoint_indices:
                for keypoint in row["right_hand_keypoints"]:
                    if keypoint[3] == idx:
                        right_hand.append(keypoint[0:3])
                        keyp_found=True
                        break
                if not keyp_found:
                    right_hand.append([0, 0,0])

        # Flatten the hand keypoints
        left_hand_flat=np.array(left_hand,dtype='float32').ravel()
        right_hand_flat=np.array(right_hand,dtype='float32').ravel()

        # Flatten the wrist keypoints
        left_wrist = np.array(row["left_wrist"],dtype='float32').ravel()
        right_wrist = np.array(row["right_wrist"],dtype='float32').ravel()
        
        # Save the label of the annotation for the first frame
        if row["count"] % window_size == 1:
            action_label = row["label"]


        # Extract distances and flatten them     
        distances = np.array([row["mouth_polseSX_dist"], row["mouth_polseDX_dist"], row["lip_up_down_dist"], 
                              row["mouth_left_right_dist"], row["dist_nose_mouth_left"], row["dist_nose_mouth_right"],
                              row["nose_polseSX_dist"], row["nose_polseDX_dist"], row["dist_nose_mouth_left2"],row["dist_nose_mouth_right2"],
                              row["dist_mouth_eye_center"]],dtype='float32').ravel() 
        
        # Concatenate keypoints and distances
        combined_data = np.concatenate([
            face_keypoints_flat,
            left_hand_flat,
            right_hand_flat,
            left_wrist,
            right_wrist,
            distances
        ],dtype='float32')
        relevant_keypoints.append(combined_data)
        
        # Save the sequence and the label when the window size is reached
        if row["count"] % window_size == 0:
            sequences.append(relevant_keypoints)
            labels.append(action_label)
            relevant_keypoints = []
            action_label=""
    
    print(f"Sequences: {len(sequences)}, Labels: {len(labels)}")
    return np.array(sequences, dtype='float32'), labels


# Function to prepare data for LSTM model
def prepare_data(keypoints_file,window_size):
    keypoints_df = read_keypoints(keypoints_file) # Read from the CSV file
    sequences,labels = assign_labels_to_keypoints(keypoints_df,window_size) # Assign labels to keypoints
    label_mapping = {"food_to_mouth": 0, "drink_to_mouth": 1, "speech": 2, "other": 3} # Map labels to integers
    labels = np.array([label_mapping[label] for label in labels])
    labels = to_categorical(labels)  # Convert labels to categorical format
    return sequences, labels

# Main function to get data from the CSV files
def get_data(keypoints_csv, keypoints_csv_flip, window_size):    
    all_X = []
    all_y = []

    # Process video
    print(f"Processing video: {keypoints_csv}")
    X, y = prepare_data(keypoints_csv, window_size) 
    all_X.append(X)
    all_y.append(y)

    # Process flipped video
    print(f"Processing video flipped: {keypoints_csv_flip}")
    X, y = prepare_data(keypoints_csv_flip, window_size) 
    all_X.append(X)
    all_y.append(y)

    X = np.concatenate(all_X, axis=0)
    y = np.concatenate(all_y, axis=0)
    return X, y

# Execute the function to get data
X,y = get_data("01_keypoints.csv","01_keypoints_flip.csv", 21)

print(f"X shape: {X.shape}, y shape: {y.shape}")


Processing video: 01_keypoints.csv
Keypoints data loaded with shape: (12621, 20)
Sequences: 601, Labels: 601
Processing video flipped: 01_keypoints_flip.csv
Keypoints data loaded with shape: (12201, 20)
Sequences: 581, Labels: 581
X shape: (1182, 21, 257), y shape: (1182, 4)


## 3.2: LSTM Model

Once the data is loaded, the training of the LSTM model begins, which follows the architecture outlined below.


In [None]:
# Function to build the LSTM model
def build_model(num_classes):
    model = Sequential()
    model.add(LSTM(128, return_sequences=True))
    model.add(Dropout(0.2))
    model.add(LayerNormalization())
    model.add(LSTM(48))
    model.add(Dropout(0.30000000000000004))
    model.add(LayerNormalization())
    model.add(Dense(num_classes, activation='softmax'))
    model.compile(optimizer=Adam(learning_rate=0.00039045908482525734), loss='categorical_crossentropy', metrics=['accuracy'])
    return model

# Build the model
model = build_model(y.shape[1])

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42) 

# Train the model
history = model.fit(X_train, y_train, epochs=50, batch_size=32, validation_data=(X_test, y_test))

# Evaluate the model
loss, accuracy = model.evaluate(X_test, y_test)
print(f'Test loss: {loss}')
print(f'Test accuracy: {accuracy}')

# Save the model
model.save('lstm_model.h5')
print("Model saved as 'lstm_model.h5'")

In [None]:
# Model evaluation and printing of confusion matrix and classification report
y_pred = model.predict(X_test)
y_pred_classes = y_pred.argmax(axis=1)
y_test_classes = y_test.argmax(axis=1)

# Calculate F-score
f_score = f1_score(y_test_classes, y_pred_classes, average='weighted')
print(f"F-score: {f_score}")

# Print a complete classification report
report = classification_report(y_test_classes, y_pred_classes, target_names=['food_to_mouth', 'drink_to_mouth','speech','other'])
print(report)

# Print the confusion matrix
conf_matrix = confusion_matrix(y_test_classes, y_pred_classes)

plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=['food_to_mouth', 'drink_to_mouth','speech','other'], yticklabels=['food_to_mouth', 'drink_to_mouth','speech','other'])
plt.title("Confusion Matrix")
plt.xlabel("Predicted Labels")
plt.ylabel("True Labels")
plt.show()

## 3.2.1 (Optional) Fine-Tuning with KerasTuner (Hyperband)
To optimize hyperparameters such as LSTM units, dropout rates, and the learning rate, you can use KerasTuner. With the "Hyperband" algorithm, KerasTuner can explore multiple configurations and return the best one.

In [None]:
# Fine-tuning with Hyperband

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42) 

num_classes = y.shape[1]  # Number of classes in the dataset

# Function to build the LSTM model
def build_model(hp):
    model = Sequential()
    model.add(LSTM(hp.Int('units_lstm_1', min_value=32, max_value=128, step=32), return_sequences=True))
    model.add(Dropout(hp.Float('dropout_1', min_value=0.2, max_value=0.5, step=0.1)))  # Tasso di dropout variabile
    model.add(LayerNormalization()) 
    model.add(LSTM(hp.Int('units_lstm_2', min_value=16, max_value=64, step=16)))  # Secondo layer LSTM
    model.add(Dropout(hp.Float('dropout_2', min_value=0.2, max_value=0.5, step=0.1)))  # Secondo dropout
    model.add(LayerNormalization()) 
    model.add(Dense(num_classes, activation='softmax'))
    # Ottimizzatore con learning rate variabile
    model.compile(optimizer=Adam(learning_rate=hp.Float('learning_rate', min_value=1e-5, max_value=1e-2, sampling='LOG', default=0.005)),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model

# Define the tuner
def run_tuner(x_train, x_test, y_train, y_test):
    tuner = kt.Hyperband(
        build_model,  # Function to build the model
        objective='accuracy', # Metric to optimize
        max_epochs=50,  # Maximum epochs for each trial
        factor=3,  # Factor for Hyperband
        directory = 'tuner_results',  # Directory to save results
        project_name='lstm_tuning',  # Project name
        overwrite=True  # Overwrite previous results
    )
    
    # Execute the tuning process
    tuner.search(x_train, y_train, epochs=50, validation_data=(x_test, y_test)) 

    # Get the best hyperparameters
    best_hyperparameters = tuner.get_best_hyperparameters(num_trials=1)[0]
    print(f"Best hyperparameters: {best_hyperparameters.values}")

    # Build the model with the best hyperparameters
    best_model = tuner.hypermodel.build(best_hyperparameters)
    best_model.fit(x_train, y_train, epochs=10, validation_data=(x_test, y_test))

    return best_model

# Run the tuner and get the best model
best_model = run_tuner(X_train, X_test, y_train, y_test)


# 5 Using the Model in Webcam/Video
Once the model is trained, it can be utilized for real-time video processing or directly with a webcam feed.

In [None]:
model = load_model('lstm_model.h5')
# Function to extract keypoints from face, hands and pose
def extract_keypoints(face_results, hands_results, pose_results):
    keypoints = []
    face_keypoint_indices = [1, 159, 386, 78, 95, 88, 178, 87, 14, 317, 402, 318, 324, 308, 191, 80, 81, 82, 13, 312, 311, 310, 415, 
                             474, 475, 476, 477, 469, 470, 471, 472, 33, 133, 362, 61, 199, 263, 291]
    hand_keypoint_indices = list(range(21))  # 21 keypoints per hand
    left_hand_keypoints, right_hand_keypoints = [], []  
    height_width_ratio = 0

    # Extract and normalize the keypoints 
    if face_results.multi_face_landmarks and pose_results.pose_landmarks: # Skip frame where there are no face and pose landmarks
        face_landmarks = face_results.multi_face_landmarks[0].landmark
        pose_landmarks = pose_results.pose_landmarks
        nose = face_landmarks[1]
        # Calculate the height/width ratio useful for normalization
        left_shoulder = pose_landmarks.landmark[11]  
        right_shoulder = pose_landmarks.landmark[12] 
        # Calculate the euclidean distance between the shoulders
        shoulder_width = np.linalg.norm(np.array([left_shoulder.x, left_shoulder.y,left_shoulder.z]) - np.array([right_shoulder.x, right_shoulder.y,right_shoulder.z])) 
        left_eye = face_landmarks[33]
        right_eye = face_landmarks[263]
        chin = face_landmarks[199]
        eye_middle = (np.array([left_eye.x, left_eye.y,left_eye.z]) + np.array([right_eye.x, right_eye.y,right_eye.z])) / 2
        face_height = np.linalg.norm(np.array([eye_middle[0], eye_middle[1],eye_middle[2]]) - np.array([chin.x, chin.y,chin.z]))
        height_width_ratio = face_height / shoulder_width
        # Extract the left and right wrist landmarks from the pose landmarks
        left_wrist = pose_landmarks.landmark[mp.solutions.pose.PoseLandmark.LEFT_WRIST]
        right_wrist = pose_landmarks.landmark[mp.solutions.pose.PoseLandmark.RIGHT_WRIST]
        
        # Extract the face keypoints
        for idx in face_keypoint_indices:
            if idx == 1:
                keypoints.append([float(nose.x/height_width_ratio),float(nose.y/height_width_ratio),float(nose.z/height_width_ratio), idx])
            keyp = face_landmarks[idx]
            keypoints.append([float((keyp.x - nose.x)/height_width_ratio), float((keyp.y - nose.y)/height_width_ratio),float((keyp.z - nose.z)/height_width_ratio),idx])
        # Extract the hand landmarks (if present)
        if hands_results.multi_hand_landmarks:
            for idx, hand_landmarks in enumerate(hands_results.multi_hand_landmarks):
                hand_type = hands_results.multi_handedness[idx].classification[0].label
                if hand_type == "Left":
                    left_hand_keypoints = [[float((lm.x-left_wrist.x)/height_width_ratio), float((lm.y-left_wrist.y)/height_width_ratio),float((lm.z - left_wrist.z)/height_width_ratio),index] for index,lm in enumerate(hand_landmarks.landmark)]
                elif hand_type == "Right":
                    right_hand_keypoints = [[float((lm.x-right_wrist.x)/height_width_ratio), float((lm.y-right_wrist.y)/height_width_ratio),float((lm.z - right_wrist.z)/height_width_ratio),index] for index,lm in enumerate(hand_landmarks.landmark)]
    
    # Save the keypoints of the face, if there are no keypoints, add null coordinates (0,0,0) for each keypoint
    face_keypoint=[]
    if len(keypoints) == 0:
        for idx in face_keypoint_indices:
            face_keypoint.append([0, 0,0])
    else:
        keyp_found=False
        for idx in face_keypoint_indices:
            for keypoint in keypoints:
                if keypoint[3] == idx:
                    face_keypoint.append(keypoint[0:3])
                    keyp_found=True
                    break
            if not keyp_found:
                face_keypoint.append([0, 0, 0])

    # Flatten the face keypoints    
    face_keypoints_flat = np.array(
        face_keypoint,dtype='float32'
    ).ravel()  

    # Save the left and right hand keypoints, if there are no keypoints, add null coordinates (0,0,0) for each keypoint
    left_hand=[]
    right_hand=[]
    if len(left_hand_keypoints) == 0:
        for idx in hand_keypoint_indices:
            left_hand.append([0, 0,0])
    else:
        keyp_found=False
        for idx in hand_keypoint_indices:
            for keypoint in left_hand_keypoints:
                if keypoint[3] == idx:
                    left_hand.append(keypoint[0:3])
                    keyp_found=True
                    break
            if not keyp_found:
                left_hand.append([0, 0,0])

    if len(right_hand_keypoints) == 0:
        for idx in hand_keypoint_indices:
            right_hand.append([0, 0,0])
    else:
        keyp_found=False
        for idx in hand_keypoint_indices:
            for keypoint in right_hand_keypoints:
                if keypoint[3] == idx:
                    right_hand.append(keypoint[0:3])
                    keyp_found=True
                    break
            if not keyp_found:
                right_hand.append([0, 0, 0])
    
    # Flatten the hand keypoints
    left_hand_flat = np.array(
        left_hand,dtype='float32'
    ).ravel()

    right_hand_flat  = np.array(
        right_hand,dtype='float32'
    ).ravel()
    
    # Extract distances
    mouth_polseSX_dist = 0
    mouth_polseDX_dist = 0
    lip_up_down_dist = 0
    mouth_left_right_dist = 0
    dist_nose_mouth_left = 0
    dist_nose_mouth_right = 0
    nose_polseSX_dist = 0
    nose_polseDX_dist = 0
    dist_nose_mouth_left2 = 0
    dist_nose_mouth_right2 = 0
    dist_mouth_eye_center = 0
    left_wrist = None
    right_wrist = None
    
    if pose_results.pose_landmarks:
        # Get wrist landmarks (from Pose landmarks)
        left_wrist = pose_results.pose_landmarks.landmark[mp.solutions.pose.PoseLandmark.LEFT_WRIST]
        left_wrist_point = np.array([left_wrist.x, left_wrist.y, left_wrist.z])
        right_wrist = pose_results.pose_landmarks.landmark[mp.solutions.pose.PoseLandmark.RIGHT_WRIST]
        right_wrist_point = np.array([right_wrist.x, right_wrist.y, right_wrist.z])
        if face_results.multi_face_landmarks:
            face_landmarks = face_results.multi_face_landmarks[0]
            mouth_x = (face_landmarks.landmark[13].x + face_landmarks.landmark[14].x) / 2
            mouth_y = (face_landmarks.landmark[13].y + face_landmarks.landmark[14].y) / 2
            mouth_z = (face_landmarks.landmark[13].z + face_landmarks.landmark[14].z) / 2
            mouth_point = np.array([mouth_x, mouth_y, mouth_z])
            nose_point = np.array([face_landmarks.landmark[1].x , face_landmarks.landmark[1].y , face_landmarks.landmark[1].z ])

            # Distance between the mouth and wrists
            mouth_polseSX_dist = np.linalg.norm(left_wrist_point - mouth_point)
            mouth_polseDX_dist = np.linalg.norm(right_wrist_point - mouth_point)

            # Distance between upper and lower lip
            lip_up = np.array([face_landmarks.landmark[13].x, face_landmarks.landmark[13].y, face_landmarks.landmark[13].z])
            lip_down = np.array([face_landmarks.landmark[14].x, face_landmarks.landmark[14].y, face_landmarks.landmark[14].z])
            lip_up_down_dist = np.linalg.norm(lip_up - lip_down)

            # Distance between the left and right corners of the mouth
            mouth_left = np.array([face_landmarks.landmark[78].x, face_landmarks.landmark[78].y, face_landmarks.landmark[78].z])
            mouth_right = np.array([face_landmarks.landmark[308].x, face_landmarks.landmark[308].y, face_landmarks.landmark[308].z])

            # Distance between the nose and the corners of the mouth
            dist_nose_mouth_left = np.linalg.norm(nose_point - mouth_left)
            dist_nose_mouth_right = np.linalg.norm(nose_point - mouth_right)
            
            # Distance between the nose and the wrists
            nose_polseSX_dist = np.linalg.norm(left_wrist_point - nose_point)
            nose_polseDX_dist = np.linalg.norm(right_wrist_point - nose_point)
            
            # Set a specific point for the corners of the mouth (left and right)    
            mouth_left2 = np.array([face_landmarks.landmark[61].x, face_landmarks.landmark[61].y, face_landmarks.landmark[61].z])
            mouth_right2 = np.array([face_landmarks.landmark[291].x, face_landmarks.landmark[291].y, face_landmarks.landmark[291].z])

            # Distance between the nose and the specific corners of the mouth (left and right)
            dist_nose_mouth_left2 = np.linalg.norm(nose_point - mouth_left2)
            dist_nose_mouth_right2 = np.linalg.norm(nose_point - mouth_right2)

            # Distance between the specific corners of the mouth (left and right)
            mouth_left_right_dist = np.linalg.norm(mouth_left2 - mouth_right2)

            # Set a specific point for the eyes (left and right)
            left_eye2 = np.array([face_landmarks.landmark[133].x , face_landmarks.landmark[133].y , face_landmarks.landmark[133].z ])
            right_eye2 = np.array([face_landmarks.landmark[362].x , face_landmarks.landmark[362].y , face_landmarks.landmark[362].z])

            # Distance vertical between the center of the eyes and the center of the mouth
            center_eye = np.array((left_eye2 + right_eye2)/2)
            mouth_center = np.array((mouth_left2 + mouth_right2)/2)
            dist_mouth_eye_center = np.linalg.norm(mouth_center - center_eye)

    # Save the distances in a numpy array
    distances= np.array([mouth_polseSX_dist, mouth_polseDX_dist, lip_up_down_dist, 
                         mouth_left_right_dist,dist_nose_mouth_left,dist_nose_mouth_right,
                         nose_polseSX_dist,nose_polseDX_dist,dist_nose_mouth_left2,dist_nose_mouth_right2,
                         dist_mouth_eye_center],dtype='float32').ravel()

    left_wrist = None
    right_wrist = None
    if pose_results.pose_landmarks:
        # Get wrist landmarks (from Pose landmarks)
        left_wrist = pose_results.pose_landmarks.landmark[mp.solutions.pose.PoseLandmark.LEFT_WRIST]
        right_wrist = pose_results.pose_landmarks.landmark[mp.solutions.pose.PoseLandmark.RIGHT_WRIST]
    if height_width_ratio != 0:
        left_wrist = [float(left_wrist.x/height_width_ratio), float(left_wrist.y/height_width_ratio),float(left_wrist.z/height_width_ratio)] if left_wrist else [0, 0, 0] 
        right_wrist = [float(right_wrist.x/height_width_ratio), float(right_wrist.y/height_width_ratio),float(right_wrist.z/height_width_ratio)] if right_wrist else [0, 0, 0]
    else:
        left_wrist = np.array([left_wrist.x, left_wrist.y, left_wrist.z] if left_wrist else [0, 0,0],dtype='float32').ravel()
        right_wrist = np.array([right_wrist.x, right_wrist.y, right_wrist.z] if right_wrist else [0, 0,0],dtype='float32').ravel()
        
    
    # Save the keypoints and distances extracted
    combined_data = np.concatenate([
        face_keypoints_flat,
        left_hand_flat,
        right_hand_flat,
        left_wrist,
        right_wrist,
        distances
    ])
    return np.array(combined_data)

# Function to predict the action using the trained model
def predict_action(sequence):
    sequence = np.expand_dims(sequence, axis=0)
    prediction = model.predict(sequence)
    action_probabilities = prediction[0]
    predicted_class = np.argmax(action_probabilities)
    actions = ['food_to_mouth', 'drink_to_mouth','speech','other'] 
    return actions[predicted_class]

# List of keypoint indices for face landmarks
face_keypoint_indices = [1, 159, 386, 78, 95, 88, 178, 87, 14, 317, 402, 318, 324, 308, 191, 80, 81, 82, 13, 312, 311, 310, 415,
                         474, 475, 476, 477, 469, 470, 471, 472, 33, 133, 362, 61, 199, 263, 291]

MIN_DETECTION_CONFIDENCE = 0.5 
MIN_TRACKING_CONFIDENCE = 0.5 
window_size = 21 #TODO: Change in base of the sequence length of the model

# # Initialize MediaPipe solutions for face mesh, hands and pose
face_mesh = mp.solutions.face_mesh.FaceMesh(refine_landmarks=True, min_detection_confidence=MIN_DETECTION_CONFIDENCE, min_tracking_confidence=MIN_TRACKING_CONFIDENCE)
hands = mp.solutions.hands.Hands(static_image_mode=False, max_num_hands=2, min_detection_confidence=MIN_DETECTION_CONFIDENCE)
pose = mp.solutions.pose.Pose(static_image_mode=False, min_detection_confidence=MIN_DETECTION_CONFIDENCE, min_tracking_confidence=MIN_TRACKING_CONFIDENCE)
sequence = []
video_path = "video3_dXR.avi" 
# Open webcam/video 
# cap = cv2.VideoCapture(0)  # Webcam
cap = cv2.VideoCapture(video_path) # Video file

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    face_results = face_mesh.process(rgb_frame)
    hands_results = hands.process(rgb_frame)
    pose_results = pose.process(rgb_frame)
    
    # Extract keypoints from face, hands and pose
    keypoints = extract_keypoints(face_results, hands_results, pose_results)

    #Optional: Draw the keypoints used for the prediction on the frame
    if keypoints is not None:    
        # Draw the face landmarks on the frame
        if face_results.multi_face_landmarks:
            landmarks = face_results.multi_face_landmarks[0].landmark
            for idx in face_keypoint_indices:
                landmark = landmarks[idx]
                x = int(landmark.x * frame.shape[1])
                y = int(landmark.y * frame.shape[0])
                cv2.circle(frame, (x, y), 2, (0, 255, 0), -1)  # Cerchio verde
                
                    
        # Draw the hand landmarks on the frame
        if hands_results.multi_hand_landmarks:
            for idx, hand_landmarks in enumerate(hands_results.multi_hand_landmarks):
                for landmark in hand_landmarks.landmark:
                    x = int(landmark.x * frame.shape[1])
                    y = int(landmark.y * frame.shape[0])
                    cv2.circle(frame, (x, y), 2, (255, 0, 0), -1)

        if pose_results.pose_landmarks:
            # Draw the pose landmarks on the frame
            left_wrist = pose_results.pose_landmarks.landmark[mp.solutions.pose.PoseLandmark.LEFT_WRIST]
            left_wrist_point = (int(left_wrist.x * frame.shape[1]), int(left_wrist.y * frame.shape[0]))
            right_wrist = pose_results.pose_landmarks.landmark[mp.solutions.pose.PoseLandmark.RIGHT_WRIST]
            right_wrist_point = (int(right_wrist.x * frame.shape[1]), int(right_wrist.y * frame.shape[0]))
            cv2.circle(frame, left_wrist_point, 2, (255, 0, 0), -1)
            cv2.circle(frame, right_wrist_point, 2, (255, 0, 0), -1)

    # Predict the action using the model
    if keypoints is not None:
        sequence.append(keypoints)
        
        # Limit the sequence length to the specified length
        if len(sequence) > window_size:
            sequence.pop(0)

        # Predict the action when the sequence is complete
        if len(sequence) == window_size:
            action = predict_action(sequence)
            action_text = f"Azione predetta: {action}"
            cv2.putText(frame, action_text, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        
    # Display the video
    cv2.imshow('Output', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'): # Press q to exit
        break

cap.release()
cv2.destroyAllWindows()



[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 219ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 23ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 23ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2