In [None]:
try:
    import cupy
except:
    pass
import datetime
from IPython import display
from matplotlib import pyplot as plt
#from mri_cs import column2matrix, matrix2column, prefiltering, radial_lines_samples_rows
import numpy as np
import os
import pathlib
import pydot
from scipy.io import loadmat
import tensorflow as tf
import time
import zipfile
from PIL import Image
import glob
import pandas as pd
import matplotlib.image as mpimg
from tensorflow.keras.utils import plot_model

In [None]:
# Mount Google Drive; the dataset, checkpoint and plot paths used below all live under /content/gdrive.
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
# Run mode: the sections below are guarded by mode == 'train' and/or mode == 'test'.
mode = 'test'
# When set to 1, fit() saves a checkpoint after the final training step.
save_checkpoint = 0

In [None]:
# Counts the regular files that sit directly inside a folder

def count_files(folder):
    """Return how many entries of ``folder`` are regular files.

    Sub-directories are not counted and the listing is not recursive.
    """
    return sum(
        1
        for entry in os.listdir(folder)
        if os.path.isfile(os.path.join(folder, entry))
    )

In [None]:
# Settings shared by the training and the testing runs.
if mode in ('train', 'test'):
    # Switches for the optional matplotlib previews further down.
    show_intermediate_images = False
    show_predicted_images_after_training = False

    dataset_name = "birn_anatomical_part"

    dataset_path = '/content/gdrive/MyDrive/Mestrado/Códigos/datasets/' + dataset_name + '/'
    n_cont = count_files(dataset_path + 'train')
    print("Number of train images: ", n_cont)


# Decode one training image and report its shape.
if mode == 'train':
    sample_image = tf.io.decode_png(
        tf.io.read_file(dataset_path + 'train/1.png'), channels=3)
    print(sample_image.shape)

Number of train images:  1500


In [None]:
# Optionally preview the sample training image decoded in the previous cell.
if mode == 'train' and show_intermediate_images:
    plt.figure()
    plt.imshow(sample_image)

def load(image_file):
    """Read a side-by-side PNG pair and split it into input and target halves.

    The file is decoded as a 3-channel image and cut at half its width: the
    right half becomes the network input and the left half the target
    ("real") image. Both halves are cast to float32 without rescaling.

    Args:
        image_file: Path of the PNG file (a Python string or a string tensor).

    Returns:
        Tuple ``(input_image, real_image, file_path)`` where ``file_path`` is
        ``image_file`` passed through unchanged.
    """
    print("Start Load")
    print("real_file type: ", type(image_file))
    image = tf.io.decode_png(tf.io.read_file(image_file), channels=3)
    print("END STEP 1 LOAD")

    # Column index where the two side-by-side images meet.
    half_width = tf.shape(image)[1] // 2

    # Right half -> network input, left half -> target; both as float32.
    input_image = tf.cast(image[:, half_width:, :], tf.float32)
    real_image = tf.cast(image[:, :half_width, :], tf.float32)
    file_path = image_file

    for label, value in (("input_image", input_image),
                         ("real_image", real_image),
                         ("file_path", file_path)):
        print(label + " type: ", type(value))
    print("END STEP 2 LOAD")
    return input_image, real_image, file_path

In [None]:
# Load one test pair: its height/width define IMG_HEIGHT / IMG_WIDTH used by the rest of the notebook.
if mode in ('train', 'test'):
    input1, output1, img_path = load(dataset_path + 'test/1.png')
    IMG_HEIGHT = input1.shape[0]
    IMG_WIDTH = input1.shape[1]
    print("Image Height: ", IMG_HEIGHT)
    print("Image Width: ", IMG_WIDTH)
    if show_intermediate_images:
        for picture, caption in (
                (input1, "Example of image to be used in the cGAN input"),
                (output1, "Example of image to be generated in the cGAN output")):
            plt.figure()
            plt.imshow(picture / 255.0)
            plt.title(caption)
            plt.show()

# Load the first training pair; `inp` / `re` are reused by later demo cells.
if mode == 'train':
    inp, re, img_path = load(dataset_path + 'train/1.png')
    print("Input Shape: ", inp.shape)
    if show_intermediate_images:
        # Scale to [0, 1] so matplotlib can display the float images
        for picture, caption in (
                (inp, "Example of image to be used in the cGAN input"),
                (re, "Example of image to be generated in the cGAN output")):
            plt.figure()
            plt.imshow(picture / 255.0)
            plt.title(caption)
            plt.show()

if mode in ('train', 'test'):
    # Shuffle buffer sized to the number of files counted in the train folder
    BUFFER_SIZE = int(n_cont)
    # The batch size of 1 produced better results for the U-Net in the original pix2pix experiment
    BATCH_SIZE = 1

Start Load
real_file type:  <class 'str'>
END STEP 1 LOAD
input_image type:  <class 'tensorflow.python.framework.ops.EagerTensor'>
real_image type:  <class 'tensorflow.python.framework.ops.EagerTensor'>
file_path type:  <class 'str'>
END STEP 2 LOAD
Image Height:  512
Image Width:  512


In [None]:
# The helpers below enlarge the images and then crop a window of the original
# size out of the enlarged image; this adds slight variations to the training data.

def resize(input_image, real_image, height, width):
    """Resize both images to ``(height, width)`` with nearest-neighbour interpolation.

    Returns:
        Tuple ``(input_image, real_image)`` of the resized images.
    """
    target_size = [height, width]
    nearest = tf.image.ResizeMethod.NEAREST_NEIGHBOR
    resized_input = tf.image.resize(input_image, target_size, method=nearest)
    resized_real = tf.image.resize(real_image, target_size, method=nearest)
    return resized_input, resized_real

def random_crop(input_image, real_image):
    """Crop the same random IMG_HEIGHT x IMG_WIDTH window out of both images.

    The two images are stacked first so that a single random offset is used
    for the pair.
    """
    crop_shape = [2, IMG_HEIGHT, IMG_WIDTH, 3]
    pair = tf.stack([input_image, real_image], axis=0)
    cropped_pair = tf.image.random_crop(pair, size=crop_shape)
    return cropped_pair[0], cropped_pair[1]

