In [19]:
# !pip install git+https://www.github.com/keras-team/keras-contrib.git

In [20]:
from keras.models import Sequential, Model
from keras.layers import Input, concatenate
from keras.layers import LeakyReLU, ReLU
from tensorflow.keras.optimizers import Adam

# Convolutional Neural Network (CNN)을 구성하는 레이어들 불러오기
from keras.layers import Conv2D, Conv2DTranspose, UpSampling2D, Dropout, Flatten, Dense

'''
Conv2D: 2D 컨볼루션 레이어입니다. 이미지 등의 2차원 데이터에서 특징을 추출하는데 사용됩니다.

Conv2DTranspose: 전치 합성곱 레이어로서, 역으로 컨볼루션 연산을 수행합니다. 이를 통해 입력을 더 큰 공간으로 확장할 수 있습니다.

UpSampling2D: 2D 업샘플링 레이어입니다. 입력 데이터의 크기를 확장하여 공간 해상도를 높일 수 있습니다.

Dropout: 신경망에서 과적합을 방지하기 위해 사용되는 레이어입니다. 특정 확률로 뉴런을 무작위로 비활성화합니다.

Flatten: 다차원 배열을 1차원 배열로 변환하는 레이어입니다. 주로 완전 연결 레이어에 입력으로 사용됩니다.

Dense: 완전 연결 레이어입니다. 모든 입력 뉴런이 출력 뉴런에 연결되어 있는 레이어입니다.

다운샘플링(Down-sampling) : Image 크기를 줄여가며 특징을 추출하는 과정이다.

업샘플링(Up-sampling) : 원래 Image 크기로 복원하는 과정이다.

'''
# Instance Normalization 레이어 불러오기 (일반적인 Batch Normalization과 다른 정규화 방법)
from keras_contrib.layers.normalization.instancenormalization import InstanceNormalization

# JSON 형식으로부터 모델을 로드하기 위한 함수 불러오기
from keras.models import model_from_json

# 진행 상황을 시각적으로 표시하기 위한 tqdm 라이브러리 불러오기
from tqdm import tqdm

# 파일 경로를 찾기 위한 glob 라이브러리 불러오기
from glob import glob

import cv2
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import os

In [21]:
'''
Make CycleGAN Model
'''
def cyclegan():
    # 입력 이미지의 형태
    img_shape = (128, 128, 3)
        
    ### 1. Discriminator 구성 (이미지가 실제인지 가짜인지 판별 역할)
    D_A = discriminator()  # discriminator A
    D_B = discriminator()  # discriminator B
    
    # Discriminator만 업데이트, Generator는 업데이트 X
    D_A.trainable = True
    D_B.trainable = True 
    
    D_A.summary()  # discriminator A의 구조 출력
    
    # Loss와 Optimizer 설정 학습률,베타 - 그래디언트 영향정도
    D_A.compile(loss='mse', optimizer=Adam(0.0002, 0.5))
    D_B.compile(loss='mse', optimizer=Adam(0.0002, 0.5))
  
    ### 2. Generator 파이프라인 구성
    G_A = generator()  # 스타일 A에서 B로 변환하는 generator
    G_B = generator()  # 스타일 B에서 A로 변환하는 generator
    
    # Generator 를 학습할 때는 discriminator  freeze
    D_A.trainable = False  
    D_B.trainable = False 
    
    # 입력 이미지 크기 정의
    real_A = Input(shape=img_shape)
    real_B = Input(shape=img_shape)
    
    # A에서 B로의 변환 (A사진에 B스타일 적용)
    fake_B = G_A(real_A)     # 스타일 A -> B 변환
    f_label_B = D_B(fake_B)  # discriminator B의 fake 이미지 분류 예측값
    cycle_A = G_B(fake_B)    # fake 이미지를 원래 형태로 복원시킨 이미지
    id_A = G_B(real_A)       # identity mapping : 입력 이미지를 그대로 출력하여 모델 안정성(이해x)
    
    # B에서 A로의 변환
    fake_A = G_B(real_B)
    f_label_A = D_A(fake_A)
    cycle_B = G_A(fake_A)
    id_B = G_A(real_B)
    
    # Generator 파이프라인 모델 정의
    gen_pipe = Model(inputs=[real_A, real_B], outputs=[f_label_A, f_label_B, cycle_A, cycle_B, id_A, id_B])
    # 손실함수 값이 작으면 목표 이미지 간의 차이를 작게 만들지만 문제 발생 
    # 1.과적합   2. 학습 불안정성   3. 품질 감소
    gen_pipe.compile(loss=['mse', 'mse', 'mae', 'mae', 'mae', 'mae'], 
                     loss_weights=[1, 1, 10, 10, 1, 1], 
                     optimizer=Adam(0.0002, 0.5))
    gen_pipe.summary()  # Generator 파이프라인의 구조 출력
    
    return D_A, D_B, G_A, G_B, gen_pipe


