In [None]:
import os
import csv
import logging
import sys
import pickle
from pathlib import Path
import numpy as np
from typing import Optional, List, Tuple, Dict, Any
import time

import torch
from torch.utils.data import Dataset, DataLoader, IterableDataset
from torchvision import transforms
from PIL import Image

# --- 설정 ---
# 데이터가 저장된 기본 경로
BASE_PATH = Path("/home/najo/NAS/VLA/Qwen2.5-VL-3B-_OCT_FPI_Action_Model/Real_Env_Test/")

# 뷰 이름과 해당 하위 폴더 경로 (View5는 하위 폴더 없음)
# 이 키(key)들이 최종적으로 데이터에 포함될 뷰 이름이 됩니다.
VIEW_CONFIG = {
    "View1_left": "View1/left",
    "View1_right": "View1/right",
    "View2_left": "View2/left",
    "View2_right": "View2/right",
    "View3_left": "View3/left",
    "View3_right": "View3/right",
    "View4_left": "View4/left",
    "View4_right": "View4/right",
    "View5": "View5", # View5는 하위 폴더 없음
}
# 이미지 파일 확장자
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png'}
# 센서-이미지 간 최대 허용 시간 차이 (초)
MATCH_TOLERANCE_SEC = 0.1 

# CSV 헤더에서 찾을 열 이름 및 인덱스
CSV_TIMESTAMP_COL = "send_timestamp" # 2번 인덱스
CSV_JOINT_COLS = [f"joint_{i}" for i in range(1, 7)] # 4~9번 인덱스
CSV_POSE_COLS = ["pose_x", "pose_y", "pose_z", "pose_a", "pose_b", "pose_r"] # 10~15번 인덱스

# --- 로깅 설정 ---
logging.basicConfig(
    level=logging.INFO, 
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)

# --- 헬퍼 함수 (데이터 로딩 및 매칭) ---

