In [None]:
import os
import glob
import cv2
from tqdm import tqdm

# Directory that holds the source videos; the per-video "<name>_frames"
# directories are created inside it by the export cell.
video_dir = "4DGS"
# Every .mp4 directly inside video_dir, in sorted path order.
mp4_files = sorted(glob.iglob(os.path.join(video_dir, "*.mp4")))

In [None]:
# Export every frame of every .mp4 in "4DGS" into a dedicated directory per video.
# Frames are written as <video_dir>/<name>_frames/<index>.jpg, where <index> is
# 1-based and zero-padded to the width of the reported frame count.
for vid_path in mp4_files:
    name = os.path.splitext(os.path.basename(vid_path))[0]
    frames_dir = os.path.join(video_dir, f"{name}_frames")

    vid_cap = cv2.VideoCapture(vid_path)
    if not vid_cap.isOpened():
        print(f"Cannot open video: {vid_path}")
        vid_cap.release()
        continue

    # Create the output directory only once the video is known to be readable,
    # so unreadable videos do not leave empty "*_frames" directories behind.
    os.makedirs(frames_dir, exist_ok=True)

    # CAP_PROP_FRAME_COUNT may be 0 when the container does not report it;
    # fall back to a fixed 6-digit padding in that case.
    total = int(vid_cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
    pad = len(str(total)) if total > 0 else 6

    frame_i = 0
    failed = 0
    try:
        with tqdm(total=total, desc=name) as pbar:
            while True:
                ret, frm = vid_cap.read()
                if not ret:
                    break
                frame_i += 1
                out_path = os.path.join(frames_dir, f"{frame_i:0{pad}d}.jpg")
                # Skip frames that already exist on disk (useful to resume).
                if not os.path.exists(out_path):
                    try:
                        # cv2.imwrite signals most failures by returning False
                        # rather than raising, so the return value must be checked.
                        ok = cv2.imwrite(out_path, frm, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
                    except Exception as e:
                        ok = False
                        print(f"Failed to write {out_path}: {e}")
                    else:
                        if not ok:
                            print(f"Failed to write {out_path}: cv2.imwrite returned False")
                    if not ok:
                        failed += 1
                pbar.update(1)
    finally:
        # Always free the decoder, even if the loop above raised.
        vid_cap.release()

    if failed:
        print(f"Saved frames for {name} -> {frames_dir} ({failed} of {frame_i} frames failed to write)")
    else:
        print(f"Saved frames for {name} -> {frames_dir}")

001001: 100%|██████████| 60/60 [00:00<00:00, 61.92it/s]
001001: 100%|██████████| 60/60 [00:00<00:00, 61.92it/s]


Saved frames for 001001 -> 4DGS/001001_frames


002001: 100%|██████████| 60/60 [00:00<00:00, 79.87it/s]



Saved frames for 002001 -> 4DGS/002001_frames


003001: 100%|██████████| 60/60 [00:00<00:00, 74.44it/s]



Saved frames for 003001 -> 4DGS/003001_frames


004001: 100%|██████████| 60/60 [00:00<00:00, 73.56it/s]



Saved frames for 004001 -> 4DGS/004001_frames


005001: 100%|██████████| 60/60 [00:00<00:00, 76.91it/s]
005001: 100%|██████████| 60/60 [00:00<00:00, 76.91it/s]


Saved frames for 005001 -> 4DGS/005001_frames


006001: 100%|██████████| 60/60 [00:00<00:00, 77.56it/s]



Saved frames for 006001 -> 4DGS/006001_frames


008001: 100%|██████████| 60/60 [00:00<00:00, 77.89it/s]



Saved frames for 008001 -> 4DGS/008001_frames


009001: 100%|██████████| 60/60 [00:00<00:00, 76.29it/s]



Saved frames for 009001 -> 4DGS/009001_frames


010001: 100%|██████████| 60/60 [00:00<00:00, 78.14it/s]



Saved frames for 010001 -> 4DGS/010001_frames


012001: 100%|██████████| 60/60 [00:00<00:00, 70.47it/s]



Saved frames for 012001 -> 4DGS/012001_frames


101001: 100%|██████████| 60/60 [00:00<00:00, 81.07it/s]



Saved frames for 101001 -> 4DGS/101001_frames


102001: 100%|██████████| 60/60 [00:00<00:00, 79.04it/s]



Saved frames for 102001 -> 4DGS/102001_frames


103001: 100%|██████████| 60/60 [00:00<00:00, 81.14it/s]



Saved frames for 103001 -> 4DGS/103001_frames


104001: 100%|██████████| 60/60 [00:00<00:00, 70.36it/s]



Saved frames for 104001 -> 4DGS/104001_frames


105001: 100%|██████████| 60/60 [00:00<00:00, 79.48it/s]



Saved frames for 105001 -> 4DGS/105001_frames


106001: 100%|██████████| 60/60 [00:00<00:00, 81.51it/s]



Saved frames for 106001 -> 4DGS/106001_frames


107001: 100%|██████████| 60/60 [00:00<00:00, 80.74it/s]



Saved frames for 107001 -> 4DGS/107001_frames


108001: 100%|██████████| 60/60 [00:00<00:00, 90.51it/s] 



Saved frames for 108001 -> 4DGS/108001_frames


109001: 100%|██████████| 60/60 [00:00<00:00, 75.76it/s]



Saved frames for 109001 -> 4DGS/109001_frames


110001: 100%|██████████| 60/60 [00:00<00:00, 78.03it/s]



Saved frames for 110001 -> 4DGS/110001_frames


111001: 100%|██████████| 60/60 [00:00<00:00, 72.07it/s]



Saved frames for 111001 -> 4DGS/111001_frames


112001: 100%|██████████| 60/60 [00:00<00:00, 71.55it/s]

Saved frames for 112001 -> 4DGS/112001_frames





In [5]:
from ultralytics.data.annotator import auto_annotate

# Collect every exported "<name>_frames" directory under video_dir, in sorted order.
videos = glob.glob(os.path.join(video_dir, "*_frames"))
videos.sort()

# Run auto_annotate on each frames directory; results for a directory go to
# "annotations/<directory name>". max_det=5 is passed through to auto_annotate.
for video_frames_dir in videos:
    auto_annotate(
        data=video_frames_dir,
        det_model="yolo11x.pt",
        sam_model="sam2_b.pt",
        output_dir=f"annotations/{os.path.basename(video_frames_dir)}",
        max_det=5,
    )


image 1/60 /tmp2/b12902101/annotation/4DGS/001001_frames/01.jpg: 384x640 3 persons, 1 bench, 71.5ms
image 1/60 /tmp2/b12902101/annotation/4DGS/001001_frames/01.jpg: 384x640 3 persons, 1 bench, 71.5ms
image 2/60 /tmp2/b12902101/annotation/4DGS/001001_frames/02.jpg: 384x640 3 persons, 1 bench, 1 chair, 14.5ms
image 2/60 /tmp2/b12902101/annotation/4DGS/001001_frames/02.jpg: 384x640 3 persons, 1 bench, 1 chair, 14.5ms
image 3/60 /tmp2/b12902101/annotation/4DGS/001001_frames/03.jpg: 384x640 3 persons, 1 bench, 13.7ms
image 3/60 /tmp2/b12902101/annotation/4DGS/001001_frames/03.jpg: 384x640 3 persons, 1 bench, 13.7ms
image 4/60 /tmp2/b12902101/annotation/4DGS/001001_frames/04.jpg: 384x640 3 persons, 1 bench, 13.7ms
image 4/60 /tmp2/b12902101/annotation/4DGS/001001_frames/04.jpg: 384x640 3 persons, 1 bench, 13.7ms
image 5/60 /tmp2/b12902101/annotation/4DGS/001001_frames/05.jpg: 384x640 3 persons, 1 bench, 13.6ms
image 5/60 /tmp2/b12902101/annotation/4DGS/001001_frames/05.jpg: 384x640 3 person

In [7]:
import os
import glob
import json
import cv2
import numpy as np
from tqdm import tqdm

def find_ann_for_stem(stem, ann_dir):
    """Locate the annotation file that belongs to a frame.

    Args:
        stem: Frame file name without its extension (e.g. ``"01"``).
        ann_dir: Directory that holds the annotation files.

    Returns:
        The path ``<ann_dir>/<stem>.json`` if it exists, otherwise
        ``<ann_dir>/<stem>.txt`` if it exists, otherwise the alphabetically
        first ``<ann_dir>/<stem>.*`` match, or ``None`` when nothing matches.
    """
    # Preferred formats, checked in priority order.
    for ext in ('.json', '.txt'):
        p = os.path.join(ann_dir, stem + ext)
        if os.path.exists(p):
            return p
    # Fallback: any other extension. Escape glob metacharacters so names such
    # as "a[1]" are matched literally, and sort so the choice is deterministic
    # (glob itself returns matches in arbitrary order).
    pattern = os.path.join(glob.escape(ann_dir), glob.escape(stem) + '.*')
    matches = sorted(glob.glob(pattern))
    return matches[0] if matches else None

def is_number(s):
    """Return True when ``float(s)`` succeeds, False otherwise.

    Only the conversion errors raised by ``float`` are treated as "not a
    number"; unrelated exceptions such as ``KeyboardInterrupt`` propagate
    instead of being silently swallowed.
    """
    try:
        float(s)
    except (TypeError, ValueError, OverflowError):
        return False
    return True

def parse_txt(path, img_w, img_h):
    """Parse a whitespace-separated annotation text file into drawable shapes.

    Each non-empty line is ``<label> <v1> <v2> ...``. The label is kept as a
    string when it is not numeric, otherwise converted to ``int``. The values
    after the label decide the shape:

    * 4 values -> bounding box. If all four lie in ``[0, 1]`` they are read as
      normalized centre-format ``xc yc w h`` and scaled by the image size;
      otherwise they are read as absolute ``x y w h``.
    * 5 values -> the first value is skipped and the remaining four are read
      as normalized centre-format ``xc yc w h``.
    * an even count >= 6 -> polygon ``x1 y1 x2 y2 ...``; treated as normalized
      when the largest value is <= 1, otherwise as absolute pixels.

    Lines that fit none of these layouts are skipped.

    Args:
        path: Path of the annotation text file.
        img_w: Image width in pixels, used to scale normalized x coordinates.
        img_h: Image height in pixels, used to scale normalized y coordinates.

    Returns:
        A list of ``(label, shape)`` pairs where ``shape`` is either a 4-tuple
        ``(x1, y1, x2, y2)`` or a list of ``(x, y)`` integer points.
    """
    # Small tolerance above 1.0 when deciding that coordinates are normalized.
    norm_limit = 1.0001

    def center_box_to_corners(xc, yc, w_box, h_box):
        # Normalized centre-format box -> absolute corner coordinates.
        return (
            int((xc - w_box/2) * img_w),
            int((yc - h_box/2) * img_h),
            int((xc + w_box/2) * img_w),
            int((yc + h_box/2) * img_h),
        )

    items = []
    with open(path, 'r') as f:
        for ln in f:
            parts = ln.strip().split()
            if not parts:
                continue
            # A non-numeric first token is kept verbatim as the label.
            label = int(float(parts[0])) if is_number(parts[0]) else parts[0]
            tokens = parts[1:]
            nums = [float(t) for t in tokens if is_number(t)]
            if len(nums) == 4 and len(tokens) == 4:
                if all(0.0 <= v <= norm_limit for v in nums):
                    # Normalized values: truncating them as absolute pixels
                    # would collapse the box to (0, 0, 0, 0), so scale them
                    # as centre-format xc yc w h instead.
                    items.append((label, center_box_to_corners(*nums)))
                else:
                    # Absolute bbox x y w h.
                    x, y, w_box, h_box = nums
                    items.append((label, (int(x), int(y), int(x + w_box), int(y + h_box))))
            elif len(nums) == 5 and len(tokens) == 5:
                # NOTE(review): the first of the five values is skipped; what it
                # represents is not visible in this file - confirm against the
                # tool that produces these lines.
                items.append((label, center_box_to_corners(*nums[1:])))
            elif len(nums) >= 6 and len(nums) % 2 == 0:
                # Polygon: decide between normalized and absolute coordinates.
                if max(nums) <= norm_limit:
                    pts = [(int(nums[i] * img_w), int(nums[i+1] * img_h))
                           for i in range(0, len(nums), 2)]
                else:
                    pts = [(int(nums[i]), int(nums[i+1]))
                           for i in range(0, len(nums), 2)]
                items.append((label, pts))
            # Any other layout cannot be interpreted and is skipped.
    return items

def parse_json(path):
    """Parse a JSON annotation file into ``(class_id, shape)`` pairs.

    The annotation list is taken from the first non-empty of the keys
    ``annotations``, ``shapes``, ``objects`` or ``labels`` when the document is
    a dict, or is the document itself when it is a list. Each dict entry
    contributes one item, chosen in this order:

    * ``bbox`` (list/tuple with at least 4 values, read as ``x y w h``) ->
      4-tuple ``(x1, y1, x2, y2)``;
    * ``points`` (list of ``[x, y]``) -> list of ``(x, y)`` points;
    * ``segmentation`` (non-empty list) -> its first element, read as a flat
      ``x1 y1 x2 y2 ...`` sequence, as a list of ``(x, y)`` points.

    The class id comes from ``category_id``, then ``class``, defaulting to 0.
    A file whose content cannot be decoded as JSON yields an empty list.
    """
    with open(path, 'r') as fh:
        try:
            payload = json.load(fh)
        except Exception:
            return []

    candidates = None
    if isinstance(payload, dict):
        # First truthy value wins; otherwise the value of the last key remains.
        for key in ('annotations', 'shapes', 'objects', 'labels'):
            candidates = payload.get(key)
            if candidates:
                break
    elif isinstance(payload, list):
        candidates = payload

    if not candidates:
        return []

    def class_of(entry):
        # Prefer 'category_id', then 'class', then 0.
        return int(entry.get('category_id', entry.get('class', 0)))

    shapes = []
    for entry in candidates:
        if not isinstance(entry, dict):
            continue

        bbox = entry.get('bbox')
        if isinstance(bbox, (list, tuple)) and len(bbox) >= 4:
            left, top, box_w, box_h = bbox[:4]
            cls_id = class_of(entry)
            corners = (int(left), int(top), int(left + box_w), int(top + box_h))
            shapes.append((cls_id, corners))
            continue

        points = entry.get('points')
        if isinstance(points, list):
            vertices = [(int(pt[0]), int(pt[1])) for pt in points]
            shapes.append((class_of(entry), vertices))
            continue

        segmentation = entry.get('segmentation')
        if isinstance(segmentation, list) and len(segmentation) > 0:
            ring = segmentation[0]
            vertices = [(int(ring[k]), int(ring[k + 1])) for k in range(0, len(ring), 2)]
            shapes.append((class_of(entry), vertices))
    return shapes

In [None]:
# For every frames directory, render an annotated video into output_videos/.

def _draw_items(img, items):
    """Draw parsed annotation items onto ``img`` in place.

    Args:
        img: BGR image array to draw on (modified in place).
        items: Iterable of ``(cls, shape)`` pairs where ``shape`` is either a
            4-tuple ``(x1, y1, x2, y2)`` or a list of ``(x, y)`` points.
    """
    for cls, shape in items:
        if isinstance(shape, tuple) and len(shape) == 4:
            # Bounding box: green outline with the class label above it.
            x1, y1, x2, y2 = shape
            cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(img, str(cls), (max(0, x1), max(0, y1-6)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1, cv2.LINE_AA)
        elif isinstance(shape, list) and len(shape) >= 2:
            pts_arr = np.array(shape, dtype=np.int32).reshape((-1,1,2))
            if len(shape) >= 3:
                # Polygon: fill with #00FFFF (BGR=(255,255,0)) on a copy, then
                # alpha-blend the copy back so the frame stays visible.
                overlay = img.copy()
                cv2.fillPoly(overlay, [pts_arr], color=(255,255,0))
                alpha = 0.4
                cv2.addWeighted(overlay, alpha, img, 1 - alpha, 0, img)
            # Two points give an open line; three or more a closed outline.
            cv2.polylines(img, [pts_arr], isClosed=(len(shape) >= 3), color=(0,0,255), thickness=2)
            x0, y0 = int(shape[0][0]), int(shape[0][1])
            cv2.putText(img, str(cls), (x0, max(0, y0-6)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255), 1, cv2.LINE_AA)


os.makedirs('output_videos', exist_ok=True)

frames_dirs = sorted(glob.glob(os.path.join('4DGS', '*_frames')))
if not frames_dirs:
    print('No frames directories found under 4DGS/*_frames')

for frames_dir in frames_dirs:
    base = os.path.basename(frames_dir)
    ann_dir = os.path.join('annotations', base)
    out_video = os.path.join('output_videos', f'annotated_{base}.mp4')

    frame_paths = sorted(glob.glob(os.path.join(frames_dir, '*')))
    if not frame_paths:
        print(f'No frames in {frames_dir}, skip')
        continue

    # Try to take the fps from the source video (expected at 4DGS/{name}.mp4).
    # Strip only the trailing "_frames" so names containing it elsewhere survive.
    name = base[:-len('_frames')] if base.endswith('_frames') else base
    orig_video = os.path.join('4DGS', f'{name}.mp4')
    fps = 30.0  # fallback when the source video is missing or reports no fps
    if os.path.exists(orig_video):
        cap = cv2.VideoCapture(orig_video)
        if cap.isOpened():
            src_fps = cap.get(cv2.CAP_PROP_FPS)
            if src_fps and src_fps > 0:
                fps = src_fps
        cap.release()

    # The output size comes from the first frame that can actually be decoded;
    # cv2.imread returns None for files it cannot read (e.g. non-image files).
    first = None
    for fp in frame_paths:
        first = cv2.imread(fp)
        if first is not None:
            break
    if first is None:
        print(f'No readable frames in {frames_dir}, skip')
        continue
    h, w = first.shape[:2]

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(out_video, fourcc, float(fps), (w, h))
    if not out.isOpened():
        print(f'Cannot open video writer for {out_video}, skip')
        out.release()
        continue

    try:
        for fp in tqdm(frame_paths, desc=f'Annotating {base}'):
            img = cv2.imread(fp)
            if img is None:
                print(f'Cannot read frame {fp}, skip')
                continue
            # The writer expects frames of the size it was opened with.
            if img.shape[:2] != (h, w):
                img = cv2.resize(img, (w, h))

            stem = os.path.splitext(os.path.basename(fp))[0]
            annp = find_ann_for_stem(stem, ann_dir)
            if annp:
                ext = os.path.splitext(annp)[1].lower()
                try:
                    if ext == '.txt':
                        items = parse_txt(annp, w, h)
                    elif ext == '.json':
                        items = parse_json(annp)
                    else:
                        items = []
                    _draw_items(img, items)
                except Exception as e:
                    # Best effort: a bad annotation file must not stop the video.
                    print(f"Failed to parse/draw {annp}: {e}")
            out.write(img)
    finally:
        # Finalize the output file even if a frame raised unexpectedly.
        out.release()
    print(f'Wrote annotated video: {out_video}')