# STT 하이라이트 세트 분리

In [None]:

import json
import copy
import statistics
from pathlib import Path

# 기본 함수

def split_caster_head_tail(text: str):
    """캐스터 멘트를 첫 번째 마침표(.) 기준으로 (앞부분, 뒷부분)으로 나눈다.
    마침표가 없으면 (text, "")를 반환한다.
    """
    if not text:
        return "", ""
    idx = text.find(".")
    if idx == -1:
        return text.strip(), ""
    head = text[: idx + 1].strip()   # 마침표 포함
    tail = text[idx + 1 :].strip()
    return head, tail


def baseline_groups_endgap(segments, caster_gap=2.0, analyst_gap=3.0):
    """단계 1: end_sec 기준 간격과 role 전이를 이용해서 1차 세트로 묶는다."""
    segs = sorted(segments, key=lambda s: s["start_sec"])

    # 첫 caster 나오기 전까지는 버린다.
    idx0 = 0
    while idx0 < len(segs) and segs[idx0].get("role") != "caster":
        idx0 += 1
    segs = segs[idx0:]

    groups = []
    current = []
    prev = None

    for seg in segs:
        if not current:
            current = [seg]
            prev = seg
            continue

        new_group = False
        pr, nr = prev["role"], seg["role"]

        # caster → caster
        if pr == "caster" and nr == "caster":
            if seg["start_sec"] - prev["end_sec"] >= caster_gap:
                new_group = True
        # analyst → analyst
        elif pr == "analyst" and nr == "analyst":
            if seg["start_sec"] - prev["end_sec"] >= analyst_gap:
                new_group = True
        # analyst → caster 는 무조건 새 세트
        elif pr == "analyst" and nr == "caster":
            new_group = True

        # caster → analyst 등은 같은 세트 유지
        if new_group:
            groups.append(current)
            current = [seg]
        else:
            current.append(seg)

        prev = seg

    if current:
        groups.append(current)

    return groups


def attach_analyst_only_groups(groups, analyst_attach_gap=20.0):
    """caster가 없는 analyst-only 세트를 앞 세트에 붙인다 (가까울 때만)."""
    result = []

    for g in groups:
        has_caster = any(seg["role"] == "caster" for seg in g)

        if has_caster:
            result.append(g)
        else:
            # analyst-only 세트
            if not result:
                # 앞에 붙일 세트가 없으면 그냥 버림
                continue

            prev = result[-1]
            gap = g[0]["start_sec"] - prev[-1]["end_sec"]

            if gap <= analyst_attach_gap:
                prev.extend(g)
            else:
                # 너무 멀리 떨어져 있으면 버린다.
                continue

    return result


REACTION_KEYWORDS = []


def is_reaction_caster_group(group, prev_group, reaction_gap=12.0, reaction_len=20):
    """한 개의 짧은 caster 세트가 바로 앞 세트에 붙어야 할 리액션 멘트인지 판단."""
    if len(group) != 1 or group[0]["role"] != "caster":
        return False
    if not prev_group:
        return False

    seg = group[0]
    gap = seg["start_sec"] - prev_group[-1]["end_sec"]
    if gap < 0:
        gap = 0.0

    text = seg["text"]

    # 길이 기준 (너무 길면 리액션으로 보지 않음)
    if len(text) > reaction_len:
        # 키워드도 없으면 바로 False
        if not any(k in text for k in REACTION_KEYWORDS):
            return False

    # 시간 간격이 충분히 짧으면 리액션
    if gap <= reaction_gap:
        return True

    # 혹은 전형적인 리액션 키워드가 포함된 경우 허용
    if any(k in text for k in REACTION_KEYWORDS):
        return True

    return False


def merge_reaction_groups(groups, reaction_gap=12.0, reaction_len=20):
    """짧은 리액션 caster 세트를 앞 세트에 흡수한다."""
    merged = []

    for g in groups:
        if merged and is_reaction_caster_group(g, merged[-1], reaction_gap, reaction_len):
            merged[-1].extend(g)
        else:
            merged.append(g)

    return merged


