In [1]:

import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
from sklearn.metrics import mean_absolute_error as mae, mean_squared_error as mse, mean_absolute_percentage_error as mape
import numpy as np
# Batch size during training
import torch.optim as optim
import torch
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import os
import warnings
warnings.filterwarnings("ignore")




def asmape(y_true, y_pred, mask=None):
    """Return the adjusted symmetric MAPE between two sequences, in percent.

    Computes ``100 * sum(|y_pred - y_true| / (|y_true| + |y_pred|)) / n``,
    where ``n`` is ``len()`` of the (optionally masked) ``y_true``. Terms that
    evaluate to NaN (both values are zero) are skipped by the sum but are
    still counted in ``n``.

    Args:
        y_true: Ground-truth values (list or array-like).
        y_pred: Predicted values, same shape as ``y_true``.
        mask: Optional array-like; only entries where ``mask == 1`` are kept.

    Returns:
        The metric value, or NaN when there is nothing to compare.
    """
    # Convert first so that plain lists can be boolean-indexed by the mask.
    y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
    if mask is not None:
        keep = np.asarray(mask) == 1
        y_true, y_pred = y_true[keep], y_pred[keep]

    count = len(y_true)
    if count == 0:
        return float("nan")

    # 0/0 terms become NaN on purpose; nansum drops them below.
    with np.errstate(divide="ignore", invalid="ignore"):
        ratio = np.abs(y_pred - y_true) / (np.abs(y_true) + np.abs(y_pred))
    return 100 * (np.nansum(ratio) / count)



class LoaderDataset(Dataset):
    """Dataset that pairs target ("zebra"), input ("horse") and mask ``.npy`` files.

    Each of the three directories is listed and sorted once at construction.
    Item ``i`` takes the ``i``-th entry of every list, wrapping around with a
    modulo when a list is shorter than the dataset length.
    """

    def __init__(self, root_zebra, root_horse, root_masks, chanels=3):
        """Index the directories.

        Args:
            root_zebra: Directory holding the target arrays.
            root_horse: Directory holding the input arrays.
            root_masks: Directory holding the mask arrays.
            chanels: Channels to keep: 2 keeps the first two, 1 sums all
                channels into one, any other value keeps the array as loaded.
        """
        self.root_zebra = root_zebra
        self.root_horse = root_horse
        self.root_index = root_masks

        self.zebra_images = sorted(os.listdir(root_zebra))
        self.horse_images = sorted(os.listdir(root_horse))
        self.index = sorted(os.listdir(root_masks))

        self.zebra_len = len(self.zebra_images)
        self.horse_len = len(self.horse_images)
        self.index_len = len(self.index)
        # The dataset is as long as the larger of the two image lists.
        self.length_dataset = max(self.zebra_len, self.horse_len)
        self.chanels = chanels

    def __len__(self):
        """Return the number of items exposed by the dataset."""
        return self.length_dataset

    @staticmethod
    def custom_normalize(image):
        """Min-max scale ``image`` to [-1, 1].

        Returns:
            ``(normalized, min_val, max_val)`` where the last two are the
            extrema of the input as 0-dim float32 tensors.
        """
        image = torch.tensor(image, dtype=torch.float32)
        lo = image.min()
        hi = image.max()
        # Clamp the range so a constant image does not divide by zero.
        span = torch.clamp(hi - lo, min=1e-5)
        shifted = image - lo
        return 2 * shifted / span - 1, lo, hi

    def __getitem__(self, index):
        """Load, channel-select and normalize the ``index``-th sample.

        Returns:
            ``(target, flattened_input, target_min, target_max, mask)``.
        """
        target_name = self.zebra_images[index % self.zebra_len]
        source_name = self.horse_images[index % self.horse_len]
        mask_name = self.index[index % self.index_len]

        target = np.load(os.path.join(self.root_zebra, target_name))
        source = np.load(os.path.join(self.root_horse, source_name))
        mask = np.load(os.path.join(self.root_index, mask_name))

        # Arrays stored with extra dimensions are collapsed to 32x32x3.
        if target.ndim > 3:
            target = target.reshape(32, 32, 3)
            source = source.reshape(32, 32, 3)

        # Move the last axis (used as the channel axis below) to the front.
        target = target.transpose(2, 0, 1)
        source = source.transpose(2, 0, 1)

        if self.chanels == 2:
            target, source = target[:2, :, :], source[:2, :, :]
        elif self.chanels == 1:
            target = target.sum(axis=0, keepdims=True)
            source = source.sum(axis=0, keepdims=True)

        target, target_min, target_max = LoaderDataset.custom_normalize(target)
        source = LoaderDataset.custom_normalize(source)[0]

        mask = torch.tensor(mask, dtype=torch.float32)

        return target, source.flatten(), target_min, target_max, mask
    

