# Introdução

O seguinte trabalho foi dividido em duas partes, o foco é o estudo de CycleGans.

## Parte 1

O que é feito nesta primeira fase é a aplicação de uma CycleGan ao dataset do tensorflow horse2zebra. O objetivo deste é converter a imagem de um cavalo numa zebra e vice versa da forma mais realista possível. Para isso a arquitetura da CycleGan utilizada é inspirada na CNN U-Net, que acaba por ser semelhante ao utilizado no Pix2Pix mas com a diferença de que este não recebe paired data, ou seja, não existe uma relação de 1 para 1 nas imagens do dataset, e a função de normalização utilizada é a instance normalization em vez da batch normalization. Assim sendo, esta Gan é constituída, não por um mas sim dois geradores para realizar a conversão A -> B mas também B -> A, o que leva á necessidade de dois discriminadores para avaliar estas conversões. Em relação á função de loss, como não existe necessariamente um target, são apresentadas aqui três conceitos de Loss, Adversial Loss, função de erro que permite ao gerador melhorar uma vez que este recebe um feedback calculado a partir do resultado dado pelo discriminador, como o seu objetivo é diminuir o erro este irá tentar produzir imagens mais realistas para enganar o discriminador; Cycle Consistency  Loss, que basicamente tem como objetivo aproximar o resultado final com a imagem input, ou seja, se o gerador um recebe um cavalo e o transforma numa zebra e esta zebra for passada ao gerador dois, a diferença entre o cavalo obtido do gerador dois e o cavalo inicial terá de ser a menor possível; e por fim Identity Loss que em termos simples diz que se dermos como input uma imagem de um cavalo ao gerador que transforma zebras em cavalos então o resultado deve ser a mesma imagem. Como forma de exploração destas funções de loss, são treinados dois modelos onde se modifica a Cycle Consistency Loss entre L1 (MAE) e L2 (MSE).

## Parte 2

Já na segunda parte, é utilizada uma CycleGan para resolver um problema em concreto. Com o que se aprendeu na parte 1 pretende-se agora desenvolver uma Gan capaz de corrigir o balanceamento de dados do dataset da seguinte competição do Kaggle: https://www.kaggle.com/competitions/plant-pathology-2020-fgvc7. Este dataset apresenta um conjunto de imagens de folhas de árvores, labeled com o seu estado de "saúde", ou seja, **healthy**, **multiple_diseases**, **rust** e **scab**. Como iremos ver, a obtenção de dados de folhas com múltiplas doenças são difíceis de coletar, assim pretende-se com esta CycleGan resolver esse problema criando folhas **multiple_diseases** a partir de folhas **healthy**.

# Imports

In [None]:
!pip install git+https://github.com/tensorflow/examples.git
!pip install -U tensorflow-addons
!pip install tensorflow_datasets

In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_addons as tfa
import pandas as pd
from tensorflow_examples.models.pix2pix import pix2pix

import os
import time
import matplotlib.pyplot as plt
from google.colab import drive
from IPython.display import clear_output
from keras import Sequential, Model
from keras.utils import plot_model
from keras.layers import Dense, BatchNormalization, LeakyReLU, Reshape, Conv2DTranspose, Conv2D, Dropout, Flatten, ReLU, Input, concatenate, ZeroPadding2D, Concatenate

AUTOTUNE = tf.data.AUTOTUNE


In [None]:
drive.mount('/content/drive',force_remount=True)

In [None]:
!unzip /content/drive/MyDrive/plant-pathology-2020-fgvc7.zip -d /content/

# Global Variables

In [5]:
BUFFER_SIZE = 1000
BATCH_SIZE  = 1
IMG_WIDTH   = 256
IMG_HEIGHT  = 256
LAMBDA = 10
EPOCHS = 500
OUTPUT_CHANNELS = 3
SAMPLE_LEN = 100

IMAGE_PATH = "/content/images/"
TRAIN_PATH = "/content/train.csv"

# Functions

In [6]:
def format_path(st):
  return IMAGE_PATH + st + '.jpg'

def random_crop(image):
  cropped_image = tf.image.random_crop(image, size=[IMG_HEIGHT, IMG_WIDTH, 3])
  return cropped_image

