## Treinamento do Pix2Pix
Implementação baseada em https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix
Desenvolvido por David Dória https://github.com/daversd para o programa
2021-2022 B-pro Architectural Design RC4

Pix2Pix é um modelo condicional de redes generativas adversárias (conditional generative adversarial network model - cGAN) que executa traduções imagem-para-imagem, aprendendo como executar esta operação a partir de pares de imagens (inputs e outputs esperados).
Esta implementação utiliza imagens de `256x256 px` para o input e output, esperando o set de treinamento para estar já formatado e localizado em uma pasta que contenha as pastas `/AB/train/` e `AB/test/`.

### Importar os pacotes necessários

In [44]:
import torch
import torch.onnx
from torch.utils.tensorboard import SummaryWriter
import pix2pix_helpers.util as util
from pix2pix_helpers.create_dataset import ImageFolderLoader
from pix2pix_helpers.pix2pix_model import Pix2PixModel
from matplotlib import pyplot as plt
import time
import os
import glob

### Configuração high level
Como regra geral, estas são as configurações que você precisará alterar.
Notas importantes:
- Utilize um set de treinamento que esteja na casa das centenas de imagens para ter um bom resultado. Quanto mais, melhor!
- Caso esteja testando múltiplos sets de treinamento, e queira ter controle sobre os diferentes resultados de cada um, lembre de atualizar o valor de `MODEL_NAME`. Caso não o faça, as informações produzidas anteriormente serão sobrescritas.
- 300 é um bom número para `EPOCHS` (épocas ou períodos de treinamento). Caso você utilize mais, acompanhe os resultados para garantir que os resultados não estão overfitting - quando o treinamento produz um modelo muito bom para o material específico de treinamento, mas incapaz de processar novas imagens.

In [45]:
TRAIN = False            # Determina se o programa deve executar o treinamento
TEST = False             # Determina se o programa deve executar o teste (carregando o último checkpoint)
TEST_SAMPLE = 10         # Quantidade de imagens para testar
WRITE_LOGS = True        # Determina se logs do tensorboard devem ser escritos para o disco
SAVE_CKPTS = True        # Determina se checkpoints dever ser salvos
SAVE_IMG_CKPT = True     # Determina se imagens do treinamento devem ser salvas para cada checkpoint
EXPORT_MODEL = True      # Determina se o modelo deve ser salvo (carregando o último checkpoint)

FOLDER_NAME = 'data/tracos'                             # O nome da pasta onde estão os arquivos de treinamento
MODEL_NAME = 'tracos_run_1'                             # O nome do modelo que será treinado (o material do treinamento será salvo usando esse nome)
LOAD_NUMBER = -1                                        # Número do checkpoint a ser carregado (-1 carrega o último)

EPOCHS = 100                # Quantidade de épocas de treinamento. Deve ser número par

PRINT_FREQ = 100            # Intervalo entre logs de treinamento no console, em passos
LOG_FREQ = 100              # Intervalo entre logs tensorboard, em passos
CKPT_FREQ = 10              # Intervalo entre checkpoints, em épocas

### Finalização de configuração

In [46]:
BATCH_SIZE = 1

CKPT_DIR = os.path.join('checkpoints', MODEL_NAME)      # Nome da pasta onde serão salvos os checkpoints
LOG_DIR = 'runs/' + MODEL_NAME                          # Nome da pasta onde serão salvos o logs do tensorboard
TEST_DIR = 'test/' + MODEL_NAME                         # Nome da pasta onde serão salvas as imagens de teste

# Create the required folders
if SAVE_CKPTS:
    if not os.path.isdir(CKPT_DIR):
        os.makedirs(CKPT_DIR)

if WRITE_LOGS:
    if not os.path.isdir(LOG_DIR):
        os.makedirs(LOG_DIR)

if SAVE_IMG_CKPT:
    if not os.path.isdir(TEST_DIR):
        os.makedirs(TEST_DIR)

# Initialize the log writer
if WRITE_LOGS:
    writer = SummaryWriter(log_dir=LOG_DIR)

DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

Função para carregamento de modelos

In [47]:
def load_model(model):
    """
    Loads the networks from the checkpoint specified in LOAD_NUMBER
    Use -1 to load the latest model.
    """
    
    list_of_files = glob.glob(CKPT_DIR + '/*.pth')

    if LOAD_NUMBER == -1:
        file_path = max(list_of_files, key=os.path.getctime)
        file_name = os.path.basename(file_path)
        file_number = file_name.split('_')[0]
        print(file_number)
    else:
        file_number = LOAD_NUMBER
    
    file_prefix = os.path.join(CKPT_DIR, str(file_number) + '_')
    netG_File = file_prefix + 'net_G.pth'
    netD_File = file_prefix + 'net_D.pth'
    
    files_exist = os.path.exists(netG_File) and os.path.exists(netD_File)
    assert files_exist, f"Checkpoint {LOAD_NUMBER} does not exist. Check '{CKPT_DIR}' to see available checkpoints"
    print(f"Loading model from checkpoint {file_number} \n"+ f"Generator is {netG_File} \n" + f"Discriminator is {netD_File}")

    model.load_networks(file_number)


### Programa principal
Treina o descriminador e gerador por `EPOCHS`, utilizando o material de treinamento presente em `FOLDER_NAME`. Checkpoints são salvos em `CKPT_DIR`. Informações sobre o status do treinamento são impressas no console e salvos pelo writer (caso definido que sim).

