In [29]:
import os
import random

import timm
import torch
import albumentations as A
import pandas as pd
import numpy as np
import torch.nn as nn
from albumentations.pytorch import ToTensorV2
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from tqdm import tqdm

# TTA에 필요한 cv2 임포트
import cv2
# Softmax를 위한 torch.nn.functional 임포트
import torch.nn.functional as F

# Weights & Biases (wandb) 임포트
import wandb


In [30]:

# 시드를 고정합니다. (추론 시에도 재현성을 위해)
SEED = 42
os.environ['PYTHONHASHSEED'] = str(SEED)
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.benchmark = True


In [31]:

# 데이터셋 클래스를 정의합니다. (training.py와 동일하게 유지)
class ImageDataset(Dataset):
    def __init__(self, csv_file, path, transform=None, return_raw=False):
        self.df = pd.read_csv(csv_file)
        self.path = path
        self.transform = transform
        self.return_raw = return_raw

        if 'ID' not in self.df.columns:
            raise ValueError(f"CSV 파일 '{csv_file}'에 'ID' 컬럼이 없습니다.")
        if 'target' not in self.df.columns:
            # 테스트 CSV 파일은 'target' 컬럼이 없을 수 있으므로 경고만 표시
            print(f"경고: CSV 파일 '{csv_file}'에 'target' 컬럼이 없습니다. 테스트 데이터셋으로 가정합니다.", flush=True)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        img_name = self.df.loc[idx, 'ID']
        # 테스트셋의 경우 'target' 컬럼이 없을 수 있으므로 기본값 0 사용
        target = self.df.loc[idx, 'target'] if 'target' in self.df.columns else 0 

        img_path = os.path.join(self.path, img_name)
        
        if not os.path.exists(img_path):
            print(f"경고: 이미지 파일이 존재하지 않습니다: {img_path}. 이 항목은 건너뛰거나 에러가 발생할 수 있습니다.", flush=True)

        if self.return_raw:
            img = np.array(Image.open(img_path).convert('RGB'))
            return img, target
        else:
            if self.transform:
                if hasattr(self.transform, '__module__') and 'torchvision.transforms' in self.transform.__module__:
                    img = Image.open(img_path).convert('RGB')
                    img = self.transform(img)
                else:
                    img = np.array(Image.open(img_path).convert('RGB'))
                    img = self.transform(image=img)['image']
            else:
                img = np.array(Image.open(img_path).convert('RGB'))

            return img, target


In [None]:

# TTA를 위한 추론 함수
# def inference_with_tta(loader, model, device, tta_augments, base_transform):
#     model.eval()
#     all_preds_proba = []

#     with torch.no_grad():
#         pbar = tqdm(loader)
#         for raw_image_batch, _ in pbar:
#             raw_image = raw_image_batch.squeeze(0).cpu().numpy()

#             single_image_tta_preds = []

#             for tta_aug in tta_augments:
#                 augmented_img_np = tta_aug(image=raw_image)['image']
#                 transformed_img_tensor = base_transform(image=augmented_img_np)['image']
#                 transformed_img_tensor = transformed_img_tensor.unsqueeze(0).to(device)
#                 output = model(transformed_img_tensor)
#                 prob = F.softmax(output, dim=1).cpu().numpy()
#                 single_image_tta_preds.append(prob)

#             avg_prob = np.mean(single_image_tta_preds, axis=0)
#             all_preds_proba.append(avg_prob)

#     final_preds_proba = np.concatenate(all_preds_proba, axis=0)

#     return final_preds_proba


In [None]:

# TTA를 위한 추론 함수
def inference_with_tta(loader, model, device, tta_augments, base_transform):
    model.eval()
    all_preds_proba = []
    all_img_names = [] # 이미지 이름을 저장

    with torch.no_grad():
        pbar = tqdm(loader)
        for raw_image_batch, img_names_batch in pbar: # 이미지 이름도 받음
            raw_image = raw_image_batch.squeeze(0).cpu().numpy()
            img_name = img_names_batch[0] # 배치 크기 1이므로 첫 번째 요소

            single_image_tta_preds = []

            for tta_aug in tta_augments:
                augmented_img_np = tta_aug(image=raw_image)['image']
                transformed_img_tensor = base_transform(image=augmented_img_np)['image']
                transformed_img_tensor = transformed_img_tensor.unsqueeze(0).to(device)
                output = model(transformed_img_tensor)
                prob = F.softmax(output, dim=1).cpu().numpy()
                single_image_tta_preds.append(prob)

            avg_prob = np.mean(single_image_tta_preds, axis=0)
            all_preds_proba.append(avg_prob)
            all_img_names.append(img_name) # 이미지 이름 저장

    final_preds_proba = np.concatenate(all_preds_proba, axis=0)

    return final_preds_proba, all_img_names # 확률과 이미지 이름 반환
    

