In [None]:
# Colab magic requesting a TensorFlow 2.x runtime.
%tensorflow_version 2.x 

In [None]:
import tensorflow as tf
from matplotlib import pyplot as plt
import numpy as np
import os
import time
import random

In [None]:
# Mount Google Drive at /content/drive; later cells work inside /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Work from the Drive root; the relative paths used in later cells depend on the current directory.
%cd /content/drive/MyDrive

In [None]:
# Point the Kaggle CLI at the Drive folder for its configuration
# (presumably where kaggle.json is stored — confirm).
os.environ['KAGGLE_CONFIG_DIR'] = "/content/drive/MyDrive/"

# Download the dataset archive into the current directory, then list the directory.
!kaggle datasets download -d ktaebum/anime-sketch-colorization-pair
%ls

In [None]:
# Extract every zip archive in the current directory; the archives are deleted only if unzip succeeds.
!unzip \*.zip && rm *.zip

In [None]:
# Sanity check: list the current directory and print where we are.
%ls
%pwd

In [None]:
# Training image directory, relative to the current working directory.
imgs_path = 'data/data/train/'
# os.listdir returns bare file names, without the directory prefix.
images = os.listdir(imgs_path)
print(len(images))

In [None]:
# Validation image directory, relative to the current working directory.
test_imgs_path = 'data/data/val/'
# os.listdir returns bare file names, without the directory prefix.
test_images = os.listdir(test_imgs_path)
print(len(test_images))

In [None]:
# Turn the bare file names into relative paths by prepending their folder.
# Note: this rebinds `images` / `test_images`, so running the cell twice
# prepends the folder twice.
new_images = ['data/train/' + file_name for file_name in images]
new_test_images = ['data/val/' + file_name for file_name in test_images]
images, test_images = new_images, new_test_images

In [None]:
# Move into the "data" folder; the 'data/train/' and 'data/val/' path prefixes are relative to it.
%pwd
%cd data

In [None]:
# Preview one randomly chosen training file (the bare last expression renders it).

img = tf.keras.preprocessing.image.load_img(random.choice(images))
img

In [None]:
# Preview one randomly chosen validation file.
test_img = tf.keras.preprocessing.image.load_img(random.choice(test_images))
test_img

In [None]:
def read_jpg(path):
    """Read the file at ``path`` and decode it as a 3-channel JPEG tensor."""
    raw_bytes = tf.io.read_file(path)
    return tf.image.decode_jpeg(raw_bytes, channels=3)

def normalise(image,mask):
    """Cast both tensors to float32 and rescale them with ``x / 127.5 - 1``.

    For pixel values in 0..255 this produces values in [-1, 1].

    Returns:
        The rescaled ``(image, mask)`` pair, in the order given.
    """
    def _rescale(pixels):
        return tf.cast(pixels, tf.float32) / 127.5 - 1

    return _rescale(image), _rescale(mask)

def load_image(path):
    """Load one side-by-side picture and split it into an aligned pair.

    The decoded file is cut down the middle of its width. Both halves are
    resized to 256x256, randomly mirrored together, and rescaled with
    ``normalise``.

    Args:
        path: path of a JPEG file.

    Returns:
        ``(mask, image)``: the right half first, then the left half.
    """
    input_image = read_jpg(path)
    half_width = tf.shape(input_image)[1] // 2
    image = input_image[:, :half_width, :]
    mask = input_image[:, half_width:, :]
    image = tf.image.resize(image, (256, 256))
    mask = tf.image.resize(mask, (256, 256))
    # Apply the same random horizontal flip to BOTH halves so they stay
    # pixel-aligned. (Flipping `image` a second time into `mask` would
    # overwrite the mask with the un-flipped image.)
    if tf.random.uniform(()) > 0.5:
        image = tf.image.flip_left_right(image)
        mask = tf.image.flip_left_right(mask)
    image, mask = normalise(image, mask)
    return mask, image

In [None]:
def show(image,mask):
    """Draw ``image`` and ``mask`` next to each other on a 1x2 subplot grid."""
    for position, picture in enumerate((image, mask), start=1):
        plt.subplot(1, 2, position)
        plt.imshow(picture)

