# 전체 학습 과정 

In [3]:
import torch.nn as nn 
import torch 
import torchvision 
import cv2 
import numpy as np 
import matplotlib.pyplot as plt 
from glob import glob 
import pandas as pd 
from PIL import Image
from tqdm import tqdm 
import torchvision.transforms as transforms
from torch.utils.data import DataLoader,Dataset
import os 
import yaml 
import json

# Configuration 

# 데이터셋 로드 
- 이미지 디렉토리와 라벨 로드 

In [4]:
# Experiment configuration shared by every cell below.
# NOTE: the key 'Decription' is misspelled but is kept as-is because it is a
# runtime dictionary key that other code or saved configs may rely on.
cfg = {
    'Dataset_dir': './Dataset/hazelnut',   # root folder of one dataset category
    'Decription': 'Default',
    'Epochs': 50,                          # number of training epochs
    'aug_number': 2,
    'batch_size': 32,
    # Fall back to the CPU when CUDA is unavailable so that `.to(cfg['device'])`
    # does not raise on machines without a GPU.
    'device': 'cuda:0' if torch.cuda.is_available() else 'cpu',
    'encoded_space_dim': 256,              # bottleneck size of the autoencoder
    'img_size': 256,                       # images are resized to img_size x img_size
    'lr': 0.001,
    'optimizer': 'adamw',
    'save_dir': 'Contrast',
    'seed': 42,
    'target_class': 6,
    'weight_decay': 1.0e-05,
}

In [5]:
class Datadir_init:
    """Collect image file paths (and test labels) for one dataset category.

    The methods glob the following layout below ``Dataset_dir``::

        <Dataset_dir>/train/good/*.png
        <Dataset_dir>/test/<label name>/*
    """

    def __init__(self,Dataset_dir='./Dataset/hazelnut'):
        self.Dataset_dir = Dataset_dir

    def test_load(self):
        """Return the test image paths and their integer labels.

        Every sub-directory of ``<Dataset_dir>/test`` is one label; labels are
        numbered in alphabetical order of the directory name. The mapping
        ``{label name: index}`` is stored on ``self.test_label_unique``.

        Returns:
            tuple: ``(paths, labels)`` as two NumPy arrays of equal length.
            Both are empty when no label directory exists.
        """
        test_dir = os.path.join(self.Dataset_dir, 'test')

        # os.path.basename instead of split('/') so Windows separators work,
        # and only directories count as labels (a stray file is not a class).
        label_names = sorted(
            os.path.basename(path)
            for path in glob(os.path.join(test_dir, '*'))
            if os.path.isdir(path)
        )
        self.test_label_unique = {name: index for index, name in enumerate(label_names)}

        test_img_dirs = []
        test_img_labels = []
        for name, index in self.test_label_unique.items():
            img_dir = sorted(glob(os.path.join(test_dir, name, '*')))
            test_img_dirs.extend(img_dir)
            test_img_labels.extend([index] * len(img_dir))
        return np.array(test_img_dirs),np.array(test_img_labels)

    def train_load(self):
        """Return the sorted ``*.png`` paths under ``<Dataset_dir>/train/good``."""
        train_img_dirs = sorted(glob(os.path.join(self.Dataset_dir, 'train', 'good', '*.png')))
        return np.array(train_img_dirs)

class MVtecADDataset(Dataset):
    """Dataset that yields ``(image, label)`` pairs from a list of image paths.

    Args:
        cfg: configuration dict; ``cfg['img_size']`` is read when no
            ``Augmentation`` is given.
        img_dirs: sequence of image file paths.
        labels: optional sequence of labels aligned with ``img_dirs``. When
            omitted, every sample gets the label ``0.0``.
        Augmentation: optional callable applied to the opened PIL image. When
            omitted, ``ToTensor`` followed by a resize to
            ``(img_size, img_size)`` is used.
    """

    def __init__(self,cfg,img_dirs,labels=None,Augmentation=None):
        super().__init__()
        self.cfg = cfg
        self.dirs = img_dirs
        self.augmentation = self.__init_aug__(Augmentation)
        self.labels = self.__init_labels__(labels)

    def __len__(self):
        return len(self.dirs)

    def __init_labels__(self,labels):
        """Return ``labels`` unchanged, or an all-zero array when it is None."""
        # Explicit identity test; the previous `np.sum(labels) != None` only
        # worked because np.sum(None) happens to return None.
        if labels is not None:
            return labels
        return np.zeros(len(self.dirs))

    def __init_aug__(self,Augmentation):
        """Return ``Augmentation``, or the default ToTensor + Resize pipeline."""
        if Augmentation is not None:
            return Augmentation
        size = self.cfg['img_size']
        return transforms.Compose([
            transforms.ToTensor(),
            transforms.Resize((size, size)),
        ])

    def __getitem__(self,idx):
        """Load the image at ``idx`` and return ``(transformed image, label)``."""
        # convert('RGB') guarantees three channels (the encoder in this file
        # uses in_channels=3) and fully loads the pixels, so the file handle
        # can be closed right away by the context manager.
        with Image.open(self.dirs[idx]) as handle:
            img = handle.convert('RGB')
        img = self.augmentation(img)
        # self.labels is always populated by __init_labels__, so a label is
        # always returned alongside the image.
        return img,self.labels[idx]



