# Dinov3

In [1]:
import cv2 
import numpy as np
import torch
from models.pose import DINOv3Pose

def preprocess(image_input, device='cuda'):
    """이미지 로드, 리사이징, 패딩, 정규화"""
    # 1. 입력 처리 (경로 or Tensor)
    if isinstance(image_input, str):
        img = cv2.imread(image_input)
        if img is None:
            raise ValueError(f"Image not found: {image_input}")
    elif isinstance(image_input, np.ndarray):
        img = image_input
    else:
        raise TypeError("Input must be a file path(str) or numpy array")

    h0, w0 = img.shape[:2]
    
    # 2. Resize (비율 유지)
    target_size = 640
    scale = min(target_size / h0, target_size / w0)
    h, w = int(h0 * scale), int(w0 * scale)
    
    img_resized = cv2.resize(img, (w, h))
    
    # 3. Padding (32의 배수로 맞춤)
    pad_h = (32 - h % 32) % 32
    pad_w = (32 - w % 32) % 32
    
    # Right, Bottom 방향으로만 패딩 추가
    img_padded = cv2.copyMakeBorder(
        img_resized, 0, pad_h, 0, pad_w, 
        cv2.BORDER_CONSTANT, value=(114, 114, 114)
    )
    
    # 4. To Tensor
    img_tensor = torch.from_numpy(img_padded).permute(2, 0, 1).float() / 255.0
    img_tensor = img_tensor.unsqueeze(0).to(device)
    
    # 복원용 메타 데이터
    meta = {
        'orig_shape': (h0, w0),
        'scale': scale,
        'pad': (0, pad_h, 0, pad_w) # top, bottom, left, right
    }
    
    return img_tensor, img, meta

model = DINOv3Pose('dinov3_convnext_small', finetuning=True)
img = preprocess('./examples/00017.png')[0]
result = model(img)

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
import os
import glob
import numpy as np
from tqdm import tqdm

def get_sorted_indices(kpts_xy):
    """
    키포인트 좌표(x, y)를 받아서 정렬된 인덱스를 반환합니다.
    규칙:
    1. 중심점 기준 시계 방향 정렬
    2. 시작점은 '중심보다 오른쪽에 있는 점 중 가장 위에 있는 점(Y가 작은 값 or 큰 값)'
       (주의: 이미지 좌표계에서는 Y가 0일수록 '위'입니다. 아래 코드는 Y가 작을수록 위라고 가정합니다.)
       (만약 일반 수학 좌표계라면 argmin을 argmax로 바꾸세요.)
    """
    pts = np.array(kpts_xy)
    if len(pts) == 0:
        return []

    center = np.mean(pts, axis=0)
    
    # 1. 각도 계산 (이미지 좌표계: y가 아래로 증가하므로 y축 반전 고려)
    # 이미지 상에서 시계방향: 12시 -> 3시 -> 6시
    # 일반 수학 좌표계(y상승) arctan2: 반시계.
    # 이미지 좌표계(y하강)에서 arctan2(y, x)를 쓰면:
    # y가 양수(아래)일수록 각도가 커짐 -> 시계방향과 유사하게 작동
    angles = np.arctan2(pts[:, 1] - center[1], pts[:, 0] - center[0])
    
    # 각도 기준 정렬
    sorted_indices = np.argsort(angles)
    sorted_pts = pts[sorted_indices]
    
    # 2. 시작점(Pivot) 찾기: 중심보다 오른쪽(x > cx) 이면서 가장 위(y가 최소)
    # YOLO 정규화 좌표는 위쪽이 0, 아래쪽이 1입니다. 따라서 '가장 위' = 'y값이 최소'
    rel_x = sorted_pts[:, 0] - center[0]
    
    # 오른쪽 점들 마스크
    right_mask = rel_x > 0
    if not np.any(right_mask):
        right_mask = np.ones(len(sorted_pts), dtype=bool) # 예외: 오른쪽에 점이 없으면 전체 대상

    # 후보 인덱스 (sorted_pts 기준)
    candidate_local_indices = np.where(right_mask)[0]
    
    # 후보 중 Y값이 가장 작은 점 (이미지 상 가장 위)
    candidate_y_values = sorted_pts[candidate_local_indices, 1]
    best_candidate_idx = np.argmin(candidate_y_values) # <--- 중요: 이미지 좌표계는 min이 위쪽
    
    # 회전해야 할 양 (Shift amount)
    pivot_index = candidate_local_indices[best_candidate_idx]
    
    # 인덱스 회전
    final_indices = np.roll(sorted_indices, -pivot_index)
    
    return final_indices