'''
Discriminator Structure
'''
def discriminator():
    # 입력 이미지를 받는 레이어 정의
    inp = Input(shape=(128, 128, 3))

    # Convolutional 레이어를 통해 이미지 처리
    conv1 = Conv2D(32, kernel_size=4, strides=2, padding='same')(inp)  # w/o instancenorm
    conv1 = LeakyReLU(alpha=0.2)(conv1)
    
    conv2 = Conv2D(64, kernel_size=4, strides=2, padding='same')(conv1)
    conv2 = LeakyReLU(alpha=0.2)(InstanceNormalization()(conv2))  # Instance Normalization 적용
    
    conv3 = Conv2D(128, kernel_size=4, strides=2, padding='same')(conv2)
    conv3 = LeakyReLU(alpha=0.2)(InstanceNormalization()(conv3))

    # Discriminator의 출력 레이어 정의
    outp = Conv2D(1, kernel_size=4, padding='same')(conv3)  # label 추출 -> pieces로 분석

    # Model 객체 생성
    d = Model(inp, outp)
    return d


'''
Generator Structure
'''
# 생성자 함수 : 이미지의 크기를 키워가면서 디테일을 추가하여 이미지를 생성
# Instance Normalization : 변환된 이미지의 통계적인 특성을 안정화 (Image Style Transfer 성능 올리기 위함)
def generator():    
    inp = Input(shape=(128, 128, 3))

    # Downsample 함수 정의 (Discriminator구조와 일치)
    def downsample(layer, filters, kernels):
        x = Conv2D(filters, kernel_size=kernels, strides=2, padding='same')(layer)
        x = InstanceNormalization()(x)
        x = LeakyReLU(alpha=0.2)(x)
        return x
    
    # Upsample 함수 정의
    # Usampling 단계이기 때문에 strides 는 필요X > Usampling2D(size = ) : 이미지 확대 작업
    def upsample(layer, connect, filters, kernels): 
        x = Conv2D(filters, kernel_size=kernels, padding='same')(UpSampling2D(size=2)(layer))
        x = InstanceNormalization()(x)
        x = ReLU()(x)
        return concatenate([x, connect])

    # 다양한 레이어를 통해 이미지 처리
    conv1 = downsample(inp, 32, 4)
    conv2 = downsample(conv1, 64, 4)
    conv3 = downsample(conv2, 128, 4)
    conv4 = downsample(conv3, 32*8, 4)
    conv5 = upsample(conv4, conv3, 128, 4)
    conv6 = upsample(conv5, conv2, 64, 4)
    conv7 = upsample(conv6, conv1, 32, 4)
    
    # Generator의 출력 레이어 정의
    outp = Conv2D(3, kernel_size=4, padding='same', activation='tanh')(UpSampling2D(size=2)(conv7))  # RGB 채널로 복원

    # Model 객체 생성
    g = Model(inp, outp)
    return g


