# Library import

In [1]:
# 필요 library들을 import합니다.
import os
from typing import Tuple, Any, Callable, List, Optional, Union

import cv2
import timm
import torch
import numpy as np
import pandas as pd
import albumentations as A
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import models, datasets, transforms
from tqdm.auto import tqdm
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from albumentations.pytorch import ToTensorV2

  from .autonotebook import tqdm as notebook_tqdm


# Dataset Class

In [2]:
class CustomDataset(Dataset):
    """Image dataset driven by a DataFrame of image paths (and labels).

    Every item is read from disk with OpenCV, converted from BGR to RGB and
    passed through ``transform``. Outside inference mode the matching entry of
    the ``target`` column is returned together with the image.
    """

    def __init__(
        self,
        root_dir: str,
        info_df: pd.DataFrame,
        transform: Callable,
        is_inference: bool = False
    ):
        """
        :param root_dir: directory that the ``image_path`` entries are joined onto
        :param info_df: table with an ``image_path`` column and, unless
            ``is_inference`` is set, a ``target`` column
        :param transform: callable applied to the RGB numpy image
        :param is_inference: when True no labels are read or returned
        """
        self.root_dir = root_dir
        self.transform = transform
        self.is_inference = is_inference
        self.image_paths = info_df['image_path'].tolist()

        # Labels only exist for training/validation data.
        if not self.is_inference:
            self.targets = info_df['target'].tolist()

    def __len__(self) -> int:
        """Return the number of images in the dataset."""
        return len(self.image_paths)

    def __getitem__(self, index: int) -> Union[Tuple[torch.Tensor, int], torch.Tensor]:
        """Load, convert and transform the image at ``index``.

        :return: the transformed image in inference mode, otherwise an
            ``(image, target)`` tuple
        :raises FileNotFoundError: if OpenCV cannot read the image file
        """
        img_path = os.path.join(self.root_dir, self.image_paths[index])
        image = cv2.imread(img_path, cv2.IMREAD_COLOR)  # BGR numpy array, or None on failure

        # cv2.imread signals a missing/undecodable file by returning None instead of
        # raising; fail here with the offending path rather than inside cvtColor.
        if image is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = self.transform(image)

        if self.is_inference:
            return image

        target = self.targets[index]
        return image, target

# Transform Class

In [3]:
class TorchvisionTransform:
    """torchvision preprocessing pipeline for numpy images.

    The pipeline resizes to 224x224, converts to a tensor and normalizes with
    the mean/std values listed below. When ``is_train`` is True a random
    horizontal flip, a random rotation and a brightness/contrast jitter are
    applied first.
    """

    def __init__(self, is_train: bool = True):
        """
        :param is_train: include the random augmentations in front of the
            shared preprocessing steps
        """
        # The torchvision transforms below operate on PIL images, while callers hand
        # in numpy arrays. ToPILImage comes from the already imported `transforms`
        # module, so no separate PIL import is required.
        self.to_pil = transforms.ToPILImage()

        # Steps shared by training and evaluation.
        common_transforms = [
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ]

        if is_train:
            # Random augmentations run before the shared steps.
            self.transform = transforms.Compose(
                [
                    transforms.RandomHorizontalFlip(p=0.5),
                    transforms.RandomRotation(15),
                    transforms.ColorJitter(brightness=0.2, contrast=0.2),
                ] + common_transforms
            )
        else:
            # Evaluation uses the shared steps only.
            self.transform = transforms.Compose(common_transforms)

    def __call__(self, image: np.ndarray) -> torch.Tensor:
        """Convert ``image`` to a PIL image and run the configured pipeline.

        :param image: numpy image array (assumed H x W x C, as produced by the
            OpenCV loading code in this file)
        :return: the transformed image tensor
        """
        image = self.to_pil(image)
        return self.transform(image)

