In [None]:
import os
import time
import random

import timm
import torch
import albumentations as A
import pandas as pd
import numpy as np
import torch.nn as nn
from albumentations.pytorch import ToTensorV2
from torch.optim import Adam
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score
from timm.data.mixup import Mixup

import numpy as np
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score, accuracy_score


  from .autonotebook import tqdm as notebook_tqdm


In [None]:
def train_one_epoch(loader, model, optimizer, loss_fn, device, val_loader=None, epoch=None, mixup_fn=None, total_epochs=None):
    """Train ``model`` for one pass over ``loader`` and optionally validate it.

    Args:
        loader: Iterable of ``(images, targets)`` training batches; must support ``len()``.
        model: Network being optimised.
        optimizer: Optimizer that steps ``model``'s parameters.
        loss_fn: Criterion called as ``loss_fn(logits, targets)``. When ``mixup_fn`` is
            given it always receives soft ``(batch, classes)`` targets, otherwise class indices.
        device: Device every batch is moved to.
        val_loader: Optional iterable of validation batches. Validation runs without
            mixup and without gradient tracking.
        epoch: Zero-based epoch index. When given (and validation ran) a summary line is printed.
        mixup_fn: Optional callable ``(images, targets) -> (images, soft_targets)`` applied
            to training batches only.
        total_epochs: Optional epoch budget shown in the summary line. When omitted, a
            module-level ``EPOCHS`` is used if one exists; otherwise only the epoch number is shown.

    Returns:
        dict with ``train_loss``, ``train_acc``, ``train_f1`` and, when validation ran,
        ``val_loss`` (mean over validation batches), ``val_acc``, ``val_f1``,
        ``val_preds`` and ``val_targets``.
    """
    # When mixup is enabled the criterion is fed soft targets for every batch.
    soft_targets = mixup_fn is not None

    model.train()
    train_loss = 0.0
    preds_list = []
    targets_list = []

    pbar = tqdm(loader)

    for image, targets in pbar:
        image = image.to(device)
        targets = targets.to(device, dtype=torch.long)

        # timm's Mixup (the mixup_fn this notebook imports) rejects odd batch sizes,
        # so an odd (typically final) batch is trained unmixed instead of crashing.
        if soft_targets and image.size(0) % 2 == 0:
            image, targets = mixup_fn(image, targets)

        model.zero_grad(set_to_none=True)
        preds = model(image)
        loss = loss_fn(preds, _targets_for_loss(targets, preds, soft_targets))

        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        preds_list.extend(preds.argmax(dim=1).detach().cpu().numpy())
        # Mixed targets are soft; the dominant class is used for the training metrics.
        targets_list.extend(_hard_labels(targets).detach().cpu().numpy())

        pbar.set_description(f"Loss: {loss.item():.4f}")

    train_loss /= len(loader)
    train_acc = accuracy_score(targets_list, preds_list)
    train_f1 = f1_score(targets_list, preds_list, average='macro')

    ret = {
        "train_loss": train_loss,
        "train_acc": train_acc,
        "train_f1": train_f1,
    }

    if val_loader:
        val_preds, val_targets = [], []
        val_loss_sum = 0.0
        val_batches = 0
        model.eval()
        with torch.no_grad():
            for images, targets in val_loader:
                images = images.to(device)
                targets = targets.to(device, dtype=torch.long)

                # No mixup here: validation must score the model on unmodified labels.
                preds = model(images)
                val_loss_sum += loss_fn(preds, _targets_for_loss(targets, preds, soft_targets)).item()
                val_batches += 1
                val_preds.extend(preds.argmax(dim=1).cpu().numpy())
                val_targets.extend(targets.cpu().numpy())

        # Average over every validation batch rather than reporting only the last one.
        val_loss = val_loss_sum / max(val_batches, 1)
        val_acc = accuracy_score(val_targets, val_preds)
        val_f1 = f1_score(val_targets, val_preds, average='macro')

        ret.update({
            "val_loss": val_loss,
            "val_acc": val_acc,
            "val_f1": val_f1,
            "val_preds": val_preds,  # predicted class per validation sample
            "val_targets": val_targets,  # true class per validation sample
        })

        if epoch is not None:
            if total_epochs is None:
                # Backward compatibility: older callers relied on a module-level EPOCHS.
                total_epochs = globals().get("EPOCHS")
            progress = f"{epoch+1}/{total_epochs}" if total_epochs is not None else f"{epoch+1}"
            print(f"Epoch: {progress}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}, Train F1: {train_f1:.4f}, Val F1: {val_f1:.4f}")

    return ret


def _hard_labels(targets):
    """Return class indices for either hard (1-D) or soft (2-D) targets."""
    return targets.argmax(dim=1) if targets.dim() > 1 else targets