def normalize(image):
  image = tf.cast(image, tf.float32)
  image = (image / 127.5) - 1
  return image

def random_jitter(image):
  # resizing to 286 x 286 x 3
  image = tf.image.resize(image, [286, 286], method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
  # randomly cropping to 256 x 256 x 3
  image = random_crop(image)
  # random mirroring
  image = tf.image.random_flip_left_right(image)
  return image

def preprocess_image_train_1(image, label):
  image = random_jitter(image)
  image = normalize(image)
  return image

def preprocess_image_train_2(image, label):
  bits = tf.io.read_file(image)
  image = tf.image.decode_jpeg(bits, channels=3)
  image = random_jitter(image)
  image = normalize(image)
  return image

def preprocess_image_test(image, label):
  image = normalize(image)
  return image

def generate_images(model, test_input):
  prediction = model(test_input)

  plt.figure(figsize=(12, 12))

  display_list = [test_input[0], prediction[0]]
  title = ['Input Image', 'Predicted Image']

  for i in range(2):
    plt.subplot(1, 2, i+1)
    plt.title(title[i])
    # getting the pixel values between [0, 1] to plot it.
    plt.imshow(display_list[i] * 0.5 + 0.5)
    plt.axis('off')
  plt.show()

@tf.function
def train_step(real_x, real_y):
  # persistent is set to True because the tape is used more than once to calculate the gradients.
  with tf.GradientTape(persistent=True) as tape:
    # Generator G translates X -> Y
    # Generator F translates Y -> X

    fake_y = generator_g(real_x, training=True)
    cycled_x = generator_f(fake_y, training=True)

    fake_x = generator_f(real_y, training=True)
    cycled_y = generator_g(fake_x, training=True)

    # same_x and same_y are used for identity loss.
    same_x = generator_f(real_x, training=True)
    same_y = generator_g(real_y, training=True)

    disc_real_x = discriminator_x(real_x, training=True)
    disc_real_y = discriminator_y(real_y, training=True)

    disc_fake_x = discriminator_x(fake_x, training=True)
    disc_fake_y = discriminator_y(fake_y, training=True)

    # calculate the loss
    gen_g_loss = generator_loss(disc_fake_y)
    gen_f_loss = generator_loss(disc_fake_x)

    total_cycle_loss = calc_cycle_loss(real_x, cycled_x) + calc_cycle_loss(real_y, cycled_y)

    # Total generator loss = adversarial loss + cycle loss + identity loss
    total_gen_g_loss = gen_g_loss + total_cycle_loss + identity_loss(real_y, same_y)
    total_gen_f_loss = gen_f_loss + total_cycle_loss + identity_loss(real_x, same_x)

    disc_x_loss = discriminator_loss(disc_real_x, disc_fake_x)
    disc_y_loss = discriminator_loss(disc_real_y, disc_fake_y)

  # Calculate the gradients for generator and discriminator
  generator_g_gradients = tape.gradient(total_gen_g_loss, generator_g.trainable_variables)
  generator_f_gradients = tape.gradient(total_gen_f_loss, generator_f.trainable_variables)

  discriminator_x_gradients = tape.gradient(disc_x_loss, discriminator_x.trainable_variables)
  discriminator_y_gradients = tape.gradient(disc_y_loss, discriminator_y.trainable_variables)

  # Apply the gradients to the optimizer
  generator_g_optimizer.apply_gradients(zip(generator_g_gradients, generator_g.trainable_variables))
  generator_f_optimizer.apply_gradients(zip(generator_f_gradients, generator_f.trainable_variables))

  discriminator_x_optimizer.apply_gradients(zip(discriminator_x_gradients, discriminator_x.trainable_variables))
  discriminator_y_optimizer.apply_gradients(zip(discriminator_y_gradients, discriminator_y.trainable_variables))

# CycleGAN: U-Net inspired

## Build the Generator

In [7]:
class InstanceNormalization(tf.keras.layers.Layer):
  """Instance Normalization Layer (https://arxiv.org/abs/1607.08022)."""

  def __init__(self, epsilon=1e-5):
    super(InstanceNormalization, self).__init__()
    self.epsilon = epsilon

  def build(self, input_shape):
    self.scale = self.add_weight(
        name='scale',
        shape=input_shape[-1:],
        initializer=tf.random_normal_initializer(1., 0.02),
        trainable=True)

    self.offset = self.add_weight(
        name='offset',
        shape=input_shape[-1:],
        initializer='zeros',
        trainable=True)

  def call(self, x):
    mean, variance = tf.nn.moments(x, axes=[1, 2], keepdims=True)
    inv = tf.math.rsqrt(variance + self.epsilon)
    normalized = (x - mean) * inv
    return self.scale * normalized + self.offset

In [8]:
# ENCODER
def downsample(filters, size, apply_norm=True):
  initializer = tf.random_normal_initializer(0., 0.02)
  result = Sequential()
  result.add(Conv2D(filters, size, strides=2, padding='same', kernel_initializer=initializer, use_bias=False))
  if apply_norm: result.add(InstanceNormalization())
  result.add(LeakyReLU())
  return result

In [9]:
# DECODER
def upsample(filters, size, apply_dropout=False):
  initializer = tf.random_normal_initializer(0., 0.02)
  result = Sequential()
  result.add(Conv2DTranspose(filters, size, strides=2, padding='same', kernel_initializer=initializer, use_bias=False))
  result.add(InstanceNormalization())
  if apply_dropout: result.add(Dropout(0.5))
  result.add(ReLU())
  return result

In [10]:
# GENERATOR
def Generator():
  inputs = Input(shape=[256, 256, 3])

  down_stack = [
    downsample(64, 4, apply_norm=False),  # (batch_size, 128, 128, 64)
    downsample(128, 4),  # (batch_size, 64, 64, 128)
    downsample(256, 4),  # (batch_size, 32, 32, 256)
    downsample(512, 4),  # (batch_size, 16, 16, 512)
    downsample(512, 4),  # (batch_size, 8, 8, 512)
    downsample(512, 4),  # (batch_size, 4, 4, 512)
    downsample(512, 4),  # (batch_size, 2, 2, 512)
    downsample(512, 4),  # (batch_size, 1, 1, 512)
  ]

  up_stack = [
    upsample(512, 4, apply_dropout=True),  # (batch_size, 2, 2, 1024)
    upsample(512, 4, apply_dropout=True),  # (batch_size, 4, 4, 1024)
    upsample(512, 4, apply_dropout=True),  # (batch_size, 8, 8, 1024)
    upsample(512, 4),  # (batch_size, 16, 16, 1024)
    upsample(256, 4),  # (batch_size, 32, 32, 512)
    upsample(128, 4),  # (batch_size, 64, 64, 256)
    upsample(64, 4),  # (batch_size, 128, 128, 128)
  ]

  initializer = tf.random_normal_initializer(0., 0.02)
  last = Conv2DTranspose(OUTPUT_CHANNELS, 4, strides=2, padding='same', kernel_initializer=initializer, activation='tanh')  # (bs, 256, 256, 3)
  concat = Concatenate()

  inputs = Input(shape=[None, None, 3])
  x = inputs

  # Downsampling through the model
  skips = []
  for down in down_stack:
    x = down(x)
    skips.append(x)

  skips = reversed(skips[:-1])

  # Upsampling and establishing the skip connections
  for up, skip in zip(up_stack, skips):
    x = up(x)
    x = concat([x, skip])

  x = last(x)

  return Model(inputs=inputs, outputs=x)


## Build the Discriminator

In [11]:
def Discriminator():
  initializer = tf.random_normal_initializer(0., 0.02)

  inp = Input(shape=[None, None, 3], name='input_image')
  x = inp

  down1 = downsample(64, 4, False)(x)  # (bs, 128, 128, 64)
  down2 = downsample(128, 4)(down1)  # (bs, 64, 64, 128)
  down3 = downsample(256, 4)(down2)  # (bs, 32, 32, 256)

  zero_pad1 = ZeroPadding2D()(down3)  # (bs, 34, 34, 256)
  conv = Conv2D(512, 4, strides=1, kernel_initializer=initializer, use_bias=False)(zero_pad1)  # (bs, 31, 31, 512)

  norm1 = InstanceNormalization()(conv)
  leaky_relu = LeakyReLU()(norm1)
  zero_pad2 = ZeroPadding2D()(leaky_relu)  # (bs, 33, 33, 512)

  last = Conv2D(1, 4, strides=1, kernel_initializer=initializer)(zero_pad2)  # (bs, 30, 30, 1)

  return Model(inputs=inp, outputs=last)

## Build the Model

* Generator `G` learns to transform image `X` to image `Y`. $(G: X -> Y)$
* Generator `F` learns to transform image `Y` to image `X`. $(F: Y -> X)$
* Discriminator `D_X` learns to differentiate between image `X` and generated image `X` (`F(Y)`).
* Discriminator `D_Y` learns to differentiate between image `Y` and generated image `Y` (`G(X)`).

In [12]:
generator_g = Generator() # A to B
generator_f = Generator() # B to A

discriminator_x = Discriminator()
discriminator_y = Discriminator()

In [None]:
plot_model(generator_g, show_shapes=True, dpi=64)

In [None]:
plot_model(discriminator_x, show_shapes=True, dpi=64)

# Loss Functions e Optimizers

In [15]:
loss_obj = tf.keras.losses.BinaryCrossentropy(from_logits=True)

generator_g_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
generator_f_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

discriminator_x_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
discriminator_y_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

## Loss Function 1: Using L1 (MAE)

In [16]:
def discriminator_loss(real, generated):
  real_loss = loss_obj(tf.ones_like(real), real)
  generated_loss = loss_obj(tf.zeros_like(generated), generated)
  total_disc_loss = real_loss + generated_loss
  return total_disc_loss * 0.5

def generator_loss(generated):
  return loss_obj(tf.ones_like(generated), generated)

def calc_cycle_loss(real_image, cycled_image):
  loss1 = tf.reduce_mean(tf.abs(real_image - cycled_image))
  return LAMBDA * 0.5 * loss1

def identity_loss(real_image, same_image):
  loss = tf.reduce_mean(tf.abs(real_image - same_image))
  return LAMBDA * 0.5 * loss

## Loss Function 2: Using L2 (MSE)

In [None]:
def discriminator_loss(real, generated):
  real_loss = loss_obj(tf.ones_like(real), real)
  generated_loss = loss_obj(tf.zeros_like(generated), generated)
  total_disc_loss = real_loss + generated_loss
  return total_disc_loss * 0.5

def generator_loss(generated):
  return loss_obj(tf.ones_like(generated), generated)

def calc_cycle_loss(real_image, cycled_image):
  loss2 = tf.reduce_mean(tf.square(real_image - cycled_image))
  return LAMBDA * loss2

def identity_loss(real_image, same_image):
  loss = tf.reduce_mean(tf.abs(real_image - same_image))
  return LAMBDA * 0.5 * loss

# Checkpoints

In [17]:
checkpoint_path = "./drive/MyDrive/ind"

ckpt = tf.train.Checkpoint(generator_g=generator_g,
                           generator_f=generator_f,
                           discriminator_x=discriminator_x,
                           discriminator_y=discriminator_y,
                           generator_g_optimizer=generator_g_optimizer,
                           generator_f_optimizer=generator_f_optimizer,
                           discriminator_x_optimizer=discriminator_x_optimizer,
                           discriminator_y_optimizer=discriminator_y_optimizer)

ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=1)

