In [2]:
import pandas as pd
import numpy as np
from sentence_transformers import SentenceTransformer
import json

# =========================
# 1) 데이터 로드
# =========================
df = pd.read_csv("/home/dickson/문서/agentApp/backend/app/data/all_labs_merged.csv")

# =========================
# 2) Action vs Observation 매칭 정의
# =========================
pair_cols = [
    ("action/target_cartesian_position_col0", "observation/robot_state/cartesian_position_col0"),
    ("action/target_cartesian_position_col1", "observation/robot_state/cartesian_position_col1"),
    ("action/target_cartesian_position_col2", "observation/robot_state/cartesian_position_col2"),
    ("action/joint_velocity_col0", "observation/robot_state/joint_velocities_col0"),
    ("action/joint_velocity_col1", "observation/robot_state/joint_velocities_col1"),
    ("action/joint_velocity_col2", "observation/robot_state/joint_velocities_col2"),
]

# 오차 컬럼 생성
for act, obs in pair_cols:
    df[f"err::{act.split('/')[-1]}"] = df[act] - df[obs]

# =========================
# 2-1) 추가 통계용 단일 컬럼 정의
# =========================
extra_cols = [
    "observation/robot_state/prev_command_successful",
    "action/robot_state/prev_controller_latency_ms",
    "observation/robot_state/prev_controller_latency_ms"  # 중복으로 주셨는데, 그대로 넣었습니다
]

# =========================
# 3) 세션별 요약 통계 생성
# =========================
def summarize_session(session_df):
    obs_vals = session_df[[c for _, c in pair_cols]].values.flatten()
    err_vals = session_df[[f"err::{act.split('/')[-1]}" for act, _ in pair_cols]].values.flatten()
    
    # 추가 컬럼 통계
    extras = {}
    for c in extra_cols:
        extras[f"{c}::mean"] = float(np.nanmean(session_df[c]))
        extras[f"{c}::std"] = float(np.nanstd(session_df[c]))
        extras[f"{c}::min"] = float(np.nanmin(session_df[c]))
        extras[f"{c}::max"] = float(np.nanmax(session_df[c]))
    
    return pd.Series({
        "obs_mean": float(np.nanmean(obs_vals)),
        "obs_std": float(np.nanstd(obs_vals)),
        "obs_range": float(np.nanmax(obs_vals) - np.nanmin(obs_vals)),
        "err_mean": float(np.nanmean(err_vals)),
        "err_std": float(np.nanstd(err_vals)),
        "err_max": float(np.nanmax(err_vals)),
        **extras
    })

stats = df.groupby("session_id").apply(summarize_session).reset_index()

# =========================
# 4) video_summary 붙이기
# =========================
session_summary = df.groupby("session_id")["video_summary"].first().reset_index()
merged = pd.merge(stats, session_summary, on="session_id", how="left")

# =========================
# 5) 임베딩 생성 (KoE5 + DistilUSE)
# =========================
koe5 = SentenceTransformer("nlpai-lab/KoE5")
distiluse = SentenceTransformer("sentence-transformers/distiluse-base-multilingual-cased-v1")

summaries = merged["video_summary"].fillna("").tolist()
embeddings_koe5 = koe5.encode(summaries, convert_to_numpy=True, normalize_embeddings=True)
embeddings_distiluse = distiluse.encode(summaries, convert_to_numpy=True, normalize_embeddings=True)

merged["embedding_koe5"] = embeddings_koe5.tolist()
merged["embedding_distiluse"] = embeddings_distiluse.tolist()

# =========================
# 6) Document 변환
# =========================
docs = []
for row in merged.to_dict(orient="records"):
    doc = {
        "session_id": row["session_id"],
        "video_summary": row["video_summary"],
        "observation_stats": {
            "mean": row["obs_mean"],
            "std": row["obs_std"],
            "range": row["obs_range"]
        },
        "error_stats": {
            "mean": row["err_mean"],
            "std": row["err_std"],
            "max": row["err_max"]
        },
        "extra_stats": {
            col: {  # mean, std, min, max 묶어서 넣기
                "mean": row[f"{col}::mean"],
                "std": row[f"{col}::std"],
                "min": row[f"{col}::min"],
                "max": row[f"{col}::max"]
            }
            for col in extra_cols
        },
        "embedding_koe5": row["embedding_koe5"],
        "embedding_distiluse": row["embedding_distiluse"]
    }
    docs.append(doc)

