In [None]:
import torch
import torch.nn as nn

import numpy as np

from tqdm import tqdm
from torchvision.utils import save_image, make_grid


In [None]:
# dataset_path = './datasets/dataset_rbmk/clear'

DEVICE = torch.device( "cpu")
batch_size = 100

x_dim = 784
hidden_dim = 400
latent_dim = 20

lr = 1e-3
num_epochs = 100

In [None]:
import os
from torchvision.datasets import ImageFolder
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split

dataset_path = './datasets/dataset_rbmk/clearfile'
# 2. Ajuste as transformações
# Nota: Como são imagens de satélite, talvez você precise de Resize ou Grayscale 
# dependendo da arquitetura do seu VAE.
novo_transform = transforms.Compose([
    transforms.Resize((28, 28)),      # Redimensiona para manter compatibilidade com x_dim=784
    transforms.ToTensor(),            # [0, 255] -> [0, 1]
    # transforms.Grayscale(num_output_channels=1) # Descomente se quiser forçar P&B
])
kwargs = {'num_workers': 1, 'pin_memory': True} if torch.cuda.is_available() else {}
# 3. Carregue o dataset completo
full_dataset = ImageFolder(root=dataset_path, transform=novo_transform)

# 4. Divisão em Treino e Teste (Já que você não tem pastas separadas nativamente)
train_size = int(0.8 * len(full_dataset))
test_size = len(full_dataset) - train_size
train_dataset, test_dataset = random_split(full_dataset, [train_size, test_size])

# 5. DataLoaders permanecem quase iguais
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, **kwargs)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, **kwargs)

# --- Verificação das dimensões ---
# Para acessar a primeira imagem no ImageFolder dentro de um Subset:
amostra_x, rotulo = full_dataset[0]
canais, altura, largura = amostra_x.shape

input_channels = canais

print(f"--- Info da Amostra RBMK ---")
print(f"Dimensões da Imagem: {canais} canais x {altura}px x {largura}px")
print(f"Total de imagens encontradas: {len(full_dataset)}")

In [None]:
# from torchvision.datasets import MNIST
# import torchvision.transforms as transforms
# from torch.utils.data import DataLoader

# mnist_transform = transforms.Compose([
#     transforms.ToTensor(), # [0, 255] -> [0, 1]
# ])

# kwargs = {'num_workers': 1, 'pin_memory': True} if torch.cuda.is_available() else {}

# train_dataset = MNIST(dataset_path, train=True, transform=mnist_transform, download=True)
# test_dataset = MNIST(dataset_path, train=False, transform=mnist_transform, download=True)

# train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, **kwargs)
# test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, **kwargs)

# canais_img = train_dataset[0][0].shape[0]
# input_channels = canais_img
# print(f'Número de canais da imagem: {canais_img}')

# # Acessando o primeiro item do dataset
# amostra_x, rotulo = train_dataset[0]

# # Extraindo dimensões
# canais = amostra_x.shape[0]
# altura = amostra_x.shape[1]
# largura = amostra_x.shape[2]

# print(f"--- Info da Amostra ---")
# print(f"Dimensões da Imagem: {canais} canais x {altura}px x {largura}px")
# print(f"Rótulo (Label): {rotulo}") 

In [None]:
# '''
# implementation of Gaussian MLP 
# '''

# class Encoder(nn.Module):
#     def __init__(self, x_dim, hidden_dim, latent_dim):
#         super(Encoder, self).__init__()
#         self.fc1 = nn.Linear(x_dim, hidden_dim) #ax + b
#         self.fc2 = nn.Linear(hidden_dim, hidden_dim)
#         self.fc3_mean = nn.Linear(hidden_dim, latent_dim)
#         self.fc3_logvar = nn.Linear(hidden_dim, latent_dim)

#         self.LeakyReLU = nn.LeakyReLU(0.2) # ela evita o problema de gradiente nulo, mesmo quando a entrada é negativa, o que pode ocorrer durante o treinamento.


#         self.training = True
    
#     def forward(self, x):
#         h_ = self.LeakyReLU(self.fc1(x))
#         h_ = self.LeakyReLU(self.fc2(h_))
#         mean = self.fc3_mean(h_)
#         logvar = self.fc3_logvar(h_)
#         return mean, logvar

