In [1]:
# import libs
import cv2
import mediapipe as mp

from skimage.draw import line


In [2]:
# pose estimation utils
mp_pose = mp.solutions.pose

# joint's coordinate diff b/w frames
coord_diff = [0] * 33
# joint's coordinate @ prev frame
prev_coord = [[0, 0] for _ in range(33)]


In [3]:
# amplify distance between joints in consecutive frames (for rendering only)
AMPLIFICATION_FACTOR = 50

# smoothen joint activity transition - lesser the better
SMOOTHENING_FACTOR = 0.3

# minimum distance difference needed to get rendered
DISTANCE_THRESHOLD = 0.2

# width of skeleton line
LINE_SIZE = 15


In [36]:
# selected mediapipe keypoints
SELECTED_KEYPOINTS = [11, 12, 13, 14, 15, 16, 17, 18, 23, 24, 25, 26, 27, 28]

# selected connections
SELECTED_BONES = [
    (11, 12),
    (11, 13),
    (11, 23),
    (12, 14),
    (12, 24),
    (13, 15),
    (14, 16),
    (23, 24),
    (23, 25),
    (24, 26),
    (25, 27),
    (26, 28),
]


In [37]:
def calc_coord(lm, coord_diff, prev_coord):
    """Loop through each joint, calculate coordinate difference between frames

    Args:
        lm (NormalizedLandmarks): list of normalized landmarks predicted
        coord_diff (list[float]): list of joint coordinate difference
        prev_coord (list[list[float]]): list of joint's coordinate list (x,y only)
    """
    # for i in range(33):
    for i in SELECTED_KEYPOINTS:
        # NOTE: MANHATTAN DISTANCE is used instead of EUCLIDEAN DISTANCE
        temp = min(1, (abs(prev_coord[i][0] - lm[i].x) +
                       abs(prev_coord[i][1] - lm[i].y)) * AMPLIFICATION_FACTOR)
        coord_diff[i] = coord_diff[i] + \
            (temp - coord_diff[i]) * SMOOTHENING_FACTOR
        prev_coord[i] = [lm[i].x, lm[i].y]


In [38]:
def visualize_skeleton(img, lm, height, width, clr):
    """Visualize default skeleton

    Args:
        img (ndarray): numpy array of frame
        lm (NormalizedLandmarks): list of normalized landmarks predicted
        height (int): height of frame 
        width (int): width of frame
        clr (tuple[int])): tuple of RGB values 
    """
    # visualize skeleton for each joint-to-joint connections
    for item in SELECTED_BONES:
        img = cv2.line(img, (int(lm[item[0]].x * width), int(lm[item[0]].y * height)),
                       (int(lm[item[1]].x * width), int(lm[item[1]].y * height)), clr, LINE_SIZE)


In [39]:

BLUE_MAX = 50
BLUE_SPREAD = 100

GREEN_MAX = 50
GREEN_SPREAD = 100

RED_MAX = 255
RED_SPREAD = 40


def draw_hm_skeleton(img, lm, coord_diff, prev_coord, height, width):
    """Draw skeleton with joint activity measure

    Args:
        img (ndarray): numpy array of frame
        lm (NormalizedLandmarks): list of normalized landmarks predicted
        coord_diff (list[float]): list of joint coordinate difference
        prev_coord (list[list[float]]): list of joint's coordinate list (x,y only)
        height (int): height of frame 
        width (int): width of frame

    Returns:
        ndarray: numpy array of modified frame
    """

    calc_coord(lm, coord_diff, prev_coord)

    # uncomment this line if you need to render default white skeleton also
    visualize_skeleton(img, lm, height, width, (255, 255, 255))

    # for each joint-to-joint connection / line, visualize activity
    for item in SELECTED_BONES:

        # line iterator -scikit image
        lines = list(zip(*line(int(lm[item[0]].x * width),
                               int(lm[item[0]].y * height),
                               int(lm[item[1]].x * width),
                               int(lm[item[1]].y * height))))

        # 1st joint in a connection
        if coord_diff[item[0]] > DISTANCE_THRESHOLD:
            for i in range(0, int(len(lines) * coord_diff[item[0]])-1):
                img = cv2.line(img,
                               (lines[i][0], lines[i][1]),
                               (lines[i+1][0], lines[i+1][1]),
                               (BLUE_MAX + int((1 - coord_diff[item[0]]) * BLUE_SPREAD) + i,
                                   GREEN_MAX +
                                   int((1 - coord_diff[item[0]])
                                       * GREEN_SPREAD) + i,
                                   RED_MAX - int(coord_diff[0] * RED_SPREAD)), LINE_SIZE)

        # 2nd joint in a connection
        if coord_diff[item[1]] > DISTANCE_THRESHOLD:
            for i in range(len(lines) - 1, len(lines) - int(len(lines) * coord_diff[item[1]]), -1):
                img = cv2.line(img,
                               (lines[i][0], lines[i][1]),
                               (lines[i-1][0], lines[i-1][1]),
                               (BLUE_MAX + int((1 - coord_diff[item[1]]) * BLUE_SPREAD) + len(lines) - 1 - i,
                                   GREEN_MAX +
                                   int((1 - coord_diff[item[1]]) *
                                       GREEN_SPREAD) + len(lines) - 1 - i,
                                   RED_MAX - int(coord_diff[1] * RED_SPREAD)), LINE_SIZE)

    return img


In [40]:

# capture
cap = cv2.VideoCapture('sample.mp4')

# init output
out = cv2.VideoWriter('output.mp4', cv2.VideoWriter_fourcc(
    *'mp4v'), 24, (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))))


with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    # while vid file is available and opened or webcam is available and working
    while cap.isOpened():
        ret, frame = cap.read()

        if ret:
            # openCV uses BGR color format but Mediapipe uses RGB
            img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            img.flags.writeable = False

            results = pose.process(img)

            img.flags.writeable = True
            img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)

            # try operations only if landmarks were found
            try:
                lm = results.pose_landmarks.landmark    # landmarks
                img = draw_hm_skeleton(
                    img, lm, coord_diff, prev_coord, frame.shape[0], frame.shape[1])
            except:
                pass

            # feed
            cv2.imshow('feed', img)
            out.write(img)

            # break feed
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
        else:
            break

    # release feed
    cap.release()
    out.release()
    cv2.destroyAllWindows()