In [None]:
# Sanity check: load the second training image and apply resize followed by random_crop
if mode == 'train':
    input1, output1, img_path = load(dataset_path + 'train/2.png')
    input1, output1 = random_crop(
        *resize(input1, output1, IMG_HEIGHT + 30, IMG_WIDTH + 30))
    if show_intermediate_images:
        for picture, caption in (
                (input1, "Example of image to be used in the cGAN input"),
                (output1, "Example of image to be generated in the cGAN output")):
            plt.figure()
            plt.imshow(picture / 255.0)
            plt.title(caption)
            plt.show()

In [None]:
# Rescales pixel values from [0, 255] to [-1, 1]
def normalize(input_image, real_image):
    """Return both images rescaled with ``x / 127.5 - 1``."""

    def _rescale(image):
        return (image / 127.5) - 1

    return _rescale(input_image), _rescale(real_image)

In [None]:
# Augments an image pair with resize + random_crop and an occasional horizontal mirror.
@tf.function()
def random_jitter(input_image, real_image):
    """Randomly jitter an input/target pair, applying the same change to both.

    The pair is enlarged by 30 pixels in each dimension, cropped back to
    IMG_HEIGHT x IMG_WIDTH at a random offset, and mirrored left-right when a
    uniform random draw exceeds 0.5.
    """
    enlarged_input, enlarged_real = resize(
        input_image, real_image, IMG_HEIGHT + 30, IMG_WIDTH + 30)
    input_image, real_image = random_crop(enlarged_input, enlarged_real)

    # Mirror both images together so the pair stays aligned.
    mirror_pair = tf.random.uniform(()) > 0.5
    if mirror_pair:
        input_image = tf.image.flip_left_right(input_image)
        real_image = tf.image.flip_left_right(real_image)
    return input_image, real_image

In [None]:
# Extra augmentation pipeline (random flips and rotations) consumed by augment_images.
if mode == 'train':
    data_augmentation = tf.keras.Sequential([
        tf.keras.layers.RandomFlip("horizontal_and_vertical"),
        tf.keras.layers.RandomRotation(0.2),
    ])

In [None]:
def augment_images(input, target):
    """Apply one shared random flip/rotation to an input/target image pair.

    Calling ``data_augmentation`` separately on each image draws two
    independent random transforms, which leaves the pair misaligned. The two
    images are therefore concatenated along the channel axis, augmented in a
    single call, and split back afterwards.

    Args:
        input: Input image tensor (channels last).
        target: Target image tensor with the same spatial size as ``input``.

    Returns:
        Tuple ``(input, target)`` carrying the same random transform.
    """
    n_input_channels = tf.shape(input)[-1]
    paired = data_augmentation(tf.concat([input, target], axis=-1))
    input = paired[..., :n_input_channels]
    target = paired[..., n_input_channels:]
    return input, target

In [None]:
# Shows four independent random_jitter results for the sample training image
if mode == 'train' and show_intermediate_images:
    plt.figure(figsize=(6, 6))
    for panel in range(1, 5):
        rj_inp, rj_re = random_jitter(inp, re)
        plt.subplot(2, 2, panel)
        plt.imshow(rj_inp / 255.0)
        plt.axis('off')
    plt.show()

In [None]:
# The three functions below load the train, test and validation samples respectively
def load_image_train(image_file):
    """Load a training pair, apply random jitter and normalize it to [-1, 1].

    Returns:
        Tuple ``(input_image, real_image, file_path)``.
    """
    raw_input, raw_real, file_path = load(image_file)
    jittered_input, jittered_real = random_jitter(raw_input, raw_real)
    # augment_images is intentionally not applied here: rotating the images
    # added too much noise and hurt training, because the images are always
    # in the same position.
    input_image, real_image = normalize(jittered_input, jittered_real)
    return input_image, real_image, file_path

In [None]:
def load_image_test(image_file):
    """Load a test pair, resize it to IMG_HEIGHT x IMG_WIDTH and normalize it.

    No random augmentation is applied.

    Returns:
        Tuple ``(input_image, real_image, file_path)``.
    """
    raw_input, raw_real, file_path = load(image_file)
    sized_input, sized_real = resize(raw_input, raw_real, IMG_HEIGHT, IMG_WIDTH)
    input_image, real_image = normalize(sized_input, sized_real)
    return input_image, real_image, file_path

In [None]:
def load_image_val(image_file):
    """Load a validation pair, resize it to IMG_HEIGHT x IMG_WIDTH and normalize it.

    Performs the same steps as ``load_image_test``; no random augmentation.

    Returns:
        Tuple ``(input_image, real_image, file_path)``.
    """
    loaded = load(image_file)
    file_path = loaded[2]
    input_image, real_image = normalize(
        *resize(loaded[0], loaded[1], IMG_HEIGHT, IMG_WIDTH))
    return input_image, real_image, file_path

In [None]:
# Builds the batched tf.data pipelines for the train / validation / test splits
if mode == 'train':
    train_dataset = tf.data.Dataset.list_files(dataset_path + 'train/*.png')
    # Shuffle the file paths *before* decoding: the shuffle buffer then holds
    # short strings instead of BUFFER_SIZE decoded float32 image pairs.
    train_dataset = train_dataset.shuffle(BUFFER_SIZE)
    train_dataset = train_dataset.map(load_image_train, num_parallel_calls=tf.data.AUTOTUNE)
    train_dataset = train_dataset.batch(BATCH_SIZE)
    train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)
    try:
        val_dataset = tf.data.Dataset.list_files(dataset_path + 'val/*.png')
    except tf.errors.InvalidArgumentError:
        # No file matched the pattern. Re-raise instead of continuing: the
        # lines below would otherwise fail with a NameError, or silently reuse
        # a `val_dataset` left over from an earlier run of this cell.
        print("ERROR")
        raise
    val_dataset = val_dataset.map(load_image_val, num_parallel_calls=tf.data.AUTOTUNE)
    val_dataset = val_dataset.batch(BATCH_SIZE)
    val_dataset = val_dataset.prefetch(tf.data.AUTOTUNE)