In [4]:
class AlbumentationsTransform:
    """albumentations preprocessing pipeline for numpy (OpenCV-style) images.

    Every image is resized to 384x384, normalized with mean 0.5 / std 0.5 per
    channel and converted to a tensor. With ``is_train=True`` a horizontal
    flip, a rotation and a brightness/contrast adjustment are placed in front
    of those steps.
    """

    def __init__(self, is_train: bool = True):
        """
        :param is_train: prepend the random augmentations to the pipeline
        """
        # Random augmentations; empty for validation/test.
        augmentations = [
            A.HorizontalFlip(p=0.5),
            A.Rotate(limit=15),
            A.RandomBrightnessContrast(p=0.2),
        ] if is_train else []

        # Deterministic steps applied in every mode.
        preprocessing = [
            A.Resize(384, 384),
            A.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
            ToTensorV2(),
        ]

        self.transform = A.Compose(augmentations + preprocessing)

    def __call__(self, image) -> torch.Tensor:
        """Apply the pipeline to ``image`` and return the resulting tensor.

        :raises TypeError: if ``image`` is not a numpy array
        """
        if isinstance(image, np.ndarray):
            # albumentations takes keyword inputs and returns a dict of outputs.
            return self.transform(image=image)['image']

        raise TypeError("Image should be a NumPy array (OpenCV format).")

# Model Class

In [5]:
# Pick the compute device: CUDA when torch detects a usable GPU, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [6]:
class LabelSmoothingCrossEntropy(nn.Module):
    """Cross-entropy loss with label smoothing.

    The true class receives probability ``1 - smoothing``; the remaining
    ``smoothing`` mass is split evenly over the other ``n_classes - 1`` classes.
    """

    def __init__(self, smoothing: float = 0.1):
        """
        :param smoothing: amount of smoothing to apply (default: 0.1)
        """
        super().__init__()
        self.smoothing = smoothing
        self.confidence = 1.0 - smoothing

    def forward(self, pred: torch.Tensor, target: torch.Tensor) -> torch.Tensor:
        """
        :param pred: Predicted logits (before applying softmax) of shape (batch_size, num_classes)
        :param target: Ground truth labels of shape (batch_size)
        :return: Label Smoothed Cross Entropy Loss (scalar, mean over the batch)
        """
        log_probs = torch.nn.functional.log_softmax(pred, dim=-1)
        n_classes = pred.size(-1)

        # With a single class there is no "other" class to receive the smoothing
        # mass; guard the division so that degenerate input does not raise.
        off_value = self.smoothing / (n_classes - 1) if n_classes > 1 else 0.0

        # The smoothed target distribution is a constant w.r.t. the gradient.
        with torch.no_grad():
            true_dist = torch.full_like(log_probs, off_value)
            true_dist.scatter_(1, target.unsqueeze(1), self.confidence)

        # Cross entropy against the smoothed distribution, averaged over the batch.
        return torch.mean(torch.sum(-true_dist * log_probs, dim=-1))

In [7]:
class SoftTargetCrossEntropy(nn.Module):
    """Cross-entropy loss for targets given as per-class weights (soft labels)."""

    def __init__(self):
        super().__init__()

    def forward(self, pred: torch.Tensor, target: torch.Tensor) -> torch.Tensor:
        """
        :param pred: logits whose last dimension indexes the classes
        :param target: per-class target weights, multiplied element-wise with
            the log-probabilities (so it must broadcast against ``pred``)
        :return: scalar loss, the mean over all per-sample sums
        """
        log_probs = torch.nn.functional.log_softmax(pred, dim=-1)
        # Per-sample cross entropy: negative target-weighted log-probabilities.
        per_sample = (-target * log_probs).sum(dim=-1)
        return per_sample.mean()

In [8]:
import pytorch_lightning as pl
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint
from transformers import SwinForImageClassification, SwinConfig
import requests

