In [17]:
import sys
sys.path.insert(0, "../")

import glob
import json
from tqdm import tqdm
import numpy as np
import cv2
from scipy.optimize import curve_fit
import torch

from models.custom import HorizonModel
import horizon.transforms as T
from utils.horizon import draw_horizon

def read_sensor_data(sensors_fp: str, sensor="rgb_camera"):
    """Load a sensors JSON file and return the entry for one sensor.

    Args:
        sensors_fp: Path to the JSON file to read.
        sensor: Top-level key to look up in the parsed JSON.

    Returns:
        Whatever value the parsed JSON holds under ``sensor``.

    Raises:
        KeyError: If ``sensor`` is not a key of the parsed JSON object.
    """
    with open(sensors_fp) as handle:
        return json.load(handle)[sensor]

class Undistorter:
    """Callable that undistorts fisheye images with precomputed remap tables.

    The remap tables are built once from the calibration in ``sensor_data``
    and reused for every image passed to the instance.
    """

    def __init__(self, sensor_data):
        """Build the remap tables from ``sensor_data`` (see ``init_undistort_maps``)."""
        self.map1, self.map2 = self.init_undistort_maps(sensor_data)

    @staticmethod
    def init_undistort_maps(sensor_data):
        """Compute the fisheye undistortion maps.

        Args:
            sensor_data: Mapping with the keys ``"K"``, ``"D"``, ``"Knew"``
                and ``"resolution"``.

        Returns:
            The ``(map1, map2)`` pair returned by
            ``cv2.fisheye.initUndistortRectifyMap``.
        """
        # Force floating point: values parsed from JSON are integers when the
        # file stores whole numbers (e.g. an all-zero D), and the OpenCV
        # fisheye routines only accept 32/64-bit float matrices.
        K = np.asarray(sensor_data["K"], dtype=np.float64)
        D = np.asarray(sensor_data["D"], dtype=np.float64)
        Knew = np.asarray(sensor_data["Knew"], dtype=np.float64)
        # The stored resolution is reversed before being used as the OpenCV
        # size -- presumably it is stored as (height, width); TODO confirm.
        size = tuple(int(v) for v in sensor_data["resolution"][::-1])
        map1, map2 = cv2.fisheye.initUndistortRectifyMap(
            K, D, np.eye(3), Knew, size, cv2.CV_16SC2)
        return map1, map2

    def __call__(self, img):
        """Return ``img`` remapped through the precomputed tables.

        Uses bilinear interpolation and a constant border.
        """
        return cv2.remap(img, self.map1, self.map2,
                         interpolation=cv2.INTER_LINEAR,
                         borderMode=cv2.BORDER_CONSTANT)
    
def _gaussian(x, A, mu, sigma):
    """Unnormalised Gaussian with amplitude ``A``, mean ``mu`` and std ``sigma``."""
    return A * np.exp(-(x - mu)**2 / (2 * sigma**2))


def _fit_peak_position(logits):
    """Softmax ``logits`` along dim 1, fit a Gaussian and return its mean."""
    probs = logits.softmax(1).squeeze().detach().cpu().numpy()
    n_bins = probs.shape[-1]
    # Bin positions on [0, 1); sized from THIS distribution so pitch and
    # theta may have different numbers of bins.
    grid = np.linspace(0, 1, n_bins, endpoint=False)
    # Initial guess: amplitude and location of the highest bin, narrow width.
    # NOTE(review): 0.001 is narrower than the bin spacing whenever there are
    # fewer than 1000 bins; the fit may then stay at this initial guess --
    # confirm against the model's bin count.
    initial_guess = [probs.max(), probs.argmax() / n_bins, 0.001]
    params, _ = curve_fit(_gaussian, grid, probs, p0=initial_guess)
    return params[1]


