In [None]:
!pip install vidgear
!pip install ffmpeg-python
!pip install munkres
!pip install nvidia-pyindex
!pip install nvidia-tensorrt

In [None]:
import os
import sys
import argparse
import ast
import cv2
import time
import torch
from vidgear.gears import CamGear
import numpy as np
import json
import matplotlib.pyplot as plt
import torchvision
import ffmpeg
import munkres
from collections import defaultdict
from torch import nn
from torchvision import transforms
import codecs

## visualization.py

In [None]:
def joints_dict():
    """
    Describe the keypoint layout of every supported dataset.

    Returns:
        dict: one entry per dataset name ("coco", "mpii", "crowdpose"). Each entry holds
            - "keypoints": dict mapping joint index -> joint name
            - "skeleton": list of [index_a, index_b] pairs, one per limb to draw
            - "order": list of joint indices, or None for "mpii"

        A fresh dictionary is built on every call.
    """
    coco_names = [
        "nose", "left_eye", "right_eye", "left_ear", "right_ear",
        "left_shoulder", "right_shoulder", "left_elbow", "right_elbow",
        "left_wrist", "right_wrist", "left_hip", "right_hip",
        "left_knee", "right_knee", "left_ankle", "right_ankle",
    ]
    mpii_names = [
        "right_ankle", "right_knee", "right_hip", "left_hip", "left_knee", "left_ankle",
        "pelvis", "thorax", "upper_neck", "head top",
        "right_wrist", "right_elbow", "right_shoulder", "left_shoulder", "left_elbow", "left_wrist",
    ]

    coco = {
        "keypoints": dict(enumerate(coco_names)),
        # The ear-shoulder limbs [3, 5] and [4, 6] are left out on purpose;
        # the nose is linked to both shoulders instead ([0, 5], [0, 6]).
        "skeleton": [
            [15, 13], [13, 11], [16, 14], [14, 12], [11, 12], [5, 11], [6, 12], [5, 6], [5, 7],
            [6, 8], [7, 9], [8, 10], [1, 2], [0, 1], [0, 2], [1, 3], [2, 4],
            [0, 5], [0, 6],
        ],
        "order": [0, 1, 2, 3, 4, 5, 6, 11, 12, 7, 8, 9, 10, 13, 14, 15, 16],
    }

    mpii = {
        "keypoints": dict(enumerate(mpii_names)),
        "skeleton": [
            [5, 4], [4, 3], [0, 1], [1, 2], [3, 2], [3, 6], [2, 6], [6, 7], [7, 8], [8, 9],
            [13, 7], [12, 7], [13, 14], [12, 11], [14, 15], [11, 10],
        ],
        "order": None,
    }

    crowdpose = {
        # NOTE(review): these names are exactly the first 15 MPII names; confirm that this
        # matches the CrowdPose annotation layout.
        "keypoints": dict(enumerate(mpii_names[:15])),
        "skeleton": [
            [5, 4], [4, 3], [0, 1], [1, 2], [3, 2], [3, 6], [2, 6], [6, 7], [7, 8], [8, 9],
            [13, 7], [12, 7], [13, 14], [12, 11], [11, 10],
        ],
        "order": [0, 1, 2, 3, 4, 5, 6, 11, 12, 7, 8, 9, 10, 13],
    }

    return {"coco": coco, "mpii": mpii, "crowdpose": crowdpose}

In [None]:
def draw_points(image, points, color_palette='tab20', palette_samples=16, confidence_threshold=0.3):
    """
    Draw one filled circle per sufficiently confident point on `image`.

    Args:
        image: image in opencv format
        points: list of points to be drawn.
            Shape: (nof_points, 3)
            Format: each point should contain (y, x, confidence)
        color_palette: name of a matplotlib color palette
            Default: 'tab20'
        palette_samples: number of colors sampled from `color_palette`; only used when the
            palette does not expose a pre-defined `colors` list
            Default: 16
        confidence_threshold: only points with a confidence strictly higher than this threshold
            are drawn. Range: [0, 1]
            Default: 0.3

    Returns:
        The image returned by the `cv2.circle` calls (the input image itself when nothing is drawn).

    """
    cmap = plt.get_cmap(color_palette)
    try:
        # Listed palettes expose their RGB colors directly; reverse the channels to get BGR.
        palette = np.array(cmap.colors)
        to_bgr = slice(None, None, -1)
    except AttributeError:  # if palette has not pre-defined colors
        # Sampling the colormap yields RGBA rows; start at index -2 to skip alpha while reversing.
        palette = np.array(cmap(np.linspace(0, 1, palette_samples)))
        to_bgr = slice(-2, None, -1)
    colors = np.round(palette * 255).astype(np.uint8)[:, to_bgr].tolist()

    # Radius grows with the shorter image side, never below one pixel.
    # ToDo Shape it taking into account the size of the detection
    radius = max(1, min(image.shape[:2]) // 160)

    for index, point in enumerate(points):
        if not point[2] > confidence_threshold:
            continue
        # Points are stored as (y, x), OpenCV expects (x, y).
        position = (int(point[1]), int(point[0]))
        image = cv2.circle(image, position, radius, tuple(colors[index % len(colors)]), -1)

    return image

In [None]:
def draw_skeleton(image, points, skeleton, color_palette='Set2', palette_samples=8, person_index=0,
                  confidence_threshold=0.3):
    """
    Draw the limbs listed in `skeleton` on `image`, using a single color per person.

    Args:
        image: image in opencv format
        points: points to be connected; indexed with each joint pair, so it must support
            indexing with a list of two indices (e.g. a numpy array).
            Shape: (nof_points, 3)
            Format: each point should contain (y, x, confidence)
        skeleton: list of joints to be drawn
            Shape: (nof_joints, 2)
            Format: each joint should contain (point_a, point_b) where `point_a` and `point_b` are an index in `points`
        color_palette: name of a matplotlib color palette
            Default: 'Set2'
        palette_samples: number of colors sampled from `color_palette`; only used when the
            palette does not expose a pre-defined `colors` list
            Default: 8
        person_index: index of the person in `image`; selects the line color (modulo palette size)
            Default: 0
        confidence_threshold: a limb is drawn only when both of its end points have a confidence
            strictly higher than this threshold. Range: [0, 1]
            Default: 0.3

    Returns:
        The image returned by the `cv2.line` calls (the input image itself when nothing is drawn).

    """
    cmap = plt.get_cmap(color_palette)
    try:
        # Listed palettes expose their RGB colors directly; reverse the channels to get BGR.
        palette = np.array(cmap.colors)
        to_bgr = slice(None, None, -1)
    except AttributeError:  # if palette has not pre-defined colors
        # Sampling the colormap yields RGBA rows; start at index -2 to skip alpha while reversing.
        palette = np.array(cmap(np.linspace(0, 1, palette_samples)))
        to_bgr = slice(-2, None, -1)
    colors = np.round(palette * 255).astype(np.uint8)[:, to_bgr].tolist()

    for joint in skeleton:
        start, end = points[joint]
        if start[2] > confidence_threshold and end[2] > confidence_threshold:
            # Points are stored as (y, x), OpenCV expects (x, y).
            start_xy = (int(start[1]), int(start[0]))
            end_xy = (int(end[1]), int(end[0]))
            image = cv2.line(image, start_xy, end_xy, tuple(colors[person_index % len(colors)]), 2)

    return image

In [None]:
def draw_points_and_skeleton(image, points, skeleton, points_color_palette='tab20', points_palette_samples=16,
                             skeleton_color_palette='Set2', skeleton_palette_samples=8, person_index=0,
                             confidence_threshold=0.3):
    """
    Overlay a person's skeleton and then its keypoints on `image`.

    The limbs are drawn first (via `draw_skeleton`) so that the keypoint circles
    (via `draw_points`) end up on top of them.

    Args:
        image: image in opencv format
        points: list of points to be drawn.
            Shape: (nof_points, 3)
            Format: each point should contain (y, x, confidence)
        skeleton: list of joints to be drawn
            Shape: (nof_joints, 2)
            Format: each joint should contain (point_a, point_b) where `point_a` and `point_b` are an index in `points`
        points_color_palette: name of a matplotlib color palette used for the keypoints
            Default: 'tab20'
        points_palette_samples: number of colors sampled from `points_color_palette`
            Default: 16
        skeleton_color_palette: name of a matplotlib color palette used for the limbs
            Default: 'Set2'
        skeleton_palette_samples: number of colors sampled from `skeleton_color_palette`
            Default: 8
        person_index: index of the person in `image`
            Default: 0
        confidence_threshold: only points with a confidence higher than this threshold will be drawn. Range: [0, 1]
            Default: 0.3

    Returns:
        The image with overlaid limbs and keypoints.

    """
    with_limbs = draw_skeleton(
        image, points, skeleton,
        color_palette=skeleton_color_palette,
        palette_samples=skeleton_palette_samples,
        person_index=person_index,
        confidence_threshold=confidence_threshold,
    )
    return draw_points(
        with_limbs, points,
        color_palette=points_color_palette,
        palette_samples=points_palette_samples,
        confidence_threshold=confidence_threshold,
    )

In [None]:
def save_images(images, target, joint_target, output, joint_output, joint_visibility, summary_writer=None, step=0,
                prefix=''):
    """
    Build two image grids for debugging: inputs with ground-truth joints and inputs with predicted joints.

    Each image is de-normalized per channel (multiplied by 0.229 / 0.224 / 0.225 and shifted by
    0.485 / 0.456 / 0.406), then every visible joint is marked with a small red square.

    If `summary_writer` is not None, the grids are written to it under the names
    `prefix + 'images'` and `prefix + 'predictions'`.

    Args:
        images (torch.Tensor): a tensor of images with shape (batch x channels x height x width).
        target (torch.Tensor): gt heatmaps. Not used by this function.
        joint_target (torch.Tensor): a tensor of gt joints with shape (batch x joints x 2).
            Coordinates are multiplied by 4 before drawing (presumably the heatmap-to-image
            ratio - confirm against the caller).
        output (torch.Tensor): predicted heatmaps. Not used by this function.
        joint_output (torch.Tensor): a tensor of predicted joints with shape (batch x joints x 2),
            scaled by 4 like `joint_target`.
        joint_visibility (torch.Tensor): joint visibility; a joint is drawn when
            `joint_visibility[b][j][0]` is truthy, so each joint entry must be indexable.
        summary_writer (tb.SummaryWriter): a SummaryWriter where the grids are written.
            Default: None
        step (int): summary_writer step.
            Default: 0
        prefix (str): summary_writer name prefix.
            Default: ""

    Returns:
        A pair (grid_gt, grid_pred) of images built with torchvision.utils.make_grid
    """
    # (std, mean) per channel used to undo the input normalization.
    channel_stats = ((0.229, 0.485), (0.224, 0.456), (0.225, 0.406))

    grids = []
    # First pass draws the ground truth, second pass the predictions.
    for joints_batch, tag in ((joint_target, 'images'), (joint_output, 'predictions')):
        canvas = images.detach().clone()
        for channel, (std, mean) in enumerate(channel_stats):
            canvas[:, channel].mul_(std).add_(mean)

        for sample in range(images.shape[0]):
            sample_joints = joints_batch[sample] * 4.
            sample_visibility = joint_visibility[sample]

            for joint, joint_vis in zip(sample_joints, sample_visibility):
                if joint_vis[0]:
                    row = int(joint[1].item())
                    col = int(joint[0].item())
                    # Red marker: first channel to 1, remaining channels to 0.
                    canvas[sample][0, row - 1:row + 1, col - 1:col + 1] = 1
                    canvas[sample][1:, row - 1:row + 1, col - 1:col + 1] = 0

        grid = torchvision.utils.make_grid(canvas, nrow=int(canvas.shape[0] ** 0.5), padding=2, normalize=False)
        if summary_writer is not None:
            summary_writer.add_image(prefix + tag, grid, global_step=step)
        grids.append(grid)

    # Heatmaps
    # ToDo: also export per-joint heatmap grids.

    grid_gt, grid_pred = grids
    return grid_gt, grid_pred

## utils.py

In [None]:
def flip_tensor(tensor, dim=0):
    """
    Return a copy of `tensor` with its entries reversed along dimension `dim`.

    Args:
        tensor (torch.Tensor): tensor to flip.
        dim (int): dimension to reverse.
            Default: 0

    Returns:
        torch.Tensor: the flipped tensor, on the same device as `tensor`.
    """
    last = tensor.shape[dim] - 1
    # Descending indices last, last - 1, ..., 0 created directly on the tensor's device.
    reversed_index = torch.arange(last, -1, -1, device=tensor.device)
    return tensor.index_select(dim, reversed_index)

In [None]:
def flip_back(output_flipped, matched_parts):
    """
    Undo a horizontal flip on a batch of joint heatmaps.

    The maps are mirrored along the last (width) dimension, then the channels of every
    left/right pair in `matched_parts` are swapped.

    Args:
        output_flipped (torch.Tensor): heatmaps with shape [batch_size, num_joints, height, width].
        matched_parts: iterable of (index_a, index_b) joint pairs to swap.

    Returns:
        torch.Tensor: the mirrored heatmaps with swapped pair channels.
    """
    assert output_flipped.ndim == 4, 'output_flipped has to be [batch_size, num_joints, height, width]'

    output_flipped = flip_tensor(output_flipped, dim=-1)

    for first, second in ((pair[0], pair[1]) for pair in matched_parts):
        # Keep a copy of the first channel: the next assignment overwrites it.
        first_maps = output_flipped[:, first].clone()
        output_flipped[:, first] = output_flipped[:, second]
        output_flipped[:, second] = first_maps

    return output_flipped

In [None]:
def fliplr_joints(joints, joints_vis, width, matched_parts):
    """
    Mirror joint coordinates horizontally and swap the left/right joint pairs.

    Both `joints` and `joints_vis` are modified in place.

    Args:
        joints (np.ndarray): joint coordinates, one row per joint; column 0 is mirrored
            as `width - x - 1`.
        joints_vis (np.ndarray): per-joint visibility, same row layout as `joints`.
        width: width of the image the joints belong to.
        matched_parts: iterable of (index_a, index_b) joint pairs to swap.

    Returns:
        (np.ndarray, np.ndarray): `joints * joints_vis` (non-visible joints zeroed) and `joints_vis`.
    """
    # Flip horizontal
    joints[:, 0] = width - joints[:, 0] - 1

    # Change left-right parts: fancy indexing on the right-hand side copies both rows
    # before anything is written back.
    for pair in matched_parts:
        forward = [pair[0], pair[1]]
        backward = [pair[1], pair[0]]
        joints[forward, :] = joints[backward, :]
        joints_vis[forward, :] = joints_vis[backward, :]

    return joints * joints_vis, joints_vis

In [None]:
def get_affine_transform(center, scale, rot, output_size, shift=np.array([0, 0], dtype=np.float32), inv=0):
    """
    Build the 2x3 affine matrix that maps a (rotated) source box onto the output canvas.

    Three point correspondences are constructed - the box center, a point half a box width
    away from it (rotated by `rot`), and a third point perpendicular to those two - and
    handed to `cv2.getAffineTransform`.

    Args:
        center: (x, y) center of the source box.
        scale: box size in units of 200 pixels (the value is multiplied by 200 here). Either a
            scalar, used for both axes, or a pair given as np.ndarray, list or tuple. Only the
            first component defines the box width; both are used to scale `shift`.
        rot: rotation of the source box, in degrees.
        output_size: (width, height) of the destination canvas.
        shift: offset of the source center, expressed as a fraction of the scaled box size.
            Default: no shift
        inv: when truthy, return the inverse mapping (destination -> source).
            Default: 0

    Returns:
        The transformation matrix returned by `cv2.getAffineTransform`.
    """
    if isinstance(scale, (list, tuple)):
        # A plain sequence cannot be multiplied by a float; promote it to an array first.
        scale = np.asarray(scale)
    elif not isinstance(scale, np.ndarray):
        scale = np.array([scale, scale])

    scale_tmp = scale * 1.0 * 200.0  # It was scale_tmp = scale * 200.0
    src_w = scale_tmp[0]
    dst_w = output_size[0]
    dst_h = output_size[1]

    rot_rad = np.pi * rot / 180
    # Direction from the center to the second reference point, in source and destination space.
    src_dir = get_dir([0, src_w * -0.5], rot_rad)
    dst_dir = np.array([0, dst_w * -0.5], np.float32)

    src = np.zeros((3, 2), dtype=np.float32)
    dst = np.zeros((3, 2), dtype=np.float32)
    src[0, :] = center + scale_tmp * shift
    src[1, :] = center + src_dir + scale_tmp * shift
    dst[0, :] = [dst_w * 0.5, dst_h * 0.5]
    dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dir

    # The third correspondence is derived from the first two so the triangles are not degenerate.
    src[2:, :] = get_3rd_point(src[0, :], src[1, :])
    dst[2:, :] = get_3rd_point(dst[0, :], dst[1, :])

    if inv:
        trans = cv2.getAffineTransform(np.float32(dst), np.float32(src))
    else:
        trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))

    return trans