class Generator(nn.Module):
    """DCGAN-style generator mapping a flat vector to an ``nc`` x 32 x 32 image.

    A linear layer expands the input to ``ngf*8`` feature maps of 8x8, two
    stride-2 transposed convolutions upsample to 32x32, and two stride-1
    transposed convolutions reduce the channels down to ``nc``.

    Args:
        nz: Size of the flat input vector.
        ngf: Base number of generator feature maps.
        nc: Number of output channels.
    """

    def __init__(self, nz=3072, ngf=32, nc=3):
        super().__init__()
        self.nz = nz
        self.ngf = ngf
        self.nc = nc

        # Project the input vector and reshape it to (ngf*8, 8, 8).
        layers = [
            nn.Linear(nz, ngf * 8 * 8 * 8),
            nn.ReLU(True),
            nn.Unflatten(1, (ngf * 8, 8, 8)),
        ]

        # (in_channels, out_channels, kernel, stride): the first two stages
        # double the resolution, the third keeps it.
        stages = (
            (ngf * 8, ngf * 4, 4, 2),
            (ngf * 4, ngf * 2, 4, 2),
            (ngf * 2, ngf, 3, 1),
        )
        for c_in, c_out, kernel, stride in stages:
            layers.append(
                nn.ConvTranspose2d(c_in, c_out, kernel_size=kernel, stride=stride, padding=1, bias=False)
            )
            layers.append(nn.BatchNorm2d(c_out))
            layers.append(nn.ReLU(True))

        # Final size-preserving layer; Tanh bounds the output to [-1, 1].
        layers.append(nn.ConvTranspose2d(ngf, nc, kernel_size=3, stride=1, padding=1, bias=False))
        layers.append(nn.Tanh())

        self.main = nn.Sequential(*layers)

    def forward(self, input):
        """Run ``input`` through the sequential stack."""
        return self.main(input)


