# New approaches to axis detection

In [87]:
import cv2
from pathlib import Path
import numpy as np
from matplotlib import pyplot as plt
import math
import pandas as pd
from scipy.signal import savgol_filter
from scipy.signal import medfilt
import ast
from scipy.spatial.transform import Rotation 

Import video

In [88]:
video_number = "5"
# Define the relative path to the video file
notebook_dir = Path().resolve()
project_root = notebook_dir.parent.parent
video_path = project_root / "data" / f"recording_{video_number}" / f"Recording_{video_number}.mp4" 
video_path = str(video_path)

# Load the video
cap = cv2.VideoCapture(video_path)

# Check
print(f"Opened: {cap.isOpened()}, FPS: {cap.get(cv2.CAP_PROP_FPS)}, Total Frames: {cap.get(cv2.CAP_PROP_FRAME_COUNT)}")
fps = round(cap.get(cv2.CAP_PROP_FPS))
#size of the video
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f"Width: {width}, Height: {height}")

Opened: True, FPS: 30.0, Total Frames: 258.0
Width: 1920, Height: 1080


Import data

In [89]:
# Define the path to the CSV file
input_data_path = project_root / "data" / "auxiliary_data" / "circle_positions" / f"Adjusted_positions_new_{video_number}.csv"

# Load the CSV file into a DataFrame
df = pd.read_csv(input_data_path)

Cut the frames roi (gray)

In [90]:
def get_two_roi(x, y, x_next, y_next, r, curr_frame, next_frame, width, height,  off=2):
    x0, x1 = max(x-r-off,0), min(x+r+off, width)
    y0, y1 = max(y-r-off,0), min(y+r+off, height)

    x0_next, x1_next = max(x_next-r-off,0), min(x_next+r+off, width)
    y0_next, y1_next = max(y_next-r-off,0), min(y_next+r+off, height)

    roi1 = curr_frame[y0:y1, x0:x1]
    roi2 = next_frame[y0_next:y1_next, x0_next:x1_next]
    g1 = cv2.cvtColor(roi1, cv2.COLOR_BGR2GRAY)
    g2 = cv2.cvtColor(roi2, cv2.COLOR_BGR2GRAY)
    c_roi = np.array([x - x0, y - y0])

    return g1, g2, c_roi

Sample points random

In [91]:
def sample_points_random(frame, center_roi, r):
    # Randomly sample points within the circle
    pts = []
    while len(pts) < 1000:
        xi = np.random.randint(0, frame.shape[1])
        yi = np.random.randint(0, frame.shape[0])
        if (xi-center_roi[0])**2 + (yi-center_roi[1])**2 <= (r*0.7)**2:
            pts.append([xi, yi])
    p0 = np.array(pts, dtype=np.float32).reshape(-1,1,2)

    if p0.size == 0:
        return None
    return p0

Sample points manually

In [92]:
def sample_points_manually(center_roi, r):
    # selezione 100 punti manuali distribuiti su 10 cerchi concentrici
    manual_points = []
    num_rings = 10  # numero di cerchi concentrici
    points_per_ring = 10  # punti per ogni cerchio (10 × 10 = 100)
    max_radius = r * 0.99  # lascia un margine vicino al bordo

    for i in range(1, num_rings + 1):
        r = (i / num_rings) * max_radius
        for j in range(points_per_ring):
            if j<20 or j>9:
                theta = 2 * np.pi * j / points_per_ring
                x = int(center_roi[0] + r * np.cos(theta))
                y = int(center_roi[1] + r * np.sin(theta))
                manual_points.append((x, y))

    # Converti in formato richiesto da calcOpticalFlowPyrLK
    p0 = np.array(manual_points, dtype=np.float32).reshape(-1, 1, 2)

    if p0.size == 0:
        return None
    return p0

Compute the new points with optical flow

In [93]:
def new_points_optical_flow(g1, g2, p0):
    p1, st, _ = cv2.calcOpticalFlowPyrLK(
                g1, g2, p0, None,
                winSize=(15,15), maxLevel=2,
                criteria=(cv2.TERM_CRITERIA_EPS|cv2.TERM_CRITERIA_COUNT, 10, 0.03)
            )

    return p1, st

Get the 3d points

In [94]:
def get_3d_points(p0, p1, st, c_roi, r):
    old3d, new3d = [], []
    for o, n, s in zip(p0.reshape(-1,2), p1.reshape(-1,2), st.reshape(-1)):
        if s:
            ox, oy = o - c_roi
            nx, ny = n - c_roi
            oz = np.sqrt(max(r*r - ox*ox - oy*oy, 0))
            nz = np.sqrt(max(r*r - nx*nx - ny*ny, 0))
            old3d.append([ox, oy, oz])
            new3d.append([nx, ny, nz])

    if len(old3d) < 3:
        return None, None

    old3d = np.array(old3d)
    new3d = np.array(new3d)

    return old3d, new3d

