In [None]:
class RobotPoseDataset(Dataset):
    def __init__(self, pairs, transform=None, HEATMAP_SIZE=(128, 128), sigma=5.0):
        self.pairs = pairs
        self.transform = transform
        self.heatmap_size = HEATMAP_SIZE
        self.sigma = sigma

        print("Loading and preprocessing metadata...")
        # --- 1. ArUco 데이터 로드 ---
        self.aruco_lookup = {}
        # pose1 데이터 로드
        pose1_aruco_path = '../dataset/franka_research3/pose1_aruco_pose_summary.json'
        with open(pose1_aruco_path, 'r') as f:
            for item in json.load(f):
                self.aruco_lookup[f"pose1_{item['view']}_{item['cam']}"] = item
        # pose2 데이터 로드
        pose2_aruco_path = '../dataset/franka_research3/pose2_aruco_pose_summary.json'
        with open(pose2_aruco_path, 'r') as f:
            for item in json.load(f):
                self.aruco_lookup[f"pose2_{item['view']}_{item['cam']}"] = item
        
        # --- 2. Calibration 데이터 로드 ---
        self.calib_lookup = {}
        calib_dir = "../dataset/franka_research3/Calib_cam_from_conf"
        for calib_path in glob.glob(os.path.join(calib_dir, "*.json")):
            filename = os.path.basename(calib_path).replace("_calib.json", "")
            with open(calib_path, 'r') as f:
                self.calib_lookup[filename] = json.load(f)
        
        # ▼▼▼ [수정] serial_to_view 딕셔너리를 __init__으로 이동 (성능 향상) ▼▼▼
        self.serial_to_view = {
            '41182735': "view1", '49429257': "view2",
            '44377151': "view3", '49045152': "view4"
        }
                
        print(f"✅ Metadata loaded. Found {len(self.aruco_lookup)} ArUco entries and {len(self.calib_lookup)} calibration files.")

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        # ▼▼▼ [수정] 안정성을 위해 try-except 구문 추가 ▼▼▼
        try:
            pair = self.pairs[idx]
            image_path = pair['image_path']
            
            # --- 1. 파일 경로 분석 ---
            filename = os.path.basename(image_path)
            parts = filename.split('_')
            serial_str = parts[1]
            selected_cam = parts[2] + "cam"
            selected_view = self.serial_to_view[serial_str]

            # --- 2. 이미지 및 메타데이터 로드 ---
            calib_key = f"{selected_view}_{serial_str}_{selected_cam}"
            calib = self.calib_lookup[calib_key]
            camera_matrix = np.array(calib["camera_matrix"], dtype=np.float32)
            dist_coeffs = np.array(calib["distortion_coeffs"], dtype=np.float32)
            
            if 'pose1' in image_path:
                pose_name = 'pose1'
            elif 'pose2' in image_path:
                pose_name = 'pose2'
            else:
                raise ValueError(f"Image path does not contain 'pose1' or 'pose2': {image_path}")
            aruco_key = f"{pose_name}_{selected_view}_{selected_cam}"
            aruco_result = self.aruco_lookup[aruco_key]

            # --- 3. 이미지 처리 ---
            image = Image.open(image_path).convert('RGB')
            image_np = np.array(image)
            undistorted_image_np = cv2.undistort(image_np, camera_matrix, dist_coeffs)
            undistorted_image = Image.fromarray(undistorted_image_np)
            image_tensor = self.transform(undistorted_image)

            # --- 4. Joint 데이터 로드 및 3D 좌표 계산 ---
            joint_angle_data = pair['joint_angles']
            gt_angles = torch.tensor(joint_angle_data, dtype=torch.float32)
            
            # ▼▼▼ [수정] 올바른 함수 이름 사용 및 불필요한 인자 제거 ▼▼▼
            joint_coords_3d = forward_kinematics(joint_angle_data)

            # --- 5. 3D->2D 투영 (단위 변환 포함) ---
            # ▼▼▼ [수정] rvec 단위를 Degree에서 Radian으로 변환하는 로직 추가 ▼▼▼
            rvec_deg = np.array([
                aruco_result.get('rvec_x_deg', aruco_result.get('rvec_x', 0)),
                aruco_result.get('rvec_y_deg', aruco_result.get('rvec_y', 0)),
                aruco_result.get('rvec_z_deg', aruco_result.get('rvec_z', 0))
            ], dtype=np.float32)
            rvec = np.deg2rad(rvec_deg) # 단위 변환
            
            tvec = np.array([
                aruco_result.get('tvec_x', aruco_result.get('mean_x', 0)),
                aruco_result.get('tvec_y', aruco_result.get('mean_y', 0)),
                aruco_result.get('tvec_z', aruco_result.get('mean_z', 0))
            ], dtype=np.float32).reshape(3, 1)

            # 왜곡 보정된 이미지에 투영하므로 dist_coeffs는 0으로 설정
            pixel_coords, _ = cv2.projectPoints(joint_coords_3d, rvec, tvec, camera_matrix, np.zeros_like(dist_coeffs))
            pixel_coords = pixel_coords.reshape(-1, 2)
            
            # --- 6. 히트맵 생성 ---
            num_joints = len(pixel_coords)
            original_h, original_w, _ = undistorted_image_np.shape

            scaled_keypoints = np.zeros_like(pixel_coords)
            scaled_keypoints[:, 0] = pixel_coords[:, 0] * (self.heatmap_size[1] / original_w)
            scaled_keypoints[:, 1] = pixel_coords[:, 1] * (self.heatmap_size[0] / original_h)

            gt_heatmaps_np = np.zeros((num_joints, self.heatmap_size[0], self.heatmap_size[1]), dtype=np.float32)
            for i in range(num_joints):
                gt_heatmaps_np[i] = create_gt_heatmap(scaled_keypoints[i], self.heatmap_size, self.sigma)
            gt_heatmaps = torch.from_numpy(gt_heatmaps_np)

            return image_tensor, gt_heatmaps, gt_angles

        except Exception as e:
            # 오류 발생 시 해당 샘플을 건너뛰도록 None을 반환
            print(f"⚠️ Warning: Skipping sample {idx} due to error in '{image_path}': {e}")
            return None, None, None