if mode == 'train' or mode == 'test':
    try:
        test_dataset = tf.data.Dataset.list_files(dataset_path + 'test/*.png')
    except tf.errors.InvalidArgumentError:
        # Same reasoning as for the validation split above.
        print("ERROR")
        raise
    test_dataset = test_dataset.map(load_image_test)
    test_dataset = test_dataset.batch(BATCH_SIZE)
    test_dataset = test_dataset.prefetch(tf.data.AUTOTUNE)



if mode == 'train':
    OUTPUT_CHANNELS = 3


Start Load
real_file type:  <class 'tensorflow.python.framework.ops.SymbolicTensor'>
END STEP 1 LOAD
input_image type:  <class 'tensorflow.python.framework.ops.SymbolicTensor'>
real_image type:  <class 'tensorflow.python.framework.ops.SymbolicTensor'>
file_path type:  <class 'tensorflow.python.framework.ops.SymbolicTensor'>
END STEP 2 LOAD


In [None]:
# Builds a downsampling block
def downsample(filters, size, apply_batchnorm = True, kernel_regularizer=None):
    """Return a Sequential block: stride-2 Conv2D -> optional BatchNorm -> LeakyReLU.

    Args:
        filters: Number of convolution filters.
        size: Convolution kernel size.
        apply_batchnorm: When True a BatchNormalization layer follows the
            convolution and the convolution is created without a bias term.
        kernel_regularizer: Optional regularizer passed to the convolution.
    """
    block_layers = [
        tf.keras.layers.Conv2D(
            filters, size, strides=2, padding='same',
            kernel_initializer=tf.random_normal_initializer(0., 0.02),
            kernel_regularizer=kernel_regularizer,
            use_bias=not apply_batchnorm),
    ]
    if apply_batchnorm:
        block_layers.append(tf.keras.layers.BatchNormalization())
    block_layers.append(tf.keras.layers.LeakyReLU())

    result = tf.keras.Sequential()
    for layer in block_layers:
        result.add(layer)
    return result

In [None]:
# Runs an (untrained) downsample block on the sample image and prints value ranges

if mode == 'train':
    down_model = downsample(3, 4)
    down_result = down_model(tf.expand_dims(inp, 0))
    print(inp.shape)
    print(down_result.shape)
    for tensor in (inp, down_result):
        print(np.min(tensor))
        print(np.max(tensor))
    # Shift and scale the block output into [0, 1] so it can be displayed.
    x = down_result[0, :, :, :]
    x = x - np.min(x)
    x = x / np.max(x)
    for tensor in (inp, x):
        print(np.min(tensor))
        print(np.max(tensor))
    if show_intermediate_images:
        plt.imshow(inp / 255.0)
        plt.imshow(x)

In [None]:
# Shows an image that will feed the GAN after resize + random_crop,
# and the output of the downsample block for that image

if mode == 'train':
    input1, output1, img_path = load(dataset_path + 'train/1.png')
    input1, output1 = random_crop(
        *resize(input1, output1, IMG_HEIGHT + 30, IMG_WIDTH + 30))
    if show_intermediate_images:
        plt.figure()
        plt.imshow(input1 / 255.0)
        plt.title("Example of image to be used in the cGAN input")
        plt.show()
        input1, output1 = normalize(input1, output1)
        down_result = down_model(tf.expand_dims(input1, 0))
        print(input1.shape)
        print(down_result.shape)
        # Copy the single batch element into a plain NumPy array for plotting.
        x = np.zeros(shape=(down_result.shape[1], down_result.shape[2], down_result.shape[3]))
        print(x.shape)
        x[...] = down_result[0, :, :, :]
        plt.figure()
        plt.imshow(x)
        plt.title("Example of the downsample module output (before training)")
        plt.show()

In [None]:
# Builds an upsampling block

def upsample(filters, size, apply_dropout=False):
    """Return a Sequential block: stride-2 Conv2DTranspose -> BatchNorm -> [Dropout] -> ReLU.

    Args:
        filters: Number of transposed-convolution filters.
        size: Kernel size.
        apply_dropout: When True a Dropout(0.5) layer is inserted before the
            activation.

    Returns:
        The ``tf.keras.Sequential`` block.
    """
    initializer = tf.random_normal_initializer(0., 0.02)

    result = tf.keras.Sequential()

    result.add(tf.keras.layers.Conv2DTranspose(filters, size, strides=2,
                padding='same',
                kernel_initializer=initializer,
                use_bias=False))

    result.add(tf.keras.layers.BatchNormalization())

    if apply_dropout:
        result.add(tf.keras.layers.Dropout(0.5))

    # The activation belongs to every block. It used to be nested under
    # `if apply_dropout`, which left the blocks created without dropout with no
    # non-linearity at all. ReLU has no weights, so the variables of the block
    # are unchanged; generators trained with the old layout will nevertheless
    # behave differently and presumably need retraining.
    result.add(tf.keras.layers.ReLU())
    return result

In [None]:
# Builds an upsample block, applies it to the downsample demo output and prints the resulting shape
if mode == 'train':
    up_model = upsample(3, 4)
    up_result = up_model(down_result)
    print (up_result.shape)

