In [None]:
import requests
import numpy as np
from pydantic import BaseModel
import os
import cv2
from scipy.signal import savgol_filter

SERVICES = {
    'asr': 'http://127.0.0.1:8001',
    'diarization': 'http://127.0.0.1:8002',
    'emotion': 'http://127.0.0.1:8003',
    'nlp': 'http://127.0.0.1:8004',
    'nonverb': 'http://127.0.0.1:8005',
    'robot_data': 'http://127.0.0.1:8006',
    'robot_speed': 'http://127.0.0.1:8007'
}

# CONFIGURATION 
ARUCO_DICT = cv2.aruco.getPredefinedDictionary(
    cv2.aruco.DICT_5X5_100
)  # change for appropriate ArUco code
ARUCO_PARAMS = cv2.aruco.DetectorParameters()
ARUCO_DETECTOR = cv2.aruco.ArucoDetector(ARUCO_DICT, ARUCO_PARAMS)
MARKER_ID = None
PIXELS_PER_CM = 4.0   # adjust when camera calibration is known
SMOOTH_WINDOW = 9

def compute_robot_winning_rate(video_path: str, window_start: float, window_end: float) -> dict:
    """
    Call the robot speed service to compute a winning rate
    for the given time window in the video.
    """
    payload = {
        "video_path": video_path,
        "window_start": window_start,
        "window_end": window_end,
    }
    response = requests.post(
        f"{SERVICES['robot_speed']}/winning_rate",
        json=payload,
        timeout=60,
    )
    response.raise_for_status()
    return response.json()

def compute_speed_for_window(video_path: str, window_start: float, window_end: float):
    """
    Process only the specified time window of the video and
    return average speed (cm/s) and number of detections.
    """
    if window_end <= window_start:
        raise ValueError("window_end must be greater than window_start")

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise IOError(f"Cannot open video file: {video_path}")

    # Seek to window start (in milliseconds)
    cap.set(cv2.CAP_PROP_POS_MSEC, window_start * 1000.0)
    
    positions = []

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Current timestamp in seconds
        t = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000.0

        # Stop once we are past the window
        if t >= window_end:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        corners, ids, _ = ARUCO_DETECTOR.detectMarkers(gray)

        if ids is not None:
            for i, marker_id in enumerate(ids.flatten()):
                if MARKER_ID is None or marker_id == MARKER_ID:
                    c = corners[i][0]
                    center = np.mean(c, axis=0)
                    x, y = center
                    positions.append((t, x, y))

    cap.release()

    if len(positions) <= 1:
        # Not enough detections in this window to compute speed
        return np.nan, 0

    positions = np.array(positions, dtype=float)
    t, x, y = positions[:, 0], positions[:, 1], positions[:, 2]

    # Smooth positions for stability
    if len(x) >= SMOOTH_WINDOW:
        x = savgol_filter(x, SMOOTH_WINDOW, 3)
        y = savgol_filter(y, SMOOTH_WINDOW, 3)

    # Calculate instantaneous speeds
    dt = np.diff(t)
    dx, dy = np.diff(x), np.diff(y)
    dt[dt == 0] = np.finfo(float).eps  # avoid division by zero

    distance = np.sqrt(dx**2 + dy**2)  # pixel distance
    speed_pixels = distance / dt  # pixels per second
    
    # Convert to cm/s
    speed_cm_s = speed_pixels / PIXELS_PER_CM

    avg_speed_cm_s = float(np.mean(speed_cm_s))
    num_detections = len(t)

    return avg_speed_cm_s, num_detections

def test_marker_detection(video_path: str, start_time: float = 0.0, num_frames: int = 10):
    """Test marker detection and display results."""
    cap = cv2.VideoCapture(video_path)
    cap.set(cv2.CAP_PROP_POS_MSEC, start_time * 1000.0)
    
    for i in range(num_frames):
        ret, frame = cap.read()
        if not ret:
            break
            
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        corners, ids, _ = ARUCO_DETECTOR.detectMarkers(gray)
        
        # Draw detected markers
        if ids is not None:
            cv2.aruco.drawDetectedMarkers(frame, corners, ids)
            print(f"Frame {i}: Detected markers {ids.flatten()}")
        else:
            print(f"Frame {i}: No markers detected")
    
    cap.release()
    cv2.destroyAllWindows()

In [3]:
dir = "/home/UMRobotics/Desktop/edmo_dataset/07102025_school_id_1/20251007_100704/Sessions/20251007/Suzanne/101120"
audio_name = os.listdir(f"{dir}/Audio/processed")[0]
video_path = f"{dir}/Videos/top_cam/{audio_name.split('.')[0]}.MP4"

response = compute_robot_winning_rate(
    video_path=video_path,
    window_start=120.0,
    window_end=180.0
)
print("Robot Winning Rate:", response)

HTTPError: 500 Server Error: Internal Server Error for url: http://127.0.0.1:8007/winning_rate

In [10]:
compute_speed_for_window(
    video_path=video_path,
    window_start=720.0,
    window_end=750.0
)

(nan, 0)

In [13]:
video_path

'/home/UMRobotics/Desktop/edmo_dataset/07102025_school_id_1/20251007_100704/Sessions/20251007/Suzanne/101120/Videos/top_cam/GX010576.MP4'

In [12]:
test_marker_detection(video_path, start_time=720.0, num_frames=30)

Frame 0: No markers detected
Frame 1: No markers detected
Frame 2: No markers detected
Frame 3: No markers detected
Frame 4: No markers detected
Frame 5: No markers detected
Frame 6: No markers detected
Frame 7: No markers detected
Frame 8: No markers detected
Frame 9: No markers detected
Frame 10: No markers detected
Frame 11: No markers detected
Frame 12: No markers detected
Frame 13: No markers detected
Frame 14: No markers detected
Frame 15: No markers detected
Frame 16: No markers detected
Frame 17: No markers detected
Frame 18: No markers detected
Frame 19: No markers detected
Frame 20: No markers detected
Frame 21: No markers detected
Frame 22: No markers detected
Frame 23: No markers detected
Frame 24: No markers detected
Frame 25: No markers detected
Frame 26: No markers detected
Frame 27: No markers detected
Frame 28: No markers detected
Frame 29: No markers detected