In [None]:
NUM_ANGLES = 7
NUM_JOINTS = 8
FEATURE_DIM = 768
HEATMAP_SIZE = (128, 128)

import timm
import torch
import torch.nn as nn
from transformers import AutoImageProcessor, AutoModel
from transformers.image_utils import load_image
import torch.nn.functional as F

from transformers import AutoImageProcessor
from torchvision import transforms

MODEL_NAME ='facebook/dinov3-convnext-tiny-pretrain-lvd1689m'
# facebook/dinov3-vitb16-pretrain-lvd1689m
# facebook/dinov3-convnext-base-pretrain-lvd1689m


# 1. DINOv3 모델에 맞는 이미지 프로세서 로드
processor = AutoImageProcessor.from_pretrained(MODEL_NAME)

# 2. 프로세서에서 평균과 표준편차 값 추출
dino3_mean = processor.image_mean
dino3_std = processor.image_std

# 3. 이 값들을 사용하여 Transform 파이프라인 재구성
# 예시: 학습용 Transform
train_transform = transforms.Compose([
    transforms.Resize(processor.size["shortest_edge"]),
    transforms.CenterCrop(processor.crop_size["height"]),
    transforms.ColorJitter(brightness=0.2, contrast=0.15, saturation=0.15, hue=0.05),
    transforms.GaussianBlur(kernel_size=(5, 9), sigma=(0.1, 5)),
    transforms.RandomGrayscale(p=0.1),
    transforms.RandomErasing(p=0.2, scale=(0.1, 0.2), ratio=(0.3, 2.0)),
    transforms.ToTensor(),
    transforms.Normalize(mean=dino3_mean, std=dino3_std) # ✅ DINOv3 값으로 변경
])

# 예시: 검증용 Transform
val_transform = transforms.Compose([
    transforms.Resize(processor.size["shortest_edge"]),
    transforms.CenterCrop(processor.crop_size["height"]),
    transforms.ToTensor(),
    transforms.Normalize(mean=dino3_mean, std=dino3_std) # ✅ DINOv3 값으로 변경
])


class DINOv3Backbone(nn.Module):
    """
    Hugging Face transformers 라이브러리를 사용하여 DINOv3 모델을 구성합니다.
    사전에 정규화된 이미지 텐서 배치를 입력받아 패치 토큰을 반환합니다.
    """
    def __init__(self, model_name=MODEL_NAME): # ViT-Base 모델을 기본값으로 사용
        super().__init__()
        # 사전 학습된 DINOv3 모델을 불러옵니다.
        self.model = AutoModel.from_pretrained(model_name)
        # ⚠️ 참고: 모델을 특정 장치(.to('cuda'))로 보내는 코드는
        # 메인 학습 스크립트에서 한 번에 처리하는 것이 좋습니다.

    def forward(self, image_tensor_batch):
        """
        Args:
            image_tensor_batch (torch.Tensor): (B, C, H, W) 형태의 정규화된 이미지 텐서
        """
        # 그래디언트 계산을 비활성화합니다.
        with torch.no_grad():
            # Hugging Face 모델은 'pixel_values'라는 키워드 인자를 기대합니다.
            outputs = self.model(pixel_values=image_tensor_batch)

        last_hidden_state = outputs.last_hidden_state
        
        # 클래스 토큰(CLS)을 제외한 패치 토큰들만 반환합니다.
        patch_tokens = last_hidden_state[:, 1:, :]
        
        return patch_tokens