In [33]:

# --- Hyper-parameters (추론에 필요한 최소한의 파라미터) ---
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

test_csv = "data/sample_submission.csv"
test_path = "data/test/"
checkpoint_dir = "checkpoints" # 학습된 모델이 저장된 경로

model_name = 'convnext_base' # 학습 시 사용한 모델 이름과 동일해야 함
img_size = 224 # 학습 시 사용한 이미지 크기와 동일해야 함
NUM_CLASSES = 17 # 분류할 클래스 개수 (학습 시와 동일해야 함)
BATCH_SIZE = 32 # 일반 추론 시 배치 크기
num_workers = 0 # DataLoader worker 수

USE_TTA = True # TTA 사용 여부 플래그
N_SPLITS = 5 # K-Fold 학습 시 사용한 폴드 개수와 동일해야 함


In [None]:

# --- Weights & Biases 초기화 (추론용) ---
# wandb.init(project="document-type-classification-inference",
#            name=f"inference_{model_name}_k{N_SPLITS}_tta{USE_TTA}", # 추론 실행 이름
#            config={
#                "model_name": model_name,
#                "img_size": img_size,
#                "num_classes": NUM_CLASSES,
#                "use_tta": USE_TTA,
#                "n_splits": N_SPLITS, # K-Fold 정보 추가
#            },
#            mode="online")


In [34]:

# --- Data Transforms (추론에 필요한 전처리) ---
tst_transform = A.Compose([
    A.Resize(height=img_size, width=img_size),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2(),
])

# TTA specific augmentations
tta_augments = [
    A.NoOp(),
    A.HorizontalFlip(p=1),
    A.VerticalFlip(p=1),
    A.Rotate(limit=90, p=1, interpolation=cv2.INTER_LINEAR),
    A.Rotate(limit=180, p=1, interpolation=cv2.INTER_LINEAR),
    A.Rotate(limit=270, p=1, interpolation=cv2.INTER_LINEAR),
    # A.RandomBrightnessContrast(p=1, brightness_limit=(-0.2, 0.2), contrast_limit=(-0.2, 0.2)),
    # A.RandomGamma(p=1, gamma_limit=(80, 120)),
    # A.GaussNoise(p=1, var_limit=(10.0, 50.0)),
    A.Sharpen(alpha=(0.3, 0.6), lightness=(0.7, 1.0), p=1.0),
    A.CLAHE(p=1, clip_limit=2.0, tile_grid_size=(8,8)),
]


In [35]:

# --- Inference & Save File ---
print("🚀 Starting inference process...", flush=True)

# 모든 폴드의 예측 확률을 저장할 리스트
all_fold_predictions_proba = []

# K-Fold 모델 로드 및 예측 수행
for fold in range(N_SPLITS):
    fold_model_path = os.path.join(checkpoint_dir, f'best_model_fold_{fold}.pth')
    
    if not os.path.exists(fold_model_path):
        print(f"Error: Model for Fold {fold} not found at {fold_model_path}. Skipping this fold.", flush=True)
        continue

    print(f"\n--- Loading model for Fold {fold+1}/{N_SPLITS} ---", flush=True)
    model = timm.create_model(
        model_name,
        pretrained=False,
        num_classes=NUM_CLASSES
    ).to(device)
    model.load_state_dict(torch.load(fold_model_path)['model_state_dict'])
    model.to(device)
    print(f"Model loaded from {fold_model_path}", flush=True)

    if USE_TTA:
        print(f"Performing inference with Test-Time Augmentation (TTA) for Fold {fold+1}...", flush=True)
        tta_test_dataset = ImageDataset(csv_file=test_csv, path=test_path, return_raw=True)
        tta_test_loader = DataLoader(tta_test_dataset, batch_size=1, shuffle=False, num_workers=num_workers)

        fold_preds_proba = inference_with_tta(
            loader=tta_test_loader,
            model=model,
            device=device,
            tta_augments=tta_augments,
            base_transform=tst_transform
        )
        all_fold_predictions_proba.append(fold_preds_proba)
        print(f"Inference for Fold {fold+1} completed.", flush=True)

    else:
        print(f"Performing normal inference for Fold {fold+1}...", flush=True)
        test_dataset_normal_inference = ImageDataset(
            csv_file=test_csv,
            path=test_path,
            transform=tst_transform,
            return_raw=False
        )
        test_loader_normal = DataLoader(
            test_dataset_normal_inference,
            batch_size=BATCH_SIZE,
            shuffle=False,
            num_workers=num_workers
        )

        model.eval()
        fold_preds_proba_list = []
        with torch.no_grad():
            pbar = tqdm(test_loader_normal)
            for image, _ in pbar:
                image = image.to(device)
                outputs = model(image)
                prob = F.softmax(outputs, dim=1).cpu().numpy()
                fold_preds_proba_list.extend(prob)
        all_fold_predictions_proba.append(np.array(fold_preds_proba_list))
        print(f"Inference for Fold {fold+1} completed.", flush=True)