In [None]:
%pwd
# NOTE: load_image returns its pair as (mask, image), so the name `image`
# here receives the first returned element (load_image's `mask`).
image,mask = load_image(images[0])
# Size of axis 1 of the first returned tensor.
print(tf.shape(image)[1])

In [None]:
# Debug peek at the path strings. The list gets its own descriptive name so
# that re-running this cell cannot overwrite the global `l`, which
# generator_loss reads as its L1 weight.
print(images[0])
prefixed_paths = [imgs_path + str(image_path) for image_path in images]
print(imgs_path)
print(prefixed_paths[0])

In [None]:
# Dataset of training path strings, mapped through load_image in parallel.
dataset = tf.data.Dataset.from_tensor_slices(images)
train = dataset.map(load_image,num_parallel_calls = tf.data.experimental.AUTOTUNE)
train

In [None]:
# Dataset of validation path strings.
test_dataset = tf.data.Dataset.from_tensor_slices(test_images)
# Map the validation paths themselves; mapping `dataset` here would silently
# evaluate on the training files.
test = test_dataset.map(load_image,num_parallel_calls = tf.data.experimental.AUTOTUNE)
test

In [None]:
# Number of examples per batch (passed to Dataset.batch).
BATCH_SIZE = 4
# Shuffle buffer size (passed to Dataset.shuffle).
BUFFER_SIZE = 4000 

In [None]:
# Shuffle, batch and prefetch the mapped training examples.
train_dataset = train.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
train_dataset = train_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
print(":)")

In [None]:
# Shuffle, batch and prefetch the mapped `test` examples.
# Note: this rebinds `test_dataset` from the dataset of path strings to the batched pipeline.
test_dataset = test.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
test_dataset = test_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

In [None]:
# Print a single training batch to inspect its contents.
for i in train_dataset.take(1):
  print(i)

In [None]:
# Number of filters of the generator's final Conv2DTranspose layer.
OUTPUT_CHANNELS = 3

In [None]:
def downsample(filters, size, apply_batchnorm=True):
    """Build an encoder block that halves the spatial resolution.

    The block is a stride-2 ``Conv2D`` (``padding='same'``, no bias), an
    optional ``BatchNormalization``, and a ``LeakyReLU`` activation.

    Args:
        filters: number of convolution filters.
        size: convolution kernel size.
        apply_batchnorm: whether to add batch normalisation after the
            convolution.

    Returns:
        A ``tf.keras.Sequential`` holding the layers.
    """
    result = tf.keras.Sequential()
    result.add(
        tf.keras.layers.Conv2D(filters, size, strides=2, padding='same',
                               use_bias=False))
    if apply_batchnorm:
        result.add(tf.keras.layers.BatchNormalization())
    # The activation is added unconditionally: skipping batch normalisation
    # must not also drop the non-linearity.
    result.add(tf.keras.layers.LeakyReLU())

    return result

def upsample(filters, size, apply_dropout=False):
    """Build a decoder block that doubles the spatial resolution.

    Layers, in order: a stride-2 ``Conv2DTranspose`` (``padding='same'``, no
    bias), ``BatchNormalization``, an optional ``Dropout(0.5)``, and ``ReLU``.

    Returns:
        A ``tf.keras.Sequential`` holding the layers.
    """
    block_layers = [
        tf.keras.layers.Conv2DTranspose(filters, size, strides=2,
                                        padding='same', use_bias=False),
        tf.keras.layers.BatchNormalization(),
    ]
    if apply_dropout:
        block_layers.append(tf.keras.layers.Dropout(0.5))
    block_layers.append(tf.keras.layers.ReLU())

    return tf.keras.Sequential(block_layers)