def postprocess_horizon(x_pitch, x_theta):
    """Turn the model's pitch/theta outputs into two scalar estimates.

    Each input is softmax-normalised along dim 1, squeezed, and fitted with a
    Gaussian over bin positions ``linspace(0, 1, n_bins, endpoint=False)``.
    The fitted mean of each Gaussian is returned.

    Args:
        x_pitch: Torch tensor of pitch scores; must squeeze to 1-D
            (presumably shape ``(1, n_bins)`` -- TODO confirm against the model).
        x_theta: Torch tensor of theta scores, same layout. Its number of
            bins may differ from ``x_pitch``.

    Returns:
        ``(pitch, theta)``: the fitted Gaussian means, expressed on the same
        0-to-1 bin-position scale as the fitting grid.

    Raises:
        RuntimeError: Propagated from ``scipy.optimize.curve_fit`` when a fit
            does not converge.
    """
    return _fit_peak_position(x_pitch), _fit_peak_position(x_theta)

In [20]:
# --- Configuration -----------------------------------------------------------
# NOTE(review): absolute, machine-specific paths -- consider deriving them from
# a configurable data directory so the notebook runs elsewhere.
WEIGHTS_FP = "/Users/kevinserrano/Downloads/tmp/best.pt"
SENSORS_GLOB = "/Users/kevinserrano/Downloads/tmp/911/const/sensors.json"
# Value passed to the transform and used as the output video's width/height.
# Assumes the transform yields IMAGE_SIZE x IMAGE_SIZE frames -- TODO confirm;
# the writer is opened with this fixed size.
IMAGE_SIZE = 1280

model = HorizonModel(WEIGHTS_FP, device="mps")
transform = T.horizon_base_RGB(IMAGE_SIZE)

sensors_fpaths = glob.glob(SENSORS_GLOB)
for sensors_fp in sensors_fpaths:
    sensors_data = read_sensor_data(sensors_fp)
    undistorter = Undistorter(sensors_data)
    # The video to process is named relative to the sensors file.
    video_fp = sensors_fp.replace("const/sensors.json", "rgb.mp4")

    # Open the input video; skip it explicitly if it cannot be read.
    cap = cv2.VideoCapture(video_fp)
    if not cap.isOpened():
        cap.release()
        print(f"Skipping {video_fp}: could not open video")
        continue

    fps = cap.get(cv2.CAP_PROP_FPS)
    resolution = (IMAGE_SIZE, IMAGE_SIZE)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(video_fp.replace(".mp4", "_undistorted.mp4"), fourcc, fps, resolution)
    if not out.isOpened():
        cap.release()
        out.release()
        print(f"Skipping {video_fp}: could not open output video for writing")
        continue

    # Frame count reported by the container, used only for the progress bar.
    length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    pbar = tqdm(total=length)

    # try/finally guarantees the capture, writer and progress bar are released
    # even if a frame fails (e.g. the Gaussian fit raises), so the output file
    # is always finalised.
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Undistort, convert BGR -> RGB, then apply the model's transform.
            undistorted_rgb = cv2.cvtColor(undistorter(frame), cv2.COLOR_BGR2RGB)
            frame_tensor = transform(image=undistorted_rgb, keypoints=[])["image"]

            # Predict the horizon parameters for this frame.
            with torch.inference_mode():
                x_pitch, x_theta = model(frame_tensor.unsqueeze(0).to(model.device))
                pitch, theta = postprocess_horizon(x_pitch, x_theta)

            # Back to an HWC uint8 image for drawing.
            # Assumes the transform returns a CHW float tensor scaled to [0, 1]
            # -- TODO confirm.
            canvas_rgb = (frame_tensor.permute(1, 2, 0) * 255).numpy().astype(np.uint8)
            # Drawn while the image is still RGB, so (0, 0, 255) should come
            # out blue -- assuming draw_horizon paints the tuple as given.
            drawn_rgb = draw_horizon(canvas_rgb, pitch_theta=[pitch, theta],
                                     color=(0, 0, 255), diameter=2)

            # The writer receives BGR frames.
            out.write(cv2.cvtColor(drawn_rgb, cv2.COLOR_RGB2BGR))
            pbar.update(1)
    finally:
        pbar.close()
        cap.release()
        out.release()

YOLOv5 🚀 22e72e2 Python-3.10.13 torch-2.1.0 MPS

Loaded weights from /Users/kevinserrano/Downloads/tmp/best.pt
100%|██████████| 343/343 [00:20<00:00, 16.40it/s]
