In [None]:
# Install norfair's dependencies (commonmark and rich) from local wheel files
!pip install commonmark-0.9.1-py2.py3-none-any.whl
!pip install rich-9.13.0-py3-none-any.whl
# Install norfair itself, together with its "metrics" and "video" extras
! pip install norfair[metrics,video]

In [None]:
#imports
import numpy as np
from norfair import Detection, Tracker

In [None]:
# Inference helper function
def inference(img, model, test_size):
    """Run the detector on a single image and rescale boxes to the input image.

    Args:
        img: Image array. Only ``img.shape[0]`` and ``img.shape[1]`` are read
            here (presumably height and width -- confirm against the caller);
            the array itself is handed to ``ValTransform``.
        model: Callable detector, invoked as ``model(tensor_img)``.
        test_size: Two-element size passed to the preprocessing transform and
            used to compute the resize ratio that is undone on the boxes.

    Returns:
        ``(bboxes, bbclasses, scores)`` taken from the CPU copy of the first
        post-processed output: columns 0-3 divided by the resize ratio,
        column 6, and the product of columns 4 and 5 respectively.
        ``([], [], [])`` when post-processing returns ``None`` for the image.

    Note:
        Reads the notebook-level globals ``num_classes``, ``confthre`` and
        ``nmsthre``, plus ``ValTransform`` and ``postprocess``, all of which
        must be defined before this function is called.
    """
    preproc = ValTransform(legacy=False)

    tensor_img, _ = preproc(img, None, test_size)
    tensor_img = torch.from_numpy(tensor_img).unsqueeze(0).float()

    # Send the input to wherever the model's weights live instead of assuming
    # CUDA. Models that expose no parameters keep the previous behaviour (CUDA).
    params = getattr(model, "parameters", None)
    first_param = next(iter(params()), None) if callable(params) else None
    device = first_param.device if first_param is not None else torch.device("cuda")
    tensor_img = tensor_img.to(device)

    with torch.no_grad():
        outputs = model(tensor_img)
        outputs = postprocess(
            outputs, num_classes, confthre,
            nmsthre, class_agnostic=True
        )

    # postprocess yields None for an image without any surviving detection.
    if outputs[0] is None:
        return [], [], []

    outputs = outputs[0].cpu()

    # Undo the resize applied by the preprocessing step so that the boxes are
    # expressed relative to `img` rather than to `test_size`.
    ratio = min(test_size[0] / img.shape[0], test_size[1] / img.shape[1])
    bboxes = outputs[:, 0:4] / ratio
    bbclasses = outputs[:, 6]
    scores = outputs[:, 4] * outputs[:, 5]

    return bboxes, bbclasses, scores

In [None]:
##############################################################
#                      Tracking helpers                      #
##############################################################


# Helper to convert bboxes in format [x_min, y_min, x_max, y_max, score] to norfair.Detection objects
def to_norfair(detects, frame_id):
    """Build one ``Detection`` per ``[x_min, y_min, x_max, y_max, score]`` row.

    Each detection carries the box centre as ``points``, the score wrapped in a
    one-element array as ``scores``, and ``[width, height, frame_id]`` as
    ``data`` so the box size and originating frame can be recovered later.

    Args:
        detects: Iterable of five-element rows
            ``[x_min, y_min, x_max, y_max, score]``.
        frame_id: Identifier of the frame the rows belong to.

    Returns:
        List of ``Detection`` objects, in the same order as ``detects``.
    """
    return [
        Detection(
            points=np.array([(left + right) / 2, (top + bottom) / 2]),
            scores=np.array([confidence]),
            data=np.array([right - left, bottom - top, frame_id]),
        )
        for left, top, right, bottom, confidence in detects
    ]

# Euclidean distance used to match this frame's detections with tracked_objects from previous frames
def euclidean_distance(detection, tracked_object):
    """Return the norm of the offset between a detection and a track estimate.

    Args:
        detection: Object exposing a ``points`` array.
        tracked_object: Object exposing an ``estimate`` array that can be
            subtracted from ``detection.points``.

    Returns:
        ``np.linalg.norm`` of ``detection.points - tracked_object.estimate``.
    """
    offset = detection.points - tracked_object.estimate
    return np.linalg.norm(offset)