In [None]:
def Generator():
    """Assemble the encoder-decoder generator with skip connections.

    Eight ``downsample`` blocks encode a 256x256x3 input; seven ``upsample``
    blocks decode it, each followed by a concatenation with the matching
    encoder output. A final stride-2 ``Conv2DTranspose`` with ``tanh``
    activation produces ``OUTPUT_CHANNELS`` channels.

    Returns:
        A ``tf.keras.Model`` mapping the input tensor to the generated tensor.
    """
    model_input = tf.keras.layers.Input(shape=[256,256,3])

    # Encoder: the first block skips batch normalisation.
    encoder = [downsample(64, 4, apply_batchnorm=False)]
    encoder.extend(
        downsample(width, 4) for width in (128, 256, 512, 512, 512, 512, 512))

    # Decoder: the first three blocks use dropout.
    decoder = [upsample(512, 4, apply_dropout=True) for _ in range(3)]
    decoder.extend(upsample(width, 4) for width in (512, 256, 128, 64))

    output_layer = tf.keras.layers.Conv2DTranspose(OUTPUT_CHANNELS, 4,
                                                   strides=2,
                                                   padding='same',
                                                   activation='tanh')

    features = model_input
    encoder_outputs = []
    for block in encoder:
        features = block(features)
        encoder_outputs.append(features)

    # The deepest encoder output feeds the decoder directly; the remaining
    # ones are paired with decoder blocks from deepest to shallowest.
    for block, skip in zip(decoder, encoder_outputs[-2::-1]):
        features = block(features)
        features = tf.keras.layers.Concatenate()([features, skip])

    return tf.keras.Model(inputs=model_input, outputs=output_layer(features))

In [None]:
generator = Generator()
# Weight of the L1 term in generator_loss, which reads this global `l`.
l = 10

In [None]:
def generator_loss(disc_generated_output, gen_output, target):
    """Compute the generator objective.

    The adversarial term scores the discriminator's output on generated
    samples against an all-ones label via ``loss_object``; the L1 term is the
    mean absolute difference between ``target`` and ``gen_output``, weighted
    by the global ``l``.

    Returns:
        ``(total_gen_loss, gan_loss, l1_loss)``.
    """
    all_ones = tf.ones_like(disc_generated_output)
    gan_loss = loss_object(all_ones, disc_generated_output)
    l1_loss = tf.reduce_mean(tf.abs(target - gen_output))
    return gan_loss + (l * l1_loss), gan_loss, l1_loss

In [None]:
def Discriminator():
    """Assemble the discriminator as a two-input Keras functional model.

    The two 256x256x3 inputs are concatenated and passed through three
    ``downsample`` blocks, a stride-1 512-filter convolution with batch
    normalisation and ``LeakyReLU``, and a final stride-1 one-filter
    convolution with no activation.

    Returns:
        A ``tf.keras.Model`` taking ``[input_image, target_image]``.
    """
    inp = tf.keras.layers.Input(shape=[256, 256, 3], name='input_image')
    tar = tf.keras.layers.Input(shape=[256, 256, 3], name='target_image')

    features = tf.keras.layers.concatenate([inp, tar])

    # The first block is built without batch normalisation.
    encoder = (
        downsample(64, 4, False),
        downsample(128, 4),
        downsample(256, 4),
    )
    for block in encoder:
        features = block(features)

    features = tf.keras.layers.Conv2D(512, 4, strides=1,
                                      padding='same',
                                      use_bias=False)(features)
    features = tf.keras.layers.BatchNormalization()(features)
    features = tf.keras.layers.LeakyReLU()(features)

    scores = tf.keras.layers.Conv2D(1, 4, strides=1, padding='same')(features)

    return tf.keras.Model(inputs=[inp, tar], outputs=scores)

In [None]:
discriminator = Discriminator()
# Shared by generator_loss and discriminator_loss; from_logits=True because
# the discriminator's last layer has no activation.
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)

In [None]:
def discriminator_loss(disc_real_output, disc_generated_output):
    """Compute the discriminator objective.

    Outputs for real pairs are scored against all-ones labels and outputs for
    generated pairs against all-zeros labels; the two ``loss_object`` values
    are summed.
    """
    loss_on_real = loss_object(tf.ones_like(disc_real_output),
                               disc_real_output)
    loss_on_generated = loss_object(tf.zeros_like(disc_generated_output),
                                    disc_generated_output)
    return loss_on_real + loss_on_generated

In [None]:
# Separate Adam optimizers (learning rate 2e-4, beta_1 0.5) for the two networks.
generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

In [None]:
def generate_images(model, test_input, tar):
    """Plot the first input, ground truth and prediction of a batch side by side.

    ``model`` is called on ``test_input`` with ``training=True``; only element
    0 of each batch is drawn.
    """
    prediction = model(test_input, training=True)
    plt.figure(figsize=(15,15))

    panels = (
        ('Input Image', test_input[0]),
        ('Ground Truth', tar[0]),
        ('Predicted Image', prediction[0]),
    )
    for position, (caption, picture) in enumerate(panels, start=1):
        plt.subplot(1, 3, position)
        plt.title(caption)
        # Shift the pixel values into [0, 1] so they can be plotted.
        plt.imshow(picture * 0.5 + 0.5)
        plt.axis('off')
    plt.show()