Compute the axis with SVD

In [95]:
def get_axis_svd(old3d, new3d):
    # Compute Rotation matrix using SVD
    Hmat = old3d.T @ new3d
    U, S, Vt = np.linalg.svd(Hmat)
    R = Vt.T @ U.T
    if np.linalg.det(R) < 0:
        Vt[-1,:] *= -1
        R = Vt.T @ U.T

    # Compute the rotation axis and angle
    theta = np.arccos(np.clip((np.trace(R)-1)/2, -1,1))
    if np.sin(theta) != 0:
        ax = np.array([R[2,1]-R[1,2],
                        R[0,2]-R[2,0],
                        R[1,0]-R[0,1]])
        ax /= (2*np.sin(theta))
        ax /= np.linalg.norm(ax)
        return ax, theta
    return None, None

Compute axis with Kabsch SVD

In [96]:
def axis_kabsch(P1, P2):
    H = P1.T @ P2
    U, S, Vt = np.linalg.svd(H)
    R = Vt.T @ U.T
    if np.linalg.det(R) < 0:
        Vt[-1, :] *= -1
        R = Vt.T @ U.T
    # Eigenvector of R corresponding to eigenvalue 1
    axis = np.real_if_close(np.linalg.svd(R - np.eye(3))[2][-1])
    return axis / np.linalg.norm(axis)

Get axis with Rodriguez formula

In [97]:
def axis_rodrigues(p1, p2):
    cross = np.cross(p1, p2)
    if np.linalg.norm(cross) < 1e-6:
        raise ValueError("Vectors are nearly colinear or identical.")
    axis = cross / np.linalg.norm(cross)
    return axis

In [98]:
def average_rodrigues_axis(P1, P2):
    axes = []
    for i in range(len(P1)):
        try:
            a = axis_rodrigues(P1[i], P2[i])
            axes.append(a)
        except:
            pass
    return np.mean(axes, axis=0) / np.linalg.norm(np.mean(axes, axis=0))

Get axis with quaternions

In [99]:
def axis_quaternion(P1, P2):
    rot, _ = Rotation.align_vectors(P2, P1)
    rotvec = rot.as_rotvec()
    axis = rotvec / np.linalg.norm(rotvec)
    return axis

Get axis from velocities

In [100]:
def axis_from_velocity(P1, P2, delta_t):
    V = (P2 - P1) / delta_t  # Nx3 velocity vectors
    A = np.zeros((3 * len(P1), 3))
    b = np.zeros((3 * len(P1), 1))
    
    for i in range(len(P1)):
        r = P1[i]
        vx, vy, vz = V[i]
        # ω x r = v => [r]_x ω = v
        skew = np.array([[0, -r[2], r[1]],
                         [r[2], 0, -r[0]],
                         [-r[1], r[0], 0]])
        A[3*i:3*i+3, :] = skew
        b[3*i:3*i+3, 0] = V[i]
    
    # Least squares solution
    omega, _, _, _ = np.linalg.lstsq(A, b, rcond=None)
    axis = omega.flatten() / np.linalg.norm(omega)
    return axis

Main

In [101]:
def compute_axis(x, y, x_next, y_next, r, curr_frame, next_frame, width, height, max_iterations=3):
    grey_curr, grey_next, center_roi = get_two_roi(x, y, x_next, y_next, r, curr_frame, next_frame, width, height)
    
    axes = []
    angles = []

    for _ in range(max_iterations):
        p0 = sample_points_random(grey_curr, center_roi, r)
        # p0 = sample_points_manually(center_roi, r)

        if p0 is None:
            break
        p1, st = new_points_optical_flow(grey_curr, grey_next, p0)
        curr_3d_points, next_3d_points = get_3d_points(p0, p1, st,center_roi, r)
        if curr_3d_points is None or next_3d_points is None:
            break

        axis, angle = get_axis_svd(curr_3d_points, next_3d_points)
        # axis = axis_kabsch(curr_3d_points, next_3d_points) # similar to get_axis_svd
        # axis = average_rodrigues_axis(curr_3d_points, next_3d_points) # bad results
        # axis = axis_quaternion(curr_3d_points, next_3d_points) # similar to get_axis_svd
        # axis = axis_from_velocity(curr_3d_points, next_3d_points, 1/fps) # good results
        
        if axis is None:
            break
        axes.append(axis)
        angles.append(angle)
    
    if axes:
            avg_axis = np.mean(np.vstack(axes), axis=0)
            avg_axis /= np.linalg.norm(avg_axis)
            avg_angle = np.mean(angles)
            return avg_axis, avg_angle
    return None, None

