# Preprocessing

This notebook preprocesses the data. The dataset is generated by extracting the pose-estimation landmarks of the person identified in each video and associating them with an action label.

Author: Lim Yun Feng, Ting Yi Xuan, Chua Sheen Wey
Last Modified: 28/10/2023

Reference : https://github.com/nam157/human_activity_recognition-/tree/main

In [None]:
!pip install mediapipe

Collecting mediapipe
  Downloading mediapipe-0.10.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (33.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.5/33.5 MB[0m [31m22.9 MB/s[0m eta [36m0:00:00[0m
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.4.6-py3-none-any.whl (31 kB)
Installing collected packages: sounddevice, mediapipe
Successfully installed mediapipe-0.10.5 sounddevice-0.4.6


In [None]:
import os
import numpy as np
import cv2
import mediapipe as mp
import time
import tensorflow as tf
from keras.utils import to_categorical

In [None]:
# Implementation of Mediapipe Pose pose estimation model
class mediapipe_pose:
    """Helper around MediaPipe's holistic solution for extracting pose keypoints.

    The instance only stores references to the ``holistic`` and
    ``drawing_utils`` solution modules; the model object itself is created by
    the caller and passed to ``mediapipe_detection``.
    """

    # Size of the flattened keypoint vector: 33 landmarks * (x, y, z, visibility).
    NUM_LANDMARKS = 33
    VALUES_PER_LANDMARK = 4

    def __init__(self):
        # Import locally instead of reading the notebook-level name `mp`:
        # a later cell rebinds `mp` to an instance of this class, after which
        # `mp.solutions` no longer exists and re-instantiating would fail.
        import mediapipe as _mediapipe
        self.mp_holistic = _mediapipe.solutions.holistic
        self.mp_drawing = _mediapipe.solutions.drawing_utils

    def mediapipe_detection(self,image,model):
        """Run ``model.process`` on a BGR frame.

        Args:
            image: frame in BGR channel order (as read by OpenCV).
            model: object exposing ``process(rgb_image)``.

        Returns:
            ``(image, results)`` where ``image`` is the frame converted back to
            BGR and ``results`` is whatever ``model.process`` returned.
        """
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # Mark read-only while the model runs (presumably lets MediaPipe avoid a copy).
        image.flags.writeable = False
        results = model.process(image)
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        return image, results

    def draw_styled_landmarks(self,image, results):
        """Draw ``results.pose_landmarks`` and the pose connections onto ``image``."""
        self.mp_drawing.draw_landmarks(
            image,
            results.pose_landmarks,
            self.mp_holistic.POSE_CONNECTIONS,
            self.mp_drawing.DrawingSpec(color=(112,112,112), thickness=2, circle_radius=1),
            self.mp_drawing.DrawingSpec(color=(94,200,0), thickness=2, circle_radius=1),
        )

    def extract_keypoints(self,results):
        """Flatten the pose landmarks into a 1-D array.

        Returns:
            Array of ``[x, y, z, visibility]`` values per landmark, concatenated
            in landmark order; an all-zero vector of length 33*4 when
            ``results.pose_landmarks`` is empty/None.
        """
        if not results.pose_landmarks:
            return np.zeros(self.NUM_LANDMARKS * self.VALUES_PER_LANDMARK)
        return np.array(
            [[lm.x, lm.y, lm.z, lm.visibility] for lm in results.pose_landmarks.landmark]
        ).flatten()

    def BBox(self,image,results):
        """Pixel bounding box enclosing all pose landmarks.

        Landmark x/y are scaled by the image width/height and truncated to int.

        Returns:
            ``(xmin, ymin, xmax, ymax)`` tuple, or an empty list when there are
            no landmarks.
        """
        if not results.pose_landmarks:
            return []
        # Only height and width are needed, so this also accepts 2-D images.
        height, width = image.shape[:2]
        landmarks = results.pose_landmarks.landmark
        xs = [int(land.x * width) for land in landmarks]
        ys = [int(land.y * height) for land in landmarks]
        if not xs:
            return []
        return min(xs), min(ys), max(xs), max(ys)

In [None]:
# Mount Google Drive when running in Google Colab; skipped elsewhere so that
# "Run All" does not stop on a missing google.colab package.
try:
    from google.colab import drive
except ImportError:
    print("google.colab is not available; skipping Google Drive mount.")
else:
    drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Create an instance of mediapipe_pose class
# NOTE: this rebinds the name `mp`, which the import cell bound to the mediapipe
# module; from here on `mp` refers to the mediapipe_pose instance.
mp = mediapipe_pose()

In [None]:
# define constants
SEQUENCE_LENGTH = 30  # number of frames sampled from every video
DATA_PATH = "./drive/MyDrive/FIT3162_FYP/DATA/"

# Create a path to store the extracted data
# (makedirs also creates missing parent folders and is safe to re-run)
os.makedirs(DATA_PATH, exist_ok=True)

# Get the list of actions
action_vpath = "./drive/MyDrive/New2_UAV Human-Action Videos/UAV Human-Action Videos/"
# os.listdir returns entries in arbitrary order; sort so that the class labels
# derived from this list are the same on every run.
actions = sorted(os.listdir(action_vpath))

# Create a path to store the img pose data
IMG_POSE_PATH = "./drive/MyDrive/FIT3162_FYP/NEW_IMG_POSE/"
os.makedirs(IMG_POSE_PATH, exist_ok=True)

# Display the detected actions (must be the last expression to be shown)
actions

In [None]:
# create a function to extract pose keypoints and save them into a folder
def pose_frame_extraction():
  """Extract pose keypoints from every video of every action and save them.

  For each action in ``actions`` and each video found under
  ``action_vpath/<action>/``, up to ``SEQUENCE_LENGTH`` evenly spaced frames
  are read, passed through the holistic model, and the flattened keypoints of
  frame ``i`` are written to ``DATA_PATH/<action>/<video name>/<i>.npy``.
  Matching folders are also created under ``IMG_POSE_PATH`` (saving the
  annotated frames themselves is currently disabled, see below).
  """
  # Set mediapipe model
  with mp.mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

    # loop for every action
    for action in actions:
      action_data_dir = os.path.join(DATA_PATH, action)
      action_img_dir = os.path.join(IMG_POSE_PATH, action)
      # create folders for the keypoints data and the pose estimated video frames
      os.makedirs(action_data_dir, exist_ok=True)
      os.makedirs(action_img_dir, exist_ok=True)

      # to access video for every action
      video_list = os.listdir(os.path.join(action_vpath, action))
      print(video_list)

      # loop through every video
      for video in video_list:
        vid_path = os.path.join(action_vpath, action, video)
        # filename without its extension (works for any extension length)
        vid_name = os.path.splitext(video)[0]
        print(vid_name)

        video_data_dir = os.path.join(action_data_dir, vid_name)
        os.makedirs(video_data_dir, exist_ok=True)
        os.makedirs(os.path.join(action_img_dir, vid_name), exist_ok=True)

        # read the video
        video_reader = cv2.VideoCapture(vid_path)
        try:
          # get the frame number
          video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
          # calculate how many frames to skip between two sampled frames
          skip_frames_window = max(video_frames_count // SEQUENCE_LENGTH, 1)

          # for every sampled frame
          for frame_counter in range(SEQUENCE_LENGTH):
            video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames_window)
            success, frame = video_reader.read()
            if not success:
              # Fewer readable frames than requested: the saved sequence is incomplete.
              print("Warning: only {} of {} frames read from {}".format(
                  frame_counter, SEQUENCE_LENGTH, vid_path))
              break

            # APPLY POSE ESTIMATION DETECTION
            image, res = mp.mediapipe_detection(frame, holistic)

            # Optional (disabled): draw the keypoints and save the annotated frame
            # mp.draw_styled_landmarks(image, res)
            # cv2.putText(image, "Frame: " + str(frame_counter), (10,100), cv2.FONT_HERSHEY_PLAIN, 2,(255,0,190),2,cv2.LINE_AA)
            # pose_img_path = os.path.join(IMG_POSE_PATH, action, vid_name, str(frame_counter) + ".jpg")
            # cv2.imwrite(pose_img_path, image)

            # get the keypoints data and save it (np.save appends ".npy")
            keypoints = mp.extract_keypoints(res)
            np.save(os.path.join(video_data_dir, str(frame_counter)), keypoints)
        finally:
          # Release every capture, not just the last one opened.
          video_reader.release()


In [None]:
# Run the extraction: keypoints of every sampled frame are written as .npy files under DATA_PATH
pose_frame_extraction()

['41.avi', '32.avi', '34.avi', '18.avi', '17.avi', '42.avi', '31.avi', '5.avi', '40.avi', '33.avi', '39.avi', '38.avi', '9.avi', '3.avi', '19.avi', '35.avi', '11.avi', '30.avi', '24.avi', '37.avi', '36.avi', '21.avi', '29.avi', '0.avi', '8.avi', '7.avi']
41
32
34
18
17
42
31
5
40
33
39
38
9
3
19
35
11
30
24
37
36
21
29
0
8
7
['96.avi', '137.avi', '142.avi', '136.avi', '157.avi', '88.avi', '158.avi', '95.avi', '156.avi', '114.avi', '134.avi', '112.avi', '135.avi', '115.avi', '167.avi', '94.avi', '122.avi', '99.avi', '83.avi', '64.avi', '149.avi', '159.avi', '40.avi', '130.avi', '155.avi', '124.avi', '86.avi', '79.avi', '111.avi', '4.avi', '127.avi', '145.avi', '54.avi', '148.avi', '80.avi', '143.avi', '70.avi', '22.avi', '125.avi', '116.avi', '106.avi', '120.avi', '53.avi', '105.avi', '119.avi', '65.avi', '41.avi', '161.avi', '110.avi', '8.avi', '163.avi', '75.avi', '10.avi', '160.avi', '162.avi', '166.avi', '97.avi', '154.avi', '23.avi', '68.avi', '123.avi', '77.avi', '118.avi', '132.a

In [None]:
# Map every action name to an integer class label (its position in `actions`)
actions = np.array(actions)
label_map = dict(zip(actions, range(len(actions))))
label_map

{'A147': 0, 'A042': 1, 'A019': 2, 'A020': 3, 'A152': 4}

In [None]:
# Function to load the data and save them into a list
def data_load(actions, sequence_length, verbose=False):
    """Load the saved keypoint files of every video of every action.

    Reads ``DATA_PATH/<action>/<video>/<frame>.npy`` for ``frame`` in
    ``range(sequence_length)``; a missing frame file makes ``np.load`` raise.

    Args:
        actions: iterable of action folder names (keys of ``label_map``).
        sequence_length: number of frame files to load per video.
        verbose: when True, print every file path as it is loaded
            (off by default because it produces one line per frame).

    Returns:
        ``(seq, labels)``: ``seq`` is a list with one entry per video, each a
        list of ``sequence_length`` arrays; ``labels`` holds the matching
        ``label_map[action]`` values.
    """
    seq,labels = [],[]
    for action in actions:
        action_dir = os.path.join(DATA_PATH, action)
        # os.listdir order is arbitrary; sort so the sample order is reproducible.
        for data in sorted(os.listdir(action_dir)):
            window = []
            for frame_num in range(sequence_length):
                data_pth = os.path.join(action_dir, data, "{}.npy".format(frame_num))
                if verbose:
                    print(data_pth)
                window.append(np.load(data_pth))
            seq.append(window)
            labels.append(label_map[action])
    return seq,labels

In [None]:
# Load every saved sequence together with its label; %time reports how long the call takes
%time sequences, labels = data_load(actions, SEQUENCE_LENGTH)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
./drive/MyDrive/FIT3162_FYP/DATA/A019/24/12.npy
./drive/MyDrive/FIT3162_FYP/DATA/A019/24/13.npy
./drive/MyDrive/FIT3162_FYP/DATA/A019/24/14.npy
./drive/MyDrive/FIT3162_FYP/DATA/A019/24/15.npy
./drive/MyDrive/FIT3162_FYP/DATA/A019/24/16.npy
./drive/MyDrive/FIT3162_FYP/DATA/A019/24/17.npy
./drive/MyDrive/FIT3162_FYP/DATA/A019/24/18.npy
./drive/MyDrive/FIT3162_FYP/DATA/A019/24/19.npy
./drive/MyDrive/FIT3162_FYP/DATA/A019/24/20.npy
./drive/MyDrive/FIT3162_FYP/DATA/A019/24/21.npy
./drive/MyDrive/FIT3162_FYP/DATA/A019/24/22.npy
./drive/MyDrive/FIT3162_FYP/DATA/A019/24/23.npy
./drive/MyDrive/FIT3162_FYP/DATA/A019/24/24.npy
./drive/MyDrive/FIT3162_FYP/DATA/A019/24/25.npy
./drive/MyDrive/FIT3162_FYP/DATA/A019/24/26.npy
./drive/MyDrive/FIT3162_FYP/DATA/A019/24/27.npy
./drive/MyDrive/FIT3162_FYP/DATA/A019/24/28.npy
./drive/MyDrive/FIT3162_FYP/DATA/A019/24/29.npy
./drive/MyDrive/FIT3162_FYP/DATA/A019/127/0.npy
./drive/MyDrive/FIT3162

In [None]:
# Convert the nested list of sequences into a single array and show its dimensions
X = np.asarray(sequences)
X.shape

(435, 30, 132)

In [None]:
# Encode the integer labels with to_categorical and cast the result to int
y = to_categorical(labels).astype(int)
y

array([[1, 0, 0, 0, 0],
       [1, 0, 0, 0, 0],
       [1, 0, 0, 0, 0],
       ...,
       [0, 0, 0, 0, 1],
       [0, 0, 0, 0, 1],
       [0, 0, 0, 0, 1]])

In [None]:
# Save the feature array X and the label array y.
TRAINING_DIR = 'drive/MyDrive/FIT3162_FYP/Training/'
# Create the output folder first: opening a file inside a missing folder fails.
os.makedirs(TRAINING_DIR, exist_ok=True)
with open(os.path.join(TRAINING_DIR, 'X2.npy'), 'wb') as f:
    np.save(f, X)
with open(os.path.join(TRAINING_DIR, 'y2.npy'), 'wb') as f:
    np.save(f, y)