def process_yolo_labels(source_dir, target_dir, nkpts=4, dim=3):
    """
    source_dir: 원본 txt 파일 경로
    target_dir: 저장할 경로
    nkpts: 키포인트 개수 (예: 4)
    dim: 키포인트 당 값의 개수 (x,y,v면 3 / x,y면 2)
    """
    os.makedirs(target_dir, exist_ok=True)
    txt_files = glob.glob(os.path.join(source_dir, "*.txt"))
    
    print(f"Processing {len(txt_files)} files...")
    
    for txt_path in tqdm(txt_files):
        filename = os.path.basename(txt_path)
        save_path = os.path.join(target_dir, filename)
        
        new_lines = []
        
        with open(txt_path, 'r') as f:
            lines = f.readlines()
            
        for line in lines:
            parts = line.strip().split()
            if len(parts) < 5: # 데이터가 없거나 빈 줄
                continue
                
            # 1. 기본 정보 분리 (cls, cx, cy, w, h)
            cls_idx = parts[0]
            bbox = parts[1:5] # cx, cy, w, h
            
            # 2. 키포인트 데이터 파싱
            kpt_data = np.array(parts[5:], dtype=np.float32)
            
            # 데이터 개수가 맞는지 확인
            if len(kpt_data) == nkpts * dim:
                # (N, dim) 형태로 리쉐이프. 예: [[x1,y1,v1], [x2,y2,v2], ...]
                kpts_reshaped = kpt_data.reshape(nkpts, dim)
                
                # 정렬을 위한 XY 좌표 추출
                kpts_xy = kpts_reshaped[:, :2]
                
                # 3. 정렬 인덱스 계산
                sorted_idx = get_sorted_indices(kpts_xy)
                
                # 4. 데이터 재배열 (Visibility 포함 전체 이동)
                kpts_sorted = kpts_reshaped[sorted_idx]
                
                # 5. 다시 1줄짜리 문자열로 변환
                kpts_flat = kpts_sorted.flatten()
                kpts_str = " ".join([f"{x:.6f}" for x in kpts_flat]) # 소수점 6자리 포맷팅
                
                # bbox 정보와 합치기
                bbox_str = " ".join(bbox)
                new_line = f"{cls_idx} {bbox_str} {kpts_str}\n"
                new_lines.append(new_line)
            else:
                # 키포인트 개수가 안 맞으면 건드리지 않고 원본 유지 (혹은 에러처리)
                print(f"Warning: {filename} has invalid keypoint length. Skipping line.")
                new_lines.append(line)
        
        # 저장
        with open(save_path, 'w') as f:
            f.writelines(new_lines)

    print("Done! Sorted labels are saved in:", target_dir)

# ==================================================
# 실행 설정
# ==================================================
source_folder = '/media/otter/otterHD/AXData/TotalAX/TotalAX/valid/labels'
target_folder = '/media/otter/otterHD/AXData/TotalAX/TotalAX/valid/labels_sorted'

# 내 데이터 설정에 맞게 수정하세요
NUM_KEYPOINTS = 4  # 점의 개수
DIM_PER_POINT = 2  # 점 하나당 값의 개수 (x,y,v = 3 / x,y = 2)

if __name__ == '__main__':
    process_yolo_labels(source_folder, target_folder, nkpts=NUM_KEYPOINTS, dim=DIM_PER_POINT)

Processing 3736 files...


100%|██████████| 3736/3736 [00:01<00:00, 3632.00it/s]

Done! Sorted labels are saved in: /media/otter/otterHD/AXData/TotalAX/TotalAX/valid/labels_sorted





In [31]:
import torch 
import torch.nn.functional as F
from models.backbones.dinov3convnext import Dinov3ConvNext, convnext_sizes

convnext_ckps = {
    'tiny': './checkpoints/dinov3/convnext/dinov3_convnext_tiny_pretrain_lvd1689m-21b726bb.pth', 
    'small': './checkpoints/dinov3/convnext/dinov3_convnext_small_pretrain_lvd1689m-296db49d.pth', 
    'base': './checkpoints/dinov3/convnext/dinov3_convnext_base_pretrain_lvd1689m-801f2ba9.pth', 
    'large': './checkpoints/dinov3/convnext/dinov3_convnext_large_pretrain_lvd1689m-61fa432d.pth'}

vit_ckps = {
    'small': './checkpoints/dinov3/vit/dinov3_vits16_pretrain_lvd1689m-08c60483.pth', 
    'base': './checkpoints/dinov3/vit/dinov3_vit7b16_pretrain_lvd1689m-a955f4ea.pth', 
    'large': None}

model_size = 'small'

model = Dinov3ConvNext(
    depths=convnext_sizes[model_size]["depths"],
    dims=convnext_sizes[model_size]["dims"],    
    weights=convnext_ckps[model_size],
    )
inputs = torch.randn(3, 3, 640, 640)
feature_list = model.forward_features([inputs], [None])
output = model(inputs)
print(output.shape)
feature_list['x_norm_patchtokens'].shape

# model = Dinov3ViT(
#     patch_size=vit_sizes[model_size]["patch_size"],
#     embed_dim=vit_sizes[model_size]["embed_dim"],
#     depth=vit_sizes[model_size]["depth"],
#     num_heads=vit_sizes[model_size]["num_heads"],
#     ffn_ratio=vit_sizes[model_size]["ffn_ratio"],
#     weights=vit_ckps[model_size],
#     )
inputs = torch.randn(3, 3, 640, 640)
feature_list = model.forward_features_list([inputs], [None])

