In [6]:
import os 
import shutil
from pupil_apriltags import Detector
import cv2
import numpy as np
import json
import scipy as sp
import socket
import pickle
from IPython.display import clear_output

In [7]:
def load_calibration_file(calibration_file_path):
    with open(calibration_file_path, 'r') as infile:
        reconstruction = json.load(infile)
        mtx = np.array(reconstruction['mtx'])
        dist = np.array(reconstruction['dist'])
    return mtx, dist

mtx, dist = load_calibration_file('calibration_data.json')

In [8]:
fx, fy, cx, cy = mtx[0,0], mtx[1,1], mtx[0,2], mtx[1,2]

In [9]:
#check for available cameras
for i in range(10):
    camera = cv2.VideoCapture(i)
    ret, frame = camera.read()
    if ret:
        print('Camera found at index: ', i)
        camera.release()

Camera found at index:  0
Camera found at index:  1
Camera found at index:  2


In [10]:
def detector_superimpose(img, detector, tag_size=0.16):
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    detections = detector.detect(gray, estimate_tag_pose= True, camera_params=[fx, fy, cx, cy], tag_size=tag_size)
    for detection in detections:
        center = detection.center
        cv2.circle(img, (int(center[0]), int(center[1])), 5, (0, 0, 255), -1)
        cv2.putText(img, str(detection.tag_id), (int(center[0]), int(center[1])), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
        #pose data is in = detection.pose_R, detection.pose_t
        # quat = sp.spatial.transform.Rotation.from_matrix(detection.pose_R).as_quat()
        # cv2.putText(img, str(quat), (int(center[0]), int(center[1]+30)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
        euler = sp.spatial.transform.Rotation.from_matrix(detection.pose_R).as_euler('xyz', degrees=True)
        cv2.putText(img, str(euler), (int(center[0]), int(center[1]+60)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
    
def live_feed(camera_id):
    cap = cv2.VideoCapture(camera_id)
    detector = Detector(families='tag36h11', nthreads=1, quad_decimate=1.0, quad_sigma=0.0, refine_edges=1, decode_sharpening=0.25, debug=0)
    while True:
        ret, frame = cap.read()
        if ret:
            detector_superimpose(frame, detector)
            cv2.imshow('frame', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        else:
            print("Camera is not working")
            break
    cap.release()
    cv2.destroyAllWindows()

live_feed(1)

In [None]:
posisional_data = {
    "tag_id": int,
    "pose_t": [float, float, float],
    "pose_quat": [float, float, float, float]
}

def getPoseData(img, detector, tag_size=0.16):
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    detections = detector.detect(gray, estimate_tag_pose= True, camera_params=[fx, fy, cx, cy], tag_size=tag_size)
    for detection in detections:
        #tag id
        tag_id = detection.tag_id
        #translation vector
        pose_t = detection.pose_t
        #quaternion
        pose_quat = sp.spatial.transform.Rotation.from_matrix(detection.pose_R).as_quat()
        #conver both to list
        pose_t = pose_t.tolist()
        pose_quat = pose_quat.tolist()
        return tag_id, pose_t, pose_quat

def transmitter(camera_id, ip, port):
    detector = Detector(families='tag36h11', nthreads=1, quad_decimate=1.0, quad_sigma=0.0, refine_edges=1, decode_sharpening=0.25, debug=0)
    cap = cv2.VideoCapture(camera_id)
    while True:
        ret, frame = cap.read()
        if ret:
            try:
                tag_id, pose_t, pose_quat = getPoseData(frame, detector)
                if tag_id is not None:
                    posisional_data['tag_id'] = tag_id
                    posisional_data['pose_t'] = pose_t
                    posisional_data['pose_quat'] = pose_quat
                    print(posisional_data)
                    data = pickle.dumps(posisional_data)
                    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                        s.connect((ip, port))
                        s.sendall(data)
                        print("Data sent")
                       
            except Exception as e:
                if e.args[0] == "cannot unpack non-iterable NoneType object":
                    pass
                else:
                    print(e)
        else:
            print("Camera is not working")
            break
    cap.release()
    cv2.destroyAllWindows()


transmitter(1, 'localhost', 1337)
