In [None]:

이 폴더 한번 싹 훑어보고 f1 macro의 점수를 좀더 높게 받으려면 어떤부분을 개선해야할지 한번 말해줄수있어?

Gemini
안녕하세요! Gemini Code Assist입니다. 제출하신 전체 코드를 꼼꼼하게 검토했습니다. 이미 K-Fold 교차 검증, TTA, 앙상블 등 매우 수준 높은 기법들을 잘 활용하고 계십니다. F1 Macro 점수를 한 단계 더 끌어올리기 위한 몇 가지 개선 방안을 제안해 드립니다.

종합 분석 및 개선 방향
현재 프로젝트는 다음과 같은 강점을 가지고 있습니다.

상세한 EDA: korean_document_eda.py를 통해 데이터 불균형, 이미지 크기 등 핵심 특징을 잘 파악하고 있습니다.
체계적인 훈련: main_test.py에서 StratifiedKFold를 사용하여 안정적인 검증을 수행합니다.
고급 추론 기법: inference.py와 inference_ensemble.py에서 TTA와 다중 모델 앙상블을 구현하여 성능을 극대화하고 있습니다.
이러한 강점을 바탕으로, F1 Macro 점수를 높이기 위해 소수 클래스(minority class)의 성능을 개선하고 모델의 일반화 성능을 극대화하는 데 초점을 맞춘 개선안을 제안합니다.

🎯 개선 제안 (F1 Macro 점수 향상)
1. K-Fold 교차 검증 결과물을 활용한 앙상블 (가장 효과적인 방법)
현재 main_test.py에서 K-Fold 훈련을 하고 있지만, 각 Fold에서 생성된 모델을 앙상블에 직접 활용하고 있지 않습니다. 각 Fold의 최고 성능 모델들을 앙상블하면 단일 모델이나 서로 다른 아키텍처를 앙상블하는 것보다 더 안정적이고 높은 성능을 기대할 수 있습니다.

실행 방안:

main_test.py를 실행하여 각 Fold의 best-checkpoint를 모두 저장합니다. (EfficientNetV2M_fold0_....pth, EfficientNetV2M_fold1_....pth, ...)
inference_ensemble.py를 수정하여 이 모델들을 모두 불러와 추론하고, 결과의 평균을 사용합니다.
inference_ensemble.py 수정 제안:

inference_ensemble.py
-20
+11
import torch
import pandas as pd
from tqdm import tqdm
import glob
from torch.utils.data import DataLoader
import torchvision.transforms as T

from datasets.transforms import build_unified_transforms
from utils.predict_tta import get_tta_predictions


# --- 기본 설정 ---
cfg = load_config("config/inference_config.yaml")
ModelClass_A = get_model(cfg['MODEL_A']['name'])
ModelClass_B = get_model(cfg['MODEL_B']['name'])
ModelClass_C = get_model(cfg['MODEL_C']['name'])
ModelClass_D = get_model(cfg['MODEL_D']['name'])
ModelClassList = [
    ModelClass_A,
    ModelClass_B,
    ModelClass_C,
]
# K-Fold 앙상블을 위해 단일 모델 클래스를 사용합니다.
ModelClass = get_model(cfg['MODEL']) # config에 앙상블할 모델 이름 추가 필요 (예: 'EfficientNetV2MArcFaceModel')
DatasetClass = get_dataset(cfg['DATASET'])
num_classes = 17
num_workers = os.cpu_count() // 2

# --- 앙상블 설정 ---
# 1. 사용할 모델들의 체크포인트 경로 리스트
model_paths = [
    f"{output_root}/EfficientNetV2MArcFaceModel_2025-07-05_20-40_checkpoint.pth",
    f"{output_root}/ResNeStModelArcFaceModel_2025-07-06_03-13_checkpoint.pth",
    f"{output_root}/ConvNeXtArcFaceModel_2025-07-06_05-22_checkpoint.pth",
]
# K-Fold로 생성된 모든 모델 체크포인트를 동적으로 불러옵니다.
# 예시: "EfficientNetV2MArcFaceModel_fold*_checkpoint.pth" 패턴으로 파일 검색
model_pattern = f"{cfg['MODEL']}_fold*_checkpoint.pth" 
model_paths = glob.glob(os.path.join(output_root, model_pattern))
print(f"발견된 K-Fold 모델 {len(model_paths)}개: {model_paths}")

# 가중치 리스트 정의
# 모델 경로(model_paths) 순서와 동일하게 가중치를 설정합니다.
num_models = len(model_paths)
use_weight = False
if use_weight:
    weights = [0.2, 0.4, 0.4]
    # weights = [0.2, 0.3, 0.3, 0.2]
    weights = [0.2, 0.2, 0.2, 0.2, 0.2] # Fold 개수에 맞게 수정
else:
    weights = [1 / num_models] * num_models

# --- 모델 로드 ---
ensemble_models = []
print("앙상블 모델 로딩 시작...")
for path, ModelClass in zip(model_paths, ModelClassList):
    # 주의: 모델 아키텍처가 다를 경우, get_model 등을 통해 각 모델에 맞는 클래스를 불러와야 합니다.
    model = ModelClass(num_classes).to(device)
for path in model_paths:
    model = ModelClass(num_classes=num_classes).to(device)
    model.load_state_dict(torch.load(path, map_location=device))
    model.eval() # 평가 모드로 즉시 설정
    ensemble_models.append(model)

2. 데이터 불균형 문제 심층 해결
F1 Macro 점수는 각 클래스의 F1 점수의 산술 평균이므로, 데이터가 적은 클래스의 성능이 전체 점수에 큰 영향을 미칩니다. EDA에서 확인된 데이터 불균형을 해결하는 것이 점수 향상의 핵심입니다.

실행 방안:

Focal Loss 사용: main.py에서 get_loss 함수를 통해 손실 함수를 불러오고 있습니다. 일반적인 CrossEntropyLoss 대신 Focal Loss를 사용해 보세요. Focal Loss는 맞추기 어려운 샘플(주로 소수 클래스)에 더 큰 가중치를 부여하여 모델이 소수 클래스에 집중하도록 유도합니다. config.yaml에서 손실 함수 관련 설정을 변경하여 실험해 보시는 것을 추천합니다.

yaml
# config/main_config.yaml 예시
loss:
  name: "FocalLoss"
  params:
    alpha: 0.25
    gamma: 2.0
클래스별 가중치 적용: nn.CrossEntropyLoss를 사용하더라도, 클래스별 데이터 수의 역수에 비례하는 가중치(weight 인자)를 계산하여 적용할 수 있습니다. 이는 소수 클래스의 loss 영향력을 높여줍니다.

3. 데이터 증강(Augmentation) 기법 고도화
모델이 다양한 형태의 문서 이미지를 학습하게 하여 일반화 성능을 높일 수 있습니다.

실행 방안:

Mixup / CutMix 적용: 이미지와 레이블을 일정 비율로 섞는 Mixup이나 이미지의 일부를 다른 이미지로 대체하는 CutMix는 분류 모델의 성능과 강건함을 높이는 데 매우 효과적입니다. 훈련 루프(trainer.py의 training_loop) 내에서 배치를 받아 처리하는 부분에 적용할 수 있습니다.

test_data_preprocess.py의 전처리 기법을 훈련에도 일부 적용: test_data_preprocess.py에서 사용한 왜곡 보정, 명암 조절 등의 기법들을 훈련 시 데이터 증강의 일부로 확률적으로 적용(e.g., transforms.RandomApply)하면 훈련 데이터와 테스트 데이터의 분포 차이를 줄여 성능 향상에 도움이 될 수 있습니다.

4. 코드 리팩토링 및 관리 효율성 증대
main.py 정리: 현재 main.py 파일에 DataLoader가 두 번 정의되어 있습니다. 중복되는 코드를 제거하여 가독성과 유지보수성을 높이는 것이 좋습니다.
하드코딩된 경로 제거: inference_ensemble.py에 모델 경로가 하드코딩되어 있습니다. 위에서 제안한 것처럼 glob을 사용하거나, 훈련 스크립트에서 생성된 모델 경로들을 별도의 설정 파일이나 로그 파일에 기록하고 추론 시 이를 읽어오는 방식으로 자동화하는 것이 좋습니다.
요약 및 다음 단계 제안
가장 먼저 main_test.py를 활용한 K-Fold 훈련 및 앙상블(제안 1)을 시도해 보세요. 가장 큰 성능 향상을 기대할 수 있는 방법입니다.
그다음, **Focal Loss를 적용(제안 2)**하여 소수 클래스의 성능을 개선하는 실험을 진행합니다.
추가적인 성능 향상이 필요할 때 **Mixup/CutMix와 같은 고급 증강 기법(제안 3)**을 도입하는 것을 고려해 보세요.
이 제안들이 F1 Macro 점수를 높이는 데 도움이 되기를 바랍니다. 성공적인 결과 있기를 응원합니다!

이 폴더 한번 싹 훑어보고 f1 macro의 점수를 좀더 높게 받으려면 어떤부분을 개선해야할지 한번 말해줄수있어?


Gemini
안녕하세요! Gemini Code Assist입니다. 제출하신 전체 코드를 꼼꼼하게 검토해 보았습니다. K-Fold 교차 검증, TTA, 앙상블 등 이미 훌륭한 기법들을 많이 적용하고 계시네요. 현재 구조에서 F1 Macro 점수를 더 높이기 위해 시도해 볼 만한 몇 가지 개선 방안을 제안해 드립니다.

종합 평가 및 개선 방향
F1 Macro 점수는 각 클래스에 동일한 가중치를 부여하여 평균을 내기 때문에, 데이터가 적은 클래스(minority class)의 성능이 전체 점수에 큰 영향을 미칩니다. 따라서 개선의 핵심은 **"데이터 불균형 문제를 얼마나 잘 해결하고, 소수 클래스의 성능을 높이는가"**에 있습니다.

🎯 주요 개선 제안 (F1 Macro 점수 향상 초점)
1. 클래스 불균형 문제 심층 해결: Focal Loss 도입
현재 main.py에서 sampler를 사용하여 클래스 불균형을 일부 완화하고 있지만, 이것만으로는 부족할 수 있습니다. 특히 분류하기 어려운(hard-to-classify) 샘플에 더 집중하여 학습하는 Focal Loss를 도입하는 것을 강력히 추천합니다. Focal Loss는 맞추기 쉬운 다수 클래스(majority class) 샘플들의 loss 기여도를 낮추고, 맞추기 어려운 소수 클래스 샘플에 더 집중하게 만들어 F1 Macro 점수 향상에 매우 효과적입니다.

main.py의 get_loss 함수를 사용하고 있으므로, loss_factory.py에 Focal Loss를 추가하고 main_config.yaml에서 손쉽게 변경할 수 있도록 구성하는 것이 좋습니다.

예시 (loss_factory.py에 Focal Loss 추가):

python
# utils/loss_factory.py (가상 경로)
import torch
import torch.nn as nn
import torch.nn.functional as F