torch.Size([3, 768])


In [33]:
for i in range(1, 4):
    print(feature_list[i].shape)

torch.Size([3, 192, 80, 80])
torch.Size([3, 384, 40, 40])
torch.Size([3, 768, 20, 20])


# Meta Space

In [None]:
import torch
import torch 
import torch.nn.functional as F
from models.backbones import Dinov3ViT, vit_sizes
from models.modules import MetaSpace

vit_ckps = {
    'small': './checkpoints/dinov3/vit/dinov3_vits16_pretrain_lvd1689m-08c60483.pth', 
    'base': './checkpoints/dinov3/vit/dinov3_vit7b16_pretrain_lvd1689m-a955f4ea.pth', 
    'large': None}

model_size = 'small'

# ===== Usage Example =====
model = Dinov3ViT(
    patch_size=vit_sizes[model_size]["patch_size"],
    embed_dim=vit_sizes[model_size]["embed_dim"],
    depth=vit_sizes[model_size]["depth"],
    num_heads=vit_sizes[model_size]["num_heads"],
    ffn_ratio=vit_sizes[model_size]["ffn_ratio"],
    weights=vit_ckps[model_size],
    )
print("=" * 60)
print("MetaSpace with Keypoint Features - Example")
print("=" * 60)

# Hyperparameters
batch_size = 4
num_kpts = 17  # e.g., human pose keypoints
original_size = (256, 256)

# Multi-scale feature maps (e.g., from backbone)
feature_maps = [
    torch.randn(batch_size, 256, 64, 64),   # Level 0: 1/4 scale
    torch.randn(batch_size, 512, 32, 32),   # Level 1: 1/8 scale
    torch.randn(batch_size, 1024, 16, 16),  # Level 2: 1/16 scale
]

# Keypoints in original image coordinates
keypoints = torch.rand(batch_size, num_kpts, 2) * 256  # Random [0, 256]

# Valid mask (e.g., some keypoints are occluded)
valid_mask = torch.rand(batch_size, num_kpts) > 0.2

# Initialize MetaSpace
meta_space = MetaSpace(
    original_size=original_size,
    feature_dims=[256, 512, 1024],
    num_kpts=num_kpts,
    num_heads=8,
    momentum=0.9
)

print("\n1. Initial forward pass (training mode):")
meta_space.train()
fused_features = meta_space(feature_maps, keypoints, valid_mask)

for i, feats in enumerate(fused_features):
    print(f"   Level {i}: {feats.shape}")

print("\n2. Update meta spaces:")
meta_space.update_meta_spaces()
print("   Meta spaces updated with accumulated features")

print("\n3. Check meta space statistics:")
for i, meta in enumerate(meta_space.meta_spaces):
    print(f"   Level {i} meta space: {meta.shape}")
    print(f"      Mean: {meta.mean().item():.4f}, Std: {meta.std().item():.4f}")

print("\n4. Inference mode:")
meta_space.eval()
with torch.no_grad():
    fused_features_eval = meta_space(feature_maps, keypoints)
print("   Features fused without accumulation")

print("\n" + "=" * 60)
print("주요 기능:")
print("- Multi-scale keypoint feature extraction")
print("- Gaussian pooling for robust local features")
print("- EMA-based meta feature learning")
print("- Gated attention fusion")
print("- Valid mask support for occluded keypoints")
print("=" * 60)

MetaSpace with Keypoint Features - Example

1. Initial forward pass (training mode):
   Level 0: torch.Size([4, 17, 256])
   Level 1: torch.Size([4, 17, 512])
   Level 2: torch.Size([4, 17, 1024])

2. Update meta spaces:
   Meta spaces updated with accumulated features

3. Check meta space statistics:
   Level 0 meta space: torch.Size([17, 256])
      Mean: 0.0005, Std: 0.0774
   Level 1 meta space: torch.Size([17, 512])
      Mean: -0.0006, Std: 0.0565
   Level 2 meta space: torch.Size([17, 1024])
      Mean: -0.0000, Std: 0.0411

4. Inference mode:
   Features fused without accumulation

주요 기능:
- Multi-scale keypoint feature extraction
- Gaussian pooling for robust local features
- EMA-based meta feature learning
- Gated attention fusion
- Valid mask support for occluded keypoints


# FSKD

In [None]:
import torch
import torch.nn as nn
from typing import List, Dict, Optional

from models.backbones.dinov3vit import Dinov3ViT, vit_sizes

class FSKD(nn.Module):
    def __init__(
            self,
            nkpts: int,
            backbone: str = 'base',
            pretrained: bool = True,
        ):
        super(FSKD, self).__init__()
        backbone_cfg = vit_sizes[backbone]
        embed_dim = backbone_cfg['embed_dim']

        self.backbone = Dinov3ViT        (
            patch_size=backbone_cfg["patch_size"],
            embed_dim=embed_dim,
            depth=backbone_cfg["depth"],
            num_heads=backbone_cfg["num_heads"],
            ffn_ratio=backbone_cfg["ffn_ratio"],
            pretrained=pretrained,
        )

        self.neck = nn.Identity()
        self.head = nn.Linear(embed_dim, nkpts * 2)

    def forward_features(
            self,
            x: torch.Tensor,
            masks: Optional[torch.Tensor] = None
        ) -> torch.Tensor:
        _, all_xes = self.backbone.forward_features_list([x], [masks])
        
        last_block_features = all_xes[-1][0]
        cls_token = last_block_features[:, 0]

        pose_feature = self.neck(cls_token)
        result = self.head(pose_feature)
        return result

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        result = self.forward_features(x)
        return result


