In [1]:
# top_down_video_demo_full_frame_without_det.py
import os
from argparse import ArgumentParser

import cv2
import numpy as np

from mmpose.apis import (vis_pose_result)
import json

In [2]:
#inference.py
import os

import cv2
import mmcv
import numpy as np
import torch
from mmcv.parallel import collate, scatter
from mmcv.runner import load_checkpoint

from mmpose.core.post_processing import oks_nms
from mmpose.datasets.pipelines import Compose
from mmpose.models import build_posenet
from mmpose.utils.hooks import OutputHook

In [3]:
#top_down.py
import math
import warnings

import cv2
import mmcv
import numpy as np
from mmcv.image import imwrite
from mmcv.visualization.image import imshow

# from .. import builder
# from ..registry import POSENETS
# from .base import BasePose

try:
    from mmcv.runner import auto_fp16
except ImportError:
    warnings.warn('auto_fp16 from mmpose will be deprecated from v0.15.0'
                  'Please install mmcv>=1.1.4')
    from mmpose.core import auto_fp16

## Inference on ideos 

In [4]:
def init_pose_model(config, checkpoint=None, device='cuda:0'):
    """Initialize a pose model from config file.

    Args:
        config (str or :obj:`mmcv.Config`): Config file path or the config
            object.
        checkpoint (str, optional): Checkpoint path. If left as None, the model
            will not load any weights.

    Returns:
        nn.Module: The constructed detector.
    """
    if isinstance(config, str):
        config = mmcv.Config.fromfile(config)
    elif not isinstance(config, mmcv.Config):
        raise TypeError('config must be a filename or Config object, '
                        f'but got {type(config)}')
    config.model.pretrained = None
    model = build_posenet(config.model)
    if checkpoint is not None:
        # load model checkpoint
        load_checkpoint(model, checkpoint, map_location=device)
    # save the config in the model for convenience
    model.cfg = config
    model.to(device)
    model.eval()
    return model

In [5]:
def _xyxy2xywh(bbox_xyxy):
    """Transform the bbox format from x1y1x2y2 to xywh.

    Args:
        bbox_xyxy (np.ndarray): Bounding boxes (with scores), shaped (n, 4) or
            (n, 5). (left, top, right, bottom, [score])

    Returns:
        np.ndarray: Bounding boxes (with scores),
          shaped (n, 4) or (n, 5). (left, top, width, height, [score])
    """
    bbox_xywh = bbox_xyxy.copy()
    bbox_xywh[:, 2] = bbox_xywh[:, 2] - bbox_xywh[:, 0] + 1
    bbox_xywh[:, 3] = bbox_xywh[:, 3] - bbox_xywh[:, 1] + 1

    return bbox_xywh

In [6]:
class LoadImage:
    """A simple pipeline to load image."""

    def __init__(self, color_type='color', channel_order='rgb'):
        self.color_type = color_type
        self.channel_order = channel_order

    def __call__(self, results):
        """Call function to load images into results.

        Args:
            results (dict): A result dict contains the img_or_path.

        Returns:
            dict: ``results`` will be returned containing loaded image.
        """
        if isinstance(results['img_or_path'], str):
            results['image_file'] = results['img_or_path']
            img = mmcv.imread(results['img_or_path'], self.color_type,
                              self.channel_order)
        elif isinstance(results['img_or_path'], np.ndarray):
            results['image_file'] = ''
            if self.color_type == 'color' and self.channel_order == 'rgb':
                img = cv2.cvtColor(results['img_or_path'], cv2.COLOR_BGR2RGB)
        else:
            raise TypeError('"img_or_path" must be a numpy array or a str or '
                            'a pathlib.Path object')

        results['img'] = img
        return results

In [7]:
def _box2cs(cfg, box):
    """This encodes bbox(x,y,w,h) into (center, scale)

    Args:
        x, y, w, h

    Returns:
        tuple: A tuple containing center and scale.

        - np.ndarray[float32](2,): Center of the bbox (x, y).
        - np.ndarray[float32](2,): Scale of the bbox w & h.
    """

    x, y, w, h = box[:4]
    input_size = cfg.data_cfg['image_size']
    aspect_ratio = input_size[0] / input_size[1]
    center = np.array([x + w * 0.5, y + h * 0.5], dtype=np.float32)

    if w > aspect_ratio * h:
        h = w * 1.0 / aspect_ratio
    elif w < aspect_ratio * h:
        w = h * aspect_ratio

    # pixel std is 200.0
    scale = np.array([w / 200.0, h / 200.0], dtype=np.float32)

    scale = scale * 1.25

    return center, scale

