<a href="https://colab.research.google.com/github/bcmin1018/CV/blob/main/notebooks/EfficientNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os, sys 
from google.colab import drive
# Mount Google Drive at /content/drive; every data path used in this notebook lives under it.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Switch the working directory to the Drive data folder for the cells that follow.
%cd /content/drive/MyDrive/Colab Notebooks/data

/content/drive/MyDrive/Colab Notebooks/data


In [None]:
!unzip -qq "/content/drive/MyDrive/Colab Notebooks/data/test.zip"

In [None]:
!unzip -qq "/content/drive/MyDrive/Colab Notebooks/data/train.zip"

In [None]:
!pip install timm

import warnings
warnings.filterwarnings('ignore')

from glob import glob
import pandas as pd
import numpy as np 
from tqdm import tqdm
import cv2

import os
import timm
import random

import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torchvision.transforms as transforms
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from copy import deepcopy
import time

# Prefer the GPU, but fall back to the CPU when no CUDA device is available so that
# anything placed on `device` can still be created on a CPU-only runtime.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')



In [None]:
# Root data folder on Drive and the CSV files that live directly inside it.
DIR = '/content/drive/MyDrive/Colab Notebooks/data/'
TRAIN_SOURCE, TEST_SOURCE, SAMPLE_SUBMISSION = (
    os.path.join(DIR, csv_name)
    for csv_name in ("train_df.csv", "test_df.csv", "sample_submission.csv")
)

In [None]:
# Read the metadata tables through the TRAIN_SOURCE / TEST_SOURCE path constants
# instead of repeating the absolute paths, so the data location is configured in one place.
train_df = pd.read_csv(TRAIN_SOURCE)
test_df = pd.read_csv(TEST_SOURCE)

In [None]:
# Collect the PNG files of each split and put them in sorted path order.
# NOTE(review): downstream code pairs train_png positionally with the rows of train_df —
# confirm that the sorted file order matches the row order of the CSV.
train_png = glob('/content/drive/MyDrive/Colab Notebooks/data/train/*.png')
train_png.sort()
test_png = glob('/content/drive/MyDrive/Colab Notebooks/data/test/*.png')
test_png.sort()

In [None]:
# Encode the labels as integer class ids, numbered in sorted label order.
# label_unique maps label -> class id; train_labels holds one class id per row of train_df.
label_unique = {
    name: class_id
    for class_id, name in enumerate(sorted(np.unique(train_df["label"])))
}
train_labels = [label_unique[name] for name in train_df["label"]]

In [None]:
# Relative frequency of each label in the training table (a Series indexed by label).
normalize = train_df['label'].value_counts(normalize=True)

In [None]:
# Per-label weight: 10 minus the label's share in percent, so frequent labels get smaller weights.
# NOTE(review): a label covering more than 10% of the rows would end up with a negative
# weight — confirm this cannot happen for the data in use.
weight_ratio = 10 - normalize *100

In [None]:
# Build the per-class weight vector, ordered by encoded class id.
# The length follows the label mapping instead of a hard-coded class count, so the
# vector always matches the number of classes found in the training table.
weight_list_all = np.ones(len(label_unique))
for key in weight_ratio.index:
    # label_unique maps the label to its position in the weight vector.
    weight_list_all[label_unique[key]] = weight_ratio[key]
# Place the weights on the same device the rest of the notebook uses.
weight_list_all_tensor = torch.FloatTensor(weight_list_all).to(device)
print(weight_list_all_tensor)

