# Using Body language to classify diferents actions

## Background

The aim of this project is to develop a system capable of classifying different actions carried out by people, using as input images or videos, which can be captured live via a webcam or provided as pre-recorded files.

The project was made possible according to the [proposed planning](../docs/planning.md)

## Measure of Success

We don't have any criteria for classifying the actions that weight a metric such as precision or recall, so we'll use the F1-Score instead

# 0.0 Imports

In [1]:
import tarfile
import zipfile
import pickle
import mediapipe as mp
import cv2
import pandas as pd
import numpy as np
from scipy.io import loadmat

from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline 
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.linear_model import LogisticRegression, RidgeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import f1_score

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

## 0.1 Load data

- This data has an approximate quantity of 12 GB, to facilitate this we will provide intermediate tables with the appropriate treatments

In [2]:
image_archive_path = '../data/01_raw/mpii_human_pose_v1.tar.gz'
annotation_archive_path = '../data/01_raw/mpii_human_pose_v1_u12_2.zip'

with tarfile.open(image_archive_path, "r:gz") as tar:
    tar.extractall(path='../data/02_intermediate/')

with zipfile.ZipFile(annotation_archive_path, 'r') as zip_ref:
    zip_ref.extractall('../data/02_intermediate/')

In [42]:
annotation_path = '../data/02_intermediate/mpii_human_pose_v1_u12_2/mpii_human_pose_v1_u12_1.mat'

annotations = loadmat(annotation_path, squeeze_me=True)

# 1.0 Process images to cordinates

- Here I will browse through the annotations of the images to extract the information where for each image what action was taken.

In [68]:
release = annotations['RELEASE']

annolist = release['annolist'].item() if hasattr(release['annolist'], 'item') else release['annolist']
act = release['act'].item() if hasattr(release['act'], 'item') else release['act']

image_action_mapping = []

for idx, ann in enumerate(annolist):
    image_name = ann['image']['name'] if 'name' in ann['image'].dtype.names else "Unknown Image"
    
    if isinstance(act, np.ndarray) and idx < len(act) and 'act_name' in act[idx].dtype.names:
        action_name = act[idx]['act_name']
    else:
        action_name = "Unknown"
    
    image_action_mapping.append((image_name, action_name))

for mapping in image_action_mapping[:5]:
    print(f"Image: {mapping[0]}, Action: {mapping[1]}")

Image: 037454012.jpg, Action: []
Image: 095071431.jpg, Action: []
Image: 073199394.jpg, Action: []
Image: 059865848.jpg, Action: []
Image: 015601864.jpg, Action: curling


- Now I'm going to select images with only one action and choose 20 actions

In [78]:
df = pd.DataFrame(image_action_mapping, columns=['ImageName', 'Action'])

df_filtered = df[df['Action'] != "Unknown"]
df_cleaned = df_filtered[~df_filtered['Action'].str.contains(",| or ", regex=True)]

top_20_actions = df_cleaned['Action'].value_counts().head(20).index.tolist()

df_filtered = df_cleaned[df_cleaned['Action'].isin(top_20_actions)]

filtered_csv_path = '../data/03_primary/filtered_actions_top_20.csv'
df_filtered.to_csv(filtered_csv_path, index=False)

In [None]:
print(f"As 20 ações mais comuns são: {top_20_actions}")
print(df_filtered.head())

## 1.1 Extract landmarks

- Now for each image I'm going to use mediapipe to extract the poses and faces of the people.

In [107]:
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

def extract_landmarks(image_path):
    with mp_holistic.Holistic(static_image_mode=True) as holistic:
        image = cv2.imread(image_path)
        results = holistic.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        return results

columns = ['ImageName', 'Action']

max_pose_landmarks = 33
max_face_landmarks = 468

for i in range(max_pose_landmarks):
    columns.extend([f'pose_{i}_x', f'pose_{i}_y', f'pose_{i}_z', f'pose_{i}_visibility'])

for i in range(max_face_landmarks):
    columns.extend([f'face_{i}_x', f'face_{i}_y', f'face_{i}_z'])

df_landmarks = pd.DataFrame(columns=columns)

for _, row in df_filtered.iterrows():
    image_path = f"../data/02_intermediate/images/{row['ImageName']}"
    results = extract_landmarks(image_path)
    data = [row['ImageName'], row['Action']]

    if results.pose_landmarks:
        pose = results.pose_landmarks.landmark
        pose_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in pose]).flatten())
    else:
        pose_row = [np.nan] * (max_pose_landmarks * 4)

    if results.face_landmarks:
        face = results.face_landmarks.landmark
        face_row = list(np.array([[landmark.x, landmark.y, landmark.z] for landmark in face]).flatten())
    else:
        face_row = [np.nan] * (max_face_landmarks * 3)

    row_data = data + pose_row + face_row
    df_landmarks = pd.concat([df_landmarks, pd.DataFrame([row_data], columns=columns)], ignore_index=True)