class FocalLoss(nn.Module):
    def __init__(self, alpha=0.25, gamma=2.0, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        ce_loss = F.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce_loss)
        focal_loss = self.alpha * (1-pt)**self.gamma * ce_loss

        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        else:
            return focal_loss

def get_loss(name, params):
    if name == "CrossEntropyLoss":
        return nn.CrossEntropyLoss(**params)
    elif name == "FocalLoss":
        return FocalLoss(**params)
    # ... 다른 손실 함수들
    else:
        raise ValueError(f"Unknown loss function: {name}")
적용 (main_config.yaml 수정):

yaml
# config/main_config.yaml
loss:
  # name: "CrossEntropyLoss"
  # params: {}
  name: "FocalLoss"
  params:
    alpha: 0.25
    gamma: 2.0
2. K-Fold 교차 검증과 앙상블의 결합
main_test.py에서 K-Fold를 구현하셨고, inference_ensemble.py에서 여러 모델을 앙상블하고 있습니다. 이 두 가지 강력한 기법을 결합하여 성능을 극대화할 수 있습니다.

현재 방식:

Model A, B, C를 각각 한 번씩 학습 -> 3개 모델 앙상블
제안 방식:

5-Fold 교차 검증을 사용한다고 가정
Model A를 5-Fold로 학습 -> 5개의 Model A 체크포인트 생성
Model B를 5-Fold로 학습 -> 5개의 Model B 체크포인트 생성
Model C를 5-Fold로 학습 -> 5개의 Model C 체크포인트 생성
총 15개의 모델을 모두 앙상블하여 최종 예측 수행
이렇게 하면 데이터셋을 훨씬 효율적으로 활용하게 되어 모델의 일반화 성능이 크게 향상되고, 단일 모델의 분산(variance)을 줄여주어 최종 앙상블 결과가 훨씬 안정적이고 강력해집니다.

3. 테스트 데이터 전처리 로직 오류 수정
test_data_preprocess.py 스크립트에서 심각한 논리적 오류가 발견되었습니다. 테스트 이미지에 어떤 전처리를 적용할지 결정하기 위해 sample_submission.csv의 target 값을 사용하고 있습니다.

python
# /root/DTC/upstageailab-cv-classification-cv_6/test_data_preprocess.py
...
    # CSV 파일 읽기
    df = pd.read_csv(input_csv) # input_csv = './data/sample_submission.csv'
...
    for index, row in df.iterrows():
        file_name = row['ID']
        target = row['target'] # <-- 문제 지점
...
        # 클래스가 지정된 리스트에 있는 경우 전처리 수행
        if target in classes_to_crop: # <-- 문제 지점
            processed_image = process_image(image_path)
...
sample_submission.csv의 target은 실제 정답이 아닌, 제출 형식을 위한 임의의 값입니다. 실제 테스트 환경에서는 정답(target)을 알 수 없으므로 이 로직은 동작할 수 없으며, 현재는 우연히 sample_submission의 값에 따라 전처리가 적용되고 있을 뿐입니다.

해결 방안: 모든 테스트 이미지에 동일한 전처리 파이프라인을 적용해야 합니다. 특정 클래스에만 다른 전처리를 적용하고 싶다면, 그 결정은 모델 스스로(예: 1단계 분류 모델) 해야 하며 외부 정보(정답)에 의존해서는 안 됩니다.

가장 간단하고 올바른 수정은 조건문을 제거하고 모든 이미지에 일관된 전처리를 적용하거나, 혹은 전처리가 불필요하다고 판단되면 원본을 그대로 사용하는 것입니다. 아래는 모든 이미지에 process_image를 적용하는 수정안입니다.

test_data_preprocess.py
-39
+32
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # 크롭할 클래스 리스트
    classes_to_crop = [1, 3, 4, 6, 7, 10, 11, 12, 13, 14, 15]
    """
    0,계좌번호
    1,지급 신청서
    2,자동차 계기판
    3,입_퇴원 확인서
    4,진단서
    5,운전면허증
    6,진료비영수증
    7,통원_진료 확인서
    8,주민등록증
    9,여권
    10,진료비_납입_확인서
    11,약제비_영수증
    12,처방전
    13,이력서
    14,소견서
    15,자동차_등록증
    16,자동차_번호판
    """

    # CSV 파일 읽기
    df = pd.read_csv(input_csv)
    
    processed_data = []

    # 각 행에 대해 반복
    for index, row in df.iterrows():
    # for index, row in df.iterrows():
    #     file_name = row['ID']
    #     target = row['target']
    #     image_path = os.path.join(input_dir, file_name)
    #     print(f"Processing image: {index} - {file_name}")

    #     # 클래스가 지정된 리스트에 있는 경우 전처리 수행
    #     if target in classes_to_crop:
    #         processed_image = process_image(image_path)
    #         processed_image_path = os.path.join(output_dir, file_name)
    #         cv2.imwrite(processed_image_path, processed_image)
            
    #         # 새로운 CSV 파일에 기록할 데이터 추가
    #         processed_data.append({'ID': file_name, 'target': target})
    #     else:
    #         # 원본 이미지 복사
    #         image = load_image(image_path)
    #         processed_image_path = os.path.join(output_dir, file_name)
    #         cv2.imwrite(processed_image_path, image)
            
    #         # 처리하지 않은 데이터도 리스트에 추가
    #         processed_data.append({'ID': file_name, 'target': target})
    
    # 모든 이미지에 일관된 전처리 적용
    for index, row in df.iterrows():
        file_name = row['ID']
        target = row['target']
        image_path = os.path.join(input_dir, file_name)
        print(f"Processing image: {index} - {file_name}")

        # 클래스가 지정된 리스트에 있는 경우 전처리 수행
        if target in classes_to_crop:
            processed_image = process_image(image_path)
            processed_image_path = os.path.join(output_dir, file_name)
            cv2.imwrite(processed_image_path, processed_image)
            
            # 새로운 CSV 파일에 기록할 데이터 추가
            processed_data.append({'ID': file_name, 'target': target})
        else:
            # 원본 이미지 복사
            image = load_image(image_path)
            processed_image_path = os.path.join(output_dir, file_name)
            cv2.imwrite(processed_image_path, image)
            
            # 처리하지 않은 데이터도 리스트에 추가
            processed_data.append({'ID': file_name, 'target': target})
        # 모든 이미지에 대해 전처리 수행
        processed_image = process_image(image_path)
        processed_image_path = os.path.join(output_dir, file_name)
        cv2.imwrite(processed_image_path, processed_image)
        
        # 새로운 CSV 파일에 기록할 데이터 추가
        processed_data.append({'ID': file_name, 'target': target})

    print(f"전처리된 이미지를 {output_dir}에 저장했습니다.")


4. 코드 리팩토링 및 정리
main.py 파일에 약간의 중복 코드와 사용되지 않는 함수가 있습니다. 코드를 정리하면 실수를 줄이고 유지보수성을 높일 수 있습니다.

중복 DataLoader 정의: train_loader와 val_loader가 두 번 정의되어 있습니다. 첫 번째 정의는 sampler를 사용하고, 두 번째 정의는 shuffle=True를 사용하며 num_workers와 prefetch_factor가 다릅니다. 의도한 설정을 하나만 남기고 나머지는 삭제해야 합니다.
사용되지 않는 함수: train_block, just_one_train, n_fold_train 함수들이 정의되어 있지만 실제 if __name__ == "__main__": 블록에서는 사용되지 않습니다. main_test.py의 run_fold처럼 모듈화하려는 의도였다면 구조를 완성하거나, 아니라면 혼동을 피하기 위해 삭제하는 것이 좋습니다.
main.py
-85
+0
    drop_last=False,
    persistent_workers=True,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=cfg["BATCH_SIZE"],
    shuffle=True,
    num_workers=num_workers,
    pin_memory=True,
    drop_last=False,
    persistent_workers=True
)

train_loader = DataLoader(
    train_dataset,
    batch_size=cfg["BATCH_SIZE"],
    shuffle=True,
    num_workers=24,
    pin_memory=True,
    drop_last=False,
    persistent_workers=True,
    prefetch_factor=4,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=cfg["BATCH_SIZE"],
    prefetch_factor=4,
)

def unfreeze(model: nn.Module) -> nn.Parameter:
    # 1. 모델의 모든 파라미터를 우선 동결(freeze)합니다.
    for param in model.parameters():
        param.requires_grad = False

    # 2. 특징 추출기(backbone)의 마지막 n개 블록의 동결을 해제(unfreeze)합니다.
    # efficientnet_b3는 7개의 블록(0~6)을 가집니다.
    for i in range(cfg["num_blocks_to_unfreeze"]):
        for param in model.backbone.blocks[-(i+1)].parameters():
            param.requires_grad = True

    # 3. 분류기(head)의 동결을 해제합니다.
    for param in model.head.parameters():
        param.requires_grad = True
        
    # 4. 학습시킬 파라미터만 필터링하여 옵티마이저에 전달합니다.
    # requires_grad=True인 파라미터만 업데이트됩니다.
    params_to_update = filter(lambda p: p.requires_grad, model.parameters())
    
    return params_to_update


def train_block():
    
    # load model
    model: nn.Module = ModelClass(num_classes=num_classes).to(device)
    
    if cfg["use_unfreeze"]:
        params_to_update = unfreeze(model)
    else:
        params_to_update = model.parameters()
    
    early_stopping = EarlyStopping(patience=cfg["patience"], delta=cfg["delta"], verbose=True, save_path=save_path, mode='max')

    # 손실 함수
    criterion = nn.CrossEntropyLoss()

    # 옵티마이저
    optimizer = get_optimizer(cfg_optimizer["name"], params_to_update, cfg_optimizer["params"])

    # 스케쥴러
    Scheduler = get_scheduler(cfg_scheduler["name"], optimizer, cfg_scheduler['params'])

    # amp를 위한 scaler 준비
    training_args = {}
    if cfg["training_mode"] == 'on_amp':
        training_args['scaler'] = GradScaler()

    model, valid_max_accuracy = training_loop(
        training_fn,
        model, train_loader, val_loader, train_dataset, val_dataset, 
        criterion, optimizer, device, cfg["EPOCHS"], 
        early_stopping, logger, class_names, Scheduler,
        training_args,
        )
    return model, valid_max_accuracy

    
def just_one_train():
    pass


def n_fold_train():
    pass