tensor([9.7662, 9.7428, 9.7428, 5.1134, 9.8363, 9.8597, 9.8597, 9.8363, 9.8831,
        4.7627, 9.8597, 9.8831, 9.8831, 9.7194, 9.7428, 4.8796, 9.7428, 9.7194,
        9.7662, 9.7662, 9.7896, 3.4534, 9.7896, 9.7896, 9.7662, 9.8597, 9.8597,
        9.8597, 3.8274, 9.8597, 9.8597, 9.7896, 9.7896, 0.8581, 9.7896, 9.7896,
        9.7662, 9.7662, 9.7896, 9.7662, 4.2717, 9.7896, 9.6960, 9.7428, 9.7194,
        4.8562, 9.7194, 9.6960, 9.7896, 9.7428, 9.6960, 9.7662, 3.7573, 9.8831,
        9.7194, 2.5181, 9.7194, 9.7194, 9.6960, 9.7194, 9.7194, 9.7896, 9.7896,
        4.6224, 9.8130, 9.7896, 9.8130, 9.6493, 8.5971, 9.8831, 9.8831, 9.8831,
        5.0199, 9.8831, 9.9065, 9.8597, 4.2249, 9.8831, 9.8831, 9.7428, 9.7662,
        9.8130, 9.7896, 9.8130, 4.3886, 9.7896, 9.7896, 9.8130],
       device='cuda:0')


In [None]:
def img_load(path, size=(256, 256)):
    """Read an image file and return it as a resized array with reversed channel order.

    Args:
        path: Path of the image file to read.
        size: Target size handed to ``cv2.resize``; defaults to ``(256, 256)``.

    Returns:
        The decoded image with its last axis reversed (OpenCV's BGR becomes RGB),
        resized to ``size``.

    Raises:
        FileNotFoundError: If ``cv2.imread`` could not read ``path``.
    """
    img = cv2.imread(path)
    # cv2.imread signals failure by returning None instead of raising; fail loudly
    # with the offending path rather than with an opaque subscripting TypeError.
    if img is None:
        raise FileNotFoundError(f"Could not read image: {path}")
    img = img[:, :, ::-1]
    img = cv2.resize(img, size)
    return img

In [None]:
# Decode every image up front and keep the arrays in memory; tqdm reports progress.
train_imgs = list(map(img_load, tqdm(train_png)))
test_imgs = list(map(img_load, tqdm(test_png)))

100%|██████████| 4277/4277 [05:29<00:00, 13.00it/s]
100%|██████████| 2154/2154 [14:34<00:00,  2.46it/s]


In [None]:
# Hold out 20% of the samples for validation, stratified by label, with a fixed seed.
# Gotcha: the result is assigned back to train_imgs / train_labels, so running this cell
# a second time would split the already-reduced training set again.
train_imgs, valid_imgs, train_labels, valid_labels = train_test_split(train_imgs, train_labels, test_size = 0.2, shuffle=True, stratify=train_labels, random_state = 2022)

In [None]:
class Custom_dataset(Dataset):
    """Indexable dataset yielding ``(image_tensor, label)`` pairs.

    Args:
        img_paths: Sequence whose items are handed to ``transforms.ToTensor``.
            Despite the name, this notebook passes already-decoded image arrays.
        labels: Sequence of labels aligned with ``img_paths``.
        mode: ``'train'`` enables the random flip augmentation; any other value
            returns the item unmodified.
    """

    def __init__(self, img_paths, labels, mode='train'):
        self.img_paths = img_paths
        self.labels = labels
        self.mode = mode

    def __len__(self):
        """Number of items in the dataset."""
        return len(self.img_paths)

    def __getitem__(self, idx):
        """Return the (possibly flipped) item at ``idx`` as a tensor, with its label."""
        image = self.img_paths[idx]
        if self.mode == 'train':
            # One draw out of three outcomes: 0 keeps the image, 1 reverses the
            # first axis, 2 reverses the second axis.
            choice = random.randint(0, 2)
            if choice == 1:
                image = image[::-1].copy()
            elif choice == 2:
                image = image[:, ::-1].copy()
        return transforms.ToTensor()(image), self.labels[idx]
    