df_landmarks.to_csv('../data/04_feature/final_landmarks_with_actions.csv', index=False)

  df_landmarks = pd.concat([df_landmarks, pd.DataFrame([row_data], columns=columns)], ignore_index=True)


In [5]:
df_landmarks

Unnamed: 0,ImageName,Action,pose_0_x,pose_0_y,pose_0_z,pose_0_visibility,pose_1_x,pose_1_y,pose_1_z,pose_1_visibility,...,face_464_z,face_465_x,face_465_y,face_465_z,face_466_x,face_466_y,face_466_z,face_467_x,face_467_y,face_467_z
0,084922341.jpg,ballroom,0.477999,0.243427,-0.104067,0.997500,0.480529,0.218856,-0.105465,0.996411,...,,,,,,,,,,
1,065761289.jpg,ballroom,0.623431,0.388648,-0.107929,0.999994,0.626444,0.378512,-0.097112,0.999991,...,0.001516,0.624713,0.375394,0.000605,0.630487,0.375629,0.004073,0.631177,0.374741,0.004253
2,056830860.jpg,ballroom,0.420640,0.365611,0.003073,0.997565,0.425081,0.349444,0.013141,0.997440,...,,,,,,,,,,
3,009367477.jpg,ballroom,,,,,,,,,...,,,,,,,,,,
4,036771580.jpg,ballroom,0.264926,0.227335,-0.112517,0.998056,0.268532,0.214265,-0.109486,0.996739,...,-0.004300,0.264555,0.216310,-0.004516,0.273764,0.212818,-0.008668,0.274791,0.210975,-0.009200
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2596,025820296.jpg,resistance training,0.201862,0.143329,-0.007814,0.999397,0.194155,0.126794,-0.003160,0.999372,...,,,,,,,,,,
2597,097064125.jpg,resistance training,0.412948,0.522286,-0.326439,0.999448,0.413722,0.506519,-0.323732,0.999839,...,,,,,,,,,,
2598,059740789.jpg,resistance training,0.430848,0.540816,-0.193367,0.991907,0.432859,0.526751,-0.190976,0.971611,...,,,,,,,,,,
2599,000708647.jpg,resistance training,0.459628,0.382458,-0.084313,0.997350,0.459678,0.360366,-0.095997,0.997010,...,-0.003691,0.480774,0.390505,-0.003916,0.491623,0.389910,-0.009860,0.493047,0.388514,-0.010559


# 2.0 Cleaning data

- We have a lot of null data, so let's outline a cleaning approach

-> It's important to note that although there are several methods to clean the data, heavy processing becomes a bottleneck in real-time detection on webcams for example, so I'll simply treat the null data generically to prioritize performance.

In [6]:
cleaned_data = df_landmarks.copy()

In [7]:
cleaned_data.shape

(2601, 1538)

In [8]:
def cleaning_data(data):
    data = data.fillna(0)

    return data

cleaned_data = cleaning_data(cleaned_data)

In [9]:
cleaned_data.shape

(2601, 1538)

# 3.0 Prepare data to machine learning

In [10]:
X = cleaned_data.drop(["Action", "ImageName"], axis=1)
y = cleaned_data["Action"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 4.0 Machine Learning

In [11]:
pipelines = {
    'lr':make_pipeline(StandardScaler(), LogisticRegression()),
    'rc':make_pipeline(StandardScaler(), RidgeClassifier()),
    'rf':make_pipeline(StandardScaler(), RandomForestClassifier()),
    'gb':make_pipeline(StandardScaler(), GradientBoostingClassifier()),
}

In [None]:
fit_models = {}
for algo, pipeline in pipelines.items():
    model = pipeline.fit(X_train, y_train)
    fit_models[algo] = model

In [22]:
label_encoder = LabelEncoder()
ss = StandardScaler()

# Data to sequential model
y_train_encoded = label_encoder.fit_transform(y_train)
y_test_encoded = label_encoder.transform(y_test)

X_transformed_train = ss.fit_transform(X_train)
X_transformed_test = ss.fit_transform(X_test)

feature_names = X_train.columns.tolist()

In [35]:
model = Sequential([
    Flatten(input_shape=(len(feature_names),)),
    Dense(512, activation='relu'),
    Dropout(0.2),
    Dense(256, activation='relu'),
    Dropout(0.2),
    Dense(124, activation='relu'),
    Dropout(0.2),
    Dense(64, activation='relu'),
    Dropout(0.2),
    Dense(20, activation='softmax')
])

callbacks = [
    EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True),
    ModelCheckpoint(filepath='best_model.keras', monitor='val_loss', save_best_only=True)
]