In [6]:
# Load the image paths and test labels from the directory named in cfg
# (previously the class default path was used and cfg['Dataset_dir'] ignored).
Data_dir = Datadir_init(cfg['Dataset_dir'])
train_dirs = Data_dir.train_load()
test_dirs,test_labels = Data_dir.test_load()

# Augmentation for the training set; the target size comes from cfg so it
# stays in sync with the default transform MVtecADDataset builds itself.
augmentation = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((cfg['img_size'],cfg['img_size']))
])

# Train/valid split: the first 80% of the sorted training paths are used for
# training and the remaining 20% for validation.
indx = int(len(train_dirs)*0.8)
train_dset = MVtecADDataset(cfg,train_dirs[:indx],Augmentation=augmentation)
valid_dset = MVtecADDataset(cfg,train_dirs[indx:])
test_dset = MVtecADDataset(cfg,test_dirs,test_labels)

# DataLoaders: only the training loader is shuffled.
train_loader = DataLoader(train_dset,batch_size=cfg['batch_size'],shuffle=True)
valid_loader = DataLoader(valid_dset,batch_size=cfg['batch_size'],shuffle=False)
test_loader = DataLoader(test_dset,batch_size=cfg['batch_size'],shuffle=False)

# 모델 생성 
- Convolution Autoencoder 사용 
- Encoder 와 Decoder를 각각 구성한 뒤 Autoencoder 구성 

In [7]:
class MVtecEncoder(nn.Module):
    """Convolutional encoder mapping an image batch to a latent vector.

    Five stride-2 3x3 convolutions take the channels 3 -> 8 -> 16 -> 32 -> 64
    -> 128 (batch norm follows only the 16-channel convolution). The
    flattened feature map is then projected by two linear layers to
    ``encoded_space_dim``. The first linear layer expects 8*8*128 features,
    which corresponds to 256x256 inputs.

    Args:
        encoded_space_dim: size of the output latent vector.
    """

    def __init__(self,encoded_space_dim):
        super().__init__()

        widths = (3, 8, 16, 32, 64, 128)
        conv_stack = []
        for n_in, n_out in zip(widths[:-1], widths[1:]):
            conv_stack.append(
                nn.Conv2d(in_channels=n_in,out_channels=n_out,kernel_size=3,stride=2,padding=1)
            )
            if n_out == 16:
                # Batch norm is applied after the second convolution only.
                conv_stack.append(nn.BatchNorm2d(n_out))
            conv_stack.append(nn.ReLU())
        self.encoder_cnn = nn.Sequential(*conv_stack)

        self.flatten = nn.Flatten(start_dim=1)

        projection = [
            nn.Linear(8*8*128,512),
            nn.ReLU(True),
            nn.Linear(512,encoded_space_dim),
        ]
        self.encoder_lin = nn.Sequential(*projection)

    def forward(self,x):
        """Encode ``x`` and return a tensor of shape (batch, encoded_space_dim)."""
        features = self.encoder_cnn(x)
        return self.encoder_lin(self.flatten(features))