In [None]:
import torch 

# Autocast

In [None]:
import torch.amp as amp 
device = 'cuda' if torch.cuda.is_available() else 'cpu'
amp.autocast(device, enabled=False)

<torch.amp.autocast_mode.autocast at 0x76d12b671e10>

# DeepPose

In [None]:
import timm
import timm.models.resnet as resnet

In [51]:
from timm.models.resnet import Bottleneck
from timm.models.resnet import BasicBlock
from timm.models.helpers import load_pretrained, resolve_pretrained_cfg

resnet_args = {
    'resnet18': dict(block=BasicBlock, layers=(2, 2, 2, 2)),
    'resnet34': dict(block=BasicBlock, layers=(3, 4, 6, 3)),
    'resnet50': dict(block=Bottleneck, layers=(3, 4, 6, 3)),
    'resnet50s': dict(block=Bottleneck, layers=(3, 4, 6, 3), stem_width=64, stem_type='deep'),
    'resnet50t': dict(block=Bottleneck, layers=(3, 4, 6, 3), stem_width=32, stem_type='deep_tiered', avg_down=True),
    'resnet101': dict(block=Bottleneck, layers=(3, 4, 23, 3))
}

resnet_ckps = {
    'resnet18': './checkpoints/resnet/resnet18-5c106cde.pth',
    'resnet34': './checkpoints/resnet/resnet34-333f7ec4.pth',
    'resnet50': './checkpoints/resnet/resnet50-0676ba61.pth',
    'resnet50s': './checkpoints/resnet/resnet50s-3cf99910.pth',
    'resnet50t': './checkpoints/resnet/resnet50t-1f8793b8.pth',
    'resnet101': './checkpoints/resnet/resnet101-5d3b4d8f.pth'
}

class DeepPose(timm.models.ResNet):
    def __init__(self, num_joints=8, backbone='resnet50', checkpoint=None, download=False, **kwargs):
        model_args = resnet_args.get(backbone)
        super(DeepPose, self).__init__(**dict(model_args, **kwargs))
        features = False

        # resolve and update model pretrained config and model kwargs
        pretrained_cfg = resolve_pretrained_cfg(
            backbone,
            pretrained_cfg=None,
            pretrained_cfg_overlay=None
        )
        pretrained_cfg = pretrained_cfg.to_dict()
        
        # For classification models, check class attr, then kwargs, then default to 1k, otherwise 0 for feats
        num_classes_pretrained = 0 if features else getattr(self, 'num_classes', kwargs.get('num_classes', 1000))
        if download:
            checkpoint = load_pretrained(
                timm.models.ResNet,
                pretrained_cfg=pretrained_cfg,
                num_classes=num_classes_pretrained,
                in_chans=kwargs.get('in_chans', 3),
                filter_fn=None,
                strict=None,
                cache_dir=None,
                )
        print(checkpoint)
        if checkpoint:
            self.load_state_dict(torch.load(checkpoint))

        self.njoints = num_joints
        self.fc = nn.Linear(self.fc.in_features, self.njoints*2)

In [52]:
if __name__=='__main__':
    import torch 

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = DeepPose(8, 'resnet18', download=True)
    model = model.to(device)

    random_noise = torch.randn((32, 3, 224, 224)).to(device)
    result = model(random_noise)

    print(f'INPUT SIZE : {random_noise.shape}')
    print(f'OUTPUT SIZE : {result.shape}')

TypeError: Module.load_state_dict() missing 1 required positional argument: 'state_dict'

# FCMAE

In [15]:
import torch
import torch.nn as nn
import spconv.pytorch as spconv

class SpconvGRN(nn.Module):
    """ GRN layer for spconv tensors.
    """
    def __init__(self, dim):
        super().__init__()
        # 파라미터 정의는 동일합니다.
        self.gamma = nn.Parameter(torch.zeros(1, dim))
        self.beta = nn.Parameter(torch.zeros(1, dim))

    def forward(self, x: spconv.SparseConvTensor):
        # 1. spconv 텐서에서 Dense Feature(N, C)를 꺼냅니다.
        # MinkowskiEngine의 x.F와 동일합니다.
        features = x.features 

        # 2. GRN 연산 수행 (PyTorch 연산이므로 완전히 동일합니다)
        # 주의: dim=0은 배치 내 모든 포인트에 대해 통계를 집계합니다. (원본 코드 동작 유지)
        Gx = torch.norm(features, p=2, dim=0, keepdim=True)
        Nx = Gx / (Gx.mean(dim=-1, keepdim=True) + 1e-6)
        
        # 공식: gamma * (X * Nx) + beta + X
        out_features = self.gamma * (features * Nx) + self.beta + features

        # 3. spconv 방식의 반환
        # 새로운 텐서를 밑바닥부터 만들지 않고, 
        # 기존 텐서(x)의 구조(좌표, shape 등)는 유지하되 피처만 갈아끼웁니다.
        return x.replace_feature(out_features)
    

