# 2025 데이터 크리에이터 캠프

@PHASE: Mission 2

@TEAM: 최후의 인공지능

## Check GPU Availability

In [None]:
!nvidia-smi

In [None]:
# Set CUDA Device Number
DEVICE_NUM = 0

from os import environ
environ["CUDA_VISIBLE_DEVICES"] = str(DEVICE_NUM)

## Imports

In [None]:
import random
import numpy as np
import torch

def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed()

In [None]:
from os import path

import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision import transforms, models

from tqdm.notebook import tqdm
import matplotlib.pyplot as plt

In [None]:
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

print(f"INFO: Using device - {device}")

## Define Dataset

In [None]:
from dataclasses import dataclass
from torch.utils.data import Dataset


@dataclass
class DatasetHolder:
    train: Dataset = None
    valid: Dataset = None
    test: Dataset = None

    def __post_init__(self):
        print(f"INFO: Dataset loaded successfully. Number of samples - ", end='')
        if self.train:
            print(f"Train: {len(self.train)}", end='')
        if self.valid:
            if self.train: print(', ', end='')
            print(f"Valid: {len(self.valid)}", end='')
        if self.test:
            if self.train: print(', ', end='')
            print(f"Test: {len(self.test)}", end='')
        print('\n')

In [None]:
from enum import Enum


# Github Release URL for datasets
# This will be removed after the contest ends due to the copyright issue.
base_git_path = "https://github.com/b-re-w/K-ICT_DataCreatorCamp_2025/releases/download/dt/"


class KompsatIndex(Enum):
    TRAIN = "TS_KS.zip"
    VALID = "VS_KS.zip"
    TRAIN_BBOX = "TL_KS_BBOX.zip"
    VALID_BBOX = "VL_KS_BBOX.zip"
    TRAIN_LINE = "TL_KS_LINE.zip"
    VALID_LINE = "VL_KS_LINE.zip"

    @property
    def url(self):
        return f"{base_git_path}{self.value}"

In [None]:
from torchvision.datasets import VisionDataset, utils, folder
from torchvision.ops import box_convert
import torch

import traceback
from os import path
from glob import glob
from enum import Enum
from pathlib import Path
from typing import Union, Optional, Callable

from json import load as json_load

from tqdm.asyncio import tqdm
import concurrent.futures
import asyncio

import nest_asyncio
nest_asyncio.apply()


class KompsatType(Enum):
    BBOX = "bbox"
    LINE = "line"