def load_sensor_data_from_csv(csv_path: str) -> List[Dict[str, Any]]:
    """
    주어진 CSV 파일에서 모든 행의 타임스탬프, 관절, 포즈 데이터를 읽어옵니다.
    """
    logging.info(f"Loading sensor data from {csv_path}...")
    sensor_states = []
    try:
        with open(csv_path, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            required_cols = [CSV_TIMESTAMP_COL] + CSV_JOINT_COLS + CSV_POSE_COLS
            if not all(col in reader.fieldnames for col in required_cols):
                logging.error(f"CSV에 필요한 열이 부족합니다. 건너뜁니다. (파일: {csv_path})")
                return []

            for i, row in enumerate(reader):
                try:
                    state = {
                        'time': float(row[CSV_TIMESTAMP_COL]),
                        'joints': np.array([float(row[col]) for col in CSV_JOINT_COLS], dtype=np.float32),
                        'pose': np.array([float(row[col]) for col in CSV_POSE_COLS], dtype=np.float32)
                    }
                    sensor_states.append(state)
                except (ValueError, TypeError) as e:
                    logging.warning(f"CSV {i+2}번째 행 파싱 실패 ({e}). 건너뜁니다.")
    except Exception as e:
        logging.error(f"CSV 로드 중 오류: {e}")
        return []
    logging.info(f"✅ CSV 로드 성공! 총 {len(sensor_states)}개 데이터 포인트.")
    return sensor_states

def extract_timestamp_from_filename(filename: str) -> Optional[float]:
    try:
        stem = os.path.splitext(filename)[0]
        parts = stem.split('_')
        timestamp_str = parts[-1] 
        return float(timestamp_str)
    except (ValueError, IndexError):
        logging.warning(f"파일명 타임스탬프 파싱 불가: {filename}")
        return None

def index_image_files(view_path: Path) -> List[Tuple[float, str]]:
    """
    주어진 뷰 폴더 내의 모든 이미지 파일을 찾아 (하위 폴더 포함)
    (타임스탬프, 경로) 리스트로 반환하고 시간순으로 정렬합니다.
    """
    image_list = []
    if not view_path.is_dir():
        logging.warning(f"View 폴더를 찾을 수 없음: {view_path}")
        return []

    for root, dirs, files in os.walk(view_path):
        for file in files:
            file_ext = os.path.splitext(file)[1].lower()
            if file_ext in IMAGE_EXTENSIONS:
                image_time = extract_timestamp_from_filename(file)
                if image_time is not None:
                    image_list.append((image_time, os.path.join(root, file)))
    
    image_list.sort() # 시간순 정렬
    return image_list

def find_closest_image_path(target_time: float, 
                            image_list: List[Tuple[float, str]], 
                            tolerance: float) -> Optional[str]:
    """
    정렬된 이미지 리스트(image_times 오름차순)에서 target_time과 가장 가까운 이미지를
    이진 탐색(np.searchsorted)을 이용해 효율적으로 찾습니다.
    """
    if not image_list:
        return None

    # (시간, 경로) 튜플에서 시간만 추출 (매번 생성 대신 캐싱 가능하지만, 여기서는 단순화)
    image_times = np.array([t for t, p in image_list])
    
    idx = np.searchsorted(image_times, target_time, side='left')
    
    best_match_path = None
    min_diff = float('inf')

    # 찾은 위치(idx) 확인
    if idx < len(image_times):
        diff = abs(image_times[idx] - target_time)
        if diff < min_diff:
            min_diff = diff
            best_match_path = image_list[idx][1]
    # 바로 전 위치(idx-1) 확인
    if idx > 0:
        diff = abs(image_times[idx-1] - target_time)
        if diff < min_diff:
            min_diff = diff
            best_match_path = image_list[idx-1][1]
    
    # 허용 오차(tolerance) 이내인 경우에만 경로 반환
    if min_diff <= tolerance:
        return best_match_path
    else:
        return None 

# --- PyTorch IterableDataset 클래스 정의 ---

class RealTimeMultiviewDataset(IterableDataset):
    """
    'recv_all_...' 폴더에서 CSV와 이미지 파일을 실시간으로 읽고 매칭하는 IterableDataset.
    """
    
    def __init__(self, 
                 base_path: Path, 
                 view_config: Dict[str, str], 
                 image_transform: transforms.Compose, 
                 tolerance: float = 0.1):
        """
        Args:
            base_path (Path): 'recv_all_...' 폴더들이 있는 기본 경로.
            view_config (Dict[str, str]): {'View1_left': 'View1/left', ...} 형식의 뷰 설정.
            image_transform (transforms.Compose): PIL 이미지에 적용할 변환.
            tolerance (float): 센서-이미지 간 최대 허용 시간 차이 (초).
        """
        super().__init__()
        self.base_path = base_path
        self.view_config = view_config
        self.view_keys_order = list(view_config.keys()) # 9개 뷰의 순서 고정
        self.transform = image_transform
        self.tolerance = tolerance
        
        if not self.base_path.is_dir():
            raise FileNotFoundError(f"기본 경로를 찾을 수 없습니다: {base_path}")

    def _process_sample(self, state: Dict[str, Any], image_paths: Dict[str, str]) -> Dict[str, torch.Tensor]:
        """하나의 매칭된 샘플을 로드하고 텐서로 변환합니다."""
        
        # 1. 이미지 로드 및 변환 (고정된 순서로)
        image_tensors = []
        for view_key in self.view_keys_order: # 9개 뷰 순서 고정
            image_path = image_paths[view_key]
            pil_image = Image.open(image_path).convert('RGB')
            image_tensor = self.transform(pil_image)
            image_tensors.append(image_tensor)

        # (9, C, H, W) 형태로 스택
        images_stacked = torch.stack(image_tensors, dim=0)

        # 2. 센서 데이터 텐서 변환
        joints = torch.tensor(state['joints'], dtype=torch.float32) # (6,)
        pose = torch.tensor(state['pose'], dtype=torch.float32)     # (6,)

        # 3. 모델 입력 딕셔너리로 반환
        return {
            'pixel_values': images_stacked, # (9, C, H, W)
            'joints': joints,               # (6,)
            'pose': pose,                   # (6,)
            'timestamp': torch.tensor(state['time'], dtype=torch.float64) # (스칼라)
        }

    def __iter__(self) -> Dict[str, torch.Tensor]:
        """
        데이터셋 이터레이터. 세션 폴더를 순회하며 매칭된 데이터를 yield합니다.
        """
        logging.info(f"Dataset iterator 시작. {self.base_path} 탐색...")

        # 1. 'recv_all_...' 폴더 순회
        session_paths = sorted([entry.path for entry in os.scandir(self.base_path) 
                                if entry.is_dir() and entry.name.startswith("recv_all_")])
        
        if not session_paths:
            logging.warning(f"{self.base_path}에서 'recv_all_...' 폴더를 찾지 못했습니다.")
            return

        for session_path_str in session_paths:
            session_path = Path(session_path_str)
            logging.info(f"--- 세션 폴더 처리 시작: {session_path.name} ---")

            # 2. CSV 파일 로드 (세션당 1회)
            csv_files = list(session_path.glob('robot_state_*.csv'))
            if not csv_files:
                logging.warning(f"CSV 파일 없음. 건너뜁니다: {session_path.name}")
                continue
            
            sensor_states = load_sensor_data_from_csv(str(csv_files[0]))
            if not sensor_states:
                logging.warning(f"센서 데이터 없음. 건너뜁니다: {session_path.name}")
                continue
            
            # 3. 모든 뷰의 이미지 파일 인덱싱 (세션당 1회)
            image_indexes: Dict[str, List[Tuple[float, str]]] = {}
            for view_key, view_subpath in self.view_config.items():
                view_path = session_path / view_subpath
                # logging.info(f"  '{view_key}' 폴더 인덱싱...")
                image_indexes[view_key] = index_image_files(view_path)
                if not image_indexes[view_key]:
                     logging.warning(f"  '{view_key}' 폴더에 이미지가 없습니다.")
            
            logging.info(f"  이미지 인덱싱 완료. {len(sensor_states)}개 센서 데이터와 매칭 시작...")

            # 4. 센서 타임스탬프 기준으로 매칭 및 yield
            matched_count_in_session = 0
            for state in sensor_states:
                sensor_time = state['time']
                matched_image_paths = {}
                all_views_matched = True

                # 모든 뷰(9개)에 대해 가장 가까운 이미지 찾기
                for view_key in self.view_keys_order:
                    closest_path = find_closest_image_path(
                        sensor_time, 
                        image_indexes[view_key], 
                        self.tolerance
                    )
                    if closest_path:
                        matched_image_paths[view_key] = closest_path
                    else:
                        all_views_matched = False
                        break # 하나라도 실패하면 중단
                
                # 5. 모든 뷰가 매칭되었으면 데이터 처리 및 yield
                if all_views_matched:
                    try:
                        processed_sample = self._process_sample(state, matched_image_paths)
                        yield processed_sample
                        matched_count_in_session += 1
                    except Exception as e:
                        logging.error(f"샘플 처리 중 오류 (인덱스 {state['time']}): {e}")
            
            logging.info(f"--- 세션 처리 완료: {matched_count_in_session}개 데이터 포인트 생성 ---")


# --- 메인 실행 예시 (테스트용) ---
if __name__ == "__main__":

    # DINO / ImageNet 표준 이미지 변환
    MEAN = [0.485, 0.456, 0.406]
    STD  = [0.229, 0.224, 0.225]
    RESIZE = 224
    
    image_transform = transforms.Compose([
        transforms.Resize((RESIZE, RESIZE)),
        transforms.ToTensor(),
        transforms.Normalize(mean=MEAN, std=STD),
    ])

    try:
        # 1. 데이터셋 인스턴스 생성
        print("RealTimeMultiviewDataset 생성 중...")
        dataset = RealTimeMultiviewDataset(
            base_path=BASE_PATH,
            view_config=VIEW_CONFIG,
            image_transform=image_transform,
            tolerance=MATCH_TOLERANCE_SEC
        )
        print("데이터셋 생성 완료.")
        
        # 2. DataLoader 생성
        print("DataLoader 생성 중... (Batch Size = 4)")
        # IterableDataset은 shuffle=True를 DataLoader에서 직접 못하므로,
        # RLDS 로더처럼 내부 버퍼링이 필요하지만, 여기서는 순차 로딩으로 테스트합니다.
        dataloader = DataLoader(
            dataset,
            batch_size=4,
            num_workers=0, # IterableDataset은 멀티워커 설정 시 주의 필요 (0이 가장 안전)
            # collate_fn은 기본값(default_collate) 사용
        )
        print("DataLoader 생성 완료.")

        # 3. 첫 번째 배치 가져오기
        print("\n첫 번째 배치 로딩 시도...")
        start_time = time.time()
        first_batch = next(iter(dataloader))
        end_time = time.time()
        
        print(f"✅ 첫 번째 배치 로드 성공! (소요 시간: {end_time - start_time:.2f}초)")
        print("\n" + "="*30)
        print(" 첫 번째 배치 정보 (Batch 0)")
        print("="*30)
        
        for key, value in first_batch.items():
            if isinstance(value, torch.Tensor):
                print(f"  Key: '{key}'")
                print(f"    Shape: {value.shape}") # (B, 9, C, H, W) 또는 (B, 6)
                print(f"    Dtype: {value.dtype}")
            else:
                print(f"  Key: '{key}', Type: {type(value)}")

    except Exception as e:
        print(f"\n테스트 실행 중 오류 발생: {e}")
        import traceback
        traceback.print_exc()

RealTimeMultiviewDataset 생성 중...
데이터셋 생성 완료.
DataLoader 생성 중... (Batch Size = 4)
DataLoader 생성 완료.

첫 번째 배치 로딩 시도...
2025-10-24 16:23:04,675 - INFO - Dataset iterator 시작. /home/najo/NAS/VLA/Qwen2.5-VL-3B-_OCT_FPI_Action_Model/Real_Env_Test 탐색...
2025-10-24 16:23:04,676 - INFO - --- 세션 폴더 처리 시작: recv_all_20251022_044355 ---
2025-10-24 16:23:04,678 - INFO - Loading sensor data from /home/najo/NAS/VLA/Qwen2.5-VL-3B-_OCT_FPI_Action_Model/Real_Env_Test/recv_all_20251022_044355/robot_state_20251022_044355.csv...
2025-10-24 16:23:04,752 - INFO - ✅ CSV 로드 성공! 총 5130개 데이터 포인트.
2025-10-24 16:23:04,757 - INFO -   이미지 인덱싱 완료. 5130개 센서 데이터와 매칭 시작...
2025-10-24 16:23:04,784 - INFO - --- 세션 처리 완료: 0개 데이터 포인트 생성 ---
2025-10-24 16:23:04,785 - INFO - --- 세션 폴더 처리 시작: recv_all_20251023_150612 ---
2025-10-24 16:23:04,787 - INFO - Loading sensor data from /home/najo/NAS/VLA/Qwen2.5-VL-3B-_OCT_FPI_Action_Model/Real_Env_Test/recv_all_20251023_150612/robot_state_20251023_150612.csv...
2025-10-24 16:23:04,848 