ALL IMPORTS

Frames input

In [None]:
import os
import collections
import time
import json
import cv2
import numpy as np


def video_to_frames(video_path, target_fps=10, skip_first_frames=0):
    """Decode a video and return a down-sampled list of its frames.

    Frames are counted from 1 in decode order. The first ``skip_first_frames``
    frames are discarded; of the rest, a frame is kept when its number is a
    multiple of ``round(source_fps / target_fps)`` (never less than 1).

    Args:
        video_path: Video location handed to ``cv2.VideoCapture``.
        target_fps: Desired sampling rate; must be greater than zero.
        skip_first_frames: How many leading frames to drop.

    Returns:
        list: The kept frames, exactly as produced by ``cap.read()``.

    Raises:
        ValueError: If ``target_fps`` is not positive.
        IOError: If the video cannot be opened.
    """
    if target_fps <= 0:
        raise ValueError(f"target_fps must be positive, got {target_fps!r}")

    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        if not cap.isOpened():
            raise IOError(f"無法開啟影片：{video_path}")

        original_fps = cap.get(cv2.CAP_PROP_FPS)
        # The reported FPS can be 0, NaN or inf; in that case keep every frame
        # instead of crashing inside int(round(...)).
        if 0 < original_fps < float("inf"):
            interval = max(int(round(original_fps / target_fps)), 1)
        else:
            interval = 1

        frame_counter = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_counter += 1
            if frame_counter <= skip_first_frames:
                continue
            if frame_counter % interval == 0:
                frames.append(frame)
    finally:
        # Always give the decoder handle back, even if reading raised.
        cap.release()

    print(f"總共擷取 {len(frames)} 張幀畫面")
    return frames