'''
Train Model
'''
# 스타일 변환의 경우, 개별 이미지에 대한 변환을 수행하므로 batch_size = 1 로 설정. 미니배치X
def main(styles, photos, epoch, batch_size=1): 
    """
    CycleGAN을 활용한 스타일 변환을 수행하는 함수.

    Parameters:
        styles (numpy.ndarray): 스타일 이미지 데이터셋.
        photos (numpy.ndarray): 훈련용 사진 이미지 데이터셋.
        epoch (int): 전체 훈련 반복 횟수.
        batch_size (int, optional): 한 번에 처리할 이미지의 개수. 기본값은 1.

    Returns:
        G_B (keras.Model): 스타일 변환된 이미지를 생성하는 Generator 모델.
        losses_D_A (list): Discriminator A의 손실 값 리스트.
        losses_D_B (list): Discriminator B의 손실 값 리스트.
        losses_G (list): Generator의 손실 값 리스트.
    """

    # 전체 데이터셋 중, 최소 크기를 기준으로 batch_size로 나눈 값 계산 (ex.사진 10개/sytle 100개 >10번 학습)
    end = int(min(styles.shape[0], photos.shape[0]) / batch_size)

    # Discriminator와 Generator 모델 초기화
    D_A, D_B, G_A, G_B, gen_pipe = cyclegan()

    # Discriminator의 실제와 가짜 이미지 레이블 초기화
    real_label = np.ones((batch_size, 16, 16, 1))
    fake_label = np.zeros((batch_size, 16, 16, 1))

    # 손실 값을 저장할 리스트 초기화
    losses_D_A = []
    losses_D_B = []
    losses_G = []

    for j in range(epoch):
        for i in range(end):
            # 무작위로 스타일 이미지와 훈련용 사진 이미지 선택
            style = styles[np.random.randint(0, end, size=batch_size)]
            photo = photos[np.random.randint(0, end, size=batch_size)]
            
            # Generator에서 fake_A, fake_B 생성
            stop_fake = G_A.predict(style)
            ptos_fake = G_B.predict(photo)

            # Discriminator 학습: fake는 0, real은 1값이 나오도록
            # train_on_batch : 단일 배치에 대해 모델을 학습 (데이터 하나씩 모델 가중치 업데이트)
            
            # 실제 데이터에 대한 훈련 > 예측 최적화
            loss_D_A_r = D_A.train_on_batch(style, real_label)
            # 가짜 데이터에 대한 훈련 > 예측 최적화
            loss_D_A_f = D_A.train_on_batch(ptos_fake, fake_label)
            # 손실 계산
            loss_D_A = (loss_D_A_r + loss_D_A_f) / 2

            loss_D_B_r = D_B.train_on_batch(photo, real_label)
            loss_D_B_f = D_B.train_on_batch(stop_fake, fake_label)
            loss_D_B = (loss_D_B_r + loss_D_B_f) / 2
            
            # 손실 값을 리스트에 추가
            losses_D_A.append(loss_D_A)
            losses_D_B.append(loss_D_B)
            
            # Generator 학습: pipeline에 들어갔을 때 discriminator들이 fake에 대해 1을 출력하도록
            loss_G = gen_pipe.train_on_batch([style, photo], [real_label, real_label, style, photo, style, photo])
            losses_G.append(loss_G)
            
            # 이미지 출력 및 저장  # 역-scale [-1,1] > [0,255]
            if i % 150 == 0:
                print("epoch : {}, iteration: {}, D Loss : {:.3f}, G Loss(photo->style) : {:.3f}".format(j, i, (loss_D_A + loss_D_B) / 2, loss_G[0]))
                fig = plt.figure()
                ax = plt.subplot(1, 3, 1)
                ax.set_title("Original")
                ax.imshow(((photo.reshape(128, 128, 3) + 1) * 127.5).astype(np.uint8))  # scale back

                ax = plt.subplot(1, 3, 2)
                ax.set_title("Transferred")
                ax.imshow(((ptos_fake.reshape(128, 128, 3) + 1) * 127.5).astype(np.uint8))
                
                ax = plt.subplot(1, 3, 3)
                ax.set_title("Cycle")
                ax.imshow(((G_A.predict(ptos_fake).reshape(128, 128, 3) + 1) * 127.5).astype(np.uint8))
                
                fig.savefig("trainprocess/epoch{}batch{}".format(j, i))
                plt.close()
                
    return G_B, losses_D_A, losses_D_B, losses_G


In [22]:
def getimage( t, batch=1): #dataset_name,
    A = glob('./datasets/trainA/*') # 해당 경로파일 목록 반환
    B = glob('./datasets/trainB/*')
    
    if t=='style':
        where = A
    else:
        where = B
        
    images = []
    for _ in tqdm(where):
        image = cv2.imread(_)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) #bgr -> rgb
        image = cv2.resize(image, dsize=(128, 128))
        images.append(image)
     #scale to -1~1 : 훈련과정에서 안정성과 효율성, 역전파 최적화
    return np.array(images)/127.5-1

In [None]:
styles = getimage(batch = 1, t = 'style')
photos = getimage(batch = 1, t = 'photo')

 40%|███████████████████████████████▌                                                | 249/630 [00:39<00:56,  6.73it/s]

In [None]:
with tf.device('/device:GPU:1'):
    transfer_generator, losses_D_A, losses_D_B, losses_G = main(styles, photos, epoch=200, batch_size=1)

In [None]:
G_loss=[i[0] for i in np.array(losses_G)]

In [None]:
plt.figure(figsize=(17,5))
plt.plot(np.add(losses_D_A,losses_D_B)/2,label="Discriminator")
plt.plot(G_loss,label="Generator")
plt.legend()
plt.title('Discriminator loss and Generator loss')
plt.show()

In [None]:
G_json = transfer_generator.to_json()
with open("generator_git.json","w") as file:
    file.write(G_json)
transfer_generator.save_weights("generator_git.h5")

## Get Model Weights

In [None]:
file = open('generator_git.json','r')
g_model = file.read()
file.close()
generator = model_from_json(g_model,custom_objects={'InstanceNormalization':InstanceNormalization
})
generator.load_weights("generator_git.h5")

In [None]:
#check test data
test = glob('./datasets/testB/*')
for _ in test[0:5]:
    image = cv2.imread(_)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) #bgr->rgb
    image = cv2.resize(image, dsize=(128,128))
    image = image/127.5-1
    image = image.reshape(1,128,128,3)
    styletransferred = generator.predict(image)
    fig, ax = plt.subplots(1,2)
    ax[0].imshow(((image.reshape(128,128,3)+1)*127.5).astype(np.uint8))
    ax[1].imshow(((styletransferred.reshape(128,128,3)+1)*127.5).astype(np.uint8))
    plt.title('transferred')
    plt.show()