class Network(nn.Module):
    """Image classifier wrapping a timm model.

    Args:
        num_classes: Size of the classification head. Defaults to 88, the value
            this notebook trains with.
        model_name: Architecture name passed to ``timm.create_model``.
        pretrained: Forwarded to ``timm.create_model`` as its ``pretrained`` flag.
    """

    def __init__(self, num_classes=88, model_name='efficientnet_b0', pretrained=True):
        super().__init__()
        # The attribute name `model` prefixes every state_dict key, so it must stay
        # unchanged for previously saved checkpoints to keep loading.
        self.model = timm.create_model(model_name, pretrained=pretrained, num_classes=num_classes)

    def forward(self, x):
        """Run the wrapped model on the batch ``x`` and return its output."""
        return self.model(x)

In [None]:
batch_size = 32
epochs = 24

# Train: random flips enabled (mode='train'), reshuffled every epoch.
train_dataset = Custom_dataset(train_imgs, train_labels, mode='train')
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)

# Valid: the held-out split produced by train_test_split, without augmentation and in a
# fixed order, so the validation metrics are measured on data the model did not train on.
valid_dataset = Custom_dataset(valid_imgs, valid_labels, mode='valid')
valid_loader = DataLoader(valid_dataset, shuffle=False, batch_size=batch_size)

# Test: the labels are placeholders; order is preserved so predictions line up with the inputs.
test_dataset = Custom_dataset(np.array(test_imgs), np.array(["tmp"]*len(test_imgs)), mode='test')
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)

In [None]:
def f1_score_function(real, pred):
    """Return the macro-averaged F1 score of ``pred`` against ``real``."""
    return f1_score(real, pred, average="macro")

def precision_score_function(real, pred):
    """Return the macro-averaged precision of ``pred`` against ``real``."""
    return precision_score(real, pred, average='macro')

def recall_score_function(real, pred):
    """Return the macro-averaged recall of ``pred`` against ``real``."""
    return recall_score(real, pred, average='macro')

# Instantiate the classifier and move it to the training device.
model = Network().to(device)

WEIGHT_DECAY = 1e-2
# Plain-Adam alternative, currently disabled:
# optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, betas=(0.9, 0.999), weight_decay=WEIGHT_DECAY)
# Cross-entropy weighted per class with the weight tensor built earlier in the notebook.
criterion = nn.CrossEntropyLoss(weight=weight_list_all_tensor)
# Gradient scaler used together with autocast in the training loop.
scaler = torch.cuda.amp.GradScaler() 

In [None]:
best=0  # not read by this loop; left defined as before
best_f1 = 0  # best validation macro-F1 seen so far
for epoch in range(epochs):
    start=time.time()

    # ---- training pass ----
    train_loss = 0
    train_pred=[]
    train_y=[]
    model.train()
    for images, labels in train_loader:
        optimizer.zero_grad()
        # The loader already yields tensors: move/cast them instead of re-wrapping them
        # with torch.tensor(), which makes an extra copy and emits a UserWarning.
        x = images.to(device=device, dtype=torch.float32)
        y = labels.to(device=device, dtype=torch.long)
        with torch.cuda.amp.autocast():
            pred = model(x)
        # The loss is computed outside autocast on float32 logits.
        loss = criterion(pred.float(), y)

        # Scaled backward pass and optimizer step for mixed-precision training.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Accumulate the mean batch loss of the epoch.
        train_loss += loss.item()/len(train_loader)
        train_pred += pred.argmax(1).detach().cpu().numpy().tolist()
        train_y += y.detach().cpu().numpy().tolist()

    # ---- validation pass ----
    valid_pred = []
    valid_y = []
    model.eval()
    with torch.no_grad():
        for images, labels in valid_loader:
            x = images.to(device=device, dtype=torch.float32)
            y = labels.to(device=device, dtype=torch.long)
            with torch.cuda.amp.autocast():
                pred = model(x)
            valid_pred += pred.argmax(1).detach().cpu().numpy().tolist()
            valid_y += y.detach().cpu().numpy().tolist()

    valid_precision = precision_score_function(valid_y, valid_pred)
    valid_recall = recall_score_function(valid_y, valid_pred)
    valid_f1 = f1_score_function(valid_y, valid_pred)

    # Keep the checkpoint file in line with its "bestmodel" name: overwrite it only when
    # the validation F1 improves. The first epoch always saves so the file exists.
    if epoch == 0 or valid_f1 > best_f1:
        best_f1 = valid_f1
        torch.save(model.state_dict(), "/content/drive/MyDrive/Colab Notebooks/data/bestmodel_state_dict.pt")

    TIME = time.time() - start
    # Second figure is the estimated remaining time based on this epoch's duration.
    print(f'epoch : {epoch+1}/{epochs}    time : {TIME:.0f}s/{TIME*(epochs-epoch-1):.0f}s')
    # The loss is measured on the training batches, so it is reported as such.
    print(f'TRAIN    loss : {train_loss:.5f}')
    print(f'VALID    precision : {valid_precision:.5f}     recall : {valid_recall:.5f}    f1 : {valid_f1:.5f}')