# if a checkpoint exists, restore the latest checkpoint.
if ckpt_manager.latest_checkpoint:
  ckpt.restore(ckpt_manager.latest_checkpoint)
  print ('Latest checkpoint restored!!')

Latest checkpoint restored!!


# Parte 1

## Input Pipeline

In [None]:
dataset, metadata = tfds.load('cycle_gan/horse2zebra', with_info=True, as_supervised=True)

train_set_A, train_set_B = dataset['trainA'], dataset['trainB']
test_set_A , test_set_B  = dataset['testA'] , dataset['testB']

train_set_A = train_set_A.cache().map(preprocess_image_train_1, num_parallel_calls=AUTOTUNE).shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
train_set_B = train_set_B.cache().map(preprocess_image_train_1, num_parallel_calls=AUTOTUNE).shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

test_set_A = test_set_A.map(preprocess_image_test, num_parallel_calls=AUTOTUNE).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
test_set_B = test_set_B.map(preprocess_image_test, num_parallel_calls=AUTOTUNE).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

## Training

In [None]:
sample_A = next(iter(train_set_A))

for epoch in range(EPOCHS):
  start = time.time()

  n = 0
  for image_x, image_y in tf.data.Dataset.zip((train_set_A, train_set_B)):
    train_step(image_x, image_y)
    if n % 10 == 0:
      print ('.', end='')
    n += 1

  # clear_output(wait=True)
  # Using a consistent image (sample_horse) so that the progress of the model is clearly visible.
  generate_images(generator_g, sample_A)

  ckpt_save_path = ckpt_manager.save()
  print ('Saving checkpoint for epoch {} at {}'.format(epoch+1, ckpt_save_path))
  print ('Time taken for epoch {} is {} sec\n'.format(epoch + 1, time.time()-start))


