In [1]:
import numpy as np
import pandas as pd
import polars as pl
import matplotlib.pyplot as plt
import os
import cv2
from cv2 import aruco
from tqdm import tqdm
import msgpack as mp
import msgpack_numpy as mpn
from datetime import datetime

## Path definition

In [10]:
_pth = os.path.dirname(os.path.dirname(os.getcwd()))
_parent_folder = "example_dataset"
_calib_folder_name = "calib_undistort_aruco"

_webcam_calib_folder = os.path.join(
    _pth, _parent_folder, "calibration", _calib_folder_name
)
_webcam_calib_video = os.path.join(_webcam_calib_folder, "webcam_color.msgpack")


In [11]:
_webcam_calib_folder

'/home/sujith/Documents/programs/example_dataset/calibration/calib_undistort_aruco'

In [14]:
vals = mp.Unpacker(open(_webcam_calib_video, 'rb'), object_hook=mpn.decode)
list(vals)

[[800, 1200],
 [[array([[[637., 291.],
           [633., 353.],
           [570., 350.],
           [575., 288.]]], dtype=float32),
   array([[[641., 215.],
           [637., 277.],
           [575., 275.],
           [579., 214.]]], dtype=float32),
   array([[[646., 141.],
           [642., 202.],
           [580., 201.],
           [584., 141.]]], dtype=float32),
   array([[[561., 288.],
           [557., 350.],
           [497., 348.],
           [500., 287.]]], dtype=float32),
   array([[[566., 214.],
           [562., 275.],
           [502., 274.],
           [506., 213.]]], dtype=float32),
   array([[[571., 141.],
           [567., 201.],
           [507., 200.],
           [511., 140.]]], dtype=float32),
   array([[[488., 287.],
           [484., 347.],
           [425., 345.],
           [430., 284.]]], dtype=float32),
   array([[[493., 213.],
           [489., 273.],
           [430., 271.],
           [435., 212.]]], dtype=float32),
   array([[[499., 141.],
           [493.,

# define calibration file

In [4]:

_recording_dir = os.path.join(os.path.dirname(os.getcwd()), '..', "example_dataset")
_data_dir = "mono2_3d_trial0"
_data_path = os.path.join(
    _recording_dir, "recordings", _data_dir, "webcam_color.msgpack"
)

ar_parameters = aruco.DetectorParameters()
ar_dict = aruco.getPredefinedDictionary(aruco.DICT_APRILTAG_36H11)
ar_detector = aruco.ArucoDetector(ar_dict, ar_parameters)
markerLength = 0.06
markerSeperation = 0.01

default_ids = [4, 8, 12, 14, 20]

board = aruco.GridBoard(
    size=[1, 1],
    markerLength=markerLength,
    markerSeparation=markerSeperation,
    dictionary=ar_dict,
)

In [4]:
def estimate_pose_single_markers(
    corners, marker_size, camera_matrix, distortion_coefficients
):
    marker_points = np.array(
        [
            [-marker_size / 2, marker_size / 2, 0],
            [marker_size / 2, marker_size / 2, 0],
            [marker_size / 2, -marker_size / 2, 0],
            [-marker_size / 2, -marker_size / 2, 0],
        ],
        dtype=np.float32,
    )
    rvecs = []
    tvecs = []
    for corner in corners:
        _, r, t = cv2.solvePnP(
            marker_points,
            corner,
            camera_matrix,
            distortion_coefficients,
            True,
            flags=cv2.SOLVEPNP_IPPE_SQUARE,
        )
        if r is not None and t is not None:
            r = np.array(r).reshape(1, 3).tolist()
            t = np.array(t).reshape(1, 3).tolist()
            rvecs.append(r)
            tvecs.append(t)
    return np.array(rvecs, dtype=np.float32), np.array(tvecs, dtype=np.float32)


In [5]:

ARUCO_PARAMETERS = aruco.DetectorParameters()
ARUCO_DICT = aruco.getPredefinedDictionary(aruco.DICT_ARUCO_ORIGINAL)
detector = aruco.ArucoDetector(ARUCO_DICT, ARUCO_PARAMETERS)
markerLength = 0.05
markerSeperation = 0.01

board = aruco.GridBoard(
    size=[1, 1],
    markerLength=markerLength,
    markerSeparation=markerSeperation,
    dictionary=ARUCO_DICT,
)

In [6]:
calibration_flags = cv2.CALIB_CB_CLUSTERING
term_criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 1e-6)

In [7]:

corners, ids = [], []
with open(_data_path, "rb") as _f:
    unpacker = mp.Unpacker(_f, object_hook=mpn.decode)
    img_size = next(unpacker)
    for _frame in tqdm(unpacker):
        _c, _i = _frame
        corners.append(_c)
        ids.append(_i)

timestamp = []
sync = []

with open(
    os.path.join(os.path.dirname(_data_path), "webcam_timestamp.msgpack"), "rb"
) as _f:
    unpacker = mp.Unpacker(_f, object_hook=mpn.decode)
    for _p in unpacker:
        sync.append(_p[0])
        timestamp.append(_p[1])
sync = np.array(sync).astype(bool)

1636it [00:00, 136029.68it/s]


In [8]:

ar_df = {"time": timestamp, "sync": sync}
ar_df = pl.from_dict(ar_df)
if type(ar_df["time"][0]) is not datetime:
    ar_df = ar_df.with_columns(pl.col("time").str.to_datetime())

NameError: name 'datetime' is not defined