def finalize_group(group, split_first_caster_when_multi=True):
    """한 세트를 (caster + optional analyst 1개) 구조로 정리한다.

    - 항상 첫 번째 caster만 남긴다.
    - 나머지 caster + analyst 텍스트는 모두 analyst 하나로 합친다.
    - speaker_id, segment_id 등은 제거한다.
    """
    group = sorted(group, key=lambda s: s["start_sec"])
    casters = [seg for seg in group if seg["role"] == "caster"]
    analysts = [seg for seg in group if seg["role"] == "analyst"]

    if not casters:
        # 이전 단계에서 이미 필터링되지만 방어적으로 한 번 더 체크
        return []

    main_caster = copy.deepcopy(casters[0])
    other_casters = [copy.deepcopy(s) for s in casters[1:]]

    # 여러 개의 caster가 있는 경우에만 첫 번째 caster를 문장 단위로 분리
    if split_first_caster_when_multi and len(casters) >= 2:
        head, tail = split_caster_head_tail(main_caster["text"])
    else:
        head, tail = main_caster["text"], ""

    main_caster["text"] = head.strip()

    # analyst 파트 구성
    analysis_pieces = []
    analysis_start_candidates = []
    analysis_segments_for_conf = []

    # main caster의 꼬리 부분(tail)을 analyst 쪽으로
    if tail:
        analysis_pieces.append(tail)
        analysis_start_candidates.append(main_caster["end_sec"])
        analysis_segments_for_conf.append(main_caster)

    # 나머지 caster들은 전부 analyst 텍스트에 합치기
    for oc in other_casters:
        if oc["text"].strip():
            analysis_pieces.append(oc["text"].strip())
            analysis_start_candidates.append(oc["start_sec"])
            analysis_segments_for_conf.append(oc)

    # analyst 들의 텍스트를 모두 합치기
    analysts_sorted = sorted(analysts, key=lambda s: s["start_sec"])
    for a in analysts_sorted:
        if a["text"].strip():
            analysis_pieces.append(a["text"].strip())
            analysis_start_candidates.append(a["start_sec"])
            analysis_segments_for_conf.append(a)

    # 최종 세그먼트 리스트
    final_segments = []

    # caster 한 명은 반드시 남긴다 (필드 정리)
    caster_out = {
        "role": "caster",
        "start_sec": float(main_caster["start_sec"]),
        "end_sec": float(main_caster["end_sec"]),
        "text": main_caster["text"],
        "confidence": float(main_caster.get("confidence", 0.0)),
    }
    final_segments.append(caster_out)

    # analyst 텍스트가 있을 때만 analyst 세그먼트 생성
    if analysis_pieces:
        analyst_text = " ".join(p for p in analysis_pieces if p)
        analyst_start = min(analysis_start_candidates)
        analyst_end = max(seg["end_sec"] for seg in analysis_segments_for_conf)

        if analysis_segments_for_conf:
            confs = [float(seg.get("confidence", 0.0)) for seg in analysis_segments_for_conf]
            analyst_conf = float(statistics.mean(confs))
        else:
            analyst_conf = 0.0

        analyst_out = {
            "role": "analyst",
            "start_sec": float(analyst_start),
            "end_sec": float(analyst_end),
            "text": analyst_text,
            "confidence": analyst_conf,
        }
        final_segments.append(analyst_out)

    final_segments.sort(key=lambda s: s["start_sec"])
    return final_segments


def split_stt_json_to_event_sets(
    stt_json,
    id_prefix=None,
    caster_gap=2.0,
    analyst_gap=3.0,
    analyst_attach_gap=20.0,
    reaction_gap=12.0,
    reaction_len=20,
):
    """STT json 하나를 하이라이트 이벤트 세트 리스트로 변환한다.

    Args:
        stt_json (dict): STT 결과 (video_id, segments 포함)
        id_prefix (str, optional): 세트의 video_id prefix. 기본값은 stt_json["video_id"].
        caster_gap (float): caster-caster 사이 end_sec 기준 간격 임계값(초).
        analyst_gap (float): analyst-analyst 사이 간격 임계값(초).
        analyst_attach_gap (float): analyst-only 세트를 앞 세트에 붙일 수 있는 최대 간격(초).
        reaction_gap (float): 리액션 세트를 앞 세트에 붙이는 시간 간격 기준(초).
        reaction_len (int): 리액션으로 취급할 수 있는 최대 글자 수.

    Returns:
        list[dict]: 각 이벤트 세트의 리스트. 각 세트는
            {"video_id": ..., "segments": [...]} 형식.
    """
    segments = stt_json.get("segments", [])
    if not segments:
        return []

    if id_prefix is None:
        id_prefix = stt_json.get("video_id", "clip")

    # 1단계: role/시간 기반 1차 세트 생성
    groups = baseline_groups_endgap(segments, caster_gap, analyst_gap)

    # 2단계: analyst-only 세트를 앞 세트에 붙이기
    groups = attach_analyst_only_groups(groups, analyst_attach_gap)

    # 3단계: 짧은 리액션 caster 세트는 앞 세트에 흡수
    groups = merge_reaction_groups(groups, reaction_gap, reaction_len)

    # 4단계: 각 세트를 (caster + optional analyst) 구조로 정리
    event_sets = []
    for idx, g in enumerate(groups, start=1):
        final_segments = finalize_group(g)

        # 혹시 모를 edge case 보호 (caster가 없다면 버림)
        if not any(seg["role"] == "caster" for seg in final_segments):
            continue

        event_sets.append(
            {
                "video_id": f"{id_prefix}-{idx}",
                "segments": final_segments,
            }
        )

    return event_sets


In [None]:
from pathlib import Path
import json

input_path = Path("wbsc.timeline.json")
output_path = Path("wbsc_event2.json")

with input_path.open("r", encoding="utf-8") as f:
    stt_json = json.load(f)

event_sets = split_stt_json_to_event_sets(stt_json, id_prefix="clip1")

print(f"이벤트 세트 개수: {len(event_sets)}")

with output_path.open("w", encoding="utf-8") as f:
    json.dump(event_sets, f, ensure_ascii=False, indent=2)

print(f"저장 완료: {output_path.resolve()}")


이벤트 세트 개수: 29
저장 완료: C:\Users\Playdata\Desktop\모델 stt 연결\wbsc_event2.json