class KompsatDataset(VisionDataset):
    dataset_name = "Kompsat"

    @classmethod
    async def download_method(cls, url, root, filename):
        loop = asyncio.get_event_loop()
        with concurrent.futures.ThreadPoolExecutor() as executor:
            await loop.run_in_executor(executor, utils.download_url, url, root, filename)

    @classmethod
    async def extract_method(cls, from_path, to_path):
        try:
            loop = asyncio.get_event_loop()
            with concurrent.futures.ThreadPoolExecutor() as executor:
                await loop.run_in_executor(executor, utils.extract_archive, from_path, to_path)
        except FileExistsError as e:
            traceback.print_exc()
            raise FileExistsError(str(e) + "\nPlease use Python 3.13 or later. 3.12 or earlier versions not support unzip over existing directory.")

    def __init__(
        self,
        root: Union[str, Path] = None,
        train: bool = True,
        data_type: KompsatType = KompsatType.BBOX,
        transform: Optional[Callable] = None,
        target_transform: Optional[Callable] = None
    ):
        """
        Kompsat-3/3A dataset for object detection and height estimation.

        Args:
            root: Dataset root directory
            train: True for training set, False for validation set
            data_type: BBOX or LINE
            transform: Image transforms
            target_transform: Mask transforms
        """
        super().__init__(root, transforms=transform, transform=transform, target_transform=target_transform)

        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.download(root))

        self.root = path.join(root, self.dataset_name)
        self.train = train
        self.data_type = data_type
        split = "train" if train else "val"
        img_dir = path.join(self.root, "images", split)
        ann_dir = path.join(self.root, "annotations", split)
        line_dir = path.join(self.root, "lines", split)

        self.images = sorted(glob(path.join(img_dir, "*.jpg")))
        self.labels = []
        for pth in self.images:
            if data_type is KompsatType.BBOX:
                annotation_path = path.join(ann_dir, Path(pth).stem + ".json")
                if not path.exists(annotation_path):
                    raise FileNotFoundError(f"Annotation file {annotation_path} does not exist.")
                annotation = list(json_load(open(annotation_path, "r", encoding="utf-8")).values())[0]
                regions = []
                for anno in sorted(annotation['regions'], key=lambda x: int(x['region_attributes']['chi_id'])):
                    bbox = anno["shape_attributes"]
                    bbox = [bbox["x"], bbox["y"], bbox["width"], bbox["height"]]
                    regions.append(dict(
                        chi_id=int(anno["region_attributes"]["chi_id"]),
                        xywh=bbox,
                        xyxy=box_convert(torch.tensor(bbox), "xywh", "xyxy").tolist(),
                        cxcywh=box_convert(torch.tensor(bbox), "xywh", "cxcywh").tolist()
                    ))
                annotation['regions'] = regions
                annotation['file_attributes']['img_width'] = int(annotation['file_attributes']['img_width'])
                annotation['file_attributes']['img_height'] = int(annotation['file_attributes']['img_height'])
                self.labels.append(annotation)
            else:
                label_path = path.join(line_dir, Path(pth).stem + ".json")
                if not path.exists(label_path):
                    raise FileNotFoundError(f"Label file {label_path} does not exist.")
                label = list(json_load(open(label_path, "r", encoding="utf-8")).values())[0]
                regions = []
                for ln in sorted(label['regions'], key=lambda x: int(x['region_attributes']['chi_id'])):
                    poly = ln["shape_attributes"]
                    xyxy = [poly["all_points_x"][0], poly["all_points_y"][0], poly["all_points_x"][1], poly["all_points_y"][1]]
                    x1 = max(min(xyxy[0], xyxy[2]) - 1, 0)
                    y1 = max(min(xyxy[1], xyxy[3]) - 1, 0)
                    w = abs(xyxy[2] - xyxy[0]) + 2
                    h = max(abs(xyxy[3] - xyxy[1]), 1) + 2
                    regions.append(dict(
                        polyline=xyxy,
                        xywh=[x1, y1, w, h],
                        chi_height=float(ln["region_attributes"]['chi_height_m']),
                    ))
                label['regions'] = regions
                label['file_attributes']['img_width'] = int(label['file_attributes']['img_width'])
                label['file_attributes']['img_height'] = int(label['file_attributes']['img_height'])
                self.labels.append(label)

        assert len(self.images) == len(self.labels), \
            f"Number of images ({len(self.images)}) and labels ({len(self.labels)}) do not match."

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        """Get a sample from the dataset"""
        # Load image/label using default_loader
        image = folder.default_loader(self.images[idx])
        label = self.labels[idx]

        # Apply transforms
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)

        return image, label

    @classmethod
    async def download(cls, root: str):
        dataset_root = path.join(root, cls.dataset_name)
        if path.exists(dataset_root):  # If the dataset directory already exists, skip download
            return

        data_list = [
            KompsatIndex.TRAIN, KompsatIndex.VALID,
            KompsatIndex.TRAIN_BBOX, KompsatIndex.VALID_BBOX,
            KompsatIndex.TRAIN_LINE, KompsatIndex.VALID_LINE
        ]

        print(f"INFO: Downloading '{cls.dataset_name}' from server to {root}...")
        routines = []
        for data in data_list:
            if path.isfile(path.join(root, data.value)):
                print(f"INFO: Dataset archive {data.value} found in the root directory. Skipping download.")
                continue

            routines.append(cls.download_method(data.url, root=root, filename=data.value))
        await tqdm.gather(*routines, desc="Downloading files")

        print(f"INFO: Extracting '{cls.dataset_name}' dataset...")
        routines = []
        img_dir, anno_dir, line_dir = path.join(dataset_root, "images"), path.join(dataset_root, "annotations"), path.join(dataset_root, "lines")
        as_train, as_valid = lambda d: path.join(d, "train"), lambda d: path.join(d, "val")
        routines.extend((
            cls.extract_method(path.join(root, KompsatIndex.TRAIN.value), to_path=as_train(img_dir)),
            cls.extract_method(path.join(root, KompsatIndex.VALID.value), to_path=as_valid(img_dir)),
            cls.extract_method(path.join(root, KompsatIndex.TRAIN_BBOX.value), to_path=as_train(anno_dir)),
            cls.extract_method(path.join(root, KompsatIndex.VALID_BBOX.value), to_path=as_valid(anno_dir)),
            cls.extract_method(path.join(root, KompsatIndex.TRAIN_LINE.value), to_path=as_train(line_dir)),
            cls.extract_method(path.join(root, KompsatIndex.VALID_LINE.value), to_path=as_valid(line_dir)),
        ))
        await tqdm.gather(*routines, desc="Extracting files")


