# Pre-Requisites

Download **A4 - 20mm squares - 13x9 verticies, 14x10 squares** variant from https://markhedleyjones.com/projects/calibration-checkerboard-collection and print it to take several photos of it from the IP camera on the line where the checker board is clearly visible at different angles and orientations as possible. Check the reference ZIP file which contains the checkerboard images here which i have taken for testing it at the office

Zip File link:https://drive.google.com/file/d/1CijilFYJrcAxvWLI7jY-4R5tW8J3Kmld/view?usp=sharing

after taking photos as described, create a folder named calib and paste all the photos into it.

visit   https://shiqiliu-67.github.io/apriltag-generator/
for downloading the apriltags, stick them physically onto the background where the camera needs to be calibrated to.

please install the following dependencies first:
- pupil_apriltags
- yaml
- cv2
- pickle
- numpy

In [None]:
!pip install pupil_apriltags opencv-python pyyaml numpy pickle

Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com



[notice] A new release of pip is available: 24.0 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


# Code for saving camera intrinsics

In [3]:
import cv2
import numpy as np
import glob
import pickle
import os

def generate_camera_intrinsics(
    calib_folder="calib", 
    pattern_size=(13, 9),       # 13 x 9 inner corners
    square_size=0.02            # 20mm squares in meters
):
    objp = np.zeros((pattern_size[0]*pattern_size[1], 3), np.float32)
    objp[:, :2] = np.mgrid[0:pattern_size[0], 0:pattern_size[1]].T.reshape(-1, 2)
    objp *= square_size

    objpoints = []  # 3D points in real-world space
    imgpoints = []  # 2D points in image plane

    images = glob.glob(os.path.join(calib_folder, '*.jpg'))

    if not images:
        raise RuntimeError(f"No images found in {calib_folder}")

    print(f"[INFO] Found {len(images)} calibration images.")

    for fname in images:
        img = cv2.imread(fname)
        if img is None:
            print(f"[WARNING] Could not read image: {fname}")
            continue
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

        ret, corners = cv2.findChessboardCorners(gray, pattern_size, None)

        if ret:
            objpoints.append(objp)
            corners2 = cv2.cornerSubPix(
                gray, corners, (11, 11), (-1, -1),
                criteria=(cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
            )
            imgpoints.append(corners2)

            # Optional: show visual feedback
            vis = img.copy()
            cv2.drawChessboardCorners(vis, pattern_size, corners2, ret)
            cv2.imshow('Detected Corners', vis)
            cv2.waitKey(100)
        else:
            print(f"[WARNING] Corners not found in: {fname}")

    cv2.destroyAllWindows()

    if len(objpoints) < 5:
        raise RuntimeError("Not enough valid calibration images with detected corners.")

    # Calibrate
    ret, K, dist, rvecs, tvecs = cv2.calibrateCamera(
        objpoints, imgpoints, gray.shape[::-1], None, None
    )

    print("[INFO] Calibration successful.")
    print("Camera matrix:\n", K)
    print("Distortion coefficients:\n", dist)

    # Save calibration data
    with open("camera_intrinsics.pkl", "wb") as f:
        pickle.dump({'K': K, 'dist': dist}, f)

    print("[INFO] Saved to camera_intrinsics.pkl")

if __name__ == "__main__":
    generate_camera_intrinsics("calib", pattern_size=(13, 9), square_size=0.02)


[INFO] Found 13 calibration images.
[INFO] Calibration successful.
Camera matrix:
 [[1.23905339e+03 0.00000000e+00 9.85581262e+02]
 [0.00000000e+00 1.24885756e+03 5.28625307e+02]
 [0.00000000e+00 0.00000000e+00 1.00000000e+00]]
Distortion coefficients:
 [[-0.33954079  0.12188955  0.0033804  -0.00256683  0.00663979]]
[INFO] Saved to camera_intrinsics.pkl


# 1. Code for saving Coords(Single-Threaded)
(Multi-Threaded available at 3)

In [None]:
import cv2
import numpy as np
import os
import yaml
from pupil_apriltags import Detector

def save_reference_poses(rtsp_url, tag_size=0.04):
    cap = cv2.VideoCapture(rtsp_url)
    if not cap.isOpened():
        print("[ERROR] Cannot open RTSP stream.")
        return

    with open("camera_intrinsics.pkl", "rb") as f:
        data = pickle.load(f)
    K = data['K']
    dist = data['dist']

    fx, fy = K[0, 0], K[1, 1]
    cx, cy = K[0, 2], K[1, 2]
    camera_params = [fx, fy, cx, cy]

    detector = Detector(families='tag36h11')

    reference_data = {}

    print("[INFO] Press 's' to save the current tag pose. Press 'q' to quit.")
    while True:
        ret, frame = cap.read()
        if not ret:
            continue

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        detections = detector.detect(
            gray,
            estimate_tag_pose=True,
            camera_params=camera_params,
            tag_size=tag_size
        )

        for det in detections:
            rvec, _ = cv2.Rodrigues(det.pose_R)
            tvec = det.pose_t
            tag_id = det.tag_id

            cv2.drawFrameAxes(frame, K, dist, rvec, tvec, tag_size / 2)
            cv2.putText(frame, f"ID {tag_id}", tuple(det.center.astype(int)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

        cv2.imshow("Save Reference Pose", frame)
        key = cv2.waitKey(1)

        if key == ord('s'):
            for det in detections:
                rvec, _ = cv2.Rodrigues(det.pose_R)
                tvec = det.pose_t
                reference_data[det.tag_id] = {
                    'rvec': rvec.ravel().tolist(),
                    'tvec': tvec.ravel().tolist()
                }
            cv2.imwrite("reference_snapshot.jpg", frame)
            with open("reference_pose.yaml", "w") as f:
                yaml.dump(reference_data, f)
            print("[INFO] Reference pose and snapshot saved.")
            print("[INFO] Saved pose for detected tags.")
        
        elif key == ord('q'):
            with open("reference_pose.yaml", "w") as f:
                yaml.dump(reference_data, f, sort_keys=True)
            print("[INFO] Reference poses saved to reference_pose.yaml")
            break

    cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    rtsp_url = "rtsp://your_rtsp_stream"
    save_reference_poses(rtsp_url)


# 2. Code for Recalibration(Multi-Threaded)

In [None]:
import cv2
import numpy as np
import pickle
import yaml
from pupil_apriltags import Detector

import cv2
import numpy as np
import yaml
import threading
import pickle
from pupil_apriltags import Detector

class VideoCaptureAsync:
    def __init__(self, src=0):
        self.src = src
        self.cap = cv2.VideoCapture(self.src)
        self.grabbed, self.frame = self.cap.read()
        self.started = False
        self.read_lock = threading.Lock()

    def start(self):
        if self.started:
            print('[WARNING] Video capture already started.')
            return None
        self.started = True
        self.thread = threading.Thread(target=self.update, daemon=True)
        self.thread.start()
        return self

    def update(self):
        while self.started:
            grabbed, frame = self.cap.read()
            with self.read_lock:
                self.grabbed = grabbed
                self.frame = frame

    def read(self):
        with self.read_lock:
            frame = self.frame.copy() if self.frame is not None else None
            grabbed = self.grabbed
        return grabbed, frame

    def stop(self):
        self.started = False
        self.thread.join()
        self.cap.release()

# === Parameters ===
RTSP_URL    = "rtsp://your_rtsp_stream"
TAG_SIZE    = 0.04      # meters
TOLERANCE_T = 0.01      # meters
TOLERANCE_R = 5         # degrees
FONT        = cv2.FONT_HERSHEY_SIMPLEX
ARROW_THICKNESS = 4
WIN_W, WIN_H = 1280, 720

# === Load Camera Intrinsics ===
with open("camera_intrinsics.pkl", "rb") as f:
    intr = pickle.load(f)
K    = intr["K"]
dist = intr["dist"]
fx, fy = K[0,0], K[1,1]
cx, cy = K[0,2], K[1,2]
camera_params = [fx, fy, cx, cy]

# === Load Reference Poses & Snapshot ===
with open("reference_pose.yaml", "r") as f:
    reference_data = yaml.safe_load(f)
reference_data = {int(k): v for k, v in reference_data.items()}

snapshot = cv2.imread("reference_snapshot.jpg")
snapshot_resized = cv2.resize(snapshot, (WIN_W, WIN_H)) if snapshot is not None else None

# === Init Camera & Tag Detector ===
cap = VideoCaptureAsync(rtsp_url).start()

detector = Detector(families="tag36h11", nthreads=4)

def draw_guides(frame, t_diff, r_diff):
    h, w = frame.shape[:2]
    cx_, cy_ = w // 2, h // 2

    t_cm = np.round(t_diff * 100, 2)
    r_deg = np.round(r_diff, 2)

    guidance = []

    # X-axis
    if t_diff[0] > TOLERANCE_T:
        cv2.arrowedLine(frame, (int(w*0.9), cy_), (int(w*0.8), cy_), (0,0,255), ARROW_THICKNESS)
        guidance.append(("Right", t_cm[0]))
    elif t_diff[0] < -TOLERANCE_T:
        cv2.arrowedLine(frame, (int(w*0.1), cy_), (int(w*0.2), cy_), (0,0,255), ARROW_THICKNESS)
        guidance.append(("Left", abs(t_cm[0])))

    # Y-axis
    if t_diff[1] > TOLERANCE_T:
        cv2.arrowedLine(frame, (cx_, int(h*0.9)), (cx_, int(h*0.8)), (0,255,0), ARROW_THICKNESS)
        guidance.append(("Down", t_cm[1]))
    elif t_diff[1] < -TOLERANCE_T:
        cv2.arrowedLine(frame, (cx_, int(h*0.1)), (cx_, int(h*0.2)), (0,255,0), ARROW_THICKNESS)
        guidance.append(("Up", abs(t_cm[1])))

    # Z-axis
    if t_diff[2] > TOLERANCE_T:
        guidance.append(("Forward", t_cm[2]))
    elif t_diff[2] < -TOLERANCE_T:
        guidance.append(("Backward", abs(t_cm[2])))

    # display each on correct edge
    for direction, offset in guidance:
        text = f"Move {direction} by {offset:.1f} cm"
        sz, _ = cv2.getTextSize(text, FONT, 0.6, 2)

        if direction == "Left":
            pos = (10, cy_)
        elif direction == "Right":
            pos = (w - sz[0] - 10, cy_)
        elif direction == "Up":
            pos = ((w - sz[0]) // 2, 30)
        elif direction == "Down":
            pos = ((w - sz[0]) // 2, h - 10)
        elif direction == "Forward":
            pos = ((w - sz[0]) // 2, cy_ - 40)
        else:  # Backward
            pos = ((w - sz[0]) // 2, cy_ + 40)

        cv2.putText(frame, text, pos, FONT, 0.6, (0,255,255), 2)

    # Rotation (top-left)
    axis_names = ["Yaw (Z)", "Pitch (Y)", "Roll (X)"]
    for i, angle in enumerate(r_deg):
        if abs(angle) > TOLERANCE_R:
            sign = "↻" if r_diff[i] > 0 else "↺"
            txt = f"Rotate {axis_names[i]} {sign} by {abs(angle):.1f}°"
            cv2.putText(frame, txt, (10, 30 + 30 * (i + 1)), FONT, 0.6, (0, 0, 255), 2)


cv2.namedWindow("Recalibration", cv2.WINDOW_NORMAL)
cv2.resizeWindow("Recalibration", WIN_W, WIN_H)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    dets = detector.detect(gray, estimate_tag_pose=True, camera_params=camera_params, tag_size=TAG_SIZE)

    for det in dets:
        tid = det.tag_id
        if tid not in reference_data:
            continue

        rvec_now, _ = cv2.Rodrigues(det.pose_R)
        tvec_now = det.pose_t.ravel()
        ref = reference_data[tid]
        rvec_ref = np.array(ref["rvec"])
        tvec_ref = np.array(ref["tvec"])

        dR = cv2.Rodrigues(det.pose_R @ cv2.Rodrigues(rvec_ref)[0].T)[0].ravel()
        r_diff = np.rad2deg(dR)
        t_diff = tvec_now - tvec_ref

        cv2.drawFrameAxes(frame, K, dist, rvec_now, tvec_now, TAG_SIZE / 2)
        draw_guides(frame, t_diff, r_diff)

    # Overlay reference image if within threshold
    if dets and snapshot_resized is not None:
        if (np.all(np.abs(t_diff) < 1.5 * TOLERANCE_T) and
            np.all(np.abs(r_diff) < 1.5 * TOLERANCE_R)):
            snapshot_overlay = cv2.resize(snapshot_resized, (frame.shape[1], frame.shape[0]))
            frame = cv2.addWeighted(frame, 0.7, snapshot_overlay, 0.3, 0)

    # Display main window
    cv2.imshow("Recalibration", cv2.resize(frame, (WIN_W, WIN_H)))

    # Display separate reference snapshot
    if snapshot_resized is not None:
        cv2.imshow("Reference Snapshot", snapshot_resized)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
cap.stop()
cv2.destroyAllWindows()


# 3.Multi-Threading approach for saving co-ords

In [None]:
import cv2
import numpy as np
import yaml
import threading
import pickle
from pupil_apriltags import Detector

class VideoCaptureAsync:
    def __init__(self, src=0):
        self.src = src
        self.cap = cv2.VideoCapture(self.src)
        self.grabbed, self.frame = self.cap.read()
        self.started = False
        self.read_lock = threading.Lock()

    def start(self):
        if self.started:
            print('[WARNING] Video capture already started.')
            return None
        self.started = True
        self.thread = threading.Thread(target=self.update, daemon=True)
        self.thread.start()
        return self

    def update(self):
        while self.started:
            grabbed, frame = self.cap.read()
            with self.read_lock:
                self.grabbed = grabbed
                self.frame = frame

    def read(self):
        with self.read_lock:
            frame = self.frame.copy() if self.frame is not None else None
            grabbed = self.grabbed
        return grabbed, frame

    def stop(self):
        self.started = False
        self.thread.join()
        self.cap.release()


def save_reference_poses(rtsp_url, tag_size=0.04):
    cap = VideoCaptureAsync(rtsp_url).start()
    
    with open("camera_intrinsics.pkl", "rb") as f:
        data = pickle.load(f)
    K = data['K']
    dist = data['dist']

    fx, fy = K[0, 0], K[1, 1]
    cx, cy = K[0, 2], K[1, 2]
    camera_params = [fx, fy, cx, cy]

    detector = Detector(families='tag36h11')

    reference_data = {}

    print("[INFO] Press 's' to save the current tag pose. Press 'q' to quit.")
    while True:
        ret, frame = cap.read()
        if not ret or frame is None:
            continue

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        detections = detector.detect(
            gray,
            estimate_tag_pose=True,
            camera_params=camera_params,
            tag_size=tag_size
        )

        for det in detections:
            rvec, _ = cv2.Rodrigues(det.pose_R)
            tvec = det.pose_t
            tag_id = det.tag_id

            cv2.drawFrameAxes(frame, K, dist, rvec, tvec, tag_size / 2)
            cv2.putText(frame, f"ID {tag_id}", tuple(det.center.astype(int)), 
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

        cv2.imshow("Save Reference Pose", frame)
        key = cv2.waitKey(1)

        if key == ord('s'):
            for det in detections:
                rvec, _ = cv2.Rodrigues(det.pose_R)
                tvec = det.pose_t
                reference_data[det.tag_id] = {
                    'rvec': rvec.ravel().tolist(),
                    'tvec': tvec.ravel().tolist()
                }
            cv2.imwrite("reference_snapshot.jpg", frame)
            with open("reference_pose.yaml", "w") as f:
                yaml.dump(reference_data, f)
            print("[INFO] Reference pose and snapshot saved.")
        
        elif key == ord('q'):
            with open("reference_pose.yaml", "w") as f:
                yaml.dump(reference_data, f, sort_keys=True)
            print("[INFO] Reference poses saved to reference_pose.yaml")
            break

    cap.stop()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    rtsp_url = "rtsp://your_rtsp_stream"
    save_reference_poses(rtsp_url)