def _targets_for_loss(targets, logits, soft):
    """Return one-hot float targets when the criterion is fed soft labels, else ``targets`` unchanged."""
    if soft and targets.dim() == 1:
        return torch.nn.functional.one_hot(targets, logits.size(1)).to(logits.dtype)
    return targets

In [None]:
import os
import cv2
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from albumentations import (
    Compose, RandomResizedCrop, Resize, HorizontalFlip, VerticalFlip,
    RandomRotate90, Rotate, GaussianBlur, HueSaturationValue,
    RandomBrightnessContrast, Normalize
)

# Training-time augmentation pipeline
def get_train_augmentation(img_size, mixup_prob=0.5, alpha=1.0):
    """Build the albumentations pipeline applied to training images.

    Args:
        img_size: Side length passed as both height and width to the crop and resize steps.
        mixup_prob: Accepted for interface compatibility; not used by this function.
        alpha: Accepted for interface compatibility; not used by this function.

    Returns:
        A ``Compose`` that chains, in order: optional random resized crop, resize,
        flips / rotations, blur and colour jitter, normalisation, and tensor conversion.
    """
    target_size = {"height": img_size, "width": img_size}

    # Geometric steps; the resize follows the optional (p=0.5) crop.
    geometric_steps = [
        RandomResizedCrop(scale=(0.8, 1.0), p=0.5, **target_size),
        Resize(**target_size),
        HorizontalFlip(p=0.5),
        VerticalFlip(p=0.5),
        RandomRotate90(p=0.5),
        Rotate(limit=(-35, 35), p=0.5),
    ]

    # Blur and colour perturbations.
    photometric_steps = [
        GaussianBlur(blur_limit=(3, 7), p=0.5),
        HueSaturationValue(hue_shift_limit=20, sat_shift_limit=30, val_shift_limit=20, p=0.5),
        RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
    ]

    # Per-channel normalisation followed by conversion to a tensor.
    tensor_steps = [
        Normalize(mean=[0.57433558, 0.58330406, 0.58818927], std=[0.18964056, 0.18694252, 0.18506919]),
        ToTensorV2(),
    ]

    return Compose(geometric_steps + photometric_steps + tensor_steps)
    
# Preprocessing pipeline for test images (no random augmentation)
def get_test_augmentation(img_size):
    """Build the albumentations pipeline applied to test images.

    Args:
        img_size: Side length passed as both height and width to the resize step.

    Returns:
        A ``Compose`` of resize, normalisation and tensor conversion.
    """
    resize = Resize(height=img_size, width=img_size)
    # NOTE(review): these statistics differ from the ones in get_train_augmentation;
    # presumably measured on the test images -- confirm this is intentional.
    normalize = Normalize(
        mean=[0.67112013, 0.67663422, 0.6792661],
        std=[0.19423191, 0.19232531, 0.19091303],
    )
    to_tensor = ToTensorV2()
    return Compose([resize, normalize, to_tensor])
    