class Discriminator(nn.Module):
    """DCGAN-style discriminator scoring an ``nc``-channel image.

    Four stride-2 convolutions each halve the spatial size, and a final
    2x2 convolution followed by a sigmoid produces one value per image
    (1x1 output for 32x32 inputs).

    NOTE(review): the loss helpers in this file use ``BCEWithLogitsLoss``,
    which applies a sigmoid itself; confirm whether the trailing ``Sigmoid``
    here is intended.

    Args:
        nc: Number of input channels.
        ndf: Base number of discriminator feature maps.
    """

    def __init__(self, nc=3, ndf=64):
        super().__init__()

        self.nc = nc
        self.ndf = ndf

        # Stage 1: stride-2 convolution without batch norm (size / 2).
        layers = [
            nn.Conv2d(nc, ndf, 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
        ]

        # Stages 2-4: each halves the resolution again and doubles the width.
        for factor in (1, 2, 4):
            width_in = ndf * factor
            width_out = width_in * 2
            layers.append(nn.Conv2d(width_in, width_out, 4, 2, 1, bias=False))
            layers.append(nn.BatchNorm2d(width_out))
            layers.append(nn.LeakyReLU(0.2, inplace=True))

        # Final stage: 2x2 kernel collapses the remaining 2x2 map to 1x1.
        layers.append(nn.Conv2d(ndf * 8, 1, 2, 1, 0, bias=False))
        layers.append(nn.Sigmoid())

        self.main = nn.Sequential(*layers)

    def forward(self, input):
        """Run ``input`` through the sequential stack."""
        return self.main(input)


from torch.utils.data import Dataset
import os 
import numpy as np
import torch
import torch.nn as nn

def asmape(y_true, y_pred, mask=None):
    """Return the adjusted symmetric MAPE between two sequences, in percent.

    Computes ``100 * sum(|y_pred - y_true| / (|y_true| + |y_pred|)) / n``,
    where ``n`` is ``len()`` of the (optionally masked) ``y_true``. Terms that
    evaluate to NaN (both values are zero) are skipped by the sum but are
    still counted in ``n``.

    Args:
        y_true: Ground-truth values (list or array-like).
        y_pred: Predicted values, same shape as ``y_true``.
        mask: Optional array-like; only entries where ``mask == 1`` are kept.

    Returns:
        The metric value, or NaN when there is nothing to compare.
    """
    # Convert first so that plain lists can be boolean-indexed by the mask.
    y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
    if mask is not None:
        keep = np.asarray(mask) == 1
        y_true, y_pred = y_true[keep], y_pred[keep]

    count = len(y_true)
    if count == 0:
        return float("nan")

    # 0/0 terms become NaN on purpose; nansum drops them below.
    with np.errstate(divide="ignore", invalid="ignore"):
        ratio = np.abs(y_pred - y_true) / (np.abs(y_true) + np.abs(y_pred))
    return 100 * (np.nansum(ratio) / count)


def validate(loader, netG, netD, device):
    """Compute the average masked GAN losses over ``loader`` without gradients.

    Both networks are switched to eval mode for the pass and are put back in
    train mode afterwards, even if a batch raises.

    Args:
        loader: Iterable yielding ``(real, noise, _, _, masks)`` batches.
        netG: Generator; called on ``noise``.
        netD: Discriminator; called on the masked real and fake images.
        device: Device the batches are moved to.

    Returns:
        The average generator loss over all batches.

    Raises:
        ValueError: If ``loader`` yields no batches.
    """
    netG.eval()
    netD.eval()

    total_loss_G = 0.0
    total_loss_D = 0.0
    num_batches = 0

    try:
        with torch.no_grad():
            for real_cpu, noise, _, _, masks in loader:
                real_cpu = real_cpu.to(device)
                noise = noise.to(device)

                # Reshape the mask to one channel with the image's own
                # spatial size instead of assuming 32x32.
                height, width = real_cpu.shape[-2:]
                masks = masks.to(device).view(-1, 1, height, width).float()

                # Only the masked region of real and generated images is scored.
                real_masked = real_cpu * masks
                fake_masked = netG(noise) * masks

                output_real = netD(real_masked).view(-1)
                output_fake = netD(fake_masked).view(-1)

                loss_D = discriminator_loss(output_real, output_fake)
                loss_G = generator_loss(output_fake)

                total_loss_D += loss_D.item()
                total_loss_G += loss_G.item()
                num_batches += 1
    finally:
        # Never leave the networks stuck in eval mode.
        netG.train()
        netD.train()

    if num_batches == 0:
        raise ValueError("validate() received an empty loader; nothing to average")

    avg_loss_D = total_loss_D / num_batches
    avg_loss_G = total_loss_G / num_batches
    print(f'Validation Loss_D: {avg_loss_D:.4f} \tValidation Loss_G: {avg_loss_G:.4f}')

    return avg_loss_G

    
# Example loss functions for the discriminator and the generator

def discriminator_loss(real_output, fake_output):
    """Return the discriminator's binary cross-entropy loss on raw logits.

    Real outputs are scored against a target of 1 and fake outputs against a
    target of 0; the two mean losses are added.
    """
    bce_with_logits = nn.functional.binary_cross_entropy_with_logits
    loss_on_real = bce_with_logits(real_output, torch.ones_like(real_output))
    loss_on_fake = bce_with_logits(fake_output, torch.zeros_like(fake_output))
    return loss_on_real + loss_on_fake

def generator_loss(fake_output):
    """Return the generator's binary cross-entropy loss on raw logits.

    The discriminator's outputs for generated samples are scored against a
    target of 1, i.e. the generator is rewarded when fakes are taken as real.
    """
    all_real = torch.ones_like(fake_output)
    return nn.functional.binary_cross_entropy_with_logits(fake_output, all_real)


def load_checkpoint(checkpoint_file, model, DEVICE):
    """Load a saved state dict into ``model`` and return the model.

    Args:
        checkpoint_file: Path or file-like object accepted by ``torch.load``;
            its content is passed directly to ``load_state_dict``.
        model: Module whose parameters are overwritten in place.
        DEVICE: ``map_location`` used when deserializing the tensors.
    """
    state_dict = torch.load(
        checkpoint_file,
        map_location=DEVICE,
        weights_only=True,  # only tensors/primitives are unpickled
    )
    model.load_state_dict(state_dict)
    return model

def test(gen, test_loader, taxa, fold, chanells):
    """Evaluate ``gen`` on ``test_loader`` and write per-sample metrics to CSV.

    For every batch the target and the generated image are mapped back from
    the [-1, 1] range to the target's original scale, summed over the channel
    axis, flattened, multiplied by the mask and compared with MAE, ASMAPE,
    MAPE and RMSE. Results are indexed by the dataset's ``horse_images`` file
    names and saved to
    ``./resultados/resultados_dc/result_{chanells}c_{taxa}_{fold}.csv``.

    Args:
        gen: Generator to evaluate; put in eval mode here.
        test_loader: Loader yielding ``(target, input, min, max, mask)``
            batches, where ``min``/``max`` are the target's extrema before
            normalization. Rows are paired with file names one batch at a
            time, so a batch size of 1 is expected.
        taxa: Label used in the output file name.
        fold: Fold label used in the output file name.
        chanells: Channel count used in the output file name.
    """
    gen.eval()
    sample_names = test_loader.dataset.horse_images
    df = pd.DataFrame([], columns=['mae', 'asmape', 'mape', 'rmse', 'scale'], index=sample_names)

    with torch.no_grad():
        for (zebra, horse, min_val, max_val, masks), name in zip(test_loader, sample_names):
            # One extremum per sample; reshape so it broadcasts over (N, C, H, W).
            min_val = torch.as_tensor(min_val).cpu().reshape(-1, 1, 1, 1)
            max_val = torch.as_tensor(max_val).cpu().reshape(-1, 1, 1, 1)
            # Same clamped range the dataset used when normalizing.
            value_range = torch.clamp(max_val - min_val, min=1e-5)

            # Only the generator input has to live on the compute device.
            fake_zebra = gen(horse.to(device)).cpu()
            zebra = zebra.cpu()

            # Invert x_norm = 2 * (x - min) / range - 1 to get back to the
            # original scale (the target's statistics are used for both).
            zebra = (zebra + 1) / 2 * value_range + min_val
            fake_zebra = (fake_zebra + 1) / 2 * value_range + min_val

            # Sum over the channel axis and flatten.
            zebra = torch.sum(zebra, dim=1).flatten()
            fake_zebra = torch.sum(fake_zebra, dim=1).flatten()

            # Flatten the mask too so it lines up element-wise with the
            # flattened images regardless of how it was stored.
            mask_np = masks.cpu().reshape(-1).numpy()
            zebra_np = zebra.numpy() * mask_np
            fake_zebra_np = fake_zebra.numpy() * mask_np

            mae_value = round(mae(zebra_np, fake_zebra_np), 3)
            mape_value = round(mape(zebra_np, fake_zebra_np) * 100, 3)
            rmse_value = round(np.sqrt(mse(zebra_np, fake_zebra_np)), 3)
            smape_value = round(asmape(zebra_np, fake_zebra_np, mask_np), 3)

            # "scale" is the value range of the unmasked, denormalized target.
            target_values = zebra.numpy()
            df.loc[name] = [
                mae_value,
                smape_value,
                mape_value,
                rmse_value,
                np.max(target_values) - np.min(target_values),
            ]

    # Save the DataFrame as CSV, creating the output directory if needed.
    directory = "./resultados/resultados_dc"
    os.makedirs(directory, exist_ok=True)
    df.to_csv(rf'{directory}/result_{str(chanells)}c_{taxa}_{fold}.csv')


import time

# import torch.multiprocessing as mp
# Every split and index directory is resolved from the same dataset root.
TRAIN_DIR = VAL_DIR = INDEX_TRAIN = INDEX_VAL = INDEX_TEST = os.path.abspath("../dataset_final")

# Use the first CUDA device when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

def main(in_channels, taxas=('10', '20', '30', '40'), folds=('1', '2', '3', '4', '5')):
    """Evaluate the saved generator of every (taxa, fold) pair.

    For each combination the test split is loaded, a generator is built,
    its weights are restored from
    ``./models_saved/dcgan/{in_channels}/{taxa}/fold{fold}/generator.pth``
    and the metrics are written by ``test``. Combinations without a saved
    checkpoint are reported and skipped, because metrics computed with a
    randomly initialized generator would be meaningless.

    Args:
        in_channels: Number of image channels (also sizes the generator).
        taxas: Labels of the ``taxa`` sub-directories to evaluate.
        folds: Fold identifiers to evaluate.
    """
    for taxa in taxas:
        for fold in folds:
            fold_name = f"fold{fold}"

            test_dataset = LoaderDataset(
                root_zebra=os.path.join(VAL_DIR, "label", str(taxa), "folds", fold_name, "test"),
                root_horse=os.path.join(VAL_DIR, "input", str(taxa), "folds", fold_name, "test"),
                root_masks=os.path.join(INDEX_TEST, "input", str(taxa), "folds", fold_name, "index"),
                chanels=in_channels,
            )

            test_loader = DataLoader(
                test_dataset,
                batch_size=1,
                shuffle=False,
                pin_memory=False,
            )

            save_dir = f"./models_saved/dcgan/{in_channels}/{taxa}/fold{fold}"
            model_path = os.path.join(save_dir, "generator.pth")
            if not os.path.isfile(model_path):
                print(f'Checkpoint not found, skipping: {model_path}')
                continue

            # The generator input is the flattened 32x32 image: channels * 1024.
            generator = Generator(nz=in_channels * 1024, nc=in_channels).to(device)
            # Restore the trained weights before evaluating.
            load_checkpoint(model_path, generator, device)

            test(generator, test_loader=test_loader, taxa=taxa, fold=fold, chanells=in_channels)

if __name__ == '__main__':
    # The __main__ guard is needed on Windows for compatibility.
    # Run the evaluation for the 1-, 2- and 3-channel variants and report
    # the total wall-clock time in hours.
    started_at = time.time()
    for n_channels in (1, 2, 3):
        print(f'{n_channels} channels: Iniciando o teste ....')
        main(n_channels)
    elapsed_seconds = time.time() - started_at
    print(f'{elapsed_seconds/3600} horas')

1 channels: Iniciando o teste ....
2 channels: Iniciando o teste ....
3 channels: Iniciando o teste ....
0.00945029358069102 horas