class KompsatDatasetForObjectDetection(KompsatDataset):
    def __init__(
        self,
        root: Union[str, Path] = None,
        train: bool = True,
        transform: Optional[Callable] = None,
        target_transform: Optional[Callable] = None
    ):
        super().__init__(root, train, KompsatType.BBOX, transform, target_transform)


class KompsatDatasetForHeightRegression(KompsatDataset):
    def __init__(
        self,
        root: Union[str, Path] = None,
        train: bool = True,
        transform: Optional[Callable] = None,
        target_transform: Optional[Callable] = None
    ):
        super().__init__(root, train, KompsatType.LINE, transform, target_transform)

In [None]:
DATA_ROOT = path.join(".", "data")

kompstats = DatasetHolder(
    train=KompsatDatasetForHeightRegression(root=DATA_ROOT, train=True),
    valid=KompsatDatasetForHeightRegression(root=DATA_ROOT, train=False)
)
kompstats.test = kompstats.valid

In [None]:
kompstats.train[0]

In [None]:
rgb_image, annotation = kompstats.train[0]
fig, axes = plt.subplots(1, 1, figsize=(6, 6))

axes.imshow(rgb_image)
axes.set_title('Image')
axes.axis('off')

for region in annotation['regions']:
    x1, y1, w, h = region['xywh']
    rect = plt.Rectangle((x1, y1), w, h, fill=False, edgecolor='red', linewidth=2)
    axes.add_patch(rect)

    polyline = region['polyline']
    xs = [polyline[0], polyline[2]]
    ys = [polyline[1], polyline[3]]
    axes.plot(xs, ys, color='blue', linewidth=2)

plt.tight_layout()
plt.show()

In [None]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

def target_transform(data):
    regions = data['regions']
    polylines = []
    heights = []
    for region in regions:
        polyline = region['polyline']  # region['xywh']
        height = region['chi_height']
        polylines.append(polyline)
        heights.append(height)
    return torch.tensor(polylines), torch.tensor(heights)

In [None]:
kompstats.train.transform = transform
kompstats.valid.transform = transform
kompstats.train.target_transform = target_transform
kompstats.valid.target_transform = target_transform

In [None]:
kompstats.train[0]

## DataLoader

In [None]:
# Set Batch Size
BATCH_SIZE = 50, 100, 100  # A100
BATCH_SIZE = 12, 64, 64

print(f"INFO: Set batch size - Train: {BATCH_SIZE[0]}, Valid: {BATCH_SIZE[1]}, Test: {BATCH_SIZE[2]}")

In [None]:
def collate_fn(batch):
    images, labels = zip(*batch)
    coords, heights = zip(*labels)
    return torch.stack(images), coords, heights

In [None]:
train_loader = DataLoader(kompstats.train, batch_size=BATCH_SIZE[0], shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(kompstats.valid, batch_size=BATCH_SIZE[1], shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(kompstats.test, batch_size=BATCH_SIZE[2], shuffle=False, collate_fn=collate_fn)

## Load Model

In [None]:
class NaiveRegression(nn.Module):
    def __init__(self):
        super().__init__()
        self.feature_dim = 512
        self.image_encoder = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
        self.image_encoder.fc = nn.Linear(self.image_encoder.fc.in_features, self.feature_dim)

        self.keypoint_encoder = nn.Sequential(
            nn.Linear(4, 64),     # [kpt_x1, kpt_y1, kpt_x2, kpt_y2]
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 256)
        )

        self.predictor = nn.Sequential(
            nn.Linear(512 + 256, 512),  # 이미지 특징 + 키포인트 특징
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 1)    # 높이 출력
        )

    def forward(self, pixel_values, polylines):
        outputs = self.image_encoder(pixel_values.to(device))
        results = []

        for hidden_states, polyline in zip(outputs, polylines):
            hidden_states = hidden_states.unsqueeze(0)
            if polyline.shape[0] > 1:
                hidden_states = hidden_states.expand(polyline.shape[0], -1)

            kpt_features = self.keypoint_encoder(polyline.float().to(device))
            combined_features = torch.cat((hidden_states, kpt_features), dim=1)
            height = self.predictor(combined_features)

            results.append(height.reshape(-1))
        return results