class JointAngleHead(nn.Module):
    def __init__(self, input_dim=FEATURE_DIM, num_angles=NUM_ANGLES, num_queries=4, nhead=8, num_decoder_layers=2):
        super().__init__()
        
        # 1. "로봇 포즈에 대해 질문하는" 학습 가능한 쿼리 토큰 생성
        self.pose_queries = nn.Parameter(torch.randn(1, num_queries, input_dim))
        
        # 2. PyTorch의 표준 Transformer Decoder 레이어 사용
        decoder_layer = nn.TransformerDecoderLayer(
            d_model=input_dim, 
            nhead=nhead, 
            dim_feedforward=input_dim * 4, # 일반적인 설정
            dropout=0.1, 
            activation='gelu',
            batch_first=True  # (batch, seq, feature) 입력을 위함
        )
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_decoder_layers)
        
        # 3. 최종 각도 예측을 위한 MLP
        # 디코더를 거친 모든 쿼리 토큰의 정보를 사용
        self.angle_predictor = nn.Sequential(
            nn.LayerNorm(input_dim * num_queries),
            nn.Linear(input_dim * num_queries, 512),
            nn.GELU(),
            nn.LayerNorm(512),
            nn.Linear(512, 256),
            nn.GELU(),
            nn.LayerNorm(256),
            nn.Linear(256, num_angles)
        )

    def forward(self, fused_features):
        # fused_features: DINOv2의 패치 토큰들 (B, Num_Patches, Dim)
        # self.pose_queries: 학습 가능한 쿼리 (1, Num_Queries, Dim)
        
        # 배치 사이즈만큼 쿼리를 복제
        b = fused_features.size(0)
        queries = self.pose_queries.repeat(b, 1, 1)
        
        # Transformer Decoder 연산
        # 쿼리(queries)가 이미지 특징(fused_features)에 어텐션을 수행하여
        # 포즈와 관련된 정보로 자신의 값을 업데이트합니다.
        attn_output = self.transformer_decoder(tgt=queries, memory=fused_features)
        
        # 업데이트된 쿼리 토큰들을 하나로 펼쳐서 MLP에 전달
        output_flat = attn_output.flatten(start_dim=1)
        
        return self.angle_predictor(output_flat)

class MultiViewFusion(nn.Module):
    """
    Latent Query 기반의 Multi-view Fusion 모듈.
    """
    def __init__(self, feature_dim=FEATURE_DIM, num_heads=8, dropout=0.1, num_queries=16, num_layers=2):
        super().__init__()
        # 씬 전체의 정보를 요약할 학습 가능한 글로벌 쿼리
        self.global_queries = nn.Parameter(torch.randn(1, num_queries, feature_dim))
        
        # Cross-Attention + Self-Attention으로 구성된 Transformer Decoder 레이어
        decoder_layer = nn.TransformerDecoderLayer(
            d_model=feature_dim, nhead=num_heads, dim_feedforward=feature_dim * 4,
            dropout=dropout, activation='gelu', batch_first=True
        )
        self.fusion_decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)

    def forward(self, view_features: list):
        # 1. 모든 뷰의 토큰들을 시퀀스 차원에서 하나로 합침
        all_view_tokens = torch.cat(view_features, dim=1)
        b = all_view_tokens.size(0)
        
        # 2. 배치 사이즈만큼 글로벌 쿼리 복제
        queries = self.global_queries.repeat(b, 1, 1)
        
        # 3. Decoder를 통해 쿼리가 모든 뷰의 정보를 요약하도록 함
        # 쿼리가 Key/Value인 all_view_tokens에 Cross-Attention을 수행하고,
        # 이후 쿼리들끼리 Self-Attention을 수행하며 정보를 정제함
        fused_queries = self.fusion_decoder(tgt=queries, memory=all_view_tokens)
        
        return fused_queries

