# **Computer Vision Anomaly Detection**

### **Import Libraries**

In [None]:
import warnings
warnings.filterwarnings('ignore')

from glob import glob
import pandas as pd
import numpy as np 
from tqdm import tqdm
import cv2

import os
import timm
import random

import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torchvision.transforms as transforms
from sklearn.metrics import f1_score, accuracy_score
import time


### **Label 전처리**

In [None]:


train_y = pd.read_csv("open/train_df.csv")

train_labels = train_y["label"]

label_unique = sorted(np.unique(train_labels))
label_unique = {key:value for key,value in zip(label_unique, range(len(label_unique)))}

train_labels = [label_unique[k] for k in train_labels]



### **Image 전처리**

In [None]:

train_png = sorted(glob('open/train/*.png'))
test_png = sorted(glob('open/test/*.png'))


def img_resize(path):
	img = cv2.imread(path)[:,:,::-1]
	img = cv2.resize(img, (512, 512))
	cv2.imwrite('data/' + path, img)


In [None]:

for img in tqdm(train_png):
	img_resize(img)

for img in tqdm(test_png):
	img_resize(img)


In [None]:


train_png = sorted(glob('data/open/train/*.png'))
test_png = sorted(glob('data/open/test/*.png'))


def img_load(path):
    img = cv2.imread(path)[:,:,::-1]
    return img


In [None]:

train_imgs = [img_load(m) for m in tqdm(train_png)]
test_imgs = [img_load(n) for n in tqdm(test_png)]


### **모델 정의 및 Data Augmentation**

In [None]:

class Custom_dataset(Dataset):
    def __init__(self, img_paths, labels, mode='train'):
        self.img_paths = img_paths
        self.labels = labels
        self.mode=mode
    def __len__(self):
        return len(self.img_paths)
    def __getitem__(self, idx):
        img = self.img_paths[idx]
        if self.mode=='train':
            augmentation = random.randint(0,8)
            # RGB
            if augmentation<3:
                pass

            # Rotate 90'
            elif augmentation==3:
                img = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)

            # Rotate 270'
            elif augmentation==4:
                img = cv2.rotate(img, cv2.ROTATE_90_COUNTERCLOCKWISE)

            # Y flip
            elif augmentation==5:
                img = img[::-1].copy()

            # X flip
            elif augmentation==6:
                img = img[:,::-1].copy()

            # X, Y flip
            elif augmentation==7:
                img = img[::-1, ::-1, :].copy()
                

        img = transforms.ToTensor()(img)
        if self.mode=='test':
            pass
        
        label = self.labels[idx]
        return img, label
    


In [None]:
class Model(nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.model = timm.create_model('efficientnet_b3', pretrained=True, num_classes=len(label_unique))
        
    def forward(self, x):
        x = self.model(x)
        return x


In [None]:

batch_size = 256

# Train
train_dataset = Custom_dataset(np.array(train_imgs), np.array(train_labels), mode='train')
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)

# Test
test_dataset = Custom_dataset(np.array(test_imgs), np.array(["tmp"]*len(test_imgs)), mode='test')
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)



In [None]:

device = Model.device('cuda')

model = Model().to(device)
model = nn.DataParallel(model)

optimizer = torch.optim.AdamW(model.parameters(), lr=1.0e-4)
criterion = nn.CrossEntropyLoss()
scaler = torch.cuda.amp.GradScaler() 


### **Score Function 정의**

In [None]:

def score_function(real, pred):
    score = f1_score(real, pred, average="macro")
    return score


### **Train Dataset 학습**

In [None]:


epochs = 100

for epoch in range(epochs):
    start=time.time()
    train_loss = 0
    train_pred=[]
    train_y=[]
    model.train()
    for batch in (train_loader):
        optimizer.zero_grad()
        x = torch.tensor(batch[0], dtype=torch.float32, device=device)
        y = torch.tensor(batch[1], dtype=torch.long, device=device)
        with torch.cuda.amp.autocast():
            pred = model(x)
        loss = criterion(pred, y)


        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        
        train_loss += loss.item()/len(train_loader)
        train_pred += pred.argmax(1).detach().cpu().numpy().tolist()
        train_y += y.detach().cpu().numpy().tolist()
        
    
    train_f1 = score_function(train_y, train_pred)

    TIME = time.time() - start
    print(f'epoch : {epoch+1}/{epochs}    time : {TIME:.0f}s/{TIME*(epochs-epoch-1):.0f}s')
    print(f'TRAIN    loss : {train_loss:.5f}    f1 : {train_f1:.5f}')


### **Test Dataset 평가**

In [None]:

model.eval()
f_pred = []

with torch.no_grad():
    for batch in (test_loader):
        x = torch.tensor(batch[0], dtype = torch.float32, device = device)
        with torch.cuda.amp.autocast():
            pred = model(x)
        f_pred.extend(pred.argmax(1).detach().cpu().numpy().tolist())


### **제출물 생성**

In [None]:


label_decoder = {val:key for key, val in label_unique.items()}

f_result = [label_decoder[result] for result in f_pred]

submission = pd.read_csv("open/sample_submission.csv")

submission["label"] = f_result

submission.to_csv("submission.csv", index = False)