In [None]:
class HybridGeometricRegression(nn.Module):
    def __init__(self, img_size=224):
        super().__init__()
        self.img_size = img_size
        self.feature_dim = 256  # 이미지 특징 차원 축소
        
        # 이미지 인코더 (가벼워짐)
        self.image_encoder = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
        self.image_encoder.fc = nn.Linear(self.image_encoder.fc.in_features, self.feature_dim)

        # 키포인트 인코더 (축소)
        self.keypoint_encoder = nn.Sequential(
            nn.Linear(4, 32),     # [kpt_x1, kpt_y1, kpt_x2, kpt_y2]
            nn.ReLU(),
            nn.Linear(32, 64)
        )

        # 기하학적 특징 인코더 (핵심 추가)
        self.geometric_encoder = nn.Sequential(
            nn.Linear(6, 32),     # [pixel_dist, angle, center_x, center_y, aspect_ratio, norm_dist]
            nn.ReLU(),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, 128)
        )

        # 기하학적 높이 예측기 (물리적 계산 기반)
        self.geometric_predictor = nn.Sequential(
            nn.Linear(1, 16),     # 픽셀 거리만 입력
            nn.ReLU(),
            nn.Linear(16, 32),
            nn.ReLU(),
            nn.Linear(32, 1)      # 기하학적 높이 추정
        )

        # 최종 융합 예측기 (가중치를 기하학적 쪽에 더 주기)
        self.fusion_predictor = nn.Sequential(
            nn.Linear(256 + 64 + 128 + 1, 256),  # 이미지 + 키포인트 + 기하학적 + 기하학적높이
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 1)     # 최종 높이 출력
        )

    def calculate_geometric_features(self, polyline):
        """키포인트로부터 기하학적 특징 계산"""
        batch_size = polyline.shape[0]
        geometric_features = torch.zeros(batch_size, 6, device=polyline.device)
        
        for i in range(batch_size):
            kpt_x1, kpt_y1, kpt_x2, kpt_y2 = polyline[i]
            
            # 1. 픽셀 거리 계산
            pixel_distance = torch.sqrt((kpt_x2 - kpt_x1)**2 + (kpt_y2 - kpt_y1)**2)
            
            # 2. 키포인트 간 각도
            angle = torch.atan2(kpt_y2 - kpt_y1, kpt_x2 - kpt_x1)
            
            # 3. 중심점 좌표 (정규화)
            center_x = (kpt_x1 + kpt_x2) / 2 / self.img_size
            center_y = (kpt_y1 + kpt_y2) / 2 / self.img_size
            
            # 4. 종횡비 (세로/가로)
            height_diff = torch.abs(kpt_y2 - kpt_y1)
            width_diff = torch.abs(kpt_x2 - kpt_x1)
            aspect_ratio = height_diff / (width_diff + 1e-8)
            
            # 5. 정규화된 거리 (이미지 대각선 대비)
            diagonal = torch.sqrt(torch.tensor(self.img_size**2 + self.img_size**2, device=polyline.device))
            normalized_distance = pixel_distance / diagonal
            
            geometric_features[i] = torch.stack([
                pixel_distance, angle, center_x, center_y, aspect_ratio, normalized_distance
            ])
        
        return geometric_features

    def geometric_height_estimation(self, pixel_distance):
        """기하학적 높이 추정 (물리적 계산 시뮬레이션)"""
        # 간단한 스케일 변환 (실제로는 GSD, 카메라 파라미터 등을 고려)
        # 이 부분은 위성 이미지의 메타데이터가 있다면 더 정확하게 계산 가능
        normalized_distance = pixel_distance / self.img_size
        geometric_height = self.geometric_predictor(normalized_distance.unsqueeze(-1))
        return geometric_height

    def forward(self, pixel_values, polylines):
        # 이미지 인코딩
        image_features = self.image_encoder(pixel_values.to(device))
        results = []

        for hidden_states, polyline in zip(image_features, polylines):
            hidden_states = hidden_states.unsqueeze(0)
            if polyline.shape[0] > 1:
                hidden_states = hidden_states.expand(polyline.shape[0], -1)

            # 1. 키포인트 특징 추출
            kpt_features = self.keypoint_encoder(polyline.float().to(device))
            
            # 2. 기하학적 특징 계산
            geometric_features = self.calculate_geometric_features(polyline.float().to(device))
            geo_features = self.geometric_encoder(geometric_features)
            
            # 3. 기하학적 높이 추정
            pixel_distances = geometric_features[:, 0]  # 첫 번째 특징이 픽셀 거리
            geo_heights = self.geometric_height_estimation(pixel_distances)
            
            # 4. 모든 특징 융합
            combined_features = torch.cat((
                hidden_states,      # 이미지 특징
                kpt_features,       # 키포인트 특징  
                geo_features,       # 기하학적 특징
                geo_heights         # 기하학적 높이 추정
            ), dim=1)
            
            # 5. 최종 높이 예측
            height = self.fusion_predictor(combined_features)
            results.append(height.reshape(-1))
            
        return results

