In [41]:
import os
import sys
import polars as pl
import matplotlib.pyplot as plt
import numpy as np
import msgpack as mp
import msgpack_numpy as mpn
import toml
import cv2
from cv2 import aruco
import pandas as pd

Parameters

In [104]:
DATASET_FORMAT = 'POSE'

### Modify the parameter.toml for different directory
Defining folders and files

In [42]:
data_pth = toml.load(os.path.join(os.path.dirname(os.getcwd()), 'parameters.toml'))['dataset']['pth']
process_file = os.listdir(data_pth)[os.listdir(data_pth).index('00_analysis')]
analysis_folder = os.path.join(data_pth, process_file)
analysis_file = os.path.join(analysis_folder, 'folder_names.txt')

print(process_file)
print(data_pth)
print(analysis_file)

with open(analysis_file, "r") as f:
    video_folders_list = f.readlines()
    video_folders_list = [i.strip() for i in video_folders_list]

00_analysis
F:\\aruco_recent_recordings\\data_for_labelling_aug_1_2023
F:\\aruco_recent_recordings\\data_for_labelling_aug_1_2023\00_analysis\folder_names.txt


### Calibration files path

In [43]:
_calib_folder_name = "calibration_00"
_webcam_calib_pth = os.path.join(data_pth, os.path.dirname(process_file), _calib_folder_name, "webcam_calibration.msgpack")

with open(_webcam_calib_pth, "rb") as f:
    webcam_calib = mp.Unpacker(f, object_hook=mpn.decode)
    _temp = next(webcam_calib)
    _webcam_cam_mat = _temp[0]
    _webcam_dist = _temp[1]

_webcam_cam_mat

array([[671.25534529,   0.        , 678.00736213],
       [  0.        , 692.23316717, 443.37269229],
       [  0.        ,   0.        ,   1.        ]])

### ArUco dictionary and parameters

In [44]:
marker_size = 0.05

marker_points = np.array([[-marker_size / 2, marker_size / 2, 0],
                            [marker_size / 2, marker_size / 2, 0],
                            [marker_size / 2, -marker_size / 2, 0],
                            [-marker_size / 2, -marker_size / 2, 0]], dtype=np.float32)

ARUCO_PARAMETERS = aruco.DetectorParameters()
ARUCO_DICT = aruco.getPredefinedDictionary(aruco.DICT_ARUCO_MIP_36H12)
detector = aruco.ArucoDetector(ARUCO_DICT, ARUCO_PARAMETERS)
markerLength = marker_size
markerSeperation = 0.01

board = aruco.GridBoard(
        size= [1,1],
        markerLength=markerLength,
        markerSeparation=markerSeperation,
        dictionary=ARUCO_DICT)

def my_estimatePoseSingleMarkers(corners, marker_points, mtx, distortion):
    trash = []
    rvecs = []
    tvecs = []
    for c in corners:
        nada, R, t = cv2.solvePnP(marker_points, c, mtx, distortion, False, flags= cv2.SOLVEPNP_ITERATIVE)
        R = R.T
        t = t.T
        rvecs.append(R)
        tvecs.append(t)
        trash.append(nada)
    return rvecs, tvecs, trash

In [45]:
_video_path = os.path.join(data_pth, os.path.dirname(process_file), video_folders_list[0], 'webcam_color.msgpack')
_video_file = mp.Unpacker(open(_video_path, "rb"), object_hook=mpn.decode)


default_ids = [12, 88, 89]
data = {"frame_id":[],"marker_ids":[], "corners":[], "tvec":[], "rvec":[]}

counter = 0

detector = aruco.ArucoDetector(ARUCO_DICT, ARUCO_PARAMETERS)


for _frame in _video_file:
    shape = _frame.shape
    gray = cv2.cvtColor(_frame, cv2.COLOR_BGR2GRAY)
    
    corners, ids, rejectedImgPoints = detector.detectMarkers(gray)
    corners, ids, rejectedImgPoints,_ = detector.refineDetectedMarkers(image=gray,board=board ,detectedCorners=corners, detectedIds=ids, 
                                                                    rejectedCorners=rejectedImgPoints, cameraMatrix=_webcam_cam_mat, 
                                                                    distCoeffs=_webcam_dist)

    rotation_vectors, translation_vectors, _ = my_estimatePoseSingleMarkers(corners, marker_points, _webcam_cam_mat, _webcam_dist)

    if ids is None:
        data["frame_id"].append(counter)
        data["marker_ids"].append(None)
        data["corners"].append(None)
        data["tvec"].append(None)
        data["rvec"].append(None)

    if ids is not None:
        data["frame_id"].append(counter)
        data["marker_ids"].append(ids)
        data["corners"].append(corners)
        data["tvec"].append(translation_vectors)
        data["rvec"].append(rotation_vectors)

    counter+=1


In [46]:
coordinate = {str(default_ids[0]):{"x":[], "y":[], "z":[], "rx":[], "ry":[], "rz":[]},
              str(default_ids[1]):{"x":[], "y":[], "z":[], "rx":[], "ry":[], "rz":[]},
              str(default_ids[2]):{"x":[], "y":[], "z":[], "rx":[], "ry":[], "rz":[]}}

doesnt_exist = [] # list of ids that doesnt exist in a frame

