In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Import

In [2]:
import random
import pandas as pd
import numpy as np
import os
import re
import glob
import cv2

# To read zip file
import zipfile
from zipfile import ZipFile
from io import BytesIO

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler

# image augmentation library
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import torchvision.models as models

from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from sklearn.metrics import f1_score
from sklearn.metrics import classification_report
from tqdm import tqdm

import warnings
warnings.filterwarnings(action='ignore')

In [3]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

cuda


## Hyperparameter Setting

In [4]:
CFG = {
    'IMG_SIZE':256, # crop size
    'EPOCHS':10,
    'LEARNING_RATE':1e-3,
    'BATCH_SIZE': 32,
    'SEED':41,
    'VERSION': 3,
    'MODEL': "SwinTv2"
}

SWITCH = {
    "SAVE" : True,
    "TRAIN" : True,
    "TRANSFER": True
}

## Fixed RandomSeed

In [5]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정

## Train & Validation Split

In [6]:
main_dir_path = "/content/drive/Othercomputers/my/notebooks/bird-classification"
zipfile_path = os.path.join(main_dir_path, "open.zip")

In [7]:
csv_data = dict()
with ZipFile(zipfile_path, 'r') as zipObj:
    listOfFileNames = zipObj.namelist()
    for fileName in listOfFileNames:
        if fileName.endswith('csv'):
            zipRead = zipObj.read(fileName)
            curr_df = pd.read_csv(BytesIO(zipRead))
            csv_data[f"{fileName}"] = curr_df

In [8]:
df = csv_data["train.csv"]

In [9]:
train_data, val_data, _, _ = train_test_split(df, df['label'], test_size=0.3, stratify=df['label'], random_state=CFG['SEED'])

## Label-Encoding

In [10]:
le = preprocessing.LabelEncoder()
train_data['label'] = le.fit_transform(train_data['label'])
val_data['label'] = le.transform(val_data['label'])

## CustomDataset

In [11]:
class CustomDataset(Dataset):
    def __init__(self, img_path_list, label_list, transforms=None):
        self.img_path_list = img_path_list
        self.label_list = label_list
        self.transforms = transforms
        self.data = []

        # read zip file to save storage
        with ZipFile(zipfile_path, 'r') as zipObj:
            for img_path in tqdm(self.img_path_list):
                buf = zipObj.read(img_path[2:])
                image = cv2.imdecode(np.frombuffer(buf, np.uint8), cv2.IMREAD_COLOR)

                if self.transforms is not None:
                    image = self.transforms(image=image)['image']

                self.data.append(image)


    def __getitem__(self, index):
        # img_path = self.img_path_list[index][2:]

        # with ZipFile(zipfile_path, 'r') as zipObj:
        #     data = zipObj.read(img_path)
        #     image = cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)

        # if self.transforms is not None:
        #     image = self.transforms(image=image)['image']

        if self.label_list is not None:
            label = self.label_list[index]
            return self.data[index], label
        else:
            return self.data[index]

    def __len__(self):
        return len(self.img_path_list)

In [12]:
# ImageNet "Standard" Normalization
train_transform = A.Compose([
                            A.Resize(CFG['IMG_SIZE'],CFG['IMG_SIZE']),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            ToTensorV2()
                            ])

test_transform = A.Compose([
                            A.Resize(CFG['IMG_SIZE'],CFG['IMG_SIZE']),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            ToTensorV2()
                            ])

In [13]:
if SWITCH["TRAIN"] :
    train_dataset = CustomDataset(train_data['img_path'].values, train_data['label'].values, train_transform)
    train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

    val_dataset = CustomDataset(val_data['img_path'].values, val_data['label'].values, test_transform)
    val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

100%|██████████| 11083/11083 [00:14<00:00, 779.08it/s]
100%|██████████| 4751/4751 [00:05<00:00, 813.69it/s]


## Model Define

In [14]:
class BaseModel(nn.Module):
    def __init__(self, num_classes=len(le.classes_)):
        super(BaseModel, self).__init__()
        self.backbone = models.efficientnet_b0(pretrained=True)
        self.classifier = nn.Linear(1000, num_classes)

    def forward(self, x):
        x = self.backbone(x)
        x = self.classifier(x)
        return x