def run_inference_from_frames(
    frames,
    output_json_path="output.json",
    model=None,
    device="CPU",
    flip=False,
):
    """Run pose inference on in-memory frames and write the results as JSON.

    ``model`` is compiled through the notebook-level ``core`` and installed as
    the compiled OpenVINO model of the notebook-level ``pose_model``; both
    globals must exist before this function is called. Each frame is
    optionally mirrored, shrunk when ``1280 / max(frame.shape)`` is below 1,
    and then passed to ``pose_model``.

    A frame whose inference raises is reported and skipped, so the JSON list
    can be shorter than ``frames``.

    Args:
        frames: Sequence of image arrays (``len()`` and ``.shape`` are used).
        output_json_path: Destination file; missing parent folders are created.
        model: Object handed to ``core.compile_model``.
        device: Device string handed to ``core.compile_model``.
        flip: When true, each frame is flipped with ``cv2.flip(frame, 1)``.

    Raises:
        ValueError: If ``model`` is ``None``.
    """
    if model is None:
        raise ValueError("請提供 pose_model")

    ov_config = {}
    if "GPU" in device or ("AUTO" in device and "GPU" in core.available_devices):
        ov_config = {"GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES"}

    compiled_model = core.compile_model(model, device, ov_config)

    if pose_model.predictor is None:
        custom = {"conf": 0.25, "batch": 1, "save": False, "mode": "predict"}
        # Previously read `seg_model.overrides`, a name this notebook never defines.
        args = {**pose_model.overrides, **custom}
        pose_model.predictor = pose_model._smart_load("predictor")(overrides=args, _callbacks=pose_model.callbacks)
        pose_model.predictor.setup_model(model=pose_model.model)

    pose_model.predictor.model.ov_compiled_model = compiled_model

    json_results = []
    # Rolling window: only the 200 most recent timings feed the average.
    processing_times = collections.deque(maxlen=200)

    for i, frame in enumerate(frames):
        if flip:
            frame = cv2.flip(frame, 1)

        scale = 1280 / max(frame.shape)
        if scale < 1:
            frame = cv2.resize(frame, dsize=None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)

        input_image = np.array(frame)

        try:
            start_time = time.time()
            detections = pose_model(input_image)
            stop_time = time.time()

            detection = detections[0]
            # The attributes may be missing or None; both cases become an empty list
            # instead of raising and dropping the whole frame.
            boxes = getattr(detection, "boxes", None)
            keypoints = getattr(detection, "keypoints", None)
            json_results.append({
                "boxes": boxes.xyxy.tolist() if boxes is not None else [],
                "keypoints": keypoints.xy.tolist() if keypoints is not None else [],
            })

            processing_times.append(stop_time - start_time)
            avg_time = np.mean(processing_times) * 1000
            # A coarse clock can report 0 ms; avoid dividing by zero.
            fps = 1000 / avg_time if avg_time > 0 else float("inf")
            print(f"[Frame {i+1}/{len(frames)}] 推論時間: {avg_time:.1f}ms ({fps:.1f} FPS)")

        except Exception as e:
            # Best effort: report the failing frame and carry on with the next one.
            print(f"[推論錯誤 @ frame {i}]: {e}")

    # Make sure the destination folder exists so the results are not lost at the very end.
    out_dir = os.path.dirname(output_json_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    with open(output_json_path, "w", encoding="utf-8") as f:
        json.dump(json_results, f, indent=2)
    print(f"✅ 骨架與物件框結果已輸出至: {output_json_path}")

In [None]:
# NOTE(review): `pose_ov_model` and `device` are created in a later setup cell of
# this notebook; that cell has to be executed before this one.
frames = video_to_frames("test.mp4", target_fps=10)

# The JSON is written at the end of the run; make sure its folder exists first.
os.makedirs("output_jsons", exist_ok=True)

run_inference_from_frames(
    frames,
    output_json_path="output_jsons/test.json",
    # `run_inference_from_frames` hands `model` to `core.compile_model`, which
    # needs the OpenVINO model, not the Ultralytics `YOLO` wrapper `pose_model`.
    model=pose_ov_model,
    device=device
)


人

In [14]:
# Read the pose model IR from disk (the path suggests an INT8 export — confirm).
# NOTE(review): relies on `core` already existing in the kernel; in this notebook
# it is only created in later cells, so check the execution order.
quantized_pose_model = core.read_model("yolov8n-pose_openvino_int8_model/yolov8n-pose.xml")

In [None]:
from PIL import Image
from ultralytics import YOLO
from pathlib import Path
import openvino as ov
from notebook_utils import download_file, VideoPlayer, device_widget

POSE_MODEL_NAME = "yolov8n-pose"
pose_model_path = Path(f"{POSE_MODEL_NAME}_openvino_model/{POSE_MODEL_NAME}.xml")

# The PyTorch checkpoint is loaded first only to read the class-name map;
# `pose_model` is rebound to the OpenVINO export further down.
pose_model = YOLO(f"{POSE_MODEL_NAME}.pt")
label_map = pose_model.model.names
device = "AUTO"  # Change to your desired device, e.g., "CPU", "GPU", "AUTO"

core = ov.Core()
# NOTE(review): this expects the exported IR to already exist at
# `pose_model_path`; nothing in this cell creates it — confirm.
pose_ov_model = core.read_model(pose_model_path)

ov_config = {}
if device != "CPU":
    # Fix the input to a static 1x3x640x640 shape for non-CPU targets.
    pose_ov_model.reshape({0: [1, 3, 640, 640]})

if "GPU" in device or ("AUTO" in device and "GPU" in core.available_devices):
    ov_config = {"GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES"}

# Compile once. The original cell repeated everything from here down a second
# time verbatim, which only doubled the compile and load time.
pose_compiled_model = core.compile_model(pose_ov_model, device, ov_config)

pose_model = YOLO(pose_model_path.parent, task="pose")

if pose_model.predictor is None:
    custom = {"conf": 0.25, "batch": 1, "save": False, "mode": "predict"}
    args = {**pose_model.overrides, **custom}
    pose_model.predictor = pose_model._smart_load("predictor")(overrides=args, _callbacks=pose_model.callbacks)
    pose_model.predictor.setup_model(model=pose_model.model)

# Make the Ultralytics predictor run on the model compiled above.
pose_model.predictor.model.ov_compiled_model = pose_compiled_model

影片補邊程式

In [27]:
import cv2
import numpy as np
from pathlib import Path

def letterbox_image(image, target_size=(640, 640), color=(114, 114, 114)):
    """Resize an image with its aspect ratio preserved and pad it to ``target_size``.

    The image is scaled by the largest factor that still fits inside
    ``target_size`` and the remaining area is filled with ``color``, split
    between the two sides of each axis (the extra pixel of an odd remainder
    goes to the bottom/right).

    Args:
        image: Array whose first two dimensions are (height, width).
        target_size: ``(width, height)`` of the output.
        color: Border value passed to ``cv2.copyMakeBorder``.

    Returns:
        tuple: ``(padded_image, (scale, scale), (left, top))`` where ``scale``
        is the resize factor and ``left``/``top`` are the padding offsets.

    Raises:
        ValueError: If the image has a zero-sized height or width.
    """
    original_h, original_w = image.shape[:2]
    if original_h <= 0 or original_w <= 0:
        raise ValueError(f"image has an empty dimension: {original_w}x{original_h}")
    target_w, target_h = target_size

    scale = min(target_w / original_w, target_h / original_h)
    # Round instead of truncating so float error cannot cost a pixel, and clamp
    # to [1, target] so extreme aspect ratios never give a 0-pixel side.
    new_w = min(target_w, max(1, int(round(original_w * scale))))
    new_h = min(target_h, max(1, int(round(original_h * scale))))

    resized_image = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_LINEAR)

    dw = target_w - new_w
    dh = target_h - new_h
    top, bottom = dh // 2, dh - dh // 2
    left, right = dw // 2, dw - dw // 2

    padded_image = cv2.copyMakeBorder(
        resized_image, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color
    )

    return padded_image, (scale, scale), (left, top)

