# Setting

In [1]:
from google.colab import drive
drive.mount('/content/drive')

import os
REF_PATH = '/content/drive/MyDrive/Github/10_도배하자유형분류'
os.chdir(REF_PATH)

Mounted at /content/drive


<br>

## Import

In [2]:
import gc
gc.collect()

25

In [3]:
import os,sys
import random
import joblib
import pandas as pd
import numpy as np
import glob
import cv2
import itertools

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
# import torchvision.models as models

from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from sklearn.metrics import f1_score
from sklearn.metrics import classification_report
from tqdm import tqdm, trange

import warnings
warnings.filterwarnings(action='ignore') 

from PIL import Image

<br>

## Hyperparameter Setting

In [4]:
MC_PATH = './mc/gan'
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

cuda


In [5]:
CFG = {
    'IMG_SIZE':224, #224,320,384
    'EPOCHS':128,
    'LEARNING_RATE':3e-4,
    'BATCH_SIZE':16,
    'SEED':41,
}

<br>

## Fixed RandomSeed

In [6]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정

<br></br>

# Data Load

<br>

## Data Pre-processing

In [7]:
all_img_list = glob.glob('./data/train/*/*')

In [8]:
df = pd.DataFrame(columns=['img_path', 'label'])
df['img_path'] = all_img_list
df['label'] = df['img_path'].apply(lambda x : x.split('/')[-2].replace('.png',''))

In [None]:
train_data, val_data, _, _ = train_test_split(df, df['label'], test_size=0.3, stratify=df['label'], random_state=CFG['SEED'])

<br>

## Label-Encoding

In [None]:
le = preprocessing.LabelEncoder()
train_data['label'] = le.fit_transform(train_data['label'])
val_data  ['label'] = le.transform(val_data['label'])

In [None]:
for i,label in enumerate(le.classes_):
    print(i,label)

0 가구수정
1 걸레받이수정
2 곰팡이
3 꼬임
4 녹오염
5 들뜸
6 면불량
7 몰딩수정
8 반점
9 석고수정
10 오염
11 오타공
12 울음
13 이음부불량
14 창틀,문틀수정
15 터짐
16 틈새과다
17 피스
18 훼손


In [None]:
# 한글을 le.inverse_transform() 할 경우에 안맞는 현상생김
# -> 손코딩 필요
def label_decoder(label):
    new_label =[
        '가구수정' if l==0 else
        '걸레받이수정' if l==1 else
        '곰팡이' if l==2 else
        '꼬임' if l==3 else
        '녹오염' if l==4 else
        '들뜸' if l==5 else
        '면불량' if l==6 else
        '몰딩수정' if l==7 else
        '반점' if l==8 else
        '석고수정' if l==9 else
        '오염' if l==10 else
        '오타공' if l==11 else
        '울음' if l==12 else
        '이음부불량' if l==13 else
        '창틀,문틀수정' if l==14 else
        '터짐' if l==15 else
        '틈새과다' if l==16 else
        '피스' if l==17 else
        '훼손' if l==18 else
        'NaN' for l in label
    ]
    return np.array(new_label)

<br></br>

# Image Generate by GAN

## Pre-trained Model

In [None]:
# import torch
# torch.hub.list('NVIDIA/DeepLearningExamples:torchhub')
# torch.hub.list('facebookresearch/pytorch_GAN_zoo')

In [None]:
# import torch
# model = torch.hub.load('facebookresearch/pytorch_GAN_zoo:hub', 'StyleGAN', pretrained=True)
# model = torch.hub.load('facebookresearch/pytorch_GAN_zoo:hub', 'DCGAN', pretrained=True)

<br>

