# Import

In [None]:
# NOTE(review): presumably meant to print GPU status through the `nvidia-smi`
# shell tool. Without a leading `!` this line is plain Python (the expression
# `nvidia - smi`), not a shell command — confirm whether `!nvidia-smi` was intended.
nvidia-smi

In [None]:
!pip install transformers
!pip install --upgrade pip
!pip install transformers
!pip install torch torchvision
!pip install --upgrade tensorflow

In [None]:
import os
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
import time

from PIL import Image
from tensorflow.keras import layers
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import multi_gpu_model
from tqdm.notebook import tqdm

from transformers import TFElectraModel, ElectraTokenizer

#drive.mount('/gdrive')

Drive already mounted at /gdrive; to attempt to forcibly remount, call drive.mount("/gdrive", force_remount=True).


# parameter set

In [None]:
# Root directory used for the dataset, the generated images and the saved models.
# (Google Drive alternative: '/gdrive/My Drive/project/text_to_image')
base_path = '/home/ec2-user/SageMaker'

# Tokenizer of the KoELECTRA checkpoint whose input embeddings the generator uses.
tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-small-discriminator")

# Text settings
sequence_len = 80  # token IDs per caption after padding; needs to be changed
embedding_size = 256  # width of the text embedding (read downstream as mean | log-sigma)

# Stage 1 training settings
stage1_generator_lr = stage1_discriminator_lr = 0.0002
stage1_epochs, stage1_batch_size = 1000, 32

# Stage 2 training settings
stage2_generator_lr = stage2_discriminator_lr = 0.0002
stage2_epochs, stage2_batch_size = 500, 4

