In [1]:
import cv2
import mediapipe as mp
import time
import pandas as pd

In [2]:
landmark_dict = {
    0: "nose",
    1: "left_eye_inner",
    2: "left_eye",
    3: "left_eye_outer",
    4: "right_eye_inner",
    5: "right_eye",
    6: "right_eye_outer",
    7: "left_ear",
    8: "right_ear",
    9: "mouth_left",
    10: "mouth_right",
    11: "left_shoulder",
    12: "right_shoulder",
    13: "left_elbow",
    14: "right_elbow",
    15: "left_wrist",
    16: "right_wrist",
    17: "left_pinky",
    18: "right_pinky",
    19: "left_index",
    20: "right_index",
    21: "left_thumb",
    22: "right_thumb",
    23: "left_hip",
    24: "right_hip",
    25: "left_knee",
    26: "right_knee",
    27: "left_ankle",
    28: "right_ankle",
    29: "left_heel",
    30: "right_heel",
    31: "left_foot_index",
    32: "right_foot_index",
}

In [5]:
import csv
import numpy as np


def extract_data(file_name):
  mp_drawing = mp.solutions.drawing_utils
  mp_drawing_styles = mp.solutions.drawing_styles
  mp_holistic = mp.solutions.holistic

  # VID_DIM = (1290, 1080)
  counter = 0
  cap = cv2.VideoCapture(f"C:/Users/chengyang/Physio/videos/{file_name}")
  # used to record the time when we processed last frame 
  prev_frame_time = 0
    
  # used to record the time at which we processed current frame 
  new_frame_time = 0
  stage = None

  frame_width = int(cap.get(3))
  frame_height = int(cap.get(4))
  frame_size = (frame_width,frame_height)
  fourcc = cv2.VideoWriter_fourcc(*'MP4V')
  file = cv2.VideoWriter('output2.mp4', fourcc, 20.0, frame_size)
  data = []

  file_name = file_name.split(".")[0]
  landmarks = ['class']
  for val in range(1, 33+1):
      landmarks += ['x{}'.format(val), 'y{}'.format(val), 'z{}'.format(val), 'v{}'.format(val)]

  with open(f"C:/Users/chengyang/Physio/src/PoseDetection/SquatData/{file_name}.csv", mode='w', newline='') as f:
      csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
      csv_writer.writerow(landmarks)

  with mp_holistic.Holistic(
      model_complexity = 0,
      min_detection_confidence=0.5,
      min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():
      success, image = cap.read()
      if not success:
        print("Ignoring empty camera frame.")
        break

      new_frame_time = time.time() 
    
      # Calculating the fps 
    
      # fps will be number of frame processed in given time frame 
      # since their will be most of time error of 0.001 second 
      # we will be subtracting it to get more accurate result 
      fps = 1/(new_frame_time-prev_frame_time) 
      prev_frame_time = new_frame_time 

      image = cv2.flip(image, 1)
      
      # To improve performance, optionally mark the image as not writeable to
      # pass by reference.
      image.flags.writeable = False
      image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
      results = holistic.process(image)

      # Draw the hand annotations on the image.
      image.flags.writeable = True
      image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

      mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, mp_drawing_styles.get_default_hand_landmarks_style(), mp_drawing_styles.get_default_hand_connections_style())
      mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, mp_drawing_styles.get_default_hand_landmarks_style(), mp_drawing_styles.get_default_hand_connections_style())
      mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)

      try:
          landmarks = results.pose_world_landmarks.landmark
          # Get coordinates
          # Define a list of landmark names
          # landmark_names = [mp_holistic.PoseLandmark(i).name for i in range(mp_holistic.PoseLandmark.NUM_POSE_LANDMARKS)]

          # Create a dictionary to store landmark coordinates
          # coordinates = {}

          # for id, pose_landmarks in enumerate(landmarks):
          #     print(pose_landmarks)
          #     # Here is How to Get All the Coordinates
          #     cx, cy, cz = pose_landmarks.x * frame_width, pose_landmarks.y * frame_height, pose_landmarks.z * frame_height
          #     coordinates[landmark_dict[id]] = [cx, cy, cz]
          # coordinates["exercise"] = "squat"
          # data.append(coordinates)

          pose_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in landmarks]).flatten())        
          
          # Export to CSV
          with open(f"C:/Users/chengyang/Physio/src/PoseDetection/SquatData/{file_name}.csv", mode='a', newline='') as f:
              csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
              csv_writer.writerow(pose_row) 
      except Exception as e:
          print(e)
          pass
      
      # Render curl counter
      # Setup status box
      cv2.rectangle(image, (0,0), (325,73), (245,117,16), -1)
      
      # Rep data
      cv2.putText(image, 'REPS', (15,12), 
                  cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,0), 1, cv2.LINE_AA)
      cv2.putText(image, str(counter), 
                  (10,60), 
                  cv2.FONT_HERSHEY_SIMPLEX, 2, (255,255,255), 2, cv2.LINE_AA)
      
      # Stage data
      cv2.putText(image, 'STAGE', (115,12), 
                  cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,0), 1, cv2.LINE_AA)
      cv2.putText(image, stage, 
                  (100,60), 
                  cv2.FONT_HERSHEY_SIMPLEX, 2, (255,255,255), 2, cv2.LINE_AA)
      
      cv2.putText(image, 'FPS: ' + str(int(fps)), (225,12), 
                  cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,0), 1, cv2.LINE_AA)

      # Write the frame to the video file
      file.write(image)

      # Flip the image horizontally for a selfie-view display.
      cv2.imshow('MediaPipe Hands', image)

      if cv2.waitKey(10) & 0xFF == ord('q'):
              break
  cap.release()
  file.release()
  cv2.destroyAllWindows()
  # data = pd.DataFrame(data)
  # file_name = file_name.split(".")[0]
  # data.to_csv(f"C:/Users/chengyang/Physio/src/PoseDetection/SquatData/{file_name}.csv")

