starting point is changed to april tag in place of yellow

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
from scipy.ndimage import grey_dilation
from pupil_apriltags import Detector
import time
import heapq
import math

In [None]:
# ---------------- CONFIG ----------------
CELL_SIZE = 10                      # pixel size of a grid cell
DETECTOR = Detector(families='tag36h11')  # AprilTag detector (adjust family if needed)
CAM_INDEX = 1                       # camera index (change to 0 if required)
APRILTAG_ID = 0                     # which AprilTag ID is the start tag
INFLATION_RADIUS = 3                # obstacle inflation (in grid cells)
RESIZE_FACTOR = 10                  # factor used to shrink image for pathfinding
TIMEOUT_SECONDS = 10                # timeout for initial path find
# ----------------------------------------

In [None]:
def pixel_to_grid(pixel, cell_size=CELL_SIZE):
    """Convert (x, y) pixel coordinate to (row, col) grid coordinate."""
    # pixel is (x, y) as returned by contour centroid or AprilTag center
    return (pixel[1] // cell_size, pixel[0] // cell_size)

def grid_to_pixel(grid, cell_size=CELL_SIZE):
    """Convert (row, col) grid coordinate back to pixel center coordinate (x, y)."""
    # return (x, y)
    return (grid[1] * cell_size + cell_size // 2, grid[0] * cell_size + cell_size // 2)

def resize_for_pathfinding(mask, factor):
    """Downsample binary mask by factor for pathfinding grid."""
    small = cv2.resize(mask, (mask.shape[1] // factor, mask.shape[0] // factor),
                       interpolation=cv2.INTER_NEAREST)
    # convert to 0/1 grid where 1 = obstacle
    return (small > 0).astype(np.uint8)

def inflate_obstacles(grid, inflation_radius=1):
    """Inflate obstacles in grid using grey_dilation from scipy.ndimage."""
    return grey_dilation(grid, size=(2 * inflation_radius + 1, 2 * inflation_radius + 1))

In [None]:
def detect_start_end_obstacles(image, target_id=APRILTAG_ID):
    """
    Detect:
      - start: as AprilTag center (pixel coords) with tag_id == target_id
      - end: centroid of pink block
      - obstacles: combined red + green masks
    Returns (start_pixel_or_None, end_pixel_or_None, obstacle_mask)
    """
    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

    # --- Start: AprilTag center (pixel coords) ---
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    results = DETECTOR.detect(gray)
    start = None
    for r in results:
        if r.tag_id == target_id:
            start = tuple(map(int, r.center))  # (x, y)
            break

    # --- End: Pink block (centroid) ---
    # HSV pink range (keeps your previous settings)
    pink_mask = cv2.inRange(hsv, (160, 100, 120), (175, 255, 255))

    # --- Red obstacles (both ends of HSV hue) ---
    lower_red1 = cv2.inRange(hsv, (0, 120, 50), (10, 255, 255))
    upper_red2 = cv2.inRange(hsv, (160, 120, 50), (165, 255, 255))
    red_mask = cv2.bitwise_or(lower_red1, upper_red2)

    # --- Green obstacles ---
    green_mask = cv2.inRange(hsv, (40, 40, 40), (80, 255, 255))

    # Combine red & green as obstacles
    obstacle_mask = cv2.bitwise_or(red_mask, green_mask)

    def find_centroid(mask):
        cnts, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if cnts:
            largest = max(cnts, key=cv2.contourArea)
            M = cv2.moments(largest)
            if M["m00"] != 0:
                cx = int(M["m10"] / M["m00"])
                cy = int(M["m01"] / M["m00"])
                return (cx, cy)
        return None

    end = find_centroid(pink_mask)

    return start, end, obstacle_mask

In [None]:
def heuristic(a, b):
    """Euclidean distance heuristic for A* (a and b are grid (r,c) tuples)."""
    return math.hypot(a[0] - b[0], a[1] - b[1])

def astar(grid, start, end):
    """A* algorithm on binary grid where 1 = obstacle, 0 = free."""
    neighbors = [
        (0, 1), (1, 0), (0, -1), (-1, 0),
        (1, 1), (-1, -1), (1, -1), (-1, 1)
    ]

    close_set = set()
    came_from = {}
    gscore = {start: 0}
    fscore = {start: heuristic(start, end)}
    oheap = [(fscore[start], start)]

    while oheap:
        _, current = heapq.heappop(oheap)

        if current == end:
            # reconstruct path
            path = []
            while current in came_from:
                path.append(current)
                current = came_from[current]
            path.append(start)
            path.reverse()
            return path

        close_set.add(current)

        for dx, dy in neighbors:
            neighbor = (current[0] + dx, current[1] + dy)
            # check bounds
            if 0 <= neighbor[0] < grid.shape[0] and 0 <= neighbor[1] < grid.shape[1]:
                if grid[neighbor[0], neighbor[1]] == 1:
                    continue  # obstacle
            else:
                continue  # out of bounds

            tentative_g = gscore[current] + heuristic(current, neighbor)

            if neighbor in close_set and tentative_g >= gscore.get(neighbor, float('inf')):
                continue

            if tentative_g < gscore.get(neighbor, float('inf')):
                came_from[neighbor] = current
                gscore[neighbor] = tentative_g
                fscore[neighbor] = tentative_g + heuristic(neighbor, end)
                heapq.heappush(oheap, (fscore[neighbor], neighbor))

    return []  # no path

In [None]:
def visualize_grid(grid, start=None, end=None, title="Grid with Obstacles, Start & End"):
    """Simple grayscale visualization of the grid with optional start/end overlays."""
    vis = (grid * 255).astype(np.uint8)
    # overlay markers (on the coarse grid indices)
    if start:
        vis[start[0], start[1]] = 100
    if end:
        vis[end[0], end[1]] = 200

    plt.figure(figsize=(6, 6))
    plt.imshow(vis, cmap='gray')
    plt.title(title)
    plt.axis('off')
    plt.show()

In [None]:
def get_direction_changes(path):
    """Return grid points where direction changes (turning points)."""
    if len(path) < 3:
        return []

    turns = []
    prev_dir = (path[1][0] - path[0][0], path[1][1] - path[0][1])
    for i in range(2, len(path)):
        curr_dir = (path[i][0] - path[i-1][0], path[i][1] - path[i-1][1])
        if curr_dir != prev_dir:
            turns.append(path[i-1])
        prev_dir = curr_dir
    return turns

def detect_apriltag(frame, target_id=APRILTAG_ID):
    """Return pixel (x,y) of AprilTag center if found, else None."""
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    results = DETECTOR.detect(gray)
    for r in results:
        if r.tag_id == target_id:
            return tuple(map(int, r.center))
    return None

In [None]:
# -------------------- Main: find path once using AprilTag start --------------------
cap = cv2.VideoCapture(CAM_INDEX)
path = []
path_pixels = []
frame = None

start_time = time.time()
inflated_grid = None  # for later visualization if needed
start_pixel = None
end_pixel = None

print("Starting initial pathfinding loop (AprilTag = start)...")

while True:
    ret, frame = cap.read()
    if not ret:
        print("Camera read failed; retrying...")
        time.sleep(0.1)
        continue

    start_pixel, end_pixel, obstacle_mask = detect_start_end_obstacles(frame)

    print("Detected -> Start (AprilTag):", start_pixel, " End (pink):", end_pixel)

    # only proceed if both start & end are available
    if start_pixel and end_pixel:
        # Convert obstacle mask to downsampled grid
        obstacle_grid = resize_for_pathfinding(obstacle_mask, RESIZE_FACTOR)

        # Convert pixel coordinates to grid coords (coarse)
        start_grid = pixel_to_grid(start_pixel)
        end_grid = pixel_to_grid(end_pixel)

        # guard: ensure start/end within obstacle_grid bounds
        if not (0 <= start_grid[0] < obstacle_grid.shape[0] and 0 <= start_grid[1] < obstacle_grid.shape[1]):
            print("Start grid out of bounds after conversion. Waiting for a better detection...")
        elif not (0 <= end_grid[0] < obstacle_grid.shape[0] and 0 <= end_grid[1] < obstacle_grid.shape[1]):
            print("End grid out of bounds after conversion. Waiting for a better detection...")
        else:
            # Clear start & end in obstacle grid to avoid being treated as obstacles
            obstacle_grid[start_grid[0], start_grid[1]] = 0
            obstacle_grid[end_grid[0], end_grid[1]] = 0

            # Inflate obstacles in the grid
            inflated_grid = inflate_obstacles(obstacle_grid, inflation_radius=INFLATION_RADIUS)

            # Visualize coarse grid (optional)
            visualize_grid(inflated_grid, start_grid, end_grid, title="Inflated Grid")

            # Run A*
            print("Running A* ...")
            path = astar(inflated_grid, start_grid, end_grid)

            if path:
                print("Path found! Breaking initial loop.")
                # convert coarse path grid -> pixel coordinates for drawing on original frame
                path_pixels = [grid_to_pixel(pt) for pt in path]
                break
            else:
                print("No path found (try moving tag/end or adjust inflation/resolution).")

    # Optional timeout to avoid infinite loop
    if time.time() - start_time > TIMEOUT_SECONDS:
        print(f"Timeout ({TIMEOUT_SECONDS}s): Could not find valid path in time.")
        break

cap.release()

if not path:
    print("No path computed; exiting script.")
    # still try to visualize obstacle grid if present
    if inflated_grid is not None and start_pixel and end_pixel:
        visualize_grid(inflated_grid, pixel_to_grid(start_pixel), pixel_to_grid(end_pixel))
    raise SystemExit("Path computation failed.")

# Draw path overlay on last captured frame
output = frame.copy()
# Draw path with small dots
for pt in path_pixels:
    cv2.circle(output, pt, 2, (0, 0, 0), -1)  # black dots for path

# Start and end markers on output (visual clarity)
if path_pixels:
    cv2.circle(output, path_pixels[0], 6, (0, 255, 0), -1)   # Green = Start (AprilTag)
    cv2.circle(output, path_pixels[-1], 6, (255, 0, 255), -1)  # Magenta = End (pink block)

# Side-by-side original and path visualization using matplotlib
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.title("Original Image (last frame)")
plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
plt.axis("off")

plt.subplot(1, 2, 2)
plt.title("Path Detected (overlay)")
plt.imshow(cv2.cvtColor(output, cv2.COLOR_BGR2RGB))
plt.axis("off")
plt.show()

In [None]:
# -------------------- Compute turning points --------------------
turning_points = get_direction_changes(path)
print("Direction change points (grid):")
for pt in turning_points:
    print("→", pt)

turning_pixels = [grid_to_pixel(pt) for pt in turning_points]
print("Direction change points (pixels):")
for pt in turning_pixels:
    print("→", pt)

In [None]:
# -------------------- Live robot tracking using AprilTag --------------------
print("Starting live AprilTag tracking. Press 'q' in window to stop.")

tag_path = []
cap = cv2.VideoCapture(CAM_INDEX)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    robot_pos = detect_apriltag(frame)  # live detection of the same tag id
    display = frame.copy()

    # Draw the planned maze path (static)
    for pt in path_pixels:
        cv2.circle(display, pt, 2, (0, 0, 0), -1)

    # Draw live tag and save its path
    if robot_pos:
        tag_path.append(robot_pos)
        cv2.circle(display, robot_pos, 6, (0, 0, 255), -1)  # red dot for live robot

    # Draw start & end points on live view
    if path_pixels:
        cv2.circle(display, path_pixels[0], 6, (0, 255, 0), -1)   # Start (green)
        cv2.circle(display, path_pixels[-1], 6, (255, 0, 255), -1)  # End (magenta)

    cv2.imshow("Robot Tracking with AprilTag", display)
    # Press 'q' key to break live loop
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()


In [None]:
# -------------------- Final comparison plot --------------------
# If we collected a tag path, plot it alongside planned path
if not tag_path:
    print("Warning: no AprilTag positions were recorded during tracking.")
else:
    maze_x = [p[0] for p in path_pixels]
    maze_y = [p[1] for p in path_pixels]

    tag_x = [p[0] for p in tag_path]
    tag_y = [p[1] for p in tag_path]

    plt.figure(figsize=(8, 8))
    # use last frame as background if available
    try:
        bg = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        plt.imshow(bg)
    except Exception:
        pass

    plt.plot(maze_x, maze_y, linestyle='--', linewidth=2, label='Planned Path (A*)')       # planned dashed
    plt.plot(tag_x, tag_y, linestyle='-', linewidth=2, label='Actual Robot Path (AprilTag)')  # actual

    if path_pixels:
        plt.scatter(*path_pixels[0], c='green', s=120, label='Start')
        plt.scatter(*path_pixels[-1], c='magenta', s=120, label='End')

    plt.title("Planned vs Actual Path")
    plt.legend()
    plt.axis('off')
    plt.show()

print("Script complete.")