def setup_optimizer_params(
    model: nn.Module, 

요약
1순위 (성능 향상): Focal Loss를 도입하여 소수 클래스 학습을 강화하세요.
2순위 (성능 향상): K-Fold와 앙상블을 결합하여 모델의 일반화 성능과 안정성을 극대화하세요.
3순위 (오류 수정): 테스트 데이터 전처리 시 target을 사용하는 치명적인 오류를 수정하세요.
4순위 (코드 품질): main.py의 중복 코드를 정리하여 잠재적인 버그를 방지하세요.
이 제안들이 F1 Macro 점수를 한 단계 더 끌어올리는 데 도움이 되기를 바랍니다!



추가로 모델 파라미터 더 잘나오게 설정 해봐줄수있어?


Gemini
안녕하세요! 물론입니다. 현재 main.py에 구현된 차등 학습률(differential learning rate) 설정은 매우 좋은 접근 방식입니다. 여기서 한 걸음 더 나아가, 모델의 성능과 안정성을 높일 수 있는 단계별 파인튜닝(Staged Fine-tuning) 전략과 함께 파라미터 설정 코드를 개선하는 방법을 제안해 드립니다.

핵심 개선 아이디어: 단계별 파인튜닝 (Staged Fine-tuning)
현재는 모델의 헤드와 백본 일부를 처음부터 함께 학습하고 있습니다. 하지만 사전 학습된 백본의 가중치는 매우 유용한 정보를 담고 있는데, 무작위로 초기화된 헤드(head)의 큰 그래디언트 때문에 이 정보가 초반에 손상될 수 있습니다.

단계별 파인튜닝은 이 문제를 해결하는 효과적인 전략입니다.

1단계: 헤드 학습 (Feature Extraction)

백본(backbone)은 완전히 동결(freeze)하고, 새로운 데이터에 맞게 헤드만 학습시킵니다.
이를 통해 백본의 사전 학습된 특징을 그대로 활용하면서, 새로운 클래스를 분류하는 방법만 빠르게 학습합니다.
2단계: 전체 미세 조정 (Fine-tuning)

헤드가 어느 정도 안정되면, 백본의 동결을 일부 또는 전체 해제합니다.
이후, 백본에는 낮은 학습률(learning rate)을, 헤드에는 더 높은 학습률을 적용하는 차등 학습률로 모델 전체를 부드럽게 미세 조정합니다.
이 방식을 통해 학습 안정성을 높이고, 모델이 데이터의 특징을 더 잘 학습하여 최종적으로 더 높은 성능을 기대할 수 있습니다.

💻 코드 개선 제안
위 전략을 적용하기 위해 main.py의 파라미터 설정 함수와 메인 실행 블록을 다음과 같이 개선할 수 있습니다.

1. setup_optimizer_params 함수 개선
기존 함수를 조금 더 유연하고 안정적으로 만들어, 어떤 모델 구조에도 잘 대응하고 단계별 학습을 제어하기 쉽게 변경합니다.

model.neck이 없는 경우에도 오류가 발생하지 않도록 hasattr 체크를 추가합니다.
파라미터 그룹을 생성할 때 빈 그룹이 추가되지 않도록 필터링 로직을 강화합니다.
2. main.py 메인 실행 블록 수정
if __name__ == "__main__": 부분을 2단계 학습 로직으로 변경합니다.

1단계 학습 루프: num_layers_to_unfreeze=0으로 설정하여 헤드만 학습합니다.
2단계 학습 루프: num_layers_to_unfreeze를 설정 파일 값으로 하여 전체를 미세 조정합니다.
각 단계에 맞는 옵티마이저와 스케줄러를 새로 생성합니다.
아래는 위 내용을 반영한 main.py의 전체 코드 변경 제안입니다.

main.py
-79
+98
    drop_last=False,
    persistent_workers=True,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=cfg["BATCH_SIZE"],
    shuffle=True,
    num_workers=24,
    pin_memory=True,
    drop_last=False,
    persistent_workers=True,
    prefetch_factor=4,
)


def setup_optimizer_params(
    model: nn.Module, 
    model_type: str, 
    num_layers_to_unfreeze: int,
    backbone_lr: float,
    head_lr: float,
    use_differential_lr: bool,
):
    """
    모델의 동결/해제 상태를 설정하고, 차등 학습률을 적용할 파라미터 그룹을 생성합니다.
val_loader = DataLoader(
    val_dataset,
    batch_size=cfg["BATCH_SIZE"],
    shuffle=False, # 검증 데이터는 보통 섞지 않습니다.
    num_workers=num_workers,
    pin_memory=True,
    drop_last=False,
)

def setup_optimizer_params(model: nn.Module, model_type: str, num_layers_to_unfreeze: int, backbone_lr: float, head_lr: float, use_differential_lr: bool):
    """
    모델의 동결/해제 상태를 설정하고, 차등 학습률을 적용할 파라미터 그룹을 생성합니다.

    :param model: 설정할 PyTorch 모델 객체
    :param model_type: 모델의 종류 ('resnet', 'efficientnet', 'swin', 'convnext')
    :param backbone_lr: 백본에 적용할 낮은 학습률
    :param head_lr: 넥/헤드에 적용할 높은 학습률
    :return: 옵티마이저에 전달할 파라미터 그룹 리스트
    """
    """    
    # 1. 모델의 모든 파라미터를 우선 동결(freeze)합니다.
    for param in model.parameters():
        param.requires_grad = False
    backbone = model.backbone
    
    stages_to_unfreeze = []
    if model_type_lower.startswith('resnet'):
        all_stages = [backbone.layer1, backbone.layer2, backbone.layer3, backbone.layer4]
        if num_layers_to_unfreeze > len(all_stages):
            num_layers_to_unfreeze = len(all_stages)
        stages_to_unfreeze = all_stages[-num_layers_to_unfreeze:]
    elif model_type_lower.startswith(('efficientnet', 'swin', 'convnext')):
        if model_type_lower.startswith('swin'): all_stages = backbone.layers
        elif model_type_lower.startswith('convnext'): all_stages = backbone.stages
        else: all_stages = backbone.blocks
        num_total_blocks = len(all_stages)
        if num_layers_to_unfreeze > num_total_blocks:
            num_layers_to_unfreeze = num_total_blocks
        stages_to_unfreeze = all_stages[-num_layers_to_unfreeze:]
    else:
        raise ValueError(f"Unsupported model_type: {model_type}.")
    if num_layers_to_unfreeze > 0:
        if model_type_lower.startswith('resnet'):
            all_stages = [backbone.layer1, backbone.layer2, backbone.layer3, backbone.layer4]
            if num_layers_to_unfreeze > len(all_stages):
                num_layers_to_unfreeze = len(all_stages)
            stages_to_unfreeze = all_stages[-num_layers_to_unfreeze:]
        elif model_type_lower.startswith(('efficientnet', 'swin', 'convnext')):
            if model_type_lower.startswith('swin'): all_stages = backbone.layers
            elif model_type_lower.startswith('convnext'): all_stages = backbone.stages
            else: all_stages = backbone.blocks
            num_total_blocks = len(all_stages)
            if num_layers_to_unfreeze > num_total_blocks:
                num_layers_to_unfreeze = num_total_blocks
            stages_to_unfreeze = all_stages[-num_layers_to_unfreeze:]
        else:
            raise ValueError(f"Unsupported model_type: {model_type}.")

    for stage in stages_to_unfreeze:
        for param in stage.parameters():
            
    # 4. 차등 학습률을 적용할 파라미터 그룹 생성 혹은 단일 그룹 생성
    #    requires_grad=True인 파라미터만 필터링하여 각 그룹에 포함시킵니다.
    param_groups = []
    if use_differential_lr:
        param_groups = [
            {
                "params": filter(lambda p: p.requires_grad, model.backbone.parameters()),
                "lr": backbone_lr
            },
            {
                "params": filter(lambda p: p.requires_grad, model.neck.parameters()),
                "lr": head_lr
            },
            {
                "params": filter(lambda p: p.requires_grad, model.head.parameters()),
                "lr": head_lr
            }
        ]
        
        print(f"Unfrozen the last {num_layers_to_unfreeze} backbone layers, neck, and head for model type: {model_type}.")
        backbone_params = list(filter(lambda p: p.requires_grad, model.backbone.parameters()))
        if backbone_params:
            param_groups.append({"params": backbone_params, "lr": backbone_lr})

        if hasattr(model, 'neck'):
            neck_params = list(filter(lambda p: p.requires_grad, model.neck.parameters()))
            if neck_params:
                param_groups.append({"params": neck_params, "lr": head_lr})

        if hasattr(model, 'head'):
            head_params = list(filter(lambda p: p.requires_grad, model.head.parameters()))
            if head_params:
                param_groups.append({"params": head_params, "lr": head_lr})

        print(f"Unfrozen the last {num_layers_to_unfreeze} backbone layers, neck, and head.")
        print(f"Applied differential learning rate: backbone_lr={backbone_lr}, head_lr={head_lr}")
    else:
        # 단일 학습률 그룹 생성 (학습 가능한 모든 파라미터를 하나의 그룹으로 묶음)
        param_groups = filter(lambda p: p.requires_grad, model.parameters())
        print("Differential learning rate disabled. Using a single LR for all trainable parameters.")
        

    return param_groups


if __name__ == "__main__":
    
    save_path = f'{output_root}/checkpoint.pth'
    
    # load model
    # 모델 로드
    model: nn.Module = ModelClass(num_classes=num_classes).to(device)
    
    if cfg["use_unfreeze"]:
    # 손실 함수
    criterion = get_loss(cfg_loss["name"], cfg_loss["params"])

    # --- 1단계: 헤드 학습 ---
    print("\n" + "="*50)
    print("🚀 STARTING STAGE 1: HEAD TRAINING")
    print("="*50)

    # 헤드만 학습하도록 파라미터 설정 (백본 동결)
    head_params = setup_optimizer_params(
        model=model,
        model_type=cfg["model_type"], 
        num_layers_to_unfreeze=0, # 백본은 동결
        backbone_lr=0, # 의미 없음
        head_lr=cfg["head_only_lr"], # 헤드 학습용 LR
        use_differential_lr=False # 모든 학습 파라미터에 단일 LR 적용
    )

    optimizer_head = get_optimizer(cfg_optimizer["name"], head_params, {"lr": cfg["head_only_lr"]})
    scheduler_head = get_scheduler(cfg_scheduler["name"], optimizer_head, cfg_scheduler['params'])
    
    # 1단계 학습 실행
    training_loop(
        training_fn, model, train_loader, val_loader, train_dataset, val_dataset, 
        criterion, optimizer_head, device, cfg["head_only_epochs"], 
        None, logger, class_names, scheduler_head, {}
    )

    # --- 2단계: 전체 미세 조정 ---
    print("\n" + "="*50)
    print("🚀 STARTING STAGE 2: FULL FINE-TUNING")
    print("="*50)

    # 백본 일부와 헤드를 함께 학습하도록 파라미터 설정
    if cfg["use_unfreeze"]:
        params_to_update = setup_optimizer_params(
                                model=model,
                                model_type=cfg["model_type"], 
                                num_layers_to_unfreeze=cfg["num_blocks_to_unfreeze"],
                                backbone_lr=cfg["backbone_lr"],
                                head_lr=cfg_optimizer["params"]["lr"],
                                use_differential_lr=cfg["use_differential_lr"]
                            )
            model=model,
            model_type=cfg["model_type"], 
            num_layers_to_unfreeze=cfg["num_blocks_to_unfreeze"],
            backbone_lr=cfg["backbone_lr"],
            head_lr=cfg_optimizer["params"]["lr"],
            use_differential_lr=cfg["use_differential_lr"]
        )
    else:
        params_to_update = model.parameters()

    # 2단계용 옵티마이저, 스케줄러, EarlyStopping 재설정
    save_path = f'{output_root}/{filename}_checkpoint.pth'
    early_stopping = EarlyStopping(patience=cfg["patience"], delta=cfg["delta"], verbose=True, save_path=save_path, mode='max')

    # 손실 함수
    criterion = get_loss(cfg_loss["name"], cfg_loss["params"])

    # 옵티마이저
    optimizer = get_optimizer(cfg_optimizer["name"], params_to_update, cfg_optimizer["params"])

    # 스케쥴러
    Scheduler = get_scheduler(cfg_scheduler["name"], optimizer, cfg_scheduler['params'])
    
    # amp를 위한 scaler 준비
    scheduler = get_scheduler(cfg_scheduler["name"], optimizer, cfg_scheduler['params'])
    
    training_args = {}
    if cfg["training_mode"] == 'on_amp':
        training_args['scaler'] = GradScaler()

    # 2단계 학습 실행
    model, valid_max_accuracy = training_loop(
        training_fn,
        model, train_loader, val_loader, train_dataset, val_dataset, 
        criterion, optimizer, device, cfg["EPOCHS"], 
        early_stopping, logger, class_names, Scheduler,
        early_stopping, logger, class_names, scheduler,
        training_args,
        )
    )

    print(f"\n🎉 Training Finished! Best validation accuracy: {valid_max_accuracy:.4f}")