In [None]:
class GeometricPriorityRegression(nn.Module):
    def __init__(self, img_size=224):
        super().__init__()
        self.img_size = img_size

        # 기하학적 높이 예측기 (메인)
        self.geometric_predictor = nn.Sequential(
            nn.Linear(4, 64),     # [pixel_dist, norm_dist, angle, aspect_ratio]
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1)      # 기하학적 높이 추정
        )

        # 이미지 맥락 보정기 (보조)
        self.image_encoder = models.resnet101(weights=models.ResNet101_Weights.DEFAULT)
        self.image_encoder.fc = nn.Linear(self.image_encoder.fc.in_features, 128)

        # 최종 보정기 (기하학적 결과를 미세 조정)
        self.corrector = nn.Sequential(
            nn.Linear(128 + 1, 64),  # 이미지 특징 + 기하학적 높이
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(64, 16),
            nn.ReLU(),
            nn.Linear(16, 1)         # 보정값 출력
        )

    def calculate_geometric_features(self, polyline):
        """핵심 기하학적 특징만 계산"""
        batch_size = polyline.shape[0]
        features = torch.zeros(batch_size, 4, device=polyline.device)

        for i in range(batch_size):
            kpt_x1, kpt_y1, kpt_x2, kpt_y2 = polyline[i]

            # 픽셀 거리
            pixel_distance = torch.sqrt((kpt_x2 - kpt_x1)**2 + (kpt_y2 - kpt_y1)**2)

            # 정규화된 거리
            diagonal = torch.sqrt(torch.tensor(2 * self.img_size**2, device=polyline.device))
            normalized_distance = pixel_distance / diagonal

            # 각도
            angle = torch.atan2(kpt_y2 - kpt_y1, kpt_x2 - kpt_x1)

            # 종횡비
            height_diff = torch.abs(kpt_y2 - kpt_y1)
            width_diff = torch.abs(kpt_x2 - kpt_x1)
            aspect_ratio = height_diff / (width_diff + 1e-8)

            features[i] = torch.stack([pixel_distance, normalized_distance, angle, aspect_ratio])

        return features

    def forward(self, pixel_values, polylines):
        # 이미지 전역 특징 (맥락 정보)
        image_features = self.image_encoder(pixel_values.to(device))
        results = []

        for hidden_states, polyline in zip(image_features, polylines):
            hidden_states = hidden_states.unsqueeze(0)
            if polyline.shape[0] > 1:
                hidden_states = hidden_states.expand(polyline.shape[0], -1)

            # 1. 기하학적 특징 계산 및 높이 추정 (메인)
            geometric_features = self.calculate_geometric_features(polyline.float().to(device))
            geometric_height = self.geometric_predictor(geometric_features)

            # 2. 이미지 맥락을 통한 보정 (보조)
            correction_input = torch.cat((hidden_states, geometric_height), dim=1)
            correction = self.corrector(correction_input)

            # 3. 최종 높이 = 기하학적 높이 + 보정값
            final_height = geometric_height + correction

            results.append(final_height.reshape(-1))

        return results

In [None]:
import gc

torch.cuda.empty_cache()
gc.collect()

model = GeometricPriorityRegression(img_size=224)
model.to(device)

## Train

In [None]:
# Set Epoch Count & Learning Rate
EPOCHS = 300
LEARNING_RATE = 1e-4, 1e-5