class TokenFuser(nn.Module):
    """
    ViT의 패치 토큰(1D 시퀀스)을 CNN이 사용하기 좋은 2D 특징 맵으로 변환하고 정제합니다.
    """
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.projection = nn.Conv2d(in_channels, out_channels, kernel_size=1)
        self.refine_blocks = nn.Sequential(
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.GELU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels)
        )
        self.residual_conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)
    def forward(self, x):
        # x: (B, D, H, W) 형태로 reshape된 토큰 맵
        projected = self.projection(x)
        refined = self.refine_blocks(projected)
        residual = self.residual_conv(x)
        return torch.nn.functional.gelu(refined + residual)

class LightCNNStem(nn.Module):
    def __init__(self):
        super().__init__()
        # 간단한 CNN 블록 구성
        self.conv_block1 = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, stride=2, padding=1, bias=False), # 해상도 1/2
            nn.BatchNorm2d(16),
            nn.GELU(),
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1, bias=False), # 해상도 1/4
            nn.BatchNorm2d(32),
            nn.GELU()
        )
        self.conv_block2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1, bias=False), # 해상도 1/8
            nn.BatchNorm2d(64),
            nn.GELU()
        )
        
    def forward(self, x):
        # x: 원본 이미지 텐서 배치 (B, 3, H, W)
        feat_4 = self.conv_block1(x)  # 1/4 스케일 특징
        feat_8 = self.conv_block2(feat_4) # 1/8 스케일 특징
        return feat_4, feat_8 # 다른 해상도의 특징들을 반환

