In [None]:
import importlib
import os
import pickle
import shutil
import sys
import time
from copy import deepcopy
from dataclasses import dataclass
from pathlib import Path
from typing import Any, List

import cv2
# import fire
import numpy as np
import numpy.typing as npt
# import cupy as np
# import cupy.typing as npt
importlib.reload(np)
import torch
import torch.utils.data
from tqdm import tqdm
from pyquaternion import Quaternion

from aputils import Video, camera_config
from scripts._path_init import *
from visualDet3D.data.pipeline import build_augmentator
from visualDet3D.networks.detectors.monodtr_detector import MonoDTR
from visualDet3D.networks.utils.utils import BackProjection, BBox3dProjector
from visualDet3D.utils.utils import cfg_from_file

In [None]:
from submodules.Yolov5_StrongSORT_OSNet.trackers.multi_tracker_zoo import create_tracker
from submodules.Yolov5_StrongSORT_OSNet.yolov5.utils.general import scale_boxes
from submodules.Yolov5_StrongSORT_OSNet.yolov5.utils.plots import Annotator, colors
from submodules.Yolov5_StrongSORT_OSNet.trackers.strong_sort.strong_sort import StrongSORT

# Report whether PyTorch can see a CUDA device before any model is moved to the GPU.
cuda_available = torch.cuda.is_available()
print('CUDA available: {}'.format(cuda_available))

In [None]:
import monodtrutils
importlib.reload(monodtrutils)
from monodtrutils import NuScenesMonoDataset, Detection, format_detections

In [None]:
# Data directory handed to NuScenesMonoDataset when the dataset is constructed.
VIDEO_DIR = "./data/boston-seaport/"
# Display names for the detector classes; indexed by the integer class id when
# track labels are drawn on the frames.
names = ['Car', 'Pedestrian', 'Cyclist']

In [None]:
# Run settings: config file, detector checkpoint and the CUDA device index
# shared by the detector and the tracker.
config = "config/config.py"
checkpoint_path = "./workdirs/MonoDTR/checkpoint/MonoDTR.pth"
gpu = 1

# Load the experiment configuration.
cfg: "Any" = cfg_from_file(config)

# NOTE(review): presumably switches the project code into test-set mode
# (no ground-truth labels) — confirm against the dataset/pipeline code.
cfg.is_running_test_set = True

# Override the GPU from the config file with the one chosen above and make it
# the current CUDA device.
cfg.trainer.gpu = gpu
torch.cuda.set_device(cfg.trainer.gpu)

In [None]:
# Create StrongSORT model
# Path('').resolve() is the absolute current working directory, so the ReID
# weights are looked up relative to wherever the notebook kernel was started.
FILE = Path('').resolve()
reid_weights = FILE / "weights/osnet_x0_25_msmt17.pt"  # model.pt path
# Same device index as the detector (`gpu` is set in the config cell).
device = torch.device(int(gpu))
# NOTE(review): `half` presumably toggles FP16 inference in the ReID model — confirm in create_tracker.
half = False
tracker = create_tracker("strongsort", reid_weights, device, half)
# NOTE(review): presumably runs a dummy forward pass so the first real frame is not slowed down — confirm.
tracker.model.warmup()
print('loaded tracker')

In [None]:
# Create the detection model and load its trained weights onto the selected GPU.
detector = MonoDTR(cfg.detector)
detector = detector.cuda()
# NOTE: torch.load unpickles the file, which can execute arbitrary code; only
# load checkpoints that come from a trusted source.
state_dict = torch.load(checkpoint_path, map_location='cuda:{}'.format(cfg.trainer.gpu))
# strict=False tolerates key mismatches between checkpoint and model. Report
# them instead of discarding the result, otherwise a wrong checkpoint silently
# leaves part of the network at its random initialisation.
load_result = detector.load_state_dict(state_dict, strict=False)
missing_keys = list(getattr(load_result, 'missing_keys', []))
unexpected_keys = list(getattr(load_result, 'unexpected_keys', []))
if missing_keys or unexpected_keys:
    print('checkpoint/model mismatch - missing keys: {}, unexpected keys: {}'.format(
        missing_keys, unexpected_keys))
detector.eval()
print('loaded detector')

In [None]:
# Build the dataset over VIDEO_DIR. The main loop below reads it by integer
# index (`dataset[index]`, `len(dataset)`) and batches single samples with
# `dataset.collate_fn`.
dataset = NuScenesMonoDataset(cfg, VIDEO_DIR)
print('constructed dataset')