In [None]:
classifier = nn.HuberLoss(delta=1.0)
optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE[0], weight_decay=0.05)
scheduler = optim.lr_scheduler.CyclicLR(optimizer, base_lr=LEARNING_RATE[1], max_lr=LEARNING_RATE[0], step_size_up=50, gamma=0.95)
#scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10, eta_min=LEARNING_RATE[1])
#scheduler = optim.lr_scheduler.OneCycleLR(optimizer, max_lr=LEARNING_RATE[0], steps_per_epoch=len(train_loader), epochs=EPOCHS)

In [None]:
from tqdm.notebook import tqdm
import gc

torch.cuda.empty_cache()
while gc.collect():
    torch.cuda.empty_cache()

for epoch in tqdm(range(EPOCHS), desc="Running Epochs"):
    train_loss, valid_loss = [], []
    train_rmse, valid_rmse = [], []

    model.train()
    train_bar = tqdm(total=int(len(kompstats.train)/BATCH_SIZE[0]+0.5), desc=f"Training for {epoch+1}/{EPOCHS}")
    for inputs, coords, heights in train_loader:
        optimizer.zero_grad()

        preds = model(inputs, coords)
        all_preds = torch.cat([pred.flatten() for pred in preds])
        all_heights = torch.cat([height.flatten() for height in heights]).to(device).to(all_preds.dtype)

        losses = classifier(all_preds, all_heights)
        rmse = torch.sqrt(nn.functional.mse_loss(all_preds, all_heights)).item()
        if rmse >= 1000 or (epoch >= 50 and rmse >= 10) \
                or (epoch >= 100 and rmse >= 7) or (epoch >= 150 and rmse >= 5) \
                or (epoch >= 200 and rmse >= 3) or (epoch >= 250 and rmse >= 2):  # 이상치 로스 업데이트 안함
            train_bar.update(1); continue
        losses.backward()

        train_loss.append(losses.item())
        train_rmse.append(rmse)

        optimizer.step()
        train_bar.update(1)
        train_bar.set_postfix({"Loss": f"{losses.item():.6f}", "RMSE(m)": f"{rmse:.3f}", "LR": f"{optimizer.param_groups[0]['lr']:.1e}"})
    train_bar.set_postfix({"Loss": f"{sum(train_loss)/len(train_loss):.6f}", "RMSE(m)": f"{sum(train_rmse)/len(train_rmse):.3f}", "LR": f"{optimizer.param_groups[0]['lr']:.1e}"})
    train_bar.close()

    model.eval()
    valid_bar = tqdm(total=int(len(kompstats.valid)/BATCH_SIZE[1]+0.5), desc=f"Validating for {epoch+1}/{EPOCHS}")
    with torch.inference_mode():
        for inputs, coords, heights in valid_loader:
            preds = model(inputs, coords)
            all_preds = torch.cat([pred.flatten() for pred in preds])
            all_heights = torch.cat([height.flatten() for height in heights]).to(device).to(all_preds.dtype)

            losses = classifier(all_preds, all_heights)
            rmse = torch.sqrt(nn.functional.mse_loss(all_preds, all_heights))

            valid_loss.append(losses.item())
            valid_rmse.append(rmse.item())
            valid_bar.update(1)

    valid_bar.set_postfix({"Loss": f"{sum(valid_loss)/len(valid_loss):.6f}", "RMSE(m)": f"{sum(valid_rmse)/len(valid_rmse):.3f}"})
    valid_bar.close()

    scheduler.step()

 ## Evaluate

In [None]:
def evaluate(model, data_loader):
    model.eval()
    total_rmse = 0
    total_samples = 0

    with torch.no_grad():
        for inputs, coords, heights in tqdm(data_loader):
            preds = model(inputs, coords)
            all_preds = torch.cat([pred.flatten() for pred in preds])
            all_heights = torch.cat([height.flatten() for height in heights]).to(device).to(all_preds.dtype)

            rmse = torch.sqrt(nn.functional.mse_loss(all_preds, all_heights))
            total_rmse += rmse.item() * len(all_heights)
            total_samples += len(all_heights)

    return total_rmse / total_samples

In [None]:
print("\n=== Final Evaluation ===")
test_rmse = evaluate(model, test_loader)
print(f"Test RSME: {test_rmse:.3f}m")