In [None]:
# Formatting helper shared by detected and tracked boxes
def _format_prediction(score, x_min, y_min, width, height):
    """Render one box as 'score x_min y_min width height' (score with 2 decimals)."""
    return '{:.2f} {} {} {} {}'.format(score, x_min, y_min, width, height)


# Prediction tracking function
def tracking_predictions(iter_test):
    """Predict on every frame of ``iter_test`` and fill gaps using a tracker.

    For each frame the detector output is filtered by ``confthre`` and turned
    into prediction strings. The surviving boxes also update a norfair
    ``Tracker``; tracked objects whose last detection came from an earlier
    frame are appended to the frame's predictions at their estimated position,
    reusing the size and score of that last detection.

    Args:
        iter_test: Iterable yielding ``(image_np, sample_prediction_df)``
            pairs. The last axis of ``image_np`` is reversed before inference
            (presumably a colour-channel swap -- confirm), and
            ``sample_prediction_df['annotations']`` is overwritten with the
            frame's prediction string before being passed to ``env.predict``.

    Returns:
        Dict with parallel lists ``'id'`` (frame indices, starting at 0) and
        ``'prediction_string'``, containing only frames whose prediction
        string is non-empty.

    Note:
        Reads the notebook-level globals ``model``, ``test_size``,
        ``confthre``, ``env`` and ``yolox_inference``.
    """
    submission_dict = {
        'id': [],
        'prediction_string': [],
    }

    # The tracker updates its tracks from the current frame's detections.
    # Matching is based on the euclidean distance between the bbox centers of
    # the current detections and the tracked objects from previous frames.
    # Parameters are described in the norfair docs:
    # https://github.com/tryolabs/norfair/blob/master/docs/README.md
    tracker = Tracker(
        distance_function=euclidean_distance,
        distance_threshold=30,
        hit_inertia_min=3,
        hit_inertia_max=6,
        initialization_delay=1,
    )

    # frame_id is stored inside every detection so that tracks without a
    # detection on the current frame can be told apart later.
    for frame_id, (image_np, sample_prediction_df) in enumerate(iter_test):

        # NOTE(review): `yolox_inference` is not defined in the visible part of
        # this notebook; the helper defined nearby is named `inference` and has
        # the same signature -- confirm which one is intended.
        bboxes, _bbclasses, scores = yolox_inference(image_np[:, :, ::-1], model, test_size)

        predictions = []
        detects = []
        for box, score in zip(bboxes, scores):
            if score < confthre:
                continue
            x_min, y_min, x_max, y_max = (int(coord) for coord in box[:4])
            detects.append([x_min, y_min, x_max, y_max, score])
            predictions.append(
                _format_prediction(score, x_min, y_min, x_max - x_min, y_max - y_min)
            )

        #######################################################
        #                      Tracking                       #
        #######################################################

        # Update tracks using detects from current frame
        tracked_objects = tracker.update(detections=to_norfair(detects, frame_id))
        for tobj in tracked_objects:
            bbox_width, bbox_height, last_detected_frame_id = tobj.last_detection.data
            if last_detected_frame_id == frame_id:
                # Detected on this frame, so it is already in `predictions`.
                continue

            # Add objects that have no detection on the current frame, placed
            # at the tracker's estimated center with their last known size.
            xc, yc = tobj.estimate[0]
            x_min = int(round(xc - bbox_width / 2))
            y_min = int(round(yc - bbox_height / 2))
            score = tobj.last_detection.scores[0]

            predictions.append(
                _format_prediction(score, x_min, y_min, bbox_width, bbox_height)
            )
        #######################################################

        prediction_str = ' '.join(predictions)
        sample_prediction_df['annotations'] = prediction_str
        env.predict(sample_prediction_df)
        if prediction_str:
            submission_dict['id'].append(frame_id)
            submission_dict['prediction_string'].append(prediction_str)
        print('Prediction:', prediction_str)
    return submission_dict