In [None]:
# Builds the generator: a U-Net-style model made of a sequence of downsample and upsample blocks
def Generator():
    """Build the generator as a Keras functional model.

    The input of shape (IMG_HEIGHT, IMG_WIDTH, 3) goes through eight
    downsample blocks, then through seven upsample blocks whose outputs are
    concatenated with the matching encoder outputs (skip connections), and
    finally through a stride-2 Conv2DTranspose with tanh activation that
    produces 3 output channels.

    Returns:
        A ``tf.keras.Model`` mapping the input image to the generated image.
    """
    inputs = tf.keras.layers.Input(shape=[IMG_HEIGHT, IMG_WIDTH, 3])  # Model input

    # down_stack: the downsampling (encoder) blocks
    down_stack = [
        downsample(64, 4, apply_batchnorm=False),  # (batch_size, IMG_HEIGHT/2, IMG_WIDTH/2, 64)
        downsample(128, 4),                        # (batch_size, IMG_HEIGHT/4, IMG_WIDTH/4, 128)
        downsample(256, 4),                        # (batch_size, IMG_HEIGHT/8, IMG_WIDTH/8, 256)
        downsample(512, 4),                        # (batch_size, IMG_HEIGHT/16, IMG_WIDTH/16, 512)
        downsample(512, 4),                        # (batch_size, IMG_HEIGHT/32, IMG_WIDTH/32, 512)
        downsample(512, 4),                        # (batch_size, IMG_HEIGHT/64, IMG_WIDTH/64, 512)
        downsample(512, 4),                        # (batch_size, IMG_HEIGHT/128, IMG_WIDTH/128, 512)
        downsample(512, 4),                        # (batch_size, IMG_HEIGHT/256, IMG_WIDTH/256, 512)
    ]


    # Run the input through the downsampling blocks
    x = inputs
    skips = []  # Collects the tensors used as skip connections
    for down in down_stack:
        x = down(x)  # Apply each block of down_stack in turn
        skips.append(x)

    # Drop the last (deepest) output, which is not used as a skip, and reverse the rest
    skips = reversed(skips[:-1])

    # up_stack: the upsampling (decoder) blocks; the channel counts in the
    # comments are those after concatenation with the skip connection
    up_stack = [
        upsample(512, 4, apply_dropout=True),  # (batch_size, IMG_HEIGHT/128, IMG_WIDTH/128, 1024)
        upsample(512, 4, apply_dropout=True),  # (batch_size, IMG_HEIGHT/64, IMG_WIDTH/64, 1024)
        upsample(512, 4, apply_dropout=True),  # (batch_size, IMG_HEIGHT/32, IMG_WIDTH/32, 1024)
        upsample(512, 4),                      # (batch_size, IMG_HEIGHT/16, IMG_WIDTH/16, 1024)
        upsample(256, 4),                      # (batch_size, IMG_HEIGHT/8, IMG_WIDTH/8, 512)
        upsample(128, 4),                      # (batch_size, IMG_HEIGHT/4, IMG_WIDTH/4, 256)
        upsample(64, 4),                       # (batch_size, IMG_HEIGHT/2, IMG_WIDTH/2, 128)
    ]



    # Run through the upsampling blocks, concatenating each output with its skip connection
    for up, skip in zip(up_stack, skips):
        x = up(x)
        x = tf.keras.layers.Concatenate()([x, skip])

    # Final output layer
    initializer = tf.random_normal_initializer(0., 0.02)
    last = tf.keras.layers.Conv2DTranspose(
        3, 4, strides=2, padding='same', kernel_initializer=initializer, activation='tanh',
        kernel_regularizer=tf.keras.regularizers.L2(1e-4)  # Adds L2 regularization
    )  # (batch_size, IMG_HEIGHT, IMG_WIDTH, 3)

    x = last(x)

    return tf.keras.Model(inputs=inputs, outputs=x)

In [None]:
# Cria uma visualização gráfica do modelo gerador
# if mode == 'train':
#     generator = Generator()
#     tf.keras.utils.plot_model(generator, to_file = 'generator.png', show_shapes=True, dpi=64)
#     if show_intermediate_images:
#         x = plt.imread('generator.png')
#         plt.imshow(x, cmap = 'gray')
#         plt.show()


# Builds the generator, runs it once on the sample image, exports its diagram
# and defines the loss hyper-parameters used during training
if mode == 'train':
    generator = Generator()
    generator.summary()
    gen_output = generator(inp[tf.newaxis, ...], training=False)
    if show_intermediate_images:
        plt.imshow(gen_output[0, ...])

    plot_path = os.path.join('/content/gdrive/MyDrive/Mestrado/Códigos/Plot/', "unet_diagram.png")
    plot_model(generator, to_file=plot_path, show_shapes=True, expand_nested=True)

    print(f"Diagrama salvo em: {plot_path}")

    # Weight of the L1 term in generator_loss
    LAMBDA = 150

    # from_logits=True: the loss is fed the raw discriminator outputs
    loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)

In [None]:
# Returns the generator loss values that are used to update the network
def generator_loss(disc_generated_output, gen_output, target):
    """Compute the generator loss.

    Returns:
        Tuple ``(total_gen_loss, gan_loss, l1_loss)`` where ``gan_loss`` is the
        cross-entropy of the discriminator output on generated images against
        all-ones labels, ``l1_loss`` is the mean absolute error between
        ``target`` and ``gen_output``, and
        ``total_gen_loss = gan_loss + LAMBDA * l1_loss``.
    """
    all_real_labels = tf.ones_like(disc_generated_output)
    gan_loss = loss_object(all_real_labels, disc_generated_output)
    l1_loss = tf.reduce_mean(tf.abs(target - gen_output))
    return gan_loss + (LAMBDA * l1_loss), gan_loss, l1_loss


