In [1]:
import torch

In [28]:
import cv2
import time
import platform
from threading import Thread, Event
from queue import Queue

class CrossPlatformCamera:
    def __init__(self, interval=0.5):
        self.interval = interval
        self.frame_queue = Queue(maxsize=1)
        self.stop_event = Event()
        self.os_type = platform.system()
        
    def _select_camera_api(self):
        """根据操作系统选择摄像头API"""
        if self.os_type == "Darwin":  # macOS
            return cv2.CAP_AVFOUNDATION
        elif self.os_type == "Windows":
            return cv2.CAP_DSHOW
        else:
            return cv2.CAP_ANY
    
    def _camera_worker(self):
        """摄像头采集线程"""
        api_preference = self._select_camera_api()
        cap = cv2.VideoCapture(0, api_preference)
        
        if not cap.isOpened():
            raise RuntimeError("无法打开摄像头")
        
        try:
            while not self.stop_event.is_set():
                ret, frame = cap.read()
                if ret:
                    if not self.frame_queue.empty():
                        try:
                            self.frame_queue.get_nowait()
                        except:
                            pass
                    self.frame_queue.put(frame)
                time.sleep(0.01)  # 降低CPU占用
        finally:
            cap.release()
    
    def _timer_worker(self):
        """定时截图线程"""
        last_capture = time.time()
        while not self.stop_event.is_set():
            if time.time() - last_capture >= self.interval:
                if not self.frame_queue.empty():
                    frame = self.frame_queue.get()
                    self.process_frame(frame)
                last_capture = time.time()
            time.sleep(0.01)
    
    def process_frame(self, frame):
        """图像处理函数（需自定义）"""
        # 在此处添加动作识别逻辑
        # 示例：保存截图用于后续处理
        cv2.imwrite(f"capture_{time.time()}.jpg", frame)
        print("截图已保存并等待处理")
    
    def start(self):
        Thread(target=self._camera_worker, daemon=True).start()
        Thread(target=self._timer_worker, daemon=True).start()
        print("摄像头监控已启动...")
    
    def stop(self):
        self.stop_event.set()
        print("摄像头监控已停止")

# 使用示例
if __name__ == "__main__":
    camera = CrossPlatformCamera(interval=0.5)
    camera.start()
    
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        camera.stop()

摄像头监控已启动...
截图已保存并等待处理
截图已保存并等待处理
截图已保存并等待处理
截图已保存并等待处理
截图已保存并等待处理
截图已保存并等待处理
截图已保存并等待处理
截图已保存并等待处理
截图已保存并等待处理
截图已保存并等待处理
截图已保存并等待处理
摄像头监控已停止


In [31]:
class ActionClassifier:
    def __init__(self, model_path):
        self.model = torch.jit.load(model_path)
        self.model.eval()
    
    def predict(self, tensor):
        with torch.no_grad():
            output = self.model(tensor.unsqueeze(0))
            return torch.argmax(output).item()

def process_frame(self, frame):
    # 预处理
    processed = preprocess(frame)
    
    # 推理
    prediction = self.classifier.predict(processed)
    
    # 记录结果
    if prediction == 1:
        print("检测到拉弓动作！")
    else:
        print("无特定动作")