In [1]:
import os

# Work from the parent of the directory the kernel was started in.
# The starting directory is remembered on the first run so that re-running this
# cell does not climb one more level each time (a plain
# os.chdir(os.path.dirname(os.getcwd())) is not idempotent).
if "NOTEBOOK_START_DIR" not in globals():
    NOTEBOOK_START_DIR = os.getcwd()
os.chdir(os.path.dirname(NOTEBOOK_START_DIR))

In [3]:
import time
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import cv2
import os
import pickle
import tensorflow as tf
import glob
import random

from tensorflow import keras
from cleanfid import fid
from IPython import display
from time import sleep

# Reproducibility: feed the same seed to every random number generator used in
# this notebook (Python's `random`, NumPy and TensorFlow).
seed = 42
for _set_seed in (random.seed, np.random.seed, tf.random.set_seed):
    _set_seed(seed)

# Shorthand passed to the tf.data pipeline (parallel map calls / prefetch buffer).
autotune = tf.data.AUTOTUNE

In [None]:
from models.discirminator import build_d
from models.generator import build_g

In [None]:
# image variables
batch_size = 32
img_size = (128, 128)

# discriminator variables
img_shape = (*img_size, 1)  # image height/width plus a single channel
num_classes = 2

# generator variables
z_dim = 128  # length of the noise vector sampled for each generated image
initial_shape = (8, 8, 1024)
embed_label_shape = (8, 8, 1)

In [None]:
# 0 = normal, 1 = pneumonia
path = None  # specify path to data


def rescale(x, label):
    """Shift and scale pixel values with x / 127.5 - 1; the label passes through.

    NOTE(review): assumes the loader yields pixels in [0, 255], which would map
    them to [-1, 1] - confirm against the Keras loader in use.
    """
    return (x / 127.5) - 1.0, label


# Grayscale images resized to `img_size`, shuffled and batched, with one binary
# label per image.
data = keras.utils.image_dataset_from_directory(
    directory=path,
    label_mode="binary",
    color_mode="grayscale",
    batch_size=batch_size,
    image_size=img_size,
    shuffle=True,
    smart_resize=True,
)
data = data.map(map_func=rescale, num_parallel_calls=autotune)
data = data.prefetch(buffer_size=autotune)

In [None]:
# Preview up to `num_show` training images from the first batch in a 4x4 grid.
num_show = 16
fig = plt.figure(figsize=(15, 10))
for images, labels in data.take(1):  # one batch is enough for the preview
    # Guard against a batch that holds fewer than `num_show` images.
    for i in range(min(num_show, images.shape[0])):
        plt.subplot(4, 4, i + 1)
        # Undo the [-1, 1] rescaling so the image is displayed in [0, 255].
        plt.imshow(images[i, :, :, 0] * 127.5 + 127.5, cmap="gray")
        # Compare numerically instead of through the string form of the label.
        title = "normal" if float(labels[i].numpy()[0]) == 0.0 else "pneumonia"
        plt.title(title)
        plt.axis("off")

In [None]:
# Build the discriminator with the project's `build_d` factory (default
# arguments) and show its summary. `D` is later called as D([images, labels]).
D = build_d()
D.summary()

In [None]:
# Build the generator with the project's `build_g` factory (default arguments)
# and show its summary. `G` is later called as G([noise, labels]).
G = build_g()
G.summary()

In [None]:
# Adversarial objectives: both networks are scored with binary cross-entropy
# computed on raw logits.
G_loss = keras.losses.BinaryCrossentropy(from_logits=True)
D_loss = keras.losses.BinaryCrossentropy(from_logits=True)

# Optional L1 term between real and generated images, weighted by LAMBDA.
R_loss = keras.losses.MeanAbsoluteError()
LAMBDA = 0.01

# TTUR (two time-scale update rule): the discriminator gets a larger learning
# rate than the generator.
G_lr = 1e-4
D_lr = 3e-4
G_optimizer = keras.optimizers.Adam(learning_rate=G_lr, beta_1=0.5, beta_2=0.9)
D_optimizer = keras.optimizers.Adam(learning_rate=D_lr, beta_1=0.5, beta_2=0.9)

In [None]:
def discriminator_loss(real_output, fake_output):
    """Binary cross-entropy loss of the discriminator with one-sided label smoothing.

    Args:
        real_output: discriminator logits for the real images.
        fake_output: discriminator logits for the generated images.

    Returns:
        Sum of the loss on the real batch and the loss on the fake batch.
    """
    # One-sided label smoothing: only the targets of the real samples are
    # softened, and only downwards (into [0.95, 1.0]). Targets above 1 would keep
    # pushing the real logits up without bound, and smoothing the fake targets
    # would make the scheme two-sided, so fake targets stay at exactly 0.
    smoothing = 0.05 * tf.random.uniform(tf.shape(real_output))
    real_label = tf.ones_like(real_output) - smoothing
    fake_label = tf.zeros_like(fake_output)

    real_loss = D_loss(real_label, real_output)
    fake_loss = D_loss(fake_label, fake_output)
    return real_loss + fake_loss