for i in range(len(data["frame_id"])):
    if data["marker_ids"][i] is not None:

        if default_ids[0] not in data["marker_ids"][i]:
            doesnt_exist.append(default_ids[0])
        if default_ids[1] not in data["marker_ids"][i]:
            doesnt_exist.append(default_ids[1])
        if default_ids[2] not in data["marker_ids"][i]:
            doesnt_exist.append(default_ids[2])

        for j in range(len(data["marker_ids"][i])):
            if data["marker_ids"][i][j] in default_ids:
                coordinate[str(data["marker_ids"][i][j][0])]["x"].append(data["tvec"][i][j][0][0])
                coordinate[str(data["marker_ids"][i][j][0])]["y"].append(data["tvec"][i][j][0][1])
                coordinate[str(data["marker_ids"][i][j][0])]["z"].append(data["tvec"][i][j][0][2])
                coordinate[str(data["marker_ids"][i][j][0])]["rx"].append(data["rvec"][i][j][0][0])
                coordinate[str(data["marker_ids"][i][j][0])]["ry"].append(data["rvec"][i][j][0][1])
                coordinate[str(data["marker_ids"][i][j][0])]["rz"].append(data["rvec"][i][j][0][2])
        for k in doesnt_exist:
            coordinate[str(k)]["x"].append(np.nan)
            coordinate[str(k)]["y"].append(np.nan)
            coordinate[str(k)]["z"].append(np.nan)
            coordinate[str(k)]["rx"].append(np.nan)
            coordinate[str(k)]["ry"].append(np.nan)
            coordinate[str(k)]["rz"].append(np.nan)
        doesnt_exist = []
    else:
        for k in default_ids:
            coordinate[str(k)]["x"].append(np.nan)
            coordinate[str(k)]["y"].append(np.nan)
            coordinate[str(k)]["z"].append(np.nan)
            coordinate[str(k)]["rx"].append(np.nan)
            coordinate[str(k)]["ry"].append(np.nan)
            coordinate[str(k)]["rz"].append(np.nan)

In [100]:
ar_df_12 = pd.DataFrame(coordinate['12'])
ar_df_12['sort'] = (ar_df_12['z'].diff()>1)
drop_idx_12 = ar_df_12.query('sort == True').index

ar_df_88 = pd.DataFrame(coordinate['88'])
ar_df_88['sort'] = (ar_df_88['z'].diff()>1)
drop_idx_88 = ar_df_88.query('sort == True').index

ar_df_89 = pd.DataFrame(coordinate['89'])
ar_df_89['sort'] = (ar_df_89['z'].diff()>1)
drop_idx_89 = ar_df_89.query('sort == True').index

drop_ids = []
for i in drop_idx_12:
    drop_ids.append(i)
for i in drop_idx_88:
    drop_ids.append(i)
for i in drop_idx_89:
    drop_ids.append(i)
drop = {'drop':drop_ids}
drop_ids = pl.DataFrame(drop)
drops = drop_ids['drop'].unique().to_numpy()

In [107]:
_video_path = os.path.join(data_pth, os.path.dirname(process_file), video_folders_list[0], 'webcam_color.msgpack')
_video_file = mp.Unpacker(open(_video_path, "rb"), object_hook=mpn.decode)

default_ids = [12, 88, 89]
data = {"frame_id":[],"marker_ids":[], "corners":[], "tvec":[], "rvec":[]}

counter = 0

detector = aruco.ArucoDetector(ARUCO_DICT, ARUCO_PARAMETERS)
second_counter = 0

for _frame in _video_file:
    
    height, width = _frame.shape[:2]
    gray = cv2.cvtColor(_frame, cv2.COLOR_BGR2GRAY)
    
    markerCorners, ids, rejectedImgPoints = detector.detectMarkers(gray)
    
    markerCorners, ids, rejectedImgPoints,_ = detector.refineDetectedMarkers(image=gray,board=board ,detectedCorners=corners, detectedIds=ids, 
                                                                rejectedCorners=rejectedImgPoints, cameraMatrix=_webcam_cam_mat, 
                                                                distCoeffs=_webcam_dist)

    counter += 1
    
    if counter in drops:
        continue
    
    for i in range(len(ids)):
        second_counter += 1
        _markerCorners = markerCorners[i][0]
        bbox_x,bbox_y,bbox_width,bbox_height = cv2.boundingRect(_markerCorners)

        bbox_x = bbox_x / width
        bbox_y = bbox_y / height

        bbox_center_x = bbox_x + bbox_width / (2 * width)
        bbox_center_y = bbox_y + bbox_height / (2 * height)

        bbox_width = bbox_width / width
        bbox_height = bbox_height / height
        if ids[i][0] == default_ids[0]:
            _class_name = "0"
        elif ids[i][0] == default_ids[1]:
            _class_name = "1"
        elif ids[i][0] == default_ids[2]:
            _class_name = "2"
        else:
            continue
    
    # if DATASET_FORMAT == "SEGMENTATION":
    #     label_writer.writerow([_class_name,_markerCorners[0][0]/width, _markerCorners[0][1]/height, _markerCorners[1][0]/width, _markerCorners[1][1]/height, _markerCorners[2][0]/width, _markerCorners[2][1]/height, _markerCorners[3][0]/width, _markerCorners[3][1]/height, _markerCorners[0][0]/width, _markerCorners[0][1]/height])
    # elif DATASET_FORMAT == "POSE":
    #     label_writer.writerow([_class_name, bbox_center_x, bbox_center_y, bbox_width, bbox_height,_markerCorners[0][0]/width, _markerCorners[0][1]/height, _markerCorners[1][0]/width, _markerCorners[1][1]/height, _markerCorners[2][0]/width, _markerCorners[2][1]/height, _markerCorners[3][0]/width, _markerCorners[3][1]/height, _markerCorners[0][0]/width, _markerCorners[0][1]/height])

In [103]:
counter, second_counter

(717, 716)

In [102]:
counter = 608

array([608], dtype=int64)