In [None]:
# Builds the discriminator model
def Discriminator():
    """Build the discriminator as a Keras functional model.

    It takes two images of shape (IMG_HEIGHT, IMG_WIDTH, 3) -- the input image
    and the target (real or generated) image -- and returns a single-channel
    map produced by a final convolution with no activation, i.e. raw scores.

    Returns:
        A ``tf.keras.Model`` with inputs ``[input_image, target_image]``.
    """
    initializer = tf.random_normal_initializer(0., 0.02)

    # Inputs sized by IMG_HEIGHT and IMG_WIDTH
    inp = tf.keras.layers.Input(shape=[IMG_HEIGHT, IMG_WIDTH, 3], name='input_image')
    tar = tf.keras.layers.Input(shape=[IMG_HEIGHT, IMG_WIDTH, 3], name='target_image')

    # Signed (not absolute) difference between the two images
    diff = tf.keras.layers.Subtract()([tar, inp])  # Computes tar - inp

    # Concatenate the input image, the target image and their difference
    x = tf.keras.layers.concatenate([inp, tar, diff])  # (batch_size, IMG_HEIGHT, IMG_WIDTH, 9)

    # Downsampling stack, with dropout after some blocks and L2 regularization
    down1 = downsample(64, 4, apply_batchnorm=False)(x)
    down2 = tf.keras.layers.Dropout(0.5)(downsample(128, 4, kernel_regularizer=tf.keras.regularizers.L2(1e-4))(down1))
    down3 = tf.keras.layers.Dropout(0.5)(downsample(256, 4, kernel_regularizer=tf.keras.regularizers.L2(1e-4))(down2))
    down4 = downsample(512, 4, kernel_regularizer=tf.keras.regularizers.L2(1e-4))(down3)
    down5 = tf.keras.layers.Dropout(0.5)(downsample(512, 4, kernel_regularizer=tf.keras.regularizers.L2(1e-4))(down4))
    down6 = downsample(512, 4, kernel_regularizer=tf.keras.regularizers.L2(1e-4))(down5)
    down7 = tf.keras.layers.Dropout(0.5)(downsample(512, 4, kernel_regularizer=tf.keras.regularizers.L2(1e-4))(down6))

    # Zero padding ahead of the unpadded 4x4 convolution below
    zero_pad1 = tf.keras.layers.ZeroPadding2D()(down7)
    conv = tf.keras.layers.Conv2D(
        512, 4, strides=1, kernel_initializer=initializer, use_bias=False,
        kernel_regularizer=tf.keras.regularizers.L2(1e-4)
    )(zero_pad1)

    # Batch normalization followed by a Leaky ReLU activation
    batchnorm1 = tf.keras.layers.BatchNormalization()(conv)
    leaky_relu = tf.keras.layers.LeakyReLU()(batchnorm1)

    # Second zero padding and the final convolution that scores real vs. fake
    zero_pad2 = tf.keras.layers.ZeroPadding2D()(leaky_relu)
    last = tf.keras.layers.Conv2D(
        1, 4, strides=1, kernel_initializer=initializer,
        kernel_regularizer=tf.keras.regularizers.L2(1e-4)
    )(zero_pad2)

    return tf.keras.Model(inputs=[inp, tar], outputs=last)


In [None]:
# Builds the discriminator and prints its layer summary
if mode == 'train':
    discriminator = Discriminator()
    discriminator.summary()
#     plot_path = os.path.join('/content/gdrive/MyDrive/Mestrado/Códigos/Plot/', "disc_diagram.png")
#     plot_model(discriminator, to_file=plot_path, show_shapes=True, expand_nested=True)

#     print(f"Diagrama salvo em: {plot_path}")


# Apresenta a saida do discriminador de forma gráfica
# if mode == 'train':
#     disc_out = discriminator([inp[tf.newaxis, ...], gen_output], training=False)
#     if show_intermediate_images:
#         plt.imshow(disc_out[0, ..., -1], vmin=-20, vmax=20, cmap='RdBu_r')
#         plt.colorbar()

In [None]:
# Computes the discriminator loss
def discriminator_loss(disc_real_output, disc_generated_output):
    """Return the sum of the cross-entropy losses on real and generated pairs.

    Real pairs are scored against all-ones labels and generated pairs against
    all-zeros labels.
    """
    real_loss = loss_object(tf.ones_like(disc_real_output), disc_real_output)
    generated_loss = loss_object(
        tf.zeros_like(disc_generated_output), disc_generated_output)
    return real_loss + generated_loss

In [None]:
# Creates the Adam optimizers and the checkpoint object that tracks the state
# of both models and both optimizers

if mode == 'train':
    generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
    discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

    checkpoint_dir = '/content/gdrive/MyDrive/Mestrado/Códigos/Checkpoints'
    checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
    checkpoint = tf.train.Checkpoint(
        generator_optimizer=generator_optimizer,
        discriminator_optimizer=discriminator_optimizer,
        generator=generator,
        discriminator=discriminator,
    )


    # latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
    # if latest_checkpoint and os.path.exists(latest_checkpoint + ".index"):
    #     checkpoint.restore(latest_checkpoint).expect_partial()
    #     print(f"Checkpoint restored from: {latest_checkpoint}")
    # else:
    #     print("Checkpoint not found")

In [None]:
# Draws input, ground truth and prediction side by side in a single figure

def generate_images(model, test_input, tar, save_path):
    """Plot the first element of a batch next to its target and the model prediction.

    The model is called with ``training=True``. ``save_path`` is accepted but
    not used: the figure is neither saved nor shown by this function.
    """
    prediction = model(test_input, training=True)
    plt.figure(figsize=(15, 15))

    panels = (
        ('Input Image', test_input[0]),
        ('Ground Truth', tar[0]),
        ('Predicted Image', prediction[0]),
    )
    for position, (caption, image) in enumerate(panels, start=1):
        plt.subplot(1, 3, position)
        plt.title(caption)
        # Map the pixel values from [-1, 1] to [0, 1] for plotting.
        plt.imshow(image * 0.5 + 0.5)
        plt.axis('off')

In [None]:
def generate_images_file(model, test_input, tar, file_name, n, comp = 0):
    """Save input, ground truth and prediction of the first batch element as PNG files.

    Three files named ``{file_name}_{title}.png`` are always written. When
    ``comp == 1`` an additional ``{file_name}.png`` with the three images side
    by side is saved.

    Args:
        model: Generator; it is called with ``training=True``.
        test_input: Batch of input images (only element 0 is used).
        tar: Batch of target images (only element 0 is used).
        file_name: Output path prefix, without extension.
        n: Unused; kept so existing callers keep working.
        comp: Set to 1 to also write the combined side-by-side figure.
    """
    prediction = model(test_input, training=True)

    display_list = [test_input[0], tar[0], prediction[0]]
    title = ['Input Image', 'Ground Truth', 'Predicted Image']

    # Individual images, saved at their native resolution.
    for caption, image in zip(title, display_list):
        save_path = f"{file_name}_{caption}.png"
        mpimg.imsave(save_path, image.numpy() * 0.5 + 0.5, cmap='gray')

    # Combined image with the three panels side by side.
    if comp == 1:
        # Use a dedicated figure and always close it. Drawing on the implicit
        # current figure made every call add more artists to the same figure,
        # which never got released.
        fig = plt.figure()
        try:
            for i, (caption, image) in enumerate(zip(title, display_list)):
                plt.subplot(1, 3, i+1)
                plt.title(caption)
                # Map the pixel values from [-1, 1] to [0, 1] for plotting.
                plt.imshow(image * 0.5 + 0.5)
                plt.axis('off')
            plt.savefig(file_name + '.png', dpi=300, bbox_inches = 'tight')
        finally:
            plt.close(fig)