## Test Loss L1

In [None]:
for inp in test_set_A.take(10):
  generate_images(generator_g, inp)

In [None]:
for inp in test_set_B.take(10):
  generate_images(generator_f, inp)

## Test Loss L2

In [None]:
for inp in test_set_A.take(10):
  generate_images(generator_g, inp)

In [None]:
for inp in test_set_B.take(10):
  generate_images(generator_f, inp)

# Parte 2

## Data Viz

In [None]:
train_data = pd.read_csv(TRAIN_PATH)
md_gan = []

In [None]:
train_data.head()

In [19]:
healthy = train_data[train_data['healthy']>0]
multiple_diseases = train_data[train_data['multiple_diseases']>0]
rust = train_data[train_data['rust']>0]
scab = train_data[train_data['scab']>0]

In [None]:
labels = 'healthy', 'multiple diseases', 'rust', 'scab'
sizes = [len(healthy), len(multiple_diseases), len(rust), len(scab)]
colors = ['gold', 'yellowgreen', 'lightcoral', 'lightskyblue']
explode = (0.1, 0, 0, 0)  # explode 1st slice

# Plot
plt.rcParams.update({'font.size': 22})
plt.figure(figsize=(10,10))
plt.pie(sizes, explode=explode, labels=labels, colors=colors,
autopct='%1.1f%%', shadow=True, startangle=140)