class MVtecDecoder(nn.Module):
    """Convolutional decoder mapping a latent vector back to an image batch.

    Two linear layers expand the latent vector to 7*7*128 features, which are
    reshaped to (128, 7, 7). Five stride-2 transposed convolutions then take
    the channels 128 -> 64 -> 32 -> 16 -> 8 -> 3, with batch norm and ReLU
    after each one except the last. A sigmoid is applied to the final output.

    Args:
        encoded_space_dim: size of the input latent vector.
    """

    def __init__(self,encoded_space_dim):
        super().__init__()

        expand = [
            nn.Linear(encoded_space_dim,512),
            nn.ReLU(True),
            nn.Linear(512,7*7*128),
            nn.ReLU(True),
        ]
        self.decoder_lin = nn.Sequential(*expand)
        self.unflatten = nn.Unflatten(dim=1,unflattened_size=(128,7,7))

        # (in_channels, out_channels, padding) of each normalised stage; the
        # first stage uses padding 0, the later ones padding 1.
        stages = ((128, 64, 0), (64, 32, 1), (32, 16, 1), (16, 8, 1))
        upsample = []
        for n_in, n_out, pad in stages:
            upsample.extend([
                nn.ConvTranspose2d(n_in,n_out,3,stride=2,padding=pad,output_padding=1),
                nn.BatchNorm2d(n_out),
                nn.ReLU(True),
            ])
        # Final layer back to 3 channels, with no normalisation or ReLU.
        upsample.append(nn.ConvTranspose2d(8,3,3,stride=2,padding=1,output_padding=1))
        self.decoder_cnn = nn.Sequential(*upsample)

    def forward(self,x):
        """Decode ``x`` and return the sigmoid-activated reconstruction."""
        feature_map = self.unflatten(self.decoder_lin(x))
        return torch.sigmoid(self.decoder_cnn(feature_map))

class Convolution_Auto_Encoder(nn.Module):
    """Autoencoder that chains an encoder and a decoder.

    Args:
        Encoder: callable (typically a class) invoked as
            ``Encoder(encoded_space_dim)`` to build the encoder.
        Decoder: callable invoked as ``Decoder(encoded_space_dim)`` to build
            the decoder.
        encoded_space_dim: bottleneck size passed to both constructors.
    """

    def __init__(self,Encoder,Decoder,encoded_space_dim ):
        super().__init__()
        self.encoder = Encoder(encoded_space_dim)
        self.decoder = Decoder(encoded_space_dim)

    def forward(self,x):
        """Return ``decoder(encoder(x))``."""
        latent = self.encoder(x)
        return self.decoder(latent)

# 모델 학습 

## 학습 전 선언 

In [10]:
# Build the autoencoder with the bottleneck size (encoded_space_dim) set in
# cfg, then move it to the configured device.
model = Convolution_Auto_Encoder(
    MVtecEncoder,
    MVtecDecoder,
    cfg['encoded_space_dim'],
)
model = model.to(cfg['device'])

# Reconstruction loss: mean squared error between input and output images.
criterion = nn.MSELoss()

optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=cfg['lr'],
    weight_decay=cfg['weight_decay'],
)

# Scheduler that adjusts the optimizer's learning rate.
# NOTE(review): T_max is fixed at 100 rather than taken from cfg['Epochs'];
# confirm that a cosine period longer than the training run is intended.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=100,
    eta_min=0,
)

## 학습 프로세스 

In [11]:
# Per-epoch loss history and the best (lowest) validation loss seen so far.
total_train_loss = []
total_valid_loss = []
best_valid_loss = np.inf
for epoch in tqdm(range(cfg['Epochs'])):
    # ---- Train process ----
    model.train()
    train_loss = []
    for img,_ in train_loader:
        img = img.to(cfg['device']).type(torch.float32)

        # Gradients must be cleared before every batch; clearing them only
        # once per epoch makes each step use the sum of all previous batch
        # gradients of that epoch.
        optimizer.zero_grad()
        y_pred = model(img)
        loss = criterion(y_pred,img)  # (prediction, target) order

        # Backpropagation
        loss.backward()
        optimizer.step()
        train_loss.append(loss.item())
    # The learning-rate schedule advances once per epoch.
    scheduler.step()
    total_train_loss.append(np.mean(train_loss))

    # ---- Valid process ----
    model.eval()
    valid_loss = []
    with torch.no_grad():
        for img,_ in valid_loader:
            # Same dtype handling as in the training loop.
            img = img.to(cfg['device']).type(torch.float32)
            y_pred = model(img)
            loss = criterion(y_pred,img)
            valid_loss.append(loss.item())
    epoch_valid_loss = np.mean(valid_loss)
    total_valid_loss.append(epoch_valid_loss)
    best_valid_loss = min(best_valid_loss,epoch_valid_loss)
    print(f'\t epoch : {epoch+1} valid loss : {np.mean(valid_loss):.3f}')

    # Visualisation: first image of the last validation batch (left) next to
    # its reconstruction (right).
    fig, (ax1,ax2) = plt.subplots(ncols=2,nrows=1,figsize=(5, 5))
    ax1.imshow(img[0].detach().cpu().permute(1,2,0).numpy())
    ax2.imshow(y_pred[0].detach().cpu().permute(1,2,0).numpy())
    plt.show()

  0%|          | 0/50 [00:00<?, ?it/s]


# 학습 후 Test