In [None]:
# NOTE(review): the datasets hold relative path strings ('data/train/...',
# 'data/val/...') and the mapped tf.data pipeline presumably reads files only
# when iterated, so a successful directory change here could break loading —
# confirm this cell is still needed.
%pwd
%cd MyDrive/

In [None]:
# NOTE(review): changes the working directory right before training; the
# datasets use relative paths, so verify this does not invalidate them — confirm.
%cd data/data/val

In [None]:
# Passed to fit() as `epochs`.
EPOCHS = 100

In [None]:
@tf.function
def train_step(input_image, target, epoch):
    """Run one optimisation step for both networks on a single batch.

    Both losses are computed under their own gradient tape, then each
    optimizer applies the gradients to its network's trainable variables.
    ``epoch`` is accepted but not used in the body.

    Returns:
        ``(gen_total_loss, disc_loss)`` for this batch.
    """
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        generated = generator(input_image, training=True)

        scores_real = discriminator([input_image, target], training=True)
        scores_generated = discriminator([input_image, generated], training=True)

        gen_total_loss, _, _ = generator_loss(scores_generated, generated, target)
        disc_loss = discriminator_loss(scores_real, scores_generated)

    gen_variables = generator.trainable_variables
    disc_variables = discriminator.trainable_variables

    gen_gradients = gen_tape.gradient(gen_total_loss, gen_variables)
    disc_gradients = disc_tape.gradient(disc_loss, disc_variables)

    generator_optimizer.apply_gradients(zip(gen_gradients, gen_variables))
    discriminator_optimizer.apply_gradients(zip(disc_gradients, disc_variables))
    return gen_total_loss, disc_loss

In [None]:
# Running means of the per-batch losses; fit() reads and resets them every epoch.
epoch_loss_avg_gen = tf.keras.metrics.Mean('g_loss')
epoch_loss_avg_disc = tf.keras.metrics.Mean('d_loss')
# Per-epoch mean losses appended by fit().
g_loss_results = []
d_loss_results = []

In [None]:
def fit(train_ds, epochs, test_ds):
    """Run the training loop.

    ``epoch`` iterates over ``range(epochs + 1)``, i.e. 0 through ``epochs``
    inclusive. On every 10th epoch (starting with 0) one batch from
    ``test_ds`` is rendered with ``generate_images`` before training. After
    each epoch the mean generator and discriminator losses are appended to
    ``g_loss_results`` / ``d_loss_results`` and the running means are reset.

    Args:
        train_ds: iterable of ``(input_image, target)`` batches.
        epochs: index of the last epoch to run.
        test_ds: dataset of ``(input, target)`` batches used for previews.
    """
    for epoch in range(epochs+1):
        if epoch%10 == 0:
            for example_input, example_target in test_ds.take(1):
                generate_images(generator, example_input, example_target)
        print("Epoch: ", epoch)

        # Pass the epoch as a tensor: a Python int would make the
        # tf.function-decorated train_step retrace for every new value.
        epoch_tensor = tf.constant(epoch, dtype=tf.int64)
        for input_image, target in train_ds:
            print('.', end='')
            g_loss, d_loss = train_step(input_image, target, epoch_tensor)
            epoch_loss_avg_gen(g_loss)
            epoch_loss_avg_disc(d_loss)
        print()
        g_loss_results.append(epoch_loss_avg_gen.result())
        d_loss_results.append(epoch_loss_avg_disc.result())

        epoch_loss_avg_gen.reset_states()
        epoch_loss_avg_disc.reset_states()

In [None]:
# Preview the generator on one batch of test_dataset before fit() is called.
# The loop variables avoid the name `input`, which would shadow the Python
# builtin for the rest of the notebook.
for example_input, example_target in test_dataset.take(1):
    generate_images(generator, example_input, example_target)

In [None]:
# Start training; fit() iterates over range(EPOCHS + 1).
fit(train_dataset, EPOCHS, test_dataset)