class SwinTransformerLightning(pl.LightningModule):
    """Lightning module that fine-tunes a pretrained Swin-Large (384px) classifier.

    Training uses ``SoftTargetCrossEntropy`` (targets given as per-class
    weights); validation uses ``nn.CrossEntropyLoss``. The optimizer uses one
    parameter group for the new classification head and one for the backbone.
    """

    def __init__(self, num_classes=10, lr=1e-5, weight_decay=0.01):
        """
        :param num_classes: currently unused, see the NOTE below
        :param lr: learning rate of the backbone parameter group
        :param weight_decay: weight decay of the backbone parameter group
        """
        super().__init__()
        self.model = SwinForImageClassification.from_pretrained("microsoft/swin-large-patch4-window12-384-in22k")

        # NOTE(review): the head is fixed at 500 outputs and `num_classes` (default 10)
        # is ignored. It is left that way on purpose: this class never calls
        # save_hyperparameters(), so presumably load_from_checkpoint() rebuilds it with
        # the defaults, and honoring num_classes=10 would then make the head mismatch
        # the saved 500-class checkpoints. Confirm before changing.
        self.model.classifier = nn.Linear(in_features=1536, out_features=500, bias=True)

        self.lr = lr
        self.weight_decay = weight_decay
        self.loss_fn = SoftTargetCrossEntropy()
        self.loss_fn_crossentropy = nn.CrossEntropyLoss()

        # Split the parameters into "head" and "everything else" by identity so each
        # group can receive its own optimizer settings.
        self.head_params = list(self.model.classifier.parameters())
        head_param_ids = {id(p) for p in self.head_params}
        self.all_params = [p for p in self.model.parameters() if id(p) not in head_param_ids]

    def forward(self, pixel_values):
        """Return the classification logits for a batch of images."""
        return self.model(pixel_values=pixel_values).logits

    def training_step(self, batch, batch_idx):
        """Compute and log the soft-target training loss for one batch."""
        pixel_values, labels = batch
        logits = self.forward(pixel_values)
        loss = self.loss_fn(logits, labels)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute and log the cross-entropy validation loss for one batch."""
        pixel_values, labels = batch
        logits = self.forward(pixel_values)
        loss = self.loss_fn_crossentropy(logits, labels)
        self.log('val_loss', loss)
        return loss

    def configure_optimizers(self):
        """Build AdamW with separate settings for the head and the backbone.

        The backbone group uses the ``lr`` / ``weight_decay`` passed to the
        constructor (previously stored but ignored); with the default arguments
        this gives the same 1e-5 / 1e-2 values that used to be hard-coded.
        """
        optimizer = torch.optim.AdamW(
            [
                # Freshly initialised classifier head: fixed, larger learning rate.
                {'params': self.head_params, 'lr': 5e-5, 'weight_decay': 1e-4},
                # Pretrained Swin backbone: constructor-provided settings.
                {'params': self.all_params, 'lr': self.lr, 'weight_decay': self.weight_decay}
            ]
        )
        return optimizer

In [9]:
import pytorch_lightning as pl
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint
from transformers import ConvNextV2ForImageClassification, AutoModelForImageClassification 
import torch.nn.functional as F
from torch.optim.lr_scheduler import CosineAnnealingLR
import timm
import requests

class SwinConvNextClassifier(pl.LightningModule):
    """Two-backbone classifier: SwinV2 and ConvNeXt-V2 outputs are concatenated
    and passed through a single linear head.

    Training uses ``SoftTargetCrossEntropy``; validation uses
    ``nn.CrossEntropyLoss``.
    """

    def __init__(self, num_classes=500, lr=1e-4, weight_decay=0.01):
        """
        :param num_classes: number of outputs of the joint linear head
        :param lr: stored on the module; NOTE(review): not read by
            ``configure_optimizers``, which uses fixed per-group values
        :param weight_decay: stored on the module; same caveat as ``lr``
        """
        super().__init__()

        self.swin = AutoModelForImageClassification.from_pretrained(
            "team-lucid/swinv2-base-path4-window24-384-doc"
        )
        self.convnext = ConvNextV2ForImageClassification.from_pretrained(
            "facebook/convnextv2-base-22k-384"
        )

        # Record the input width of each original head, then swap the head for an
        # identity layer. forward() relies on `.logits` then having that width.
        self.swin_output_dim = self.swin.classifier.in_features
        self.swin.classifier = nn.Identity()
        self.convnext_output_dim = self.convnext.classifier.in_features
        self.convnext.classifier = nn.Identity()

        # Joint head over the concatenated outputs of both backbones.
        self.classifier = nn.Linear(
            self.swin_output_dim + self.convnext_output_dim, num_classes
        )

        self.lr = lr
        self.weight_decay = weight_decay
        self.loss_fn = SoftTargetCrossEntropy()
        self.loss_fn_crossentropy = nn.CrossEntropyLoss()

    def forward(self, pixel_values):
        """Run both backbones on the batch and classify the concatenated outputs."""
        fused = torch.cat(
            (self.swin(pixel_values).logits, self.convnext(pixel_values).logits),
            dim=1,
        )
        return self.classifier(fused)

    def training_step(self, batch, batch_idx):
        """Compute and log the soft-target training loss for one batch."""
        images, targets = batch
        train_loss = self.loss_fn(self.forward(images), targets)
        self.log('train_loss', train_loss)
        return train_loss

    def validation_step(self, batch, batch_idx):
        """Compute and log the cross-entropy validation loss for one batch."""
        images, targets = batch
        val_loss = self.loss_fn_crossentropy(self.forward(images), targets)
        self.log('val_loss', val_loss)
        return val_loss

    def configure_optimizers(self):
        """Build AdamW with one parameter group per sub-module plus a cosine schedule."""
        param_groups = [
            # Joint linear head.
            {'params': self.classifier.parameters(), 'lr': 3e-5, 'weight_decay': 1e-2},
            # ConvNeXt-V2 backbone.
            {'params': self.convnext.parameters(), 'lr': 1e-5, 'weight_decay': 1e-4},
            # SwinV2 backbone.
            {'params': self.swin.parameters(), 'lr': 1e-5, 'weight_decay': 1e-2},
        ]
        optimizer = torch.optim.AdamW(param_groups)
        scheduler = CosineAnnealingLR(optimizer, T_max=10, eta_min=1e-6)
        return [optimizer], [scheduler]

In [10]:
import pytorch_lightning as pl
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint
from transformers import BeitModel, SwinModel
import torch.nn.functional as F
from torch.optim.lr_scheduler import CosineAnnealingLR
import requests

class CoAtNetClassifier(pl.LightningModule):
    """Lightning module that fine-tunes a pretrained timm CoAtNet classifier.

    Both the training and the validation step use ``nn.CrossEntropyLoss``;
    ``self.loss_fn`` (soft-target loss) is created but not used by this class.
    """

    def __init__(self, num_classes=500, lr=1e-4, weight_decay=0.01):
        """
        :param num_classes: number of outputs of the replaced ``head.fc`` layer
        :param lr: stored on the module; NOTE(review): not read by
            ``configure_optimizers``, which uses fixed per-group values
        :param weight_decay: stored on the module; same caveat as ``lr``
        """
        super().__init__()

        self.coatnet = timm.create_model('coatnet_rmlp_2_rw_384.sw_in12k_ft_in1k', pretrained=True)
        # Replace the final linear layer so the model emits `num_classes` logits.
        fc_in_features = self.coatnet.head.fc.in_features
        self.coatnet.head.fc = nn.Linear(fc_in_features, num_classes)

        # Partition parameters by identity: everything under `coatnet.head`
        # (not only the new `fc` layer) versus the rest of the module.
        self.classifier_params = list(self.coatnet.head.parameters())
        head_ids = set(map(id, self.classifier_params))
        self.all_params = [param for param in self.parameters() if id(param) not in head_ids]

        self.lr = lr
        self.weight_decay = weight_decay
        self.loss_fn_crossentropy = nn.CrossEntropyLoss()
        self.loss_fn = SoftTargetCrossEntropy()

    def forward(self, pixel_values):
        """Return the CoAtNet logits for a batch of images."""
        return self.coatnet(pixel_values)

    def training_step(self, batch, batch_idx):
        """Compute and log the cross-entropy training loss for one batch."""
        images, targets = batch
        train_loss = self.loss_fn_crossentropy(self.forward(images), targets)
        self.log('train_loss', train_loss)
        return train_loss

    def validation_step(self, batch, batch_idx):
        """Compute and log the cross-entropy validation loss for one batch."""
        images, targets = batch
        val_loss = self.loss_fn_crossentropy(self.forward(images), targets)
        self.log('val_loss', val_loss)
        return val_loss

    def configure_optimizers(self):
        """Build AdamW with separate head/backbone groups plus a cosine schedule."""
        param_groups = [
            # Parameters of the CoAtNet head.
            {'params': self.classifier_params, 'lr': 5e-5, 'weight_decay': 1e-4},
            # All remaining (CoAtNet backbone) parameters.
            {'params': self.all_params, 'lr': 1e-5, 'weight_decay': 1e-2},
        ]
        optimizer = torch.optim.AdamW(param_groups)
        scheduler = CosineAnnealingLR(optimizer, T_max=5, eta_min=1e-6)
        return [optimizer], [scheduler]


In [11]:
import pytorch_lightning as pl
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint
from transformers import BeitModel, SwinModel
import timm
import torch.nn.functional as F
from torch.optim.lr_scheduler import CosineAnnealingLR

class EfficientNetV2(pl.LightningModule):
    """Lightning module that fine-tunes a pretrained timm EfficientNetV2-XL classifier.

    Training uses ``SoftTargetCrossEntropy``; validation uses
    ``nn.CrossEntropyLoss``.
    """

    def __init__(self, num_classes=500, lr=1e-4, weight_decay=0.01):
        """
        :param num_classes: number of outputs of the new linear classifier
        :param lr: stored on the module; NOTE(review): not read by
            ``configure_optimizers``, which uses fixed per-group values
        :param weight_decay: stored on the module; same caveat as ``lr``
        """
        super().__init__()

        # Create the backbone with `num_classes=0`, then attach a new linear
        # classifier with a hard-coded input width of 1280.
        self.model = timm.create_model('tf_efficientnetv2_xl.in21k_ft_in1k', pretrained=True, num_classes=0)
        self.model.classifier = nn.Linear(1280, num_classes, bias=True)

        # Partition parameters by identity into the classifier and everything else.
        self.head_params = list(self.model.classifier.parameters())
        head_ids = set(map(id, self.head_params))
        self.all_params = [param for param in self.model.parameters() if id(param) not in head_ids]

        self.lr = lr
        self.weight_decay = weight_decay
        self.loss_fn = SoftTargetCrossEntropy()
        self.loss_fn_crossentropy = nn.CrossEntropyLoss()

    def forward(self, pixel_values):
        """Return the model output for a batch of images."""
        return self.model(pixel_values)

    def training_step(self, batch, batch_idx):
        """Compute and log the soft-target training loss for one batch."""
        images, targets = batch
        train_loss = self.loss_fn(self.forward(images), targets)
        self.log('train_loss', train_loss)
        return train_loss

    def validation_step(self, batch, batch_idx):
        """Compute and log the cross-entropy validation loss for one batch."""
        images, targets = batch
        val_loss = self.loss_fn_crossentropy(self.forward(images), targets)
        self.log('val_loss', val_loss)
        return val_loss

    def configure_optimizers(self):
        """Build AdamW with separate head/backbone groups plus a cosine schedule."""
        param_groups = [
            # New linear classifier.
            {'params': self.head_params, 'lr': 5e-5, 'weight_decay': 1e-4},
            # All remaining (EfficientNetV2 backbone) parameters.
            {'params': self.all_params, 'lr': 1e-5, 'weight_decay': 1e-2},
        ]
        optimizer = torch.optim.AdamW(param_groups)
        scheduler = CosineAnnealingLR(optimizer, T_max=10, eta_min=1e-6)
        return [optimizer], [scheduler]


# Inference

In [12]:
# Ensemble member 1: restore the Swin-Large (384px) module from its Lightning checkpoint
# (the file name records epoch 10 with val_loss 0.45).
swin = SwinTransformerLightning.load_from_checkpoint('./checkpoints/swin_large_384/epoch=10-val_loss=0.45.ckpt')

  return self.fget.__get__(instance, owner)()


In [13]:
# Ensemble member 2: restore the SwinV2 + ConvNeXt-V2 module from its Lightning checkpoint
# (the file name records epoch 13 with val_loss 0.39).
swin2convnext2 = SwinConvNextClassifier.load_from_checkpoint('./checkpoints/swin2convnext2_384/epoch=13-val_loss=0.39.ckpt')

Some weights of Swinv2ForImageClassification were not initialized from the model checkpoint at team-lucid/swinv2-base-path4-window24-384-doc and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [14]:
# Ensemble member 3: restore the CoAtNet module from its Lightning checkpoint
# (the file name records epoch 5 with val_loss 0.49).
coatnet = CoAtNetClassifier.load_from_checkpoint('./checkpoints/coatnet/epoch=05-val_loss=0.49.ckpt')

In [15]:
# Ensemble member 4: restore the EfficientNetV2 module from its Lightning checkpoint
# (the file name records epoch 33 with val_loss 0.55).
efficientnet2 = EfficientNetV2.load_from_checkpoint('./checkpoints/efficientnetv2/epoch=33-val_loss=0.55.ckpt')

In [23]:
# Ensemble inference over the four trained models.
def inference(
    swin: nn.Module,
    swin2convnext2: nn.Module,
    coatnet: nn.Module,
    efficientnet2: nn.Module,
    device: torch.device,
    test_loader: DataLoader
):
    """Predict a class index for every image yielded by ``test_loader``.

    Each batch is run through all four models, their logits are averaged with
    equal weight, and the arg-max class of the averaged output is recorded.

    :param swin: first ensemble member
    :param swin2convnext2: second ensemble member
    :param coatnet: third ensemble member
    :param efficientnet2: fourth ensemble member
    :param device: device the models and batches are moved to
    :param test_loader: loader yielding image batches only (no labels)
    :return: list with one predicted class index per image, in loader order
    """
    members = (swin, swin2convnext2, coatnet, efficientnet2)

    # Move every member to the target device and switch it to evaluation mode.
    for member in members:
        member.to(device)
        member.eval()

    predictions = []
    with torch.no_grad():  # no gradients are needed for prediction
        for batch in tqdm(test_loader):
            batch = batch.to(device)

            # One forward pass per member, in the fixed order above.
            member_logits = [member(batch) for member in members]

            # Accumulate left to right so the sum matches a + b + c + d exactly.
            summed_logits = member_logits[0]
            for extra_logits in member_logits[1:]:
                summed_logits = summed_logits + extra_logits
            average_logits = summed_logits / len(members)

            # Softmax is monotonic, so it is not expected to change the arg-max;
            # it is kept so the function behaves exactly as before.
            probabilities = F.softmax(average_logits, dim=1)
            batch_preds = probabilities.argmax(dim=1)

            # Collect the predictions as numpy values on the CPU.
            predictions.extend(batch_preds.cpu().detach().numpy())

    return predictions

In [24]:
# Locations used for inference: the test image directory and the CSV describing the test images.
testdata_dir = "../data/test"
testdata_info_file = "../data/test.csv"
# NOTE(review): save_result_path is not referenced by the cells visible below;
# the submission is written under ./result instead. Confirm whether it is still needed.
save_result_path = "../train_result"

In [25]:
# Read the CSV that describes the inference images (its image_path column is used by the dataset below).
test_info = pd.read_csv(testdata_info_file)

# Total number of classes.
# NOTE(review): not referenced again in the visible cells; the models are restored from checkpoints.
num_classes = 500

In [26]:
# Inference-time preprocessing: evaluation mode, i.e. without the random augmentations.
test_transform = AlbumentationsTransform(is_train=False)

# Label-free dataset over the test images.
test_dataset = CustomDataset(
    testdata_dir,
    test_info,
    test_transform,
    is_inference=True,
)

# Keep the original row order (no shuffling, no dropped batch) so the
# predictions line up with the rows of test_info.
test_loader = DataLoader(
    test_dataset,
    batch_size=16,
    num_workers=4,
    shuffle=False,
    drop_last=False,
)

In [27]:
# Run the four-model ensemble over the test loader.
# `predictions` holds one predicted class index per test image, in loader order.

predictions = inference(
    swin=swin,
    swin2convnext2=swin2convnext2,
    coatnet=coatnet,
    efficientnet2=efficientnet2,
    device=device,
    test_loader=test_loader,
)

100%|██████████| 626/626 [13:19<00:00,  1.28s/it]


In [28]:
# Store the predicted class index of every test image in the 'target' column.
test_info['target'] = predictions
# Turn the row index into an explicit "ID" column.
test_info = test_info.reset_index().rename(columns={"index": "ID"})
test_info  # bare expression: displayed as the notebook cell output

Unnamed: 0,ID,image_path,target
0,0,0.JPEG,328
1,1,1.JPEG,414
2,2,2.JPEG,493
3,3,3.JPEG,17
4,4,4.JPEG,388
...,...,...,...
10009,10009,10009.JPEG,235
10010,10010,10010.JPEG,191
10011,10011,10011.JPEG,466
10012,10012,10012.JPEG,400


In [29]:
# Save the submission table. Create the output directory first so that a missing
# ./result folder cannot make the write fail after the long inference run.
os.makedirs("./result", exist_ok=True)
test_info.to_csv("./result/bagging_384with4model.csv", index=False)

In [24]:
# torch.save(model.state_dict(), 'model_state_dict_deit_v0=epoch30.pth')

: 