epoch : 1/24    time : 68s/1553s
VALID    loss : 1.80026   precision : 0.31926     recall : 0.35260    f1 : 0.29814
epoch : 2/24    time : 68s/1485s
VALID    loss : 0.98550   precision : 0.55434     recall : 0.53522    f1 : 0.49507
epoch : 3/24    time : 68s/1419s
VALID    loss : 0.77617   precision : 0.62757     recall : 0.62602    f1 : 0.58982
epoch : 4/24    time : 68s/1359s
VALID    loss : 0.63626   precision : 0.69906     recall : 0.63485    f1 : 0.62065
epoch : 5/24    time : 68s/1301s
VALID    loss : 0.50559   precision : 0.72869     recall : 0.69860    f1 : 0.67800
epoch : 6/24    time : 68s/1232s
VALID    loss : 0.45117   precision : 0.81884     recall : 0.80870    f1 : 0.76692
epoch : 7/24    time : 69s/1165s
VALID    loss : 0.40406   precision : 0.85021     recall : 0.82425    f1 : 0.80000
epoch : 8/24    time : 69s/1097s
VALID    loss : 0.31698   precision : 0.88279     recall : 0.86301    f1 : 0.84658
epoch : 9/24    time : 69s/1031s
VALID    loss : 0.27985   precision : 0

In [None]:
# Reload the checkpoint file written during training before running inference.
model.load_state_dict(torch.load("/content/drive/MyDrive/Colab Notebooks/data/bestmodel_state_dict.pt"))

<All keys matched successfully>

In [None]:
# Predict one class id per test image, in loader order.
model.eval()
f_pred = []

with torch.no_grad():
    for images, _placeholder_labels in test_loader:
        # Batches are already tensors; move/cast them instead of rebuilding them with
        # torch.tensor(), which makes an extra copy and emits a UserWarning.
        x = images.to(device=device, dtype=torch.float32)
        with torch.cuda.amp.autocast():
            pred = model(x)
        f_pred.extend(pred.argmax(1).detach().cpu().numpy().tolist())

In [None]:
# Invert the label -> class-id mapping and translate every predicted id back to its label.
label_decoder = dict(zip(label_unique.values(), label_unique.keys()))
f_result = list(map(label_decoder.__getitem__, f_pred))

In [None]:
# Load the submission template through the SAMPLE_SUBMISSION path constant, write the
# decoded predictions into its "label" column and save the result.
submission = pd.read_csv(SAMPLE_SUBMISSION)
submission["label"] = f_result
submission.to_csv("/content/drive/MyDrive/Colab Notebooks/data/Cost function based approach_v5.csv", index = False)
# A bare expression is only rendered when it is the last statement of a cell,
# so the preview goes at the end (first rows only).
submission.head()