def generator_loss(
    fake_output, use_r_loss=False, real_images=None, fake_images=None, lambda_=LAMBDA
):
    """Generator loss: adversarial term, optionally plus a weighted L1 term.

    Args:
        fake_output: discriminator logits for the generated images.
        use_r_loss: when True, add `lambda_` times the mean absolute error
            between `real_images` and `fake_images`.
        real_images: real batch; only read when `use_r_loss` is True.
        fake_images: generated batch; only read when `use_r_loss` is True.
        lambda_: weight of the L1 term (defaults to LAMBDA).

    Returns:
        The scalar loss to minimise for the generator.
    """
    # The generator is rewarded when the discriminator scores its images as real.
    adversarial = G_loss(tf.ones_like(fake_output), fake_output)
    if not use_r_loss:
        return adversarial

    l1_term = R_loss(real_images, fake_images)
    return adversarial + (lambda_ * l1_term)

In [None]:
@tf.function
def train_step(
    real_images,
    labels,
):
    """Run one adversarial update: the discriminator first, then the generator.

    Args:
        real_images: batch of real images.
        labels: class labels of `real_images`; also used to condition the
            generated images.
    """
    n_samples = tf.shape(real_images)[0]

    # --- discriminator update ---
    z_for_d = tf.random.normal(shape=[n_samples, z_dim])
    with tf.GradientTape() as d_tape:
        generated = G([z_for_d, labels], training=True)
        score_real = D([real_images, labels], training=True)
        score_fake = D([generated, labels], training=True)

        d_loss_value = discriminator_loss(score_real, score_fake)
    d_weights = D.trainable_weights
    d_grads = d_tape.gradient(d_loss_value, d_weights)
    D_optimizer.apply_gradients(zip(d_grads, d_weights))

    # --- generator update (fresh noise, discriminator weights left untouched) ---
    z_for_g = tf.random.normal(shape=[n_samples, z_dim])
    with tf.GradientTape() as g_tape:
        generated = G([z_for_g, labels], training=True)
        score_fake = D([generated, labels], training=True)

        # adversarial term plus the weighted reconstruction (L1) term
        g_loss_value = generator_loss(
            score_fake,
            use_r_loss=True,
            real_images=real_images,
            fake_images=generated,
        )
    g_weights = G.trainable_weights
    g_grads = g_tape.gradient(g_loss_value, g_weights)
    G_optimizer.apply_gradients(zip(g_grads, g_weights))

In [None]:
def get_fid(
    folder_real_path: str,
    folder_fake_path: str,
    iter=5,
    num_normal=6849,
    num_pneumonia=7208,
):
    """Estimate the clean-FID of the generator several times.

    Each round samples fresh noise, generates `num_normal + num_pneumonia`
    images with `G`, writes them as PNG files into `folder_fake_path`, computes
    the clean-FID against `folder_real_path` and removes the PNG files again.

    Args:
        folder_real_path: directory holding the real images.
        folder_fake_path: existing directory used as scratch space for the
            generated images; every ``*.png`` in it is deleted after each round.
        iter: number of independent FID estimates to compute.
        num_normal: number of images generated with label 0.
        num_pneumonia: number of images generated with label 1.
            NOTE(review): the defaults are the counts that were hard-coded here;
            presumably they mirror the real dataset - confirm.

    Returns:
        List with one clean-FID score per round.
    """
    score = []
    num_fake = num_normal + num_pneumonia
    for _ in range(iter):
        # pass noise and labels to G -> get fake imgs (new noise every round)
        noise = tf.random.normal(shape=(num_fake, z_dim))
        labels = tf.concat(
            [tf.zeros(shape=(num_normal, 1)), tf.ones(shape=(num_pneumonia, 1))],
            axis=0,
        )
        labels = tf.random.shuffle(labels)
        fake_imgs = (
            G.predict(x=[noise, labels], batch_size=64, verbose=0) * 127.5 + 127.5
        )
        # Make the 8-bit conversion explicit instead of leaving it to the encoder.
        fake_imgs = np.clip(np.rint(fake_imgs), 0, 255).astype(np.uint8)

        # save fake imgs into the fake folder (not the current working directory)
        for i in range(fake_imgs.shape[0]):
            cv2.imwrite(os.path.join(folder_fake_path, f"{i}.png"), fake_imgs[i, :, :, :])
        # wait for sure, colab is slow
        print("sleep...zZz")
        sleep(5)
        # sanity check that the images actually landed in the fake folder
        with os.scandir(folder_fake_path) as it:
            if any(it):
                print("folder not empty")

        # compute fid here
        score_clean = fid.compute_fid(
            folder_real_path, folder_fake_path, mode="clean", num_workers=12
        )
        print(f"clean-fid score is {score_clean:.3f}")
        score.append(score_clean)

        # remove the generated images so the next round starts from an empty folder
        print("deleting images..")
        for f in glob.glob(os.path.join(folder_fake_path, "*.png")):
            os.remove(f)
        sleep(3)
    return score