# 모든 폴드의 예측 확률 평균
if all_fold_predictions_proba:
    print("\n--- Averaging predictions from all folds ---", flush=True)
    final_avg_preds_proba = np.mean(all_fold_predictions_proba, axis=0)
    final_preds = np.argmax(final_avg_preds_proba, axis=1)

    submission_df = pd.read_csv(test_csv)
    submission_file_name = 'submission_kfold_ensemble_tta.csv' if USE_TTA else 'submission_kfold_ensemble_normal.csv'
    submission_df['target'] = final_preds
    submission_df.to_csv(submission_file_name, index=False)
    print(f"Final ensemble predictions saved to {submission_file_name}", flush=True)

    # --- Weights & Biases에 제출 파일 아티팩트로 기록 ---
    # artifact = wandb.Artifact(f'submission_ensemble_tta_{N_SPLITS}folds' if USE_TTA else f'submission_ensemble_normal_{N_SPLITS}folds', type='submission')
    # artifact.add_file(submission_file_name)
    # wandb.log_artifact(artifact)
    # wandb.log({"final_submission_file": submission_file_name})

else:
    print("No models were successfully loaded or processed for inference. No submission file generated.", flush=True)

print("=" * 60, flush=True)
print(f"🎉 All inference processes completed! 🎉", flush=True)

# --- Weights & Biases 실행 종료 ---
# wandb.finish()


🚀 Starting inference process...

--- Loading model for Fold 1/5 ---
Model loaded from checkpoints/best_model_fold_0.pth
Performing inference with Test-Time Augmentation (TTA) for Fold 1...


100%|██████████| 3140/3140 [03:52<00:00, 13.53it/s]

Inference for Fold 1 completed.

--- Loading model for Fold 2/5 ---





Model loaded from checkpoints/best_model_fold_1.pth
Performing inference with Test-Time Augmentation (TTA) for Fold 2...


100%|██████████| 3140/3140 [03:53<00:00, 13.44it/s]

Inference for Fold 2 completed.

--- Loading model for Fold 3/5 ---





Model loaded from checkpoints/best_model_fold_2.pth
Performing inference with Test-Time Augmentation (TTA) for Fold 3...


100%|██████████| 3140/3140 [03:52<00:00, 13.50it/s]

Inference for Fold 3 completed.

--- Loading model for Fold 4/5 ---





Model loaded from checkpoints/best_model_fold_3.pth
Performing inference with Test-Time Augmentation (TTA) for Fold 4...


100%|██████████| 3140/3140 [03:51<00:00, 13.54it/s]

Inference for Fold 4 completed.

--- Loading model for Fold 5/5 ---





Model loaded from checkpoints/best_model_fold_4.pth
Performing inference with Test-Time Augmentation (TTA) for Fold 5...


100%|██████████| 3140/3140 [03:51<00:00, 13.55it/s]

Inference for Fold 5 completed.

--- Averaging predictions from all folds ---
Final ensemble predictions saved to submission_kfold_ensemble_tta.csv
🎉 All inference processes completed! 🎉





In [26]:
# pseudo labeling 을 위한 inference 코드

