In [1]:
import cv2
import torch
import math
import numpy as np
from collections import deque
from typing import Tuple, Optional, Deque
from ultralytics import YOLO

class SyringeVolumeCalculator:
    def __init__(self) -> None:
        self.model = YOLO("runs/pose/train-pose11x-v14/weights/best.pt").eval()
        if torch.cuda.is_available():
            self.device = "cuda"
        elif torch.backends.mps.is_available():
            self.device = "mps"
        else:
            self.device = "cpu"
        self.model.to(self.device)
        self.volume_history: Deque[float] = deque(maxlen=3)
        self.syringe_diameter: float = 1.0  # cm
        self.ll_point: Optional[np.ndarray] = None
        self.ul_point: Optional[np.ndarray] = None
        self.ur_point: Optional[np.ndarray] = None
        self.lr_point: Optional[np.ndarray] = None

    @staticmethod
    def calculate_line_equation(p1: np.ndarray, p2: np.ndarray) -> Tuple[float, float, float]:
        x1, y1 = p1
        x2, y2 = p2
        A = y2 - y1
        B = x1 - x2
        C = (x2 - x1) * y1 - (y2 - y1) * x1
        return (A, B, C)

    def calculate_parallel_distance(
        self,
        line1: Tuple[float, float, float],
        line2: Tuple[float, float, float]
    ) -> Optional[float]:
        A1, B1, C1 = line1
        A2, B2, C2 = line2
        if abs(A1 * B2 - A2 * B1) > 1e-6:
            return None
        return abs(C2 - C1) / math.hypot(A1, B1)

    @staticmethod
    def point_to_line_distance(point: np.ndarray, line: Tuple[float, float, float]) -> float:
        x, y = point
        A, B, C = line
        return abs(A * x + B * y + C) / math.hypot(A, B)

    def calculate_height(
        self,
        lower_line_eq: Tuple[float, float, float],
        upper_line_eq: Tuple[float, float, float],
        scale_factor: float
    ) -> float:
        h_pixels = self.calculate_parallel_distance(lower_line_eq, upper_line_eq)
        d_ll = self.point_to_line_distance(self.ll_point, upper_line_eq) if self.ll_point is not None else 0
        d_lr = self.point_to_line_distance(self.lr_point, upper_line_eq) if self.lr_point is not None else 0
        avg_point_dist = (d_ll + d_lr) / 2
        if h_pixels is None or abs(h_pixels - avg_point_dist) > 5:
            return avg_point_dist * scale_factor
        return ((h_pixels * 0.7) + (avg_point_dist * 0.3)) * scale_factor

    def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, Optional[float]]:
        results = self.model.predict(frame, verbose=False, conf=0.6)
        if not results:
            return frame, None
        result = results[0]
        annotated_frame = result.plot()
        if result.keypoints is None or len(result.keypoints.xy[0]) < 4:
            return annotated_frame, None
        try:
            kpts = result.keypoints.xy[0].cpu().numpy()
            # Assume keypoint order: lower left, upper left, upper right, lower right.
            self.ll_point, self.ul_point, self.ur_point, self.lr_point = kpts[:4]
        except Exception as e:
            print(f"Keypoint extraction error: {e}")
            return annotated_frame, None

        self.draw_debug_info(annotated_frame)

        try:
            lower_line_eq = self.calculate_line_equation(self.ll_point, self.lr_point)
            upper_line_eq = self.calculate_line_equation(self.ul_point, self.ur_point)
            lower_edge_px = np.linalg.norm(self.lr_point - self.ll_point)
            upper_edge_px = np.linalg.norm(self.ur_point - self.ul_point)
            if lower_edge_px <= 0 or upper_edge_px <= 0:
                return annotated_frame, None
            scale_factor = 1.0 / ((lower_edge_px + upper_edge_px) / 2.0)
            height_cm = self.calculate_height(lower_line_eq, upper_line_eq, scale_factor)
            if height_cm <= 0 or height_cm > 30:
                return annotated_frame, None
            volume = math.pi * (self.syringe_diameter / 2) ** 2 * height_cm
            self.volume_history.append(volume)
            smoothed_volume = float(np.median(self.volume_history))
            return annotated_frame, smoothed_volume
        except Exception as e:
            print(f"Processing error: {e}")
            return annotated_frame, None

    def draw_debug_info(self, frame: np.ndarray) -> None:
        debug_params = {
            'fontFace': cv2.FONT_HERSHEY_SIMPLEX,
            'fontScale': 0.6,
            'color': (0, 0, 255),
            'thickness': 1
        }
        labels = ['LL', 'UL', 'UR', 'LR']
        points = [self.ll_point, self.ul_point, self.ur_point, self.lr_point]
        for label, pt in zip(labels, points):
            if pt is not None:
                cv2.putText(frame, label, (int(pt[0]) + 10, int(pt[1])), **debug_params)
        if self.ll_point is not None and self.lr_point is not None:
            lower_len = np.linalg.norm(self.lr_point - self.ll_point)
            cv2.putText(frame, f"Lower: {lower_len:.1f}px", (10, 100), **debug_params)
        if self.ul_point is not None and self.ur_point is not None:
            upper_len = np.linalg.norm(self.ur_point - self.ul_point)
            cv2.putText(frame, f"Upper: {upper_len:.1f}px", (10, 130), **debug_params)

    def run_on_video(self, input_video_path: str, output_video_path: str) -> None:
        cap = cv2.VideoCapture(input_video_path)
        if not cap.isOpened():
            raise IOError(f"Cannot open video file: {input_video_path}")
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            fps = 30
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        current_frame = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            processed_frame, volume = self.process_frame(frame)
            if volume is not None:
                cv2.putText(processed_frame, f"Volume: {volume:.2f} mL", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                height_cm = volume / (math.pi * (self.syringe_diameter / 2) ** 2)
                cv2.putText(processed_frame, f"Height: {height_cm:.1f} cm", (10, 70),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
            out.write(processed_frame)
            current_frame += 1
            print(f"Processing frame {current_frame}/{frame_count}", end="\r")
        cap.release()
        out.release()
        cv2.destroyAllWindows()
        print("\nExport complete!")

# Set the paths for your video files.
input_video_path: str = "input_video.mp4"   # Replace with the path to your input video file.
output_video_path: str = "output_video.mp4" # Replace with the desired path for the output video.

# Create an instance and run video processing.
calculator = SyringeVolumeCalculator()
calculator.run_on_video(input_video_path, output_video_path)

OpenCV: Couldn't read video stream from file "input_video.mp4"


OSError: Cannot open video file: input_video.mp4