# 샘플 출력
print(json.dumps(docs[0], ensure_ascii=False, indent=2))


  from .autonotebook import tqdm as notebook_tqdm
  stats = df.groupby("session_id").apply(summarize_session).reset_index()


{
  "session_id": "Fri_Aug_18_12_06_27_2023",
  "video_summary": "박스에 접근하던 중 급격한 움직임과 함께 충돌이 있었음",
  "observation_stats": {
    "mean": 0.18160002207098427,
    "std": 0.349257623880365,
    "range": 2.0307194590568542
  },
  "error_stats": {
    "mean": -0.0071375970548648035,
    "std": 0.2607038239094778,
    "max": 0.8785010613501072
  },
  "extra_stats": {
    "observation/robot_state/prev_command_successful": {
      "mean": 1.0,
      "std": 0.0,
      "min": 1.0,
      "max": 1.0
    },
    "action/robot_state/prev_controller_latency_ms": {
      "mean": 0.22838087046616956,
      "std": 0.032017270897274214,
      "min": 0.1515810042619705,
      "max": 0.3353759944438934
    },
    "observation/robot_state/prev_controller_latency_ms": {
      "mean": 0.24228705446186818,
      "std": 0.03825732534083878,
      "min": 0.1548240035772323,
      "max": 0.3577249944210052
    }
  },
  "embedding_koe5": [
    0.05085272341966629,
    0.004953502211719751,
    0.025171935558319092,

In [8]:
import pandas as pd

In [4]:
df1 = pd.read_csv("/home/dickson/문서/agentApp/backend/app/data/all_labs_merged.csv")


In [7]:
df1

Unnamed: 0,action/cartesian_velocity_col0,action/cartesian_velocity_col1,action/cartesian_velocity_col2,action/cartesian_velocity_col3,action/cartesian_velocity_col4,action/cartesian_velocity_col5,action/gripper_velocity,action/joint_velocity_col0,action/joint_velocity_col1,action/joint_velocity_col2,...,observation/robot_state/prev_joint_torques_computed_safened_col3,observation/robot_state/prev_joint_torques_computed_safened_col4,observation/robot_state/prev_joint_torques_computed_safened_col5,observation/robot_state/prev_joint_torques_computed_safened_col6,camera_id,session_id,desc_major,object_text,video_summary,lab_name
0,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.0,0.142283,0.016028,-0.121965,...,2.654777,-0.080063,0.511664,0.077850,22008760,Fri_Jul_14_17_28_24_2023,,,제대로 동작하지 않음,AUTOLab
1,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.0,0.142179,0.016078,-0.121750,...,2.095490,-0.063049,0.459810,0.062080,22008760,Fri_Jul_14_17_28_24_2023,,,제대로 동작하지 않음,AUTOLab
2,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.0,0.142075,0.016118,-0.121562,...,1.983375,-0.064775,0.430028,0.057596,22008760,Fri_Jul_14_17_28_24_2023,,,제대로 동작하지 않음,AUTOLab
3,0.069639,0.062086,0.007866,0.013909,0.105186,0.088893,0.0,0.158121,0.116443,-0.103216,...,1.943410,-0.061533,0.425927,0.047436,22008760,Fri_Jul_14_17_28_24_2023,,,제대로 동작하지 않음,AUTOLab
4,0.196645,0.107526,0.052838,0.072707,0.031011,0.136134,0.0,0.154868,0.224567,-0.096917,...,0.676406,-0.349595,0.029090,0.129157,22008760,Fri_Jul_14_17_28_24_2023,,,제대로 동작하지 않음,AUTOLab
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
20011,-0.642855,-0.402850,0.651498,-0.338817,-0.487955,-0.000024,-1.0,0.071025,-0.928363,-0.177029,...,-1.905610,-0.591159,-0.161240,0.405375,28451778,Mon_Feb__5_14_43_20_2024,로봇이 대상을 내려놓고 있습니다.,,물건을 용기에서 꺼내 다른 곳에 둔다,TRI
20012,-0.686480,-0.406743,0.602748,-0.274768,-0.563342,0.122182,-1.0,0.040709,-0.906317,-0.137314,...,-1.024829,-0.754396,0.614741,0.273526,28451778,Mon_Feb__5_14_43_20_2024,로봇이 대상을 내려놓고 있습니다.,,물건을 용기에서 꺼내 다른 곳에 둔다,TRI
20013,-0.734497,-0.412432,0.538900,-0.210493,-0.747319,0.451510,-1.0,-0.001555,-0.864907,-0.084319,...,-2.126694,-0.694344,0.228921,0.360438,28451778,Mon_Feb__5_14_43_20_2024,로봇이 대상을 내려놓고 있습니다.,,물건을 용기에서 꺼내 다른 곳에 둔다,TRI
20014,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.0,-0.046199,-0.001111,0.044754,...,-2.324980,-0.682029,0.039449,0.152819,28451778,Mon_Feb__5_14_43_20_2024,작업이 정상적으로 완료되었습니다.,,물건을 용기에서 꺼내 다른 곳에 둔다,TRI


In [26]:
from pathlib import Path
import re

BASE_DIR = Path("/home/user2/문서/agentApp")




def normalize_session_id(session_id: str) -> str:
    """
    Fri_Aug_18_12_06_27_2023 → Fri_Aug_18_12:06:27_2023
    """
    return re.sub(
        r'_(\d{2})_(\d{2})_(\d{2})_(\d{4})$',
        lambda m: f"_{m.group(1)}:{m.group(2)}:{m.group(3)}_{m.group(4)}",
        session_id
    )
def find_video_path(session_id: str, camera_id: str) -> str | None:
    real_session_id = normalize_session_id(session_id)
    print(f"[dbg] 입력 session_id: {session_id}")
    print(f"[dbg] 변환 session_id: {real_session_id}")

    for p in BASE_DIR.rglob(f"{camera_id}.mp4"):
        if real_session_id in str(p):
            print(f"✅ 후보 매칭: {p}")
            return str(p)
    return None



# ===== 테스트 =====
session_id = "Fri_Aug_18_12_06_27_2023"   # 엘라스틱 저장 버전
camera_id = "22008760"

video_path = find_video_path(session_id, camera_id)

if video_path:
    print("✅ 찾음:", video_path)
else:
    print("❌ 파일 없음")


[dbg] 입력 session_id: Fri_Aug_18_12_06_27_2023
[dbg] 변환 session_id: Fri_Aug_18_12:06:27_2023
✅ 후보 매칭: /home/user2/문서/agentApp/AUTOLab_2gb_sessions/failure/2023-08-18/Fri_Aug_18_12:06:27_2023/Fri_Aug_18_12:06:27_2023/recordings/MP4/22008760.mp4
✅ 찾음: /home/user2/문서/agentApp/AUTOLab_2gb_sessions/failure/2023-08-18/Fri_Aug_18_12:06:27_2023/Fri_Aug_18_12:06:27_2023/recordings/MP4/22008760.mp4


In [14]:
df1[df1['session_id'] == session_id]

Unnamed: 0,action/cartesian_velocity_col0,action/cartesian_velocity_col1,action/cartesian_velocity_col2,action/cartesian_velocity_col3,action/cartesian_velocity_col4,action/cartesian_velocity_col5,action/gripper_velocity,action/joint_velocity_col0,action/joint_velocity_col1,action/joint_velocity_col2,...,observation/robot_state/prev_joint_torques_computed_safened_col3,observation/robot_state/prev_joint_torques_computed_safened_col4,observation/robot_state/prev_joint_torques_computed_safened_col5,observation/robot_state/prev_joint_torques_computed_safened_col6,camera_id,session_id,desc_major,object_text,video_summary,lab_name
323,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.095618,0.024538,-0.095145,...,2.645140,-0.053135,0.808138,0.035792,22008760,Fri_Aug_18_12_06_27_2023,로봇이 자세/정렬을 조정하고 있습니다.,박스,박스에 접근하던 중 급격한 움직임과 함께 충돌이 있었음,AUTOLab
324,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.095407,0.024527,-0.094815,...,1.996317,-0.035019,0.644145,0.038137,22008760,Fri_Aug_18_12_06_27_2023,로봇이 자세/정렬을 조정하고 있습니다.,박스,박스에 접근하던 중 급격한 움직임과 함께 충돌이 있었음,AUTOLab
325,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.095237,0.024516,-0.094587,...,1.954741,-0.032078,0.623744,0.035265,22008760,Fri_Aug_18_12_06_27_2023,로봇이 자세/정렬을 조정하고 있습니다.,박스,박스에 접근하던 중 급격한 움직임과 함께 충돌이 있었음,AUTOLab
326,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.095251,0.024519,-0.094590,...,1.993170,-0.030712,0.645497,0.040840,22008760,Fri_Aug_18_12_06_27_2023,로봇이 자세/정렬을 조정하고 있습니다.,박스,박스에 접근하던 중 급격한 움직임과 함께 충돌이 있었음,AUTOLab
327,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.095240,0.024516,-0.094577,...,2.064182,-0.045970,0.656759,0.035965,22008760,Fri_Aug_18_12_06_27_2023,로봇이 자세/정렬을 조정하고 있습니다.,박스,박스에 접근하던 중 급격한 움직임과 함께 충돌이 있었음,AUTOLab
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
394,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000000,0.000000,0.000000,...,0.196622,0.004893,0.056496,0.005410,22008760,Fri_Aug_18_12_06_27_2023,로봇이 대상을 내려놓고 있습니다.,박스,박스에 접근하던 중 급격한 움직임과 함께 충돌이 있었음,AUTOLab
395,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000000,0.000000,0.000000,...,0.197159,-0.001055,0.055686,-0.000518,22008760,Fri_Aug_18_12_06_27_2023,로봇이 대상을 내려놓고 있습니다.,박스,박스에 접근하던 중 급격한 움직임과 함께 충돌이 있었음,AUTOLab
396,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000000,0.000000,0.000000,...,0.219174,-0.005902,0.064883,-0.002370,22008760,Fri_Aug_18_12_06_27_2023,로봇이 대상을 내려놓고 있습니다.,박스,박스에 접근하던 중 급격한 움직임과 함께 충돌이 있었음,AUTOLab
397,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000000,0.000000,0.000000,...,0.214137,3.170736,5.403646,-0.346027,22008760,Fri_Aug_18_12_06_27_2023,로봇이 대상을 내려놓고 있습니다.,박스,박스에 접근하던 중 급격한 움직임과 함께 충돌이 있었음,AUTOLab


In [1]:
from pathlib import Path
from PIL import Image
import imageio.v2 as iio

IMG_EXT = {".jpg",".jpeg",".png",".webp",".bmp"}
VID_EXT = {".mp4",".mov",".mkv",".avi"}

def read_first_frame_any(path, target_w=384, target_h=384):
    p = Path(path)
    if p.is_dir():
        imgs = sorted([q for q in p.iterdir() if q.suffix.lower() in IMG_EXT])
        if not imgs:
            raise FileNotFoundError(f"No frame images in {p}")
        img = Image.open(imgs[0]).convert("RGB")
    elif p.suffix.lower() in VID_EXT:
        with iio.get_reader(str(p)) as r:
            frame = r.get_data(0)
        img = Image.fromarray(frame).convert("RGB")
    else:
        # 단일 이미지 파일
        img = Image.open(p).convert("RGB")
    return img.resize((target_w, target_h))


In [4]:
# -*- coding: utf-8 -*-
"""
SigLIP 기반 멀티모달 임베딩 유틸
- 이미지와 텍스트를 같은 임베딩 공간/차원으로 생성
- 통합 forward가 아닌 전용 함수(get_*_features) 사용으로 안전화
"""

from typing import List
import torch
import numpy as np
from PIL import Image
from transformers import SiglipProcessor, SiglipModel


class UnifiedEmbedder:
    """
    SigLIP 통합 임베더
    - embed_texts(List[str])  -> (N, D)
    - embed_images(List[Image.Image]) -> (N, D)
    - 차원 D 동일 (모델에 따라 768/1024/1152 등)
    """
    def __init__(
        self,
        model_name: str,
        device: str = "cuda",
        dtype: str = "float16",
        normalize: bool = True,
    ):
        # 디바이스/정규화
        self.device = torch.device(device if (device == "cpu" or torch.cuda.is_available()) else "cpu")
        self.normalize = normalize

        # dtype
        if dtype.lower() in ("fp16", "float16"):
            self.dtype = torch.float16
        elif dtype.lower() in ("bf16", "bfloat16"):
            self.dtype = torch.bfloat16
        else:
            self.dtype = torch.float32  # CPU 권장

        # Processor & Model (여기서 forward 호출 금지!)
        self.processor = SiglipProcessor.from_pretrained(model_name)
        self.model = SiglipModel.from_pretrained(model_name, torch_dtype=self.dtype)
        self.model.to(self.device).eval()

        # 임베딩 차원: config.projection_dim 사용 (없으면 추후 첫 임베딩 시 확정)
        self.embed_dim = int(getattr(self.model.config, "projection_dim", 0)) or 0

    # -----------------------------
    # 텍스트 임베딩
    # -----------------------------
    @torch.inference_mode()
    def embed_texts(self, texts: List[str]) -> np.ndarray:
        if len(texts) == 0:
            return np.empty((0, self.embed_dim or 0), dtype=np.float32)

        # tokenizer → ids, mask
        enc = self.processor(
            text=texts,
            padding=True,
            truncation=True,
            return_tensors="pt",
        )
        input_ids = enc.get("input_ids", None)
        attention_mask = enc.get("attention_mask", None)
        if input_ids is None:
            # SigLIP 텍스트 경로는 input_ids가 반드시 있어야 함
            raise ValueError("SigLIP 텍스트 임베딩에 필요한 input_ids가 생성되지 않았습니다.")

        input_ids = input_ids.to(self.device)
        attention_mask = attention_mask.to(self.device) if attention_mask is not None else None

        # ⚠️ 통합 forward 금지: 전용 함수 사용
        emb = self.model.get_text_features(
            input_ids=input_ids,
            attention_mask=attention_mask,
        )  # (B, D)

        if self.embed_dim == 0:
            self.embed_dim = int(emb.shape[-1])

        if self.normalize:
            emb = torch.nn.functional.normalize(emb, p=2, dim=-1)

        return emb.detach().cpu().numpy().astype(np.float32)

    # -----------------------------
    # 이미지 임베딩
    # -----------------------------
    @torch.inference_mode()
    def embed_images(self, images: List[Image.Image]) -> np.ndarray:
        if len(images) == 0:
            return np.empty((0, self.embed_dim or 0), dtype=np.float32)

        # processor → pixel_values
        enc = self.processor(
            images=images,
            return_tensors="pt",
        )
        pixel_values = enc.get("pixel_values", None)
        if pixel_values is None:
            raise ValueError("SigLIP 이미지 임베딩에 필요한 pixel_values가 생성되지 않았습니다.")

        pixel_values = pixel_values.to(self.device)

        # ⚠️ 통합 forward 금지: 전용 함수 사용
        emb = self.model.get_image_features(pixel_values=pixel_values)  # (B, D)

        if self.embed_dim == 0:
            self.embed_dim = int(emb.shape[-1])

        if self.normalize:
            emb = torch.nn.functional.normalize(emb, p=2, dim=-1)

        return emb.detach().cpu().numpy().astype(np.float32)

    # -----------------------------
    # 텍스트/이미지 쌍 임베딩 결합
    # -----------------------------
    def embed_pair_and_fuse(self, texts: List[str], images: List[Image.Image], mode: str = "mean") -> np.ndarray:
        te = self.embed_texts(texts)
        ie = self.embed_images(images)
        assert te.shape[0] == ie.shape[0], "텍스트/이미지 배치 크기가 다릅니다."

        if mode == "mean":
            fused = (te + ie) / 2.0
        else:
            # 필요 시 가중 평균/concat 등 확장
            fused = (te + ie) / 2.0

        if self.normalize:
            denom = np.linalg.norm(fused, axis=1, keepdims=True) + 1e-12
            fused = fused / denom
        return fused

    def get_dim(self) -> int:
        return int(self.embed_dim)


In [7]:
# -*- coding: utf-8 -*-
"""
고정 스키마용 CSV 인덱서 (초간단/고속판)
- 컬럼명은 고정: session_id, camera_id, video_summary, lab_name
- 기본 CSV 경로: data/all_labs_merged.csv
- 반환: dict[session_id] = { "camera_id":..., "caption":..., "lab":... }
"""

import csv
from pathlib import Path
from typing import Dict

# 고정 경로/스키마 (필요하면 여기만 바꾸세요)
DEFAULT_CSV = Path("/home/dickson/문서/agentApp/backend/app/data/all_labs_merged.csv")
REQUIRED = ("session_id", "camera_id", "video_summary", "lab_name")

def discover_csv() -> Path:
    """항상 같은 파일을 쓴다는 전제라면 단순 반환."""
    return DEFAULT_CSV

def _require_headers(headers) -> None:
    miss = [h for h in REQUIRED if h not in headers]
    if miss:
        raise SystemExit(f"[ERR] CSV missing required columns: {miss} (have: {headers})")

def build_session_index(csv_path: str | Path | None = None) -> Dict[str, Dict[str, str]]:
    """
    매우 빠른 인덱싱: 별칭/오버라이드 없음, 고정 컬럼만 읽음.
    """
    p = Path(csv_path) if csv_path else discover_csv()
    if not p.exists():
        raise SystemExit(f"[ERR] CSV not found: {p}")

    idx: Dict[str, Dict[str, str]] = {}
    with open(p, "r", encoding="utf-8") as f:
        r = csv.DictReader(f)
        headers = [h.strip() for h in (r.fieldnames or [])]
        _require_headers(headers)

        # 필드 키 조회 비용을 줄이기 위해 인덱스 캐시
        sid_k, cam_k, cap_k, lab_k = "session_id", "camera_id", "video_summary", "lab_name"

        for row in r:
            # 최소 트림만 수행 (lower 필요 없음: 스키마 고정이므로)
            sid = (row.get(sid_k) or "").strip()
            if not sid:
                continue
            cam = (row.get(cam_k) or "").strip()
            cap = (row.get(cap_k) or "").strip()
            lab = (row.get(lab_k) or "").strip()
            idx[sid] = {"camera_id": cam, "caption": cap, "lab": lab}
    return idx


In [3]:
# embedding/text2video/data_finder.py
# -*- coding: utf-8 -*-
from pathlib import Path
from typing import Iterable, List, Optional

VID_EXT = {".mp4", ".mov", ".mkv", ".avi"}

def discover_video_roots(
    base_dirs: Iterable[str] = ("data",),
    lab_names: Iterable[str] = (),
    max_depth: int = 3,
) -> List[Path]:
    """
    lab 힌트를 쓰되, 구현은 단순: base 자체만 루트로 두고 시작.
    - lab 폴더를 굳이 더 찾는 과탐색을 줄이고, 일단 base 디렉토리만 루트로 삼음.
    - (대부분 케이스) 루트가 소수이고, 아래의 정확 매칭 글롭이 빠르게 끝남.
    """
    roots: List[Path] = []
    for b in base_dirs:
        p = Path(b).resolve()
        if p.exists():
            roots.append(p)
    return roots or [Path("data").resolve()]

def _glob_first(root: Path, patterns: list[str], max_hits: int = 1) -> List[Path]:
    """
    여러 패턴을 순서대로 글롭하여 최초 max_hits개만 수집.
    - 조기 종료로 디스크 트래버설 최소화.
    """
    out: List[Path] = []
    for pat in patterns:
        for hit in root.glob(pat):
            out.append(hit)
            if len(out) >= max_hits:
                return out
    return out

def resolve_video_exact(
    roots: Iterable[Path],
    lab_name: str,
    session_id: str,
    camera_id: str,
) -> Optional[Path]:
    """
    '정확 매칭' 규칙을 글롭 패턴으로 바로 표현:
      - **/*{sid}*/(recordings/MP4|MP4)/*{cam}*.mp4
      - 없으면 **/*{sid}*/*{cam}*.mp4 (제한 깊이로 과탐색 완화는 roots 설계로 해결)
    여러 개면 경로가 짧은 순으로 정렬해 1개 반환.
    """
    sid = (session_id or "").lower()
    cam = (camera_id or "").lower()

    if not sid or not cam:
        return None

    # 우선순위 높은 패턴들
    pri_patterns = [
        f"**/*{sid}*/recordings/MP4/*{cam}*.mp4",
        f"**/*{sid}*/MP4/*{cam}*.mp4",
    ]
    # 최후 fallback
    fb_patterns = [
        f"**/*{sid}*/*{cam}*.mp4",
    ]

    candidates: List[Path] = []
    for root in roots:
        # 1) 우선 패턴에서 바로 찾기
        hits = _glob_first(root, pri_patterns, max_hits=3)
        if not hits:
            hits = _glob_first(root, fb_patterns, max_hits=3)

        for h in hits:
            # 케이스 인식: 소문자 비교로 camera_id 포함 재확인
            if cam in h.name.lower() and h.suffix.lower() in VID_EXT:
                candidates.append(h)

        if candidates:
            break  # 루트별 조기 종료

    if not candidates:
        return None

    candidates.sort(key=lambda p: len(str(p)))
    return candidates[0]


In [8]:
# -*- coding: utf-8 -*-
"""
CSV 기반 첫 프레임 임베딩 → 결과 구조 미리보기 (ES 적재 전 테스트용)
"""

import pandas as pd
from pathlib import Path



CSV_PATH = "/home/dickson/문서/agentApp/backend/app/data/all_labs_merged.csv"   # 필요시 직접 지정
INDEX_NAME = "embeddings_video"

# ✅ 모델 로드
siglip = UnifiedEmbedder(
    "google/siglip-so400m-patch14-384",
    device="cuda",
    dtype="float16",
    normalize=True
)

def build_actions():
    # -------------------
    # 1) CSV 인덱스 생성
    # -------------------
    csv_path = Path(CSV_PATH).resolve() if CSV_PATH else discover_csv()
    idx = build_session_index(str(csv_path))
    print(f"[INFO] parsed sessions: {len(idx)}")

    if not idx:
        raise RuntimeError("세션 인덱스 없음")

    # -------------------
    # 2) 비디오 루트 탐색
    # -------------------
    roots = discover_video_roots(base_dirs=("data",), lab_names=(), max_depth=3)
    print("[INFO] roots to search:")
    for r in roots:
        print(" -", r)

    # -------------------
    # 3) 세션별 → actions 리스트
    # -------------------
    actions = []
    n_ok, n_miss = 0, 0

    for sid, info in idx.items():
        cam = (info.get("camera_id", "") or "cam")
        lab = info.get("lab", "")
        caption = info.get("caption", "")

        doc_id = f"video:{sid}:{cam}"

        res = resolve_video_exact(roots, lab, sid, cam)
        if not res:
            n_miss += 1
            continue

        try:
            img = read_first_frame_any(str(res), target_w=384, target_h=384)
            emb_vec = siglip.embed_images([img])[0].tolist()
        except Exception as e:
            print(f"[ERR] {sid}/{cam} embed fail: {e}")
            n_miss += 1
            continue

        actions.append({
            "_op_type": "index",
            "_index": INDEX_NAME,
            "_id": doc_id,
            "_source": {
                "doc_id": doc_id,
                "session_id": sid,
                "camera_id": cam,
                "lab": lab,
                "text": caption,
                "image_ref": Path(res).name,
                "step_idx": 0,
                "time_ms": 0,
                "embedding_siglip": emb_vec,
            }
        })
        n_ok += 1

    print(f"[DONE] prepared={n_ok}, missed={n_miss}, total={len(idx)}")
    return actions


# ✅ Jupyter에서 테스트 실행
actions = build_actions()

# 앞에 2개만 구조 확인
for doc in actions[:2]:
    from pprint import pprint
    pprint(doc)


[INFO] parsed sessions: 82
[INFO] roots to search:
 - /home/dickson/문서/agentApp/data
[DONE] prepared=0, missed=82, total=82


In [9]:
actions

[]