In [22]:
class SwinTv2(nn.Module):
    def __init__(self, num_classes=len(le.classes_), transfer=False):
        super(SwinTv2, self).__init__()
        self.backbone = models.swin_v2_b(pretrained=True)
        if transfer :
            print("Transfer Learning!!!")
            for param in self.backbone.parameters():
                param.requires_grad = False
        self.classifier = nn.Linear(self.backbone.num_classes, num_classes)

    def forward(self, x):
        x = self.backbone(x)
        x = self.classifier(x)
        return x

In [16]:
class ViT_H_14(nn.Module):
    def __init__(self, num_classes=len(le.classes_)):
        super(ViT_H_14, self).__init__()
        self.backbone = models.vit_h_14(weights=models.ViT_H_14_Weights.IMAGENET1K_SWAG_E2E_V1)
        self.classifier = nn.Linear(1000, num_classes)

    def forward(self, x):
        x = self.backbone(x)
        x = self.classifier(x)
        return x

In [17]:
class RegNet_Y_128GF(nn.Module):
    def __init__(self, num_classes=len(le.classes_)):
        super(RegNet_Y_128GF, self).__init__()
        self.backbone = models.regnet_y_128gf(weights=models.RegNet_Y_128GF_Weights.IMAGENET1K_SWAG_E2E_V1)
        self.classifier = nn.Linear(1000, num_classes)

    def forward(self, x):
        x = self.backbone(x)
        x = self.classifier(x)
        return x

In [18]:
class EfficientNet_V2_M(nn.Module):
    def __init__(self, num_classes=len(le.classes_)):
        super(EfficientNet_V2_M, self).__init__()
        self.backbone = models.efficientnet_v2_m(weights=models.EfficientNet_V2_M_Weights.IMAGENET1K_V1)
        self.classifier = nn.Linear(1000, num_classes)

    def forward(self, x):
        x = self.backbone(x)
        x = self.classifier(x)
        return x

## Train

In [19]:
def train(model, optimizer, train_loader, val_loader, scheduler, device):
    model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)

    best_score = 0
    best_model = None

    for epoch in range(1, CFG['EPOCHS']+1):
        model.train()                                  # Switch train mode
        train_loss = []
        for imgs, labels in tqdm(iter(train_loader)):
            imgs = imgs.float().to(device)
            labels = labels.to(device)

            optimizer.zero_grad()

            output = model(imgs)
            loss = criterion(output, labels)

            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())

        _val_loss, _val_score = validation(model, criterion, val_loader, device)
        _train_loss = np.mean(train_loss)
        print(f'Epoch [{epoch}], Train Loss : [{_train_loss:.5f}] Val Loss : [{_val_loss:.5f}] Val F1 Score : [{_val_score:.5f}]')

        if scheduler is not None:
            scheduler.step(_val_score)

        if best_score < _val_score:
            best_score = _val_score
            best_model = model

    return best_model

In [20]:
def validation(model, criterion, val_loader, device):
    model.eval()
    val_loss = []
    preds, true_labels = [], []

    with torch.no_grad():
        for imgs, labels in tqdm(iter(val_loader)):
            imgs = imgs.float().to(device)
            labels = labels.to(device)

            pred = model(imgs)

            loss = criterion(pred, labels)

            preds += pred.argmax(1).detach().cpu().numpy().tolist()
            true_labels += labels.detach().cpu().numpy().tolist()

            val_loss.append(loss.item())

        _val_loss = np.mean(val_loss)
        _val_score = f1_score(true_labels, preds, average='macro')

    return _val_loss, _val_score

## Run!!

In [25]:
model = SwinTv2(transfer=SWITCH["TRANSFER"])
model.eval()
optimizer = torch.optim.Adam(params = model.classifier.parameters(), lr = CFG["LEARNING_RATE"])
# scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=2, threshold_mode='abs', min_lr=1e-8, verbose=True)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CFG["EPOCHS"], eta_min=0.00001)