class SpconvLayerNorm(nn.Module):
    """ Channel-wise layer normalization for spconv tensors.
    """

    def __init__(
        self,
        normalized_shape,
        eps=1e-6,
    ):
        super(SpconvLayerNorm, self).__init__()
        # nn.LayerNorm은 (N, C) 2D 텐서를 잘 처리하므로 그대로 씁니다.
        self.ln = nn.LayerNorm(normalized_shape, eps=eps)

    def forward(self, input: spconv.SparseConvTensor):
        # 1. spconv 텐서에서 피처 추출 (.F -> .features)
        output_features = self.ln(input.features)
        
        # 2. 좌표 등 구조는 유지하고 피처만 교체하여 반환
        return input.replace_feature(output_features)

In [1]:
import torch
import torch.nn as nn
import spconv.pytorch as spconv

# --------------------------------------------------------
# 1. 작성하신 클래스 정의 (위에서 만드신 코드)
# --------------------------------------------------------
class SpconvGRN(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.gamma = nn.Parameter(torch.zeros(1, dim))
        self.beta = nn.Parameter(torch.zeros(1, dim))

    def forward(self, x: spconv.SparseConvTensor):
        features = x.features 
        # dim=0: 모든 복셀(N)에 대해 통계 집계 (Global Context)
        Gx = torch.norm(features, p=2, dim=0, keepdim=True)
        Nx = Gx / (Gx.mean(dim=-1, keepdim=True) + 1e-6)
        out_features = self.gamma * (features * Nx) + self.beta + features
        return x.replace_feature(out_features)

class SpconvLayerNorm(nn.Module):
    def __init__(self, normalized_shape, eps=1e-6):
        super(SpconvLayerNorm, self).__init__()
        self.ln = nn.LayerNorm(normalized_shape, eps=eps)

    def forward(self, input: spconv.SparseConvTensor):
        output_features = self.ln(input.features)
        return input.replace_feature(output_features)

# --------------------------------------------------------
# 2. 실행 예시 (Input 만들기 & Forward)
# --------------------------------------------------------

def main():
    # 설정 값
    batch_size = 2
    in_channels = 32
    spatial_shape = [100, 100, 100] # [Z, Y, X] 크기

    # 1) 좌표 (Indices) 생성 - [Batch_idx, Z, Y, X]
    # 주의: spconv는 반드시 **int32** 타입을 사용해야 합니다!
    indices = torch.tensor([
        [0, 10, 20, 30],  # Batch 0번의 점
        [0, 10, 20, 31],  # Batch 0번의 점 (바로 옆)
        [1, 50, 50, 50],  # Batch 1번의 점
        [1, 51, 51, 51],  # Batch 1번의 점
    ], dtype=torch.int32).cuda() # GPU 사용 시 .cuda() 필수

    # 2) 피처 (Features) 생성 - (N, C)
    # 점이 4개이므로 N=4
    features = torch.randn(4, in_channels).cuda()

    # 3) SparseConvTensor 생성
    # ME와 달리 spatial_shape와 batch_size를 꼭 넣어줘야 합니다.
    input_tensor = spconv.SparseConvTensor(
        features=features,
        indices=indices,
        spatial_shape=spatial_shape,
        batch_size=batch_size
    )

    print(f"Input Features:\n{input_tensor.features[0][:5]}... (Shape: {input_tensor.features.shape})")
    print("-" * 50)

    # 4) LayerNorm 테스트
    ln_layer = SpconvLayerNorm(in_channels).cuda()
    out_ln = ln_layer(input_tensor)
    
    print("running LayerNorm...")
    print(f"LN Output:\n{out_ln.features[0][:5]}... (Shape: {out_ln.features.shape})")
    print("-" * 50)

    # 5) GRN 테스트
    grn_layer = SpconvGRN(in_channels).cuda()
    out_grn = grn_layer(out_ln)

    print("running GRN...")
    print(f"GRN Output:\n{out_grn.features[0][:5]}... (Shape: {out_grn.features.shape})")
    
    # 6) 구조가 유지되었는지 확인 (Indices가 바뀌면 안됨)
    print("-" * 50)
    print("Structure Check:")
    is_same = torch.equal(input_tensor.indices, out_grn.indices)
    print(f"Indices preserved? {is_same}")

if __name__ == "__main__":
    # CUDA가 사용 가능할 때만 실행 (spconv는 기본적으로 CUDA 의존성이 강함)
    if torch.cuda.is_available():
        main()
    else:
        print("CUDA device not found. spconv requires GPU.")

Input Features:
tensor([-0.2196,  0.0146,  0.2861,  0.3721, -0.8692], device='cuda:0')... (Shape: torch.Size([4, 32]))
--------------------------------------------------
running LayerNorm...
LN Output:
tensor([-0.1184,  0.0743,  0.2975,  0.3683, -0.6527], device='cuda:0',
       grad_fn=<SliceBackward0>)... (Shape: torch.Size([4, 32]))
--------------------------------------------------
running GRN...
GRN Output:
tensor([-0.1184,  0.0743,  0.2975,  0.3683, -0.6527], device='cuda:0',
       grad_fn=<SliceBackward0>)... (Shape: torch.Size([4, 32]))
--------------------------------------------------
Structure Check:
Indices preserved? True


In [12]:
import spconv.pytorch as spconv
import spconv.pytorch.quantization as spconvq
from spconv.pytorch.quantization import get_default_spconv_trt_ptq_qconfig
from spconv.pytorch.quantization.core import quantize_per_tensor
from spconv.pytorch.quantization.fake_q import \
    get_default_spconv_qconfig_mapping
from spconv.pytorch.quantization.intrinsic.modules import SpconvBnAddReLUNd, SpconvAddReLUNd
import spconv.pytorch.quantization.intrinsic.quantized as snniq

In [16]:
# --------------------------------------------------------
class SpconvGRN(spconv.SparseModule):
    def __init__(self, dim):
        super().__init__()
        self.gamma = nn.Parameter(torch.zeros(1, dim))
        self.beta = nn.Parameter(torch.zeros(1, dim))

    def forward(self, x: spconv.SparseConvTensor):
        features = x.features 
        # dim=0: 모든 복셀(N)에 대해 통계 집계 (Global Context)
        Gx = torch.norm(features, p=2, dim=0, keepdim=True)
        Nx = Gx / (Gx.mean(dim=-1, keepdim=True) + 1e-6)
        out_features = self.gamma * (features * Nx) + self.beta + features
        return out_features

class SpconvLayerNorm(spconv.SparseModule):
    def __init__(self, normalized_shape, eps=1e-6):
        super(SpconvLayerNorm, self).__init__()
        self.ln = nn.LayerNorm(normalized_shape, eps=eps)

    def forward(self, input: spconv.SparseConvTensor):
        output_features = self.ln(input.features)
        return output_features

In [None]:
import torch.nn as nn
nn.Linear()

In [3]:
import torch
import torch.nn as nn
import spconv.pytorch as spconv
from timm.models.layers import trunc_normal_

# 필요한 helper 함수들
def to_sparse(x):
    """Dense tensor를 sparse tensor로 변환"""
    batch_size, in_channels, H, W = x.shape
    
    # Non-zero 위치 찾기
    mask = (x.abs().sum(dim=1) > 1e-6)  # [B, H, W]
    
    indices_list = []
    features_list = []
    
    for b in range(batch_size):
        nz_indices = torch.nonzero(mask[b], as_tuple=False)  # [N, 2] (y, x)
        
        if len(nz_indices) > 0:
            batch_indices = torch.full((len(nz_indices), 1), b, dtype=torch.int32, device=x.device)
            batch_nz_indices = torch.cat([batch_indices, nz_indices.int()], dim=1)
            indices_list.append(batch_nz_indices)
            
            y_coords = nz_indices[:, 0]
            x_coords = nz_indices[:, 1]
            feats = x[b, :, y_coords, x_coords].T  # [N, C]
            features_list.append(feats)
    
    if len(indices_list) > 0:
        indices = torch.cat(indices_list, dim=0)
        features = torch.cat(features_list, dim=0)
    else:
        indices = torch.zeros((0, 3), dtype=torch.int32, device=x.device)
        features = torch.zeros((0, in_channels), dtype=x.dtype, device=x.device)
    
    sparse_tensor = spconv.SparseConvTensor(
        features=features,
        indices=indices,
        spatial_shape=[H, W],
        batch_size=batch_size
    )
    
    return sparse_tensor


class LayerNorm(nn.Module):
    """LayerNorm for channels_first format"""
    def __init__(self, normalized_shape, eps=1e-6, data_format="channels_last"):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(normalized_shape))
        self.bias = nn.Parameter(torch.zeros(normalized_shape))
        self.eps = eps
        self.data_format = data_format
        self.normalized_shape = (normalized_shape,)
    
    def forward(self, x):
        if self.data_format == "channels_last":
            return nn.functional.layer_norm(x, self.normalized_shape, self.weight, self.bias, self.eps)
        elif self.data_format == "channels_first":
            u = x.mean(1, keepdim=True)
            s = (x - u).pow(2).mean(1, keepdim=True)
            x = (x - u) / torch.sqrt(s + self.eps)
            x = self.weight[:, None, None] * x + self.bias[:, None, None]
            return x