In [8]:
def _inference_single_pose_model(model,
                                 img_or_path,
                                 bboxes,
                                 dataset,
                                 return_heatmap=False):

    cfg = model.cfg
    device = next(model.parameters()).device

    # build the data pipeline
    channel_order = cfg.test_pipeline[0].get('channel_order', 'rgb')
    test_pipeline = [LoadImage(channel_order=channel_order)
                     ] + cfg.test_pipeline[1:]
    test_pipeline = Compose(test_pipeline)

    assert len(bboxes[0]) in [4, 5]

    flip_pairs = None
   
    if dataset in 'AnimalHorse10Dataset':
        flip_pairs = []
        
    else:
        raise NotImplementedError()

    batch_data = []
    for bbox in bboxes:
        center, scale = _box2cs(cfg, bbox)

        # prepare data
        data = {
            'img_or_path':
            img_or_path,
            'center':
            center,
            'scale':
            scale,
            'bbox_score':
            bbox[4] if len(bbox) == 5 else 1,
            'bbox_id':
            0,  # need to be assigned if batch_size > 1
            'dataset':
            dataset,
            'joints_3d':
            np.zeros((cfg.data_cfg.num_joints, 3), dtype=np.float32),
            'joints_3d_visible':
            np.zeros((cfg.data_cfg.num_joints, 3), dtype=np.float32),
            'rotation':
            0,
            'ann_info': {
                'image_size': np.array(cfg.data_cfg['image_size']),
                'num_joints': cfg.data_cfg['num_joints'],
                'flip_pairs': flip_pairs
            }
        }
        data = test_pipeline(data)
        batch_data.append(data)

    batch_data = collate(batch_data, samples_per_gpu=1)

    if next(model.parameters()).is_cuda:
        # scatter not work so just move image to cuda device
        batch_data['img'] = batch_data['img'].to(device)
    # get all img_metas of each bounding box
    batch_data['img_metas'] = [
        img_metas[0] for img_metas in batch_data['img_metas'].data
    ]

    # forward the model
    with torch.no_grad():
        result = model(
            img=batch_data['img'],
            img_metas=batch_data['img_metas'],
            return_loss=False,
            return_heatmap=return_heatmap)

    return result['preds'], result['output_heatmap']

In [9]:
def inference_top_down_pose_model(model,
                                  img_or_path,
                                  person_results,
                                  bbox_thr=None,
                                  format='xywh',
                                  dataset='TopDownCocoDataset',
                                  return_heatmap=False,
                                  outputs=None):

    # only two kinds of bbox format is supported.
    assert format in ['xyxy', 'xywh']

    pose_results = []
    returned_outputs = []

    if len(person_results) == 0:
        return pose_results, returned_outputs

    # Change for-loop preprocess each bbox to preprocess all bboxes at once.
    bboxes = np.array([box['bbox'] for box in person_results])

    # Select bboxes by score threshold
    if bbox_thr is not None:
        assert bboxes.shape[1] == 5
        valid_idx = np.where(bboxes[:, 4] > bbox_thr)[0]
        bboxes = bboxes[valid_idx]
        person_results = [person_results[i] for i in valid_idx]

    if format == 'xyxy':
        bboxes_xyxy = bboxes
        bboxes_xywh = _xyxy2xywh(bboxes)
    else:
        # format is already 'xywh'
        bboxes_xywh = bboxes
        bboxes_xyxy = _xywh2xyxy(bboxes)

    # if bbox_thr remove all bounding box
    if len(bboxes_xywh) == 0:
        return [], []
    print('bboxes', bboxes_xywh)
    with OutputHook(model, outputs=outputs, as_tensor=False) as h:
        # poses is results['pred'] # N x 17x 3
        poses, heatmap = _inference_single_pose_model(
            model,
            img_or_path,
            bboxes_xywh,
            dataset,
            return_heatmap=return_heatmap)

        if return_heatmap:
            h.layer_outputs['heatmap'] = heatmap

        returned_outputs.append(h.layer_outputs)

    assert len(poses) == len(person_results), print(
        len(poses), len(person_results), len(bboxes_xyxy))
    for pose, person_result, bbox_xyxy in zip(poses, person_results,
                                              bboxes_xyxy):
        pose_result = person_result.copy()
        pose_result['keypoints'] = pose
        pose_result['bbox'] = bbox_xyxy
        pose_results.append(pose_result)

    return pose_results, returned_outputs