model.compile(optimizer=Adam(), loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.fit(X_transformed_train, y_train_encoded, epochs=100, validation_data=(X_transformed_test, y_test_encoded), callbacks=callbacks)

Epoch 1/100
[1m65/65[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 7ms/step - accuracy: 0.0682 - loss: 3.3775 - val_accuracy: 0.0902 - val_loss: 2.9696
Epoch 2/100
[1m65/65[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - accuracy: 0.0667 - loss: 3.0778 - val_accuracy: 0.0979 - val_loss: 2.9393
Epoch 3/100
[1m65/65[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - accuracy: 0.0783 - loss: 3.0228 - val_accuracy: 0.0921 - val_loss: 2.9450
Epoch 4/100
[1m65/65[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - accuracy: 0.0798 - loss: 2.9970 - val_accuracy: 0.0653 - val_loss: 2.9646
Epoch 5/100
[1m65/65[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - accuracy: 0.0848 - loss: 2.9748 - val_accuracy: 0.0825 - val_loss: 2.9248
Epoch 6/100
[1m65/65[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - accuracy: 0.0792 - loss: 2.9574 - val_accuracy: 0.0806 - val_loss: 2.9354
Epoch 7/100
[1m65/65[0m [32m━━━

<keras.src.callbacks.history.History at 0x1d3d50ee230>

In [177]:
for algo, model in fit_models.items():
    yhat = model.predict(X_test)
    print(algo, f1_score(y_test, yhat, average='weighted'))

lr 0.2878807863685754
rc 0.29061403724470747
rf 0.37521235600002995
gb 0.3180176568254469


- Based on metrics im going to select random forest model.

In [178]:
with open('../data/06_models/rf_body_language.pkl', 'wb') as f:
    pickle.dump(fit_models['rf'], f)

In [10]:
with open('../data/06_models/rf_body_language.pkl', 'rb') as f:
    model = pickle.load(f)

# 5.0 Identify real-time actions

- Here I will need to rebuild the landmarks function as there may be null points in the landmark and cause an error when extracting new information

- Instructions:

1) You have two inputs for the `process_video` function, one with the video path if you want to run a video and another for the camera channel if you want to use the webcam.
2) Once you have executed the function you can end the process with the ESC key.

In [None]:
def extract_landmarks(results, num_pose_landmarks=33, num_face_landmarks=468):
    if results.pose_landmarks:
        pose_landmarks = [[lm.x, lm.y, lm.z, lm.visibility] for lm in results.pose_landmarks.landmark]
    else:
        pose_landmarks = [[0, 0, 0, 0] for _ in range(num_pose_landmarks)]
    
    if results.face_landmarks:
        face_landmarks = [[lm.x, lm.y, lm.z] for lm in results.face_landmarks.landmark]
    else:
        face_landmarks = [[0, 0, 0] for _ in range(num_face_landmarks)]

    flat_landmarks = [item for sublist in pose_landmarks + face_landmarks for item in sublist]

    return flat_landmarks

In [None]:
def process_video(webcam_input=0, video_path=None):
    """
    Processes video input from a file or webcam, applies pose estimation and action prediction, and displays the results.

    This function initializes video capture either from a specified video file or webcam. It then continuously reads frames,
    processes each frame using the MediaPipe Holistic model to detect human pose and face landmarks, predicts the action using
    a pre-trained model, and displays the predicted action and its probability on the video feed.

    Parameters
    ----------
    webcam_input : int, optional
        The device index of the webcam (default is 0, which usually represents the default webcam).
        Ignored if `video_path` is provided.
    video_path : str, optional
        The path to the video file to be processed. If None, webcam input is used instead.

    Notes
    -----
    - Press 'ESC' to exit the video feed.
    - The function assumes the existence of a pre-defined `model` for action prediction and `ss` (StandardScaler) for data scaling.
    - `extract_landmarks` and `cleaning_data` are required utility functions for landmarks extraction and data preparation.

    Examples
    --------
    Process video from the default webcam:
    
    >>> process_video()

    Process video from a specified video file:
    
    >>> process_video(video_path='path/to/your/video.mp4')
    """
    if video_path:
        cap = cv2.VideoCapture(video_path)
    else:
        cap = cv2.VideoCapture(webcam_input)
    
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():
            success, frame = cap.read()
            if not success:
                if video_path:
                    print("Fim do vídeo.")
                else:
                    print("Ignorando frames vazios.")
                break
            
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = holistic.process(image)

            landmarks_row = extract_landmarks(results)

            X = pd.DataFrame([landmarks_row])
            
            X = cleaning_data(X)
            X_transformed = ss.transform(X)
            body_language_class = model.predict(X_transformed)[0]
            body_language_prob = model.predict_proba(X_transformed)[0]

            cv2.putText(frame, body_language_class, (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
            cv2.putText(frame, f'Prob: {round(body_language_prob[np.argmax(body_language_prob)],2)}', (10, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
            
            cv2.imshow('Video Feed', frame)
            
            if cv2.waitKey(5) & 0xFF == 27:
                break
    
    cap.release()
    cv2.destroyAllWindows()

# Usage example to video
video_path = 'video-path.mp4'
process_video(video_path)

# Usage example to webcam
process_video(webcam_input=1)

# Next Steps


- Validade metrics with cross-validation.
- Fine tunning models.
- Deploy model and pipeline from api.