In [7]:
import os

folder_path = "C:/Users/chengyang/Physio/videos/"  # Replace with the path to your folder

# List all files in the folder
files = os.listdir(folder_path)

# Loop through the files
for file_name in files:
    # Check if the item is a file (not a subdirectory)
    if os.path.isfile(os.path.join(folder_path, file_name)):
        extract_data(file_name)
        print(file_name)
        # Add your processing logic for each file here


'NoneType' object has no attribute 'landmark'
'NoneType' object has no attribute 'landmark'
'NoneType' object has no attribute 'landmark'
'NoneType' object has no attribute 'landmark'
'NoneType' object has no attribute 'landmark'
'NoneType' object has no attribute 'landmark'
'NoneType' object has no attribute 'landmark'
'NoneType' object has no attribute 'landmark'
'NoneType' object has no attribute 'landmark'
'NoneType' object has no attribute 'landmark'
'NoneType' object has no attribute 'landmark'
Ignoring empty camera frame.
20231116_231303.mp4
'NoneType' object has no attribute 'landmark'
'NoneType' object has no attribute 'landmark'
'NoneType' object has no attribute 'landmark'
'NoneType' object has no attribute 'landmark'
'NoneType' object has no attribute 'landmark'
'NoneType' object has no attribute 'landmark'
'NoneType' object has no attribute 'landmark'
'NoneType' object has no attribute 'landmark'
'NoneType' object has no attribute 'landmark'
'NoneType' object has no attrib

# Label the datasets
## column for type of exercise, 
## maybe should just collect all the body landmarks, 
## column for feedback (good, bend the knees more, straighten back, feet too wide)

In [None]:
import torch.nn as nn
import torch
from torch.nn.parameter import Parameter
import math