In [None]:
# Main loop: for every frame of the dataset run MonoDTR detection, feed the 2D
# boxes to the StrongSORT tracker and draw the tracks on the original image.
#
# Results left in the notebook namespace for the cells below:
#   imgs        - annotated frames, one per dataset item
#   detections  - one object returned by format_detections per frame
#   trackings   - one list of (id, cls, detection index, 2D box, 3D position, output[8]) per frame
#   time_load / time_detect / time_track - per-frame wall-clock seconds of each stage
imgs = []
time_load = []
time_detect = []
time_track = []
with torch.no_grad():
    detector.eval()

    # Start from an empty result directory. shutil/os calls replace the former
    # `os.system("rm -r ...")`, which broke on paths containing spaces or shell
    # metacharacters; makedirs also creates a missing parent directory.
    result_path = os.path.join(cfg.path.preprocessed_path, 'data')
    if os.path.isdir(result_path):
        shutil.rmtree(result_path)
        print("clean up the recorder directory of {}".format(result_path))
    os.makedirs(result_path, exist_ok=True)
    print("rebuild {}".format(result_path))

    projector = BBox3dProjector().cuda()
    backprojector = BackProjection().cuda()

    curr_frame, prev_frame = None, None
    detections: "List[Detection]" = []
    trackings = []
    for index in tqdm(range(len(dataset))):
        # ------------------------------------------------------------ load
        t1 = time.time()
        data = dataset[index]
        collated_data = dataset.collate_fn([data])

        # images: torch.Tensor [N=1 x 3 x h x w]
        # P2: [np.array[3 x 4]]
        images, P2 = collated_data
        camera_translation = np.array(data['camera_translation'])
        camera_rotation = Quaternion(data['camera_rotation'])
        time_load.append(time.time() - t1)

        # ---------------------------------------------------------- detect
        t1 = time.time()
        scores, bbox, clss = detector([
            images.cuda().float().contiguous(),
            torch.tensor(np.array(P2)).cuda().float()
        ])
        assert scores.shape[0] == bbox.shape[0] and scores.shape[0] == clss.shape[0]
        bbox_2d = bbox[:, 0:4]

        if bbox.shape[1] <= 4:
            raise Exception('Should run 3D')
        bbox_3d_state = bbox[:, 4:]  # [cx,cy,z,w,h,l,alpha, bot, top]
        P2 = P2[0]
        bbox_3d_state_3d = backprojector(bbox_3d_state, P2)  # [x, y, z, w,h ,l, alpha, bot, top]

        _, _, thetas = projector(bbox_3d_state_3d, bbox_3d_state_3d.new(P2))

        # Map the 2D boxes from the network input image back to the original
        # image, using the ratio of the focal terms and the offset of the
        # principal point between the original and the network projection matrix.
        original_P = data['original_P']
        scale_x = original_P[0, 0] / P2[0, 0]
        scale_y = original_P[1, 1] / P2[1, 1]

        shift_left = original_P[0, 2] / scale_x - P2[0, 2]
        shift_top = original_P[1, 2] / scale_y - P2[1, 2]
        # bbox_2d is a view into `bbox`, so these updates happen in place.
        bbox_2d[:, 0:4:2] += shift_left
        bbox_2d[:, 1:4:2] += shift_top

        bbox_2d[:, 0:4:2] *= scale_x
        bbox_2d[:, 1:4:2] *= scale_y

        detection = format_detections(scores, bbox_2d, bbox_3d_state_3d, thetas, clss)
        # Keep only x, y, z of every detection and apply the camera rotation and
        # translation of this frame.
        # NOTE(review): presumably camera frame -> global frame — confirm against the dataset.
        positions = []
        for b in detection.bbox_3d_state_3d:
            b = np.array(b[:3].cpu())
            b = camera_rotation.rotate(b)
            positions.append(b + camera_translation)
        detection.bbox_3d_state_3d = torch.tensor(np.array(positions)).to(device)

        detections.append(detection)
        time_detect.append(time.time() - t1)

        # ----------------------------------------------------------- track
        t1 = time.time()
        # Reverse the channel order of the original image for the annotator/tracker.
        im0 = np.ascontiguousarray(data['original_image'].copy()[:, :, ::-1])
        curr_frame = im0.copy()
        # The original passed `pil=not ascii`; `ascii` resolved to the builtin
        # function, so the value was always False. Say so explicitly.
        annotator = Annotator(im0, line_width=2, pil=False)
        if prev_frame is not None and curr_frame is not None:
            if hasattr(tracker, 'tracker') and hasattr(tracker.tracker, 'camera_update'):
                tracker.tracker.camera_update(prev_frame, curr_frame)

        # Tracker input rows: [x1, y1, x2, y2, score, class]
        if scores.size()[0] == 0:
            det = torch.tensor([])
        else:
            det = torch.cat([
                bbox_2d,
                scores.unsqueeze(1),
                clss.unsqueeze(1),
            ], dim=1)

        tracking = []
        if len(det):
            # Boxes are already in original-image coordinates (see the rescale
            # above), so no further scaling is needed before tracking.
            outputs = tracker.update(det.cpu(), im0)
            time_track.append(time.time() - t1)
            if len(outputs) > 0:
                # NOTE(review): confidences are paired with tracker outputs by
                # position, which assumes the tracker returns rows in detection
                # order — confirm; det[det_index, 4] may be the intended value.
                for output, conf in zip(outputs, det[:, 4]):
                    bboxes = output[0:4]
                    track_id = int(output[4])
                    cls_id = int(output[5])
                    # Separate name so the frame loop variable `index` is not clobbered.
                    det_index = int(output[7])
                    # output[8] is stored as "detection_age" in trackings.json.
                    # NOTE(review): 0 presumably means the track was matched to a
                    # detection in this frame — confirm in the tracker.
                    if output[8] == 0:
                        bbox_3d = bbox_3d_state_3d[det_index][:3]
                        bbox_3d = camera_rotation.rotate(bbox_3d)
                        bbox_3d = camera_translation + bbox_3d
                        bbox_3d = bbox_3d.tolist()
                    else:
                        bbox_3d = None

                    label = f'{track_id} {names[cls_id]} {conf:.2f}'
                    annotator.box_label(bboxes, label, color=colors(cls_id, True))
                    tracking.append((track_id, cls_id, det_index, bboxes.tolist(), bbox_3d, output[8]))
        else:
            time_track.append(time.time() - t1)

        trackings.append(tracking)

        im0 = annotator.result()
        imgs.append(im0)
        prev_frame = curr_frame

