In [1]:
# load npz 
import numpy as np
import json

root = "/workspace/data/annotation"
full_path = root + "/full.json"
mini_path = root + "/mini.json"

full_data = json.load(open(full_path, 'r'))
mini_data = json.load(open(mini_path, 'r'))

In [4]:
import json
from collections import defaultdict

def organize_labels(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    summary = defaultdict(lambda: {"count": 0, "total_duration": 0})
    organized_results = []

    for entry in data:
        video_info = {
            "video_path": entry.get("video"),
            "annotation_id": entry.get("annotation_id"),
            "labels": []
        }
        
        # 遍历视频中的所有标注
        labels_list = entry.get("videoLabels", [])
        for label_item in labels_list:
            # 获取标签名称（通常列表里只有一个元素）
            label_names = label_item.get("timelinelabels", [])
            ranges = label_item.get("ranges", [])
            
            for r in ranges:
                start = r.get("start")
                end = r.get("end")
                duration = end - start
                
                for name in label_names:
                    # 添加到该视频的整理列表中
                    video_info["labels"].append({
                        "label": name,
                        "start": start,
                        "end": end,
                        "duration": duration
                    })
                    
                    # 更新全局统计信息
                    summary[name]["count"] += 1
                    summary[name]["total_duration"] += duration
        
        # 按开始时间排序
        video_info["labels"].sort(key=lambda x: x['start'])
        organized_results.append(video_info)

    return organized_results, summary

# --- 执行整理 ---
file_name = '/workspace/data/annotation/mini.json'  # 请确保文件名正确
results, stats = organize_labels(file_name)

# --- 打印整理后的结果 ---
print("=== 标签统计摘要 ===")
print(f"{'标签':<15} | {'出现次数':<10} | {'总持续时间'}")
print("-" * 45)
for label, info in stats.items():
    print(f"{label:<15} | {info['count']:<10} | {info['total_duration']}")

print("\n=== 视频详细标注明细 (前5条) ===")
for vid in results[:1]:  # 示例打印第一个视频
    print(f"\n视频文件: {vid['video_path']}")
    for l in vid['labels'][:10]:  # 示例打印前10个标签
        print(f"  [{l['start']:>5} - {l['end']:>5}] 动作: {l['label']:<10} (时长: {l['duration']})")
    print("  ...")

=== 标签统计摘要 ===
标签              | 出现次数       | 总持续时间
---------------------------------------------
left            | 3219       | 151533
right           | 2439       | 168421
down            | 4454       | 150484
up              | 737        | 31386
right_up        | 64         | 3365
right_down      | 90         | 3528
left_down       | 423        | 15604
left_up         | 226        | 13561

=== 视频详细标注明细 (前5条) ===

视频文件: /data/local-files/?d=mydata/drive_data/person_01_day_high_h265.mp4
  [  362 -   384] 动作: left       (时长: 22)
  [  384 -   456] 动作: right      (时长: 72)
  [  488 -   520] 动作: down       (时长: 32)
  [  579 -   622] 动作: down       (时长: 43)
  [  639 -   663] 动作: right      (时长: 24)
  [  665 -   694] 动作: down       (时长: 29)
  [  746 -   782] 动作: down       (时长: 36)
  [  795 -   846] 动作: down       (时长: 51)
  [  870 -   919] 动作: down       (时长: 49)
  [  953 -   993] 动作: down       (时长: 40)
  ...


In [8]:
# --- 按视频合并不同标注者的结果 ---
import json
import os

def merge_labels_by_video(input_json, output_dir="merged_videos"):
    # 1. 创建输出目录
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # 2. 读取原始数据
    with open(input_json, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # 3. 使用字典按视频名归组
    # 数据结构: { "视频名": [标注者A的数据, 标注者B的数据, ...] }
    video_groups = defaultdict(list)

    for entry in data:
        # 提取视频文件名
        raw_path = entry.get("video", "unknown")
        video_filename = raw_path.split('/')[-1] if '/' in raw_path else os.path.basename(raw_path)
        video_groups[video_filename].append(entry)

    # 4. 写入文件
    for video_name, annotations in video_groups.items():
        # 构建保存文件名（例如：person_01_day_high_h265.json）
        base_name = os.path.splitext(video_name)[0]
        output_path = os.path.join(output_dir, f"{base_name}.json")
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        
        # 构造最终的 JSON 结构
        # 包含视频名和对应的所有标注记录
        final_data = {
            "video_file": video_name,
            "total_annotators": len(annotations),
            "annotations": annotations  # 这里包含了不同 annotation_id 的完整内容
        }

        with open(output_path, 'w', encoding='utf-8') as out_f:
            json.dump(final_data, out_f, indent=4, ensure_ascii=False)
        
        print(f"成功合并: {video_name} (共有 {len(annotations)} 个人的标注结果)")
        print(f"保存路径: {output_path}")

# 执行
merge_labels_by_video('/workspace/data/annotation/mini.json', output_dir="/workspace/data/label")

成功合并: person_01_day_high_h265.mp4 (共有 3 个人的标注结果)
保存路径: /workspace/data/label/person_01_day_high_h265.json
成功合并: person_01_day_low_h265.mp4 (共有 3 个人的标注结果)
保存路径: /workspace/data/label/person_01_day_low_h265.json
成功合并: person_01_night_high_h265.mp4 (共有 3 个人的标注结果)
保存路径: /workspace/data/label/person_01_night_high_h265.json
成功合并: person_01_night_low_h265.mp4 (共有 3 个人的标注结果)
保存路径: /workspace/data/label/person_01_night_low_h265.json
成功合并: person_02_day_high_h265.mp4 (共有 3 个人的标注结果)
保存路径: /workspace/data/label/person_02_day_high_h265.json
成功合并: person_02_day_low_h265.mp4 (共有 3 个人的标注结果)
保存路径: /workspace/data/label/person_02_day_low_h265.json
成功合并: person_02_night_high_h265.mp4 (共有 3 个人的标注结果)
保存路径: /workspace/data/label/person_02_night_high_h265.json
成功合并: person_02_night_low_h265.mp4 (共有 3 个人的标注结果)
保存路径: /workspace/data/label/person_02_night_low_h265.json
成功合并: person_03_day_high_h265.mp4 (共有 3 个人的标注结果)
保存路径: /workspace/data/label/person_03_day_high_h265.json
成功合并: person_03_day_low_h265.mp4 (共有 3

In [None]:
# 读取一个人的label文件并且整理成dict
# dict的格式是，key是label名称，value是一个list，list里面是该label对应的所有时间段

import json
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Any

def load_label_dict(
    json_path: str | Path,
    *,
    annotator: Optional[int] = None,
    annotation_id: Optional[int] = None,
    merge_all: bool = False,
) -> Dict[str, List[dict]]:
    """
    读取 label json，整理成:
        {label_name: [{"start": int/float, "end": int/float}, ...]}

    参数：
      - annotator: 指定只读取某个 annotator 的标注（例如 3/4/5）
      - annotation_id: 指定只读取某个 annotation_id
      - merge_all: True 表示合并所有 annotations（此时 annotator/annotation_id 会被忽略）
    """
    json_path = Path(json_path)
    with json_path.open("r", encoding="utf-8") as f:
        data = json.load(f)

    anns: List[dict] = data.get("annotations", [])
    if not isinstance(anns, list):
        raise ValueError("Invalid json: 'annotations' must be a list")

    # 选择要用的 annotation(s)
    if merge_all:
        chosen = anns
    else:
        chosen = []
        for a in anns:
            if annotation_id is not None and a.get("annotation_id") != annotation_id:
                continue
            if annotator is not None and a.get("annotator") != annotator:
                continue
            chosen.append(a)

        # 没指定任何筛选时：默认取“最新更新”的那一份（通常比较合理）
        if annotator is None and annotation_id is None:
            def _ts(x: dict) -> str:
                return (x.get("updated_at") or x.get("created_at") or "")
            if len(anns) == 0:
                return {}
            chosen = [max(anns, key=_ts)]

    out: Dict[str, List[dict]] = defaultdict(list)

    for ann in chosen:
        video_labels = ann.get("videoLabels", [])
        if not isinstance(video_labels, list):
            continue

        for item in video_labels:
            labels = item.get("timelinelabels", [])
            ranges = item.get("ranges", [])
            if not labels or not ranges:
                continue

            # 一条 videoLabels 通常只有一个 label；但这里写成支持多个
            for lb in labels:
                for r in ranges:
                    if r is None:
                        continue
                    s = r.get("start")
                    e = r.get("end")
                    if s is None or e is None:
                        continue
                    out[str(lb)].append({"start": s, "end": e})

    # 可选：按 start 排序，方便后处理
    for lb in out:
        out[lb].sort(key=lambda x: (x["start"], x["end"]))

    return dict(out)


# -------------------- 用法示例 --------------------
if __name__ == "__main__":
    path = "/workspace/data/label/person_01_day_high_h265.json"

    # 1) 默认：取最新更新的一份 annotation
    d_latest = load_label_dict(path)
    print("latest keys:", d_latest.keys())

    # 2) 取某个 annotator 的
    d_ann3 = load_label_dict(path, annotator=3)

    # 3) 合并所有 annotator
    d_all = load_label_dict(path, merge_all=True)

    # 看一下某个 label 的区间
    print("down ranges (merged):", d_all.get("down", [])[:5])




latest keys: dict_keys(['left', 'right', 'down', 'up', 'right_up', 'right_down', 'left_down'])
down ranges (merged): [{'start': 488, 'end': 520}, {'start': 493, 'end': 515}, {'start': 494, 'end': 512}, {'start': 579, 'end': 622}, {'start': 590, 'end': 616}]


In [11]:
"""
Label utilities

- Load label JSON into dict: {label: [{start, end}, ...]}
- Automatically fill unlabeled gaps as `front`

Assumes JSON structure with:
  data['annotations'][*]['videoLabels'][*]['ranges'] and ['timelinelabels']
"""

from __future__ import annotations

import json
from pathlib import Path
from collections import defaultdict
from typing import Dict, List, Tuple, Optional


# ---------------------------------------------------------------------
# Loading labels
# ---------------------------------------------------------------------

def load_label_dict(
    json_path: str | Path,
    *,
    annotator: Optional[int] = None,
    annotation_id: Optional[int] = None,
    merge_all: bool = False,
) -> Dict[str, List[dict]]:
    """
    Read label json and return:
        {label_name: [{"start": s, "end": e}, ...]}

    Args:
        annotator: only read this annotator (e.g. 3/4/5)
        annotation_id: only read this annotation_id
        merge_all: merge all annotations (ignore annotator/annotation_id)
    """
    json_path = Path(json_path)
    with json_path.open("r", encoding="utf-8") as f:
        data = json.load(f)

    anns = data.get("annotations", [])
    if not isinstance(anns, list):
        raise ValueError("Invalid json: 'annotations' must be a list")

    # choose annotations
    if merge_all:
        chosen = anns
    else:
        chosen = []
        for a in anns:
            if annotation_id is not None and a.get("annotation_id") != annotation_id:
                continue
            if annotator is not None and a.get("annotator") != annotator:
                continue
            chosen.append(a)

        # default: latest updated
        if annotator is None and annotation_id is None:
            if not anns:
                return {}

            def _ts(x: dict) -> str:
                return (x.get("updated_at") or x.get("created_at") or "")

            chosen = [max(anns, key=_ts)]

    out: Dict[str, List[dict]] = defaultdict(list)

    for ann in chosen:
        video_labels = ann.get("videoLabels", [])
        if not isinstance(video_labels, list):
            continue

        for item in video_labels:
            labels = item.get("timelinelabels", [])
            ranges = item.get("ranges", [])
            if not labels or not ranges:
                continue

            for lb in labels:
                for r in ranges:
                    if r is None:
                        continue
                    s = r.get("start")
                    e = r.get("end")
                    if s is None or e is None:
                        continue
                    out[str(lb)].append({"start": s, "end": e})

    # sort
    for lb in out:
        out[lb].sort(key=lambda x: (x["start"], x["end"]))

    return dict(out)


# ---------------------------------------------------------------------
# Interval helpers
# ---------------------------------------------------------------------

def _merge_intervals(
    intervals: List[Tuple[float, float]], eps: float = 1e-9
) -> List[Tuple[float, float]]:
    """Merge overlapping or adjacent intervals."""
    intervals = [
        (float(s), float(e)) for s, e in intervals if e is not None and s is not None and e > s
    ]
    if not intervals:
        return []
    intervals.sort(key=lambda x: (x[0], x[1]))
    merged = [intervals[0]]
    for s, e in intervals[1:]:
        ps, pe = merged[-1]
        if s <= pe + eps:
            merged[-1] = (ps, max(pe, e))
        else:
            merged.append((s, e))
    return merged


def _clip_intervals(
    intervals: List[Tuple[float, float]], start: float, end: float
) -> List[Tuple[float, float]]:
    out = []
    for s, e in intervals:
        s2, e2 = max(start, s), min(end, e)
        if e2 > s2:
            out.append((s2, e2))
    return out


# ---------------------------------------------------------------------
# Fill unlabeled as front
# ---------------------------------------------------------------------

def fill_unlabeled_as_front(
    label_dict: Dict[str, List[dict]],
    *,
    total_end: Optional[float] = None,
    total_start: float = 0.0,
    front_label: str = "front",
    eps: float = 1e-9,
) -> Dict[str, List[dict]]:
    """
    Fill unlabeled gaps on [total_start, total_end] as `front`.

    Args:
        label_dict: output of load_label_dict
        total_end: video duration / last frame index (required to fill tail)
        total_start: usually 0
        front_label: label name for gaps
    """
    # collect all labeled intervals
    all_intervals: List[Tuple[float, float]] = []
    for segs in label_dict.values():
        for r in segs:
            s, e = r.get("start"), r.get("end")
            if s is None or e is None:
                continue
            all_intervals.append((float(s), float(e)))

    if total_end is None:
        total_end = max([e for _, e in all_intervals], default=total_start)

    covered = _merge_intervals(
        _clip_intervals(all_intervals, total_start, float(total_end)), eps=eps
    )

    # gaps
    gaps: List[Tuple[float, float]] = []
    cur = float(total_start)
    for s, e in covered:
        if s > cur + eps:
            gaps.append((cur, s))
        cur = max(cur, e)
    if float(total_end) > cur + eps:
        gaps.append((cur, float(total_end)))

    # write back
    out = {k: [dict(x) for x in v] for k, v in label_dict.items()}
    front_list = out.get(front_label, [])
    front_list.extend([{"start": s, "end": e} for s, e in gaps])

    merged_front = _merge_intervals(
        [(r["start"], r["end"]) for r in front_list], eps=eps
    )
    out[front_label] = [{"start": s, "end": e} for s, e in merged_front]

    return out


# ---------------------------------------------------------------------
# Example
# ---------------------------------------------------------------------
if __name__ == "__main__":
    path = "/workspace/data/label/person_01_day_high_h265.json"
    labels = load_label_dict(path, merge_all=True)
    labels = fill_unlabeled_as_front(labels, total_end=3000)
    print(labels.keys())


dict_keys(['left', 'right', 'down', 'up', 'right_up', 'right_down', 'left_down', 'left_up', 'front'])