In [None]:
def generate_and_save_images(inputs, labels, epoch, path):
    """Generate images with `G`, plot them in a grid and save the figure.

    Args:
        inputs: noise batch fed to the generator.
        labels: class label per sample (0 = normal, anything else = pneumonia);
            indexed as ``labels[i].numpy()[0]``.
        epoch: zero-based epoch index; the file is named ``image_{epoch + 1}.png``.
        path: existing directory the figure is written to.
    """
    predictions = G.predict(x=[inputs, labels], verbose=0)
    # Undo the [-1, 1] scaling so the images are displayed in [0, 255].
    predictions = predictions * 127.5 + 127.5

    num_images = predictions.shape[0]
    num_cols = 4
    # Ceiling division: 16 samples still give the usual 4x4 grid, while other
    # batch sizes no longer overflow it.
    num_rows = max(1, -(-num_images // num_cols))

    fig = plt.figure(figsize=(15, 10))
    for i in range(num_images):
        plt.subplot(num_rows, num_cols, i + 1)
        plt.imshow(predictions[i, :, :, 0], cmap="gray")
        # Numeric comparison works for integer and float labels alike.
        title = "normal" if int(labels[i].numpy()[0]) == 0 else "pneumonia"
        plt.title(title)
        plt.axis("off")

    plt.savefig(os.path.join(path, f"image_{epoch + 1}.png"))
    plt.show()
    # Release the figure; this function is called repeatedly during training.
    plt.close(fig)

In [None]:
folder_real_path = None  # specify path to the folder with the real images
folder_fake_path = None  # specify path to the folder for the generated images
if folder_real_path is None or folder_fake_path is None:
    raise ValueError(
        "set folder_real_path and folder_fake_path before running this cell"
    )

# create folder for fake images (the configured one, including missing parents)
if not os.path.exists(folder_fake_path):
    os.makedirs(folder_fake_path)
    sleep(5)
print(
    f"real and fake folder are there: {os.path.exists(folder_real_path)} {os.path.exists(folder_fake_path)}"
)

In [None]:
# Parent directories of the outputs. An empty string means "relative to the
# current working directory"; the paths are joined with os.path.join so an empty
# parent no longer resolves to the filesystem root.
weight_root = ""  # specify the weight folder
save_objects_root = ""  # specify the save_objects folder
image_root = ""  # specify the folder that holds the image folders

# 1 create folder in weight folder
weight_folder_name = "dcgan_weight"
save_weight_path = os.path.join(weight_root, weight_folder_name)
if not os.path.exists(save_weight_path):
    os.makedirs(save_weight_path)
    sleep(5)

# 2 specify path to save fid in save_objects folder
fid_name = "fid_dcgan.pickle"
save_fid_path = os.path.join(save_objects_root, fid_name)

# 3 create image folder
image_folder_name = "dcgan"
save_image_path = os.path.join(image_root, image_folder_name)
if not os.path.exists(save_image_path):
    os.makedirs(save_image_path)
    sleep(5)

print(save_weight_path)
print(save_fid_path)
print(save_image_path)

In [None]:
EPOCHS = 200
FID = []
NUM_SAMPLES = 16
INTERVAL = 5  # every INTERVAL epochs: sample images, measure FID, save weights

for epoch in range(EPOCHS):
    print(f"starting epoch: {epoch + 1}")
    start = time.time()
    for images, labels in data:
        train_step(images, labels)

    display.clear_output(wait=True)
    elapsed = time.time() - start
    print("Time for epoch {} is {} sec".format(epoch + 1, elapsed))
    print("")

    # Everything below only runs on checkpoint epochs.
    if (epoch + 1) % INTERVAL != 0:
        continue

    # preview grid with random noise and random class labels
    inputs = tf.random.normal(shape=(NUM_SAMPLES, z_dim))
    labels = tf.random.uniform((NUM_SAMPLES, 1), 0, 2, dtype=tf.int32)
    generate_and_save_images(inputs, labels, epoch, save_image_path)
    print("")

    # FID estimates for this checkpoint
    checkpoint_fid = get_fid(
        iter=3,
        folder_real_path=folder_real_path,
        folder_fake_path=folder_fake_path,
    )
    FID.append(checkpoint_fid)
    print(f"get fid at epoch : {epoch + 1}")

    # generator weights for this checkpoint
    G.save_weights(save_weight_path + "/" + f"G_weight_{epoch + 1}.h5")
    print("weight saved")

In [None]:
# Persist the FID scores collected during training (one list per checkpoint).
with open(save_fid_path, mode="wb") as fid_file:
    pickle.dump(FID, fid_file)