class SpLayerNorm(nn.Module):
    """Sparse LayerNorm"""
    def __init__(self, normalized_shape, eps=1e-6):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(normalized_shape))
        self.bias = nn.Parameter(torch.zeros(normalized_shape))
        self.eps = eps
        self.normalized_shape = (normalized_shape,)
    
    def forward(self, x):
        if isinstance(x, spconv.SparseConvTensor):
            features = x.features
            features = nn.functional.layer_norm(features, self.normalized_shape, self.weight, self.bias, self.eps)
            return x.replace_feature(features)
        else:
            return nn.functional.layer_norm(x, self.normalized_shape, self.weight, self.bias, self.eps)


class SparseGELU(nn.Module):
    """Sparse GELU activation"""
    def __init__(self, approximate='none'):
        super().__init__()
        self.gelu = nn.GELU(approximate=approximate)
    
    def forward(self, x):
        if isinstance(x, spconv.SparseConvTensor):
            new_features = self.gelu(x.features)
            return x.replace_feature(new_features)
        else:
            return self.gelu(x)


class SpGRN(nn.Module):
    """Sparse Global Response Normalization"""
    def __init__(self, dim):
        super().__init__()
        self.gamma = nn.Parameter(torch.zeros(1, dim))
        self.beta = nn.Parameter(torch.zeros(1, dim))
    
    def forward(self, x):
        if isinstance(x, spconv.SparseConvTensor):
            features = x.features  # [N, C]
            # L2 norm per sample
            gx = torch.norm(features, p=2, dim=1, keepdim=True)  # [N, 1]
            nx = gx / (gx.mean(dim=0, keepdim=True) + 1e-6)
            features = self.gamma * (features * nx) + self.beta + features
            return x.replace_feature(features)
        else:
            return x