In [None]:
def affine_transform(pt, t):
    """
    Apply the affine matrix `t` to the 2D point `pt`.

    Args:
        pt: point whose first two entries are the coordinates to transform.
        t (np.ndarray): affine matrix multiplied with the homogeneous point (pt[0], pt[1], 1).

    Returns:
        np.ndarray: the first two entries of the product, i.e. the transformed point.
    """
    homogeneous = np.array([pt[0], pt[1], 1.])
    return np.dot(t, homogeneous)[:2]

In [None]:
def get_3rd_point(a, b):
    """
    Return the point obtained by rotating the segment b -> a by 90 degrees around `b`.

    Args:
        a (np.ndarray): first 2D point.
        b (np.ndarray): second 2D point.

    Returns:
        np.ndarray: `b` plus the vector (a - b) with its components swapped and the first negated.
    """
    delta = a - b
    perpendicular = np.array([-delta[1], delta[0]], dtype=np.float32)
    return b + perpendicular

In [None]:
def get_dir(src_point, rot_rad):
    """
    Rotate the 2D vector `src_point` by `rot_rad` radians.

    Args:
        src_point: vector given as (x, y).
        rot_rad: rotation angle in radians.

    Returns:
        list: the rotated vector as [x, y].
    """
    x, y = src_point[0], src_point[1]
    sn, cs = np.sin(rot_rad), np.cos(rot_rad)
    return [x * cs - y * sn, x * sn + y * cs]

In [None]:
def crop(img, center, scale, output_size, rot=0, interpolation=cv2.INTER_LINEAR):
    """
    Warp the box described by `center`, `scale` and `rot` out of `img` into an image of `output_size`.

    Args:
        img: image in opencv format.
        center: center of the box, forwarded to `get_affine_transform`.
        scale: size of the box, forwarded to `get_affine_transform`.
        output_size: (width, height) of the returned image.
        rot: rotation in degrees.
            Default: 0
        interpolation: OpenCV interpolation flag.
            Default: cv2.INTER_LINEAR

    Returns:
        The warped image returned by `cv2.warpAffine`.
    """
    out_w, out_h = int(output_size[0]), int(output_size[1])
    matrix = get_affine_transform(center, scale, rot, output_size)
    return cv2.warpAffine(img, matrix, (out_w, out_h), flags=interpolation)

In [None]:
def calc_dists(preds, target, normalize):
    """
    Compute the normalized distance between every predicted and target joint.

    Args:
        preds (torch.Tensor): predicted coordinates, indexed as [sample, joint, coordinate].
        target (torch.Tensor): target coordinates, same layout as `preds`.
        normalize (torch.Tensor): per-sample divisor applied to both predictions and targets.

    Returns:
        torch.Tensor: distances with shape (joints, samples). An entry is -1 when the target
            joint does not have both of its first two coordinates greater than 1.
    """
    preds = preds.to(torch.float32)
    target = target.to(torch.float32)
    nof_samples, nof_joints = preds.shape[0], preds.shape[1]

    dists = torch.zeros((nof_joints, nof_samples)).to(preds.device)
    for sample in range(nof_samples):
        for joint in range(nof_joints):
            gt = target[sample, joint, :]
            if not (gt[0] > 1 and gt[1] > 1):
                # Target treated as missing: flag it so that dist_acc can ignore it.
                dists[joint, sample] = -1
                continue
            scaled_pred = preds[sample, joint, :] / normalize[sample]
            scaled_gt = gt / normalize[sample]
            dists[joint, sample] = torch.norm(scaled_pred - scaled_gt)
    return dists

In [None]:
def dist_acc(dists, thr=0.5):
    """
    Fraction of distances below `thr`, ignoring entries equal to -1.

    Args:
        dists (torch.Tensor): distances; -1 marks an entry to ignore.
        thr (float): distance threshold.
            Default: 0.5

    Returns:
        A tensor with the fraction of valid distances strictly below `thr`, or the int -1
        when there is no valid distance.
    """
    valid = torch.ne(dists, -1)
    nof_valid = valid.sum()
    if not nof_valid > 0:
        return -1
    return torch.lt(dists[valid], thr).float().sum() / nof_valid

In [None]:
def evaluate_pck_accuracy(output, target, hm_type='gaussian', thr=0.5):
    """
    Calculate accuracy according to PCK, using ground-truth heatmaps rather than x,y locations.

    Joint locations are extracted from both heatmap tensors with `get_max_preds`, distances are
    normalized by one tenth of the heatmap size, and the per-joint fraction of distances below
    `thr` is computed.

    Args:
        output (torch.Tensor): predicted heatmaps, shape (batch, joints, height, width).
        target (torch.Tensor): ground-truth heatmaps, same layout as `output`.
        hm_type (str): heatmap type; only 'gaussian' is implemented.
            Default: 'gaussian'
        thr (float): normalized distance threshold.
            Default: 0.5

    Returns:
        (acc, avg_acc, cnt, pred, target): per-joint accuracies (-1 where a joint has no valid
        target), their average over the joints with a valid accuracy, the number of such joints,
        and the locations extracted from `output` and `target`.

    Raises:
        NotImplementedError: if `hm_type` is not 'gaussian'.
    """
    nof_joints = output.shape[1]
    if hm_type != 'gaussian':
        raise NotImplementedError

    pred, _ = get_max_preds(output)
    target, _ = get_max_preds(target)
    heatmap_size = torch.tensor([output.shape[2], output.shape[3]], dtype=torch.float32)
    # Why they divide this by 10?
    norm = torch.ones((pred.shape[0], 2)) * heatmap_size / 10
    norm = norm.to(output.device)

    dists = calc_dists(pred, target, norm)

    acc = torch.zeros(nof_joints).to(dists.device)
    avg_acc = 0
    cnt = 0
    for joint in range(nof_joints):
        acc[joint] = dist_acc(dists[joint], thr=thr)
        # Joints without any valid target report -1 and are left out of the average.
        if acc[joint] >= 0:
            avg_acc = avg_acc + acc[joint]
            cnt += 1

    if cnt != 0:
        avg_acc = avg_acc / cnt
    else:
        avg_acc = 0
    return acc, avg_acc, cnt, pred, target

In [None]:
def bbox_area(bbox):
    """
    Area of a bounding box (a rectangle).

    Args:
        bbox (:class:`np.ndarray`): rectangle in the form (x_min, y_min, x_max, y_max)

    Returns:
        float: Bounding box area, i.e. width times height.
    """
    x_min, y_min, x_max, y_max = bbox
    return (x_max - x_min) * (y_max - y_min)

