In [16]:
import torch
import torch.nn as nn
from torchsummary import summary

class SimpleConv(nn.Module):
    def __init__(self):
        super(SimpleConv, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 1, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
        )

    def forward(self, x, y):
        print(x.shape, x.shape)
        x1 = self.features(x)
        x2 = self.features(y)
        return x1, x2
    
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleConv().to(device)

summary(model=model, input_size=[(1, 16, 16), (1, 28, 28)], batch_size=None)

torch.Size([2, 1, 16, 16]) torch.Size([2, 1, 16, 16])
----------------------------------------------------------------
        Layer (type)               Output Shape         Param #


TypeError: unsupported operand type(s) for *: 'NoneType' and 'int'

In [1]:
import torch
# from models import BaseVAE
from torch import nn
from torch.nn import functional as F
from torch import nn, Tensor
# from abc import abstractmethod


class VanillaVAE(nn.Module):

    def __init__(self,
                 input_dim: int,
                 latent_dim: int,
                 hidden_dim: int,
                 degenerate2ae = False,
                 **kwargs) -> None:
        super(VanillaVAE, self).__init__()
        self.training = True

        # Part 1, encoder
        self.FC_input = nn.Linear(input_dim, hidden_dim)
        self.FC_input2 = nn.Linear(hidden_dim, hidden_dim)
        self.FC_mean  = nn.Linear(hidden_dim, latent_dim)
        self.FC_var   = nn.Linear (hidden_dim, latent_dim)
        
        self.LeakyReLU = nn.LeakyReLU(0.2)
        
        # Part 2, decoder
        self.FC_hidden = nn.Linear(latent_dim, hidden_dim)
        self.FC_hidden2 = nn.Linear(hidden_dim, hidden_dim)
        self.FC_output = nn.Linear(hidden_dim, input_dim)
        
        self.LeakyReLU = nn.LeakyReLU(0.2)
        
        self.degenerate2ae = degenerate2ae

    def encode(self, input: Tensor):
        """
        Encodes the input by passing through the encoder network
        and returns the latent codes.
        :param input: (Tensor) Input tensor to encoder
        :return: (Tensor) List of latent codes
        """
        h_       = self.LeakyReLU(self.FC_input(input))
        h_       = self.LeakyReLU(self.FC_input2(h_))
        mean     = self.FC_mean(h_)
        log_var  = self.FC_var(h_) 

        return mean, log_var

    def decode(self, z: Tensor) -> Tensor:
        """
        Maps the given latent codes
        onto the image space.
        :param z: (Tensor) [B x D]
        :return: (Tensor) [B x C x H x W]
        """
        h     = self.LeakyReLU(self.FC_hidden(z))
        h     = self.LeakyReLU(self.FC_hidden2(h))
        x_hat = self.FC_output(h)

        return x_hat

    def reparameterize(self, mu: Tensor, logvar: Tensor) -> Tensor:
        """
        Reparameterization trick to sample from N(mu, var) from
        N(0,1).
        :param mu: (Tensor) Mean of the latent Gaussian [B x D]
        :param logvar: (Tensor) Standard deviation of the latent Gaussian [B x D]
        :return: (Tensor) [B x D]
        """
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return eps * std + mu

    def forward(self, input: Tensor, **kwargs):
        mu, log_var = self.encode(input)
        if not self.degenerate2ae:
            z = self.reparameterize(mu, log_var)
        else:
            z = mu
        return  [self.decode(z), input, mu, log_var]    #  x_hat, mean, log_var

    def loss_function(self,
                      x, x_hat, mean, log_var, kld_loss_weight, recons_loss_weight,
                      **kwargs) -> dict:
        """
        Computes the VAE loss function.
        KL(N(\mu, \sigma), N(0, 1)) = \log \frac{1}{\sigma} + \frac{\sigma^2 + \mu^2}{2} - \frac{1}{2}
        :param args:
        :param kwargs:
        :return:
        """
        recons = x_hat
        input = x
        mu = mean
        log_var = log_var

        # kld_weight = kwargs['M_N'] # Account for the minibatch samples from the dataset
        recons_loss =F.mse_loss(recons, input)

        # 计算高斯分布和标准正态分布的KL散度
        if not self.degenerate2ae:
            kld_loss = torch.mean(-0.5 * torch.sum(1 + log_var - mu ** 2 - log_var.exp(), dim = 1), dim = 0)
            loss = recons_loss*recons_loss_weight + kld_loss*kld_loss_weight
        else:
            kld_loss = torch.zeros_like(recons_loss)
            loss = recons_loss

        return {'loss': loss, 'Reconstruction_Loss':recons_loss.detach(), 'KLD':-kld_loss.detach()}


    # Do I need this?
    def sample(self,
               num_samples:int,
               current_device: int, **kwargs) -> Tensor:
        """
        Samples from the latent space and return the corresponding
        image space map.
        :param num_samples: (Int) Number of samples
        :param current_device: (Int) Device to run the model
        :return: (Tensor)
        """
        z = torch.randn(num_samples,
                        self.latent_dim)

        z = z.to(current_device)

        samples = self.decode(z)
        return samples

    # Do I need this?
    def generate(self, x: Tensor, **kwargs) -> Tensor:
        """
        Given an input image x, returns the reconstructed image
        :param x: (Tensor) [B x C x H x W]
        :return: (Tensor) [B x C x H x W]
        """

        return self.forward(x)[0]