class SpDropPath(nn.Module):
    """Sparse Drop Path"""
    def __init__(self, drop_prob=0.):
        super().__init__()
        self.drop_prob = drop_prob
    
    def forward(self, x):
        if self.drop_prob == 0. or not self.training:
            return x
        
        if isinstance(x, spconv.SparseConvTensor):
            keep_prob = 1 - self.drop_prob
            random_tensor = keep_prob + torch.rand((x.features.shape[0], 1), 
                                                   dtype=x.features.dtype, 
                                                   device=x.features.device)
            random_tensor.floor_()  # binarize
            features = x.features / keep_prob * random_tensor
            return x.replace_feature(features)
        else:
            return x


class EfficientSparseDepthwiseConv2d(nn.Module):
    """효율적인 Sparse Depthwise Convolution"""
    def __init__(self, channels, kernel_size=7, padding=3, bias=True):
        super().__init__()
        self.channels = channels
        self.kernel_size = kernel_size
        self.padding = padding
        
        # 7x7 depthwise를 위한 weight
        self.weight = nn.Parameter(torch.randn(channels, kernel_size * kernel_size))
        nn.init.kaiming_normal_(self.weight)
        
        if bias:
            self.bias = nn.Parameter(torch.zeros(channels))
        else:
            self.register_parameter('bias', None)
        
        self.unfold = nn.Unfold(kernel_size=kernel_size, padding=padding)
    
    def forward(self, x):
        if not isinstance(x, spconv.SparseConvTensor):
            raise TypeError("Input must be SparseConvTensor")
        
        # Dense로 변환
        dense = x.dense()  # [B, C, H, W]
        B, C, H, W = dense.shape
        
        # Unfold: [B, C*K*K, H*W]
        unfolded = self.unfold(dense)
        unfolded = unfolded.view(B, C, -1, H*W)  # [B, C, K*K, H*W]
        
        # Depthwise
        weight = self.weight.view(1, C, -1, 1)
        out = (unfolded * weight).sum(dim=2)  # [B, C, H*W]
        out = out.view(B, C, H, W)
        
        if self.bias is not None:
            out = out + self.bias.view(1, -1, 1, 1)
        
        # 다시 sparse로 변환
        return to_sparse(out)


class SpBlock(nn.Module):
    """Sparse ConvNeXtV2 Block"""
    def __init__(self, dim, drop_path=0., layer_scale_init_value=1e-6):
        super().__init__()
        
        self.dwconv = EfficientSparseDepthwiseConv2d(
            channels=dim, 
            kernel_size=7, 
            padding=3, 
            bias=True
        )
        
        self.norm = SpLayerNorm(dim, eps=1e-6)
        self.pwconv1 = spconv.SubMConv2d(dim, 4 * dim, kernel_size=1, bias=True)
        self.act = SparseGELU()
        self.grn = SpGRN(4 * dim)
        self.pwconv2 = spconv.SubMConv2d(4 * dim, dim, kernel_size=1, bias=True)
        
        self.gamma = nn.Parameter(
            layer_scale_init_value * torch.ones(dim),
            requires_grad=True
        ) if layer_scale_init_value > 0 else None
        
        self.drop_path = SpDropPath(drop_path) if drop_path > 0. else nn.Identity()
    
    def forward(self, x):
        shortcut = x
        
        x = self.dwconv(x)
        x = self.norm(x)
        x = self.pwconv1(x)
        x = self.act(x)
        x = self.grn(x)
        x = self.pwconv2(x)
        
        if self.gamma is not None:
            x = x.replace_feature(x.features * self.gamma.view(1, -1))
        
        x = self.drop_path(x)
        x = x.replace_feature(shortcut.features + x.features)
        
        return x