⚙️ 설정 파일(main_config.yaml) 추가 항목
이 새로운 학습 전략을 사용하려면 config/main_config.yaml 파일에 1단계 학습을 제어하기 위한 파라미터를 추가해야 합니다.

yaml
# config/main_config.yaml

# ... 기존 설정 ...

# === 단계별 파인튜닝 설정 ===
head_only_epochs: 3       # 1단계(헤드만) 학습 에포크 수
head_only_lr: 0.001       # 1단계 학습 시 사용할 학습률

# === 기존 학습 설정 (2단계용) ===
EPOCHS: 15                # 2단계(전체) 학습 에포크 수
use_unfreeze: True
num_blocks_to_unfreeze: 4
use_differential_lr: True
backbone_lr: 0.00001

optimizer:
  name: "AdamW"
  params:
    lr: 0.0001            # 2단계 학습 시 헤드에 적용될 학습률
    weight_decay: 0.01

# ... 나머지 설정 ...
이처럼 학습 과정을 체계적으로 분리하고 각 단계에 맞는 파라미터를 신중하게 설정하면, 모델의 잠재력을 최대한으로 이끌어내어 더 좋은 결과를 얻으실 수 있을 겁니다.

PyTorch의 OneCycleLR 스케줄러를 단계별 파인튜닝에 적용하는 방법을 알려줘.


Gemini
안녕하세요! OneCycleLR 스케줄러를 단계별 파인튜닝에 적용하는 것은 모델의 수렴 속도를 높이고 최종 성능을 개선하는 데 매우 효과적인 전략입니다.

OneCycleLR의 핵심은 매 배치(step)마다 학습률을 조절한다는 점입니다. 따라서 기존의 에포크(epoch)마다 학습률을 조절하던 방식에서 약간의 코드 수정이 필요합니다.

단계별 파인튜닝에 OneCycleLR을 적용하는 전체 과정을 단계별로 안내해 드리겠습니다.

1. config/main_config.yaml 파일 수정
먼저 설정 파일에서 스케줄러를 OneCycleLR로 변경하고, 관련 하이퍼파라미터를 설정합니다. max_lr과 total_steps는 코드 내에서 동적으로 계산되므로 여기서는 다른 파라미터만 정의합니다.

main_config.yaml
-6
+9
BATCH_SIZE: 16
EPOCHS: 100
EPOCHS: 15 # 2단계(전체) 학습 에포크 수
SEED: 42
head_only_epochs: 3 # 1단계(헤드만) 학습 에포크 수
head_only_lr: 0.001 # 1단계 학습 시 사용할 최대 학습률

use_unfreeze: True
num_blocks_to_unfreeze: 4 # 모델의 피처 부분을 얼마나 더 학습시킬 것인지
model_type: resnet # efficientnet, resnet, convnext, swin
  }

scheduler:
  name: cosine_warm_restart
  name: OneCycleLR # 스케줄러 변경
  params: {
        T_0: 20,
        T_mult: 1,
        eta_min: 0.0000001,
    }
    pct_start: 0.3, # 전체 스텝 중 학습률이 올라가는 구간의 비율
    div_factor: 25, # 초기 학습률 = max_lr / div_factor
    final_div_factor: 1.0e+4, # 마지막 학습률 = 초기 학습률 / final_div_factor
  }

loss:
  name: FocalLoss

2. utils/scheduler_factory.py 파일 수정
get_scheduler 함수가 OneCycleLR을 생성할 수 있도록 레지스트리에 추가합니다.

(해당 파일이 제공되지 않아, 일반적인 구조를 가정하여 작성했습니다. 기존 코드에 맞게 수정해주세요.)

scheduler_factory.py
-0
+11
The code change produced by Gemini cannot be automatically applied. You can manually apply the change or ask Gemini to try again.
from typing import Iterable

from torch.optim import Adam, AdamW, SGD
from torch import optim
from torch.nn import Module, Parameter
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts, OneCycleLR


OPTIMIZER_REGISTRY: dict[str, optim.Optimizer] = {
    "Adam": Adam,
    "AdamW": AdamW,
    "SGD": SGD,
}
SCHEDULER_REGISTRY: dict[str, optim.lr_scheduler._LRScheduler] = {
    "cosine_warm_restart": CosineAnnealingWarmRestarts,
    "OneCycleLR": OneCycleLR,
}

def get_optimizer(name: str, params_to_update: Iterable[Parameter], params: dict) -> optim.Optimizer:
        raise ValueError(f"Unknown optimizer: {name}")
    return OPTIMIZER_REGISTRY[name](params_to_update, **params)

def get_scheduler(name: str, optimizer: optim.Optimizer, params: dict) -> optim.lr_scheduler._LRScheduler:
    if name not in SCHEDULER_REGISTRY:
        raise ValueError(f"Unknown scheduler: {name}")
    # OneCycleLR은 epoch마다 step하는 다른 스케줄러와 달리, 매 step마다 호출되어야 합니다.
    # trainer.py의 training_loop 내에서 scheduler.step()이 배치(batch)마다 호출되도록 수정해야 합니다.
    return SCHEDULER_REGISTRY[name](optimizer, **params)

if __name__ == '__main__':
    from torch import nn

3. main.py 파일 수정
메인 스크립트에서 각 학습 단계에 맞게 OneCycleLR 스케줄러를 동적으로 생성하도록 수정합니다.

1단계: head_only_lr을 max_lr로 사용합니다.
2단계: 차등 학습률을 위해 max_lr을 리스트([backbone_lr, head_lr, ...]) 형태로 전달합니다.
main.py
-3
+16
        use_differential_lr=False # 모든 학습 파라미터에 단일 LR 적용
    )

    # 1단계용 OneCycleLR 스케줄러 설정
    scheduler_params_stage1 = cfg_scheduler["params"].copy()
    scheduler_params_stage1['max_lr'] = cfg["head_only_lr"]
    scheduler_params_stage1['total_steps'] = len(train_loader) * cfg["head_only_epochs"]
    if scheduler_params_stage1['total_steps'] == 0: scheduler_params_stage1['total_steps'] = 1 # epochs=0일때 에러 방지

    optimizer_head = get_optimizer(cfg_optimizer["name"], head_params, {"lr": cfg["head_only_lr"]})
    scheduler_head = get_scheduler(cfg_scheduler["name"], optimizer_head, cfg_scheduler['params'])
    scheduler_head = get_scheduler(cfg_scheduler["name"], optimizer_head, scheduler_params_stage1)
    
    # 1단계 학습 실행
    training_loop(
        training_fn, model, train_loader, val_loader, train_dataset, val_dataset, 
        criterion, optimizer_head, device, cfg["head_only_epochs"], 
        criterion, optimizer_head, device, cfg["head_only_epochs"],
        None, logger, class_names, scheduler_head, {}
    )


    # 2단계용 옵티마이저, 스케줄러, EarlyStopping 재설정
    save_path = f'{output_root}/{filename}_checkpoint.pth'
    early_stopping = EarlyStopping(patience=cfg["patience"], delta=cfg["delta"], verbose=True, save_path=save_path, mode='max')
    optimizer = get_optimizer(cfg_optimizer["name"], params_to_update, cfg_optimizer["params"])
    scheduler = get_scheduler(cfg_scheduler["name"], optimizer, cfg_scheduler['params'])
    
    # 2단계용 OneCycleLR 스케줄러 설정
    scheduler_params_stage2 = cfg_scheduler["params"].copy()
    # 차등 학습률을 위해 max_lr을 리스트로 구성
    max_lrs = [pg.get('lr', cfg_optimizer['params']['lr']) for pg in optimizer.param_groups]
    scheduler_params_stage2['max_lr'] = max_lrs
    scheduler_params_stage2['total_steps'] = len(train_loader) * cfg["EPOCHS"]
    scheduler = get_scheduler(cfg_scheduler["name"], optimizer, scheduler_params_stage2)

    training_args = {}
    if cfg["training_mode"] == 'on_amp':
        training_args['scaler'] = GradScaler()

4. trainer 모듈 수정 (가장 중요)
OneCycleLR은 매 배치(step)마다 scheduler.step()을 호출해야 합니다. 현재 training_loop는 에포크(epoch)가 끝날 때 스케줄러를 업데이트할 가능성이 높습니다. training_loop 함수 내부의 학습 루프를 다음과 같이 수정해야 합니다.

(파일이 제공되지 않아, trainer/trainer.py에 training_loop가 있다고 가정하고 수정안을 제시합니다.)

trainer.py
-5
+10
The code change produced by Gemini cannot be automatically applied. You can manually apply the change or ask Gemini to try again.
# 이 파일은 가상의 trainer/trainer.py 입니다. 실제 파일 구조에 맞게 수정하세요.
from tqdm import tqdm
import torch