In [10]:
cattle_part = 'leg_back'
# set arguments
args = {'device':'cuda:0', 'out_video_root': 'inference_result/video_result/cattle_'+cattle_part, 
       'pose_checkpoint': 'temp_logs/cattle_'+cattle_part+'/resnet/best.pth',
       'pose_config': 'myConfigs/train_'+cattle_part+'_resnet.py',
       'show':False, 'video_path': 'data/demo_videos/cattle_single_2.mov', 
       'kpt_thr': 0.6}
print(args['video_path'])
temp_vpath = args['video_path']
print(f'vis_{os.path.basename(temp_vpath)}')

data/demo_videos/cattle_single_2.mov
vis_cattle_single_2.mov


In [11]:
pose_model = init_pose_model(
        args['pose_config'], args['pose_checkpoint'], device=args['device'].lower())
temp_vpath = args['video_path']
dataset = pose_model.cfg.data['test']['type']
print('dataset', dataset)
cap = cv2.VideoCapture(temp_vpath)
assert cap.isOpened(), f'Faild to load video file {temp_vpath}'
print('cap', cap)
fps = cap.get(5)
print('fps', fps)
size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
        int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
print('frame size', size)
if args['out_video_root'] == '':
    save_out_video = False
else:
    os.makedirs(args['out_video_root'], exist_ok=True)
    save_out_video = True

if save_out_video:
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    videoWriter = cv2.VideoWriter(
        os.path.join(args['out_video_root'],
                     f'vis_{os.path.basename(temp_vpath)}'), fourcc,
        fps, size)

# optional
return_heatmap = False

# e.g. use ('backbone', ) to return backbone feature
output_layer_names = None

pose_res_json = {'result': []}
pose_res_list = []
img_idx = 0

Use load_from_local loader
dataset AnimalHorse10Dataset
cap <VideoCapture 0x7f1e21c33b10>
fps 30.00598921940507
frame size (1920, 1080)


In [12]:
while (cap.isOpened()):
    flag, img = cap.read()
    if not flag:
        break
    # current frame image pose result
    temp_img_pose = {'id': img_idx, 'keypoints': []}
    # keep the person class bounding boxes.
    person_results = [{'bbox': np.array([0, 0, size[0], size[1]])}]

    # test a single image, with a list of bboxes.
    pose_results, returned_outputs = inference_top_down_pose_model(
        pose_model,
        img,
        person_results,
        format='xyxy',
        dataset=dataset,
        return_heatmap=return_heatmap,
        outputs=output_layer_names)
    # update data
    temp_img_pose['id'] = img_idx
    temp_img_pose['keypoints'] = pose_results[0]['keypoints'].tolist()
    if img_idx % 100 == 0:
        print('complete ', img_idx, ' frames.')
    img_idx+=1

    # show the results
    vis_img = vis_pose_result(
        pose_model,
        img,
        pose_results,
        dataset=dataset,
        kpt_score_thr=args['kpt_thr'],
        show=False)

    if args['show']:
        cv2.imshow('Image', vis_img)

    if save_out_video:
        videoWriter.write(vis_img)
    # save current pose result
    pose_res_list.append(temp_img_pose)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
if save_out_video:
    videoWriter.release()
cv2.destroyAllWindows()
# update pose result list
print('frames count: ', len(pose_res_list))
pose_res_json['result'] = pose_res_list
# save json data
with open('./video_'+cattle_part+'.json', 'w') as json_file:
    json.dump(pose_res_json, json_file)

bboxes [[   0    0 1921 1081]]


AttributeError: 'ConfigDict' object has no attribute 'test_pipeline'

## Visualize Result