class SparseConvNeXtV2(nn.Module):
    """Sparse ConvNeXtV2"""
    def __init__(self, 
                 in_chans=3, 
                 num_classes=1000, 
                 depths=[3, 3, 9, 3], 
                 dims=[96, 192, 384, 768], 
                 drop_path_rate=0., 
                 D=2):
        super().__init__()
        self.depths = depths
        self.num_classes = num_classes
        self.D = D
        
        self.downsample_layers = nn.ModuleList()
        
        # Stem
        stem = nn.Sequential(
            nn.Conv2d(in_chans, dims[0], kernel_size=4, stride=4),
            LayerNorm(dims[0], eps=1e-6, data_format="channels_first")
        )
        self.downsample_layers.append(stem)
        
        # Downsampling layers
        for i in range(3):
            downsample_layer = nn.Sequential(
                SpLayerNorm(dims[i], eps=1e-6),
                spconv.SparseConv2d(dims[i], dims[i+1], kernel_size=2, stride=2, bias=True)
            )
            self.downsample_layers.append(downsample_layer)
        
        # Stages
        self.stages = nn.ModuleList()
        dp_rates = [x.item() for x in torch.linspace(0, drop_path_rate, sum(depths))] 
        cur = 0
        for i in range(4):
            stage = nn.Sequential(
                *[SpBlock(dim=dims[i], drop_path=dp_rates[cur + j]) for j in range(depths[i])]
            )
            self.stages.append(stage)
            cur += depths[i]

        self.apply(self._init_weights)
        
    def _init_weights(self, m):
        if isinstance(m, (spconv.SubMConv2d, spconv.SparseConv2d)):
            trunc_normal_(m.weight, std=.02)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.Conv2d):
            trunc_normal_(m.weight, std=.02)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.Linear):
            trunc_normal_(m.weight, std=.02)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)

    def upsample_mask(self, mask, scale):
        """Mask upsampling"""
        assert len(mask.shape) == 2
        p = int(mask.shape[1] ** .5)
        return mask.reshape(-1, p, p).\
                    repeat_interleave(scale, axis=1).\
                    repeat_interleave(scale, axis=2)

    def forward(self, x, mask):
        B, C, H, W = x.shape
        
        # Upsample mask to input resolution
        mask = self.upsample_mask(mask, 4)
        mask = mask.unsqueeze(1).type_as(x)
        
        # Apply mask
        x = x * (1. - mask)
        
        # Stem
        x = self.downsample_layers[0](x)
        
        # To sparse
        x = to_sparse(x)
        
        # Sparse stages
        for i in range(4):
            if i > 0:
                x = self.downsample_layers[i](x)
            x = self.stages[i](x)
        
        # Densify
        x = x.dense()
        
        return x


# Test
if __name__ == "__main__":
    print("Creating model...")
    model = SparseConvNeXtV2(
        in_chans=3,
        num_classes=1000,
        depths=[3, 3, 9, 3],
        dims=[96, 192, 384, 768],
        drop_path_rate=0.1,
        D=2
    )
    
    batch_size = 2
    H, W = 224, 224
    x = torch.randn(batch_size, 3, H, W)
    
    patch_size = 4
    num_patches = (H // patch_size) * (W // patch_size)
    
    print(f"Input shape: {x.shape}")
    print(f"Number of patches: {num_patches}")
    
    mask = torch.rand(batch_size, num_patches) > 0.5
    mask = mask.float()
    
    print(f"Mask shape: {mask.shape}")
    print(f"Mask ratio: {mask.mean().item():.2%}")
    
    try:
        print("\nForward pass...")
        output = model(x, mask)
        print(f"Output shape: {output.shape}")
        print("✅ Success!")
    except Exception as e:
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()

Creating model...
Input shape: torch.Size([2, 3, 224, 224])
Number of patches: 3136
Mask shape: torch.Size([2, 3136])
Mask ratio: 49.84%

Forward pass...
❌ Error: The size of tensor a (3146) must match the size of tensor b (6272) at non-singleton dimension 0


Traceback (most recent call last):
  File "/tmp/ipykernel_115957/2694884605.py", line 352, in <module>
    output = model(x, mask)
  File "/home/otter/.local/share/mamba/envs/hpe/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1736, in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
  File "/home/otter/.local/share/mamba/envs/hpe/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1747, in _call_impl
    return forward_call(*args, **kwargs)
  File "/tmp/ipykernel_115957/2694884605.py", line 314, in forward
    x = self.stages[i](x)
  File "/home/otter/.local/share/mamba/envs/hpe/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1736, in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
  File "/home/otter/.local/share/mamba/envs/hpe/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1747, in _call_impl
    return forward_call(*args, **kwargs)
  File "/home/otter/.local/share/mamba/envs/hpe/lib/python3.9/site-pac

In [1]:
import timm

Error importing huggingface_hub.hf_api: No module named 'tqdm'


In [2]:
timm.models.create_model('dinov3_vits14', pretrained=True)

RuntimeError: Unknown model (dinov3_vits14)