class GraphConvolution(nn.Module):
    """
    Define a Graph convolutional layer with a learnable adjacency matrix
    """

    def __init__(self, in_features, out_features, bias=True, node_n=57):

        super(GraphConvolution, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.weight = Parameter(torch.FloatTensor(in_features, out_features))
        self.adj = Parameter(torch.FloatTensor(node_n, node_n))
        if bias:
            self.bias = Parameter(torch.FloatTensor(out_features))
        else:
            self.register_parameter('bias', None)
        self.reset_parameters()

    def reset_parameters(self):
        stdv = 1. / math.sqrt(self.weight.size(1))
        self.weight.data.uniform_(-stdv, stdv)
        self.adj.data.uniform_(-stdv, stdv)
        if self.bias is not None:
            self.bias.data.uniform_(-stdv, stdv)

    def forward(self, input):
        support = torch.matmul(input, self.weight)
        output = torch.matmul(self.adj, support)
        if self.bias is not None:
            return output + self.bias
        else:
            return output

    def __repr__(self):
        return self.__class__.__name__ + ' (' \
               + str(self.in_features) + ' -> ' \
               + str(self.out_features) + ')'


class GC_Block(nn.Module):
    """
    Define a residual block of GCN
    """

    def __init__(self, in_features, p_dropout, bias=True, node_n=57):

        super(GC_Block, self).__init__()
        self.in_features = in_features
        self.out_features = in_features

        self.gc1 = GraphConvolution(in_features, in_features, node_n=node_n, bias=bias)
        self.bn1 = nn.BatchNorm1d(node_n * in_features)

        self.gc2 = GraphConvolution(in_features, in_features, node_n=node_n, bias=bias)
        self.bn2 = nn.BatchNorm1d(node_n * in_features)

        self.do = nn.Dropout(p_dropout)
        self.act_f = nn.ReLU()

    def forward(self, x):
        y = self.gc1(x)
        if len(y.shape) == 3:
            b, n, f = y.shape
        else:
            b = 1
            n, f = y.shape
        y = self.bn1(y.view(b, -1)).view(b, n, f)
        y = self.act_f(y)
        y = self.do(y)

        y = self.gc2(y)
        y = self.bn2(y.view(b, -1)).view(b, n, f)
        y = self.act_f(y)
        y = self.do(y)

        return y + x

    def __repr__(self):
        return self.__class__.__name__ + ' (' \
               + str(self.in_features) + ' -> ' \
               + str(self.out_features) + ')'


class GCN_corr(nn.Module):

    def __init__(self, input_feature=25, hidden_feature=128, p_dropout=0.5, num_stage=2, node_n=57):
        """
        :param input_feature: num of input feature
        :param hidden_feature: num of hidden feature
        :param p_dropout: drop out prob.
        :param num_stage: number of residual blocks
        :param node_n: number of nodes in graph
        """
        super(GCN_corr, self).__init__()
        self.num_stage = num_stage

        self.gcin = GraphConvolution(input_feature, hidden_feature, node_n=node_n)
        self.bn1 = nn.BatchNorm1d(node_n * hidden_feature)

        self.gcbs = []
        for i in range(num_stage):
            self.gcbs.append(GC_Block(hidden_feature, p_dropout=p_dropout, node_n=node_n))

        self.gcbs = nn.ModuleList(self.gcbs)

        self.gcout = GraphConvolution(hidden_feature, input_feature, node_n=node_n)
        self.gcatt = GraphConvolution(hidden_feature, 1, node_n=node_n)

        self.do = nn.Dropout(p_dropout)
        self.act_f = nn.ReLU()
        self.act_fatt = nn.Sigmoid()

    def forward(self, x):

        y = self.gcin(x)
        if len(y.shape) == 3:
            b, n, f = y.shape
        else:
            b = 1
            n, f = y.shape

        y = self.bn1(y.view(b, -1)).view(b, n, f)
        y = self.act_f(y)
        y = self.do(y)

        for i in range(self.num_stage):
            y = self.gcbs[i](y)

        out = self.gcout(y)

        att = self.gcatt(y)
        att = self.act_fatt(att)

        return out, att


class GCN_class(nn.Module):

    def __init__(self, input_feature=25, hidden_feature=32, p_dropout=0.5, node_n=57, classes=12):
        """
        :param input_feature: num of input feature
        :param hidden_feature: num of hidden feature
        :param p_dropout: drop out prob.
        :param num_stage: number of residual blocks
        :param node_n: number of nodes in graph
        """
        super(GCN_class, self).__init__()

        self.gcin = GraphConvolution(input_feature, hidden_feature, node_n=node_n)
        self.gcout = GraphConvolution(hidden_feature, input_feature, node_n=node_n)
        self.bnin = nn.BatchNorm1d(node_n * hidden_feature)
        self.bnout = nn.BatchNorm1d(node_n * input_feature)
        self.lin = nn.Linear(node_n * input_feature, classes)
        self.do = nn.Dropout(p_dropout)
        self.act_f = nn.ReLU()
        self.act_flin = nn.LogSoftmax(dim=1)

    def forward(self, x):

        if len(x.shape) == 3:
            b, n, f = x.shape
        else:
            b = 1
            n, f = x.shape

        y = self.gcin(x)
        if b > 1:
            y = self.bnin(y.view(b, -1)).view(y.shape)
        y = self.act_f(y)
        y = self.do(y)

        y = self.gcout(y)
        if b > 1:
            y = self.bnout(y.view(b, -1)).view(y.shape)
        y = self.act_f(y)
        y = self.do(y)

        y = y.view(-1, n * f)
        y = self.lin(y)
        y = self.act_flin(y)

        return y