In [48]:
if TRAIN:
    # Create the training data set
    trainData = ImageFolderLoader(
        f"{FOLDER_NAME}/AB", phase='train', preprocess='none')
    trainSet = torch.utils.data.DataLoader(
        trainData, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)

    # Create the pix2pix model
    model = Pix2PixModel(CKPT_DIR, MODEL_NAME, is_train=True,
                         n_epochs=EPOCHS/2, n_epochs_decay=EPOCHS/2)

    model.setup()
    total_iters = 0

    # Initiate the training iteration
    for epoch in range(EPOCHS):
        epoch_start_time = time.time()
        iter_data_time = time.time()
        epoch_iter = 0

        if epoch != 0:
            model.update_learning_rate()

        # Iterate through the data batches in the training set
        for i, data in enumerate(trainSet):
            iter_start_time = time.time()

            # Setup counters
            total_iters += BATCH_SIZE
            epoch_iter += BATCH_SIZE

            # Feed input through model, optimize parameters
            model.set_input(data)
            model.optimize_parameters()

            # Use this for logging losses in tensorboard
            if total_iters % PRINT_FREQ == 0:
                losses = model.get_current_losses()
                t_comp = (time.time() - iter_start_time) / BATCH_SIZE
                print(
                    f'Step {total_iters} | Epoch {epoch} | GAN Loss: {losses["G_GAN"]:.3f} | Gen. L1: {losses["G_L1"]:.3f} | Disc. real: {losses["D_real"]:.3f} | Disc. fake: {losses["D_fake"]:.3f}')

            # Use this to log to tensorboard
            if WRITE_LOGS and total_iters % LOG_FREQ == 0:
                losses = model.get_current_losses().items()
                for name, loss in losses:
                    writer.add_scalar(name, loss, total_iters)  # type: ignore
                writer.close()  # type: ignore

            iter_data_time = time.time()

        # Save checkpoints per epochs
        if SAVE_CKPTS and epoch % CKPT_FREQ == 0:
            print('Saving the model at the end of epoch %d, iters %d' %
                  (epoch, total_iters))
            model.save_network(epoch)

            # Save image per checkpoint
            if SAVE_IMG_CKPT:
                print('Saving current epoch test to test folder')
                visuals = model.get_current_visuals()
                save_path = os.path.join(
                    TEST_DIR, 'epoch_' + str(epoch) + '.jpg')
                util.save_visuals(visuals, save_path)

        # Print details at the end of the epoch
        print('End of epoch %d / %d \t Time Taken: %d secs' %
              (epoch, EPOCHS -1, time.time() - epoch_start_time))

    # Save / overwrite final epoch and image
    if SAVE_CKPTS:
        print('Saving the model at the end of training')
        model.save_network(epoch)

        if SAVE_IMG_CKPT:
            print('Saving final epoch test to test folder')
            visuals = model.get_current_visuals()
            save_path = os.path.join(TEST_DIR, 'epoch_' + str(epoch) + '.jpg')
            util.save_visuals(visuals, save_path)

    # Plot last visuals from the model once training is complete
    visuals = model.get_current_visuals()
    util.plot_visuals(visuals)


### Teste do modelo
Gera um leva de imagens, definidas por `TEST_SAMPLE`, utilizando as imagens que estão na pasta `test`. Carrega o modelo definido por `LOAD_NUMBER`. As imagens geradas são salvas em `TEST_DIR`.

In [49]:
if TEST:
        # Create the testing data set
        testData = ImageFolderLoader(f'{FOLDER_NAME}/AB', phase='test', flip=False, preprocess='none')
        testSet = torch.utils.data.DataLoader(testData, batch_size=BATCH_SIZE, shuffle= False, num_workers=0)

        # Create the pix2pix model in testing mode
        model = Pix2PixModel(CKPT_DIR, MODEL_NAME, is_train=False, n_epochs=EPOCHS/2, n_epochs_decay=EPOCHS/2)
        model.setup()
        model.eval()
        load_model(model)

        # Iterate through test data set, for the lenght of the test sample
        for i, data in enumerate(testSet):
            if i < TEST_SAMPLE:
                model.set_input(data)
                model.test()
                visuals = model.get_current_visuals()
                save_path = os.path.join(TEST_DIR, 'test_' + str(i) + '.jpg')
                util.save_visuals(visuals, save_path)
            else:
                break

### Exportação do modelo
Para utilizar o modelo fora deste ambiente de treinamento, é preciso exportá-lo. Um modelo exportado no formato `.onnx` pode ser utilizado pelo Unity utilizando seu pacote `Barracuda`.

In [50]:
if EXPORT_MODEL:
        # Create dummy input
        x = torch.randn(1, 3, 256, 256)

        # Create the model and load the latest checkpoint
        model = Pix2PixModel(CKPT_DIR, MODEL_NAME, is_train=False, n_epochs=EPOCHS/2, n_epochs_decay=EPOCHS/2)
        model.setup()
        model.eval()
        load_model(model)

        if not os.path.isdir('exported'):
            os.makedirs('exported')
        
        path = os.path.join('exported', f'{MODEL_NAME}.onnx')
        f = open(path, 'w+')

        torch.onnx.export(model.netG, x.to(DEVICE), path, training=torch.onnx.TrainingMode.EVAL, export_params=True, opset_version=10)

---------- Networks initialized -------------
[Network G] Total number of parameters : 54.414 M
Loading model from checkpoint 100 
Generator is checkpoints\tracos_run_1\100_net_G.pth 
Discriminator is checkpoints\tracos_run_1\100_net_D.pth
Loading the model from checkpoints\tracos_run_1\100_net_G.pth