In [None]:
class Encoder(nn.Module):
    def __init__(self, input_channels, latent_dim):
        super(Encoder, self).__init__()
        
        # 1. Camadas Convolucionais para extração de features espaciais
        self.conv_layers = nn.Sequential(
            nn.Conv2d(input_channels, 32, kernel_size=3, stride=2, padding=1), # Saída: 14x14
            nn.LeakyReLU(0.2),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),            # Saída: 7x7
            nn.LeakyReLU(0.2),
            nn.Flatten() # Transforma o volume (64, 7, 7) em um vetor único
        )
        
        # Precisamos calcular o tamanho da entrada após o flatten
        # Para uma imagem 28x28 com 2 strides, chegamos a 7x7. 64 canais * 7 * 7 = 3136
        flat_features = 64 * 7 * 7 
        
        # 2. Camadas Lineares para os parâmetros da Gaussiana
        self.fc_mean = nn.Linear(flat_features, latent_dim)
        self.fc_logvar = nn.Linear(flat_features, latent_dim)

    def forward(self, x):
        # x deve ter o shape [Batch, Channels, Height, Width]
        h_ = self.conv_layers(x)
        
        mean = self.fc_mean(h_)
        logvar = self.fc_logvar(h_)
        
        return mean, logvar

In [None]:
# class Decoder(nn.Module):
#     def __init__(self, latent_dim, hidden_dim, y_dim):
#         super(Decoder, self).__init__()
#         self.fc1 = nn.Linear(latent_dim, hidden_dim)
#         self.fc2 = nn.Linear(hidden_dim, hidden_dim)
#         self.fc3 = nn.Linear(hidden_dim, y_dim)

#         self.LeakyReLU = nn.LeakyReLU(0.2)
    
#     def forward(self, z):
#         h = self.LeakyReLU(self.fc1(z))
#         h = self.LeakyReLU(self.fc2(h))
#         y_hat = torch.sigmoid(self.fc3(h))
#         return y_hat

In [None]:
class Decoder(nn.Module):
    def __init__(self, latent_dim, output_channels):
        super(Decoder, self).__init__()
        
        # 1. Expandir o vetor latente de volta para a dimensão do último mapa de features
        # No Encoder era (64, 7, 7), então flat_features = 3136
        self.fc_upsample = nn.Linear(latent_dim, 64 * 7 * 7)
        
        # 2. Camadas de Convolução Transposta (Upsampling)
        self.deconv_layers = nn.Sequential(
            # Entrada: [Batch, 64, 7, 7]
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1), 
            # Saída: [Batch, 32, 14, 14]
            nn.LeakyReLU(0.2),
            
            nn.ConvTranspose2d(32, output_channels, kernel_size=3, stride=2, padding=1, output_padding=1),
            # Saída: [Batch, output_channels, 28, 28]
            nn.Sigmoid() # Garante que os pixels estejam entre 0 e 1
        )

    def forward(self, z):
        # Transforma o vetor latente em um vetor "largo"
        h = self.fc_upsample(z)
        
        # Faz o "Reshape" para o formato de volume (Batch, Channels, Height, Width)
        # Isso é o inverso do Flatten()
        h = h.view(-1, 64, 7, 7)
        
        # Passa pelas camadas de expansão espacial
        y_hat = self.deconv_layers(h)
        
        return y_hat

In [None]:
class Model(nn.Module):
    def __init__(self, Encoder, Decoder):
        super(Model, self).__init__()
        self.Encoder = Encoder
        self.Decoder = Decoder
    
    def reparameterize(self, mean, logvar):
        dp = torch.exp(0.5 * logvar)
        epsilon = torch.randn_like(dp).to(DEVICE)
        z = mean + dp * epsilon
        return z
    
    def forward(self, x):
        mean, logvar = self.Encoder(x)
        z = self.reparameterize(mean, logvar)
        y_hat = self.Decoder(z)
        return y_hat, mean, logvar
    

In [None]:
# encoder = Encoder(x_dim, hidden_dim, latent_dim)
# decoder = Decoder(latent_dim, hidden_dim, x_dim)

encoder = Encoder(input_channels, latent_dim)
decoder = Decoder(latent_dim, input_channels)

model = Model(encoder, decoder).to(DEVICE)
print("model: ", model)

In [None]:
from torch.optim import Adam

# BCE_loss = nn.BCELoss()
# nn.functional.binary_cross_entropy(y_hat, x, reduction='sum')

def loss_(x, y_hat, mean, logvar):
    reproduction_loss = -torch.sum(x * torch.log(y_hat + 1e-10) + (1 - x) * torch.log(1 - y_hat + 1e-10))
    DKL = -0.5 * torch.sum(1 + logvar - mean.pow(2) - logvar.exp())
    return reproduction_loss + DKL

