In [1]:
import cv2 as cv
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
%matplotlib inline
import random
import time
from IPython.display import clear_output
import os
import glob

In [2]:
# Load the three image arrays stored as .npy files. The paths are relative to
# the notebook's working directory.
cycle_gan_img, c_gan_1_img, c_gan_3_img = (
    np.load(npy_path)
    for npy_path in (
        'image_data/cycle_gan_img.npy',
        'image_data/c_gan_1_img.npy',
        'image_data/c_gan_3_img.npy',
    )
)

In [3]:
# Stack the three image sets along the sample axis, then release the
# per-source arrays so only the combined array stays referenced.
all_digit = np.vstack((cycle_gan_img, c_gan_1_img, c_gan_3_img))
del cycle_gan_img, c_gan_1_img, c_gan_3_img
print(all_digit.shape)

(900, 256, 256, 3)


In [4]:
def change_dimension(data_set, img_size):
    """Resize every image in ``data_set`` to ``img_size`` x ``img_size``.

    Args:
        data_set: NumPy array of images indexed along axis 0; each
            ``data_set[i]`` is passed to ``cv.resize``.
        img_size: Target height and width in pixels.

    Returns:
        A NumPy array holding the resized images stacked along a new leading
        axis. An empty input yields an empty batch instead of raising.
    """
    # Collect first and stack once: calling ``np.vstack`` inside the loop
    # re-copies everything accumulated so far on every iteration.
    resized = [
        cv.resize(data_set[i], dsize=(img_size, img_size))
        for i in range(data_set.shape[0])
    ]

    if not resized:
        # The loop-based version never bound its result for an empty batch and
        # failed with UnboundLocalError. Return an empty batch of the target
        # size, keeping any trailing (channel) dimensions of the input.
        empty_shape = (0, img_size, img_size) + tuple(data_set.shape[3:])
        return np.empty(empty_shape, dtype=data_set.dtype)

    return np.stack(resized, axis=0)

In [5]:
# Resize every image to 32x32 (see the printed shape below) and drop the
# reference to the full-resolution array.
new_digit = change_dimension(all_digit, 32)
del all_digit
print(new_digit.shape)

(900, 32, 32, 3)


In [6]:
# Class labels 0, 1 and 3 as (300, 1) column vectors, stacked in the same
# order as the image sets. Assumes each source contributed 300 images, which
# is consistent with the 900 total printed above -- verify if the data changes.
label_0 = np.zeros((300, 1))
label_1 = np.ones((300, 1))
label_3 = np.full((300, 1), 3.0)

all_label = np.vstack((label_0, label_1, label_3))
print(all_label.shape)

(900, 1)


In [7]:
# Bundle the data as [images, labels]; ConditionalGAN.train_step reads
# data[0] as the images and data[1] as the labels.
all_data = [new_digit, all_label]

In [8]:
import tensorflow as tf
from tensorflow.keras import layers
from diffaug import DiffAugment
from tensorflow import keras

In [9]:
def pixel_upsample(x, H, W):
    """Upsample a token sequence by viewing it as an ``H`` x ``W`` grid.

    The tokens are reshaped into a feature map, rearranged with
    ``tf.nn.depth_to_space`` (block size 2) and flattened back into a
    sequence.

    Args:
        x: Rank-3 tensor whose second axis holds ``H * W`` tokens.
        H: Height of the token grid.
        W: Width of the token grid.

    Returns:
        Tuple ``(tokens, new_H, new_W)``: the flattened upsampled tokens and
        the grid height/width read from the upsampled feature map.
    """
    # Always check the incoming token count against the requested grid.
    _, n_tokens, channels = x.shape
    assert n_tokens == H * W

    grid = tf.reshape(x, (-1, H, W, channels))
    # depth_to_space resizes the map while keeping every value (data is moved
    # from the channel axis into spatial positions), e.g. as an alternative to
    # pooling-style resampling between layers.
    grid = tf.nn.depth_to_space(grid, 2, data_format='NHWC')

    # Flatten the upsampled map back into a token sequence.
    _, new_h, new_w, new_channels = grid.shape
    tokens = tf.reshape(grid, (-1, new_h * new_w, new_channels))
    return tokens, new_h, new_w

