This notebook is an example of how to pipeline two models. A video stream from a local camera is processed by the person detection model. The person detection results are then processed by the pose detection model, one person bbox at a time.
The combined result is then displayed.
OpenCV is required to run this sample.

In [11]:
import degirum as dg # import DeGirum PySDK
import cv2 # OpenCV

In [12]:
# connect to the default model zoo (connect_model_zoo() is called with no arguments);
# the returned `zoo` object is what the models are loaded from later in the notebook
zoo = dg.connect_model_zoo()

In [13]:
# Some helper functions

def show(img, capt = "<image>"):
    """Display an OpenCV image in the window titled `capt` and poll the keyboard.

    Raises:
        KeyboardInterrupt: if the 'x' or 'q' key was pressed while polling.
    """
    cv2.imshow(capt, img)
    # poll for a key press (1 ms timeout) and keep only the low byte of the code
    pressed = cv2.waitKey(1) & 0xFF
    if pressed in (ord('x'), ord('q')):
        raise KeyboardInterrupt
    
def crop(img, bbox):
    """Crop an OpenCV image to the given bounding box.

    Args:
        img: image indexed as img[row, column], i.e. img[y, x].
        bbox: sequence whose first four items are (x1, y1, x2, y2);
            each value is truncated to an integer.

    Returns:
        The img[y1:y2, x1:x2] slice of `img`. Coordinates below zero are
        clamped to zero, so for a box that starts outside the image the crop
        origin is (max(x1, 0), max(y1, 0)) rather than (x1, y1).
    """
    # A negative slice index counts from the far edge of the image, which
    # would silently produce an empty or wrong crop - clamp to zero instead.
    # Coordinates past the far edge need no handling: slicing stops at the edge.
    x1, y1, x2, y2 = (max(0, int(v)) for v in bbox[:4])
    return img[y1:y2, x1:x2]

In [14]:
# load the person detection and pose models built for DeGirum Orca AI accelerator
# (change model name to "...n2x_cpu_1" to run it on CPU)
people_det_model = zoo.load_model("yolo_v5s_person_det--512x512_quant_n2x_orca_1")
pose_model = zoo.load_model("mobilenet_v1_posenet_coco_keypoints--353x481_quant_n2x_orca_1")

# pose model properties to adjust, applied in the order listed
pose_settings = {
    "output_pose_threshold": 0.2,  # lower threshold
    "overlay_line_width": 1,
    "overlay_alpha": 1,
    "overlay_show_probabilities": False,
    "overlay_show_labels": False,
}
for _prop, _value in pose_settings.items():
    setattr(pose_model, _prop, _value)

# both models use the OpenCV backend: needed to have overlay image in OpenCV format
for _model in (people_det_model, pose_model):
    _model.image_backend = 'opencv'

In [17]:
# open video stream from local camera #0
stream = cv2.VideoCapture(0)

# fail early with a clear message if the camera could not be opened,
# instead of failing later when the first frame is read
if not stream.isOpened():
    raise RuntimeError("Cannot open local camera #0")

# define iterator function, which returns frames from camera
def source():
    """Yield frames read from the global camera `stream`, one per iteration.

    The generator ends as soon as `stream.read()` reports a failed read, so
    a consumer iterating over it stops instead of receiving invalid frames.
    """
    while True:
        ret, frame = stream.read()
        if not ret:
            # the read failed (e.g. camera unavailable or stream ended):
            # `frame` is not a valid image, so stop rather than yield it
            break
        yield frame

In [18]:
try:
    # run person detection model on a camera stream
    for people in people_det_model.predict_batch(source()):

        # prepare list of bboxes of detected persons
        person_boxes = [person['bbox'] for person in people.results]
        if not person_boxes:
            # nobody detected: still display the frame, so the window keeps
            # updating and the quit keys handled by show() are still polled
            show(people.image_overlay, "Poses")
            continue

        # prepare list of images cropped around each detected person
        person_crops = [crop(people.image, box) for box in person_boxes]

        # for each detected person detect the pose
        all_poses = None # accumulated result
        for poses, box in zip(pose_model.predict_batch(person_crops), person_boxes):

            # landmarks are relative to the person crop: shift them by the
            # bbox top-left corner to get coordinates in the original image
            for r in poses.results:
                for p in r['landmarks']:
                    p['landmark'][0] += box[0]
                    p['landmark'][1] += box[1]

            if all_poses is None: # accumulate all detected poses
                # the first pose result becomes the accumulator; its input image
                # is replaced by the person detection overlay so that the pose
                # overlay is drawn on top of the whole annotated frame
                # NOTE(review): relies on private attributes of the result
                # object - confirm they still exist when upgrading PySDK
                all_poses = poses
                all_poses._input_image = people.image_overlay
            else:
                all_poses._inference_results += poses.results

        if all_poses is None:
            # the pose model produced no results: fall back to the detection overlay
            show(people.image_overlay, "Poses")
        else:
            show(all_poses.image_overlay, "Poses")

except KeyboardInterrupt:
    pass # ignore KeyboardInterrupt errors: show() raises it when 'x' or 'q' is pressed
finally:
    cv2.destroyAllWindows() # close OpenCV windows


In [19]:
# release the camera stream opened with cv2.VideoCapture(0) earlier in the notebook
stream.release()