In [None]:
# Encode the annotated frames collected in `imgs` into ./test.mp4.
fps = 20
if not imgs:
    raise RuntimeError('no frames in `imgs`; run the detection/tracking cell first')
h, w = imgs[0].shape[:2]
video_writer = cv2.VideoWriter('./test.mp4', cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
# A writer that failed to open (e.g. codec unavailable) accepts write() calls
# without producing a usable file, so fail loudly instead.
if not video_writer.isOpened():
    raise RuntimeError('could not open ./test.mp4 for writing with the mp4v codec')

try:
    for im0 in imgs:
        # Reverse the channel order; the slice is a negative-stride view, so
        # hand OpenCV a contiguous copy.
        video_writer.write(np.ascontiguousarray(im0[:, :, ::-1]))
finally:
    # Always finalise the container, even if a frame fails to encode.
    video_writer.release()
cv2.destroyAllWindows()

In [None]:
# Total wall-clock seconds spent loading and collating frames.
total_load_time = sum(time_load)
print(total_load_time)

In [None]:
# Total wall-clock seconds spent in detection and its post-processing.
total_detect_time = sum(time_detect)
print(total_detect_time)

In [None]:
# Total wall-clock seconds recorded for the tracking stage.
total_track_time = sum(time_track)
print(total_track_time)

In [None]:
import json

# Persist the per-frame load times as a JSON array. Only time_load is saved by
# this cell; time_detect and time_track are not written here.
with open('time_load.json', "w") as f:
    f.write(json.dumps(time_load))

In [None]:
def tolist(l):
    """Return ``l`` as plain Python data suitable for ``json.dump``.

    Args:
        l: A list, an object exposing ``.tolist()`` (e.g. a NumPy array or a
            torch tensor), or any other iterable such as a tuple.

    Returns:
        ``l`` itself when it already is a list (no copy is made), the result of
        ``l.tolist()`` when that method exists, otherwise ``list(l)``.

    Raises:
        TypeError: If ``l`` has no ``tolist`` method and is not iterable.
    """
    if isinstance(l, list):
        return l
    to_list = getattr(l, "tolist", None)
    if callable(to_list):
        return to_list()
    # Tuples, ranges and other iterables have no .tolist(); convert them directly
    # instead of failing with AttributeError.
    return list(l)

In [None]:
def _detection_to_record(d):
    """Convert one per-frame detection object into a JSON-serialisable dict."""
    return {
        "bbox": tolist(d.bbox_2d),
        "clss": tolist(d.clss),
        "bbox_3d": tolist(d.bbox_3d_state_3d),
        "thetas": tolist(d.thetas),
        "scores": tolist(d.scores),
    }


# Write one record per frame, in frame order, to detection.json.
with open('detection.json', 'w') as f:
    json.dump(list(map(_detection_to_record, detections)), f)

In [None]:
# JSON keys for the positional fields of each tracking tuple built in the main
# loop, in tuple order.
_TRACK_FIELDS = ("id", "cls", "index", "bbox", "bbox_3d", "detection_age")

# Write one list per frame, each holding one dict per track, to trackings.json.
with open('trackings.json', 'w') as f:
    frames = []
    for frame_tracks in trackings:
        frames.append([
            {key: track[pos] for pos, key in enumerate(_TRACK_FIELDS)}
            for track in frame_tracks
        ])
    json.dump(frames, f)

In [None]:
# Spot-check: show the tracks recorded for frame index 100. Raises IndexError
# when fewer than 101 frames were processed.
trackings[100]