## Inference 

In [None]:
# Run both approaches on the test set: reconstruction with the full
# autoencoder, and latent vectors from the encoder alone for the
# machine-learning (OC-SVM) approach.
encoder = model.encoder
# Make sure batch-norm layers use their running statistics during inference.
model.eval()

Pred_imgs = []
Pred_vecs = []
True_imgs = []
True_labels = []
for img,label in test_loader:
    img = img.to(cfg['device']).type(torch.float32)

    with torch.no_grad():
        # Encode once and reuse the latent vector for the reconstruction;
        # model(img) is decoder(encoder(img)).
        Pred_vec = encoder(img)
        Pred_img = model.decoder(Pred_vec)

    Pred_imgs.extend(Pred_img.detach().cpu().numpy())
    Pred_vecs.extend(Pred_vec.detach().cpu().numpy())
    True_imgs.extend(img.detach().cpu().numpy())
    # Labels are only collected, so they stay on the CPU. The previous code
    # moved them to the device and then called .numpy() without .cpu(), which
    # raises for CUDA tensors.
    True_labels.extend(label.type(torch.float32).numpy())

# Convert to arrays so that later cells can use element-wise arithmetic and
# comparisons on them.
Pred_imgs = np.array(Pred_imgs)
Pred_vecs = np.array(Pred_vecs)
True_imgs = np.array(True_imgs)
True_labels = np.array(True_labels)

# Training data is also needed in order to fit the OC-SVM.
Train_vecs = []
Train_labels = []

for img,label in train_loader:
    img = img.to(cfg['device']).type(torch.float32)

    with torch.no_grad():
        Train_vec = encoder(img)
    Train_vecs.extend(Train_vec.detach().cpu().numpy())
    Train_labels.extend(label.type(torch.float32).numpy())

Train_vecs = np.array(Train_vecs)
Train_labels = np.array(Train_labels)

## Metrics 

### Reconstruction 

In [None]:
# Reconstruction approach: the anomaly score of each image is the mean squared
# error between the image and its reconstruction.
from sklearn.metrics import roc_curve,auc

# Pred_imgs / True_imgs may be Python lists of arrays; convert before
# subtracting (list - list raises TypeError).
recon_error = np.asarray(Pred_imgs) - np.asarray(True_imgs)
test_score = np.mean(recon_error.reshape(len(True_labels),-1)**2,axis=1)

# The test labels are per-folder indices. Collapse them to a binary target
# (1 = anomaly, 0 = normal) so that a higher score means "more anomalous" for
# the positive class. The index of the 'good' folder is looked up rather than
# hard-coded.
normal_label = Data_dir.test_label_unique['good']
is_anomaly = (np.asarray(True_labels) != normal_label).astype(int)

fpr,tpr,threshold = roc_curve(is_anomaly,test_score,pos_label=1)
AUROC = round(auc(fpr,tpr),4)

### Machine learning : OC-SVM

In [None]:
# Machine-learning approach (OC-SVM): fit on the latent vectors of the normal
# training data, then run inference on the test latent vectors.
# First scale the features with statistics taken from the training data only.
from sklearn.metrics import roc_curve,auc
from sklearn.preprocessing import MinMaxScaler
minmax = MinMaxScaler()
train_normalized_vecs = minmax.fit_transform(Train_vecs)
test_normalized_vecs = minmax.transform(Pred_vecs)

# Clean up the test labels.
# - The test labels split anomalies into several sub-categories; they are
#   merged into a single anomaly class here.
# - Labels follow the OneClassSVM convention: normal = 1, anomaly = -1.
# - np.asarray is required: comparing a Python list with `==` yields a single
#   False, which previously turned every label into the same value.
# - Re-running this cell re-maps the already binarised labels; re-run the
#   inference cell first.
normal_label = Data_dir.test_label_unique['good']
True_labels = np.where(np.asarray(True_labels)==normal_label,1,-1)  # Anomaly:-1, normal:1

# Fit the model.
# NOTE(review): this rebinds `model`, shadowing the autoencoder defined
# earlier; the name is kept so code that expects it keeps working.
from sklearn.svm import OneClassSVM
model = OneClassSVM()
model.fit(train_normalized_vecs)
Pred_labels = model.predict(test_normalized_vecs)
# score_samples is higher for inliers, i.e. for the positive class (normal = 1).
preds = model.score_samples(test_normalized_vecs)

# AUROC: roc_curve takes (y_true, y_score) in that order.
fpr,tpr,threshold = roc_curve(True_labels,preds,pos_label=1)
AUROC = round(auc(fpr, tpr),4)