In [None]:
def bbox_intersection(bbox_a, bbox_b):
    """
    Intersection between two bounding boxes (two rectangles).

    Args:
        bbox_a (:class:`np.ndarray`): rectangle in the form (x_min, y_min, x_max, y_max)
        bbox_b (:class:`np.ndarray`): rectangle in the form (x_min, y_min, x_max, y_max)

    Returns:
        (:class:`np.ndarray`, float):
            Intersection limits and area. When the boxes do not overlap, the limits are
            all zeros and the area is 0.

            Format: (x_min, y_min, x_max, y_max), area
    """
    left = np.max((bbox_a[0], bbox_b[0]))
    top = np.max((bbox_a[1], bbox_b[1]))
    right = np.min((bbox_a[2], bbox_b[2]))
    bottom = np.min((bbox_a[3], bbox_b[3]))

    # Disjoint boxes: the candidate limits cross each other on at least one axis.
    if right < left or bottom < top:
        return np.asarray([0, 0, 0, 0]), 0

    limits = np.asarray([left, top, right, bottom], dtype=bbox_a.dtype)
    return limits, bbox_area(limits)

In [None]:
def bbox_union(bbox_a, bbox_b):
    """
    Area of the union of two bounding boxes (two rectangles).

    Args:
        bbox_a (:class:`np.ndarray`): rectangle in the form (x_min, y_min, x_max, y_max)
        bbox_b (:class:`np.ndarray`): rectangle in the form (x_min, y_min, x_max, y_max)

    Returns:
        float: Union area, computed as area(a) + area(b) - area(intersection).
    """
    total = bbox_area(bbox_a) + bbox_area(bbox_b)
    _, shared = bbox_intersection(bbox_a, bbox_b)
    return total - shared

In [None]:
def bbox_iou(bbox_a, bbox_b):
    """
    Intersection over Union (IoU) between two bounding boxes (two rectangles).

    Args:
        bbox_a (:class:`np.ndarray`): rectangle in the form (x_min, y_min, x_max, y_max)
        bbox_b (:class:`np.ndarray`): rectangle in the form (x_min, y_min, x_max, y_max)

    Returns:
        float: Intersection over Union (IoU). 0.0 when the union has zero area (both boxes
            are degenerate), so that callers never receive NaN or a division error.
    """
    area_u = bbox_union(bbox_a, bbox_b)
    if area_u == 0:
        # Two zero-area boxes: the ratio is undefined, report "no overlap" instead.
        return 0.0

    _, area_i = bbox_intersection(bbox_a, bbox_b)
    return area_i / area_u

In [None]:
def oks_iou(g, d, a_g, a_d, sigmas=None, in_vis_thre=None):
    """
    Object Keypoint Similarity (OKS) between one reference pose and a set of candidate poses.

    Args:
        g (:class:`np.ndarray`): reference pose with shape (nof_joints, 3); columns 0 and 1 are
            the coordinates, column 2 the visibility/confidence.
        d (:class:`np.ndarray`): candidate poses with shape (nof_poses, nof_joints, 3), same
            column layout as `g`.
        a_g (float): area associated with the reference pose.
        a_d (:class:`np.ndarray`): areas associated with the candidate poses, one per pose.
        sigmas (:class:`np.ndarray`): per-joint falloff constants. Any value that is not an
            np.ndarray (including None) selects the built-in 17-joint defaults.
            Default: None
        in_vis_thre (float): when given, only joints whose visibility is strictly greater than
            this threshold in BOTH the reference and the candidate pose contribute.
            Default: None

    Returns:
        :class:`np.ndarray`: similarity for each candidate pose, shape (nof_poses,). A candidate
            with no contributing joint gets 0.0.
    """
    if not isinstance(sigmas, np.ndarray):
        sigmas = np.array(
            [.26, .25, .25, .35, .35, .79, .79, .72, .72, .62, .62, 1.07, 1.07, .87, .87, .89, .89]) / 10.0
    variances = (sigmas * 2) ** 2
    xg = g[:, 0]
    yg = g[:, 1]
    vg = g[:, 2]
    ious = np.zeros((d.shape[0]))
    for n_d in range(0, d.shape[0]):
        xd = d[n_d, :, 0]
        yd = d[n_d, :, 1]
        vd = d[n_d, :, 2]
        dx = xd - xg
        dy = yd - yg
        # Squared distance normalized by the per-joint variance and the mean of the two areas.
        e = (dx ** 2 + dy ** 2) / variances / ((a_g + a_d[n_d]) / 2 + np.spacing(1)) / 2
        if in_vis_thre is not None:
            # Elementwise AND: a Python `and` between two lists would silently keep only
            # the candidate's mask and ignore the reference visibility.
            ind = (vg > in_vis_thre) & (vd > in_vis_thre)
            e = e[ind]
        ious[n_d] = np.sum(np.exp(-e)) / e.shape[0] if e.shape[0] != 0 else 0.0
    return ious

In [None]:
def compute_similarity_matrices(bboxes_a, bboxes_b, poses_a, poses_b):
    """
    Compute pairwise bounding-box and pose similarities between two sets of detections.

    Args:
        bboxes_a: bounding boxes of the first set, (x_min, y_min, x_max, y_max) each.
        bboxes_b: bounding boxes of the second set.
        poses_a: poses of the first set, one per box in `bboxes_a`.
        poses_b: poses of the second set, one per box in `bboxes_b`.

    Returns:
        (:class:`np.ndarray`, :class:`np.ndarray`): two float32 matrices with shape
            (len(a), len(b)): the bounding-box IoU (`bbox_iou`) and the pose similarity (`oks_iou`).
    """
    assert len(bboxes_a) == len(poses_a) and len(bboxes_b) == len(poses_b)

    result_bbox = np.zeros((len(bboxes_a), len(bboxes_b)), dtype=np.float32)
    result_pose = np.zeros((len(poses_a), len(poses_b)), dtype=np.float32)

    # The areas of the second set do not depend on the row being filled.
    areas_b = np.asarray([bbox_area(candidate) for candidate in bboxes_b])

    for row, (bbox_a, pose_a) in enumerate(zip(bboxes_a, poses_a)):
        result_pose[row, :] = oks_iou(pose_a, poses_b, bbox_area(bbox_a), areas_b)
        for col, (bbox_b, _) in enumerate(zip(bboxes_b, poses_b)):
            result_bbox[row, col] = bbox_iou(bbox_a, bbox_b)

    return result_bbox, result_pose

In [None]:
def find_person_id_associations(boxes, pts, prev_boxes, prev_pts, prev_person_ids, next_person_id=0,
                                pose_alpha=0.5, similarity_threshold=0.5, smoothing_alpha=0.):
    """
    Find associations between previous and current skeletons and apply temporal smoothing.
    It requires previous and current bounding boxes, skeletons, and previous person_ids.

    Current and previous detections are matched with the Hungarian algorithm (munkres) on a
    blend of bounding-box IoU and pose similarity. Matched detections inherit the previous id;
    the others receive new consecutive ids starting at `next_person_id`.

    When smoothing is enabled, `boxes` and `pts` are modified in place.

    Args:
        boxes (:class:`np.ndarray`): current person bounding boxes
        pts (:class:`np.ndarray`): current human joints
        prev_boxes (:class:`np.ndarray`): previous person bounding boxes
        prev_pts (:class:`np.ndarray`): previous human joints
        prev_person_ids (:class:`np.ndarray`): previous person ids
        next_person_id (int): the id that will be assigned to the next novel detected person
            Default: 0
        pose_alpha (float): parameter to weight between bounding box similarity and pose (oks) similarity.
            pose_alpha * pose_similarity + (1 - pose_alpha) * bbox_similarity
            Default: 0.5
        similarity_threshold (float): lower similarity threshold to have a correct match between previous and
            current detections.
            Default: 0.5
        smoothing_alpha (float): linear temporal smoothing filter. Set 0 to disable, 1 to keep the previous detection.
            Default: 0.

    Returns:
            (:class:`np.ndarray`, :class:`np.ndarray`, :class:`np.ndarray`):
                A list with (boxes, pts, person_ids) where boxes and pts are temporally smoothed.
    """
    nof_current = len(pts)
    if nof_current == 0 or len(prev_pts) == 0:
        # Nothing to match against: every current detection (if any) is a new person.
        # This also keeps the assignment solver away from an empty cost matrix.
        person_ids = np.arange(next_person_id, next_person_id + nof_current, dtype=np.int32)
        return boxes, pts, person_ids

    bbox_similarity_matrix, pose_similarity_matrix = compute_similarity_matrices(boxes, prev_boxes, pts, prev_pts)
    similarity_matrix = pose_similarity_matrix * pose_alpha + bbox_similarity_matrix * (1 - pose_alpha)

    m = munkres.Munkres()
    assignments = np.asarray(m.compute((1 - similarity_matrix).tolist()))  # Munkres require a cost => 1 - similarity

    # -1 marks "no id assigned yet".
    person_ids = np.ones(nof_current, dtype=np.int32) * -1
    for assignment in assignments:
        current, previous = assignment[0], assignment[1]
        if similarity_matrix[current, previous] > similarity_threshold:
            person_ids[current] = prev_person_ids[previous]
            if smoothing_alpha:
                boxes[current] = (1 - smoothing_alpha) * boxes[current] + \
                                 smoothing_alpha * prev_boxes[previous]
                pts[current] = (1 - smoothing_alpha) * pts[current] + \
                               smoothing_alpha * prev_pts[previous]

    # Unmatched detections get fresh, consecutive ids.
    unmatched = person_ids == -1
    person_ids[unmatched] = np.arange(next_person_id, next_person_id + np.sum(unmatched))

    return boxes, pts, person_ids