# TTA를 위한 추론 함수
def inference_with_tta(loader, model, device, tta_augments, base_transform):
    model.eval()
    all_preds_proba = []
    all_img_names = [] # 이미지 이름을 저장

    with torch.no_grad():
        pbar = tqdm(loader)
        for raw_image_batch, img_names_batch in pbar: # 이미지 이름도 받음
            raw_image = raw_image_batch.squeeze(0).cpu().numpy()
            img_name = img_names_batch[0] # 배치 크기 1이므로 첫 번째 요소

            single_image_tta_preds = []

            for tta_aug in tta_augments:
                augmented_img_np = tta_aug(image=raw_image)['image']
                transformed_img_tensor = base_transform(image=augmented_img_np)['image']
                transformed_img_tensor = transformed_img_tensor.unsqueeze(0).to(device)
                output = model(transformed_img_tensor)
                prob = F.softmax(output, dim=1).cpu().numpy()
                single_image_tta_preds.append(prob)

            avg_prob = np.mean(single_image_tta_preds, axis=0)
            all_preds_proba.append(avg_prob)
            all_img_names.append(img_name) # 이미지 이름 저장

    final_preds_proba = np.concatenate(all_preds_proba, axis=0)

    return final_preds_proba, all_img_names # 확률과 이미지 이름 반환
    


In [None]:

# --- Hyper-parameters (추론에 필요한 최소한의 파라미터) ---
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

test_csv = "data/sample_submission.csv"
test_path = "data/test/"
# checkpoint_dir = "checkpoints_0707_kfold_conv" # 학습된 모델이 저장된 경로
checkpoint_dir = "checkpoints"

model_name = 'convnext_base' # 학습 시 사용한 모델 이름과 동일해야 함
img_size = 224 # 학습 시 사용한 이미지 크기와 동일해야 함
NUM_CLASSES = 17 # 분류할 클래스 개수 (학습 시와 동일해야 함)
BATCH_SIZE = 32 # 일반 추론 시 배치 크기
num_workers = 0 # DataLoader worker 수

USE_TTA = True # TTA 사용 여부 플래그
N_SPLITS = 5 # K-Fold 학습 시 사용한 폴드 개수와 동일해야 함


In [27]:

# Pseudo-labeling specific parameters
CONFIDENCE_THRESHOLD = 0.9 # 의사 레이블을 부여할 확신도 임계값
PSEUDO_LABEL_CSV_PATH = "train_pseudo_labeled.csv" # 의사 레이블링된 데이터 저장 경로

In [28]:
# --- Data Transforms (추론에 필요한 전처리) ---
tst_transform = A.Compose([
    A.Resize(height=img_size, width=img_size),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2(),
])

# TTA specific augmentations
tta_augments = [
    A.NoOp(),
    A.HorizontalFlip(p=1),
    A.VerticalFlip(p=1),
    A.Rotate(limit=90, p=1, interpolation=cv2.INTER_LINEAR),
    A.Rotate(limit=180, p=1, interpolation=cv2.INTER_LINEAR),
    A.Rotate(limit=270, p=1, interpolation=cv2.INTER_LINEAR),
    A.RandomBrightnessContrast(p=1, brightness_limit=(-0.2, 0.2), contrast_limit=(-0.2, 0.2)),
    A.RandomGamma(p=1, gamma_limit=(80, 120)),
    A.GaussNoise(p=1, var_limit=(10.0, 50.0)),
    A.CLAHE(p=1, clip_limit=2.0, tile_grid_size=(8,8)),
]