def process_video_with_letterbox(video_path, output_path=None, target_size=(640, 640), show=True):
    """Letterbox every frame of a video, optionally previewing and/or saving it.

    Each frame goes through ``letterbox_image`` so that it is resized with its
    aspect ratio preserved and padded to ``target_size``.

    Args:
        video_path: Input video path.
        output_path: When given, the converted video is written to this path
            using the ``mp4v`` codec.
        target_size: ``(width, height)`` of the output frames (default 640x640).
        show: Whether to display the frames in an OpenCV window (default True);
            pressing ``q`` stops the processing early.

    Returns:
        None. An error message is printed when the input or the output cannot
        be opened.
    """
    cap = cv2.VideoCapture(str(video_path))
    out = None
    try:
        if not cap.isOpened():
            print(f"❌ 無法開啟影片：{video_path}")
            return

        # Output writer setup
        if output_path:
            fps = cap.get(cv2.CAP_PROP_FPS)
            # The reported FPS can be 0/NaN; the writer needs a positive rate.
            if not 0 < fps < float("inf"):
                fps = 30.0
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(str(output_path), fourcc, fps, tuple(target_size))
            if not out.isOpened():
                print(f"❌ 無法建立輸出影片：{output_path}")
                return

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            padded_frame, _ratio, _pad = letterbox_image(frame, target_size)

            if show:
                cv2.imshow("YOLO Letterbox Format", padded_frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

            if out is not None:
                out.write(padded_frame)
    finally:
        # Release everything even when a frame fails, so the output file is
        # finalized and no preview window is left behind.
        cap.release()
        if out is not None:
            out.release()
        if show:
            cv2.destroyAllWindows()


In [29]:
import os
import cv2
from pathlib import Path

def batch_letterbox_videos(input_dir, output_dir, target_size=(640, 640), exts=(".mp4", ".avi", ".mov", ".mkv")):
    """Letterbox every video of a folder and save the results under the same names.

    Files directly inside ``input_dir`` whose suffix (case-insensitive) is in
    ``exts`` are read frame by frame, passed through ``letterbox_image`` and
    written to ``output_dir`` with the ``mp4v`` codec at the source frame rate.

    Args:
        input_dir: Folder containing the source videos.
        output_dir: Folder receiving the converted videos; created if missing.
        target_size: ``(width, height)`` of the output frames.
        exts: Accepted file suffixes (a single string is treated as one suffix).
    """
    input_dir = Path(input_dir)
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    if isinstance(exts, str):
        exts = (exts,)
    wanted_exts = {ext.lower() for ext in exts}

    # Sorted so that runs process (and log) the files in a stable order.
    for video_path in sorted(input_dir.glob("*")):
        if video_path.suffix.lower() not in wanted_exts:
            continue

        out_path = output_dir / video_path.name
        # Writing onto the file being read would destroy the input.
        if out_path.resolve() == video_path.resolve():
            print(f"❌ 輸出路徑與輸入影片相同，已略過：{video_path}")
            continue

        cap = cv2.VideoCapture(str(video_path))
        out = None
        frame_count = 0
        try:
            if not cap.isOpened():
                print(f"❌ 無法開啟影片：{video_path}")
                continue

            fps = cap.get(cv2.CAP_PROP_FPS)
            # The reported FPS can be 0/NaN; the writer needs a positive rate.
            if not 0 < fps < float("inf"):
                fps = 30.0
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(str(out_path), fourcc, fps, target_size)
            if not out.isOpened():
                print(f"❌ 無法建立輸出影片：{out_path}")
                continue

            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                padded_frame, _, _ = letterbox_image(frame, target_size)
                out.write(padded_frame)
                frame_count += 1
        finally:
            # Release both handles even if a frame fails mid-video.
            cap.release()
            if out is not None:
                out.release()

        print(f"✅ 已補邊並儲存：{out_path}（共 {frame_count} 幀）")

# Example usage
batch_letterbox_videos(
    input_dir="medias/train_video/original/normal",    # folder containing the input videos
    output_dir="YOLO/medias/padded_train_video/normal",  # folder receiving the output videos
    target_size=(640, 640)
)

✅ 已補邊並儲存：YOLO/medias/padded_train_video/normal/normal_079.mp4（共 720 幀）
✅ 已補邊並儲存：YOLO/medias/padded_train_video/normal/normal_088.mp4（共 840 幀）
✅ 已補邊並儲存：YOLO/medias/padded_train_video/normal/normal_137.mp4（共 240 幀）
✅ 已補邊並儲存：YOLO/medias/padded_train_video/normal/normal_090.mp4（共 720 幀）
✅ 已補邊並儲存：YOLO/medias/padded_train_video/normal/normal_023.mp4（共 840 幀）
✅ 已補邊並儲存：YOLO/medias/padded_train_video/normal/normal_117.mp4（共 1680 幀）
✅ 已補邊並儲存：YOLO/medias/padded_train_video/normal/normal_053.mp4（共 840 幀）
✅ 已補邊並儲存：YOLO/medias/padded_train_video/normal/normal_159.mp4（共 480 幀）
✅ 已補邊並儲存：YOLO/medias/padded_train_video/normal/normal_084.mp4（共 840 幀）
✅ 已補邊並儲存：YOLO/medias/padded_train_video/normal/normal_176.mp4（共 537 幀）
✅ 已補邊並儲存：YOLO/medias/padded_train_video/normal/normal_050.mp4（共 840 幀）
✅ 已補邊並儲存：YOLO/medias/padded_train_video/normal/normal_105.mp4（共 1680 幀）
✅ 已補邊並儲存：YOLO/medias/padded_train_video/normal/normal_170.mp4（共 573 幀）
✅ 已補邊並儲存：YOLO/medias/padded_train_video/normal/normal_107.mp4（共 1680 幀）
✅ 已

In [24]:
import os
import collections
import time
import json
import cv2
import numpy as np

# ===============================================
# ⚠️ 這支程式會把推論結果先用 collections.deque()（佇列）收集，
# ⚠️ 在寫入 JSON 時轉成 list 來存檔（因 JSON 不支援 deque）。
#
# ➡️ 如果之後要再提取回來使用佇列，可以這樣做：
#     import json, collections
#     with open("path_to_json.json", "r", encoding="utf-8") as f:
#         data = json.load(f)
#     my_queue = collections.deque(data)
# ===============================================

def run_inference_to_json(
    source=0,
    flip=False,
    skip_first_frames=0,
    target_fps=10,
    output_dir="output_jsons",
    model=quantized_pose_model,
    device="AUTO",
):
    """Run pose inference on a video/camera source and save the results as JSON.

    ``model`` is compiled through the notebook-level ``core`` and installed as
    the compiled OpenVINO model of the notebook-level ``pose_model``. Frames
    come from ``VideoPlayer``; only every ``round(video_fps / target_fps)``-th
    frame (at least every frame) is inferred. The collected results are always
    written in the ``finally`` block, even after an error.

    Args:
        source: Video path or camera index handed to ``VideoPlayer`` and
            ``cv2.VideoCapture``.
        flip: Forwarded to ``VideoPlayer``.
        skip_first_frames: Forwarded to ``VideoPlayer``.
        target_fps: Desired number of inferences per second of video.
        output_dir: Folder receiving ``<video name>.json`` (created if missing);
            non-string sources are saved as ``camera_capture.json``.
        model: Object handed to ``core.compile_model``.
        device: Device string handed to ``core.compile_model``.
    """
    player = None

    ov_config = {}
    if "GPU" in device or ("AUTO" in device and "GPU" in core.available_devices):
        ov_config = {"GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES"}

    compiled_model = core.compile_model(model, device, ov_config)

    if pose_model.predictor is None:
        custom = {"conf": 0.25, "batch": 1, "save": False, "mode": "predict"}
        # Previously read `seg_model.overrides`, a name this notebook never defines.
        args = {**pose_model.overrides, **custom}
        pose_model.predictor = pose_model._smart_load("predictor")(overrides=args, _callbacks=pose_model.callbacks)
        pose_model.predictor.setup_model(model=pose_model.model)

    pose_model.predictor.model.ov_compiled_model = compiled_model

    # Derive the output file name from the source
    if isinstance(source, str):
        video_name = os.path.splitext(os.path.basename(source))[0]
    else:
        video_name = "camera_capture"
    os.makedirs(output_dir, exist_ok=True)
    output_json_path = os.path.join(output_dir, f"{video_name}.json")

    json_results = collections.deque()
    try:
        player = VideoPlayer(source=source, flip=flip, fps=30, skip_first_frames=skip_first_frames)
        player.start()

        # Probe the source FPS with a short-lived capture
        capture = cv2.VideoCapture(source)
        try:
            video_fps = capture.get(cv2.CAP_PROP_FPS)
        finally:
            capture.release()
        if video_fps <= 0 or np.isnan(video_fps):
            video_fps = 30
        print(f"影片 FPS = {video_fps:.2f}")

        skip_frames = max(int(round(video_fps / target_fps)), 1)
        print(f"將每 {skip_frames} 幀做一次推論以達成 {target_fps} FPS")

        frame_counter = 0
        # Rolling window: only the 200 most recent timings feed the average.
        processing_times = collections.deque(maxlen=200)

        while True:
            frame = player.next()
            if frame is None:
                print("Source ended")
                break

            frame_counter += 1
            if frame_counter % skip_frames != 0:
                continue

            scale = 1280 / max(frame.shape)
            if scale < 1:
                frame = cv2.resize(frame, dsize=None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)

            input_image = np.array(frame)

            # NOTE(review): the recorded run of this notebook fails here with a
            # 640x640-vs-384x640 input shape mismatch; frames may need to be
            # letterboxed to the model's static input size first — confirm.
            try:
                start_time = time.time()
                detections = pose_model(input_image)
                stop_time = time.time()

                detection = detections[0]
                # The attributes may be missing or None; both become an empty list
                # instead of raising and dropping the whole frame.
                boxes = getattr(detection, "boxes", None)
                keypoints = getattr(detection, "keypoints", None)
                json_results.append({
                    "boxes": boxes.xyxy.tolist() if boxes is not None else [],
                    "keypoints": keypoints.xy.tolist() if keypoints is not None else [],
                })

                processing_times.append(stop_time - start_time)
                processing_time = np.mean(processing_times) * 1000
                # A coarse clock can report 0 ms; avoid dividing by zero.
                fps = 1000 / processing_time if processing_time > 0 else float("inf")
                print(f"[推論時間: {processing_time:.1f}ms ({fps:.1f} FPS)]")

            except Exception as e:
                # Best effort: report the failing frame and carry on.
                print(f"[推論錯誤]: {e}")

    except Exception as e:
        print(f"捕捉到錯誤: {e}")
    finally:
        try:
            if player is not None:
                player.stop()
        finally:
            # Written even if stopping the player fails; a deque is not JSON
            # serializable, so it is converted to a list.
            with open(output_json_path, "w", encoding="utf-8") as f:
                json.dump(list(json_results), f, indent=2)
            print(f"骨架與物件框結果已輸出至: {output_json_path}")


In [25]:
import openvino as ov # Import the OpenVINO library

# Runtime object used to read and compile models.
core = ov.Core()

# Read the pose model IR from disk (the path suggests an INT8 export — confirm).
quantized_pose_model = core.read_model("yolov8n-pose_openvino_int8_model/yolov8n-pose.xml")

# Infer on the sample clip at roughly 10 inferences per second of video;
# the JSON file is written into the "results" folder.
run_inference_to_json(
    source="test_data/IMG_7704.mp4",
    target_fps=10,
    output_dir="results",
    model=quantized_pose_model
)

影片 FPS = 59.94
將每 6 幀做一次推論以達成 10 FPS

[推論錯誤]: Exception from src/inference/src/cpp/infer_request.cpp:116:
Exception from src/inference/src/cpp/infer_request.cpp:66:
Exception from src/plugins/intel_cpu/src/infer_request.cpp:385:
Can't set the input tensor with index: 0, because the model input (shape=[1,3,640,640]) and the tensor (shape=(1.3.384.640)) are incompatible




[推論錯誤]: Exception from src/inference/src/cpp/infer_request.cpp:116:
Exception from src/inference/src/cpp/infer_request.cpp:66:
Exception from src/plugins/intel_cpu/src/infer_request.cpp:385:
Can't set the input tensor with index: 0, because the model input (shape=[1,3,640,640]) and the tensor (shape=(1.3.384.640)) are incompatible




[推論錯誤]: Exception from src/inference/src/cpp/infer_request.cpp:116:
Exception from src/inference/src/cpp/infer_request.cpp:66:
Exception from src/plugins/intel_cpu/src/infer_request.cpp:385:
Can't set the input tensor with index: 0, because the model input (shape=[1,3,640,640]) and the t

In [None]:
import os
import cv2
from pathlib import Path

# NOTE(review): this re-defines `batch_letterbox_videos`, which an earlier cell
# of this notebook already defines; whichever cell runs last wins. Consider
# keeping a single definition.
def batch_letterbox_videos(input_dir, output_dir, target_size=(640, 640), exts=(".mp4", ".avi", ".mov", ".mkv")):
    """Letterbox every video of a folder and save the results under the same names.

    Files directly inside ``input_dir`` whose suffix (case-insensitive) is in
    ``exts`` are read frame by frame, passed through ``letterbox_image`` and
    written to ``output_dir`` with the ``mp4v`` codec at the source frame rate.

    Args:
        input_dir: Folder containing the source videos.
        output_dir: Folder receiving the converted videos; created if missing.
        target_size: ``(width, height)`` of the output frames.
        exts: Accepted file suffixes (a single string is treated as one suffix).
    """
    input_dir = Path(input_dir)
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    if isinstance(exts, str):
        exts = (exts,)
    wanted_exts = {ext.lower() for ext in exts}

    # Sorted so that runs process (and log) the files in a stable order.
    for video_path in sorted(input_dir.glob("*")):
        if video_path.suffix.lower() not in wanted_exts:
            continue

        out_path = output_dir / video_path.name
        # Writing onto the file being read would destroy the input.
        if out_path.resolve() == video_path.resolve():
            print(f"❌ 輸出路徑與輸入影片相同，已略過：{video_path}")
            continue

        cap = cv2.VideoCapture(str(video_path))
        out = None
        frame_count = 0
        try:
            if not cap.isOpened():
                print(f"❌ 無法開啟影片：{video_path}")
                continue

            fps = cap.get(cv2.CAP_PROP_FPS)
            # The reported FPS can be 0/NaN; the writer needs a positive rate.
            if not 0 < fps < float("inf"):
                fps = 30.0
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(str(out_path), fourcc, fps, target_size)
            if not out.isOpened():
                print(f"❌ 無法建立輸出影片：{out_path}")
                continue

            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                padded_frame, _, _ = letterbox_image(frame, target_size)
                out.write(padded_frame)
                frame_count += 1
        finally:
            # Release both handles even if a frame fails mid-video.
            cap.release()
            if out is not None:
                out.release()

        print(f"✅ 已補邊並儲存：{out_path}（共 {frame_count} 幀）")

# Example usage
# NOTE(review): the input folder name ("padded_train_video") suggests these
# videos were already letterboxed — confirm this second pass is intended.
batch_letterbox_videos(
    input_dir="YOLO/medias/padded_train_video/fall",    # folder containing the input videos
    output_dir="YOLO/data/pose",  # folder receiving the output videos
    target_size=(640, 640)
)

物件框

In [20]:
import cv2
import time
import json
from pathlib import Path
from ultralytics import YOLO

def process_video_to_json(
    video_path,
    output_dir,
    target_fps=10,
    model_path="yolov8n.pt",
    confidence_threshold=0.3
):
    """Run object detection on a sampled subset of a video's frames and save JSON.

    Every ``round(round(source_fps) / target_fps)``-th frame (at least every
    frame), starting with the first, is passed to a ``YOLO`` model. For each
    processed frame the detections whose confidence reaches
    ``confidence_threshold`` are stored, and the whole list is written to
    ``<output_dir>/<video stem>.json`` once the video ends.

    Args:
        video_path: Input video path (string or ``Path``).
        output_dir: Folder receiving the JSON file; created if missing.
        target_fps: Desired number of processed frames per second of video;
            must be greater than zero.
        model_path: Weights handed to ``YOLO``.
        confidence_threshold: Detections below this confidence are dropped.

    Returns:
        None. An error message is printed when the video cannot be opened.

    Raises:
        ValueError: If ``target_fps`` is not positive.
    """
    if target_fps <= 0:
        raise ValueError(f"target_fps must be positive, got {target_fps!r}")

    # str() so that pathlib paths are accepted as well.
    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            print(f"❌ 無法開啟影片：{video_path}")
            return

        # Load the model
        model = YOLO(model_path)

        # Prepare the output location
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        video_name = Path(video_path).stem  # e.g. "your_video"
        output_path = output_dir / f"{video_name}.json"

        # Read the source FPS and derive the sampling interval
        original_fps = cap.get(cv2.CAP_PROP_FPS)
        if 0 < original_fps < float("inf"):
            frame_interval = max(1, round(round(original_fps) / target_fps))
        else:
            # 0/NaN/inf FPS: process every frame instead of crashing in round().
            frame_interval = 1
        print(f"🎞️ 原始 FPS: {original_fps:.2f} → 目標 FPS: {target_fps}，每 {frame_interval} 幀取 1 幀")

        results_list = []
        frame_idx = 0
        last_time = time.time()

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_idx % frame_interval == 0:
                results = model(frame)[0]

                frame_result = {
                    "objects": []
                }

                for box in results.boxes:
                    conf = float(box.conf[0])
                    if conf < confidence_threshold:
                        continue

                    class_id = int(box.cls[0])
                    frame_result["objects"].append({
                        "class_id": class_id,
                        "class_name": model.names[class_id],
                        "confidence": round(conf, 4),
                        # Coordinates are truncated to whole pixels.
                        "bbox": [int(v) for v in box.xyxy[0]],
                    })

                results_list.append(frame_result)

                # Wall-clock rate between processed frames; this includes the
                # time spent decoding the skipped frames.
                fps = 1 / (time.time() - last_time + 1e-6)
                last_time = time.time()
                print(f"[Frame {frame_idx}] Detected {len(frame_result['objects'])} objects | FPS: {fps:.2f}")

            frame_idx += 1
    finally:
        # Release the decoder even if detection raises mid-video.
        cap.release()

    # Write the whole result list in one go
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(results_list, f, indent=2)

    print(f"✅ 已完成，儲存至 {output_path}（共 {len(results_list)} 幀）")


In [22]:
process_video_to_json(
    video_path="test_data/IMG_7704.mp4",
    output_dir="test_data",
    target_fps=10,
    model_path="yolov8n.pt",
    confidence_threshold=0.3
)

🎞️ 原始 FPS: 59.94 → 目標 FPS: 10，每 6 幀取 1 幀

0: 384x640 1 person, 1 tv, 80.5ms
Speed: 10.3ms preprocess, 80.5ms inference, 19.9ms postprocess per image at shape (1, 3, 384, 640)
[Frame 0] Detected 2 objects | FPS: 1.14

0: 384x640 1 person, 1 tv, 26.2ms
Speed: 4.0ms preprocess, 26.2ms inference, 6.0ms postprocess per image at shape (1, 3, 384, 640)
[Frame 6] Detected 2 objects | FPS: 12.75

0: 384x640 1 person, 1 tv, 28.3ms
Speed: 3.4ms preprocess, 28.3ms inference, 6.4ms postprocess per image at shape (1, 3, 384, 640)
[Frame 12] Detected 2 objects | FPS: 13.39

0: 384x640 1 person, 1 chair, 1 tv, 33.3ms
Speed: 4.2ms preprocess, 33.3ms inference, 5.4ms postprocess per image at shape (1, 3, 384, 640)
[Frame 18] Detected 2 objects | FPS: 12.82

0: 384x640 1 person, 1 tv, 30.7ms
Speed: 3.7ms preprocess, 30.7ms inference, 5.2ms postprocess per image at shape (1, 3, 384, 640)
[Frame 24] Detected 1 objects | FPS: 13.72

0: 384x640 1 person, 1 chair, 23.2ms
Speed: 3.5ms preprocess, 23.2ms infere