In [2]:
import numpy as np
from UTILS.tensor_ops import _2tensor, cfg
cfg.device_ = 'cuda:0'
cfg.use_float64_ = False
cfg.init = True

input_dim = 10
my_data_sample_x = np.random.rand(1000,10)*10 - 1 # random from -1 to +1
my_data_sample_y = my_data_sample_x.mean(-1)**2 + my_data_sample_x.mean(-1)*(-2) + 1

# solve the weight inital problem
vae_mod = VanillaVAE(input_dim=input_dim, latent_dim=16, hidden_dim=32, degenerate2ae=False)
my_data_sample_x = _2tensor(my_data_sample_x)
vae_mod = _2tensor(vae_mod)
from torch.optim import Adam
optimizer = Adam(vae_mod.parameters(), lr=3e-3)

print("Start training VAE...")
vae_mod.train()

for epoch in range(5000):
    overall_loss = 0
    x = my_data_sample_x

    optimizer.zero_grad()

    #  [self.decode(z), input, mu, log_var]    #  x_hat, mean, log_var
    x_hat, x_origin, mean, log_var = vae_mod(x) # model(x)
    error = ( torch.abs(x_hat)-torch.abs(x) )/( torch.abs(x)+1e-9 )
    std_, mean_ = torch.std_mean(error, unbiased=False)
    print('\test error mean %.2f and std %.2f'%(mean_.item(), std_.item()) )


    lossdict = vae_mod.loss_function(x=x, x_hat=x_hat, mean=mean, log_var=log_var, kld_loss_weight=1, recons_loss_weight=1)
    loss = lossdict['loss']
    overall_loss += loss.item()

    loss.backward()
    optimizer.step()
    # 'Reconstruction_Loss':recons_loss.detach(), 'KLD'
    print("\tEpoch", epoch + 1, "complete!", "\tReconstruction_Loss: ", lossdict['Reconstruction_Loss'].item(), "\tKLD: ", lossdict['KLD'].item())

print("Finish!!")

Start training VAE...
	est error mean 0.41 and std 102.26
	Epoch 1 complete! 	Reconstruction_Loss:  24.63921546936035 	KLD:  -2.741346836090088
	est error mean 1.09 and std 183.47
	Epoch 2 complete! 	Reconstruction_Loss:  24.383832931518555 	KLD:  -1.6888296604156494
	est error mean -0.34 and std 38.65
	Epoch 3 complete! 	Reconstruction_Loss:  24.15789222717285 	KLD:  -1.107545256614685
	est error mean 0.33 and std 102.45
	Epoch 4 complete! 	Reconstruction_Loss:  23.952356338500977 	KLD:  -0.8092849254608154
	est error mean 2.79 and std 333.62
	Epoch 5 complete! 	Reconstruction_Loss:  23.7401123046875 	KLD:  -0.6579384803771973
	est error mean 0.77 and std 141.02
	Epoch 6 complete! 	Reconstruction_Loss:  23.540189743041992 	KLD:  -0.5716368556022644
	est error mean 3.88 and std 451.96
	Epoch 7 complete! 	Reconstruction_Loss:  23.27976417541504 	KLD:  -0.5116891264915466
	est error mean 3.67 and std 422.31
	Epoch 8 complete! 	Reconstruction_Loss:  23.001203536987305 	KLD:  -0.4633805155