plt.axis('equal')
plt.show()

Como podemos verificar pelo Pie Chart apresentado, existe um grande desbalanceamento no que toca a imagens relativas a folhas da categoria multiple diseases. Assim sendo, como já mencionado o objetivo será gerar imagens da mesma categoria a partir das healthy através da CycleGan.

## Input Pipeline

In [21]:
healthy_imgs = healthy.image_id.apply(format_path).values
md_imgs = multiple_diseases.image_id.apply(format_path).values

train_healthy = (
    tf.data.Dataset
    .from_tensor_slices((healthy_imgs, ['healthy_train']*len(healthy_imgs)))
    .map(preprocess_image_train_2, num_parallel_calls=AUTOTUNE)
    .cache()
    .shuffle(BUFFER_SIZE)
    .batch(1)
)
train_md = (
    tf.data.Dataset
    .from_tensor_slices((md_imgs, ['multiple_diseases_train']*len(md_imgs)))
    .map(preprocess_image_train_2, num_parallel_calls=AUTOTUNE)
    .cache()
    .shuffle(BUFFER_SIZE)
    .batch(1)
)

## Training

In [None]:
sample_healthy = next(iter(train_healthy))

for epoch in range(EPOCHS):
    start = time.time()

    n = 0
    for image_x, image_y in tf.data.Dataset.zip((train_healthy, train_md)):
        train_step(image_x, image_y)
        if n % 10 == 0:
            print ('.', end='')
        n+=1

    # clear_output(wait=True)
    # Using a consistent image (sample_horse) so that the progress of the model
    # is clearly visible.
    generate_images(generator_g, sample_healthy)
    print ('Time taken for epoch {} is {} sec\n'.format(epoch + 1, time.time()-start))

    if epoch % 20 == 0:
      ckpt_save_path = ckpt_manager.save()
      print ('Saving checkpoint for epoch {} at {}'.format(epoch+1, ckpt_save_path))