In [10]:
def normalize_2nd_moment(x, axis=1, eps=1e-8):
    """Rescale ``x`` by the reciprocal root of its mean square along ``axis``.

    ``eps`` is added to the mean square before the reciprocal square root so
    an all-zero slice does not divide by zero.
    """
    mean_square = tf.reduce_mean(tf.square(x), axis=axis, keepdims=True)
    return x * tf.math.rsqrt(mean_square + eps)

def scaled_dot_product(q, k, v):
    """Scaled dot-product attention: ``softmax(q @ k^T / sqrt(d_k)) @ v``.

    ``d_k`` is the size of the last axis of ``k``. No padding or attention
    mask is applied.
    """
    key_dim = tf.cast(tf.shape(k)[-1], tf.float32)

    scores = tf.matmul(q, k, transpose_b=True)
    scores = scores / tf.math.sqrt(key_dim)

    # Normalise over the key axis so each query's weights sum to one.
    weights = tf.nn.softmax(scores, axis=-1)
    return tf.matmul(weights, v)

In [11]:
# Linear projections -> per-head attention -> merge heads
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head scaled dot-product attention.

    ``model_dim`` features are split evenly across ``n_heads`` heads, so each
    head works on ``model_dim // n_heads`` features. Queries, keys and values
    each get their own Dense projection, and a final Dense layer mixes the
    re-merged heads.
    """

    def __init__(self, model_dim, n_heads, initializer='glorot_uniform'):
        super().__init__()
        self.n_heads = n_heads
        self.model_dim = model_dim

        # The heads can only be merged back into ``model_dim`` features when
        # the width divides evenly by the number of heads.
        assert model_dim % n_heads == 0
        self.depth = model_dim // n_heads

        def projection():
            # All four Dense layers share the same width and initializer.
            return layers.Dense(model_dim, kernel_initializer=initializer)

        # Learned projections for queries, keys and values.
        self.wq = projection()
        self.wk = projection()
        self.wv = projection()

        # Output projection applied after the heads are merged.
        self.dense = projection()

    def split_into_heads(self, x, batch_size):
        """Reshape ``x`` to ``(batch, n_heads, tokens, depth)``."""
        # Only the token axis is inferred (-1); ``depth`` is
        # ``model_dim // n_heads``.
        per_head = tf.reshape(x, (batch_size, -1, self.n_heads, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, q, k, v):
        """Attend from ``q`` over ``k``/``v`` and return the mixed result."""
        batch_size = tf.shape(q)[0]

        # Project each input, then split the projection into heads.
        q_heads = self.split_into_heads(self.wq(q), batch_size)
        k_heads = self.split_into_heads(self.wk(k), batch_size)
        v_heads = self.split_into_heads(self.wv(v), batch_size)

        attended = scaled_dot_product(q_heads, k_heads, v_heads)

        # Move the head axis next to depth and fold both back into model_dim.
        attended = tf.transpose(attended, perm=[0, 2, 1, 3])
        merged = tf.reshape(attended, (batch_size, -1, self.model_dim))

        return self.dense(merged)

In [12]:
class PositionalEmbedding(tf.keras.layers.Layer):
    """Adds a learned embedding for each of ``n_patches`` positions."""

    def __init__(self, n_patches, model_dim, initializer='glorot_uniform'):
        super().__init__()
        self.n_patches = n_patches
        # One trainable ``model_dim``-wide vector per patch position.
        self.position_embedding = layers.Embedding(
            input_dim=n_patches,
            output_dim=model_dim,
            embeddings_initializer=initializer,
        )

    def call(self, patches):
        """Return ``patches`` plus the embeddings of positions 0..n_patches-1."""
        position_ids = tf.range(self.n_patches)
        # The (n_patches, model_dim) table is added to ``patches`` by
        # broadcasting.
        return patches + self.position_embedding(position_ids)

In [13]:
class TransformerBlock(tf.keras.layers.Layer):
    """Pre-norm transformer block: self-attention and MLP, each with a residual.

    Args:
        model_dim: Feature width of the tokens (input and output).
        n_heads: Number of attention heads.
        mlp_dim: Hidden width of the two-layer MLP.
        rate: Dropout rate applied to the attention and MLP outputs.
        eps: Epsilon used by both LayerNormalization layers.
        initializer: Kernel initializer for every Dense layer in the block.
    """

    def __init__(self,
                 model_dim,
                 n_heads=2,
                 mlp_dim=512,
                 rate=0.0,
                 eps=1e-6,
                 initializer='glorot_uniform'):

        super().__init__()

        self.attn = MultiHeadAttention(model_dim, n_heads, initializer=initializer)
        self.mlp = tf.keras.Sequential([
            layers.Dense(mlp_dim,
                         activation='gelu',
                         kernel_initializer=initializer
                        ),
            layers.Dense(model_dim,
                         kernel_initializer=initializer
                        ),
        ])
        self.norm1 = layers.LayerNormalization(epsilon=eps)
        self.norm2 = layers.LayerNormalization(epsilon=eps)
        self.drop1 = layers.Dropout(rate)
        self.drop2 = layers.Dropout(rate)

    def call(self, inputs, training=None):
        """Apply the block to ``inputs``.

        ``training`` defaults to ``None`` so the layer can be invoked without
        an explicit flag (directly or from a container that does not forward
        one); the Dropout layers then fall back to their own default.
        """
        # Normalise before multi-head self-attention (pre-norm).
        x_norm1 = self.norm1(inputs)
        attn_output = self.attn(x_norm1, x_norm1, x_norm1)
        attn_output = inputs + self.drop1(attn_output, training=training)

        # Normalise again before the position-wise feed-forward MLP.
        x_norm2 = self.norm2(attn_output)
        mlp_output = self.mlp(x_norm2)

        return attn_output + self.drop2(mlp_output, training=training)
        

In [14]:
class Generator(tf.keras.models.Model):
    """Transformer generator producing 32x32 three-channel images.

    A latent vector is projected to an 8x8 grid of ``model_dim``-wide tokens
    and refined by three transformer stages. Between stages ``pixel_upsample``
    doubles the grid side and divides the channel count by four
    (8x8 -> 16x16 -> 32x32). A 3x3 convolution maps the final feature map to
    3 channels.

    Args:
        model_dim: Token width of the first stage; later stages use
            ``model_dim // 4`` and ``model_dim // 16``.
        noise_dim: Not used by this class; the first Dense layer infers its
            input width from the data. Kept for interface compatibility.
        depth: Number of transformer blocks in each of the three stages.
        heads: Number of attention heads in each stage.
        mlp_dim: MLP hidden width in each stage.
        initializer: Initializer passed to every sublayer.
    """

    def __init__(self, model_dim=1024,
                 noise_dim=256,
                 depth=[5, 4, 2],
                 heads=[4, 4, 4],
                 mlp_dim=[4096, 1024, 256],
                 initializer='glorot_uniform'):
        super().__init__()
        # Latent vector -> 8x8 grid of tokens.
        self.init = tf.keras.Sequential([
            layers.Dense(8 * 8 * model_dim,
                         use_bias=False,
                         kernel_initializer=initializer),
            layers.Reshape((8 * 8, model_dim))
        ])

        # Stage 1: 64 tokens (8x8) of width model_dim.
        self.pos_emb_8 = PositionalEmbedding(64,
                                             model_dim,
                                             initializer=initializer)
        self.block_8 = tf.keras.Sequential()
        for _ in range(depth[0]):
            self.block_8.add(TransformerBlock(model_dim,
                                              heads[0],
                                              mlp_dim[0],
                                              initializer=initializer))

        # Stage 2: 256 tokens (16x16) of width model_dim // 4.
        self.pos_emb_16 = PositionalEmbedding(256,
                                              model_dim // 4,
                                              initializer=initializer)
        self.block_16 = tf.keras.Sequential()
        for _ in range(depth[1]):
            self.block_16.add(TransformerBlock(model_dim // 4,
                                               heads[1],
                                               mlp_dim[1],
                                               initializer=initializer))

        # Stage 3: 1024 tokens (32x32) of width model_dim // 16.
        self.pos_emb_32 = PositionalEmbedding(1024,
                                              model_dim // 16,
                                              initializer=initializer)
        self.block_32 = tf.keras.Sequential()
        for _ in range(depth[2]):
            self.block_32.add(TransformerBlock(model_dim // 16,
                                               heads[2],
                                               mlp_dim[2],
                                               initializer=initializer))

        # Feature map -> 3 output channels.
        self.ch_conv = layers.Conv2D(3,
                                     kernel_size = 3,
                                     strides = 1,
                                     padding='same',
                                     kernel_initializer=initializer)

    def call(self, z):
        """Map a batch of latent vectors ``z`` to a batch of images."""
        x = normalize_2nd_moment(z)

        x = self.init(x)
        x = self.pos_emb_8(x)
        x = self.block_8(x)
        x, H, W = pixel_upsample(x, 8, 8)

        x = self.pos_emb_16(x)
        x = self.block_16(x)
        x, H, W = pixel_upsample(x, H, W)

        x = self.pos_emb_32(x)
        x = self.block_32(x)

        # Infer the batch axis (-1) instead of using the static z.shape[0],
        # which is None for symbolic inputs or unknown batch sizes and made
        # this reshape fail. The channel count is taken from the static shape,
        # as pixel_upsample already does.
        x = tf.reshape(x, [-1, H, W, x.shape[-1]])
        # With the default arguments and a batch of 32, x is (32, 32, 32, 64).
        return self.ch_conv(x)



In [15]:
class Discriminator(tf.keras.models.Model):
    """Two-scale transformer discriminator returning one logit per image.

    The image is embedded twice: a fine branch with ``patch_size`` patches and
    a coarse branch with ``2 * patch_size`` patches. The fine tokens pass
    through their own transformer stage, are average-pooled 2x2 and
    concatenated channel-wise with the coarse tokens. After a second stage a
    learned class token is prepended, one more block and a LayerNormalization
    are applied, and a Dense(1) layer reads the logit from the class token.

    Args:
        model_dim: Embedding widths of the fine and coarse branches.
        depth: Number of transformer blocks in the fine and merged stages.
        patch_size: Patch side of the fine branch (the coarse branch uses
            twice this value).
        heads: Attention heads for the fine stage, merged stage and last block.
        mlp_dim: MLP hidden widths for the same three places.
        img_size: Input image side length used to size the position tables.
        policy: Policy string passed to ``DiffAugment`` on every call.
        initializer: Initializer passed to every sublayer and the class token.
    """

    def __init__(self,
                 model_dim=[192, 192],
                 depth=[3, 3],
                 patch_size=2,
                 heads=[4, 4, 4],
                 mlp_dim=[768, 1536, 1536],
                 img_size=32,
                 policy='color,translation,cutout',
                 initializer='glorot_uniform'):
        super().__init__()
        # --- Encode image: fine branch ---
        patches_32 = (img_size // patch_size)**2
        self.patch_32 = tf.keras.Sequential([
            layers.Conv2D(model_dim[0],
                          kernel_size=patch_size,
                          strides=patch_size,
                          padding='same',
                          kernel_initializer=initializer)
        ])
        self.pos_emb_32 = PositionalEmbedding(n_patches=patches_32,
                                              model_dim=model_dim[0],
                                              initializer=initializer)
        self.block_32 = tf.keras.Sequential()
        for _ in range(depth[0]):
            self.block_32.add(TransformerBlock(model_dim[0],
                                               heads[0],
                                               mlp_dim[0],
                                               initializer=initializer))

        # --- Encode image: coarse branch, merged with the pooled fine tokens ---
        patches_16 = ((img_size//2) // patch_size)**2
        self.patch_16 = tf.keras.Sequential([
            layers.Conv2D(model_dim[1],
                          kernel_size=patch_size*2,
                          strides=patch_size*2,
                          padding='same',
                          kernel_initializer=initializer)
        ])
        # Forward the initializer here as well; this was the only sublayer
        # that silently ignored a non-default ``initializer``.
        self.pos_emb_16 = PositionalEmbedding(n_patches=patches_16,
                                              model_dim=model_dim[0] + model_dim[1],
                                              initializer=initializer)
        self.block_16 = tf.keras.Sequential()
        for _ in range(depth[1]):
            self.block_16.add(TransformerBlock(model_dim[0] + model_dim[1],
                                               heads[1], mlp_dim[1], initializer=initializer))
        # --- Last block ---
        self.last_block=TransformerBlock(model_dim[0] + model_dim[1],
                                         heads[2],
                                         mlp_dim[2],
                                         initializer=initializer)
        self.norm = layers.LayerNormalization(epsilon=1e-6)
        # --- Learned class token ---
        self.cls_dim = model_dim[0] + model_dim[1]
        self.cls_token = self.add_weight(name='cls_token',
                                         shape=(1, self.cls_dim),
                                         initializer=initializer,
                                         trainable=True)
        # --- Logits ---
        self.logits = layers.Dense(1, kernel_initializer=initializer)
        self.policy = policy

    def call(self, img):
        """Return a ``(batch, 1)`` tensor of logits for ``img``.

        The batch axis is always inferred (``-1`` / ``tf.shape``) rather than
        read from the static shape, so the model also works when the batch
        size is unknown at trace time.
        """
        # NOTE(review): augmentation runs on every call, including inference;
        # confirm this is intended.
        img = DiffAugment(img, self.policy)

        # Fine branch: patch embedding -> tokens -> transformer stage.
        x1 = self.patch_32(img)
        _, H, W, C = x1.shape
        x1 = tf.reshape(x1, [-1, H * W, C])
        x1 = self.pos_emb_32(x1)
        x1 = self.block_32(x1)
        # Back to a feature map so it can be pooled to the coarse grid size.
        x1 = tf.reshape(x1, [-1, H, W, C])
        x1 = tf.nn.avg_pool2d(x1, [1,2,2,1], [1,2,2,1], 'SAME')

        # Coarse branch: larger patches, then concatenate with the pooled
        # fine tokens along the channel axis.
        x2 = self.patch_16(img)
        _, H, W, C = x2.shape
        x2 = tf.reshape(x2, [-1, H * W, C])
        x1 = tf.reshape(x1, [-1, H * W, x1.shape[-1]])
        x = tf.concat([x1, x2], -1)
        x = self.pos_emb_16(x)
        x = self.block_16(x)

        # Prepend one copy of the class token per sample.
        batch = tf.shape(x)[0]
        cls_tokens = tf.broadcast_to(self.cls_token, [batch, 1, self.cls_dim])
        x = tf.concat([cls_tokens, x], 1)
        x = self.last_block(x)
        x = self.norm(x)
        # The logit is read from the class token (sequence position 0).
        return self.logits(x[:, 0])

In [16]:
# Instantiate both networks with their default hyper-parameters.
generator = Generator()
discriminator = Discriminator()

Metal device set to: Apple M1


2023-05-25 16:06:19.310390: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2023-05-25 16:06:19.310489: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


In [17]:
class ConditionalGAN(keras.Model):
    """Label-conditioned GAN wrapper around a generator and a discriminator.

    The label is appended to the noise vector for the generator and added to
    the images as an extra constant channel for the discriminator.

    Args:
        discriminator: Model mapping image+label-channel batches to logits.
        generator: Model mapping ``[noise, label]`` vectors to images.
        z_dim: Length of the noise vector sampled per image.
    """

    def __init__(self, discriminator, generator, z_dim):
        super().__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.z_dim = z_dim
        self.gen_loss_tracker = keras.metrics.Mean()
        self.disc_loss_tracker = keras.metrics.Mean()

    @property
    def metrics(self):
        """Loss trackers exposed to Keras."""
        return [self.gen_loss_tracker, self.disc_loss_tracker]

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        """Store the two optimizers and the loss shared by both steps."""
        super().compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn

    @tf.function
    def train_step(self, data, batch_size):
        """Run one discriminator update and one generator update.

        Args:
            data: ``[images, labels]``; a random batch is drawn from it (with
                replacement) on every call. Labels are expected as a column,
                ``(N, 1)``, so they can be concatenated to the noise vector.
            batch_size: Number of samples drawn for this step.

        Returns:
            Dict with the running means ``"g_loss"`` and ``"d_loss"``.
        """
        raw_real_imgs = data[0]
        raw_labels = data[1]
        # Labels are fed as raw scalars rather than one-hot vectors so that
        # fractional label values can be used as well.

        # Sample the batch with a TensorFlow op. np.random inside a
        # tf.function runs only once, at trace time, which froze the indices
        # as graph constants and made every step train on the same batch.
        idx = tf.random.uniform(shape=(batch_size,),
                                minval=0,
                                maxval=tf.shape(raw_real_imgs)[0],
                                dtype=tf.int32)

        # NOTE(review): real images are only cast, not rescaled, while
        # generate_images maps generator output with x * 0.5 + 0.5. Confirm the
        # stored images are already in the range the generator is expected to
        # produce.
        real_imgs = tf.cast(tf.gather(raw_real_imgs, indices = idx), tf.float32)
        labels = tf.cast(tf.gather(raw_labels, indices = idx), tf.float32)

        # Turn each label into a constant (H, W, 1) plane to concatenate as an
        # extra image channel. The size comes from the images themselves
        # rather than from a notebook-level global.
        _, height, width, _ = real_imgs.shape
        img_labels = tf.tile(tf.reshape(labels, (-1, 1, 1, 1)),
                             [1, height, width, 1])

        # ---- Discriminator step ----
        z_dim_vector = tf.random.normal(shape = (batch_size, self.z_dim))
        # Generator input is (batch_size, z_dim + 1): noise with the label appended.
        random_vector_labels = tf.concat([z_dim_vector, labels], axis = 1)

        generated_imgs = self.generator(random_vector_labels)

        fake_imgs_and_labels = tf.concat([generated_imgs, img_labels], axis = -1)
        real_imgs_and_labels = tf.concat([real_imgs, img_labels], axis = -1)

        combined_imgs = tf.concat([fake_imgs_and_labels, real_imgs_and_labels],
                                 axis = 0)

        # Target convention: 1 = generated image, 0 = real image.
        combined_labels = tf.concat([tf.ones((batch_size,1)), tf.zeros((batch_size,1))],
                          axis = 0)

        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_imgs)
            d_loss = self.loss_fn(combined_labels, predictions)

        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(zip(grads,
                                             self.discriminator.trainable_weights))

        # ---- Generator step ----
        # The discriminator is not updated here: gradients are taken with
        # respect to the generator weights only, so no ``trainable`` toggling
        # is needed (and a toggle inside a tf.function would only run at
        # trace time anyway).
        z_dim_vector = tf.random.normal(shape = (batch_size, self.z_dim))
        random_vector_labels = tf.concat([z_dim_vector, labels], axis = 1)
        # The generator is rewarded when its images are scored as real (0).
        misleading_labels = tf.zeros((batch_size,1))

        with tf.GradientTape() as tape:
            fake_images = self.generator(random_vector_labels)
            fake_image_and_labels = tf.concat([fake_images, img_labels], axis = -1)
            predictions = self.discriminator(fake_image_and_labels)
            g_loss = self.loss_fn(misleading_labels, predictions)

        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))

        # Monitor loss.
        self.gen_loss_tracker.update_state(g_loss)
        self.disc_loss_tracker.update_state(d_loss)
        return {
            "g_loss": self.gen_loss_tracker.result(),
            "d_loss": self.disc_loss_tracker.result(),
        }
        
        
        

In [18]:
# Length of the noise vector sampled per generated image.
z_dim = 256
# Side length, in pixels, of the training images (they were resized to 32x32 above).
image_size = 32

In [19]:
# Wire the two networks into the training wrapper. Both optimizers are Adam
# with the same learning rate. The loss takes raw logits (from_logits=True),
# matching the discriminator's final Dense(1) layer, which has no activation.
cond_gan = ConditionalGAN(discriminator=discriminator, 
                          generator=generator, 
                          z_dim=z_dim)
cond_gan.compile(d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
                 g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
                 loss_fn=keras.losses.BinaryCrossentropy(from_logits=True))

In [20]:
def generate_images(model, epoch, z_dim):
    """Generate one image per test label, plot them in a row and save the figure.

    Args:
        model: Callable generator taking a ``(4, z_dim + 1)`` batch of
            ``[noise, label]`` vectors and returning a batch of images.
        epoch: Zero-based epoch index; the saved file name uses ``epoch + 1``.
        z_dim: Length of the noise vector sampled per image.
    """
    # Explicit float32 so the labels concatenate with the float32 noise
    # without relying on implicit integer-to-float conversion.
    test_labels = np.array([[0], [1], [2], [3]], dtype=np.float32)
    n_samples = test_labels.shape[0]
    z_dim_vector = tf.random.normal(shape = (n_samples, z_dim))
    random_vector_labels = tf.concat([z_dim_vector, test_labels], axis = 1)

    prediction = model(random_vector_labels)
    # Shift pixel values towards [0, 1] for plotting (assumes the generator
    # output is roughly in [-1, 1] -- TODO confirm) and clip what falls
    # outside, since imshow only accepts floats in [0, 1].
    images = np.clip(np.asarray(prediction) * 0.5 + 0.5, 0.0, 1.0)

    # savefig does not create missing directories.
    os.makedirs('./image', exist_ok=True)

    plt.figure(figsize=(24, 12))
    title = ['0', '1', '2', '3']

    for i in range(n_samples):
        plt.subplot(1, n_samples, i+1)
        plt.title(title[i])
        plt.imshow(images[i])
        plt.axis('off')
    # Save before plt.show(): the figure may be cleared once it is shown.
    plt.savefig('./image/Conditional_GAN_Image %d.png' %(epoch+1))
    plt.show()


In [21]:
# Upper bound on training epochs; the recorded run below was stopped manually
# (KeyboardInterrupt) long before reaching it.
epochs = 1000000
# Number of samples drawn per training step.
batch_size =32

In [22]:
# Number of training steps that make up one "epoch": dataset size divided by
# the batch size, rounded down. It does not change between epochs.
iterations = all_data[0].shape[0] // batch_size

for epoch in range(epochs):
    start = time.time()

    for iteration in range(iterations):
        cond_gan.train_step(all_data, batch_size)

    epoch_number = epoch + 1

    # Every 50 epochs: save a sample grid and checkpoint the weights.
    if epoch_number % 50 == 0:
        generate_images(generator, epoch, z_dim)
        cond_gan.save_weights('./checkpoints/my_checkpoint')

    # Every 250 epochs: clear the accumulated cell output.
    if epoch_number % 250 == 0:
        clear_output(wait=True)

    print ('Time taken for epoch {} is {} sec\n'.format(epoch_number,
                                                      time.time()-start))

2023-05-25 16:06:29.677559: W tensorflow/core/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz
2023-05-25 16:06:29.683960: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:113] Plugin optimizer for device_type GPU is enabled.


Time taken for epoch 1 is 148.64892268180847 sec

Time taken for epoch 2 is 125.42287611961365 sec



KeyboardInterrupt: 