tensor(0.0975, device='cuda:0', grad_fn=<MeanBackward0>)

In [1]:
import torch
# from models import BaseVAE
from torch import nn
from torch.nn import functional as F
from torch import nn, Tensor
# from abc import abstractmethod


class VanillaVAE(nn.Module):

    def __init__(self,
                 input_dim: int,
                 latent_dim: int,
                 hidden_dim: int,
                 degenerate2ae = False,
                 **kwargs) -> None:
        super(VanillaVAE, self).__init__()
        self.training = True

        # Part 1, encoder
        self.FC_input = nn.Linear(input_dim, hidden_dim)
        self.FC_input2 = nn.Linear(hidden_dim, hidden_dim)
        self.FC_mean  = nn.Linear(hidden_dim, latent_dim)
        self.FC_var   = nn.Linear (hidden_dim, latent_dim)
        
        self.LeakyReLU = nn.LeakyReLU(0.2)
        
        # Part 2, decoder
        self.FC_hidden = nn.Linear(latent_dim, hidden_dim)
        self.FC_hidden2 = nn.Linear(hidden_dim, hidden_dim)
        self.FC_output = nn.Linear(hidden_dim, input_dim)
        
        self.LeakyReLU = nn.LeakyReLU(0.2)
        
        self.degenerate2ae = degenerate2ae

    def encode(self, input: Tensor):
        """
        Encodes the input by passing through the encoder network
        and returns the latent codes.
        :param input: (Tensor) Input tensor to encoder
        :return: (Tensor) List of latent codes
        """
        h_       = self.LeakyReLU(self.FC_input(input))
        h_       = self.LeakyReLU(self.FC_input2(h_))
        mean     = self.FC_mean(h_)
        log_var  = self.FC_var(h_) 

        return mean, log_var

    def decode(self, z: Tensor) -> Tensor:
        """
        Maps the given latent codes
        onto the image space.
        :param z: (Tensor) [B x D]
        :return: (Tensor) [B x C x H x W]
        """
        h     = self.LeakyReLU(self.FC_hidden(z))
        h     = self.LeakyReLU(self.FC_hidden2(h))
        x_hat = self.FC_output(h)

        return x_hat

    def reparameterize(self, mu: Tensor, logvar: Tensor) -> Tensor:
        """
        Reparameterization trick to sample from N(mu, var) from
        N(0,1).
        :param mu: (Tensor) Mean of the latent Gaussian [B x D]
        :param logvar: (Tensor) Standard deviation of the latent Gaussian [B x D]
        :return: (Tensor) [B x D]
        """
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return eps * std + mu

    def forward(self, input: Tensor, **kwargs):
        mu, log_var = self.encode(input)
        if not self.degenerate2ae:
            z = self.reparameterize(mu, log_var)
        else:
            z = mu
        return  [self.decode(z), input, mu, log_var]    #  x_hat, mean, log_var

    def loss_function(self,
                      x, x_hat, mean, log_var, kld_loss_weight, recons_loss_weight,
                      **kwargs) -> dict:
        """
        Computes the VAE loss function.
        KL(N(\mu, \sigma), N(0, 1)) = \log \frac{1}{\sigma} + \frac{\sigma^2 + \mu^2}{2} - \frac{1}{2}
        :param args:
        :param kwargs:
        :return:
        """
        recons = x_hat
        input = x
        mu = mean
        log_var = log_var

        # kld_weight = kwargs['M_N'] # Account for the minibatch samples from the dataset
        recons_loss = F.mse_loss(recons, input)

        # 计算高斯分布和标准正态分布的KL散度
        if not self.degenerate2ae:
            kld_loss = torch.mean(-0.5 * torch.sum(1 + log_var - mu ** 2 - log_var.exp(), dim = 1), dim = 0)
            loss = recons_loss*recons_loss_weight + kld_loss*kld_loss_weight
        else:
            kld_loss = torch.zeros_like(recons_loss)
            loss = recons_loss

        return {'loss': loss, 'Reconstruction_Loss':recons_loss.detach(), 'KLD':-kld_loss.detach()}


    # Do I need this?
    def sample(self,
               num_samples:int,
               current_device: int, **kwargs) -> Tensor:
        """
        Samples from the latent space and return the corresponding
        image space map.
        :param num_samples: (Int) Number of samples
        :param current_device: (Int) Device to run the model
        :return: (Tensor)
        """
        z = torch.randn(num_samples,
                        self.latent_dim)

        z = z.to(current_device)

        samples = self.decode(z)
        return samples

    # Do I need this?
    def generate(self, x: Tensor, **kwargs) -> Tensor:
        """
        Given an input image x, returns the reconstructed image
        :param x: (Tensor) [B x C x H x W]
        :return: (Tensor) [B x C x H x W]
        """

        return self.forward(x)[0]