class ImageDataset(Dataset):
    """Dataset of image files and labels, with each image optionally repeated.

    Every image is exposed ``num_augmentations`` times: index ``i`` maps to image
    ``i // num_augmentations``, so repeated indices re-run ``transform`` on the same file.

    Args:
        image_paths: Sequence of image file paths.
        labels: Sequence of labels, one per entry of ``image_paths``.
        transform: Optional callable invoked as ``transform(image=rgb_array)`` that
            returns a mapping with an ``'image'`` entry.
        num_augmentations: How many times each image appears in the dataset.

    Raises:
        ValueError: If ``image_paths`` and ``labels`` differ in length.
    """

    def __init__(self, image_paths, labels, transform=None, num_augmentations=1):
        if len(image_paths) != len(labels):
            raise ValueError(
                f"image_paths and labels must have the same length "
                f"(got {len(image_paths)} and {len(labels)})"
            )
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform
        self.num_augmentations = num_augmentations

    def __len__(self):
        """Return the number of samples, counting every repetition of every image."""
        return len(self.image_paths) * self.num_augmentations

    def __getitem__(self, idx):
        """Load, colour-convert and transform the image behind ``idx``.

        Returns:
            ``(image, label)``.

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        image_idx = idx // self.num_augmentations
        image_path = self.image_paths[image_idx]
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals a missing or undecodable file by returning None.
            raise FileNotFoundError(f"Could not read image: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        label = self.labels[image_idx]

        if self.transform is not None:
            augmented = self.transform(image=image)
            image = augmented['image']
            # A transform may supply a replacement target under the 'mixup' key.
            if 'mixup' in augmented:
                label = augmented['mixup']['target']

        return image, label

In [None]:
class EarlyStopping:
    """Early stopping on a validation score that should be maximised (here: F1).

    Call the instance once per epoch with the latest score and the model. The model's
    ``state_dict`` is saved whenever the score improves, and ``early_stop`` becomes
    ``True`` after ``patience`` consecutive calls without improvement.

    Args:
        patience: Number of consecutive non-improving calls tolerated before stopping.
        verbose: If ``True``, print a message every time a checkpoint is saved.
        delta: Minimum increase over the best score that counts as an improvement.
        path: File the best ``state_dict`` is written to.
    """
    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt'):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # Best score checkpointed so far; -inf because the metric is maximised.
        self.val_f1_max = float('-inf')
        self.delta = delta
        self.path = path

    def __call__(self, val_f1, model):
        """Record ``val_f1`` and update the counter, checkpoint and stop flag."""
        score = val_f1

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_f1, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_f1, model)
            self.counter = 0

    def save_checkpoint(self, val_f1, model):
        """Save the model's parameters to ``self.path`` and remember the score."""
        if self.verbose:
            print(f'Validation F1 score increased ({self.val_f1_max:.6f} --> {val_f1:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_f1_max = val_f1

In [None]:
import wandb
# Log this session in to Weights & Biases; the later cells create runs and a sweep through it.
wandb.login()

Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Currently logged in as: [33mgimjeongheon38[0m. Use [1m`wandb login --relogin`[0m to force relogin


True

In [None]:
def main():
    """Run one W&B sweep trial: stratified 5-fold training and fold-averaged test prediction.

    Reads ``lr``, ``alpha`` (mixup alpha) and ``p`` (mixup probability) from
    ``wandb.config``. For every fold it trains a fresh model with mixup, LR reduction
    on plateau and early stopping on validation F1, logs per-epoch metrics to W&B
    (including ``val_acc``, the metric the sweep optimises), restores the best
    checkpoint and predicts the test set. The folds' test probabilities are averaged
    and written to ``submission.csv``.

    The body can also be copy-pasted without the ``def`` line and run as plain cell code.
    """
    # train_one_epoch formats its progress line with a module-level EPOCHS, so the
    # epoch budget assigned below is published as a global instead of a local.
    global EPOCHS

    wandb.init(project="kimjeongheon_cv", entity='upstage-ai-comp', name='test1')

    # Fix every seed for reproducibility.
    SEED = 42
    os.environ['PYTHONHASHSEED'] = str(SEED)
    random.seed(SEED)
    np.random.seed(SEED)
    torch.manual_seed(SEED)
    torch.cuda.manual_seed(SEED)
    torch.cuda.manual_seed_all(SEED)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

    # device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # model config
    model_name = 'efficientnet_b4'  # alternatives: 'resnet34', 'resnet50', 'efficientnet-b0', ...
    num_classes = 17

    # training config
    img_size = 384
    LR = wandb.config.lr
    EPOCHS = 100
    BATCH_SIZE = 16
    num_workers = 0

    # How many times each image is repeated (and re-augmented) per epoch.
    num_augmentations = 5

    alpha = wandb.config.alpha
    mixup_prob = wandb.config.p

    data_dir = "/data/ephemeral/home/data"
    # File that EarlyStopping.save_checkpoint writes the best weights to.
    checkpoint_path = 'checkpoint.pt'

    train_df = pd.read_csv(f"{data_dir}/train.csv", usecols=['ID', 'target'])
    train_image_paths = [f"{data_dir}/train/{fname}" for fname in train_df['ID']]
    train_labels = train_df['target'].values

    test_df = pd.read_csv(f"{data_dir}/sample_submission.csv")
    test_image_paths = test_df['ID'].apply(lambda x: f"{data_dir}/test/{x}").tolist()
    test_labels = [0] * len(test_image_paths)  # the test set has no labels, so dummy labels are used

    tst_dataset = ImageDataset(test_image_paths, test_labels, transform=get_test_augmentation(img_size))
    tst_loader = DataLoader(tst_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=num_workers, pin_memory=True)

    # Stratified K-Fold cross validation
    n_splits = 5
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=SEED)
    fold_scores = []

    # Test-set class probabilities predicted by each fold's model.
    pred_probs = []

    # The mixup configuration does not change between epochs or folds, so build it once.
    mixup_fn = Mixup(mixup_alpha=alpha, cutmix_alpha=0.0, prob=mixup_prob, switch_prob=0.0, mode='batch', label_smoothing=0.0, num_classes=num_classes)

    # Train and evaluate one model per fold.
    for fold, (train_idx, val_idx) in enumerate(skf.split(train_image_paths, train_labels)):
        print(f'Fold {fold+1}/{n_splits}')

        # Split into training and validation subsets.
        train_paths = [train_image_paths[i] for i in train_idx]
        train_labels_ = [train_labels[i] for i in train_idx]
        val_paths = [train_image_paths[i] for i in val_idx]
        val_labels = [train_labels[i] for i in val_idx]

        # Datasets and loaders.
        # NOTE(review): validation also uses the random training augmentation, so the
        # validation score is stochastic -- confirm this is intentional.
        train_dataset = ImageDataset(train_paths, train_labels_, transform=get_train_augmentation(img_size), num_augmentations=num_augmentations)
        val_dataset = ImageDataset(val_paths, val_labels, transform=get_train_augmentation(img_size), num_augmentations=num_augmentations)
        train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=num_workers)
        val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=num_workers)

        # Fresh model for every fold.
        model = timm.create_model(model_name, pretrained=True, num_classes=num_classes).to(device)
        loss_fn = nn.CrossEntropyLoss()

        optimizer = optim.AdamW(model.parameters(), lr=LR)

        # `verbose` is deprecated on PyTorch schedulers; LR changes are reported explicitly below.
        scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=2)

        early_stopping = EarlyStopping(patience=5, verbose=True)

        for epoch in range(EPOCHS):
            results = train_one_epoch(train_loader, model, optimizer, loss_fn, device, val_loader, epoch, mixup_fn)

            # Report scalar metrics to W&B; the sweep optimises 'val_acc'.
            epoch_metrics = {k: v for k, v in results.items() if k not in ('val_preds', 'val_targets')}
            epoch_metrics.update({
                "fold": fold + 1,
                "epoch": epoch + 1,
                "lr": optimizer.param_groups[0]['lr'],
            })
            wandb.log(epoch_metrics)

            # Update the learning-rate scheduler and report any reduction.
            lr_before = optimizer.param_groups[0]['lr']
            scheduler.step(results['val_f1'])
            lr_after = optimizer.param_groups[0]['lr']
            if lr_after != lr_before:
                print(f"Reducing learning rate from {lr_before:.4e} to {lr_after:.4e}")

            # Early-stopping check (also checkpoints the best model so far).
            early_stopping(results['val_f1'], model)
            if early_stopping.early_stop:
                print("Early stopping")
                break

        # Score and predict with the best checkpoint, not with the last epoch's weights.
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))
        best_val_f1 = early_stopping.best_score
        fold_scores.append(best_val_f1)
        wandb.log({"fold": fold + 1, "best_val_f1": best_val_f1})

        # Store this fold's predicted probabilities for the test set.
        model.eval()
        with torch.no_grad():
            fold_pred_probs = []
            for image, _ in tqdm(tst_loader):
                image = image.to(device)
                preds = model(image)
                fold_pred_probs.extend(preds.softmax(dim=1).detach().cpu().numpy())
            pred_probs.append(fold_pred_probs)

    # Mean score over all folds.
    mean_score = np.mean(fold_scores)
    print(f'Mean Macro F1 Score: {mean_score:.4f}')
    wandb.log({"mean_val_f1": mean_score})

    # Average the folds' probabilities, then take the most likely class.
    pred_probs = np.mean(pred_probs, axis=0)
    preds_list = np.argmax(pred_probs, axis=1)

    submission_df = test_df.copy()
    submission_df['target'] = preds_list
    submission_df.to_csv("submission.csv", index=False)

# Bayesian hyper-parameter search over the values main() reads from wandb.config.
sweep_configuration = {
    'method': 'bayes',  # valid W&B methods are 'grid', 'random' and 'bayes'
    'name': 'sweep',
    'metric': {'goal': 'maximize', 'name': 'val_acc'},

    'parameters':
    {
        'lr': {'max': 0.01, 'min': 0.0001},  # learning-rate search range
        'alpha': {'max': 1.0, 'min': 0.4},  # mixup alpha
        'p': {'max': 0.7, 'min': 0.4},  # probability of applying the augmentation
        # 'mix_up': how strongly to apply the augmentation; currently fixed at 0.5.
        # Open question: will values that matter here still help with newer models?
        # Only an experiment will tell.
    }
}

# Create the sweep in the same entity/project that main() initialises its runs in.
SWEEP_PROJECT = "kimjeongheon_cv"
SWEEP_ENTITY = "upstage-ai-comp"
sweep_id = wandb.sweep(sweep=sweep_configuration, project=SWEEP_PROJECT, entity=SWEEP_ENTITY)

# Start the sweep agent; main is the function executed for every trial.
wandb.agent(sweep_id, function=main, entity=SWEEP_ENTITY, project=SWEEP_PROJECT, count=4)