Generate video

In [102]:
# --- prepare writer ---
fps = cap.get(cv2.CAP_PROP_FPS)
W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out_path = str(project_root / "data" / f"recording_{video_number}" / "test_video" / f"Spin_video_{video_number}.mp4")
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(str(out_path), fourcc, fps, (W, H))

# circle video
# Define the output path for the new video
output_path = str(project_root / "data" / f"recording_{video_number}" / "test_video" / f"Red_Circle_Video_{video_number}.mp4")

# Create a VideoWriter object
circle_video = cv2.VideoWriter(output_path, fourcc, fps, (300, 300))

# Calculate the total number of frames in the original video
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

# Prepare list to log axis endpoints
log = []

# Read frame0 and frame1
ret_curr, curr = cap.read()
ret_next, next_ = cap.read()
frame_idx = 0

while ret_curr and ret_next:
    vis = curr.copy()
    pA, pB = (np.nan, np.nan), (np.nan, np.nan)
    axis = None
    # Only compute if we have valid circle data
    if frame_idx < len(df)-1 and not df.iloc[frame_idx][['x','y','radius']].isna().any() and not df.iloc[frame_idx+1][['x','y','radius']].isna().any():
        x, y, r = df.iloc[frame_idx][['x','y','radius']].astype(int)
        x_next, y_next, r_next = df.iloc[frame_idx+1][['x','y','radius']].astype(int)
        # Get the axis of rotation
        axis, angle = compute_axis(x, y, x_next, y_next, r, curr, next_, W, H)


        # Get the Intersection points with the sphere
        if axis is not None:
            # add axis to df
            df.at[frame_idx, 'axis_x'] = axis[0]
            df.at[frame_idx, 'axis_y'] = axis[1]
            df.at[frame_idx, 'axis_z'] = axis[2]
            df.at[frame_idx, 'angle'] = angle
            # Calculate the intersection points with the sphere
            pA = axis * r
            pB = - axis * r
            pA = (x + pA[0], y + pA[1])
            pB = (x + pB[0], y + pB[1])

            # Draw the axis on the current frame
            cv2.line(vis, (int(pA[0]), int(pA[1])), (int(pB[0]), int(pB[1])), (0, 255, 0), 2)
            cv2.circle(vis, (int(pA[0]), int(pA[1])), 5, (0, 255, 255), -1)
            cv2.circle(vis, (int(pB[0]), int(pB[1])), 5, (0, 0, 255), -1)
            cv2.circle(vis, (int(x), int(y)), 5, (255, 0, 0), 2)

    # log the endpoints for this frame
    log.append({
        'frame': frame_idx,
        'pA_x': pA[0], 'pA_y': pA[1],
        'pB_x': pB[0], 'pB_y': pB[1]
    })

    out.write(vis)

    # Create a black square
    frame = np.zeros((300, 300, 3), dtype=np.uint8)
    
    # Draw a red circle in the center
    center = (150, 150)  # Center of the square
    radius = 100  # Radius of the circle
    color = (0, 0, 255)  # Red color in BGR
    thickness = -1  # Fill the circle
    cv2.circle(frame, center, radius, color, thickness)
    if axis is not None:
        # convert points to circle coordinates
        pA = axis * 100
        pB = - axis * 100
        pA = (150 + pA[0], 150 + pA[1])
        pB = (150 + pB[0], 150 + pB[1])
        cv2.circle(frame, (int(pA[0]), int(pA[1])), 5, (255, 255, 0), -1)
        cv2.circle(frame, (int(pB[0]), int(pB[1])), 5, (0, 255, 255), -1)
        cv2.circle(frame, center, 5, (255, 0, 0), 2)
        cv2.line(frame, (int(pA[0]), int(pA[1])), (int(pB[0]), int(pB[1])), (0, 255, 0), 2)
    
    # Write the frame to the video
    circle_video.write(frame)


    # advance frames
    frame_idx += 1
    curr = next_
    ret_next, next_ = cap.read()
    ret_curr = curr is not None

# write last frame if exists
if ret_curr:
    vis = curr.copy()
    log.append({'frame': frame_idx,
                'pA_x': np.nan, 'pA_y': np.nan,
                'pB_x': np.nan, 'pB_y': np.nan})
    out.write(vis)

cap.release()
out.release()
circle_video.release()


Save in csv file

In [103]:
output_data_path = project_root / "notebook" / "spin" / "intermediate_data" / f"Axis_{video_number}.csv"
df.to_csv(output_data_path, index=False)
print(f"DataFrame saved to {output_data_path}")

DataFrame saved to C:\Users\miche\OneDrive\Documenti\GitHub\bowling-analysis\notebook\spin\intermediate_data\Axis_5.csv