In [None]:
def get_multi_stage_outputs(model, image,
                            with_flip=False, project2image=False, size_projected=None,
                            nof_joints=17, max_batch_size=128):
    """
    Run `model` on `image` and merge its multi-resolution outputs into averaged heatmaps and tag maps.

    The model is expected to return a list of tensors whose last element has the highest
    resolution. By default this is
        [(batch, nof_joints*2, height//4, width//4), (batch, nof_joints, height//2, width//2)]
    Every lower-resolution output is bilinearly upsampled to the resolution of the last one.
    The first `nof_joints` channels of each output are averaged into a single heatmap tensor;
    any extra channels are collected as tag maps.

    Args:
        model: callable returning the list of output tensors for a batch of images.
        image (torch.Tensor): batch of images, shape (batch, channels, height, width).
        with_flip (bool): flip augmentation; not implemented.
            Default: False
        project2image (bool): when True (and `size_projected` is given), heatmaps and tags are
            resized to `size_projected`.
            Default: False
        size_projected: (width, height) used for the projection.
            Default: None
        nof_joints (int): number of joints, i.e. number of heatmap channels per output.
            Default: 17
        max_batch_size (int): larger batches are processed in chunks of this size. The chunked
            path pre-allocates exactly the two default outputs described above.
            Default: 128

    Returns:
        (outputs, heatmaps, tags): the raw model outputs, a one-element list with the averaged
        heatmaps, and the list of tag tensors.

    Raises:
        NotImplementedError: if `with_flip` is True (raised after inference has run).
    """

    def _upsample(tensor, size):
        return torch.nn.functional.interpolate(tensor, size=size, mode='bilinear', align_corners=False)

    # inference
    batch_size = len(image)
    if batch_size <= max_batch_size:
        outputs = model(image)
    else:
        in_h, in_w = image.shape[-2], image.shape[-1]
        outputs = [
            torch.empty((image.shape[0], nof_joints * 2, in_h // 4, in_w // 4), device=image.device),
            torch.empty((image.shape[0], nof_joints, in_h // 2, in_w // 2), device=image.device),
        ]
        for start in range(0, batch_size, max_batch_size):
            stop = start + max_batch_size
            chunk_outputs = model(image[start:stop])
            outputs[0][start:stop] = chunk_outputs[0]
            outputs[1][start:stop] = chunk_outputs[1]

    # The last output has the highest resolution; everything else is brought up to it.
    higher_resolution = (outputs[-1].shape[-2], outputs[-1].shape[-1])
    last_stage = len(outputs) - 1

    heatmaps_sum = 0
    nof_stages = 0
    tags = []
    for stage, stage_output in enumerate(outputs):
        if stage != last_stage:
            stage_output = _upsample(stage_output, higher_resolution)

        heatmaps_sum += stage_output[:, :nof_joints]
        nof_stages += 1

        # Channels beyond the heatmaps are the tag maps.
        if stage_output.shape[1] > nof_joints:
            tags.append(stage_output[:, nof_joints:])

    heatmaps = [heatmaps_sum / nof_stages] if nof_stages > 0 else []

    if with_flip:  # ToDo: average with the outputs of the horizontally flipped image
        raise NotImplementedError

    if project2image and size_projected:
        # size_projected is (width, height) while interpolate expects (height, width).
        projected = (size_projected[1], size_projected[0])
        heatmaps = [_upsample(hms, projected) for hms in heatmaps]
        tags = [_upsample(tms, projected) for tms in tags]

    return outputs, heatmaps, tags

In [None]:
def aggregate_results(scale_factor, final_heatmaps, tags_list, heatmaps, tags, with_flip=False, project2image=False):
    """
    Accumulate the heatmaps (and, at scale 1, the tags) of one scale into the running results.

    Args:
        scale_factor: scale the current `heatmaps`/`tags` were computed at; tags are only
            collected when it equals 1.
        final_heatmaps (torch.Tensor): running sum of heatmaps, or None on the first call.
            When not None it is updated in place.
        tags_list (list): list extended in place with the tags (each with a trailing
            dimension added at position 4).
        heatmaps (list): heatmaps of the current scale; `heatmaps[1]` is only read when
            `with_flip` is True.
        tags (list): tag tensors of the current scale.
        with_flip (bool): average `heatmaps[0]` and `heatmaps[1]`.
            Default: False
        project2image (bool): when True the inputs are assumed to already share the size of
            `final_heatmaps`, so no resizing is applied.
            Default: False

    Returns:
        (final_heatmaps, tags_list): the updated running heatmaps and the tag list.
    """

    def _resize_like_final(tensor):
        # Bilinear resize to the spatial size of the running heatmaps.
        size = (final_heatmaps.size(2), final_heatmaps.size(3))
        return torch.nn.functional.interpolate(tensor, size=size, mode='bilinear', align_corners=False)

    if scale_factor == 1:
        if final_heatmaps is not None and not project2image:
            tags = [_resize_like_final(tms) for tms in tags]
        tags_list.extend(torch.unsqueeze(tms, dim=4) for tms in tags)

    if with_flip:
        heatmaps_avg = (heatmaps[0] + heatmaps[1]) / 2.0
    else:
        heatmaps_avg = heatmaps[0]

    if final_heatmaps is None:
        final_heatmaps = heatmaps_avg
    elif project2image:
        final_heatmaps += heatmaps_avg
    else:
        final_heatmaps += _resize_like_final(heatmaps_avg)

    return final_heatmaps, tags_list

In [None]:
def transform_preds(coords, center, scale, output_size):
    """
    Map coordinates from the network output space back to the original image space.

    Args:
        coords (np.ndarray): one row per point; only the first two columns are transformed,
            any other column is copied unchanged.
        center: center passed to `get_affine_transform`.
        scale: scale passed to `get_affine_transform`.
        output_size: size of the space `coords` currently live in.

    Returns:
        np.ndarray: a copy of `coords` with the first two columns replaced by the result of
            the inverse affine transform.
    """
    inverse_map = get_affine_transform(center, scale, 0, output_size, inv=1)
    mapped = coords.copy()
    for row, point in enumerate(coords):
        mapped[row, 0:2] = affine_transform(point[0:2], inverse_map)
    return mapped

In [None]:
def resize(image, input_size, interpolation=cv2.INTER_LINEAR):
    """
    Resize `image` so that its shorter side equals `input_size` and its longer side is
    rounded up to a multiple of 64.

    Args:
        image: image in opencv format with shape (height, width, channels).
        input_size: target length of the shorter side.
        interpolation: OpenCV interpolation flag.
            Default: cv2.INTER_LINEAR

    Returns:
        (image_resized, center, scale): the warped image, the (x, y) center of the original
        image, and the (scale_w, scale_h) pair (in units of 200 pixels) used for the warp.
    """
    h, w, _ = image.shape
    center = np.array([int(w / 2.0 + 0.5), int(h / 2.0 + 0.5)])

    portrait = w < h
    short_side, long_side = (w, h) if portrait else (h, w)

    short_resized = input_size
    # Keep the aspect ratio, then round the longer side up to the next multiple of 64.
    long_resized = int((input_size / short_side * long_side + 63) // 64 * 64)
    scale_short = short_side / 200.0
    scale_long = long_resized / short_resized * short_side / 200.0

    if portrait:
        w_resized, h_resized = short_resized, long_resized
        scale_w, scale_h = scale_short, scale_long
    else:
        w_resized, h_resized = long_resized, short_resized
        scale_w, scale_h = scale_long, scale_short

    scale = np.array([scale_w, scale_h])
    trans = get_affine_transform(center, scale, 0, (w_resized, h_resized))

    image_resized = cv2.warpAffine(image, trans, (int(w_resized), int(h_resized)), flags=interpolation)

    return image_resized, center, scale

In [None]:
def get_multi_scale_size(image, input_size, current_scale, min_scale):
    """
    Compute the resized image size, center and scale for one scale of a multi-scale pass.

    The size is first computed for `min_scale` (shorter side rounded up to a multiple of 64,
    longer side following the aspect ratio and rounded up to a multiple of 64), then multiplied
    by `current_scale / min_scale`.

    Args:
        image: image in opencv format with shape (height, width, channels).
        input_size: base length of the shorter side.
        current_scale: scale factor the size is requested for.
        min_scale: smallest scale factor of the multi-scale pass.

    Returns:
        ((w_resized, h_resized), center, scale): the target size, the (x, y) center of the
        original image, and np.array([scale_w, scale_h]) in units of 200 pixels.
    """
    h, w, _ = image.shape
    center = np.array([int(w / 2.0 + 0.5), int(h / 2.0 + 0.5)])

    # calculate the size for min_scale
    min_input_size = int((min_scale * input_size + 63) // 64 * 64)

    portrait = w < h
    short_side, long_side = (w, h) if portrait else (h, w)

    short_resized = int(min_input_size * current_scale / min_scale)
    long_resized = int(
        int((min_input_size / short_side * long_side + 63) // 64 * 64) * current_scale / min_scale
    )
    scale_short = short_side / 200.0
    scale_long = long_resized / short_resized * short_side / 200.0

    if portrait:
        size_resized = (short_resized, long_resized)
        scale = np.array([scale_short, scale_long])
    else:
        size_resized = (long_resized, short_resized)
        scale = np.array([scale_long, scale_short])

    return size_resized, center, scale

In [None]:
def resize_align_multi_scale(image, input_size, current_scale, min_scale, interpolation=cv2.INTER_LINEAR):
    """
    Warp `image` to the size computed by `get_multi_scale_size` for `current_scale`.

    Args:
        image: input image, forwarded to `get_multi_scale_size` and `cv2.warpAffine`.
        input_size: nominal length of the shorter side at scale 1.
        current_scale: scale for which the image is resized.
        min_scale: smallest scale of the pyramid.
        interpolation: OpenCV interpolation flag forwarded to `cv2.warpAffine`.

    Returns:
        tuple: (image_resized, size_resized, center, scale), the last three exactly as returned
        by `get_multi_scale_size`.
    """
    target_size, center, scale = get_multi_scale_size(image, input_size, current_scale, min_scale)
    warp_matrix = get_affine_transform(center, scale, 0, target_size)
    warped = cv2.warpAffine(image, warp_matrix, target_size, flags=interpolation)

    return warped, target_size, center, scale

In [None]:
def get_final_preds(grouped_joints, center, scale, heatmap_size):
    """
    Apply `transform_preds` to every detected person of every image.

    Args:
        grouped_joints: per-image sequence of per-person tensors (each is moved to CPU and
            converted to numpy before being transformed).
        center, scale, heatmap_size: forwarded unchanged to `transform_preds`.

    Returns:
        list: one list per image, holding the transformed joints of each person in that image.
    """
    return [
        [
            transform_preds(person.cpu().numpy(), center, scale, heatmap_size)
            for person in people
        ]
        for people in grouped_joints
    ]

## modules.py

In [None]:
class Bottleneck(nn.Module):
    """
    Residual bottleneck block: 1x1 conv -> 3x3 conv -> 1x1 conv, each followed by batch norm,
    with a skip connection added before the final ReLU.

    The block outputs `planes * expansion` channels.

    Args:
        inplanes (int): number of input channels.
        planes (int): number of channels of the two inner convolutions.
        stride (int): stride of the 3x3 convolution.
        downsample (callable or None): optional projection applied to the input before it is
            added to the main path.
        bn_momentum (float): momentum of the three BatchNorm2d layers.
    """
    expansion = 4

    def __init__(self, inplanes, planes, stride=1, downsample=None, bn_momentum=0.1):
        super().__init__()
        expanded = planes * self.expansion

        # Attribute names and creation order are kept unchanged: they define the state_dict keys
        # and the order in which random initial weights are drawn.
        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes, momentum=bn_momentum)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes, momentum=bn_momentum)
        self.conv3 = nn.Conv2d(planes, expanded, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(expanded, momentum=bn_momentum)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Run the three conv/bn stages and add the (optionally projected) input."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)

In [None]:
class BasicBlock(nn.Module):
    """
    Residual basic block: two 3x3 conv + batch norm stages with a skip connection added before
    the final ReLU.

    Args:
        inplanes (int): number of input channels.
        planes (int): number of output channels.
        stride (int): stride of the first convolution.
        downsample (callable or None): optional projection applied to the input before it is
            added to the main path (needed when `inplanes != planes` or `stride != 1`).
        bn_momentum (float): momentum of the two BatchNorm2d layers.
    """
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None, bn_momentum=0.1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes, momentum=bn_momentum)
        self.relu = nn.ReLU(inplace=True)
        # conv2 consumes the output of conv1/bn1, which has `planes` channels (not `inplanes`).
        # The two only coincide when inplanes == planes, in which case weight shapes are unchanged.
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes, momentum=bn_momentum)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Run the two conv/bn stages and add the (optionally projected) input."""
        residual = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual
        out = self.relu(out)

        return out

## higherHRNet.py

In [None]:
class StageModule(nn.Module):
    """
    One multi-resolution stage: `stage` parallel branches of four BasicBlocks each, followed by
    fuse layers that combine every branch into each of the `output_branches` outputs.

    Branch `i` works on `c * 2**i` channels. When fusing branch `j` into output `i`:
      * i == j: the branch is passed through unchanged;
      * i <  j: 1x1 conv + batch norm + nearest-neighbour upsampling by 2**(j - i);
      * i >  j: a chain of (i - j) stride-2 3x3 convolutions (the last one changes the channels).

    Args:
        stage (int): number of input branches.
        output_branches (int): number of fused outputs produced.
        c (int): channel count of the first (highest-resolution) branch.
        bn_momentum (float): momentum used by every BatchNorm2d created here, including the
            ones in the fuse layers.
    """

    def __init__(self, stage, output_branches, c, bn_momentum):
        super(StageModule, self).__init__()
        self.stage = stage
        self.output_branches = output_branches

        self.branches = nn.ModuleList()
        for i in range(self.stage):
            w = c * (2 ** i)
            branch = nn.Sequential(
                BasicBlock(w, w, bn_momentum=bn_momentum),
                BasicBlock(w, w, bn_momentum=bn_momentum),
                BasicBlock(w, w, bn_momentum=bn_momentum),
                BasicBlock(w, w, bn_momentum=bn_momentum),
            )
            self.branches.append(branch)

        self.fuse_layers = nn.ModuleList()
        # for each output_branches (i.e. each branch in all cases but the very last one)
        for i in range(self.output_branches):
            self.fuse_layers.append(nn.ModuleList())
            for j in range(self.stage):  # for each branch
                if i == j:
                    self.fuse_layers[-1].append(nn.Sequential())  # Used in place of "None" because it is callable
                elif i < j:
                    self.fuse_layers[-1].append(nn.Sequential(
                        nn.Conv2d(c * (2 ** j), c * (2 ** i), kernel_size=(1, 1), stride=(1, 1), bias=False),
                        nn.BatchNorm2d(c * (2 ** i), eps=1e-05, momentum=bn_momentum, affine=True,
                                       track_running_stats=True),
                        nn.Upsample(scale_factor=(2.0 ** (j - i)), mode='nearest'),
                    ))
                elif i > j:
                    ops = []
                    # Intermediate stride-2 convolutions keep the channel count of branch j.
                    for k in range(i - j - 1):
                        ops.append(nn.Sequential(
                            nn.Conv2d(c * (2 ** j), c * (2 ** j), kernel_size=(3, 3), stride=(2, 2), padding=(1, 1),
                                      bias=False),
                            nn.BatchNorm2d(c * (2 ** j), eps=1e-05, momentum=bn_momentum, affine=True,
                                           track_running_stats=True),
                            nn.ReLU(inplace=True),
                        ))
                    # The last stride-2 convolution maps to the channel count of output i (no ReLU).
                    ops.append(nn.Sequential(
                        nn.Conv2d(c * (2 ** j), c * (2 ** i), kernel_size=(3, 3), stride=(2, 2), padding=(1, 1),
                                  bias=False),
                        nn.BatchNorm2d(c * (2 ** i), eps=1e-05, momentum=bn_momentum, affine=True,
                                       track_running_stats=True),
                    ))
                    self.fuse_layers[-1].append(nn.Sequential(*ops))

        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """
        Args:
            x (list): one tensor per branch (its length must equal the number of branches).

        Returns:
            list: `output_branches` tensors, each the ReLU of the sum of all fused branches.
        """
        assert len(self.branches) == len(x)

        x = [branch(b) for branch, b in zip(self.branches, x)]

        x_fused = []
        for i in range(len(self.fuse_layers)):
            for j in range(0, len(self.branches)):
                if j == 0:
                    x_fused.append(self.fuse_layers[i][0](x[0]))
                else:
                    x_fused[i] = x_fused[i] + self.fuse_layers[i][j](x[j])

        for i in range(len(x_fused)):
            x_fused[i] = self.relu(x_fused[i])

        return x_fused

In [None]:
class HigherHRNet(nn.Module):
    """
    HigherHRNet: a multi-branch high-resolution backbone followed by a deconvolution head.

    Args:
        c (int): channel count of the highest-resolution branch (branch i uses c * 2**i).
        nof_joints (int): number of joints predicted.
        bn_momentum (float): momentum forwarded to every BatchNorm2d of the network, including
            the ones inside the Bottleneck / BasicBlock / StageModule sub-modules.

    `forward` returns a list of tensors: the first has `nof_joints * 2` channels (heatmaps and
    tagmaps at the lower resolution); each following one has `nof_joints` channels and is produced
    after one 2x transposed-convolution upsampling step.
    """

    def __init__(self, c=48, nof_joints=17, bn_momentum=0.1):
        super(HigherHRNet, self).__init__()

        # Input (stem net)
        self.conv1 = nn.Conv2d(3, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        self.bn1 = nn.BatchNorm2d(64, eps=1e-05, momentum=bn_momentum, affine=True, track_running_stats=True)
        self.conv2 = nn.Conv2d(64, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        self.bn2 = nn.BatchNorm2d(64, eps=1e-05, momentum=bn_momentum, affine=True, track_running_stats=True)
        self.relu = nn.ReLU(inplace=True)

        # Stage 1 (layer1) - First group of bottleneck (resnet) modules
        downsample = nn.Sequential(
            nn.Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False),
            nn.BatchNorm2d(256, eps=1e-05, momentum=bn_momentum, affine=True, track_running_stats=True),
        )
        # bn_momentum is forwarded explicitly so these blocks do not silently fall back to their default
        self.layer1 = nn.Sequential(
            Bottleneck(64, 64, downsample=downsample, bn_momentum=bn_momentum),
            Bottleneck(256, 64, bn_momentum=bn_momentum),
            Bottleneck(256, 64, bn_momentum=bn_momentum),
            Bottleneck(256, 64, bn_momentum=bn_momentum),
        )

        # Fusion layer 1 (transition1) - Creation of the first two branches (one full and one half resolution)
        self.transition1 = nn.ModuleList([
            nn.Sequential(
                nn.Conv2d(256, c, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False),
                nn.BatchNorm2d(c, eps=1e-05, momentum=bn_momentum, affine=True, track_running_stats=True),
                nn.ReLU(inplace=True),
            ),
            nn.Sequential(nn.Sequential(  # Double Sequential to fit with official pretrained weights
                nn.Conv2d(256, c * (2 ** 1), kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False),
                nn.BatchNorm2d(c * (2 ** 1), eps=1e-05, momentum=bn_momentum, affine=True, track_running_stats=True),
                nn.ReLU(inplace=True),
            )),
        ])

        # Stage 2 (stage2) - Second module with 1 group of bottleneck (resnet) modules. This has 2 branches
        self.stage2 = nn.Sequential(
            StageModule(stage=2, output_branches=2, c=c, bn_momentum=bn_momentum),
        )

        # Fusion layer 2 (transition2) - Creation of the third branch (1/4 resolution)
        self.transition2 = nn.ModuleList([
            nn.Sequential(),  # None,   - Used in place of "None" because it is callable
            nn.Sequential(),  # None,   - Used in place of "None" because it is callable
            nn.Sequential(nn.Sequential(  # Double Sequential to fit with official pretrained weights
                nn.Conv2d(c * (2 ** 1), c * (2 ** 2), kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False),
                nn.BatchNorm2d(c * (2 ** 2), eps=1e-05, momentum=bn_momentum, affine=True, track_running_stats=True),
                nn.ReLU(inplace=True),
            )),  # ToDo Why the new branch derives from the "upper" branch only?
        ])

        # Stage 3 (stage3) - Third module with 4 groups of bottleneck (resnet) modules. This has 3 branches
        self.stage3 = nn.Sequential(
            StageModule(stage=3, output_branches=3, c=c, bn_momentum=bn_momentum),
            StageModule(stage=3, output_branches=3, c=c, bn_momentum=bn_momentum),
            StageModule(stage=3, output_branches=3, c=c, bn_momentum=bn_momentum),
            StageModule(stage=3, output_branches=3, c=c, bn_momentum=bn_momentum),
        )

        # Fusion layer 3 (transition3) - Creation of the fourth branch (1/8 resolution)
        self.transition3 = nn.ModuleList([
            nn.Sequential(),  # None,   - Used in place of "None" because it is callable
            nn.Sequential(),  # None,   - Used in place of "None" because it is callable
            nn.Sequential(),  # None,   - Used in place of "None" because it is callable
            nn.Sequential(nn.Sequential(  # Double Sequential to fit with official pretrained weights
                nn.Conv2d(c * (2 ** 2), c * (2 ** 3), kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False),
                nn.BatchNorm2d(c * (2 ** 3), eps=1e-05, momentum=bn_momentum, affine=True, track_running_stats=True),
                nn.ReLU(inplace=True),
            )),  # ToDo Why the new branch derives from the "upper" branch only?
        ])

        # Stage 4 (stage4) - Fourth module with 3 groups of bottleneck (resnet) modules. This has 4 branches
        # The last module fuses everything into a single (highest-resolution) output branch.
        self.stage4 = nn.Sequential(
            StageModule(stage=4, output_branches=4, c=c, bn_momentum=bn_momentum),
            StageModule(stage=4, output_branches=4, c=c, bn_momentum=bn_momentum),
            StageModule(stage=4, output_branches=1, c=c, bn_momentum=bn_momentum),
        )

        # New HigherHRNet section

        # Final blocks
        self.num_deconvs = 1

        # "We only predict tagmaps at the lowest resolution, instead of using all resolutions"
        # At the lower resolution, both heatmaps and tagmaps are predicted for every joint
        #   -> output channels are nof_joints * 2
        final_layers = [nn.Conv2d(c, nof_joints * 2, kernel_size=(1, 1), stride=(1, 1))]
        for _ in range(self.num_deconvs):
            final_layers.append(nn.Conv2d(c, nof_joints, kernel_size=(1, 1), stride=(1, 1)))
        self.final_layers = nn.ModuleList(final_layers)

        # Deconv layers
        deconv_layers = []
        input_channels = c
        for i in range(self.num_deconvs):
            # The previous prediction is concatenated to the features before each deconvolution
            # (see forward); the first one carries nof_joints * 2 channels, see comment above.
            input_channels += nof_joints * 2 if i == 0 else nof_joints
            output_channels = c

            deconv_kernel, padding, output_padding = 4, 1, 0

            layers = [nn.Sequential(
                nn.ConvTranspose2d(input_channels, output_channels, kernel_size=deconv_kernel, stride=2,
                                   padding=padding, output_padding=output_padding, bias=False),
                nn.BatchNorm2d(output_channels, momentum=bn_momentum),
                nn.ReLU(inplace=True)
            )]
            for _ in range(4):
                layers.append(nn.Sequential(
                    BasicBlock(output_channels, output_channels, bn_momentum=bn_momentum),
                ))
            deconv_layers.append(nn.Sequential(*layers))
            input_channels = output_channels

        self.deconv_layers = nn.ModuleList(deconv_layers)

    def forward(self, x):
        """
        Args:
            x (torch.Tensor): batch of 3-channel images.

        Returns:
            list: `1 + num_deconvs` prediction tensors, lowest resolution first.
        """
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)

        x = self.layer1(x)
        x = [trans(x) for trans in self.transition1]  # Since now, x is a list (# == nof branches)

        x = self.stage2(x)
        x = [
            self.transition2[0](x[0]),
            self.transition2[1](x[1]),
            self.transition2[2](x[-1])
        ]  # New branch derives from the "upper" branch only

        x = self.stage3(x)
        x = [
            self.transition3[0](x[0]),
            self.transition3[1](x[1]),
            self.transition3[2](x[2]),
            self.transition3[3](x[-1])
        ]  # New branch derives from the "upper" branch only

        x = self.stage4(x)

        final_outputs = []
        x = x[0]
        y = self.final_layers[0](x)
        final_outputs.append(y)

        for i in range(self.num_deconvs):
            # Feed the previous prediction back in, concatenated along the channel dimension.
            x = torch.cat((x, y), 1)

            x = self.deconv_layers[i](x)
            y = self.final_layers[i + 1](x)
            final_outputs.append(y)

        return final_outputs

## heatMapParser.py

In [None]:
def py_max_match(scores):
    """
    Solve the assignment problem for the cost matrix `scores` with the Munkres solver.

    Returns:
        np.ndarray: the solver's result converted to an int32 array (one (row, column) pair per
        assignment).
    """
    solver = munkres.Munkres()
    return np.array(solver.compute(scores)).astype(np.int32)

In [None]:
class HeatmapParser(object):
    """
    Turns joint heatmaps and tag maps into per-person poses.

    Pipeline implemented by `parse`: non-maximum suppression and top-k peak extraction per joint
    (`top_k_torch`), grouping of the peaks into people by tag distance (`match_by_tag_torch`),
    optional quarter-pixel adjustment (`adjust_torch`) and optional recovery of missing joints
    (`refine_torch`).
    """

    def __init__(self,
                 num_joints=17,
                 joint_set='coco',
                 max_num_people=30,
                 nms_kernel=5, nms_stride=1, nms_padding=2,
                 detection_threshold=0.1, tag_threshold=1., use_detection_val=True, ignore_too_much=True
                 ):
        """
        Heatmap Parser running on pytorch

        Args:
            num_joints (int): number of joints per person.
            joint_set (str): key of `joints_dict()` providing the joint processing order
                ('coco' or 'crowdpose').
            max_num_people (int): number of peaks kept per joint and cap on grouped people.
            nms_kernel, nms_stride, nms_padding (int): parameters of the max-pool used for NMS.
            detection_threshold (float): peaks with a score not above this value are ignored.
            tag_threshold (float): maximum tag distance for assigning a joint to an existing person.
            use_detection_val (bool): if True, the detection score is used to break ties in the
                assignment cost.
            ignore_too_much (bool): if True, stop adding joints once `max_num_people` groups exist.
        """
        assert joint_set in ('coco', 'crowdpose')

        self.num_joints = num_joints
        self.joint_set = joint_set
        self.max_num_people = max_num_people
        self.tag_per_joint = True
        self.maxpool = torch.nn.MaxPool2d(nms_kernel, nms_stride, nms_padding)
        self.detection_threshold = detection_threshold
        self.tag_threshold = tag_threshold
        self.use_detection_val = use_detection_val
        self.ignore_too_much = ignore_too_much

    def nms(self, det):
        """Zero every heatmap value that is not the maximum of its max-pool neighbourhood."""
        maxm = self.maxpool(det)
        maxm = torch.eq(maxm, det).float()
        det = det * maxm
        return det

    def match_by_tag_torch(self, data):
        """
        Group the candidate joints of one image into people by tag similarity.

        Args:
            data (tuple): (tag_k, loc_k, val_k) for a single image, indexed as
                tag_k[joint, candidate, tag_dim], loc_k[joint, candidate, 2], val_k[joint, candidate].

        Returns:
            torch.Tensor: shape (num_people, num_joints, 3 + tag_dim); each row is
            (loc_0, loc_1, score, tag...). Joints that were not assigned stay all-zero.
            With no detections the first dimension is 0.
        """
        joint_order = joints_dict()[self.joint_set]['order']

        tag_k, loc_k, val_k = data
        device = tag_k.device
        default_ = torch.zeros((self.num_joints, 3 + tag_k.shape[2]), device=device)

        loc_k = loc_k.float()
        joint_k = torch.cat((loc_k, val_k[..., None], tag_k), dim=2)  # nx30x2, nx30x1, nx30x1

        # person key (first tag value of the joint that created the person) -> joints / tags
        joint_dict = defaultdict(lambda: default_.clone().detach())
        tag_dict = {}
        for i in range(self.num_joints):
            idx = joint_order[i]

            tags = tag_k[idx]
            joints = joint_k[idx]
            mask = joints[:, 2] > self.detection_threshold
            tags = tags[mask]
            joints = joints[mask]

            if joints.shape[0] == 0:
                continue

            if i == 0 or len(joint_dict) == 0:
                # Nothing to match against yet: every candidate starts a new person.
                for tag, joint in zip(tags, joints):
                    key = tag[0]
                    joint_dict[key.item()][idx] = joint
                    tag_dict[key.item()] = [tag]
            else:
                grouped_keys = list(joint_dict.keys())[:self.max_num_people]

                if self.ignore_too_much and len(grouped_keys) == self.max_num_people:
                    continue

                # Mean tag of every existing person, shape (num_grouped, tag_dim).
                # Stacking keeps one row per person; collapsing the groups into a single row
                # would reduce the cost matrix below to a single column.
                grouped_tags = torch.stack(
                    [torch.stack(tag_dict[key]).mean(dim=0) for key in grouped_keys]
                ).to(device)

                # (num_added, num_grouped) matrix of tag distances
                diff = joints[:, None, 3:] - grouped_tags[None, :, :]
                diff_normed = torch.norm(diff, p=2, dim=2)
                diff_saved = diff_normed.clone().detach()

                if self.use_detection_val:
                    # Rounded distance dominates the cost; the detection score breaks ties.
                    diff_normed = torch.round(diff_normed) * 100 - joints[:, 2:3]

                num_added = diff.shape[0]
                num_grouped = diff.shape[1]

                if num_added > num_grouped:
                    # Pad with prohibitively expensive dummy people so every joint gets a column.
                    diff_normed = torch.cat(
                        (diff_normed, torch.zeros((num_added, num_added - num_grouped), device=device) + 1e10),
                        dim=1
                    )

                pairs = py_max_match(diff_normed.detach().cpu().numpy())
                for row, col in pairs:
                    if (
                            row < num_added
                            and col < num_grouped
                            and diff_saved[row][col] < self.tag_threshold
                    ):
                        key = grouped_keys[col]
                        joint_dict[key][idx] = joints[row]
                        tag_dict[key].append(tags[row])
                    else:
                        # Too far from every existing person (or matched to padding): new person.
                        key = tags[row][0].item()
                        joint_dict[key][idx] = joints[row]
                        tag_dict[key] = [tags[row]]

        if len(joint_dict) > 0:
            ret = torch.stack([joint_dict[i] for i in joint_dict])
        else:
            # if no people are detected, return a tensor with size 0 (on the same device as the input)
            size = list(default_.size())
            size.insert(0, 0)
            ret = torch.zeros(size, device=device)
        return ret

    def match_torch(self, tag_k, loc_k, val_k):
        """Run `match_by_tag_torch` on every image of the batch and return the list of results."""
        match = lambda x: self.match_by_tag_torch(x)
        return list(map(match, zip(tag_k, loc_k, val_k)))

    def top_k_torch(self, det, tag):
        """
        Extract the `max_num_people` strongest peaks of every joint heatmap.

        Args:
            det (torch.Tensor): heatmaps indexed as (image, joint, row, column).
            tag (torch.Tensor): tag maps with matching leading dimensions.

        Returns:
            dict: 'tag_k' (tags at the peaks, rounded to one decimal), 'loc_k' ((x, y) peak
            positions) and 'val_k' (peak scores).
        """
        det = self.nms(det)
        num_images = det.size(0)
        num_joints = det.size(1)
        h = det.size(2)
        w = det.size(3)
        det = det.view(num_images, num_joints, -1)
        val_k, ind = det.topk(self.max_num_people, dim=2)

        tag = tag.view(tag.size(0), tag.size(1), w * h, -1)
        if not self.tag_per_joint:
            tag = tag.expand(-1, self.num_joints, -1, -1)

        tag_k = torch.stack(
            [
                torch.gather(tag[:, :, :, i], 2, ind)
                for i in range(tag.size(3))
            ],
            dim=3
        )

        # added to reduce the number of unique tags
        tag_k = (tag_k * 10).round() / 10  # ToDo parametrize this

        # Convert the flat index back to column (x) and row (y).
        x = ind % w
        y = (ind // w).long()

        ind_k = torch.stack((x, y), dim=3)

        ret = {
            'tag_k': tag_k,
            'loc_k': ind_k,
            'val_k': val_k
        }

        return ret

    def adjust_torch(self, ans, det):
        """
        Shift every detected joint by a quarter pixel towards the higher neighbouring heatmap
        value and add a 0.5 offset; `ans` is modified in place and returned.
        """
        for batch_id, people in enumerate(ans):
            for people_id, i in enumerate(people):
                for joint_id, joint in enumerate(i):
                    if joint[2] > 0:
                        # NOTE: here `y` holds coordinate 0 (column) and `x` coordinate 1 (row);
                        # they are views into `ans`, so the in-place updates below write through.
                        y, x = joint[0:2]
                        xx, yy = int(x), int(y)
                        tmp = det[batch_id][joint_id]
                        if tmp[xx, min(yy + 1, tmp.shape[1] - 1)] > tmp[xx, max(yy - 1, 0)]:
                            y += 0.25
                        else:
                            y -= 0.25

                        if tmp[min(xx + 1, tmp.shape[0] - 1), yy] > tmp[max(0, xx - 1), yy]:
                            x += 0.25
                        else:
                            x -= 0.25
                        ans[batch_id][people_id, joint_id, 0] = y + 0.5
                        ans[batch_id][people_id, joint_id, 1] = x + 0.5
        return ans

    @staticmethod
    def _unravel_index(index, shape):
        """Convert a flat index into a tuple of per-dimension indices for `shape`."""
        out = []
        for dim in reversed(shape):
            out.append(index % dim)
            index = index // dim
        return tuple(reversed(out))

    def refine_torch(self, det, tag, keypoints):
        """
        Given initial keypoint predictions, we identify missing joints
        :param det: torch.tensor of size (17, 128, 128)
        :param tag: torch.tensor of size (17, 128, 128) if not flip
        :param keypoints: torch.tensor of size (17, 4) if not flip, last dim is (x, y, det score, tag score)
        :return: `keypoints`, modified in place: undetected joints (score == 0) are filled with the
            best position found in `det` near the person's mean tag.
        """
        if len(tag.shape) == 3:
            # tag shape: (17, 128, 128, 1)
            tag = tag[:, :, :, None]

        tags = []
        for i in range(keypoints.shape[0]):
            if keypoints[i, 2] > 0:
                # save tag value of detected keypoint
                x, y = keypoints[i][:2].type(torch.int32)
                tags.append(tag[i, y, x])

        # mean tag of current detected people
        prev_tag = torch.tensor(tags, device=tag.device).mean(dim=0, keepdim=True)
        ans = []

        for i in range(keypoints.shape[0]):
            # score of joints i at all position
            tmp = det[i, :, :]
            # distance of all tag values with mean tag of current detected people
            tt = (((tag[i, :, :] - prev_tag[None, None, :]) ** 2).sum(dim=2) ** 0.5)
            tmp2 = tmp - torch.round(tt)

            # find maximum position
            y, x = self._unravel_index(torch.argmax(tmp2), tmp.shape)
            xx = x.clone().detach()
            yy = y.clone().detach()
            x = x.float()
            y = y.float()
            # detection score at maximum position
            val = tmp[yy, xx]
            # offset by 0.5
            x += 0.5
            y += 0.5

            # add a quarter offset
            if tmp[yy, min(xx + 1, tmp.shape[1] - 1)] > tmp[yy, max(xx - 1, 0)]:
                x += 0.25
            else:
                x -= 0.25

            if tmp[min(yy + 1, tmp.shape[0] - 1), xx] > tmp[max(0, yy - 1), xx]:
                y += 0.25
            else:
                y -= 0.25

            ans.append((x, y, val))
        ans = torch.tensor(ans)

        if ans is not None:
            for i in range(det.shape[0]):
                # add keypoint if it is not detected
                if ans[i, 2] > 0 and keypoints[i, 2] == 0:
                    keypoints[i, :2] = ans[i, :2]
                    keypoints[i, 2] = ans[i, 2]

        return keypoints

    def parse(self, det, tag, adjust=True, refine=True):
        """
        Full parsing of a batch of heatmaps / tag maps.

        Args:
            det (torch.Tensor): heatmaps indexed as (image, joint, row, column).
            tag (torch.Tensor): tag maps.
            adjust (bool): apply `adjust_torch`.
            refine (bool): apply `refine_torch` to every detected person.

        Returns:
            tuple: (ans, scores) where `ans` is a list with one tensor of people per image and
            `scores` holds the mean joint score of every person of the FIRST image only.
        """
        ans = self.match_torch(**self.top_k_torch(det, tag))

        if adjust:
            ans = self.adjust_torch(ans, det)

        scores = [i[:, 2].mean() for i in ans[0]]

        if refine:
            # for each image
            for i in range(len(ans)):
                # for each detected person
                for j in range(len(ans[i])):
                    det_ = det[i]
                    tag_ = tag[i]
                    ans_ = ans[i][j]
                    if not self.tag_per_joint:
                        # Tensor.repeat is a method; there is no torch.repeat function.
                        tag_ = tag_.repeat(self.num_joints, 1, 1, 1)
                    ans[i][j] = self.refine_torch(det_, tag_, ans_)
            # after this refinement step, there may be multiple detections with almost identical keypoints...
            # an attempt to aggregate them is done afterwards in SimpleHigherHRNet

        return ans, scores

## tensorrt.py

In [None]:
from collections import OrderedDict, namedtuple
import tensorrt as trt


def torch_device_from_trt(device):
    """
    Map a TensorRT tensor location to the corresponding torch device.

    Args:
        device: a `trt.TensorLocation` value.

    Returns:
        torch.device: "cuda" for `TensorLocation.DEVICE`, "cpu" for `TensorLocation.HOST`.

    Raises:
        TypeError: if `device` is neither of the two known locations.
    """
    if device == trt.TensorLocation.DEVICE:
        return torch.device("cuda")
    elif device == trt.TensorLocation.HOST:
        return torch.device("cpu")
    else:
        # Raise (not return) the error so callers never receive an exception object as a device.
        raise TypeError("%s is not supported by torch" % device)


def torch_dtype_from_trt(dtype):
    """
    Map a TensorRT data type to the corresponding torch dtype.

    Args:
        dtype: one of `trt.int8`, `trt.bool` (TensorRT >= 7 only), `trt.int32`, `trt.float16`,
            `trt.float32`.

    Returns:
        torch.dtype: the matching torch dtype.

    Raises:
        TypeError: if `dtype` has no supported torch equivalent.
    """
    # Compare the major version numerically: a plain string comparison ranks '10.x' below '7.0'.
    try:
        trt_major = int(str(trt.__version__).split('.')[0])
    except ValueError:
        trt_major = 0  # unparsable version string: do not assume `trt.bool` exists
    supports_bool = trt_major >= 7

    if dtype == trt.int8:
        return torch.int8
    elif supports_bool and dtype == trt.bool:
        return torch.bool
    elif dtype == trt.int32:
        return torch.int32
    elif dtype == trt.float16:
        return torch.float16
    elif dtype == trt.float32:
        return torch.float32
    else:
        raise TypeError("%s is not supported by torch" % dtype)


class TRTModule_HigherHRNet(torch.nn.Module):
    """
    TensorRT wrapper for HigherHRNet.
    Args:
        path (str): Path to the .engine file for trt inference.
        device (:class:`torch.device` or str): The cuda device to be used (cpu not supported)
    Raises:
        RuntimeError: if the engine file cannot be deserialized.
    """

    def __init__(self, path=None, device=None):
        super(TRTModule_HigherHRNet, self).__init__()

        logger = trt.Logger(trt.Logger.INFO)

        with open(path, 'rb') as f, trt.Runtime(logger) as runtime:
            self.engine = runtime.deserialize_cuda_engine(f.read())
        # Fail early with a clear message instead of an AttributeError on the first engine access below.
        if self.engine is None:
            raise RuntimeError("Failed to deserialize TensorRT engine from %s" % path)
        self.context = self.engine.create_execution_context()

        self.input_names = ['images']
        self.output_names = []
        self.input_flattener = None
        self.output_flattener = None

        Binding = namedtuple('Binding', ('name', 'dtype', 'shape', 'data', 'ptr'))

        # One pre-allocated buffer per engine binding, keyed by binding name.
        self.bindings = OrderedDict()
        for i in range(self.engine.num_bindings):
            name = self.engine.get_binding_name(i)
            dtype = trt.nptype(self.engine.get_binding_dtype(i))

            if self.engine.binding_is_input(i):
                if -1 in tuple(self.engine.get_binding_shape(i)):  # dynamic
                    # Resolve dynamic dimensions using entry [2] of optimization profile 0
                    # (presumably the profile's maximum shape - verify against the TensorRT docs).
                    self.context.set_binding_shape(i, tuple(self.engine.get_profile_shape(0, i)[2]))
            else:
                self.output_names.append(name)

            shape = tuple(self.context.get_binding_shape(i))
            im = torch.from_numpy(np.empty(shape, dtype=dtype)).to(device)

            self.bindings[name] = Binding(name, dtype, shape, im, int(im.data_ptr()))

        self.binding_addrs = OrderedDict((n, d.ptr) for n, d in self.bindings.items())
        self.batch_size = self.bindings['images'].shape[0]

    def forward(self, *inputs):
        """Forward of the model. For more details, please refer to models.higherhrnet.HigherHRNet.forward ."""
        bindings = [None] * (len(self.input_names) + len(self.output_names))

        if self.input_flattener is not None:
            inputs = self.input_flattener.flatten(inputs)

        # Bind the input tensors (made contiguous) and tell the context their actual shapes.
        for i, input_name in enumerate(self.input_names):
            idx = self.engine.get_binding_index(input_name)
            shape = tuple(inputs[i].shape)
            bindings[idx] = inputs[i].contiguous().data_ptr()
            self.context.set_binding_shape(idx, shape)

        # create output tensors
        outputs = [None] * len(self.output_names)
        for i, output_name in enumerate(self.output_names):
            idx = self.engine.get_binding_index(output_name)
            dtype = torch_dtype_from_trt(self.engine.get_binding_dtype(idx))
            shape = tuple(self.context.get_binding_shape(idx))
            device = torch_device_from_trt(self.engine.get_location(idx))
            output = torch.empty(size=shape, dtype=dtype, device=device)
            outputs[i] = output
            bindings[idx] = output.data_ptr()

        # Enqueue inference on the current torch CUDA stream.
        self.context.execute_async_v2(
            bindings, torch.cuda.current_stream().cuda_stream
        )

        if self.output_flattener is not None:
            outputs = self.output_flattener.unflatten(outputs)
        else:
            outputs = tuple(outputs)
            if len(outputs) == 1:
                outputs = outputs[0]

        return outputs

    def enable_profiling(self):
        """Attach a TensorRT profiler to the execution context if none is set yet."""
        if not self.context.profiler:
            self.context.profiler = trt.Profiler()

## simpleHRNet.py

In [None]:
class SimpleHigherHRNet:
    """
    SimpleHigherHRNet class.

    The class provides a simple and customizable method to load the HigherHRNet network, load the official pre-trained
    weights, and predict the human pose on single images or a batch of images.
    """

    def __init__(self,
                 c,
                 nof_joints,
                 checkpoint_path,
                 model_name='HigherHRNet',
                 resolution=512,
                 interpolation=cv2.INTER_LINEAR,
                 return_heatmaps=False,
                 return_bounding_boxes=False,
                 filter_redundant_poses=True,
                 max_nof_people=30,
                 max_batch_size=32,
                 device=torch.device("cpu"),
                 enable_tensorrt=False):
        """
        Initializes a new SimpleHigherHRNet object.
        HigherHRNet is initialized on the torch.device("device") and
        its pre-trained weights will be loaded from disk.

        Args:
            c (int): number of channels (when using HigherHRNet model).
            nof_joints (int): number of joints. Must be 14 (crowdpose), 15 (mpii) or 17 (coco).
            checkpoint_path (str): path to an official higherhrnet checkpoint.
            model_name (str): model name (just HigherHRNet at the moment).
                Valid names for HigherHRNet are: `HigherHRNet`, `higherhrnet`
                Default: "HigherHRNet"
            resolution (int): higherhrnet input resolution - format: int == min(width, height).
                Default: 512
            interpolation (int): opencv interpolation algorithm.
                Default: cv2.INTER_LINEAR
            return_heatmaps (bool): if True, heatmaps will be returned along with poses by self.predict.
                Default: False
            return_bounding_boxes (bool): if True, bounding boxes will be returned along with poses by self.predict.
                Default: False
            filter_redundant_poses (bool): if True, redundant poses (poses being almost identical) are filtered out.
                Default: True
            max_nof_people (int): maximum number of detectable people.
                Default: 30
            max_batch_size (int): maximum batch size used in higherhrnet inference.
                Default: 32
            device (:class:`torch.device` or str): the higherhrnet inference will be run on this device.
                Accepted values are "cpu", "cuda" (all available GPUs) or "cuda:IDS".
                Default: torch.device("cpu")
            enable_tensorrt (bool): Enables tensorrt inference for HigherHRnet.
                If enabled, a `.engine` file is expected as `checkpoint_path`.
                Default: False

        Raises:
            ValueError: if `nof_joints`, `model_name` or `device` is not supported, or if
                `enable_tensorrt` is requested on a cpu device.
        """

        self.c = c
        self.nof_joints = nof_joints
        self.checkpoint_path = checkpoint_path
        self.model_name = model_name
        self.resolution = resolution
        self.interpolation = interpolation
        self.return_heatmaps = return_heatmaps
        self.return_bounding_boxes = return_bounding_boxes
        self.filter_redundant_poses = filter_redundant_poses
        self.max_nof_people = max_nof_people
        self.max_batch_size = max_batch_size
        self.device = device
        self.enable_tensorrt = enable_tensorrt

        # the joint set is selected from the number of joints
        if self.nof_joints == 14:
            self.joint_set = 'crowdpose'
        elif self.nof_joints == 15:
            self.joint_set = 'mpii'
        elif self.nof_joints == 17:
            self.joint_set = 'coco'
        else:
            raise ValueError('Wrong number of joints.')

        if model_name in ('HigherHRNet', 'higherhrnet'):
            self.model = HigherHRNet(c=c, nof_joints=nof_joints)
        else:
            raise ValueError('Wrong model name.')

        if not self.enable_tensorrt:
            checkpoint = torch.load(checkpoint_path, map_location=self.device)
            if 'model' in checkpoint:
                checkpoint = checkpoint['model']
            # fix issue with official high-resolution weights
            checkpoint = OrderedDict([(k[2:] if k[:2] == '1.' else k, v) for k, v in checkpoint.items()])
            self.model.load_state_dict(checkpoint)
            if 'cuda' in str(self.device):
                print("device: 'cuda' - ", end="")

                if 'cuda' == str(self.device):
                    # if device is set to 'cuda', all available GPUs will be used
                    print("%d GPU(s) will be used" % torch.cuda.device_count())
                    device_ids = None
                else:
                    # if device is set to 'cuda:IDS', only that/those device(s) will be used
                    print("GPU(s) '%s' will be used" % str(self.device))
                    device_ids = [int(x) for x in str(self.device)[5:].split(',')]

                self.model = torch.nn.DataParallel(self.model, device_ids=device_ids)

            elif 'cpu' == str(self.device):
                print("device: 'cpu'")
            else:
                raise ValueError('Wrong device name.')

            self.model = self.model.to(device)
            self.model.eval()
        else:
            # `device` may be a torch.device or a plain string: compare on its string form so that
            # both spellings are rejected consistently (a str has no `.type` attribute).
            if str(self.device).startswith('cpu'):
                raise ValueError('TensorRT does not support cpu device.')
            # NOTE(review): loading the TensorRT engine is currently disabled, so in this branch
            #   self.model stays the HigherHRNet created above, without any weights loaded.
            # self.model = TRTModule_HigherHRNet(path=checkpoint_path, device=self.device)

        self.output_parser = HeatmapParser(num_joints=self.nof_joints,
                                           joint_set=self.joint_set,
                                           max_num_people=self.max_nof_people,
                                           ignore_too_much=True,
                                           detection_threshold=0.3)

        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def predict(self, image):
        """
        Predicts the human pose on a single image or a stack of n images.

        Args:
            image (:class:`np.ndarray`):
                the image(s) on which the human pose will be estimated.

                image is expected to be in the opencv format.
                image can be:
                    - a single image with shape=(height, width, BGR color channel)
                    - a stack of n images with shape=(n, height, width, BGR color channel)

        Returns:
            :class:`np.ndarray` or list:
                a numpy array containing human joints for each (detected) person.

                Format:
                    if image is a single image:
                        shape=(# of people, # of joints (nof_joints), 3);  dtype=(np.float32).
                    if image is a stack of n images:
                        list of n np.ndarrays with
                        shape=(# of people, # of joints (nof_joints), 3);  dtype=(np.float32).

                Each joint has 3 values: (y position, x position, joint confidence).

                If self.return_heatmaps, the class returns a list with (heatmaps, human joints)
                If self.return_bounding_boxes, the class returns a list with (bounding boxes, human joints)
                If self.return_heatmaps and self.return_bounding_boxes, the class returns a list with
                    (heatmaps, bounding boxes, human joints)

        Raises:
            ValueError: if image has neither 3 nor 4 dimensions.
        """
        if len(image.shape) == 3:
            return self._predict_single(image)
        elif len(image.shape) == 4:
            return self._predict_batch(image)
        else:
            raise ValueError('Wrong image format.')

    def _predict_single(self, image):
        """Runs `_predict_batch` on a one-image batch and strips the batch dimension from the result."""
        ret = self._predict_batch(image[None, ...])
        if len(ret) > 1:  # heatmaps and/or bboxes and joints
            ret = [r[0] for r in ret]
        else:  # joints only
            ret = ret[0]
        return ret

    def _predict_batch(self, image):
        """
        Predicts the human pose on a stack of images with shape=(n, height, width, BGR color channel).

        Returns the list of per-image joints, preceded by the heatmaps and/or the bounding boxes when
        self.return_heatmaps / self.return_bounding_boxes are set (see `predict`).
        """
        with torch.no_grad():

            heatmaps_list = None
            tags_list = []

            # scales and base (size, center, scale)
            scales = (1,)  # ToDo add support to multiple scales

            scales = sorted(scales, reverse=True)
            base_size, base_center, base_scale = get_multi_scale_size(
                image[0], self.resolution, 1, 1
            )

            # for each scale (at the moment, just one scale)
            for scale in scales:
                # rescale image, convert to tensor, move to device
                # (distinct local names are used so that the input batch `image` is never overwritten)
                images = list()
                for img in image:
                    resized, size_resized, _, _ = resize_align_multi_scale(
                        img, self.resolution, scale, min(scales), interpolation=self.interpolation
                    )
                    tensor = self.transform(cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)).unsqueeze(dim=0)
                    images.append(tensor.to(self.device))
                images = torch.cat(images)

                # inference
                # output: list of HigherHRNet outputs (heatmaps)
                # avg_heatmaps: averaged heatmaps
                # tags: per-pixel identity ids.
                #       See Newell et al., Associative Embedding: End-to-End Learning for Joint Detection and
                #           Grouping, NIPS 2017. https://arxiv.org/abs/1611.05424 or
                #           http://papers.nips.cc/paper/6822-associative-embedding-end-to-end-learning-for-joint-detection-and-grouping
                outputs, heatmaps, tags = get_multi_stage_outputs(
                    self.model, images, with_flip=False, project2image=True, size_projected=size_resized,
                    nof_joints=self.nof_joints, max_batch_size=self.max_batch_size
                )

                # aggregate the multiple heatmaps and tags
                heatmaps_list, tags_list = aggregate_results(
                    scale, heatmaps_list, tags_list, heatmaps, tags, with_flip=False, project2image=True
                )

            heatmaps = heatmaps_list.float() / len(scales)
            tags = torch.cat(tags_list, dim=4)

            # refine prediction
            # grouped has the shape (people, joints, 4) -> 4: (x, y, confidence, tag)
            # scores has the shape (people, ) and corresponds to the person confidence before refinement
            grouped, scores = self.output_parser.parse(
                heatmaps, tags, adjust=True, refine=True  # ToDo parametrize these two parameters
            )

            # get final predictions
            final_results = get_final_preds(
                grouped, base_center, base_scale, [heatmaps.shape[3], heatmaps.shape[2]]
            )

            if self.filter_redundant_poses:
                # filter redundant poses - this step filters out poses whose joints have, on average, a difference
                #   lower than 3 pixels
                # this is useful when refine=True in self.output_parser.parse because that step joins together
                #   skeleton parts belonging to the same people (but then it does not remove redundant skeletons)
                final_pts = []
                # for each image
                for image_results in final_results:
                    kept = []
                    # for each person
                    for person in image_results:
                        if len(kept) > 0:
                            diff = np.mean(np.abs(np.array(kept)[..., :2] - person[..., :2]), axis=(1, 2))
                            if np.any(diff < 3):  # average diff between this pose and another one is less than 3 pixels
                                continue
                        kept.append(person)
                    final_pts.append(kept)
                final_results = final_pts

            pts = []
            boxes = []
            for image_results in final_results:
                poses = np.asarray(image_results)
                if len(poses) > 0:
                    poses[..., [0, 1]] = poses[..., [1, 0]]  # restoring (y, x) order as in SimpleHRNet
                    poses = poses[..., :3]
                pts.append(poses)

                # bounding boxes are only built when they are going to be returned
                if self.return_bounding_boxes:
                    if len(poses) > 0:
                        left_top = np.min(poses[..., 0:2], axis=1)
                        right_bottom = np.max(poses[..., 0:2], axis=1)
                        # [x1, y1, x2, y2]
                        boxes.append(np.stack(
                            [left_top[:, 1], left_top[:, 0], right_bottom[:, 1], right_bottom[:, 0]], axis=-1
                        ))
                    else:
                        boxes.append([])

        res = list()
        if self.return_heatmaps:
            res.append(heatmaps)
        if self.return_bounding_boxes:
            res.append(boxes)
        res.append(pts)

        if len(res) > 1:
            return res
        else:
            return res[0]


# if __name__ == '__main__':
#     hhrnet = SimpleHigherHRNet(
#         c=32, nof_joints=17, checkpoint_path='./weights/pose_higher_hrnet_w32_512.pth',
#         resolution=512, device='cpu'
#     )
#     # img = np.ones((384, 256, 3), dtype=np.uint8)

#     import cv2
#     img = cv2.imread('./sample.jpg', cv2.IMREAD_ANYCOLOR)

#     hhrnet.predict(img)

## main.py

In [None]:
import cv2

# Count the frames of the clip; the capture handle is always released afterwards.
cap = cv2.VideoCapture("/content/drive/MyDrive/pose_estimation/clip5.mov")
try:
    length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
finally:
    cap.release()
print(length)

116


In [None]:
# live-demo.py
import os
# Insert the current working directory at position 1 of the module search path.
sys.path.insert(1, os.getcwd())

def main(camera_id, filename, hrnet_c, hrnet_j, hrnet_weights, hrnet_joints_set, image_resolution, disable_tracking,
         max_nof_people, max_batch_size, disable_vidgear, save_video, video_format, video_framerate, device,
         extract_pts, enable_tensorrt):
    """
    Runs SimpleHigherHRNet on every frame of a video file and prints the predictions.

    Args:
        camera_id: currently unused.
        filename (str): path of the video to process. If None, a message is printed and nothing is done.
        hrnet_c (int): number of channels of the HigherHRNet model.
        hrnet_j (int): number of joints.
        hrnet_weights (str): path to the model checkpoint.
        hrnet_joints_set: currently unused.
        image_resolution (int): model input resolution.
        disable_tracking (bool): when False, bounding boxes are requested from the model along with the joints.
        max_nof_people (int): maximum number of detectable people.
        max_batch_size (int): maximum batch size used in inference.
        disable_vidgear, save_video, video_format, video_framerate, extract_pts: currently unused.
        device: currently ignored - "cuda:0" is used when CUDA is available, "cpu" otherwise.
        enable_tensorrt (bool): forwarded to SimpleHigherHRNet.

    Returns:
        None. The selected device, the per-frame predictions and the elapsed time are printed.
    """
    # the `device` argument is overridden by automatic selection
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(device)

    if filename is None:
        # without a video there is nothing to read frames from
        print('Video not found')
        return

    video = cv2.VideoCapture(filename)
    assert video.isOpened()

    try:
        model = SimpleHigherHRNet(
            hrnet_c,
            hrnet_j,
            hrnet_weights,
            resolution=image_resolution,
            return_bounding_boxes=not disable_tracking,
            max_nof_people=max_nof_people,
            max_batch_size=max_batch_size,
            device=device,
            enable_tensorrt=enable_tensorrt
        )

        # TODO: person tracking, skeleton drawing and export of the points are not implemented here

        t1 = time.time()
        while True:
            ret, frame = video.read()
            if not ret or frame is None:
                # end of the stream (or unreadable frame): stop instead of looping forever
                break

            pts = model.predict(frame)
            print(pts)

        t2 = time.time()
        print('\nTime elapsed == ', t2 - t1)
    finally:
        # always free the capture handle, even if the model or the inference fails
        video.release()

In [None]:
# Run the demo on the sample clip; every argument is spelled out by name for readability.
main(
    camera_id=0,
    filename='/content/drive/MyDrive/pose_estimation/clip5.mov',
    hrnet_c=32,
    hrnet_j=17,
    hrnet_weights='/content/drive/MyDrive/pose_estimation/pose_higher_hrnet_w32_512.pth',
    hrnet_joints_set='coco',
    image_resolution=512,
    disable_tracking=True,
    max_nof_people=30,
    max_batch_size=16,
    disable_vidgear=True,
    save_video=True,
    video_format='MJPG',
    video_framerate=30,
    device='cuda',
    extract_pts=True,
    enable_tensorrt=False,
)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  [1.60927734e+03 1.50416016e+03 8.52503777e-02]]

 [[6.65332031e+02 1.44966797e+03 2.36589611e-01]
  [6.54785156e+02 1.45669922e+03 2.60977864e-01]
  [6.61816406e+02 1.44439453e+03 2.27561265e-01]
  [6.51269531e+02 1.47076172e+03 2.72644639e-01]
  [6.49511719e+02 1.44615234e+03 4.72829416e-02]
  [6.81152344e+02 1.49185547e+03 8.09499025e-01]
  [6.56542969e+02 1.45494141e+03 5.37024438e-01]
  [7.40917969e+02 1.46900391e+03 6.53678060e-01]
  [6.10839844e+02 1.45142578e+03 7.81689435e-02]
  [7.90136719e+02 1.46197266e+03 2.27583110e-01]
  [8.23535156e+02 1.47427734e+03 1.01772219e-01]
  [7.37402344e+02 1.55162109e+03 7.64564633e-01]
  [7.30371094e+02 1.51646484e+03 5.58500409e-01]
  [8.05957031e+02 1.49712891e+03 8.49026322e-01]
  [7.98925781e+02 1.46548828e+03 6.05640173e-01]
  [9.21972656e+02 1.50064453e+03 8.28598082e-01]
  [8.88574219e+02 1.52349609e+03 6.70340002e-01]]

 [[3.15527344e+02 1.34771484e+03 5.75023144e-02]