# --- Main Pseudo-Labeling Process ---
try:
    print("🚀 Starting pseudo-labeling process...", flush=True)

    # 1. 테스트 데이터 로드
    # ImageDataset에서 이미지 이름도 반환하도록 수정했으므로, return_raw=True를 사용
    test_dataset = ImageDataset(csv_file=test_csv, path=test_path, return_raw=True)
    # TTA를 위해 batch_size=1로 설정
    test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, num_workers=num_workers)

    # 모든 폴드의 예측 확률을 저장할 리스트
    all_fold_predictions_proba = []

    # 2. K-Fold 모델 로드 및 테스트 데이터에 대한 예측 수행
    print(f"Loading {N_SPLITS} models and predicting on test data...", flush=True)
    for fold in range(N_SPLITS):
        fold_model_path = os.path.join(checkpoint_dir, f'best_model_fold_{fold}.pth')
        
        if not os.path.exists(fold_model_path):
            print(f"경고: Fold {fold}의 모델을 찾을 수 없습니다: {fold_model_path}. 이 폴드는 건너뜁니다.", flush=True)
            continue

        print(f"--- Predicting with model from Fold {fold+1}/{N_SPLITS} ---", flush=True)
        model = timm.create_model(
            model_name,
            pretrained=False,
            num_classes=NUM_CLASSES
        ).to(device)
        model.load_state_dict(torch.load(fold_model_path)['model_state_dict'])
        model.to(device)
        model.eval() # 평가 모드

        if USE_TTA:
            fold_preds_proba, img_names = inference_with_tta(
                loader=test_loader,
                model=model,
                device=device,
                tta_augments=tta_augments,
                base_transform=tst_transform
            )
        else:
            # TTA를 사용하지 않는 경우의 일반 추론
            fold_preds_proba_list = []
            img_names = []
            with torch.no_grad():
                pbar = tqdm(test_loader)
                for image_batch, _, img_names_batch in pbar:
                    image = image_batch.to(device)
                    outputs = model(image)
                    prob = F.softmax(outputs, dim=1).cpu().numpy()
                    fold_preds_proba_list.extend(prob)
                    img_names.extend(img_names_batch)
            fold_preds_proba = np.array(fold_preds_proba_list)
        
        all_fold_predictions_proba.append(fold_preds_proba)
        print(f"Prediction for Fold {fold+1} completed.", flush=True)

    if not all_fold_predictions_proba:
        print("모든 폴드의 모델 로드 또는 예측에 실패했습니다. 의사 레이블을 생성할 수 없습니다.", flush=True)
        wandb.finish()
        exit()

    # 3. 모든 폴드의 예측 확률 평균 (앙상블)
    print("\n--- Averaging predictions from all folds ---", flush=True)
    ensemble_preds_proba = np.mean(all_fold_predictions_proba, axis=0)
    
    # 4. 의사 레이블 선별
    print(f"Filtering pseudo-labels with confidence threshold: {CONFIDENCE_THRESHOLD}", flush=True)
    max_confidences = np.max(ensemble_preds_proba, axis=1)
    pseudo_labels = np.argmax(ensemble_preds_proba, axis=1)

    confident_indices = np.where(max_confidences >= CONFIDENCE_THRESHOLD)[0]
    
    # 의사 레이블링된 데이터프레임 생성
    pseudo_labeled_df = pd.DataFrame({
        'ID': [img_names[i] for i in confident_indices],
        'target': [pseudo_labels[i] for i in confident_indices]
    })
    
    print(f"Total test samples: {len(test_dataset)}", flush=True)
    print(f"Number of pseudo-labeled samples: {len(pseudo_labeled_df)}", flush=True)

    # 5. 기존 학습 데이터와 의사 레이블 데이터 결합
    # print("Combining original training data with pseudo-labeled data...", flush=True)
    # original_train_df = pd.read_csv(train_csv_file)
    # combined_df = pd.concat([original_train_df, pseudo_labeled_df], ignore_index=True)
    
    # 6. 새로운 CSV 파일로 저장
    pseudo_labeled_df.to_csv(PSEUDO_LABEL_CSV_PATH, index=False)
    print(f"Pseudo-labeled dataset saved to {PSEUDO_LABEL_CSV_PATH}", flush=True)

    
except Exception as e:
    print(f"An error occurred during pseudo-labeling: {e}", flush=True)
    wandb.finish(exit_code=1) # 에러 발생 시 wandb run을 실패로 표시
    raise # 에러를 다시 발생시켜 스크립트 종료
finally:
    print("=" * 60, flush=True)
    print(f"🎉 Pseudo-labeling process completed! 🎉", flush=True)
    print("=" * 60, flush=True)
    

🚀 Starting pseudo-labeling process...
Loading 5 models and predicting on test data...
--- Predicting with model from Fold 1/5 ---


100%|██████████| 3140/3140 [05:36<00:00,  9.33it/s]

Prediction for Fold 1 completed.
--- Predicting with model from Fold 2/5 ---



100%|██████████| 3140/3140 [05:36<00:00,  9.32it/s]

Prediction for Fold 2 completed.
--- Predicting with model from Fold 3/5 ---



100%|██████████| 3140/3140 [05:38<00:00,  9.29it/s]

Prediction for Fold 3 completed.
--- Predicting with model from Fold 4/5 ---



100%|██████████| 3140/3140 [05:38<00:00,  9.27it/s]

Prediction for Fold 4 completed.
--- Predicting with model from Fold 5/5 ---



100%|██████████| 3140/3140 [05:42<00:00,  9.17it/s]

Prediction for Fold 5 completed.

--- Averaging predictions from all folds ---
Filtering pseudo-labels with confidence threshold: 0.9
Total test samples: 3140
Number of pseudo-labeled samples: 1774
Pseudo-labeled dataset saved to train_pseudo_labeled.csv
🎉 Pseudo-labeling process completed! 🎉