optimizer = Adam(model.parameters(), lr=lr)

In [None]:
print("Starting training VAE...")
model.train()

for epoch in range(num_epochs):
    overall_loss = 0
    for batch_idx, (x, _) in enumerate(tqdm(train_loader)):
        x = x.view(-1, input_channels, altura, largura).to(DEVICE) # Garantindo que x tenha o formato correto para o Encoder

        optimizer.zero_grad()

        y_hat, mean, logvar = model(x)

        # Printando a média e o logvar do primeiro item do batch
        if batch_idx == 0: # Printa apenas no primeiro batch para não inundar a tela
            print(f"Média (primeiros 5 valores do vetor): {mean[0][:5].detach().cpu().numpy()}")
            print(f"LogVar (primeiros 5 valores do vetor): {logvar[0][:5].detach().cpu().numpy()}")

        loss = loss_(x, y_hat, mean, logvar)

        overall_loss += loss.item()
        loss.backward()
        optimizer.step()

        # print("\tEpoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
        #     epoch, batch_idx * len(x), len(train_loader.dataset),
        #     100. * batch_idx / len(train_loader), overall_loss / len(train_loader.dataset)))
        # print("\tEpoch", epoch + 1, "complete!", "\tAverage Loss: ", overall_loss / ((batch_idx+1)*batch_size))
    print("\tEpoch", epoch + 1, "complete!", "\tAverage Loss: ", overall_loss / ((batch_idx+1)*batch_size))
print("Finish!!")

In [None]:

import matplotlib.pyplot as plt

In [None]:
model.eval()

with torch.no_grad():
    for batch_idx, (x, _) in enumerate(tqdm(test_loader)):
        # x = x.view(batch_size, x_dim).to(DEVICE)
        x = x.view(-1, input_channels, altura, largura).to(DEVICE)
        y_hat, _, _ = model(x)
        


        break

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def show_image(x, idx):
    # 1. Debug: Printar o shape original (Batch, Canais, Altura, Largura)
    print(f"Shape original recebido: {x.shape}")
    
    x = x.detach().cpu()
    
    x = x.permute(0, 2, 3, 1)
    print(f"Shape após permute (Batch): {x.shape}")
    
    # 3. Configuração do Grid
    grid_size = int(np.sqrt(idx))
    fig, axes = plt.subplots(grid_size, grid_size, figsize=(10, 10))
    
    # Caso idx seja 1, o subplots não retorna um array, então forçamos a lista
    if idx == 1:
        axes = [axes]
    else:
        axes = axes.flatten()

    for i in range(idx):
        if i < len(x):
            # Para 3 canais (RGB), imshow detecta automaticamente se o canal estiver no final
            # Se for 1 canal após o permute, o Matplotlib ainda pode precisar de cmap='gray'
            img_to_show = x[i].numpy()
            
            # Se a última dimensão for 1 (Grayscale), o imshow precisa dela 'achatada'
            if img_to_show.shape[-1] == 1:
                img_to_show = img_to_show.squeeze(-1)
                axes[i].imshow(img_to_show, cmap='gray')
            else:
                axes[i].imshow(img_to_show)
                
            axes[i].axis('off')
        else:
            axes[i].axis('off')

    plt.tight_layout()
    plt.show()

In [None]:
show_image(x, idx=25)

In [None]:
show_image(y_hat, idx=25)

In [None]:
import matplotlib.pyplot as plt
import torch

model.eval()
with torch.no_grad():
    for x, _ in test_loader:
        # x já vem do loader como [B, 3, H, W]
        x = x.to(DEVICE)
        y_hat, _, _ = model(x)
        break

n = 10 
plt.figure(figsize=(20, 6)) # Aumentei um pouco a altura para caber melhor os títulos

for i in range(n):
    # --- Imagem Original ---
    ax = plt.subplot(2, n, i + 1)
    
    # Prepara a imagem: move canais para o final [3, 28, 28] -> [28, 28, 3]
    img_original = x[i].cpu().permute(1, 2, 0)
    
    plt.imshow(img_original)
    plt.title("Original")
    ax.axis('off')

    # --- Imagem Reconstruída ---
    ax = plt.subplot(2, n, i + 1 + n)
    
    # Mesmo processo para a reconstrução
    img_reconstruida = y_hat[i].cpu().permute(1, 2, 0)
    
    plt.imshow(img_reconstruida)
    plt.title("VAE")
    ax.axis('off')

plt.tight_layout()
plt.show()