# Interactive Pipeline for Tracking Robots
## v2, using Hough-transform


##### Luco Buise
##### MSc Thesis: Radboud University

22-05-2025

---

## Imports

In [None]:
import os

import matplotlib as mpl
import matplotlib.pyplot as plt

# change the following to %matplotlib notebook for interactive plotting
%matplotlib inline

import numpy as np
import pandas as pd
from pandas import DataFrame, Series  # for convenience

import cv2
import trackpy as tp
import ipywidgets as widgets
import pickle
from IPython.display import display
from scipy.optimize import curve_fit
from scipy.signal import savgol_filter

from ipywidgets import HBox, Textarea, interact

---

## Helper Functions

In [None]:
def detect_circles(image, min_radius, max_radius, param1, param2, dp=1.2):
    min_dist = int(0.9 * max_radius)

    # apply Hough transform
    circles = cv2.HoughCircles(image, cv2.HOUGH_GRADIENT, dp, min_dist,
                               param1=param1, param2=param2,
                               minRadius=min_radius, maxRadius=max_radius)
    return circles
    
def capture_frame(video_obj, frame_num):
    video_obj.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
    ret, frame = video_obj.read()
    if not ret:
        return None # if reached ending, stop
    return frame

def draw_circles(ax, circles):
    if circles is not None:
        for pt in circles[0]:
            x, y, r = pt
            circle = plt.Circle((x, y), r, color='r', fill=False)
            ax.add_patch(circle)

def crop_image(img,x0,y0,x1,y1):
    return img[y0:y1,x0:x1,:]


## Import video

Change dir/video name below to select your video.

In [None]:
# choose video
video_dir = "lab_recordings/cluster_dispersion_v3"
video_name = "group_50_floor"
video_path = os.path.join(video_dir, video_name + ".mp4")

In [None]:
# load video
cap = cv2.VideoCapture(video_path)
ret, frame = cap.read()
if ret:
    x_res = frame.shape[1]
    y_res = frame.shape[0]
else:
    print("Video does not exist")

---

## Determine pixel to cm ratio

A new window will pop up when you run this cell. Select two points between which you know the distance (standard is 150cm, change below if different).

In [None]:
# calculate pixel to cm ratio using something you know the distance of
object_length_cm = 150 # 30 cm for ruler

# convert to RGB
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

print("In the just opened window, select two points you know the distance between.")

# helper function
def click_event(event, x, y, flags, param):
    global ix, iy, points
    if event == cv2.EVENT_LBUTTONDOWN:
        points.append((x, y))
        if len(points) == 2:  # after two points are selected, calculate distance
            cv2.line(frame, points[0], points[1], (0, 255, 0), 2)
            cv2.putText(frame, 'Distance in pixels: {:.2f}'.format(np.linalg.norm(np.array(points[1]) - np.array(points[0]))), 
                        (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
            cv2.imshow("Frame", frame)

points = []
cv2.imshow("Frame", frame)
cv2.setMouseCallback("Frame", click_event)
cv2.waitKey(0)
cv2.destroyAllWindows()

px_distance = np.linalg.norm(np.array(points[1]) - np.array(points[0]))
print(f"Pixel distance between two points: {px_distance} pixels")

# calc px to cm ratio
px_cm_ratio = object_length_cm / px_distance
print(f"Pixel to cm ratio: {px_cm_ratio} cm/pixel")

---

## Determine cropping parameters

Use to interactive widget to determine how to crop the video, then save the found values in the cell below.

In [None]:
# WIDGET FOR CROPPING

# function to crop and display the frame
def update_crop(crop_x1, crop_x2, crop_y1, crop_y2):
    # capture a frame from the video
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)  # reset to the first frame
    ret, frame = cap.read()
    
    if ret:
        # crop the frame
        crop_frame = frame[crop_y1:crop_y2, crop_x1:crop_x2]
        
        # show the cropped frame
        plt.figure(figsize=(6, 6))
        plt.imshow(cv2.cvtColor(crop_frame, cv2.COLOR_BGR2RGB))  # convert from BGR to RGB
        plt.show()

# create interactive widgets
crop_x1_slider = widgets.IntSlider(min=0, max=x_res, step=1, value=0, description='Crop X1')
crop_x2_slider = widgets.IntSlider(min=0, max=x_res, step=1, value=x_res, description='Crop X2')
crop_y1_slider = widgets.IntSlider(min=0, max=y_res, step=1, value=0, description='Crop Y1')
crop_y2_slider = widgets.IntSlider(min=0, max=y_res, step=1, value=y_res, description='Crop Y2')

# link the widgets with the update function
interactive_plot = widgets.interactive(update_crop, 
                                       crop_x1=crop_x1_slider, 
                                       crop_x2=crop_x2_slider, 
                                       crop_y1=crop_y1_slider, 
                                       crop_y2=crop_y2_slider)

# display the interactive widgets
display(interactive_plot)

In [None]:
# crop the video if desired, using values found above
crop_x = (511, 1301)
crop_y = (131, 932)

---

## Interactive widget for annotation

Using the interactive widget, determine the annotation parameters. Once satisfied, copy them in the cell below the widget.

In [None]:
video = cv2.VideoCapture(video_path)
frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))

