# Import

In [None]:
import os
import random
import cv2
import pandas as pd
import numpy as np

from PIL import Image
from tqdm import tqdm 

from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

import torch
from torch.utils.data import Dataset, DataLoader, Subset
import torchvision.models as models
import torchvision.transforms as transforms
import torch.nn.functional as F
from torch import nn, optim
import albumentations as A
from albumentations.pytorch import ToTensorV2
from sklearn.metrics import log_loss

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

In [None]:
import albumentations
print(albumentations.__version__)


# Hyperparameter Setting

In [None]:
# Hyperparameters shared by the cells below.
CFG = {
    # NOTE(review): 'IMG_SIZE' is not read anywhere in the visible code; the
    # transforms resize to a hard-coded 512x512 instead — confirm which is intended.
    'IMG_SIZE': 384,
    'BATCH_SIZE': 8,  # batch size passed to every DataLoader
    'EPOCHS': 30,  # number of training epochs; also T_max of the cosine LR schedule
    'LEARNING_RATE': 1e-4,  # initial learning rate handed to the optimizer
    'SEED' : 42,  # seed passed to seed_everything
}

# Fixed RandomSeed

In [None]:
def seed_everything(seed):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and current CUDA device),
    exports ``PYTHONHASHSEED`` and switches cuDNN to deterministic mode with
    auto-tuning disabled.

    Args:
        seed: Integer seed applied to all generators.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    # All four seeders take the same single integer argument.
    for apply_seed in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
        apply_seed(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

seed_everything(CFG['SEED']) # Seed 고정

In [None]:
# outlier_paths

# CustomDataset

In [None]:
from PIL import Image

class CustomImageDataset(Dataset):
    """Dataset over ``.jpg`` images stored on disk.

    Two layouts are supported:

    * ``is_test=False``: ``root_dir`` contains one sub-directory per class;
      every ``.jpg`` inside a class directory becomes an ``(img_path, label)``
      sample. ``classes`` holds the sorted class directory names and
      ``class_to_idx`` maps each name to its integer label.
    * ``is_test=True``: ``root_dir`` directly contains ``.jpg`` files; every
      file becomes an ``(img_path,)`` sample and no labels are produced.

    File names are sorted in both modes so that ``samples`` has the same order
    on every scan, independent of the order the filesystem lists entries in.

    Args:
        root_dir: Directory to scan.
        transform: Optional callable invoked as ``transform(image=ndarray)``
            and expected to return a mapping with an ``'image'`` entry
            (the albumentations calling convention).
        is_test: Select the unlabeled layout described above.
    """

    def __init__(self, root_dir, transform=None, is_test=False):
        self.root_dir = root_dir
        self.transform = transform
        self.is_test = is_test
        self.samples = []

        if is_test:
            for fname in self._list_jpgs(root_dir):
                self.samples.append((os.path.join(root_dir, fname),))
            return

        # Only directories are classes: a stray file in root_dir (README,
        # .DS_Store, ...) must not become a label nor be passed to listdir.
        self.classes = sorted(
            entry for entry in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, entry))
        )
        self.class_to_idx = {cls_name: i for i, cls_name in enumerate(self.classes)}
        for cls_name in self.classes:
            cls_folder = os.path.join(root_dir, cls_name)
            label = self.class_to_idx[cls_name]
            for fname in self._list_jpgs(cls_folder):
                self.samples.append((os.path.join(cls_folder, fname), label))

    @staticmethod
    def _list_jpgs(folder):
        """Return the sorted names of the ``.jpg`` files (any case) in *folder*."""
        return sorted(
            fname for fname in os.listdir(folder)
            if fname.lower().endswith('.jpg')
        )

    def _load_image(self, img_path):
        """Load *img_path* as RGB and apply ``self.transform`` when one is set."""
        # The context manager releases the file handle right away; convert()
        # returns an independent, fully loaded image.
        with Image.open(img_path) as img:
            image = img.convert('RGB')
        if self.transform:
            image = self.transform(image=np.array(image))['image']
        return image

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``image`` in test mode, otherwise ``(image, label)``."""
        if self.is_test:
            return self._load_image(self.samples[idx][0])
        img_path, label = self.samples[idx]
        return self._load_image(img_path), label



In [None]:
import random
from collections import defaultdict

# Fix the random seed so that the sampling below gives the same result on every run
random.seed(42)

def get_classwise_indices(dataset, n_val_per_class):
    """Split sample indices into train/validation with a fixed count per class.

    The indices of ``dataset.samples`` are grouped by label, each group is
    shuffled with the module-level ``random`` generator, and the first
    ``n_val_per_class`` indices of every group go to validation while the
    remainder goes to training.

    Args:
        dataset: Object whose ``samples`` is a sequence of ``(item, label)`` pairs.
        n_val_per_class: Number of validation indices taken from each class.

    Returns:
        Tuple ``(train_idx, val_idx)`` of index lists.
    """
    # Bucket sample positions by label (labels keep first-seen order).
    buckets = defaultdict(list)
    for position, sample in enumerate(dataset.samples):
        _, label = sample
        buckets[label].append(position)

    train_idx = []
    val_idx = []
    for members in buckets.values():
        # Shuffle in place, then cut: head -> validation, tail -> train.
        random.shuffle(members)
        val_idx += members[:n_val_per_class]
        train_idx += members[n_val_per_class:]

    return train_idx, val_idx


In [None]:
def cutmix_data(x, y, alpha=1.0):
    """Apply CutMix to a batch.

    A random permutation of the batch is drawn and one rectangular region
    (the same for every sample) is replaced by the corresponding region of
    the permuted batch.

    The input batch is left untouched; the mixed batch is a new tensor.

    Args:
        x: Batch tensor indexed as ``x[:, :, i, j]`` (4 dimensions).
        y: Label tensor aligned with the first dimension of ``x``.
        alpha: Parameter of the ``Beta(alpha, alpha)`` distribution from which
            the initial mixing ratio is drawn.

    Returns:
        Tuple ``(mixed_x, y_a, y_b, lam)`` where ``y_a`` are the original
        labels, ``y_b`` the permuted labels and ``lam`` (a Python float) the
        fraction of each image that still comes from the original sample,
        recomputed from the actual (clipped) box area.
    """
    indices = torch.randperm(x.size(0))
    shuffled_x = x[indices]
    shuffled_y = y[indices]

    lam = np.random.beta(alpha, alpha)
    bbx1, bby1, bbx2, bby2 = rand_bbox(x.size(), lam)

    # Work on a copy so the caller's batch is not overwritten.
    mixed_x = x.clone()
    mixed_x[:, :, bbx1:bbx2, bby1:bby2] = shuffled_x[:, :, bbx1:bbx2, bby1:bby2]

    # The box may have been clipped at the border, so derive lam from its real area.
    box_area = (bbx2 - bbx1) * (bby2 - bby1)
    lam = float(1 - box_area / (x.size(-1) * x.size(-2)))
    y_a, y_b = y, shuffled_y
    return mixed_x, y_a, y_b, lam


def rand_bbox(size, lam):
    """Draw a random box covering roughly ``1 - lam`` of the image area.

    Args:
        size: Shape of the batch; only ``size[2]`` and ``size[3]`` are used.
        lam: Mixing ratio; the box side lengths scale with ``sqrt(1 - lam)``.

    Returns:
        Tuple ``(bbx1, bby1, bbx2, bby2)`` of Python ints. The ``bbx`` pair
        bounds dimension 2 and the ``bby`` pair bounds dimension 3, both
        clipped to the valid range.
    """
    # Extents of dimensions 2 and 3 (the names follow the slicing order used
    # by cutmix_data, not image width/height).
    extent2 = size[2]
    extent3 = size[3]
    cut_rat = np.sqrt(1. - lam)
    cut2 = int(extent2 * cut_rat)
    cut3 = int(extent3 * cut_rat)

    # Box centre, drawn uniformly over the image.
    c2 = np.random.randint(extent2)
    c3 = np.random.randint(extent3)

    bbx1 = int(np.clip(c2 - cut2 // 2, 0, extent2))
    bby1 = int(np.clip(c3 - cut3 // 2, 0, extent3))
    bbx2 = int(np.clip(c2 + cut2 // 2, 0, extent2))
    bby2 = int(np.clip(c3 + cut3 // 2, 0, extent3))

    return bbx1, bby1, bbx2, bby2


In [None]:
from PIL import Image
import numpy as np

def visualize_transform_effect_from_subset(subset_dataset, index=0,
                                           mean=(0.485, 0.456, 0.406),
                                           std=(0.229, 0.224, 0.225)):
    """Show an original image next to its transformed version.

    Args:
        subset_dataset: ``Subset`` whose items are ``(image_tensor, label)``
            with a channel-first image, and whose wrapped dataset exposes
            ``samples`` as ``(img_path, label)`` pairs.
        index: Position inside the subset.
        mean: Per-channel mean that the transform subtracted; used to undo the
            normalisation for display. Pass ``None`` (for ``mean`` or ``std``)
            when the transform does not normalise.
        std: Per-channel standard deviation that the transform divided by.

    Raises:
        FileNotFoundError: If the original image cannot be loaded.
    """
    image, _ = subset_dataset[index]
    image_aug = image.permute(1, 2, 0).cpu().numpy()

    # The tensor is normalised, so scaling it straight to uint8 would wrap
    # around; map it back to [0, 1] first.
    if mean is not None and std is not None:
        image_aug = image_aug * np.asarray(std) + np.asarray(mean)
    image_aug = (np.clip(image_aug, 0.0, 1.0) * 255).astype('uint8')

    img_path, _ = subset_dataset.dataset.samples[subset_dataset.indices[index]]
    print(f"Trying to read: {img_path}")

    try:
        with Image.open(img_path) as original:
            image_orig = np.array(original.convert("RGB"))
    except Exception as e:
        # Keep the exception type callers may rely on, but preserve the cause.
        raise FileNotFoundError(f"Failed to load image using PIL: {img_path}, Error: {e}") from e

    fig, axs = plt.subplots(1, 2, figsize=(10, 5))
    for ax, picture, title in ((axs[0], image_orig, "Original"),
                               (axs[1], image_aug, "Transformed")):
        ax.imshow(picture)
        ax.set_title(title)
        ax.axis("off")

    plt.tight_layout()
    plt.show()


# Data Load

In [None]:
train_root = './train'
test_root = './test'

In [None]:
# Training-time augmentation pipeline (albumentations).
# NOTE(review): the image size is hard-coded to 512 here instead of being read
# from CFG['IMG_SIZE'] — confirm which value is intended.
train_transform = A.Compose([
    A.Resize(512, 512),
        # Rectangular dropout with white fill; hole count and size bounds are given
        # relative to the 512-pixel side (10%-30%).
        A.CoarseDropout(
            max_holes=2,
            max_height=int(0.3 * 512),
            max_width=int(0.3 * 512),
            min_holes=1,
            min_height=int(0.1 * 512),
            min_width=int(0.1 * 512),
            fill_value=(255, 255, 255),
            p=0.5
        ),
        A.ColorJitter(
            brightness=0.2,
            contrast=0.2,
            saturation=0.2,
            hue=0.1,
            p=0.5
        ),
        #A.Lambda(image=lambda x: x[..., np.random.permutation(3)] if random.random() < 0.5 else x),  # RGB shift
        A.Rotate(limit=180, p=0.1),
        # NOTE(review): both `height`/`width` and `size` are passed below; which of
        # them is accepted presumably depends on the installed albumentations
        # version — TODO confirm and keep only one form.
        A.RandomResizedCrop(
    height=512,  # author note: remove this argument, or ...
    width=512,   # author note: ... remove this one as well and ...
    size=(512, 512),  # author note: ... add `size` instead
    scale=(0.5, 1.0),
    ratio=(0.75, 1.33),
    p=0.25
),
    # Same mean/std as val_transform, so both splits are normalised identically.
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
])


# Validation/test pipeline: resize + normalisation only, no random augmentation.
val_transform = A.Compose([
    A.Resize(512, 512),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
])


In [None]:
import copy

# Scan the training folder once; this instance is only used for its sample list.
full_dataset = CustomImageDataset(train_root, transform=None)
print(f"총 이미지 수: {len(full_dataset)}")

targets = [label for _, label in full_dataset.samples]
class_names = full_dataset.classes

# Stratified 80/20 split over sample indices.
train_idx, val_idx = train_test_split(
    range(len(targets)), test_size=0.2, stratify=targets, random_state=42
)

# The split indices refer to full_dataset.samples, so the train/val datasets
# must share exactly that sample list. Shallow copies guarantee it (and avoid
# re-scanning the folder); only the transform differs between them.
train_base = copy.copy(full_dataset)
train_base.transform = train_transform
val_base = copy.copy(full_dataset)
val_base.transform = val_transform

train_dataset = Subset(train_base, train_idx)
val_dataset = Subset(val_base, val_idx)
print(f'train 이미지 수: {len(train_dataset)}, valid 이미지 수: {len(val_dataset)}')


# DataLoaders: shuffle only the training split.
train_loader = DataLoader(train_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False)

In [None]:
len(class_names)

In [None]:
for i in range(10):
    visualize_transform_effect_from_subset(train_dataset, index=i)

# Model Define

In [None]:
import timm
import torch.nn as nn

class BaseModel(nn.Module):
    """timm backbone followed by a linear classification head.

    Args:
        model_name: Name passed to ``timm.create_model``.
        num_classes: Number of output logits of the head.
        pretrained: Whether timm should load pretrained backbone weights.
            Pass ``False`` when a checkpoint is loaded right afterwards, so
            the pretrained weights are not fetched for nothing.
    """

    def __init__(self, model_name='convnext_base', num_classes=10, pretrained=True):
        super().__init__()
        # num_classes=0 asks timm for the backbone without its own classifier.
        self.backbone = timm.create_model(model_name, pretrained=pretrained, num_classes=0)
        self.head = nn.Linear(self.backbone.num_features, num_classes)

    def forward(self, x):
        """Return the class logits for the batch ``x``."""
        features = self.backbone(x)
        return self.head(features)

# from internimage import intern_image

# class BaseModel(nn.Module):
#     def __init__(self, model_name='internimage_b', num_classes=10):
#         super(BaseModel, self).__init__()
#         self.backbone = intern_image(pretrained=True, checkpoint_path=None, variant=model_name)
#         # InternImage는 기본적으로 1024차원 출력입니다 (variant에 따라 다름)
#         self.head = nn.Linear(self.backbone.embed_dim, num_classes)
        
#     def forward(self, x):
#         x = self.backbone(x)
#         x = self.head(x)
#         return x


In [None]:
from sklearn.utils.class_weight import compute_class_weight

# full_dataset.samples is [(img_path, label), ...]; take the label of every sample
labels = [sample[1] for sample in full_dataset.samples]
train_labels = np.asarray(labels)[train_idx]  # restrict to the training split

# 'balanced' class weights, computed on the training labels only
classes = np.unique(train_labels)
class_weights = torch.tensor(
    compute_class_weight('balanced', classes=classes, y=train_labels),
    dtype=torch.float32,
).to(device)

print(f"클래스 가중치: {class_weights}")


# Train/ Validation

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from sklearn.metrics import log_loss
from tqdm import tqdm
import matplotlib.pyplot as plt
from torch.optim.lr_scheduler import CosineAnnealingLR

# Build the model and initialise the best-score tracker
model = BaseModel(num_classes=len(class_names)).to(device)
best_logloss = float('inf')

# Loss function (class-weighted cross-entropy) and optimizer
criterion = nn.CrossEntropyLoss(weight=class_weights)
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.AdamW(model.parameters(), lr=CFG['LEARNING_RATE'], weight_decay=0.001)
optimizer = optim.Adam(model.parameters(), lr=CFG['LEARNING_RATE'])
# Cosine schedule spanning the whole run (stepped once per epoch below)
scheduler = CosineAnnealingLR(optimizer, T_max=CFG['EPOCHS'])

# Per-epoch metric history (consumed by the plotting cell)
train_losses = []
val_losses = []
val_accuracies = []
val_loglosses = []

# Training loop
for epoch in range(CFG['EPOCHS']):
    # ------------------------- TRAIN -------------------------
    model.train()
    train_loss = 0.0
    for images, labels in tqdm(train_loader, desc=f"[Epoch {epoch+1}/{CFG['EPOCHS']}] Training"):
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()

        # Apply CutMix to a random share of the batches. 'CUTMIX_PROB' is not
        # defined in the CFG visible in this file, so the 0.5 default applies.
        if np.random.rand() < CFG.get('CUTMIX_PROB', 0.5):
            images, targets1, targets2, lam = cutmix_data(images, labels)
            outputs = model(images)
            # Mix the two losses with the same ratio used to mix the images
            loss = criterion(outputs, targets1) * lam + criterion(outputs, targets2) * (1. - lam)
        else:
            outputs = model(images)
            loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    # Mean of the per-batch losses
    avg_train_loss = train_loss / len(train_loader)


    # ------------------------- VALID -------------------------
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    all_probs = []
    all_labels = []

    with torch.no_grad():
        for images, labels in tqdm(val_loader, desc=f"[Epoch {epoch+1}/{CFG['EPOCHS']}] Validation"):
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            # Same class-weighted criterion as in training
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            # Accuracy
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

            # LogLoss: collect softmax probabilities for sklearn's log_loss
            probs = F.softmax(outputs, dim=1)
            all_probs.extend(probs.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    avg_val_loss = val_loss / len(val_loader)
    val_accuracy = 100 * correct / total
    # `labels=` lists every class index explicitly, so the call does not depend
    # on which classes happen to occur in all_labels
    val_logloss = log_loss(all_labels, all_probs, labels=list(range(len(class_names))))

    # Record metrics
    train_losses.append(avg_train_loss)
    val_losses.append(avg_val_loss)
    val_accuracies.append(val_accuracy)
    val_loglosses.append(val_logloss)

    # Print the epoch summary
    print(f"Train Loss : {avg_train_loss:.4f} || Valid Loss : {avg_val_loss:.4f} | "
          f"Valid Accuracy : {val_accuracy:.2f}% | LogLoss: {val_logloss:.4f}")

    # Update the LR scheduler (once per epoch)
    scheduler.step()
    current_lr = scheduler.get_last_lr()[0]
    print(f"📉 Learning Rate after Epoch {epoch+1}: {current_lr:.6f}")

    # Save the best model: the checkpoint is selected by validation log-loss
    if val_logloss < best_logloss:
        best_logloss = val_logloss
        torch.save(model.state_dict(), f'best_model.pth')
        print(f"📦 Best model saved at epoch {epoch+1} (logloss: {val_logloss:.4f})")

# ------------------------- Plotting -------------------------
# x-axis values (1-based epoch numbers) for the plotting cell
epochs = range(1, CFG['EPOCHS'] + 1)

In [None]:
plt.figure(figsize=(18, 5))

# One entry per panel: (y-axis label, title, [(values, extra plot kwargs), ...])
panels = [
    # 1. Loss curves
    ('Loss', 'Loss per Epoch', [
        (train_losses, {'label': 'Train Loss'}),
        (val_losses, {'label': 'Validation Loss'}),
    ]),
    # 2. Accuracy curve
    ('Accuracy (%)', 'Validation Accuracy per Epoch', [
        (val_accuracies, {'label': 'Validation Accuracy (%)', 'color': 'green'}),
    ]),
    # 3. LogLoss curve
    ('LogLoss', 'Validation LogLoss per Epoch', [
        (val_loglosses, {'label': 'Validation LogLoss', 'color': 'red'}),
    ]),
]

for position, (y_label, title, curves) in enumerate(panels, start=1):
    plt.subplot(1, 3, position)
    for values, line_kwargs in curves:
        plt.plot(epochs, values, marker='o', **line_kwargs)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.title(title)
    plt.legend()
    plt.grid(True)

plt.tight_layout()
plt.show()

In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import random
import koreanize_matplotlib

def imshow(img_tensor):
    """Undo the mean/std normalisation and return an array matplotlib can display.

    Args:
        img_tensor: Channel-first image tensor ``(3, H, W)`` normalised with the
            mean/std constants below. It may live on any device and may
            require grad.

    Returns:
        ``(H, W, 3)`` NumPy array with values clipped to ``[0, 1]``.
    """
    # detach()/cpu() make the conversion work for GPU tensors and for tensors
    # that are part of an autograd graph; both are no-ops otherwise.
    img = img_tensor.detach().cpu().numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    return np.clip(std * img + mean, 0, 1)

import pandas as pd
import torch
import numpy as np
import matplotlib.pyplot as plt
import koreanize_matplotlib

def analyze_and_save_wrong_predictions(model, val_dataset, class_names, save_path='val_wrong_predictions.csv', device='cuda'):
    """Run the model over a labelled dataset and tabulate the mispredictions.

    Args:
        model: Module returning class logits for a batch of images.
        val_dataset: Indexable dataset yielding ``(image_tensor, label)``.
            When it is a ``Subset`` (or any dataset) whose underlying dataset
            exposes ``samples`` as ``(img_path, ...)`` tuples, the image path
            is reported; otherwise the path is ``'Unknown'``.
        class_names: Sequence mapping a label index to its display name.
        save_path: Currently unused — writing the CSV is disabled inside this
            function; the returned frame can be saved by the caller.
        device: Device the images are moved to before inference.

    Returns:
        DataFrame with one row per wrong prediction and the columns
        ``ID`` (position in ``val_dataset``), ``데이터 경로``,
        ``원래 라벨 이름`` and ``예측 라벨 이름``. The columns are present
        even when there are no wrong predictions.
    """
    columns = ['ID', '데이터 경로', '원래 라벨 이름', '예측 라벨 이름']

    model.eval()

    # Resolve where image paths can be looked up, without assuming a Subset.
    is_subset = isinstance(val_dataset, torch.utils.data.Subset)
    source = val_dataset.dataset if is_subset else val_dataset
    samples = getattr(source, 'samples', None)

    total = len(val_dataset)
    wrong_records = []

    with torch.no_grad():
        for i in range(total):
            image, label = val_dataset[i]
            label = int(label)
            predicted = model(image.unsqueeze(0).to(device)).argmax(1).item()
            if predicted == label:
                continue

            # Map the subset position back to the index in the full sample list.
            global_idx = val_dataset.indices[i] if is_subset else i
            img_path = samples[global_idx][0] if samples is not None else 'Unknown'

            wrong_records.append({
                'ID': i,
                '데이터 경로': img_path,
                '원래 라벨 이름': class_names[label],
                '예측 라벨 이름': class_names[predicted]
            })

    wrong = len(wrong_records)
    correct = total - wrong

    # Summary statistics
    print(f"✅ 총 Validation 샘플 수: {total}")
    print(f"🎯 정확하게 예측한 수: {correct}")
    print(f"❌ 잘못 예측한 수: {wrong}")

    # Explicit columns keep the frame usable when nothing was mispredicted.
    df_wrong = pd.DataFrame(wrong_records, columns=columns)
    #df_wrong.to_csv(save_path, index=False, encoding='utf-8-sig')
    print(f"📁 잘못 예측한 결과 출력 완료!.")

    return df_wrong




def visualize_wrong_predictions(model, val_dataset, class_names, device='cuda', start=0, n=8):
    """Plot validation images whose prediction is wrong.

    All mispredicted samples are collected first; then up to ``n`` of them,
    beginning at position ``start`` in that list, are drawn on a two-row grid.

    Args:
        model: Module returning class logits for a batch of images.
        val_dataset: Indexable dataset yielding ``(image_tensor, label)``.
        class_names: Sequence mapping a label index to its display name.
        device: Device the images are moved to before inference.
        start: Index of the first mispredicted sample to draw.
        n: Maximum number of samples to draw.
    """
    import math

    model.eval()
    wrong_samples = []

    with torch.no_grad():
        for idx in range(len(val_dataset)):
            image, label = val_dataset[idx]
            predicted = model(image.unsqueeze(0).to(device)).argmax(1).item()
            if predicted != label:
                wrong_samples.append((image, label, predicted))

    total_wrong = len(wrong_samples)
    print(f"\n🖼️ 총 잘못 분류된 샘플 수: {total_wrong}")

    end = min(start + n, total_wrong)
    if end <= start:
        # Nothing in the requested window; skip the empty figure.
        return

    # Two rows; round the column count up so an odd n (or n=1) still fits.
    n_cols = max(1, math.ceil(n / 2))
    plt.figure(figsize=(20, 8))

    for i in range(start, end):
        image, label, predicted = wrong_samples[i]
        true_label_name = class_names[label]
        pred_label_name = class_names[predicted]

        plt.subplot(2, n_cols, i - start + 1)
        plt.imshow(imshow(image.cpu()))
        plt.title(f"True: {true_label_name}\nPred: {pred_label_name}", fontsize=12)
        plt.axis('off')

    plt.tight_layout()
    plt.show()


In [None]:
# Analyze the validation predictions and collect the wrong ones into a DataFrame
df = analyze_and_save_wrong_predictions(model, val_dataset, class_names, save_path='val_predictions.csv', device=device)

In [None]:
# Single source of truth for the output file, so the message below cannot
# drift from the file that is actually written.
wrong_only_path = 'val_wrong_only_v8.csv'

# Keep only the rows whose predicted label differs from the true label
if df.empty:
    # No mispredictions: the frame may have no columns to filter on.
    df_wrong_only = df.copy()
else:
    df_wrong_only = df[df['원래 라벨 이름'] != df['예측 라벨 이름']].reset_index(drop=True)

# Drop the ID column (tolerating its absence in an empty frame)
df_wrong_only = df_wrong_only.drop(columns=['ID'], errors='ignore')

# Save as CSV
df_wrong_only.to_csv(wrong_only_path, index=False, encoding='utf-8-sig')

# Report the result
print(f"저장 완료: {len(df_wrong_only)}개의 잘못 예측된 샘플을 '{wrong_only_path}'에 저장했습니다.")


In [None]:
# Visualize wrong predictions (up to 16 samples, starting from index 9 of the wrong-prediction list)
visualize_wrong_predictions(model, val_dataset, class_names, device=device, start=9, n=16)

In [None]:
import cv2
import numpy as np
import torch
import torch.nn.functional as F

def generate_gradcam(model, image_tensor, target_class, target_layer, device='cuda'):
    """Compute a Grad-CAM heat-map for one image.

    Args:
        model: Module returning class logits for a batch of images.
        image_tensor: Single image tensor ``(C, H, W)`` (no batch dimension).
        target_class: Index of the logit whose gradient is back-propagated.
        target_layer: Sub-module of ``model`` whose output is a
            ``(1, channels, h, w)`` feature map.
        device: Device the image is moved to.

    Returns:
        2-D float NumPy array with the spatial size of the input image,
        scaled to ``[0, 1]`` (all zeros when the map has no positive value).
    """
    model.eval()
    image_tensor = image_tensor.unsqueeze(0).to(device)

    captured = {}

    def forward_hook(module, inputs, output):
        captured['activation'] = output
        # A tensor hook on the layer output yields its gradient during
        # backward() without relying on the deprecated module backward hook.
        output.register_hook(lambda grad: captured.__setitem__('gradient', grad))

    handle = target_layer.register_forward_hook(forward_hook)
    try:
        output = model(image_tensor)
        model.zero_grad()
        one_hot = torch.zeros_like(output)
        one_hot[0][target_class] = 1
        output.backward(gradient=one_hot)
    finally:
        # Always unregister, even if the forward/backward pass raised.
        handle.remove()

    act = captured['activation'].detach().squeeze(0)  # [C, H, W]
    grad = captured['gradient'].squeeze(0)            # [C, H, W]

    # Channel weights = spatially averaged gradients
    weights = grad.mean(dim=(1, 2))
    cam = torch.sum(weights[:, None, None] * act, dim=0)

    cam = cam.cpu().numpy()
    cam = np.maximum(cam, 0)
    peak = cam.max()
    if peak > 0:
        # Skip the division for an all-zero map instead of producing NaNs.
        cam = cam / peak
    # cv2.resize takes the target size as (width, height)
    cam = cv2.resize(cam, (image_tensor.size(3), image_tensor.size(2)))

    return cam


In [None]:
def show_gradcam_on_image(image_tensor, cam):
    """Blend a Grad-CAM heat-map onto the de-normalised image.

    Args:
        image_tensor: Normalised image tensor accepted by ``imshow``.
        cam: 2-D array in ``[0, 1]`` with the spatial size of the image.

    Returns:
        ``(H, W, 3)`` RGB float array scaled so that its maximum is 1.
    """
    img = imshow(image_tensor.cpu())
    # Guard the uint8 conversion against NaNs and out-of-range values.
    cam = np.clip(np.nan_to_num(cam), 0, 1)
    heatmap = cv2.applyColorMap(np.uint8(255 * cam), cv2.COLORMAP_JET)
    # OpenCV colour maps are BGR, while the image (and matplotlib/PIL) are RGB.
    heatmap = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)
    heatmap = np.float32(heatmap) / 255
    cam_img = heatmap + img
    cam_img = cam_img / np.max(cam_img)
    return cam_img


In [None]:
import os
import matplotlib.pyplot as plt
from torchvision.transforms.functional import to_pil_image

def visualize_wrong_with_gradcam(model, val_dataset, class_names, device='cuda', start=0, n=8, save_dir='wrong_predictions_gradcam'):
    """Save Grad-CAM overlays for every misprediction and plot a subset.

    Every wrongly predicted sample gets a Grad-CAM overlay (for the predicted
    class) written to ``save_dir`` as a PNG; up to ``n`` of them, beginning at
    position ``start`` in the misprediction list, are also drawn on a two-row
    grid.

    Args:
        model: Module returning class logits. When it has
            ``backbone.stages``, the last child of the last stage is used as
            the Grad-CAM layer; otherwise the last child of the model.
        val_dataset: Indexable dataset yielding ``(image_tensor, label)``.
        class_names: Sequence mapping a label index to its display name.
        device: Device used for inference.
        start: Index of the first mispredicted sample to draw.
        n: Maximum number of samples to draw.
        save_dir: Output directory (created when missing).
    """
    import math

    model.eval()
    os.makedirs(save_dir, exist_ok=True)

    # Collect the mispredicted samples (keeping their dataset position)
    wrong_samples = []
    with torch.no_grad():
        for idx in range(len(val_dataset)):
            image, label = val_dataset[idx]
            predicted = model(image.unsqueeze(0).to(device)).argmax(1).item()
            if predicted != label:
                wrong_samples.append((image, label, predicted, idx))

    if not wrong_samples:
        return

    # The target layer does not depend on the sample, so resolve it once.
    try:
        if hasattr(model, 'backbone') and hasattr(model.backbone, 'stages'):
            target_layer = list(model.backbone.stages[-1].children())[-1]
        else:
            target_layer = list(model.children())[-1]
    except (IndexError, TypeError, AttributeError) as e:
        print("Grad-CAM target layer 설정 실패:", e)
        return

    end = min(start + n, len(wrong_samples))
    shown = {}  # overlays inside the display window, reused for the plot

    # Save a Grad-CAM overlay for every misprediction
    for i, (image, label, predicted, data_idx) in enumerate(wrong_samples):
        cam = generate_gradcam(model, image, predicted, target_layer, device)
        gradcam_img = show_gradcam_on_image(image, cam)

        true_label_name = class_names[label]
        pred_label_name = class_names[predicted]
        save_path = os.path.join(
            save_dir,
            f"wrong_{i:03d}_idx_{data_idx}_true_{true_label_name}_pred_{pred_label_name}.png"
        )
        # The overlay is a float array in [0, 1]; convert it to uint8
        # explicitly because to_pil_image does not accept float arrays in
        # every torchvision version.
        gradcam_uint8 = np.uint8(np.clip(gradcam_img, 0, 1) * 255)
        to_pil_image(gradcam_uint8).save(save_path)

        if start <= i < end:
            shown[i] = gradcam_img

    # Plot the requested window on two rows; round the column count up so an
    # odd n (or n=1) still fits.
    n_cols = max(1, math.ceil(n / 2))
    plt.figure(figsize=(20, 8))

    for i in range(start, end):
        _, label, predicted, _ = wrong_samples[i]
        true_label_name = class_names[label]
        pred_label_name = class_names[predicted]

        plt.subplot(2, n_cols, i - start + 1)
        plt.imshow(shown[i])
        plt.title(f"True: {true_label_name}\nPred: {pred_label_name}", fontsize=12)
        plt.axis('off')

    plt.tight_layout()
    plt.show()


In [None]:
visualize_wrong_with_gradcam(model, val_dataset, class_names, device=device, start=9, n=16)

# Inference

In [None]:
# Unlabeled test set (is_test=True) preprocessed with the validation transform
test_dataset = CustomImageDataset(test_root, transform=val_transform, is_test=True)
# shuffle=False keeps the predictions in dataset order
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False)

In [None]:
# Rebuild the architecture and restore the best checkpoint saved during training
model = BaseModel(num_classes=len(class_names))
state_dict = torch.load('best_model.pth', map_location=device)
model.load_state_dict(state_dict)
model.to(device)

# Inference: collect one {class name: probability} row per test image
model.eval()
results = []

with torch.no_grad():
    for images in test_loader:
        logits = model(images.to(device))
        probs = F.softmax(logits, dim=1).cpu()
        # tolist() yields plain Python floats, one inner list per image
        results.extend(dict(zip(class_names, row)) for row in probs.tolist())

pred = pd.DataFrame(results)

# Submission

In [None]:
# Load the submission template
submission = pd.read_csv('./sample_submission.csv', encoding='utf-8-sig')

# Class columns = every column except the first one ('ID'); reorder the
# prediction columns to match the template
class_columns = submission.columns[1:]
pred = pred[class_columns]

# NOTE(review): values are copied positionally — this assumes the template rows
# are in the same order as the test dataset's samples; confirm against the data.
submission[class_columns] = pred.values
# NOTE(review): the file name mentions 384 while the visible transforms resize
# to 512 — confirm the name is still accurate.
submission.to_csv('submit41_size_384.csv', index=False, encoding='utf-8-sig')

In [None]:
submission