## Make Model - Kaggle
- 참조 : [Kaggle](https://www.kaggle.com/code/jesucristo/gan-introduction)

In [None]:
from torch.autograd import Variable
import torchvision.utils as vutils

In [None]:
def conv_output_size(input_size, kernel_size, stride, padding):
    # 입력 이미지에 대한 Tensor 객체 생성
    input_tensor = torch.rand(1, 1, input_size, input_size)

    # conv2d 연산 수행하여 출력 크기 계산
    conv = torch.nn.Conv2d(1, 1, kernel_size=kernel_size, stride=stride, padding=padding)
    output_tensor = conv(input_tensor)
    _, _, output_size, _ = output_tensor.shape

    # 출력 크기 반환
    return output_size

In [None]:
## generator
# conv_output_size(64,4,1,0)
# conv_output_size(61,4,2,1)
# conv_output_size(30,4,2,1)
# conv_output_size(15,4,2,1)
# conv_output_size(7,4,2,1)

## discriminator
# conv_output_size(64,4,2,1)
# conv_output_size(32,4,2,1)
# conv_output_size(16,4,2,1)
# conv_output_size(8,4,2,1)
# conv_output_size(4,4,2,0)

In [None]:
# Defining the generator
class G(nn.Module):
    def __init__(self):
        # Used to inherit the torch.nn Module
        super(G, self).__init__()
        # Meta Module - consists of different layers of Modules
        self.main = nn.Sequential(
            nn.ConvTranspose2d(100, 512, 4, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(512),
            nn.ReLU(True),
            nn.ConvTranspose2d(512, 256, 4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(256),
            nn.ReLU(True),
            nn.ConvTranspose2d(256, 128, 4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(128),
            nn.ReLU(True),
            nn.ConvTranspose2d(128, 64, 4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(True),
            nn.ConvTranspose2d(64, 3, 4, stride=2, padding=1, bias=False),
            nn.Tanh()
        )
        
    def forward(self, input):
        output = self.main(input)
        return output

# Defining the discriminator
class D(nn.Module):
    def __init__(self):
        super(D, self).__init__()
        self.main = nn.Sequential(
            nn.Conv2d(3, 64, 4, stride=2, padding=1, bias=False),
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            nn.Conv2d(64, 128, 4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            nn.Conv2d(128, 256, 4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            nn.Conv2d(256, 512, 4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            nn.Conv2d(512, 1, 4, stride=1, padding=0, bias=False),
            nn.Sigmoid()
        )
        
    def forward(self, input):
        output = self.main(input)
        # .view(-1) = Flattens the output into 1D instead of 2D
        return output.view(-1)

def weights_init(m):
    """
    Takes as input a neural network m that will initialize all its weights.
    """
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        m.weight.data.normal_(0.0, 0.02)
    elif classname.find('BatchNorm') != -1:
        m.weight.data.normal_(1.0, 0.02)
        m.bias.data.fill_(0)

In [None]:
# brainAI 이희원님 공유내용
# (참조) https://dacon.io/competitions/official/236082/codeshare/7891?page=1&dtype=recent
class GANDataset(Dataset):
    def __init__(self, img_path_list):
        self.img_path_list = img_path_list
        self.transforms = A.Compose([
            A.Resize(64,64),
            A.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5), p=1),
            ToTensorV2(),
        ])
        
        self.features = []
        for img_path in tqdm(self.img_path_list,desc='Load Dataset'):
            # (1) raw image
            image = cv2.imread(img_path)
            image = self.transforms(image=image)['image']
            self.features.append(image)

    def __getitem__(self, index):
        return self.features[index]
        
    def __len__(self):
        return len(self.features)

In [None]:
import time

class ImageGenerator:
    def __init__(self):
        self.generator = G().apply(weights_init)
        self.discriminator = D().apply(weights_init)

    def get_generator(self):
        return self.generator

    def get_discriminator(self):
        return self.discriminator

    def train(
        self,
        epochs,dataloader,lr,device,metric_period,save_period,
        generator_save_path,image_save_path,
    ):
        self.generator = self.generator.to(device)
        self.discriminator = self.discriminator.to(device)

        criterion = nn.BCELoss().to(device)
        optimizerG = optim.Adam(self.generator.parameters()    , lr=lr, betas=(0.5, 0.999), weight_decay=5e-4)
        optimizerD = optim.Adam(self.discriminator.parameters(), lr=lr, betas=(0.5, 0.999), weight_decay=5e-4)
        
        period_start_time = time.time()
        
        for epoch in range(epochs):
            for i, real in enumerate(dataloader, 0):
                # 1st Step: Updating the weights of the neural network of the discriminator
                self.discriminator.zero_grad()

                # Training the discriminator with a real image of the dataset
                input = Variable(real).float()
                target = Variable(torch.ones(input.size()[0])).to(device)
                output = self.discriminator(input.to(device))
                errD_real = criterion(output, target)

                # Training the discriminator with a fake image generated by the generator
                noise = Variable(torch.randn(input.size()[0], 100, 1, 1)).to(device)
                fake = self.generator(noise.to(device))
                target = Variable(torch.zeros(input.size()[0])).to(device)
                output = self.discriminator(fake.detach())
                errD_fake = criterion(output, target).to(device)

                # Backpropagating the total error
                errD = errD_real + errD_fake
                errD.backward()
                optimizerD.step()

                # 2nd Step: Updating the weights of the neural network of the generator
                self.generator.zero_grad()
                target = Variable(torch.ones(input.size()[0])).to(device)
                output = self.discriminator(fake.to(device))
                errG = criterion(output, target)
                errG.backward()
                optimizerG.step()

                # # 3rd Step: Printing the losses and saving the real images and the generated images of the minibatch every 100 steps
                # epoch_str = str(epoch+1).zfill(len(str(epochs)))
                # i_str = str(i+1).zfill(len(str(len(dataloader))))
                # print('Epochs[{}/{}] Batch[{}/{}] Loss_D: {:.4f}; Loss_G: {:.4f}'.format(epoch_str, epochs, i_str, len(dataloader), errD.item(), errG.item()))
                # if (i+1)==len(dataloader):
                #     vutils.save_image(real, './out/gan_images/real_samples.png', normalize=True)
                #     fake = netG(noise)
                #     vutils.save_image(fake.data, './out/gan_images/fake_samples_epoch_{}.png'.format(epoch_str), normalize=True)

            if (epoch+1) % metric_period == 0:
                period_end_time = time.time()
                elapsed = period_end_time - period_start_time
                remaining = (epochs-epoch)/metric_period*elapsed
                
                # 3rd Step: Printing the losses and saving the real images and the generated images of the minibatch every 100 steps
                epoch_str = str(epoch+1).zfill(len(str(epochs)))
                progress = 'Epoch[{}/{}] Loss_D: {:.4f}, Loss_G: {:.4f}, elapsed: {:.1f}, remaining: {:.1f}'\
                    .format(epoch_str, epochs, errD.item(), errG.item(), elapsed, remaining)
                print(progress)
                period_start_time = time.time()

            if (epoch+1) % save_period == 0:
                vutils.save_image(real, image_save_path + 'real_samples_epoch_{}.png'.format(epoch_str), normalize=True)
                fake = self.generator(noise)
                vutils.save_image(fake.data, image_save_path + 'fake_samples_epoch_{}.png'.format(epoch_str), normalize=True)
          
        # 최종모델 저장
        torch.save(self.generator    .state_dict(), generator_save_path)

    def generate(self,n_generate,save_path):
        for i in trange(n_generate):
            i_str = str(i).zfill(len(str(n_generate)))
            noise = torch.randn(1, 100, 1, 1).to(device)
            fake = self.generator(noise)
            vutils.save_image(fake, save_path + 'fake_samples_{}.png'.format(i_str), normalize=True)

In [None]:
def mkdir(path):
    import os
    if not os.path.exists(path):
        print("folder created: {}".format(path))
        os.mkdir(path)

In [None]:
train_data.label.value_counts()

18    983
10    416
1     215
3     147
15    113
2     102
11     99
7      91
6      69
9      40
5      38
17     36
14     19
12     15
13     12
4      10
0       8
16      4
8       2
Name: label, dtype: int64

In [None]:
# # 기존파일 삭제
# import shutil
# shutil.rmtree(f'./out/gan_images')
# shutil.rmtree(f'./mc/gan')

In [None]:
# 학습 이미지 저장을 위한 디렉토리 생성
mkdir(f'./out')
mkdir(f'./out/gan_images')
mkdir(f'./out/gan_images/train')
mkdir(f'./out/gan_images/generate')

# 결과파일 저장을 위한 디렉토리 생성
mkdir(f'./mc/gan')

folder created: ./out/gan_images
folder created: ./out/gan_images/train
folder created: ./out/gan_images/generate
folder created: ./mc/gan


In [None]:
def main_iteration(label,epochs,metric_peroid,save_period):
    gc.collect()

    generator_save_path     = MC_PATH + f'/generator_{label}.pt'
    image_save_path         = f'./out/gan_images/train/{label}/'
    gen_image_save_path     = f'./out/gan_images/generate/{label}/'
    mkdir(image_save_path)
    mkdir(gen_image_save_path)

    sub_data = train_data[train_data.label==label]
    # Image.open(sub_data.img_path.values[0])
    train_dataset = GANDataset(img_path_list=sub_data.img_path.values)
    train_loader  = DataLoader(train_dataset, batch_size=32, num_workers=0, shuffle=True)

    # GAN의 목적은 Discriminator loss를 높이는 것과 Generator loss를 낮추는 것
    image_gen = ImageGenerator()

    # GAN 모델 학습
    image_gen.train(
        epochs=epochs,
        dataloader=train_loader,
        lr=5e-4,
        device=device,
        metric_period=metric_peroid,
        save_period=save_period,
        generator_save_path=generator_save_path,
        image_save_path=image_save_path,
    )

    # GAN 모델을 통해서 이미지생성
    image_gen.generate(
        n_generate=100,
        save_path=gen_image_save_path,
    )

    return image_gen

In [None]:
# label = 2
# image_gen = main_iteration(label=label,epochs=100,metric_peroid=500,save_period=1000)

In [None]:
# --------------------------------------------------------------------------------
# > (18/19) 17
# --------------------------------------------------------------------------------
#!!!! 17,18해야하는데 왜 18/19인지?

In [None]:
vc = train_data.label.value_counts()
label_list = vc[vc<vc.max()].index.tolist()
label_list = sorted(label_list)

# for iter,label in enumerate(label_list):
for iter,label in enumerate([17]):
    print()
    print('-'*80)
    print('> ({}/{}) {}'.format(iter+1,len(label_list),label))
    print('-'*80)
    main_iteration(label=label,epochs=5000,metric_peroid=500,save_period=1000)


--------------------------------------------------------------------------------
> (1/18) 17
--------------------------------------------------------------------------------


Load Dataset: 100%|██████████| 36/36 [00:28<00:00,  1.25it/s]


Epoch[0500/5000] Loss_D: 0.0516, Loss_G: 8.0418, elapsed: 32.8, remaining: 295.6
Epoch[1000/5000] Loss_D: 2.3743, Loss_G: 5.9121, elapsed: 26.3, remaining: 210.4
Epoch[1500/5000] Loss_D: 0.0145, Loss_G: 26.9942, elapsed: 28.2, remaining: 197.6
Epoch[2000/5000] Loss_D: 0.0238, Loss_G: 10.2667, elapsed: 27.7, remaining: 166.1
Epoch[2500/5000] Loss_D: 0.0017, Loss_G: 18.1709, elapsed: 28.5, remaining: 142.8
Epoch[3000/5000] Loss_D: 0.0106, Loss_G: 5.6145, elapsed: 27.4, remaining: 109.6
Epoch[3500/5000] Loss_D: 0.0142, Loss_G: 7.2781, elapsed: 28.5, remaining: 85.5
Epoch[4000/5000] Loss_D: 0.0217, Loss_G: 6.7062, elapsed: 28.1, remaining: 56.2
Epoch[4500/5000] Loss_D: 0.0475, Loss_G: 16.1553, elapsed: 27.4, remaining: 27.5
Epoch[5000/5000] Loss_D: 0.1012, Loss_G: 8.2037, elapsed: 28.1, remaining: 0.1


100%|██████████| 100/100 [00:01<00:00, 93.10it/s]


In [None]:
label = 2
# mkdir(f'./out/gan_images/generate/{label}')

final_generator = G().apply(weights_init)
final_generator.load_state_dict(torch.load(MC_PATH + f'/generator_{label}.pt'))
final_generator = final_generator.to(device)

n_generate = 5
for i in trange(n_generate):
    i_str = str(i).zfill(len(str(n_generate)))
    noise = torch.randn(16, 100, 1, 1).to(device)
    fake = final_generator(noise)
    #vutils.save_image(fake, './out/gan_images/generate/{}/fake_samples_{}.png'.format(label,i_str), normalize=True)
    vutils.save_image(fake, './temp/fake_samples_{}.png'.format(i_str), normalize=True)

100%|██████████| 5/5 [00:00<00:00, 14.60it/s]


In [None]:
import glob
from PIL import Image

img_paths = glob.glob(f'./out/gan_images/generate/{label}/*')
img = Image.open(img_paths[1])
img

# img.resize((224,224))
# np.array(img).shape