## Test (just visualizing generated images)

Aqui são apresentadas as imagens alvo, ou seja, imagens labeled como multiple_diseases.

In [None]:
store = []

for image_ in tf.data.Dataset.zip(train_md):
  store.append(image_[0].numpy() * 0.5 + 0.5)

fig, axis =plt.subplots(3, 5, figsize=(15, 10))
for i, ax in enumerate(axis.flat):
  # getting the pixel values between [0, 1] to plot it.
  ax.imshow(store[i])
  ax.axis('off')

Aqui são apresentadas algumas das imagens geradas.

In [None]:
for image_ in tf.data.Dataset.zip(train_healthy):
  prediction_ = generator_g(image_)
  md_gan.append(prediction_[0].numpy() * 0.5 + 0.5)

fig, axis =plt.subplots(3, 5, figsize=(15, 10))
for i, ax in enumerate(axis.flat):
  # getting the pixel values between [0, 1] to plot it.
  ax.imshow(md_gan[i])
  ax.axis('off')

In [None]:
for img in tf.data.Dataset.zip(train_healthy).take(10):
    generate_images(generator_g, img)

# Resultados - Conclusão

## Parte 1

LOSS L1 - Em relação aos resultados obtidos na primeira parte do trabalho, depois de um extenso e demorado processo de treino, que aproximou-se das 150 epochs podemos observar que as transformações conseguidas não são para todos os casos positivas. Em relação ao gerador 1 (cavalo - zebra) este mostra resultados promissores tendo mesmo em alguns casos conseguido converter um cavalo numa zebra totalmente. No entanto existem ainda imagens em que este não consegue identificar a localização do cavalo e desenha as riscas da zebra num local aleatório da imagem. Já o gerador 2 (zebra - cavalo) ficou um bocado aquém do esperado, como podemos observar, ao contrário do gerador 1, este parece conseguir localizar com facilidade a zebra na imagem. Apesar disso, o "processo" de retirar as riscas da zebra e pintá-la de castanho parece ainda bastante ineficaz, o que se pode comprovar pelos resultados de zebras quase transparentes.

LOSS L2 - Já em relação ao modelo treinado com a loss l2, que também treinou durante cera de 150 epochs, esta apresenta um desempenho semelhante à anterior. No entanto, um ponto forte aqui é o facto de este na maior parte dos casos conseguir identificar o cavalo na imagem. Como podemos observar algumas das transformações do gerador 1 (cavalo - zebra) encontram-se próximas do esperado apesar de ainda haver casos mais difíceis de trabalhar como os cavalos brancos. Já para o gerador 2 (zebra - cavalo), á semelhança do treinado anteriormente, as transformações ainda se encontram bastante incompletas. O ponto fraco do modelo parece ser não conseguir ainda "retirar" as riscas das zebras, que parece ser o que ele está a fazer quando estas aparecem transparentes. No entanto, nalguns casos a cor caractrística de um cavalo já comeca a aparecer.

Apesar dos resultados obtidos, os modelos aparentam estar num bom caminho  sendo que uma solução para os melhorar seria continuar o treino por um número superior de epochs.

## Parte 2

Como base nos resultados que se obteve em cima, conclui-se que apesar de ainda haver folhas que não apresentam alteração alguma, grande parte das imagens transformadas parecem ter atingido o objetivo final, que seria a criação de folhas com multiple diseases. Á semelhança dos modelos desenvolvidos na primeira parte do trabalho, também este apresenta espaço para melhorias, ou seja, treiná-lo durante mais algumas epochs, uma vez que este apenas treinou cerca de 60. Para concluir a 100% o objetivo inicial, faltaria depois de um processo de treino mais extenso, adicionar estas imagens geradas ao dataset inicial e assim corrigir o problema de desbalanceamento dos dados.