import numpy as np
from UTILS.tensor_ops import _2tensor, cfg
cfg.device_ = 'cuda:0'
cfg.use_float64_ = False
cfg.init = True

input_dim = 10
my_data_sample_x = (np.random.rand(1000,10)-0.5)*2 + 5 # 0.75~1.25
my_data_sample_y = my_data_sample_x.mean(-1)**2 + my_data_sample_x.mean(-1)*(-2) + 1

# solve the weight inital problem
vae_mod = VanillaVAE(input_dim=input_dim, latent_dim=16, hidden_dim=32, degenerate2ae=False)
my_data_sample_x = _2tensor(my_data_sample_x)
vae_mod = _2tensor(vae_mod)
from torch.optim import Adam
optimizer = Adam(vae_mod.parameters(), lr=3e-3)

print("Start training VAE...")
vae_mod.train()

for epoch in range(5000):
    overall_loss = 0
    x = my_data_sample_x

    optimizer.zero_grad()

    #  [self.decode(z), input, mu, log_var]    #  x_hat, mean, log_var
    x_hat, x_origin, mean, log_var = vae_mod(x) # model(x)
    error = ( torch.abs(x_hat - x) )/( torch.abs(x)+1e-9 )
    std_, mean_ = torch.std_mean(error, unbiased=False)
    print('\test error mean %.2f%% and std %.2f'%(mean_.item()*100, std_.item()) , end='\t')


    lossdict = vae_mod.loss_function(x=x, x_hat=x_hat, mean=mean, log_var=log_var, kld_loss_weight=1, recons_loss_weight=1)
    loss = lossdict['loss']
    overall_loss += loss.item()

    loss.backward()
    optimizer.step()
    # 'Reconstruction_Loss':recons_loss.detach(), 'KLD'
    print("\tEpoch", epoch + 1, "complete!", "\tReconstruction_Loss: ", lossdict['Reconstruction_Loss'].item(), "\tKLD: ", lossdict['KLD'].item())

print("Finish!!")

Start training VAE...
	est error mean 101.50% and std 0.03		Epoch 1 complete! 	Reconstruction_Loss:  26.14052391052246 	KLD:  -2.1015288829803467
	est error mean 100.80% and std 0.03		Epoch 2 complete! 	Reconstruction_Loss:  25.792667388916016 	KLD:  -1.0327492952346802
	est error mean 100.25% and std 0.03		Epoch 3 complete! 	Reconstruction_Loss:  25.51686668395996 	KLD:  -0.47612470388412476
	est error mean 99.79% and std 0.03		Epoch 4 complete! 	Reconstruction_Loss:  25.28688621520996 	KLD:  -0.28897348046302795
	est error mean 99.40% and std 0.03		Epoch 5 complete! 	Reconstruction_Loss:  25.09489631652832 	KLD:  -0.25286567211151123
	est error mean 99.01% and std 0.03		Epoch 6 complete! 	Reconstruction_Loss:  24.903974533081055 	KLD:  -0.23312784731388092
	est error mean 98.53% and std 0.03		Epoch 7 complete! 	Reconstruction_Loss:  24.671171188354492 	KLD:  -0.20038768649101257
	est error mean 98.11% and std 0.03		Epoch 8 complete! 	Reconstruction_Loss:  24.472043991088867 	KLD:  -0