HBox(children=(FloatProgress(value=0.0, description='Downloading', max=279173.0, style=ProgressStyle(descripti…




HBox(children=(FloatProgress(value=0.0, description='Downloading', max=51.0, style=ProgressStyle(description_w…




# model build function

In [None]:
def embedding_reshape():
    """Build the block that turns a flat text embedding into a 4x4 feature map.

    The input is projected to 128 features, reshaped to (1, 1, 128) and tiled
    to (4, 4, 128). It is built on its own so that a single instance (and
    therefore a single set of weights) can be shared between models.

    Returns:
        An uncalled ``Sequential`` model.
    """
    projection = layers.Dense(128)
    to_single_cell = layers.Reshape((1, 1, 128))
    tile_to_grid = layers.Lambda(lambda t: tf.tile(t, (1, 4, 4, 1)))
    return Sequential([projection, to_single_cell, tile_to_grid])


def generate_c(x):
    """Sample a conditioning vector; meant to be wrapped in a ``Lambda`` layer.

    Args:
        x: 2-D tensor whose columns [0, 128) are read as the mean and whose
            remaining columns are read as log-sigma.

    Returns:
        ``mean + exp(log_sigma) * epsilon`` with ``epsilon`` drawn from a
        standard normal distribution.
    """
    mean = x[:, :128]
    log_sigma = x[:, 128:]
    stddev = tf.exp(log_sigma)
    # Draw independent noise for every row of the batch. A noise vector of
    # shape (128,) would be broadcast, i.e. shared by all samples in the batch.
    # tf.shape() is used because the batch size is only known at run time.
    epsilon = tf.random.normal(shape=tf.shape(mean), dtype=mean.dtype)
    return stddev * epsilon + mean


def get_up_sampling(filters):
    """Build one up-sampling block, factored out to avoid repeating the layers.

    The block doubles height and width, applies a 3x3 convolution with
    ``filters`` output channels, then batch normalization and ReLU.

    Args:
        filters: number of output channels of the convolution.

    Returns:
        An uncalled ``Sequential`` model.
    """
    block = Sequential()
    block.add(layers.UpSampling2D(size=(2, 2)))
    block.add(layers.Conv2D(filters, kernel_size=3, padding="same", strides=1, use_bias=False))
    block.add(layers.BatchNormalization())
    block.add(layers.ReLU())
    return block


def get_down_sampling(filters):
    """Build one down-sampling block, factored out to avoid repeating the layers.

    The block applies a 4x4 convolution with stride 2 (halving height and
    width), then batch normalization and LeakyReLU(0.2).

    Args:
        filters: number of output channels of the convolution.

    Returns:
        An uncalled ``Sequential`` model.
    """
    block = Sequential()
    block.add(layers.Conv2D(filters, (4, 4), padding='same', strides=2, use_bias=False))
    block.add(layers.BatchNormalization())
    block.add(layers.LeakyReLU(alpha=0.2))
    return block


def build_stage1_generator():
    """Build the stage-1 generator: token IDs + noise -> 64x64 image.

    Returns:
        A ``Model`` named 'stage_1_generator' with inputs
        [token IDs (sequence_len,), noise (100,)] and outputs
        [image (64, 64, 3) with tanh activation, text embedding (embedding_size,)].
    """
    # Text branch: pretrained KoELECTRA input embeddings followed by a learned
    # projection down to `embedding_size` features.
    input_text = layers.Input(shape=(sequence_len,), dtype=tf.int32)
    electra_embeddings = TFElectraModel.from_pretrained(
        "monologg/koelectra-small-discriminator", from_pt=True
    ).get_input_embeddings()
    text_encoder = Sequential(
        [
            electra_embeddings,
            layers.Dense(4),
            layers.Flatten(),
            layers.Dense(embedding_size),
            layers.LeakyReLU(alpha=0.2),
        ],
        name='embedding',
    )
    embeded = text_encoder(input_text)

    # Sample the conditioning vector from the embedding (mean | log-sigma).
    c = layers.Lambda(generate_c)(embeded)

    # Noise branch
    input_noise = layers.Input(shape=(100,))

    # Join the processed text with the noise.
    gen_input = layers.Concatenate(axis=1)([c, input_noise])

    # Project and reshape into a 4x4 feature map with 1024 channels.
    x = layers.Dense(128 * 8 * 4 * 4, activation='relu', use_bias=False)(gen_input)
    x = layers.Reshape((4, 4, 128 * 8), input_shape=(128 * 8 * 4 * 4,))(x)

    # Four doublings: 4x4 -> 64x64, ending with 64 channels.
    for filters in (512, 256, 128, 64):
        x = get_up_sampling(filters)(x)

    # Reduce to 3 channels so the output is a (batch_size, 64, 64, 3) image.
    x = layers.Conv2D(3, kernel_size=3, padding="same", strides=1, activation='tanh', use_bias=False)(x)

    return Model(inputs=[input_text, input_noise], outputs=[x, embeded], name='stage_1_generator')


def build_stage1_discriminator(embedding_reshape_layer):
    """Build the stage-1 discriminator for 64x64 images and a text embedding.

    Args:
        embedding_reshape_layer: layer/model that maps an (embedding_size,)
            vector to a feature map that can be concatenated with the 4x4
            image features (see ``embedding_reshape``).

    Returns:
        A ``Model`` named 'stage_1_discriminator' with inputs
        [image (64, 64, 3), text embedding (embedding_size,)] and a single
        sigmoid output.
    """
    # Image branch
    generated_image = layers.Input(shape=(64, 64, 3))
    features = layers.Conv2D(64, kernel_size=4, padding='same', strides=2, use_bias=False)(generated_image)
    features = layers.LeakyReLU(alpha=0.2)(features)

    # Down-sample to (batch_size, 4, 4, 512).
    for filters in (128, 256, 512):
        features = get_down_sampling(filters)(features)

    # Text branch: the embedding produced by the generator, spread over the grid.
    input_text = layers.Input(shape=(embedding_size,))
    input_text_repeated = embedding_reshape_layer(input_text)

    # Join image features and text features along the channel axis.
    merged_input = layers.concatenate([features, input_text_repeated])

    # Final scoring head ending in a sigmoid.
    score = layers.Conv2D(64 * 8, kernel_size=1, padding="same", strides=1)(merged_input)
    score = layers.BatchNormalization()(score)
    score = layers.LeakyReLU(alpha=0.2)(score)
    score = layers.Flatten()(score)
    score = layers.Dense(1, activation='sigmoid')(score)

    return Model(inputs=[generated_image, input_text], outputs=[score], name='stage_1_discriminator')

In [None]:
def get_residual_block():
    """Build one residual block for (16, 16, 512) feature maps.

    The block computes ``x + ReLU(BatchNorm(Conv1x1(x)))``.

    Returns:
        A ``Model`` mapping a (16, 16, 512) tensor to a tensor of the same shape.
    """
    x = layers.Input((16, 16, 128 * 4))
    residual = layers.Conv2D(128 * 4, kernel_size=1, padding="same", strides=1, use_bias=False)(x)
    # Normalize the convolution output (not the block input) so the
    # convolution is actually part of the residual path.
    residual = layers.BatchNormalization()(residual)
    residual = layers.Activation('relu')(residual)
    merged = layers.add([x, residual])

    return Model(x, merged)


def build_stage2_generator():
    """Build the stage-2 generator: 64x64 image + text embedding -> 512x512 image.

    Returns:
        A ``Model`` named 'stage_2_generator' with inputs
        [image (64, 64, 3), text embedding (embedding_size,)] and one
        (512, 512, 3) output with tanh activation.
    """
    # Image produced by stage 1, down-sampled twice: 64x64 -> 16x16, 512 channels.
    generated_image = layers.Input(shape=(64, 64, 3))
    x = generated_image
    for filters in (256, 512):
        x = get_down_sampling(filters)(x)

    # Text embedding, projected and reshaped to a (16, 16, 1024) feature map.
    # NOTE(review): an unused `Lambda(generate_c)(input_text)` call was removed
    # here; its result never reached the model output. Confirm whether the
    # sampled conditioning vector was meant to feed the projection instead.
    input_text = layers.Input(shape=(embedding_size,))
    embedding = layers.Dense(16 * 16 * 128 * 8)(input_text)
    embedding = layers.LeakyReLU(alpha=0.2)(embedding)
    embedding = layers.Reshape((16, 16, 128 * 8))(embedding)

    # Concatenate along channels and bring the channel count back to 512.
    merged_input = layers.concatenate([x, embedding])
    x = layers.Conv2D(128 * 4, kernel_size=3, padding="same", strides=1)(merged_input)

    # Residual connections. Could probably be reduced a bit.
    for _ in range(4):
        x = get_residual_block()(x)

    # Five doublings: 16x16 -> 512x512.
    for filters in (512, 256, 128, 64, 32):
        x = get_up_sampling(filters)(x)

    x = layers.Conv2D(3, kernel_size=3, padding="same", strides=1, activation='tanh', use_bias=False)(x)

    return Model(inputs=[generated_image, input_text], outputs=x, name='stage_2_generator')


def build_stage2_discriminator(embedding_reshape_layer):
    """Build the stage-2 discriminator for 512x512 images and a text embedding.

    Args:
        embedding_reshape_layer: layer/model that maps an (embedding_size,)
            vector to a feature map that can be concatenated with the 4x4
            image features (see ``embedding_reshape``).

    Returns:
        A ``Model`` named 'stage_2_discriminator' with inputs
        [image (512, 512, 3), text embedding (embedding_size,)] and a single
        sigmoid output.
    """
    # Image branch
    generated_image = layers.Input(shape=(512, 512, 3))
    features = layers.Conv2D(512, kernel_size=3, padding="same", strides=2)(generated_image)
    features = layers.LeakyReLU(alpha=0.2)(features)

    # Down-sample to (batch_size, 4, 4, 512).
    for filters in (16, 32, 64, 128, 256, 512):
        features = get_down_sampling(filters)(features)

    # Text branch: the embedding produced by the generator, spread over the grid.
    input_text = layers.Input(shape=(embedding_size,))
    input_text_repeated = embedding_reshape_layer(input_text)

    # Join image features and text features along the channel axis.
    merged_input = layers.concatenate([features, input_text_repeated])

    # Final scoring head ending in a sigmoid.
    score = layers.Conv2D(512 * 8, kernel_size=1, padding="same", strides=1)(merged_input)
    score = layers.BatchNormalization()(score)
    score = layers.LeakyReLU(alpha=0.2)(score)
    score = layers.Flatten()(score)
    score = layers.Dense(1, activation='sigmoid')(score)

    return Model(inputs=[generated_image, input_text], outputs=[score], name='stage_2_discriminator')

# build_adversarial_model

In [None]:
def build_stage_1_adversarial_model(gen_model, dis_model):
    """Chain the stage-1 generator and discriminator for the generator update.

    Side effect: sets ``dis_model.trainable = False`` so that only the
    generator is updated through the combined model.

    Args:
        gen_model: model taking [token IDs, noise] and returning
            [image, text embedding].
        dis_model: model taking [image, text embedding] and returning a
            validity score.

    Returns:
        A ``Model`` with inputs [token IDs (sequence_len,), noise (100,)] and
        outputs [validity score, text embedding].
    """
    # Token IDs are integers: declare the dtype explicitly so it matches the
    # int32 text input the stage-1 generator is built with.
    input_layer = layers.Input(shape=(sequence_len,), dtype=tf.int32)
    input_layer2 = layers.Input(shape=(100,))

    x, mean_logsigma = gen_model([input_layer, input_layer2])

    dis_model.trainable = False
    valid = dis_model([x, mean_logsigma])

    model = Model(inputs=[input_layer, input_layer2], outputs=[valid, mean_logsigma])
    return model

In [None]:
def build_stage_2_adversarial_model(gen_model_1, gen_model_2, dis_model):
    """Chain both generators and the stage-2 discriminator for the generator update.

    Side effects: sets ``trainable = False`` on ``gen_model_1`` and on
    ``dis_model``, leaving ``gen_model_2`` as the only part whose trainable
    flag is untouched.

    Args:
        gen_model_1: model taking [token IDs, noise] and returning
            [image, text embedding].
        gen_model_2: model taking [image, text embedding] and returning an image.
        dis_model: model taking [image, text embedding] and returning a
            validity score.

    Returns:
        A ``Model`` with inputs [token IDs (sequence_len,), noise (100,)] and a
        single validity-score output.
    """
    # Token IDs are integers: declare the dtype explicitly so it matches the
    # int32 text input the stage-1 generator is built with.
    input_layer = layers.Input(shape=(sequence_len,), dtype=tf.int32)
    input_layer2 = layers.Input(shape=(100,))

    gen_model_1.trainable = False
    x, mean_logsigma = gen_model_1([input_layer, input_layer2])
    x = gen_model_2([x, mean_logsigma])

    dis_model.trainable = False
    valid = dis_model([x, mean_logsigma])

    model = Model(inputs=[input_layer, input_layer2], outputs=valid)
    return model

In [None]:
def build_full_generator(gen_model_1, gen_model_2):
    """Chain both generators into one model: token IDs + noise -> final image.

    The two generators are left untouched: this function does not change their
    ``trainable`` flags. Freezing them here would also freeze them for any
    model that is compiled afterwards and is supposed to train them.

    Args:
        gen_model_1: model taking [token IDs, noise] and returning
            [image, text embedding].
        gen_model_2: model taking [image, text embedding] and returning an image.

    Returns:
        A ``Model`` with inputs [token IDs (sequence_len,), noise (100,)] and
        outputs [image from ``gen_model_2``, text embedding].
    """
    # Token IDs are integers: declare the dtype explicitly so it matches the
    # int32 text input the stage-1 generator is built with.
    input_layer = layers.Input(shape=(sequence_len,), dtype=tf.int32)
    input_layer2 = layers.Input(shape=(100,))

    x, mean_logsigma = gen_model_1([input_layer, input_layer2])
    x = gen_model_2([x, mean_logsigma])

    model = Model(inputs=[input_layer, input_layer2], outputs=[x, mean_logsigma])
    return model

# model build

In [None]:
# One reshape block, passed to both discriminators so they share it.
em_reshape = embedding_reshape()

stage_1_generator = build_stage1_generator()
stage_1_discriminator = build_stage1_discriminator(em_reshape)
stage_1_adversarial_model = build_stage_1_adversarial_model(stage_1_generator, stage_1_discriminator)

# Text-embedding sub-model of the stage-1 generator. Looked up by the name it
# is created with instead of by position, so it does not depend on the order
# in which Keras lists the generator's layers.
embedding_layer = stage_1_generator.get_layer('embedding')

stage_2_generator = build_stage2_generator()
stage_2_discriminator = build_stage2_discriminator(em_reshape)
stage_2_adversarial_model = build_stage_2_adversarial_model(stage_1_generator, stage_2_generator, stage_2_discriminator)

# Both generators chained end to end.
full_generator = build_full_generator(stage_1_generator, stage_2_generator)

All PyTorch model weights were used when initializing TFElectraModel.

Some weights or buffers of the PyTorch model TFElectraModel were not initialized from the TF 2.0 model and are newly initialized: ['discriminator_predictions.dense_prediction.bias', 'discriminator_predictions.dense_prediction.weight', 'discriminator_predictions.dense.bias', 'discriminator_predictions.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# Wrap the three stage-1 models for data-parallel execution on 2 GPUs.
# NOTE(review): `multi_gpu_model` is, as far as I know, deprecated and absent
# from newer TensorFlow releases, while the install cell upgrades TensorFlow —
# confirm the installed version still provides it.
stage_1_generator = multi_gpu_model(stage_1_generator, gpus=2)
stage_1_discriminator = multi_gpu_model(stage_1_discriminator, gpus=2)
stage_1_adversarial_model = multi_gpu_model(stage_1_adversarial_model, gpus=2)

# define loss and util function

In [None]:
def KL_loss(y_true, y_pred):
    """KL-divergence regularizer on the text embedding (mean | log-sigma).

    Args:
        y_true: ignored; present only because Keras losses take two arguments.
        y_pred: 2-D tensor whose columns [0, 128) are the mean and whose
            remaining columns are log-sigma — the same layout ``generate_c``
            reads.

    Returns:
        Scalar tensor: the mean over all elements of
        ``-log_sigma + 0.5 * (exp(2 * log_sigma) + mean**2 - 1)``.
    """
    mean = y_pred[:, :128]
    # Second half of the vector; reading the first half again would make the
    # loss use the mean in place of log-sigma.
    logsigma = y_pred[:, 128:]
    loss = -logsigma + .5 * (-1 + tf.exp(2. * logsigma) + tf.square(mean))
    return tf.math.reduce_mean(loss)

def save_rgb_img(img, path):
    """Draw an image array on a matplotlib figure and save the figure.

    Args:
        img: array that is rescaled with ``img * 127.5 + 127.5`` and cast to
            int before display (presumably values in [-1, 1] — confirm).
        path: destination passed to ``plt.savefig``.
    """
    pixels = (img * 127.5 + 127.5).astype(int)

    figure = plt.figure()
    axes = figure.add_subplot(1, 1, 1)
    axes.imshow(pixels)
    axes.set_title("Image")
    axes.axis("off")

    plt.savefig(path)
    plt.close()

# dataset define

In [None]:
# Number of consecutive entries generated per caption.
# NOTE(review): presumably there are 20 images per caption, ordered so that
# the sorted filenames line up with the caption order — confirm.
images_per_caption = 20

# Captions: rows with missing values are dropped.
df = pd.read_excel(os.path.join(base_path, 'text_data.xlsx')).dropna()
text = df['text'].to_list()
# Mismatched caption for every sample: the next caption, wrapping around to
# the first one for the last caption.
text_negative = text[1:] + text[:1]

# Tokenize each caption once and repeat the result for each of its images.
text = [ids for t in text for ids in [tokenizer.encode(t)] * images_per_caption]
text_negative = [ids for t in text_negative for ids in [tokenizer.encode(t)] * images_per_caption]

# Full paths of the training images, in sorted filename order.
image_dir = os.path.join(base_path, 'transform_img')
filenames = [os.path.join(image_dir, f) for f in sorted(os.listdir(image_dir))]

assert len(text) == len(filenames), "shape가 맞지 않음"

In [None]:
# Pad or truncate every token sequence at its end to exactly sequence_len IDs.
pad_options = dict(maxlen=sequence_len, dtype=np.int32, truncating="post", padding="post")
text = pad_sequences(text, **pad_options)
text_negative = pad_sequences(text_negative, **pad_options)

In [None]:
# Shuffle captions, mismatched captions and image paths with one shared
# permutation so that the three stay aligned.
s = np.arange(len(text))
np.random.shuffle(s)

text, text_negative = text[s], text_negative[s]
filenames = list(np.array(filenames)[s])

In [None]:
# Discriminator targets for one stage-1 batch: 1 for "real", 0 for "fake".
# (A label-smoothing variant, real = 0.9, was tried and is disabled.)
label_shape = (stage1_batch_size, 1)
real_labels = np.ones(label_shape, dtype=float)
fake_labels = np.zeros(label_shape, dtype=float)

# stage 1 train

In [None]:
# Stage-1 optimizers. `learning_rate` is the current keyword; `lr` is a
# deprecated alias.
dis_optimizer = Adam(learning_rate=stage1_discriminator_lr, beta_1=0.5, beta_2=0.999)
gen_optimizer = Adam(learning_rate=stage1_generator_lr, beta_1=0.5, beta_2=0.999)

In [None]:
# Keras records each model's `trainable` flags when compile() is called. The
# adversarial-model builders freeze the discriminator (and the stage-2 builder
# freezes the stage-1 generator), so unfreeze both before compiling the models
# that have to learn.
stage_1_generator.trainable = True
stage_1_discriminator.trainable = True
stage_1_generator.compile(loss="mse", optimizer=gen_optimizer)
stage_1_discriminator.compile(loss='binary_crossentropy', optimizer=dis_optimizer)

# Freeze the discriminator only for the combined model, so the generator
# update leaves the discriminator weights untouched.
stage_1_discriminator.trainable = False
stage_1_adversarial_model.compile(loss=['binary_crossentropy', KL_loss], loss_weights=[1, 2.0], optimizer=gen_optimizer, metrics=None)

In [None]:
generator_losses = []
discriminator_losses = []

# A trailing partial batch is skipped, because the label arrays have a fixed size.
number_of_batches = text.shape[0] // stage1_batch_size

# Targets for the generator update: "real" for the discriminator output. The
# second array only satisfies the one-target-per-output requirement; KL_loss
# does not read its y_true argument.
adversarial_targets = [tf.ones((stage1_batch_size, 1)), tf.ones((stage1_batch_size, 256))]

# Make sure the output directories exist before anything is written to them.
model_dir = os.path.join(base_path, 'models')
sample_dir = os.path.join(base_path, 'generated_image64')
os.makedirs(model_dir, exist_ok=True)
os.makedirs(sample_dir, exist_ok=True)

for epoch in range(stage1_epochs):
    print("========================================")
    print(f"Epoch is: {epoch}, Number of batches: {number_of_batches}")

    gen_losses = []
    dis_losses = []

    for index in tqdm(range(number_of_batches)):
        batch = slice(index * stage1_batch_size, (index + 1) * stage1_batch_size)

        # ---- discriminator update ----
        z_noise = np.random.normal(size=(stage1_batch_size, 100))

        # Real images, forced to 3 channels so grayscale/RGBA files cannot
        # break the (64, 64, 3) input shape, then scaled to [-1, 1].
        image_batch = np.array([
            np.array(Image.open(fname).convert("RGB").resize((64, 64)))
            for fname in filenames[batch]
        ])
        image_batch = (image_batch - 127.5) / 127.5

        # Matching and mismatched captions
        text_batch = text[batch]
        text_negative_batch = text_negative[batch]

        # Generated images plus the embedding of the matching captions
        fake_image_batch, embedding = stage_1_generator.predict([text_batch, z_noise])

        # Embedding of the mismatched captions
        dismatched_embedding = embedding_layer.predict(text_negative_batch)

        # Real and generated images use the same captions, so they share the
        # embedding; the mismatched pair reuses the real images with the
        # embedding of a different caption.
        dis_loss_real = stage_1_discriminator.train_on_batch([image_batch, embedding], real_labels)
        dis_loss_fake = stage_1_discriminator.train_on_batch([fake_image_batch, embedding], fake_labels)
        dis_loss_wrong = stage_1_discriminator.train_on_batch([image_batch, dismatched_embedding], fake_labels)

        d_loss = 0.5 * np.add(dis_loss_real, 0.5 * np.add(dis_loss_wrong, dis_loss_fake))

        # ---- generator update: three steps per discriminator step ----
        # NOTE(review): train_on_batch presumably returns several values here
        # (total plus one per output); np.mean averages all of them — confirm
        # this is the intended monitoring value.
        g_loss = np.mean([
            stage_1_adversarial_model.train_on_batch([text_batch, z_noise], adversarial_targets)
            for _ in range(3)
        ])

        dis_losses.append(d_loss)
        gen_losses.append(g_loss)

    epoch_gen_loss = np.mean(gen_losses)
    epoch_dis_loss = np.mean(dis_losses)
    generator_losses.append(epoch_gen_loss)
    discriminator_losses.append(epoch_dis_loss)

    # Keep the weights of the epoch with the lowest loss seen so far.
    if epoch_gen_loss == np.min(generator_losses):
        stage_1_generator.save_weights(os.path.join(model_dir, 'stage_1_generator.h5'))
    if epoch_dis_loss == np.min(discriminator_losses):
        stage_1_discriminator.save_weights(os.path.join(model_dir, 'stage_1_discriminator.h5'))

    # Every 2 epochs, save a few images produced by the generator.
    if epoch % 2 == 0:
        n = 5
        fake_images, _ = stage_1_generator.predict([text[:n], np.random.normal(size=(n, 100))])

        for i, img in enumerate(fake_images):
            # Zero-padded epoch number, so file names contain no spaces.
            save_rgb_img(img, os.path.join(sample_dir, f"stage1_generator-epoch{epoch:04d}_{i}.png"))

    print(f"generator_loss: {epoch_gen_loss: .4f}")
    print(f"discriminator_loss: {epoch_dis_loss: .4f}")

# stage 2 train

In [None]:
# Stage-2 optimizers: use the stage-2 learning rates (the stage-1 values were
# used here), and `learning_rate` instead of the deprecated `lr` alias.
dis_optimizer = Adam(learning_rate=stage2_discriminator_lr, beta_1=0.5, beta_2=0.999)
gen_optimizer = Adam(learning_rate=stage2_generator_lr, beta_1=0.5, beta_2=0.999)

In [None]:
# Keras records each model's `trainable` flags when compile() is called, and
# the model builders leave the stage-2 generator and discriminator frozen.
# Unfreeze the models that have to learn before compiling them; the stage-1
# generator stays fixed during stage 2.
stage_1_generator.trainable = False
stage_2_generator.trainable = True
stage_2_discriminator.trainable = True
stage_2_generator.compile(loss="mse", optimizer=gen_optimizer)
stage_2_discriminator.compile(loss='binary_crossentropy', optimizer=dis_optimizer)

# Freeze the discriminator only for the combined model, so the generator
# update leaves the discriminator weights untouched.
stage_2_discriminator.trainable = False
stage_2_adversarial_model.compile(loss='binary_crossentropy', optimizer=gen_optimizer, metrics=None)

In [None]:
generator_losses = []
discriminator_losses = []

# Stage 2 uses its own batch size and epoch count (512x512 images are far
# heavier than the 64x64 images of stage 1). A trailing partial batch is skipped.
number_of_batches = text.shape[0] // stage2_batch_size

# Discriminator targets sized for a stage-2 batch.
stage2_real_labels = np.ones((stage2_batch_size, 1), dtype=float)
stage2_fake_labels = np.zeros((stage2_batch_size, 1), dtype=float)

# The stage-2 adversarial model has a single output, so it takes one target.
adversarial_target = tf.ones((stage2_batch_size, 1)) * 0.9

# Make sure the output directories exist before anything is written to them.
model_dir = os.path.join(base_path, 'models')
sample_dir = os.path.join(base_path, 'generated_image512')
os.makedirs(model_dir, exist_ok=True)
os.makedirs(sample_dir, exist_ok=True)

for epoch in range(stage2_epochs):
    print("========================================")
    print(f"Epoch is: {epoch}, Number of batches: {number_of_batches}")

    gen_losses = []
    dis_losses = []

    for index in tqdm(range(number_of_batches)):
        batch = slice(index * stage2_batch_size, (index + 1) * stage2_batch_size)

        # ---- discriminator update ----
        z_noise = np.random.normal(size=(stage2_batch_size, 100))

        # Real images, forced to 3 channels so grayscale/RGBA files cannot
        # break the (512, 512, 3) input shape, then scaled to [-1, 1].
        image_batch = np.array([
            np.asarray(Image.open(fname).convert("RGB").resize((512, 512)))
            for fname in filenames[batch]
        ])
        image_batch = (image_batch - 127.5) / 127.5

        # Matching and mismatched captions
        text_batch = text[batch]
        text_negative_batch = text_negative[batch]

        # Generated images plus the embedding of the matching captions
        fake_image_batch, embedding = full_generator.predict([text_batch, z_noise])

        # Embedding of the mismatched captions
        dismatched_embedding = embedding_layer.predict(text_negative_batch)

        # Real and generated images use the same captions, so they share the
        # embedding; the mismatched pair reuses the real images with the
        # embedding of a different caption.
        dis_loss_real = stage_2_discriminator.train_on_batch([image_batch, embedding], stage2_real_labels)
        dis_loss_fake = stage_2_discriminator.train_on_batch([fake_image_batch, embedding], stage2_fake_labels)
        dis_loss_wrong = stage_2_discriminator.train_on_batch([image_batch, dismatched_embedding], stage2_fake_labels)

        d_loss = 0.5 * np.add(dis_loss_real, 0.5 * np.add(dis_loss_wrong, dis_loss_fake))

        # ---- generator update: three steps per discriminator step ----
        g_loss = np.mean([
            stage_2_adversarial_model.train_on_batch([text_batch, z_noise], adversarial_target)
            for _ in range(3)
        ])

        dis_losses.append(d_loss)
        gen_losses.append(g_loss)

    epoch_gen_loss = np.mean(gen_losses)
    epoch_dis_loss = np.mean(dis_losses)
    generator_losses.append(epoch_gen_loss)
    discriminator_losses.append(epoch_dis_loss)

    # Keep the models of the epoch with the lowest loss seen so far.
    if epoch_gen_loss == np.min(generator_losses):
        stage_2_generator.save(os.path.join(model_dir, 'stage_2_generator.h5'))
    if epoch_dis_loss == np.min(discriminator_losses):
        stage_2_discriminator.save(os.path.join(model_dir, 'stage_2_discriminator.h5'))

    # Every 2 epochs, save an image produced by the full generator.
    if epoch % 2 == 0:
        n = 1
        fake_images, _ = full_generator.predict([text[:n], np.random.normal(size=(n, 100))])

        for i, img in enumerate(fake_images):
            # Zero-padded epoch number, so file names contain no spaces.
            save_rgb_img(img, os.path.join(sample_dir, f"stage2_generator-epoch{epoch:04d}_{i}.png"))

    print(f"generator_loss: {epoch_gen_loss: .4f}")
    print(f"discriminator_loss: {epoch_dis_loss: .4f}")