canny_edge_thresh_default = 100
circle_detection_thresh_default = 30
radiusMin_default = 5
radiusMax_default = 30

output = widgets.Output()

@interact(
    frame_num=(0, frame_count - 1),
    canny_edge_thresh=(1, 300, 1), # how sharp are the edges considered
    circle_detection_thresh=(1, 100, 1), # lower = more sensitive, but can detect false positives.
    radius_min=(1, 50, 1),
    radius_max=(5, 100, 1),
)
def explore_video(frame_num=0, canny_edge_thresh=canny_edge_thresh_default,
                  circle_detection_thresh=circle_detection_thresh_default, radius_min=radiusMin_default,
                  radius_max=radiusMax_default):
    
    frame = capture_frame(video, frame_num)
    cropped = crop_image(frame, crop_x[0], crop_y[0], crop_x[1], crop_y[1])
    grey = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY)
    
    # black and white threshold
    #_, bw = cv2.threshold(gray, black_white_thresh, 255, cv2.THRESH_BINARY)

    # detect circles using Hough transform
    circles = detect_circles(grey, radius_min, radius_max, canny_edge_thresh, circle_detection_thresh)

    # display result
    with output:
        output.clear_output(wait=True)
        fig, ax = plt.subplots(figsize=(6, 6))
        ax.imshow(grey, cmap='gray')
        draw_circles(ax, circles)
        ax.set_title(f"Frame {frame_num}")
        plt.show()

display(HBox([output]))


In [None]:
# decide on params using widget above
canny_edge_thresh = 47
circle_detection_thresh = 15
radiusMin = 10
radiusMax = 19

---

## Get and plot trajectories

Run these cells to get all the trajectories and plot them. If you want to change when to start and end tracking, change the parameters below.

In [None]:
video = cv2.VideoCapture(video_path)
frameCount = int(video.get(cv2.CAP_PROP_FRAME_COUNT))

# change beginning and ending
start_frame = 0
end_frame = frameCount

frames = np.array(range(start_frame, end_frame, 1))

circle_detections = []

for i, frame_num  in enumerate(frames):
    img = capture_frame(video, frame_num )
    if img is not None:
        img = crop_image(img, crop_x[0], crop_y[0], crop_x[1], crop_y[1])
        grayImage = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        
        circles = detect_circles(grayImage, radiusMin, radiusMax, canny_edge_thresh, circle_detection_thresh)
    
        if circles is not None:
            for circle in circles[0]:  # circles[0] is the list of detections
                x, y, r = circle
                circle_detections.append([x, y, r, frame_num])
    
        if frame_num  % 500 == 0:
            print("Done with frame", frame_num , "/", frameCount) 

In [None]:
#array to dataframe
frame_df = pd.DataFrame(circle_detections, columns=["x", "y", "size", "frame"])

### Change the parameters below to help determine how the trajectories are created. 

In [None]:
# parameters for trajectory creation
max_dist = 10 # maximum distance particle can move between frames
memory = 10 # maximum number of frames during which a feature can vanish, and be considered the same particle
min_length = 100 # minimum length for a trajectory

In [None]:
# get trajectories from the locations
t = tp.link(frame_df, max_dist, memory=memory)

# remove short trajectories
t1 = tp.filter_stubs(t, min_length)

# compare the number of particles in the unfiltered and filtered data.
print('Before stub filtering:', t['particle'].nunique())
print('After stub filtering:', t1['particle'].nunique())