In [None]:
# if mode == 'train':
#     if show_intermediate_images:
#         for example_input, example_target in test_dataset.take(1):
#          generate_images(generator, example_input, example_target)

# Creates the TensorBoard summary writer; each run logs to logs/fit/<timestamp>
if mode == 'train':
    log_dir="logs/"
    summary_writer = tf.summary.create_file_writer(
     log_dir + "fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

In [None]:
# Runs one training step of the generator and the discriminator
@tf.function
def train_step(input_image, target, step):
    """Run a single optimisation step on one batch and log the metrics.

    Args:
        input_image: Batch of generator inputs.
        target: Batch of ground-truth images.
        step: Step counter; summaries are written at ``step // 1000``.

    Returns:
        Tuple ``(gen_total_loss, gen_gan_loss, gen_l1_loss, disc_loss,
        disc_accuracy)``.
    """
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        # Generator forward pass
        gen_output = generator(input_image, training=True)

        # Discriminator scores for the real pair and for the generated pair
        disc_real_output = discriminator([input_image, target], training=True)
        disc_generated_output = discriminator([input_image, gen_output], training=True)

        # Generator and discriminator losses
        gen_total_loss, gen_gan_loss, gen_l1_loss = generator_loss(disc_generated_output, gen_output, target)
        disc_loss = discriminator_loss(disc_real_output, disc_generated_output)
        # NOTE(review): the L2 kernel_regularizer penalties configured in
        # Generator()/Discriminator() are only collected in `model.losses`;
        # they are not added to the losses here, so they presumably have no
        # effect on training -- confirm whether that is intended.

    # Gradients are taken after leaving the tape context, so the gradient and
    # optimizer ops themselves are not recorded by the tapes.
    generator_gradients = gen_tape.gradient(gen_total_loss, generator.trainable_variables)
    discriminator_gradients = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(generator_gradients, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(discriminator_gradients, discriminator.trainable_variables))

    # Discriminator accuracy. The discriminator outputs raw logits (its last
    # layer has no activation and the loss uses from_logits=True), so the
    # decision boundary "probability 0.5" is logit 0, not 0.5.
    real_accuracy = tf.reduce_mean(tf.cast(disc_real_output > 0.0, tf.float32))
    fake_accuracy = tf.reduce_mean(tf.cast(disc_generated_output <= 0.0, tf.float32))
    disc_accuracy = (real_accuracy + fake_accuracy) / 2

    # Log the results to TensorBoard
    with summary_writer.as_default():
        tf.summary.scalar('gen_total_loss', gen_total_loss, step=step//1000)
        tf.summary.scalar('gen_gan_loss', gen_gan_loss, step=step//1000)
        tf.summary.scalar('gen_l1_loss', gen_l1_loss, step=step//1000)
        tf.summary.scalar('disc_loss', disc_loss, step=step//1000)
        tf.summary.scalar('disc_accuracy', disc_accuracy, step=step//1000)

    # Return the losses and the accuracy
    return gen_total_loss, gen_gan_loss, gen_l1_loss, disc_loss, disc_accuracy



In [None]:
# Runs one validation step (no weight updates)
@tf.function
def val_step(input_image, target, step):
    """Evaluate generator and discriminator on one validation batch and log the metrics.

    Both models are called with ``training=False`` and no gradients are applied.

    Args:
        input_image: Batch of generator inputs.
        target: Batch of ground-truth images.
        step: Step counter; summaries are written at ``step // 1000``.

    Returns:
        Tuple ``(gen_total_loss, gen_gan_loss, gen_l1_loss, disc_loss,
        disc_accuracy)``.
    """
    # Generator output
    gen_output = generator(input_image, training=False)

    # Discriminator scores for the real pair and for the generated pair
    disc_real_output = discriminator([input_image, target], training=False)
    disc_generated_output = discriminator([input_image, gen_output], training=False)

    # Generator and discriminator losses
    gen_total_loss, gen_gan_loss, gen_l1_loss = generator_loss(disc_generated_output, gen_output, target)
    disc_loss = discriminator_loss(disc_real_output, disc_generated_output)

    # Discriminator accuracy. The discriminator outputs raw logits (its last
    # layer has no activation and the loss uses from_logits=True), so the
    # decision boundary "probability 0.5" is logit 0, not 0.5.
    real_accuracy = tf.reduce_mean(tf.cast(disc_real_output > 0.0, tf.float32))
    fake_accuracy = tf.reduce_mean(tf.cast(disc_generated_output <= 0.0, tf.float32))
    disc_accuracy = (real_accuracy + fake_accuracy) / 2

    # Log the results to TensorBoard
    with summary_writer.as_default():
        tf.summary.scalar('val_gen_total_loss', gen_total_loss, step=step//1000)
        tf.summary.scalar('val_gen_gan_loss', gen_gan_loss, step=step//1000)
        tf.summary.scalar('val_gen_l1_loss', gen_l1_loss, step=step//1000)
        tf.summary.scalar('val_disc_loss', disc_loss, step=step//1000)
        tf.summary.scalar('val_disc_accuracy', disc_accuracy, step=step//1000)

    # Return the losses and the accuracy
    return gen_total_loss, gen_gan_loss, gen_l1_loss, disc_loss, disc_accuracy


In [None]:
# Deletes old checkpoints so that only the most recent ones remain

def remove_old_checkpoints(checkpoint_dir, keep_latest=1):
    """Delete all but the ``keep_latest`` most recent ``ckpt-<N>`` checkpoints.

    A checkpoint is made of several files sharing the ``ckpt-<N>`` prefix
    (for example ``ckpt-3.index`` and ``ckpt-3.data-00000-of-00001``). Files
    are therefore grouped by ``N`` and ordered numerically, so that

    * every file of a kept checkpoint survives (sorting plain file names and
      keeping the last one left a checkpoint without its data shard),
    * ``ckpt-10`` is considered newer than ``ckpt-2``,
    * ``keep_latest=0`` removes every checkpoint instead of none.

    Entries that do not look like ``ckpt-<digits>[.suffix]`` are left alone.

    Args:
        checkpoint_dir: Directory containing the checkpoint files.
        keep_latest: Number of most recent checkpoints to keep.
    """
    prefix = "ckpt-"
    files_by_number = {}
    for path in glob.glob(os.path.join(checkpoint_dir, prefix + "*")):
        number = os.path.basename(path)[len(prefix):].split(".", 1)[0]
        if number.isdigit() and os.path.isfile(path):
            files_by_number.setdefault(int(number), []).append(path)

    numbers = sorted(files_by_number)
    n_stale = max(len(numbers) - max(int(keep_latest), 0), 0)

    for number in numbers[:n_stale]:
        for checkpoint in sorted(files_by_number[number]):
            os.remove(checkpoint)
            print(f"Checkpoint removed: {checkpoint}")

In [None]:
import time
import tensorflow as tf

def fit(train_ds, val_ds, steps):
    """Train for ``steps`` steps, validating on ``val_ds`` every 100 steps.

    Args:
        train_ds: Dataset yielding ``(input_image, target, file_path)``
            batches; it is repeated as needed to reach ``steps``.
        val_ds: Dataset yielding the same triples, iterated in full at every
            validation round.
        steps: Total number of training steps.

    Returns:
        Dict with keys ``"steps"``, ``"train_metrics"`` and ``"val_metrics"``.
        Each metrics entry maps ``gen_total_loss``, ``gen_gan_loss``,
        ``gen_l1_loss``, ``disc_loss`` and ``disc_accuracy`` to a list: one
        value per training step for the training metrics, one mean value per
        validation round for the validation metrics.
    """
    # Order matches the tuples returned by train_step / val_step.
    metric_names = ("gen_total_loss", "gen_gan_loss", "gen_l1_loss",
                    "disc_loss", "disc_accuracy")
    train_history = {name: [] for name in metric_names}
    # Updated only once every 100 steps.
    val_history = {name: [] for name in metric_names}

    latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)

    if latest_checkpoint:
        print(f"Checkpoint encontrado: {latest_checkpoint}")
    else:
        print("Nenhum checkpoint encontrado. O treinamento será iniciado do zero.")

    for step, (input_image, target, file_path) in train_ds.repeat().take(steps).enumerate():
        # Regular training step
        train_values = train_step(input_image, target, step)
        for name, value in zip(metric_names, train_values):
            train_history[name].append(value.numpy())

        # Validation runs ONLY every 100 steps
        if (step + 1) % 100 == 0:
            val_totals = [0] * len(metric_names)
            val_steps = 0

            for val_input_image, val_target, val_path in val_ds:
                val_values = val_step(val_input_image, val_target, step)
                val_totals = [total + value for total, value in zip(val_totals, val_values)]
                val_steps += 1

            if val_steps == 0:
                # An empty validation set used to end in a ZeroDivisionError.
                print(f"Validation set is empty; skipping validation at step {step + 1}.")
            else:
                # Mean of every metric over the validation set
                val_means = [total / val_steps for total in val_totals]
                for name, mean in zip(metric_names, val_means):
                    val_history[name].append(mean.numpy())

                val_gen_total_loss = val_means[metric_names.index("gen_total_loss")]
                val_disc_loss = val_means[metric_names.index("disc_loss")]
                print(f"Validação na época {step + 1}: Gen Loss {val_gen_total_loss.numpy():.4f}, Disc Loss {val_disc_loss.numpy():.4f}")

        print(f"{step.numpy()}/{steps - 1}")

        # Save a checkpoint at the end of training
        if step + 1 == int(steps * 1) and save_checkpoint == 1:
            remove_old_checkpoints(checkpoint_dir, keep_latest=1)
            checkpoint.save(file_prefix=checkpoint_prefix)
            print("Checkpoint Salvo!")

    return {
        "steps": steps,
        "train_metrics": train_history,
        "val_metrics": val_history,
    }


In [None]:
# Prints the type (and NumPy-reported shape) of each dataset object
if mode == 'train':
    print("BUFFER_SIZE = ", BUFFER_SIZE)
    for label, dataset in (("train_dataset", train_dataset),
                           ("test_dataset", test_dataset),
                           ("val_dataset", val_dataset)):
        print("Type " + label + ": ", type(dataset))
        print("Shape " + label + ": ", np.shape(dataset))

In [None]:
# Trains the model on the train/validation sets and collects the quality metrics
if mode == 'train':
    print('Starting the training stage.')
    # Passed to fit() as its `steps` argument, i.e. the number of training steps.
    epochs = 10000
    result = fit(train_dataset, val_dataset, steps=epochs)
    print('Training finished.')

    # Training metrics (one value per step)
    train_metrics = result["train_metrics"]
    gen_total_loss_list = train_metrics["gen_total_loss"]
    gen_gan_loss_list = train_metrics["gen_gan_loss"]
    gen_l1_loss_list = train_metrics["gen_l1_loss"]
    disc_loss_list = train_metrics["disc_loss"]
    disc_accuracy_list = train_metrics["disc_accuracy"]

    # Validation metrics (one value per validation round)
    val_metrics = result["val_metrics"]
    val_gen_total_loss_list = val_metrics["gen_total_loss"]
    val_gen_gan_loss_list = val_metrics["gen_gan_loss"]
    val_gen_l1_loss_list = val_metrics["gen_l1_loss"]
    val_disc_loss_list = val_metrics["disc_loss"]
    val_disc_accuracy_list = val_metrics["disc_accuracy"]




# if mode == 'train':
#     # Restoring the latest checkpoint in checkpoint_dir
#     checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))


In [None]:
# Saves the trained generator to Google Drive in the .keras format

if mode == 'train':
    generator.save('/content/gdrive/MyDrive/Mestrado/Códigos/Modelo/generator_model.keras')


In [None]:
# Saves a two-curve line plot as an image file
def save_plot(x, y_train, y_val, xlabel, ylabel, title, filename, label1, label2):
    """Plot ``y_train`` and ``y_val`` against ``x`` and save the figure.

    The image is written to ``plot_folder / filename`` (``plot_folder`` is a
    notebook-level variable) at 300 dpi, and the figure is closed afterwards.
    """
    fig, ax = plt.subplots(figsize=(10, 6))
    for series, label in ((y_train, label1), (y_val, label2)):
        ax.plot(x, series, label=label)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    ax.set_title(title)
    ax.legend()
    fig.savefig(os.path.join(plot_folder, filename), dpi=300, bbox_inches='tight', pad_inches=0)
    plt.close(fig)

In [None]:
def pad_with_zeros(arr1, arr2):
    """Spread the values of ``arr1`` over a zero-filled array as long as ``arr2``.

    The values of ``arr1`` are placed at evenly spaced positions between the
    first and the last index of the output; every other position is zero.

    Args:
        arr1: Sequence of numeric values to spread out.
        arr2: Sequence whose length defines the length of the output.

    Returns:
        ``arr1`` itself, unchanged, when it is already at least as long as
        ``arr2``; otherwise a new float NumPy array of length ``len(arr2)``.
    """
    len1, len2 = len(arr1), len(arr2)

    # Nothing to pad: hand the input back untouched.
    if len1 >= len2:
        return arr1

    # Float output: an integer buffer would truncate fractional values
    # (e.g. a loss of 0.73 would be stored as 0).
    new_arr = np.zeros(len2, dtype=float)

    # Evenly spaced target indices for the values of arr1.
    insert_positions = np.linspace(0, len2 - 1, num=len1, dtype=int)

    # Place all values at once at the computed positions.
    new_arr[insert_positions] = np.asarray(arr1, dtype=float).ravel()

    return new_arr

In [None]:
# Saves the loss plots for the generator and the discriminator, plus a CSV with the same metrics

if mode == 'train':

    plot_folder = '/content/gdrive/MyDrive/Mestrado/Códigos/Plot/'
    # exist_ok makes this a single idempotent call instead of a check-then-create pair.
    os.makedirs(plot_folder, exist_ok=True)

    # Debug information before the validation series are resized
    print("type gen: ", type(gen_total_loss_list))
    print("type disc: ", type(disc_loss_list))
    print("type val: ", type(val_gen_total_loss_list))
    print("size gen: ", len(gen_total_loss_list))
    print("size disc: ", len(disc_loss_list))
    print("size val: ", len(val_gen_total_loss_list))
    print(val_gen_total_loss_list)

    # One x-axis entry per recorded training loss value
    steps = list(range(len(gen_total_loss_list)))

    # Stretch the (shorter) validation series to the length of the training series
    val_gen_total_loss_list = pad_with_zeros(val_gen_total_loss_list, gen_total_loss_list)
    val_disc_loss_list = pad_with_zeros(val_disc_loss_list, disc_loss_list)

    # Same debug information after resizing
    print("Ajuste no tamanho")
    print("type gen: ", type(gen_total_loss_list))
    print("type disc: ", type(disc_loss_list))
    print("type val: ", type(val_gen_total_loss_list))
    print("size gen: ", len(gen_total_loss_list))
    print("size disc: ", len(disc_loss_list))
    print("size val: ", len(val_gen_total_loss_list))
    print(val_gen_total_loss_list)

    # Save the plots. No plt.tight_layout() afterwards: save_plot closes its
    # figure, so that call would only create (and leave open) a new empty figure.
    save_plot(steps, gen_total_loss_list, val_gen_total_loss_list, 'Step', 'Loss', 'Generator Total Loss', 'Generator_Total_Loss.png', 'Treinamento', 'Validação')
    save_plot(steps, disc_loss_list, val_disc_loss_list, 'Step', 'Loss', 'Discriminator Loss', 'Discriminator_Loss.png', 'Treinamento', 'Validação')
    save_plot(steps, gen_total_loss_list, disc_loss_list, 'Step', 'Loss', 'GenxDisc Loss', 'GenxDisc_Loss.png', 'Generator Loss', 'Discriminator Loss')

    # Save the same series to CSV
    data = {
        'Step': steps,
        'Generator Total Loss': gen_total_loss_list,
        'Validation Generator Total Loss': val_gen_total_loss_list,
        'Discriminator Loss': disc_loss_list,
        'Validation Discriminator Loss': val_disc_loss_list,
    }

    df = pd.DataFrame(data)

    # Path of the CSV file
    csv_file = os.path.join(plot_folder, 'training_metrics.csv')

    # Write the DataFrame to the CSV file
    df.to_csv(csv_file, index=False)

In [None]:
# Deletes the files directly inside a folder


def delete_folder_content(folder_path):
    """Remove every regular file located directly inside ``folder_path``.

    Sub-directories and their contents are left in place. Nothing happens
    when ``folder_path`` does not exist or is not a directory.
    """
    # isdir() is already False for a non-existent path.
    if not os.path.isdir(folder_path):
        return

    for entry_name in os.listdir(folder_path):
        entry_path = os.path.join(folder_path, entry_name)
        if not os.path.isfile(entry_path):
            continue
        os.remove(entry_path)


In [None]:
# Loads the saved generator model

# Path of the saved generator model file
model_save_path = '/content/gdrive/MyDrive/Mestrado/Códigos/Modelo/generator_model.keras'

# Only test mode loads the generator from disk.
if mode == 'test':
    # Imported here, right where the saved model is loaded.
    from tensorflow.keras.models import load_model
    generator = load_model(model_save_path)

In [None]:
# Runs the model on the test set to observe its behaviour and saves the resulting images to a folder


if mode in ('train', 'test'):

    # Running index of the test sample; used in the output file prefix.
    k = 0

    n_test = count_files(dataset_path + 'test')

    print(n_test)

    # n_test = 10

    image_folder = '/content/gdrive/MyDrive/Mestrado/Códigos/Generated Images'
    # delete_folder_content silently skips a missing folder, so create the
    # output folder explicitly before anything is written into it.
    os.makedirs(image_folder, exist_ok=True)
    delete_folder_content(image_folder)
    # File prefix derived from image_folder so the two paths cannot drift apart.
    file_path = os.path.join(image_folder, 'Image')
    for inp, tar, fil in test_dataset.take(n_test):
        generate_images_file(generator, inp, tar, file_path + str(k) + '_', k, comp = 0)
        k += 1
        if show_predicted_images_after_training:
            generate_images(generator, inp, tar, file_path)

100