class FusedUpsampleBlock(nn.Module):
    """
    업샘플링된 특징과 CNN 스템의 고해상도 특징(스킵 연결)을 융합하는 블록.
    """
    def __init__(self, in_channels, skip_channels, out_channels):
        super().__init__()
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.refine_conv = nn.Sequential(
            nn.Conv2d(in_channels + skip_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.GELU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.GELU()
        )

    def forward(self, x, skip_feature):
        x = self.upsample(x)
        
        # ✅ 해결책: skip_feature를 x의 크기에 강제로 맞춥니다.
        # ----------------------------------------------------------------------
        # 두 텐서의 높이와 너비가 다를 경우, skip_feature를 x의 크기로 리사이즈합니다.
        if x.shape[-2:] != skip_feature.shape[-2:]:
            skip_feature = F.interpolate(
                skip_feature, 
                size=x.shape[-2:], # target H, W
                mode='bilinear', 
                align_corners=False
            )
        # ----------------------------------------------------------------------
        
        # 이제 두 텐서의 크기가 같아졌으므로 안전하게 합칠 수 있습니다.
        fused = torch.cat([x, skip_feature], dim=1)
        return self.refine_conv(fused)
    
class UNetViTKeypointHead(nn.Module):
    def __init__(self, input_dim=768, num_joints=7, heatmap_size=(128, 128)):
        super().__init__()
        self.heatmap_size = heatmap_size
        self.token_fuser = TokenFuser(input_dim, 256)
        self.decoder_block1 = FusedUpsampleBlock(in_channels=256, skip_channels=64, out_channels=128)
        self.decoder_block2 = FusedUpsampleBlock(in_channels=128, skip_channels=32, out_channels=64)
        self.final_upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.heatmap_predictor = nn.Conv2d(64, num_joints, kernel_size=3, padding=1)

    def forward(self, dino_features, cnn_features):
        cnn_feat_4, cnn_feat_8 = cnn_features

        # 1. DINOv3 토큰을 표준 ViT 패치 개수인 196개로 잘라내고 2D 맵으로 변환
        num_patches_to_keep = 196
        dino_features_sliced = dino_features[:, :num_patches_to_keep, :]
        
        b, n, d = dino_features_sliced.shape
        h = w = int(n**0.5)
        x = dino_features_sliced.permute(0, 2, 1).reshape(b, d, h, w)

        x = self.token_fuser(x)

        # 2. 디코더 업샘플링 & 융합
        x = self.decoder_block1(x, cnn_feat_8)
        x = self.decoder_block2(x, cnn_feat_4)
        
        # 3. 최종 해상도로 업샘플링 및 예측
        x = self.final_upsample(x)
        heatmaps = self.heatmap_predictor(x)
        
        return F.interpolate(heatmaps, size=self.heatmap_size, mode='bilinear', align_corners=False)
    
class DINOv3PoseEstimator(nn.Module):
    """
    [전체 아키텍처 (Overall Architecture)]
    Multi-view 이미지들을 입력받아, 각 뷰의 특징을 추출하고 융합하여
    하나의 통합된 관절 각도(global pose)와 각 뷰에 대한 키포인트 히트맵(local keypoints)을 예측합니다.
    """
    def __init__(self, model_name=MODEL_NAME, num_joints=NUM_JOINTS, num_angles=NUM_ANGLES, max_views=10):
        super().__init__()
        
        # 1. 백본: 고차원 의미 정보 추출
        self.backbone = DINOv3Backbone(model_name)
        feature_dim = self.backbone.model.config.hidden_size
        
        # ▼▼▼ [수정 1] 뷰 임베딩을 유연하게 처리하기 위한 변경 ▼▼▼
        # 최대 처리 가능한 카메라(뷰) 개수를 기반으로 임베딩 레이어 생성
        self.view_embeddings = nn.Embedding(max_views, feature_dim)
        
        # forward 시점에 뷰 이름/시리얼과 인덱스를 동적으로 매핑할 딕셔너리
        self.view_to_idx = {} 
        self.next_view_idx = 0

        # 3. CNN 스템: 저차원 공간 정보 추출
        self.cnn_stem = LightCNNStem()
        
        # 4. 퓨전 모듈: 모든 뷰의 정보를 하나의 전역 요약 정보로 압축
        self.fusion_module = MultiViewFusion(feature_dim=feature_dim)
        
        # 5. 헤드 (예측기)
        self.angle_head = JointAngleHead(input_dim=feature_dim, num_angles=num_angles, num_queries=16)
        self.keypoint_head = UNetViTKeypointHead(input_dim=feature_dim, num_joints=num_joints)
        self.keypoint_enricher = nn.TransformerDecoderLayer(
            d_model=feature_dim, nhead=8, dim_feedforward=feature_dim * 4,
            dropout=0.1, activation='gelu', batch_first=True
        )

    def forward(self, multi_view_images: dict):
        all_dino_features_with_embed = []
        all_cnn_features = {}
        view_keys_ordered = list(multi_view_images.keys())

        # --- Step 1: 각 뷰에 대한 병렬 특징 추출 ---
        for view_key in view_keys_ordered: # key는 'front' 또는 '41182735' 등이 될 수 있음
            view_tensor = multi_view_images[view_key]
            dino_features = self.backbone(view_tensor)
            
            # ▼▼▼ [수정 2] 동적 뷰 인덱싱 로직 ▼▼▼
            # 이전에 보지 못한 뷰(카메라)라면, 새로운 인덱스를 할당합니다.
            if view_key not in self.view_to_idx:
                if self.next_view_idx >= self.view_embeddings.num_embeddings:
                    raise ValueError(f"Exceeded maximum number of views ({self.view_embeddings.num_embeddings}).")
                self.view_to_idx[view_key] = self.next_view_idx
                self.next_view_idx += 1
            
            view_idx = self.view_to_idx[view_key]
            
            # 해당 인덱스의 임베딩 벡터를 DINO 특징에 더해줍니다.
            embedding = self.view_embeddings(
                torch.tensor([view_idx], device=dino_features.device)
            ).unsqueeze(0)
            all_dino_features_with_embed.append(dino_features + embedding)
            
            all_cnn_features[view_key] = self.cnn_stem(view_tensor)

        # --- Step 2: Multi-view 정보 융합 ---
        # Latent Query를 통해 모든 뷰의 DINO 특징을 'fused_queries'라는 전역 정보로 요약
        fused_queries = self.fusion_module(all_dino_features_with_embed)
        
        # --- Step 3: 관절 각도 예측 ---
        # 요약된 전역 정보로부터 직접 관절 각도를 예측
        predicted_angles = self.angle_head(fused_queries)
        
        # --- Step 4: 키포인트 히트맵 예측 ---
        predicted_heatmaps_dict = {}
        for i, view_name in enumerate(view_names_ordered):
            # 4a. Keypoint Enricher: i번째 뷰의 특징(tgt)이 전역 요약 정보(memory)를 참고하여 스스로를 보강
            enriched_tokens = self.keypoint_enricher(
                tgt=all_dino_features_with_embed[i], 
                memory=fused_queries
            )
            # 4b. 보강된 토큰과 해당 뷰의 CNN 공간 특징을 이용해 최종 히트맵 예측
            heatmap = self.keypoint_head(enriched_tokens, all_cnn_features[view_name])
            predicted_heatmaps_dict[view_name] = heatmap
        
        return predicted_heatmaps_dict, predicted_angles