In [None]:
plt.figure()

# frame size
cropped_x_res = crop_x[1] - crop_x[0]
cropped_y_res = crop_y[1] - crop_y[0]

# make plot same shape&size as the frames
ax = plt.gca()
ax.set_xlim([0, cropped_x_res])
ax.set_ylim([0, cropped_y_res])

# add scale bar
scale_bar_length_cm = 30

# convert scale bar length from cm to pixels (using ratio calculated in beginning)
scale_bar_length_pxs = scale_bar_length_cm / px_cm_ratio

# location of the scale bar
x_scale_start = 50
y_scale_start = cropped_y_res - 50
scale_bar_end = x_scale_start + scale_bar_length_pxs

# draw a horizontal line for the scale bar
plt.plot([x_scale_start, scale_bar_end], [y_scale_start, y_scale_start], color='black', lw=2)

# add label for the scale bar
plt.text(x_scale_start + scale_bar_length_pxs / 2, y_scale_start - 5, f'{scale_bar_length_cm} cm', 
         color='black', ha='center', va='bottom', fontsize=12)

# plot traj
tp.plot_traj(t1, label=True)

plt.show()

### If you wish to remove any of the trajectories plotted above, add their particle numbers to the array below.

In [None]:
# remove any additional trajectories you do not want
remove_part = []

t2 = t1.copy()

for p in remove_part:
    t2 = t2[t2.particle != p]
    
plt.figure()

# frame size
cropped_x_res = crop_x[1] - crop_x[0]
cropped_y_res = crop_y[1] - crop_y[0]

# make plot same shape&size as the frames
ax = plt.gca()
ax.set_xlim([0, cropped_x_res])
ax.set_ylim([0, cropped_y_res])

# add scale bar
scale_bar_length_cm = 30

# convert scale bar length from cm to pixels (using ratio calculated in beginning)
scale_bar_length_pxs = scale_bar_length_cm / px_cm_ratio

# location of the scale bar
x_scale_start = 50
y_scale_start = cropped_y_res - 50
scale_bar_end = x_scale_start + scale_bar_length_pxs

# draw a horizontal line for the scale bar
plt.plot([x_scale_start, scale_bar_end], [y_scale_start, y_scale_start], color='black', lw=2)

# add label for the scale bar
plt.text(x_scale_start + scale_bar_length_pxs / 2, y_scale_start - 5, f'{scale_bar_length_cm} cm', 
         color='black', ha='center', va='bottom', fontsize=12)

# plot traj
tp.plot_traj(t2, label=True)

plt.show()

---

## Smoothen trajectories

Run these cells if you wish to smoothen the trajectories

In [None]:
def smooth_trajectories(df, window_length=10, polyorder=2):   
    # make sure 'frame' is only a column, not an index level
    df = df.reset_index(drop=True)
    
    # Make sure data is sorted by particle and frame
    df = df.sort_values(['particle', 'frame']).reset_index(drop=True)
    
    # group by particle and smooth each trajectory
    for pid, group in df.groupby('particle'):
        n = len(group)
        
        # adjust window_length if too short
        wl = min(window_length, n if n % 2 == 1 else n - 1)
        if wl < polyorder + 2:
            # If trajectory too short for savgol, just copy raw data
            df.loc[group.index, 'x'] = group['x']
            df.loc[group.index, 'y'] = group['y']
            continue
        
        # smooth x and y separately
        df.loc[group.index, 'x'] = savgol_filter(group['x'], wl, polyorder)
        df.loc[group.index, 'y'] = savgol_filter(group['y'], wl, polyorder)
    
    return df

In [None]:
# smoothen
t2 = smooth_trajectories(t2)

plt.figure()

# frame size
cropped_x_res = crop_x[1] - crop_x[0]
cropped_y_res = crop_y[1] - crop_y[0]

# make plot same shape&size as the frames
ax = plt.gca()
ax.set_xlim([0, cropped_x_res])
ax.set_ylim([0, cropped_y_res])

# add scale bar
scale_bar_length_cm = 30

# convert scale bar length from cm to pixels (using ratio calculated in beginning)
scale_bar_length_pxs = scale_bar_length_cm / px_cm_ratio