if SWITCH["TRAIN"] :
    infer_model = train(model, optimizer, train_loader, val_loader, scheduler, device)

Transfer Learning!!!


100%|██████████| 347/347 [01:42<00:00,  3.39it/s]
100%|██████████| 149/149 [00:45<00:00,  3.25it/s]


Epoch [1], Train Loss : [1.67638] Val Loss : [1.19828] Val F1 Score : [0.65582]


100%|██████████| 347/347 [01:48<00:00,  3.19it/s]
100%|██████████| 149/149 [00:46<00:00,  3.23it/s]


Epoch [2], Train Loss : [1.12588] Val Loss : [1.03512] Val F1 Score : [0.70094]


100%|██████████| 347/347 [01:48<00:00,  3.19it/s]
100%|██████████| 149/149 [00:46<00:00,  3.23it/s]


Epoch [3], Train Loss : [1.01252] Val Loss : [0.97602] Val F1 Score : [0.71314]


100%|██████████| 347/347 [01:48<00:00,  3.19it/s]
100%|██████████| 149/149 [00:46<00:00,  3.24it/s]


Epoch [4], Train Loss : [0.94762] Val Loss : [0.93943] Val F1 Score : [0.73295]


100%|██████████| 347/347 [01:48<00:00,  3.19it/s]
100%|██████████| 149/149 [00:46<00:00,  3.24it/s]


Epoch [5], Train Loss : [0.89338] Val Loss : [0.91546] Val F1 Score : [0.73851]


100%|██████████| 347/347 [01:48<00:00,  3.19it/s]
100%|██████████| 149/149 [00:46<00:00,  3.23it/s]


Epoch [6], Train Loss : [0.87791] Val Loss : [0.90898] Val F1 Score : [0.73835]


100%|██████████| 347/347 [01:48<00:00,  3.19it/s]
100%|██████████| 149/149 [00:46<00:00,  3.24it/s]


Epoch [7], Train Loss : [0.85116] Val Loss : [0.89573] Val F1 Score : [0.73820]


100%|██████████| 347/347 [01:48<00:00,  3.19it/s]
100%|██████████| 149/149 [00:46<00:00,  3.24it/s]


Epoch [8], Train Loss : [0.81610] Val Loss : [0.89242] Val F1 Score : [0.74256]


100%|██████████| 347/347 [01:48<00:00,  3.19it/s]
100%|██████████| 149/149 [00:46<00:00,  3.24it/s]


Epoch [9], Train Loss : [0.81218] Val Loss : [0.88131] Val F1 Score : [0.74941]


100%|██████████| 347/347 [01:48<00:00,  3.19it/s]
100%|██████████| 149/149 [00:46<00:00,  3.23it/s]

Epoch [10], Train Loss : [0.79599] Val Loss : [0.88135] Val F1 Score : [0.74615]





## Save model

In [26]:
model_name = "{0}_Ver{1}.pth".format(CFG["MODEL"], CFG["VERSION"])
PATH = os.path.join(main_dir_path, "weights", model_name)
if SWITCH["SAVE"]:
    torch.save(model, PATH)

## Inference

In [27]:
def inference(model, test_loader, device):
    model.eval()
    preds = []
    with torch.no_grad():
        for imgs in tqdm(iter(test_loader)):
            imgs = imgs.float().to(device)

            pred = model(imgs)

            preds += pred.argmax(1).detach().cpu().numpy().tolist()

    preds = le.inverse_transform(preds)
    return preds

In [28]:
if not SWITCH["TRAIN"]:
    infer_model = torch.load(PATH)
    test = csv_data["test.csv"]
    test_dataset = CustomDataset(test['img_path'].values, None, test_transform)
    test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)
    preds = inference(infer_model, test_loader, device)

    # Submission
    submit = csv_data["sample_submission.csv"]
    submit['label'] = preds
    file_name = "{0}_submit_ver{1}.csv".format(CFG["MODEL"], CFG["VERSION"])
    submit_path = os.path.join(main_dir_path, "submit", file_name)
    submit.to_csv(submit_path, index=False)