def training_loop(
    training_fn,
            # ... (기존 학습 코드: loss 계산, backpropagation 등)
            optimizer.step()

            # ⚠️ 중요: OneCycleLR을 위해 스케줄러 업데이트를 배치 루프 안으로 이동
            if scheduler is not None:
                # OneCycleLR은 배치마다 step을 호출합니다.
                if isinstance(scheduler, torch.optim.lr_scheduler.OneCycleLR):
                    scheduler.step()

        # --- Epoch 종료 ---
        # ⚠️ 기존 코드에서는 여기에 스케줄러 업데이트가 있었을 수 있습니다.
        # if scheduler is not None:
        #     scheduler.step()

        # ... (기존 검증 루프 및 로깅 코드)
        # OneCycleLR이 아닌 다른 스케줄러(예: CosineAnnealing)를 위한 로직
        if scheduler is not None and not isinstance(scheduler, torch.optim.lr_scheduler.OneCycleLR):
            scheduler.step()

    # ... (나머지 코드)
    return model, best_score

핵심 변경 사항:

scheduler.step() 호출을 에포크 루프의 끝에서 배치(학습) 루프의 끝으로 이동시켰습니다.
isinstance를 사용하여 스케줄러가 OneCycleLR일 때만 배치마다 업데이트하고, 다른 스케줄러는 기존처럼 에포크마다 업데이트하도록 하여 호환성을 유지했습니다.
이 변경 사항들을 적용하면, 단계별 파인튜닝의 각 단계에서 OneCycleLR 스케줄러가 최적의 학습률을 찾아내어 더 빠르고 안정적으로 모델을 학습시킬 수 있습니다.

그리고 너가 말해줬던 K-Fold 교차 검증과 앙상블의 결합 이것도 한번 적용시켜줬으면 좋겠어


Gemini
물론입니다! K-Fold 교차 검증과 앙상블을 결합하는 것은 모델의 성능과 안정성을 극대화하는 매우 강력한 전략입니다. 기존의 main_test.py와 inference_ensemble.py를 하나로 합치고, 전체 과정을 체계적으로 관리할 수 있는 새로운 파이프라인을 구성해 드리겠습니다.

새로운 워크플로우: run_kfold_ensemble.py
이 새로운 접근 방식의 핵심은 하나의 스크립트와 하나의 설정 파일로 전체 과정을 제어하는 것입니다.

중앙 설정 파일 (kfold_ensemble_config.yaml): 어떤 모델들을, 몇 개의 Fold로, 어떤 가중치로 앙상블할지 모든 것을 여기서 정의합니다.
통합 실행 스크립트 (run_kfold_ensemble.py): 이 스크립트는 두 가지 모드로 동작합니다.
train 모드: 설정 파일에 정의된 모든 모델에 대해 K-Fold 교차 검증 학습을 실행하고, 각 Fold의 베스트 모델을 체크포인트로 저장합니다. (예: 3개 모델, 5-Fold -> 15개 체크포인트 생성)
inference 모드: train 모드에서 생성된 모든 체크포인트를 자동으로 불러와 가중 평균 앙상블 및 TTA(Test Time Augmentation)를 적용하여 최종 예측 결과를 생성합니다.
1. 스케줄러 팩토리 수정
먼저, OneCycleLR 스케줄러를 사용할 수 있도록 utils/scheduler_factory.py에 추가합니다.

scheduler_factory.py
-1
+2
from torch.optim.lr_scheduler import StepLR, MultiStepLR, CosineAnnealingLR, ReduceLROnPlateau, CosineAnnealingWarmRestarts
from torch.optim.lr_scheduler import StepLR, MultiStepLR, CosineAnnealingLR, ReduceLROnPlateau, CosineAnnealingWarmRestarts, OneCycleLR
from torch import optim


SCHEDULER_REGISTRY: dict[str, type] = {
    "step": StepLR,
    "multistep": MultiStepLR,
    "cosine": CosineAnnealingLR,
    "plateau": ReduceLROnPlateau,
    "cosine_warm_restart": CosineAnnealingWarmRestarts,
    "OneCycleLR": OneCycleLR,
}

def get_scheduler(name: str, optimizer: optim.Optimizer, params: dict) -> optim.lr_scheduler._LRScheduler:

2. 새로운 중앙 설정 파일 생성
프로젝트의 루트 config 폴더에 아래 내용으로 kfold_ensemble_config.yaml 파일을 새로 생성합니다. 이 파일이 전체 파이프라인의 "조종석" 역할을 합니다.

New file: kfold_ensemble_config.yaml
+34
# Global settings for K-Fold and Ensemble
SEED: 42
N_SPLITS: 5 # K-Fold의 K 값
DATA_PATH: './data'
OUTPUT_ROOT: './output'
NUM_CLASSES: 17

# 앙상블에 사용할 모델 리스트
# 각 모델은 고유한 학습 설정(config_path)을 가질 수 있습니다.
MODELS:
  - # 모델 A: ResNeSt
    config_path: "config/main_config.yaml" # 이 모델에 대한 학습 설정 파일
    weight: 0.4 # 앙상블 가중치

  - # 모델 B: EfficientNet (가정)
    # main_config.yaml을 복사하여 MODEL 이름만 바꿔서 사용 가능
    config_path: "config/effnet_config.yaml"
    weight: 0.3

  - # 모델 C: ConvNeXt (가정)
    # main_config.yaml을 복사하여 MODEL 이름만 바꿔서 사용 가능
    config_path: "config/convnext_config.yaml"
    weight: 0.3

# 추론 시 설정
INFERENCE:
  batch_size: 16
  use_tta: True
  # TTA에 사용할 변환 리스트 (torchvision.transforms 형식)
  tta_transforms:
    - name: "RandomHorizontalFlip"
      params: { p: 1.0 }
    - name: "RandomAffine"
      params: { degrees: 10,


지금 너가 개선해준부분 전부 변경했는데 이러면 어떤 py파일부터 순차적으로 실행해야할까?


Gemini
네, 그럼요! 지금까지 제안해 드린 모든 개선 사항(단계별 파인튜닝, K-Fold, 앙상블)을 통합하여, 이제 어떤 파일을 어떻게 실행해야 하는지 명확하게 정리해 드리겠습니다.

가장 큰 변화는 여러 스크립트를 따로 실행할 필요 없이, 단 하나의 통합 스크립트(run_kfold_ensemble.py)로 전체 학습과 추론 과정을 관리하게 되었다는 점입니다.

새로운 실행 순서 요약
이제 복잡한 과정은 잊으셔도 됩니다. 실행 순서는 아래와 같이 매우 간단해집니다.

(선택 사항) 테스트 데이터 전처리: test_data_preprocess.py를 실행하여 테스트 이미지를 미리 처리합니다.
학습 실행: run_kfold_ensemble.py를 train 모드로 실행하여 설정 파일에 정의된 모든 모델을 K-Fold로 학습시킵니다.
추론 실행: run_kfold_ensemble.py를 inference 모드로 실행하여 학습된 모든 모델을 자동으로 불러와 앙상블 추론을 수행하고 최종 제출 파일을 생성합니다.
⚙️ 1단계: kfold_ensemble_config.yaml 설정하기
먼저, 프로젝트의 "조종석" 역할을 할 설정 파일을 완성해야 합니다. 이 파일 하나로 어떤 모델을, 몇 Fold로, 어떻게 앙상블할지 모두 제어합니다.

config/kfold_ensemble_config.yaml 파일을 아래 내용으로 생성하거나 수정해 주세요.

kfold_ensemble_config.yaml
-12
+18
# Global settings for K-Fold and Ensemble
SEED: 42
N_SPLITS: 5 # K-Fold의 K 값
DATA_PATH: './data'
DATA_ROOT: './data'
OUTPUT_ROOT: './output'
NUM_CLASSES: 17

# 앙상블에 사용할 모델 리스트
# 각 모델은 고유한 학습 설정(config_path)을 가질 수 있습니다.
MODELS:
  - # 모델 A: ResNeSt
  - # --- 모델 A: ResNeSt101e ---
    model_name: "ResNeSt101e" # 체크포인트 파일명에 사용될 고유 이름
    config_path: "config/main_config.yaml" # 이 모델에 대한 학습 설정 파일
    weight: 0.4 # 앙상블 가중치

  - # 모델 B: EfficientNet (가정)
    # main_config.yaml을 복사하여 MODEL 이름만 바꿔서 사용 가능
    config_path: "config/effnet_config.yaml"
  - # --- 모델 B: EfficientNetV2 (가정) ---
    model_name: "EfficientNetV2"
    # main_config.yaml을 복사하여 MODEL, model_type 등만 바꿔서 사용
    config_path: "config/effnet_config.yaml" # 예시 파일명
    weight: 0.3

  - # 모델 C: ConvNeXt (가정)
    # main_config.yaml을 복사하여 MODEL 이름만 바꿔서 사용 가능
    config_path: "config/convnext_config.yaml"
  - # --- 모델 C: ConvNeXt (가정) ---
    model_name: "ConvNeXt"
    # main_config.yaml을 복사하여 MODEL, model_type 등만 바꿔서 사용
    config_path: "config/convnext_config.yaml" # 예시 파일명
    weight: 0.3

# 추론 시 설정
INFERENCE:
  batch_size: 16
  use_tta: True
  submission_filename: "submission_ensemble.csv"
  # TTA에 사용할 변환 리스트 (torchvision.transforms 형식)
  tta_transforms:
    - name: "RandomHorizontalFlip"
      params: { p: 1.0 }
    - name: "RandomAffine"
      params: { degrees: 10,
    - name: "HorizontalFlip"
      params: { p: 1.0 } # torchvision.transforms.v2
    - name: "RandomRotation"
      params: { degrees: 15 }
    - name: "RandomRotation"
      params: { degrees: -15 }

중요: MODELS 리스트에 있는 각 모델의 config_path (예: config/effnet_config.yaml)는 실제로 존재하는 파일이어야 합니다. main_config.yaml을 복사하여 MODEL, model_type 등만 수정해서 여러 개를 만들어두시면 됩니다.

🖥️ 2단계: 통합 실행 스크립트 run_kfold_ensemble.py 생성
기존의 main.py, main_test.py, inference_ensemble.py의 핵심 기능들을 모두 합친 통합 스크립트입니다. 프로젝트 루트에 아래 내용으로 run_kfold_ensemble.py 파일을 새로 생성해 주세요.

New file: run_kfold_ensemble.py
+309
The code change produced by Gemini cannot be automatically applied. You can manually apply the change or ask Gemini to try again.
import os
import gc
import glob
import argparse
import datetime

import torch
import pandas as pd
import numpy as np
import torch.nn as nn
from torch.cuda.amp import GradScaler
from torch.utils.data import DataLoader
from sklearn.model_selection import StratifiedKFold
from tqdm import tqdm

from config.config import load_config
from utils.utils import *
from datasets.transforms import build_unified_transforms, build_tta_transforms

from datasets import get_dataset
from models import get_model

from utils.EarlyStopping import EarlyStopping
from utils.scheduler_factory import get_scheduler
from utils.optimizer_factory import get_optimizer
from utils.loss_factory import get_loss
from trainer import *
from trainer.wandb_logger import WandbLogger
from utils.predict_tta import get_tta_predictions

# --- 유틸리티 함수 (main.py에서 가져옴) ---

def setup_optimizer_params(model: nn.Module, model_type: str, num_layers_to_unfreeze: int, backbone_lr: float, head_lr: float, use_differential_lr: bool):
    for param in model.parameters():
        param.requires_grad = False

    model_type_lower = model_type.lower()
    backbone = model.backbone
    
    stages_to_unfreeze = []
    if num_layers_to_unfreeze > 0:
        if model_type_lower.startswith('resnet'):
            all_stages = [backbone.layer1, backbone.layer2, backbone.layer3, backbone.layer4]
        elif model_type_lower.startswith('swin'): all_stages = backbone.layers
        elif model_type_lower.startswith('convnext'): all_stages = backbone.stages
        else: all_stages = backbone.blocks # efficientnet
        
        if num_layers_to_unfreeze > len(all_stages): num_layers_to_unfreeze = len(all_stages)
        stages_to_unfreeze = all_stages[-num_layers_to_unfreeze:]

    for stage in stages_to_unfreeze:
        for param in stage.parameters():
            param.requires_grad = True

    if hasattr(model, 'neck'):
        for param in model.neck.parameters(): param.requires_grad = True
    if hasattr(model, 'head'):
        for param in model.head.parameters(): param.requires_grad = True
            
    param_groups = []
    if use_differential_lr:
        backbone_params = list(filter(lambda p: p.requires_grad, model.backbone.parameters()))
        if backbone_params: param_groups.append({"params": backbone_params, "lr": backbone_lr})

        if hasattr(model, 'neck'):
            neck_params = list(filter(lambda p: p.requires_grad, model.neck.parameters()))
            if neck_params: param_groups.append({"params": neck_params, "lr": head_lr})

        if hasattr(model, 'head'):
            head_params = list(filter(lambda p: p.requires_grad, model.head.parameters()))
            if head_params: param_groups.append({"params": head_params, "lr": head_lr})
    else:
        param_groups = filter(lambda p: p.requires_grad, model.parameters())

    return param_groups

# --- 학습 파이프라인 ---

def run_fold_training(
    fold: int,
    model_config: dict,
    global_config: dict,
    train_df: pd.DataFrame,
    val_df: pd.DataFrame,
    device: torch.device
):
    # --- 설정 불러오기 ---
    cfg = load_config(model_config['config_path'])
    model_name = model_config['model_name']
    
    print(f"\n===== FOLD {fold+1}/{global_config['N_SPLITS']} | MODEL: {model_name} | CONFIG: {model_config['config_path']} =====")
    
    set_seed(global_config['SEED'] + fold) # Fold마다 다른 시드 적용

    # --- 데이터셋 및 로더 ---
    train_transform, val_transform = build_unified_transforms(cfg["transforms"]["train"]), build_unified_transforms(cfg["transforms"]["val"])
    DatasetClass = get_dataset(cfg['DATASET'])
    train_dataset = DatasetClass(df=train_df, data_root=f"{global_config['DATA_ROOT']}/train/", transform=train_transform)
    val_dataset = DatasetClass(df=val_df, data_root=f"{global_config['DATA_ROOT']}/train/", transform=val_transform)
    
    sampler = setting_sampler(train_df)
    train_loader = DataLoader(train_dataset, batch_size=cfg["BATCH_SIZE"], sampler=sampler, num_workers=os.cpu_count()//2, pin_memory=True)
    val_loader = DataLoader(val_dataset, batch_size=cfg["BATCH_SIZE"], shuffle=False, num_workers=os.cpu_count()//2, pin_memory=True)

    # --- 모델, 손실함수, 로거 ---
    ModelClass = get_model(cfg['MODEL'])
    model = ModelClass(num_classes=global_config['NUM_CLASSES']).to(device)
    criterion = get_loss(cfg["loss"]["name"], cfg["loss"]["params"])
    
    date = datetime.datetime.now().strftime("%Y%m%d-%H%M")
    run_name = f"{model_name}_fold{fold+1}_{date}"
    save_path = f"{global_config['OUTPUT_ROOT']}/{run_name}_checkpoint.pth"
    
    logger = WandbLogger(
        project_name="DTC-KFold-Ensemble",
        run_name=run_name,
        config=cfg,
        save_path=save_path
    )

    # --- 1단계: 헤드 학습 ---
    print("\n--- STAGE 1: HEAD TRAINING ---")
    if cfg['head_only_epochs'] > 0:
        head_params = setup_optimizer_params(model, cfg["model_type"], 0, 0, cfg["head_only_lr"], False)
        optimizer_head = get_optimizer(cfg["optimizer"]["name"], head_params, {"lr": cfg["head_only_lr"]})
        
        scheduler_params_s1 = cfg["scheduler"]["params"].copy()
        scheduler_params_s1['max_lr'] = cfg["head_only_lr"]
        scheduler_params_s1['total_steps'] = len(train_loader) * cfg["head_only_epochs"]
        scheduler_head = get_scheduler(cfg["scheduler"]["name"], optimizer_head, scheduler_params_s1)
        
        training_loop(
            TRAINING_REGISTRY[cfg['training_mode']], model, train_loader, val_loader, train_dataset, val_dataset, 
            criterion, optimizer_head, device, cfg["head_only_epochs"], None, logger, None, scheduler_head, {}
        )

    # --- 2단계: 전체 미세 조정 ---
    print("\n--- STAGE 2: FULL FINE-TUNING ---")
    params_to_update = setup_optimizer_params(
        model, cfg["model_type"], cfg["num_blocks_to_unfreeze"],
        cfg["backbone_lr"], cfg["optimizer"]["params"]["lr"], cfg["use_differential_lr"]
    )
    optimizer = get_optimizer(cfg["optimizer"]["name"], params_to_update, cfg["optimizer"]["params"])
    
    scheduler_params_s2 = cfg["scheduler"]["params"].copy()
    max_lrs = [pg.get('lr', cfg['optimizer']['params']['lr']) for pg in optimizer.param_groups]
    scheduler_params_s2['max_lr'] = max_lrs
    scheduler_params_s2['total_steps'] = len(train_loader) * cfg["EPOCHS"]
    scheduler = get_scheduler(cfg["scheduler"]["name"], optimizer, scheduler_params_s2)

    early_stopping = EarlyStopping(patience=cfg["patience"], delta=cfg["delta"], verbose=True, save_path=save_path, mode='max')
    training_args = {'scaler': GradScaler()} if cfg["training_mode"] == 'on_amp' else {}

    training_loop(
        TRAINING_REGISTRY[cfg['training_mode']], model, train_loader, val_loader, train_dataset, val_dataset, 
        criterion, optimizer, device, cfg["EPOCHS"], early_stopping, logger, None, scheduler, training_args
    )

    # 메모리 정리
    del model, optimizer, scheduler, train_loader, val_loader
    gc.collect()
    torch.cuda.empty_cache()

def train_pipeline(global_config):
    print("🚀 STARTING K-FOLD TRAINING PIPELINE 🚀")
    
    # 전체 학습 데이터 로드
    full_train_df = pd.read_csv(f"{global_config['DATA_ROOT']}/train_valid_set/train-label-fix-v1.csv")
    
    # K-Fold 분할기 설정
    skf = StratifiedKFold(n_splits=global_config['N_SPLITS'], shuffle=True, random_state=global_config['SEED'])

    for model_cfg in global_config['MODELS']:
        # K-Fold 루프 실행
        for fold, (train_idx, val_idx) in enumerate(skf.split(full_train_df, full_train_df['target'])):
            train_df = full_train_df.iloc[train_idx]
            val_df = full_train_df.iloc[val_idx]
            
            run_fold_training(
                fold=fold,
                model_config=model_cfg,
                global_config=global_config,
                train_df=train_df,
                val_df=val_df,
                device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
            )
    print("\n🎉 ALL K-FOLD TRAINING FINISHED! 🎉")


# --- 추론 파이프라인 ---

def inference_pipeline(global_config):
    print("🚀 STARTING ENSEMBLE INFERENCE PIPELINE 🚀")
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # --- 테스트 데이터셋 준비 ---
    # test_data_preprocess.py를 실행했다면 test_modify 폴더 사용
    test_dir = f"{global_config['DATA_ROOT']}/test_modify"
    if not os.path.exists(test_dir):
        test_dir = f"{global_config['DATA_ROOT']}/test"
        print(f"'{test_dir}' not found. Using original test directory: '{test_dir}'")

    submission_df = pd.read_csv(f"{global_config['DATA_ROOT']}/sample_submission.csv")
    
    # 추론에는 간단한 리사이즈와 텐서 변환만 사용
    _, test_transform = build_unified_transforms(load_config(global_config['MODELS'][0]['config_path'])["transforms"]["val"])
    
    DatasetClass = get_dataset('BaseDataset') # 추론 시에는 간단한 데이터셋 사용
    test_dataset = DatasetClass(df=submission_df, data_root=test_dir, transform=test_transform)
    test_loader = DataLoader(test_dataset, batch_size=global_config['INFERENCE']['batch_size'], shuffle=False, num_workers=os.cpu_count()//2)

    # --- TTA 변환 준비 ---
    tta_transforms = None
    if global_config['INFERENCE']['use_tta']:
        tta_transforms = build_tta_transforms(global_config['INFERENCE']['tta_transforms'])
        print(f"TTA enabled with {len(tta_transforms)} transforms.")

    # --- 앙상블 추론 ---
    all_model_probs = []
    model_weights = []

    for model_cfg in global_config['MODELS']:
        model_name = model_cfg['model_name']
        cfg = load_config(model_cfg['config_path'])
        
        # 해당 모델의 모든 Fold 체크포인트 경로 찾기
        fold_checkpoints = sorted(glob.glob(f"{global_config['OUTPUT_ROOT']}/{model_name}_fold*_checkpoint.pth"))
        if not fold_checkpoints:
            print(f"⚠️ WARNING: No checkpoints found for model '{model_name}'. Skipping.")
            continue
        
        print(f"\n--- Inferencing with {model_name} ({len(fold_checkpoints)} folds) ---")
        model_weights.append(model_cfg['weight'])
        
        model_fold_probs = []
        ModelClass = get_model(cfg['MODEL'])

        for fold_path in fold_checkpoints:
            print(f"Loading: {os.path.basename(fold_path)}")
            model = ModelClass(num_classes=global_config['NUM_CLASSES']).to(device)
            model.load_state_dict(torch.load(fold_path, map_location=device))
            model.eval()

            with torch.no_grad():
                fold_preds = []
                for images, _, _ in tqdm(test_loader, desc=f"Fold {os.path.basename(fold_path)}", leave=False):
                    images = images.to(device)
                    
                    if tta_transforms:
                        # TTA 적용
                        avg_probs = get_tta_predictions(model, images, tta_transforms, device)
                        fold_preds.append(avg_probs)
                    else:
                        # TTA 미적용
                        outputs = model(images)
                        probs = torch.softmax(outputs, dim=1)
                        fold_preds.append(probs.cpu())

            model_fold_probs.append(torch.cat(fold_preds, dim=0))
            
            del model
            gc.collect()
            torch.cuda.empty_cache()

        # 각 Fold의 예측 확률을 평균
        avg_model_probs = torch.stack(model_fold_probs, dim=0).mean(dim=0)
        all_model_probs.append(avg_model_probs)

    if not all_model_probs:
        print("❌ ERROR: No models were successfully inferenced. Aborting.")
        return

    # --- 가중 앙상블 ---
    print("\n--- Performing weighted ensemble ---")
    # 가중치 정규화
    total_weight = sum(model_weights)
    normalized_weights = [w / total_weight for w in model_weights]

    final_probs = torch.zeros_like(all_model_probs[0])
    for probs, weight in zip(all_model_probs, normalized_weights):
        final_probs += probs * weight

    final_predictions = torch.argmax(final_probs, dim=1).numpy()

    # --- 제출 파일 생성 ---
    submission_df['target'] = final_predictions
    submission_path = os.path.join(global_config['OUTPUT_ROOT'], global_config['INFERENCE']['submission_filename'])
    submission_df.to_csv(submission_path, index=False)
    print(f"\n🎉 Inference finished! Submission file saved to: {submission_path} 🎉")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="K-Fold Ensemble Training and Inference Pipeline")
    parser.add_argument('--mode', type=str, required=True, choices=['train', 'inference'],
                        help="Set the pipeline to 'train' or 'inference' mode.")
    args = parser.parse_args()

    # 중앙 설정 파일 로드
    global_config = load_config("config/kfold_ensemble_config.yaml")
    
    # 출력 폴더 생성
    os.makedirs(global_config['OUTPUT_ROOT'], exist_ok=True)

    if args.mode == 'train':
        train_pipeline(global_config)
    elif args.mode == 'inference':
        inference_pipeline(global_config)

```

---

### 🚀 3단계: 스크립트 실행하기

이제 모든 준비가 끝났습니다. 터미널에서 아래 명령어를 순서대로 실행하시면 됩니다.

#### **1. (선택) 테스트 데이터 전처리**
만약 `test_data_preprocess.py`를 사용하신다면, 가장 먼저 실행해 주세요.

```bash
python test_data_preprocess.py
이 스크립트는 ./data/test 이미지를 처리하여 ./data/test_modify 폴더에 저장합니다. run_kfold_ensemble.py는 이 폴더를 우선적으로 사용합니다.

2. K-Fold 학습 시작
아래 명령어로 train 모드를 실행합니다. 스크립트는 kfold_ensemble_config.yaml을 읽어 정의된 모든 모델에 대해 K-Fold 학습을 순차적으로 진행하고, 각 Fold의 베스트 모델을 ./output 폴더에 저장합니다.

bash
python run_kfold_ensemble.py --mode train
3. 앙상블 추론 및 제출 파일 생성
학습이 모두 끝나면, 아래 명령어로 inference 모드를 실행합니다. 스크립트가 ./output 폴더에서 학습된 모든 체크포인트를 자동으로 찾아 TTA와 가중 앙상블을 적용한 후, 최종 제출 파일(submission_ensemble.csv)을 생성합니다.

bash
python run_kfold_ensemble.py --mode inference
이제 이 새로운 파이프라인을 통해 훨씬 더 체계적이고 재현 가능하게 실험을 관리하실 수 있습니다. 성공적인 결과 있기를 바랍니다!

너가 위에 말했던것들 전부 다시 반영해서 코드 만들어줘


Gemini
네, 알겠습니다! 지금까지 논의했던 모든 개선 사항—단계별 파인튜닝, K-Fold 교차 검증, TTA, 그리고 가중 앙상블—을 모두 통합한 최종 코드를 다시 만들어 드리겠습니다.

여러 스크립트를 복잡하게 실행할 필요 없이, 단 하나의 통합 스크립트(run_kfold_ensemble.py)와 설정 파일(kfold_ensemble_config.yaml)로 전체 파이프라인을 제어하도록 구성했습니다.

새로운 워크플로우 요약
중앙 설정 (kfold_ensemble_config.yaml): 앙상블에 사용할 모든 모델, K-Fold 설정, TTA, 가중치 등을 이 파일 하나로 관리합니다.
통합 실행 (run_kfold_ensemble.py): 이 스크립트를 --mode 옵션과 함께 실행하여 학습과 추론을 모두 수행합니다.
--mode train: 설정 파일에 정의된 모든 모델에 대해 K-Fold 학습을 실행하고, 각 Fold의 베스트 모델을 체크포인트로 저장합니다.
--mode inference: 저장된 모든 체크포인트를 자동으로 불러와 TTA와 가중 앙상블을 적용하여 최종 제출 파일을 생성합니다.
1. 중앙 설정 파일 (kfold_ensemble_config.yaml)
먼저, 프로젝트의 "조종석" 역할을 할 설정 파일입니다. 제공해주신 내용을 기반으로 최종 버전을 다시 확인합니다. 이 파일은 config/ 폴더에 위치해야 합니다.

yaml
# /root/DTC/upstageailab-cv-classification-cv_6/config/kfold_ensemble_config.yaml

# Global settings for K-Fold and Ensemble
SEED: 42
N_SPLITS: 5 # K-Fold의 K 값
DATA_ROOT: './data'
OUTPUT_ROOT: './output'
NUM_CLASSES: 17

# 앙상블에 사용할 모델 리스트
# 각 모델은 고유한 학습 설정(config_path)을 가질 수 있습니다.
MODELS:
  - # --- 모델 A: ResNeSt101e ---
    model_name: "ResNeSt101e" # 체크포인트 파일명에 사용될 고유 이름
    config_path: "config/main_config.yaml" # 이 모델에 대한 학습 설정 파일
    weight: 0.4 # 앙상블 가중치

  - # --- 모델 B: EfficientNetV2 (가정) ---
    model_name: "EfficientNetV2"
    # main_config.yaml을 복사하여 MODEL, model_type 등만 바꿔서 사용
    config_path: "config/effnet_config.yaml" # 예시 파일명
    weight: 0.3

  - # --- 모델 C: ConvNeXt (가정) ---
    model_name: "ConvNeXt"
    # main_config.yaml을 복사하여 MODEL, model_type 등만 바꿔서 사용
    config_path: "config/convnext_config.yaml" # 예시 파일명
    weight: 0.3

# 추론 시 설정
INFERENCE:
  batch_size: 16
  use_tta: True
  submission_filename: "submission_ensemble.csv"
  # TTA에 사용할 변환 리스트 (torchvision.transforms.v2 형식)
  tta_transforms:
    - name: "HorizontalFlip"
      params: { p: 1.0 }
    - name: "RandomRotation"
      params: { degrees: 15 }
    - name: "RandomRotation"
      params: { degrees: -15 }
중요: MODELS 리스트에 있는 각 모델의 config_path (예: config/effnet_config.yaml)는 실제로 존재하는 파일이어야 합니다. main_config.yaml을 복사하여 MODEL, model_type 등만 수정해서 여러 개를 만들어두시면 됩니다.

2. 통합 실행 스크립트 (run_kfold_ensemble.py)
기존의 main.py, main_test.py, inference_ensemble.py의 핵심 기능들을 모두 합친 통합 스크립트입니다. 프로젝트 루트 폴더에 아래 내용으로 run_kfold_ensemble.py 파일을 새로 생성해 주세요.

New file: run_kfold_ensemble.py
+309
import os
import gc
import glob
import argparse
import datetime

import torch
import pandas as pd
import numpy as np
import torch.nn as nn
from torch.cuda.amp import GradScaler
from torch.utils.data import DataLoader
from sklearn.model_selection import StratifiedKFold
from tqdm import tqdm

from config.config import load_config
from utils.utils import *
from datasets.transforms import build_unified_transforms, build_tta_transforms

from datasets import get_dataset
from models import get_model

from utils.EarlyStopping import EarlyStopping
from utils.scheduler_factory import get_scheduler
from utils.optimizer_factory import get_optimizer
from utils.loss_factory import get_loss
from trainer import *
from trainer.wandb_logger import WandbLogger
from utils.predict_tta import get_tta_predictions

# --- 유틸리티 함수 (main.py에서 가져옴) ---

def setup_optimizer_params(model: nn.Module, model_type: str, num_layers_to_unfreeze: int, backbone_lr: float, head_lr: float, use_differential_lr: bool):
    for param in model.parameters():
        param.requires_grad = False

    model_type_lower = model_type.lower()
    backbone = model.backbone
    
    stages_to_unfreeze = []
    if num_layers_to_unfreeze > 0:
        if model_type_lower.startswith('resnet'):
            all_stages = [backbone.layer1, backbone.layer2, backbone.layer3, backbone.layer4]
        elif model_type_lower.startswith('swin'): all_stages = backbone.layers
        elif model_type_lower.startswith('convnext'): all_stages = backbone.stages
        else: all_stages = backbone.blocks # efficientnet
        
        if num_layers_to_unfreeze > len(all_stages): num_layers_to_unfreeze = len(all_stages)
        stages_to_unfreeze = all_stages[-num_layers_to_unfreeze:]

    for stage in stages_to_unfreeze:
        for param in stage.parameters():
            param.requires_grad = True

    if hasattr(model, 'neck'):
        for param in model.neck.parameters(): param.requires_grad = True
    if hasattr(model, 'head'):
        for param in model.head.parameters(): param.requires_grad = True
            
    param_groups = []
    if use_differential_lr:
        backbone_params = list(filter(lambda p: p.requires_grad, model.backbone.parameters()))
        if backbone_params: param_groups.append({"params": backbone_params, "lr": backbone_lr})

        if hasattr(model, 'neck'):
            neck_params = list(filter(lambda p: p.requires_grad, model.neck.parameters()))
            if neck_params: param_groups.append({"params": neck_params, "lr": head_lr})

        if hasattr(model, 'head'):
            head_params = list(filter(lambda p: p.requires_grad, model.head.parameters()))
            if head_params: param_groups.append({"params": head_params, "lr": head_lr})
    else:
        param_groups = filter(lambda p: p.requires_grad, model.parameters())

    return param_groups

# --- 학습 파이프라인 ---

def run_fold_training(
    fold: int,
    model_config: dict,
    global_config: dict,
    train_df: pd.DataFrame,
    val_df: pd.DataFrame,
    device: torch.device
):
    # --- 설정 불러오기 ---
    cfg = load_config(model_config['config_path'])
    model_name = model_config['model_name']
    
    print(f"\n===== FOLD {fold+1}/{global_config['N_SPLITS']} | MODEL: {model_name} | CONFIG: {model_config['config_path']} =====")
    
    set_seed(global_config['SEED'] + fold) # Fold마다 다른 시드 적용

    # --- 데이터셋 및 로더 ---
    train_transform, val_transform = build_unified_transforms(cfg["transforms"]["train"]), build_unified_transforms(cfg["transforms"]["val"])
    DatasetClass = get_dataset(cfg['DATASET'])
    train_dataset = DatasetClass(df=train_df, data_root=f"{global_config['DATA_ROOT']}/train/", transform=train_transform)
    val_dataset = DatasetClass(df=val_df, data_root=f"{global_config['DATA_ROOT']}/train/", transform=val_transform)
    
    sampler = setting_sampler(train_df)
    train_loader = DataLoader(train_dataset, batch_size=cfg["BATCH_SIZE"], sampler=sampler, num_workers=os.cpu_count()//2, pin_memory=True)
    val_loader = DataLoader(val_dataset, batch_size=cfg["BATCH_SIZE"], shuffle=False, num_workers=os.cpu_count()//2, pin_memory=True)

    # --- 모델, 손실함수, 로거 ---
    ModelClass = get_model(cfg['MODEL'])
    model = ModelClass(num_classes=global_config['NUM_CLASSES']).to(device)
    criterion = get_loss(cfg["loss"]["name"], cfg["loss"]["params"])
    
    date = datetime.datetime.now().strftime("%Y%m%d-%H%M")
    run_name = f"{model_name}_fold{fold+1}_{date}"
    save_path = f"{global_config['OUTPUT_ROOT']}/{run_name}_checkpoint.pth"
    
    logger = WandbLogger(
        project_name="DTC-KFold-Ensemble",
        run_name=run_name,
        config=cfg,
        save_path=save_path
    )

    # --- 1단계: 헤드 학습 ---
    print("\n--- STAGE 1: HEAD TRAINING ---")
    if cfg['head_only_epochs'] > 0:
        head_params = setup_optimizer_params(model, cfg["model_type"], 0, 0, cfg["head_only_lr"], False)
        optimizer_head = get_optimizer(cfg["optimizer"]["name"], head_params, {"lr": cfg["head_only_lr"]})
        
        scheduler_params_s1 = cfg["scheduler"]["params"].copy()
        scheduler_params_s1['max_lr'] = cfg["head_only_lr"]
        scheduler_params_s1['total_steps'] = len(train_loader) * cfg["head_only_epochs"]
        if scheduler_params_s1['total_steps'] == 0: scheduler_params_s1['total_steps'] = 1
        scheduler_head = get_scheduler(cfg["scheduler"]["name"], optimizer_head, scheduler_params_s1)
        
        training_loop(
            TRAINING_REGISTRY[cfg['training_mode']], model, train_loader, val_loader, train_dataset, val_dataset, 
            criterion, optimizer_head, device, cfg["head_only_epochs"], None, logger, None, scheduler_head, {}
        )

    # --- 2단계: 전체 미세 조정 ---
    print("\n--- STAGE 2: FULL FINE-TUNING ---")
    params_to_update = setup_optimizer_params(
        model, cfg["model_type"], cfg["num_blocks_to_unfreeze"],
        cfg["backbone_lr"], cfg["optimizer"]["params"]["lr"], cfg["use_differential_lr"]
    )
    optimizer = get_optimizer(cfg["optimizer"]["name"], params_to_update, cfg["optimizer"]["params"])
    
    scheduler_params_s2 = cfg["scheduler"]["params"].copy()
    max_lrs = [pg.get('lr', cfg['optimizer']['params']['lr']) for pg in optimizer.param_groups]
    scheduler_params_s2['max_lr'] = max_lrs
    scheduler_params_s2['total_steps'] = len(train_loader) * cfg["EPOCHS"]
    scheduler = get_scheduler(cfg["scheduler"]["name"], optimizer, scheduler_params_s2)

    early_stopping = EarlyStopping(patience=cfg["patience"], delta=cfg["delta"], verbose=True, save_path=save_path, mode='max')
    training_args = {'scaler': GradScaler()} if cfg["training_mode"] == 'on_amp' else {}

    training_loop(
        TRAINING_REGISTRY[cfg['training_mode']], model, train_loader, val_loader, train_dataset, val_dataset, 
        criterion, optimizer, device, cfg["EPOCHS"], early_stopping, logger, None, scheduler, training_args
    )

    # 메모리 정리
    del model, optimizer, scheduler, train_loader, val_loader, logger, early_stopping
    gc.collect()
    torch.cuda.empty_cache()

def train_pipeline(global_config):
    print("🚀 STARTING K-FOLD TRAINING PIPELINE 🚀")
    
    # 전체 학습 데이터 로드
    full_train_df = pd.read_csv(f"{global_config['DATA_ROOT']}/train_valid_set/train-label-fix-v1.csv")
    
    # K-Fold 분할기 설정
    skf = StratifiedKFold(n_splits=global_config['N_SPLITS'], shuffle=True, random_state=global_config['SEED'])

    for model_cfg in global_config['MODELS']:
        # K-Fold 루프 실행
        for fold, (train_idx, val_idx) in enumerate(skf.split(full_train_df, full_train_df['target'])):
            train_df = full_train_df.iloc[train_idx]
            val_df = full_train_df.iloc[val_idx]
            
            run_fold_training(
                fold=fold,
                model_config=model_cfg,
                global_config=global_config,
                train_df=train_df,
                val_df=val_df,
                device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
            )
    print("\n🎉 ALL K-FOLD TRAINING FINISHED! 🎉")


# --- 추론 파이프라인 ---

def inference_pipeline(global_config):
    print("🚀 STARTING ENSEMBLE INFERENCE PIPELINE 🚀")
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # --- 테스트 데이터셋 준비 ---
    # test_data_preprocess.py를 실행했다면 test_modify 폴더 사용
    test_dir = f"{global_config['DATA_ROOT']}/test_modify"
    if not os.path.exists(test_dir) or not os.listdir(test_dir):
        test_dir = f"{global_config['DATA_ROOT']}/test"
        print(f"'{global_config['DATA_ROOT']}/test_modify' not found or empty. Using original test directory: '{test_dir}'")

    submission_df = pd.read_csv(f"{global_config['DATA_ROOT']}/sample_submission.csv")
    
    # 추론에는 간단한 리사이즈와 텐서 변환만 사용 (첫 번째 모델의 val transform 기준)
    _, test_transform = build_unified_transforms(load_config(global_config['MODELS'][0]['config_path'])["transforms"]["val"])
    
    DatasetClass = get_dataset('BaseDataset') # 추론 시에는 간단한 데이터셋 사용
    test_dataset = DatasetClass(df=submission_df, data_root=test_dir, transform=test_transform)
    test_loader = DataLoader(test_dataset, batch_size=global_config['INFERENCE']['batch_size'], shuffle=False, num_workers=os.cpu_count()//2)

    # --- TTA 변환 준비 ---
    tta_transforms = None
    if global_config['INFERENCE']['use_tta']:
        tta_transforms = build_tta_transforms(global_config['INFERENCE']['tta_transforms'])
        print(f"TTA enabled with {len(tta_transforms)} transforms.")

    # --- 앙상블 추론 ---
    all_model_probs = []
    model_weights = []

    for model_cfg in global_config['MODELS']:
        model_name = model_cfg['model_name']
        cfg = load_config(model_cfg['config_path'])
        
        # 해당 모델의 모든 Fold 체크포인트 경로 찾기
        fold_checkpoints = sorted(glob.glob(f"{global_config['OUTPUT_ROOT']}/{model_name}_fold*_checkpoint.pth"))
        if not fold_checkpoints:
            print(f"⚠️ WARNING: No checkpoints found for model '{model_name}'. Skipping.")
            continue
        
        print(f"\n--- Inferencing with {model_name} ({len(fold_checkpoints)} folds) ---")
        model_weights.append(model_cfg['weight'])
        
        model_fold_probs = []
        ModelClass = get_model(cfg['MODEL'])

        for fold_path in fold_checkpoints:
            print(f"Loading: {os.path.basename(fold_path)}")
            model = ModelClass(num_classes=global_config['NUM_CLASSES']).to(device)
            model.load_state_dict(torch.load(fold_path, map_location=device))
            model.eval()

            with torch.no_grad():
                fold_preds = []
                for images, _, _ in tqdm(test_loader, desc=f"Fold {os.path.basename(fold_path).split('_')[1]}", leave=False):
                    images = images.to(device)
                    
                    if tta_transforms:
                        # TTA 적용
                        avg_probs = get_tta_predictions(model, images, tta_transforms, device)
                        fold_preds.append(avg_probs)
                    else:
                        # TTA 미적용
                        outputs = model(images)
                        probs = torch.softmax(outputs, dim=1)
                        fold_preds.append(probs.cpu())

            model_fold_probs.append(torch.cat(fold_preds, dim=0))
            
            del model
            gc.collect()
            torch.cuda.empty_cache()

        # 각 Fold의 예측 확률을 평균
        avg_model_probs = torch.stack(model_fold_probs, dim=0).mean(dim=0)
        all_model_probs.append(avg_model_probs)

    if not all_model_probs:
        print("❌ ERROR: No models were successfully inferenced. Aborting.")
        return

    # --- 가중 앙상블 ---
    print("\n--- Performing weighted ensemble ---")
    # 가중치 정규화
    total_weight = sum(model_weights)
    normalized_weights = [w / total_weight for w in model_weights]

    final_probs = torch.zeros_like(all_model_probs[0])
    for probs, weight in zip(all_model_probs, normalized_weights):
        final_probs += probs * weight

    final_predictions = torch.argmax(final_probs, dim=1).numpy()

    # --- 제출 파일 생성 ---
    submission_df['target'] = final_predictions
    submission_path = os.path.join(global_config['OUTPUT_ROOT'], global_config['INFERENCE']['submission_filename'])
    submission_df.to_csv(submission_path, index=False)
    print(f"\n🎉 Inference finished! Submission file saved to: {submission_path} 🎉")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="K-Fold Ensemble Training and Inference Pipeline")
    parser.add_argument('--mode', type=str, required=True, choices=['train', 'inference'],
                        help="Set the pipeline to 'train' or 'inference' mode.")
    args = parser.parse_args()

    # 중앙 설정 파일 로드
    global_config = load_config("config/kfold_ensemble_config.yaml")
    
    # 출력 폴더 생성
    os.makedirs(global_config['OUTPUT_ROOT'], exist_ok=True)

    if args.mode == 'train':
        train_pipeline(global_config)
    elif args.mode == 'inference':
        inference_pipeline(global_config)

🚀 3. 실행 방법
이제 모든 준비가 끝났습니다. 터미널에서 아래 명령어를 순서대로 실행하시면 됩니다.

1. (선택) 테스트 데이터 전처리
만약 test_data_preprocess.py를 사용하신다면, 가장 먼저 실행해 주세요. 이 스크립트는 ./data/test 이미지를 처리하여 ./data/test_modify 폴더에 저장합니다. 추론 스크립트는 이 폴더를 우선적으로 사용합니다.

bash
python test_data_preprocess.py
2. K-Fold 학습 시작
아래 명령어로 train 모드를 실행합니다. 스크립트는 kfold_ensemble_config.yaml을 읽어 정의된 모든 모델에 대해 K-Fold 학습을 순차적으로 진행하고, 각 Fold의 베스트 모델을 ./output 폴더에 저장합니다.

bash
python run_kfold_ensemble.py --mode train
3. 앙상블 추론 및 제출 파일 생성
학습이 모두 끝나면, 아래 명령어로 inference 모드를 실행합니다. 스크립트가 ./output 폴더에서 학습된 모든 체크포인트를 자동으로 찾아 TTA와 가중 앙상블을 적용한 후, 최종 제출 파일(submission_ensemble.csv)을 생성합니다.

bash
python run_kfold_ensemble.py --mode inference
이 새로운 파이프라인을 통해 훨씬 더 체계적이고 재현 가능하게 실험을 관리하실 수 있습니다. 성공적인 결과 있기를 바랍니다!