# location of the scale bar
x_scale_start = 50
y_scale_start = cropped_y_res - 50
scale_bar_end = x_scale_start + scale_bar_length_pxs

# draw a horizontal line for the scale bar
plt.plot([x_scale_start, scale_bar_end], [y_scale_start, y_scale_start], color='black', lw=2)

# add label for the scale bar
plt.text(x_scale_start + scale_bar_length_pxs / 2, y_scale_start - 5, f'{scale_bar_length_cm} cm', 
         color='black', ha='center', va='bottom', fontsize=12)

# plot traj
tp.plot_traj(t2, label=True)

plt.show()

---

## Save trajectory dataframe and parameters as pickle using dict

In [None]:
# save dir
save_dir = "saved_trajectories_pkl"
file_name = video_name + ".pkl"

save_file_path = os.path.join(save_dir, file_name)

In [None]:
traj_dict = {"traj_df":t2, "px_cm_ratio":px_cm_ratio, "x_res_px":cropped_x_res, "y_res_px":cropped_y_res}

# save dict as pickle
try:
    with open(save_file_path, 'wb') as file:
        pickle.dump(traj_dict, file)
except Exception as e:
    print(f"Error pickling dictionary: {e}")

In [None]:
# reading a pickle file
try:
    with open(save_file_path, 'rb') as file:
        read_pkl_dict = pickle.load(file)
except Exception as e:
    print(f"Error unpickling dictionary: {e}")

read_pkl_dict["traj_df"].head()

---

## Save trajectory dataframe as CSV (does not include parameters)

In [None]:
# save dir
save_dir = "saved_trajectories_csv"
file_name = video_name + ".csv"

save_file_path = os.path.join(save_dir, file_name)

In [None]:
# save df as csv
t2.to_csv(save_file_path)

In [None]:
# reading a csv file
read_csv_df = pd.read_csv(save_file_path)

read_csv_df.head()

---

## Create video trajectory overlapping the tracking trajectory

In [None]:
# decide where and how to save the file
output_video_name = video_name + "_trajs.mp4"
output_path = os.path.join('traj_videos', output_video_name)

# parameters for video
fading_trail = True
max_trail = 100  # how many past positions to show if using fading trail

line_colour = (0, 0, 255)
line_thickness = 1

# load data
df = t2

# video reading
video = cv2.VideoCapture(video_path)
fps = video.get(cv2.CAP_PROP_FPS)
crop_width = crop_x[1] - crop_x[0]
crop_height = crop_y[1] - crop_y[0]

# output video
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (crop_width, crop_height))

# fast-forward to start_frame
video.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

frame_idx = 0
trajectory_history = {}  # particle_id -> list of (x, y)

for abs_frame_idx in range(start_frame, end_frame):
    ret, frame = video.read()
    if not ret:
        break

    # crop frame
    cropped = frame[crop_y[0]:crop_y[1], crop_x[0]:crop_x[1]].copy()
    
    # get current frame's data
    current_frame_data = df[df['frame'] == abs_frame_idx]

    for _, row in current_frame_data.iterrows():
        x, y = int(row['x']), int(row['y'])
        r = int(row['size'])  # radius
        p_id = int(row['particle'])

        # add to trajectory history
        if p_id not in trajectory_history:
            trajectory_history[p_id] = []
        trajectory_history[p_id].append((x, y))

        # draw current robot as circle
        #cv2.circle(cropped, (x, y), r, (0, 255, 0), -1)

        if not fading_trail:
            # draw full trajectory line
            pts = trajectory_history[p_id]
            for i in range(1, len(pts)):
                cv2.line(cropped, pts[i - 1], pts[i], line_colour, line_thickness)
        else:
            overlay = cropped.copy()
            pts = trajectory_history[p_id]
    
            for i in range(max(1, len(pts) - max_trail), len(pts)):
                pt1 = pts[i - 1]
                pt2 = pts[i]
                alpha = (i - (len(pts) - max_trail)) / max_trail  # 0 to 1
                color = (0, 0, int(255 * (1 - alpha)))  # fade from bright red to dark
                cv2.line(overlay, pt1, pt2, color, line_thickness)
    
            # blend overlay with original cropped frame
            cv2.addWeighted(overlay, 0.6, cropped, 0.4, 0, cropped)

    out.write(cropped)
    frame_idx += 1

video.release()
out.release()
print("Video saved to:", output_path)