In [None]:
import numpy as np
from scipy.stats import norm  # Import the norm module
import matplotlib.pyplot as plt
from scipy.integrate import simps

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np


In [None]:
# Define the function that relates x and z
# def function_x(z):
#     # Define the relationship between x and z
#     # Here, we use f(z) = (z - 0.5)^2 as an example
#     return (z - 0.5)**2

def joint_pdf(x, z, x_variance):
    # Calculate f(z) for given z
    fx = function_x(z)
    # Calculate the PDF of x given f(z) and x_variance
    pdf_x_given_z = norm.pdf(x, loc=fx, scale=np.sqrt(x_variance))
    # Calculate the PDF of z (uniform distribution in the range 0 to 1)
    pdf_z = np.where((z >= 0) & (z <= 1), 1, 0)
    # Return the product of the PDFs of x given z and the PDF of z
    return pdf_x_given_z * pdf_z


def get_p_z_given_x(x, x_variance):
    # Set the values for z for which to calculate the joint PDF
    z_values = np.linspace(0, 1, 10000)
    # Calculate the joint PDF for x and z at the given x value
    pdf_joint = joint_pdf(x, z_values, x_variance)
    # Calculate the marginal PDF of x
    pdf_x = simps(pdf_joint, x=z_values)
    # Calculate p(z|x) using Bayes' theorem
    pdf_z_given_x = pdf_joint / pdf_x
    # Return p(z|x)
    return pdf_z_given_x




def generate_x_samples(num_samples, x_variance):
    """
    Generates x samples from a normal distribution with noise variance and mean f(z).
    
    Args:
        num_samples (int): Number of samples to generate.
        x_variance (float): Variance of the normal distribution for x generation.
        
    Returns:
        x_samples (np.ndarray): Generated x samples.
    """
    # Generate latent variable z from uniform distribution
    z_samples = np.random.uniform(0, 1, num_samples)
    
    # Calculate x mean based on z values
    x_mean = np.where(z_samples < 0.5, (z_samples - 0.5)**2, (2*z_samples - 1.5)**2)
    
    print(x_mean)
    # Generate x samples from normal distribution with calculated mean and given variance
    x_samples = np.random.normal(x_mean, np.sqrt(x_variance), num_samples)
    
    return x_samples







def function_x(z):
    """
    Calculates f(z) based on the value of z. Works for scalar or numpy array inputs.

    Args:
        z (float or numpy array): The value(s) of z.

    Returns:
        float or numpy array: The value(s) of f(z) based on the condition.
    """
    # Convert z to numpy array if it's not already
    z = np.asarray(z)
    # Create a mask for z values less than 0.5
    mask = z < 0.5
    # Apply the condition to calculate f(z) for z < 0.5 and z >= 0.5
    fx = np.where(mask, (2*z - 0.5)**2, (2*z - 1.5)**2)
    return fx








In [None]:
n = 10000
x_values = np.linspace(-1, 1, n)
z_values = np.linspace(0, 1, n)


# Calculate the joint PDF for x and z at each combination of x and z values
X, Z = np.meshgrid(x_values, z_values)
pdf_joint = joint_pdf(X, Z, x_variance=0.01)

# Define a reversed colormap
cmap = plt.cm.viridis_r

# Plot the joint density using contourf with the reversed colormap
plt.contourf(Z, X, pdf_joint, cmap=cmap)
plt.colorbar(label='Joint PDF')
plt.xlabel('z')
plt.ylabel('y')
plt.title('Joint Density of y and z')
plt.ylim((-0.4, 0.6))
plt.show()

In [None]:
# Plot the conditional PDF of z given x for the given x and x_variance

fig, ax = plt.subplots(3, 1, figsize = (10, 10), sharex = True)

color = ['purple', 'red', 'green']

for i, x in enumerate([-3, 0.2, 1][::-1]):

  pdf_z_given_x = get_p_z_given_x(x, 0.01)
  ax[i].plot(np.linspace(0, 1, n), pdf_z_given_x, color=color[i])
  ax[i].set_title('True posterior z given y = ' + str(x), fontsize = 20)
ax[2].set_xlabel('z', fontsize = 20)

plt.subplots_adjust(hspace=.35)




# True f function

In [None]:
z_ = np.linspace(0, 1, 10000)
plt.plot(z_, function_x(z_) )

# Vanilla VAE

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import numpy as np

# Define the VAE model
class VAE(nn.Module):
    def __init__(self):
        super(VAE, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(1, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU()
        )
        self.mu_layer = nn.Linear(16, 1)
        self.logvar_layer = nn.Linear(16, 1)
        self.decoder = nn.Sequential(
            nn.Linear(1, 16),
            nn.ReLU(),
            nn.Linear(16, 32),
            nn.ReLU(),
            nn.Linear(32, 1)
        )

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        h = self.encoder(x)
        mu = self.mu_layer(h)
        logvar = self.logvar_layer(h)
        z = self.reparameterize(mu, logvar)
        return self.decoder(z) , mu, logvar


def loss_function(recon_x, x, mu, logvar):
    BCE = nn.MSELoss()(recon_x, x)
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return BCE + KLD


def train(model, dataloader, device, optimizer, epoch):
    model.train()
    train_loss = 0
    for batch_idx, data in enumerate(dataloader):
        data = data.to(device)
        optimizer.zero_grad()
        recon_batch, mu, logvar = model(data)
        loss = loss_function(recon_batch, data, mu, logvar)
        loss.backward()
        train_loss += loss.item()
        optimizer.step()

    print('Epoch: {:03d} Average loss: {:.4f}'.format(epoch, train_loss / len(dataloader.dataset)))


# Hyperparameters
epochs = 20
batch_size = 64
learning_rate = 0.005


# Generate your 1D continuous dataset
data_np = torch.from_numpy(generate_x_samples(10000, 0.01).reshape(-1, 1))
data_tensor = torch.tensor(data_np, dtype=torch.float32)
dataloader = data.DataLoader(data_tensor, batch_size=batch_size, shuffle=True)

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize the VAE model
model = VAE().to(device)

# Set the optimizer
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Train the model
for epoch in range(1, epochs + 1):
    train(model, dataloader, device, optimizer, epoch)


In [None]:
def generate_samples(model, num_samples):
    z = torch.tensor(np.random.normal(0,1 , num_samples).astype(np.float32).reshape(-1,1) )
    
    
    with torch.no_grad():
        samples = model.decoder(z) 

    return samples.cpu().numpy().reshape(-1, )

def visualize_vae(model, data, num_samples=1000):
    generated_samples = generate_samples(model, num_samples) 
    
    plt.figure(figsize=(10, 5))

    # Plot original dataset
    plt.subplot(1, 2, 1)
    plt.hist(data, bins=150, alpha=1, density=True)
    plt.title('Original Dataset\n Mean True Data: ' +  str(np.round(np.mean(data), 4) ) )

    # Plot generated samples
    plt.subplot(1, 2, 2)
    plt.hist(generated_samples, bins=150, alpha=1, density=True)
    plt.title('Generated Dataset MFG-VAE\n Mean MFG-VAE: '+  str(np.round(np.mean(generated_samples), 4 ) ) )

    plt.show()
    print('Mean True Data:', np.mean(data))
    print('Mean generated:', np.mean(generated_samples))

# Visualize the distribution X learnt by our Vanilla VAE




In [None]:
visualize_vae(model, data_np.numpy().reshape(-1, ), 10000)


# Plot Vanilla VAE p(z|x)


In [None]:
with torch.no_grad():
  for x in [-3, 0.2, 1][::-1]:
    z, mu, log_var = model.forward(torch.tensor(np.array(x).astype(np.float32).reshape(-1, 1))) 
    print(z, mu, np.exp(log_var))

In [None]:
def plot_posterior_z_given_x(model, x_s = [-3, 0.2, 1][::-1]):

  mus = []
  vars = []



  fig, ax = plt.subplots(3, 1, figsize = (10, 10), sharex = True)

  colors = ['purple', 'red', 'green']


  

  for i, x in enumerate(x_s):
    with torch.no_grad():
      _, mu, log_var = model.forward(torch.tensor(np.array(x).astype(np.float32).reshape(-1, 1)))
      mu = mu.numpy()[0][0]
      var = np.exp(2 * log_var.numpy()[0][0])
      mus.append(mu)
      vars.append(var)


  
      sigma = np.sqrt(var)    # standard deviation

      x_ = np.linspace(mu - 3*sigma, mu + 3*sigma, 1000)
      y_ = (1/(np.sqrt(2*np.pi)*sigma))*np.exp(-((x_-mu)**2)/(2*var))

      ax[i].plot(x_, y_, color = colors[i])
      ax[i].set_title('Learnt posterior z given y = ' + str(x) + '\nMean = ' + str(np.round(mu, 4))+ ' Std = ' + str(np.round(np.sqrt(var), 4)), fontsize = 20 )

  ax[2].set_xlabel('z', fontsize = 20)

  plt.subplots_adjust(hspace=.35)


plot_posterior_z_given_x(model)

# Function f learnt by the vanilla VAE

In [None]:
with torch.no_grad():
  y_ = model.decoder(torch.tensor(z_values.astype(np.float32).reshape(-1, 1))).numpy().reshape(-1, )
  plt.plot(z_values, y_)
  plt.title('Learnt f by MFG-VAE')

# GMM Posterior VAE

In [None]:
class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, latent_dim, n_gaussians):
        super(Encoder, self).__init__()
        # First, we create a fully-connected layer that takes as input our
        # flattened images and outputs a hidden dimension.
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        # Next, we create two fully-connected layers that take as input the
        # hidden dimension and output the mean and log variance of our
        # Gaussian mixture model. We use the number of Gaussians in the
        # mixture and the latent dimension to reshape the output of the
        # layers into a tensor of shape (n_gaussians, latent_dim).
        self.fc2_mean = nn.Linear(hidden_dim, latent_dim * n_gaussians)
        self.fc2_logvar = nn.Linear(hidden_dim, latent_dim * n_gaussians)
        self.n_gaussians = n_gaussians
        self.latent_dim = latent_dim

    def forward(self, x):
        # First, we pass our input through the first fully-connected layer
        # and apply a ReLU activation function. This will output a tensor
        # with the shape (batch_size, hidden_dim).
        h = torch.relu(self.fc1(x)) 
        # Next, we pass the output of the hidden layer through the two
        # fully-connected layers that output the mean and log variance of
        # the Gaussian mixture model. We reshape the output of each layer
        # into a tensor of shape (batch_size, n_gaussians, latent_dim).
        mean = self.fc2_mean(h).view(-1, self.n_gaussians, self.latent_dim)
        logvar = self.fc2_logvar(h).view(-1, self.n_gaussians, self.latent_dim)
        return mean, logvar

class Decoder(nn.Module):
# This class defines a decoder network for a variational autoencoder.
# It takes in the latent variable and outputs the reconstructed input.
# The decoder is made up of two fully connected layers.
# The first layer is a linear transformation from the latent dimension to the hidden dimension.
# The second layer is a linear transformation from the hidden dimension to the output dimension.
# The output is then passed through a sigmoid function to get a value between 0 and 1.
# This is the reconstructed input.
    def __init__(self, latent_dim, hidden_dim, output_dim):
        super(Decoder, self).__init__()
        self.fc1 = nn.Linear(latent_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, z):
        h = torch.relu(self.fc1(z))
        x_recon = torch.sigmoid(self.fc2(h))
        return x_recon

class VAE_GMM(nn.Module):
# This class is for creatiang and training a variational autoencoder with 
# a Gaussian mixture posterior.
# The encoder is a neural network with an input layer, a hidden layer, and a latent layer.
# The decoder is a neural network with a latent layer, a hidden layer, and an output layer.
# The Gaussian mixture prior on the latent space is specified by setting the number of Gaussian components.
    def __init__(self, input_dim, hidden_dim, latent_dim, n_gaussians):
        super(VAE_GMM, self).__init__()
        self.encoder = Encoder(input_dim, hidden_dim, latent_dim, n_gaussians)
        self.decoder = Decoder(latent_dim, hidden_dim, input_dim)
        self.n_gaussians = n_gaussians
        self.latent_dim = latent_dim
        self.mixture_logits = nn.Parameter(torch.zeros(n_gaussians))

    def reparameterize(self, mean, logvar):
        # Reparameterization trick to sample from N(mu, var) from
        # N(0,1). This is also called the "reparameterization trick".
        # This trick allows us to backpropagate through the sampling process.
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mean + eps * std

    def forward(self, x):
        mean, logvar = self.encoder(x)
        z = self.reparameterize(mean, logvar)
        logits = torch.softmax(self.mixture_logits, dim=0)
        z_weighted = torch.sum(z * logits.unsqueeze(0).unsqueeze(2), dim=1)
        x_recon = self.decoder(z_weighted)
        return x_recon, mean, logvar, logits

def loss_function(x, x_recon, mean, logvar):
    BCE = nn.functional.binary_cross_entropy(x_recon, x, reduction='sum')
    KLD = -0.5 * torch.sum(1 + logvar - mean.pow(2) - logvar.exp())
    return BCE + KLD

def train_vae_gmm(data_loader, model, optimizer, device, epochs=100):
    for epoch in range(epochs):
        model.train()
        train_loss = 0
        for _, data in enumerate(data_loader):
            data = data[0]
            data = data.to(device)
            optimizer.zero_grad()
            x_recon, mean, logvar, logits = model(data)
            loss = loss_function(data, x_recon, mean, logvar)
            loss.backward()
            train_loss += loss.item()
            optimizer.step()

        print(f"Epoch: {epoch+1}/{epochs}, Loss: {train_loss / len(data_loader.dataset):.4f}")

In [None]:
device = torch.device("cpu")

# generate data and trainingg

data = generate_x_samples(10000,0.01)
dataset = torch.utils.data.TensorDataset(torch.Tensor(data[:,np.newaxis]))
data_loader = DataLoader(dataset, batch_size=64, shuffle=True)

input_dim = 1
hidden_dim = 100
latent_dim = 1
n_gaussians = 10

model = VAE_GMM(input_dim, hidden_dim, latent_dim, n_gaussians).to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3) 

train_vae_gmm(data_loader, model, optimizer, device, epochs=100)

In [None]:
num_samples = 10000
def visualize_vae_gmm(model, data, num_samples):

    # sample from the prior
    generated_samples = np.random.normal(0, 1, num_samples)
    generated_samples = model.decoder(torch.Tensor(generated_samples[:,np.newaxis])).cpu().detach().numpy() 
    #generated_samples = generated_samples + np.random.normal(0, np.sqrt(0.01), num_samples)[:,np.newaxis]
    
    plt.figure(figsize=(10, 5))

    # Plot original dataset
    plt.subplot(1, 2, 1)
    plt.hist(data, bins=50, alpha=0.5, density=True)
    plt.title(f'Original Dataset \n Mean True Data Normal: {np.mean(data):.4f}')

    # Plot generated samples
    plt.subplot(1, 2, 2)
    plt.hist(generated_samples, bins=50, alpha=0.5, density=True)
    plt.title(f'Generated Dataset MoG-VAE \n Mean Generated Data: {np.mean(generated_samples):.4f}')

    plt.show()
    print(np.mean(data))
    print(np.mean(generated_samples))
visualize_vae_gmm(model, data, num_samples)

In [None]:
z_values = np.linspace(0, 1, n)
z_samples = np.array(z_values)
z_samples = z_samples.reshape(-1, 1)  # Reshape the latent variables to have shape (batch_size, z_dim)
z_samples_tensor = torch.tensor(z_samples, dtype=torch.float32).to(next(model.parameters()).device)

with torch.no_grad():
    x_decoded = model.decoder(z_samples_tensor).squeeze().numpy()
    
# %%
true_x = function_x(z_samples)

plt.plot(z_samples, x_decoded)
plt.ylabel('f(z) learned') 
plt.xlabel('z')
plt.title('Learned f(z) function by MoG-VAE')

In [None]:
# Function to compute the Gaussian mixture density
from scipy.stats import norm


def gaussian_mixture_density(x, means, variances, weights):
    density = np.zeros_like(x)
    stds_dev = 0
    for m, v, w in zip(means, variances, weights):
        stds_dev += w * np.sqrt(v)
        density += w * norm.pdf(x, loc=m, scale=np.sqrt(v))
    return density, np.mean(density), stds_dev[0]

# Plot the conditional PDF of z given x for the given x and x_variance
fig, ax = plt.subplots(3, 1, figsize = (10, 10), sharex = True)
color = ['purple', 'red', 'green']
for tu, i in zip([-3, 0.2, 1],[0,1,2]):
  with torch.no_grad():
      means = model.encoder(torch.Tensor([[tu]]))[0][0].numpy()
      stdevs = torch.exp(0.5 * model.encoder(torch.Tensor([[tu]]))[1][0]).numpy()
      weights = torch.nn.functional.softmax(model.mixture_logits, dim=0).numpy()
  #y, mea, stad = gaussian_mixture_density(x, means,  stdevs, weights)
    # Generate the x-axis values
  
  x = np.linspace(min(means) - 3 * max(stdevs), max(means) + 3 * max( stdevs), 1000)

    # Compute the Gaussian mixture density
  y, mea, stad = gaussian_mixture_density(x, means,  stdevs, weights)
  #pdf_z_given_x = get_p_z_given_x(x, 0.01)
  ax[i].plot(x, y, color=color[i])
  ax[i].set_title(f'True posterior z given y =  {tu} \n   Mean: {mea:.4f}, Std: {stad:.4f}', fontsize = 20)
ax[2].set_xlabel('z', fontsize = 20)
plt.subplots_adjust(hspace=.35)

# GMM Prior VAE

In [None]:
class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, latent_dim):
        super(Encoder, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2_mean = nn.Linear(hidden_dim, latent_dim)
        self.fc2_logvar = nn.Linear(hidden_dim, latent_dim)

    def forward(self, x):
        h = torch.relu(self.fc1(x))
        z_mean = self.fc2_mean(h)
        z_logvar = self.fc2_logvar(h)
        return z_mean, z_logvar


class Decoder(nn.Module):
    def __init__(self, latent_dim, hidden_dim, output_dim):
        super(Decoder, self).__init__()
        self.fc1 = nn.Linear(latent_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, z):
        h = torch.relu(self.fc1(z))
        x_recon = torch.sigmoid(self.fc2(h))
        return x_recon


class MixtureVAE(nn.Module):
    def __init__(self, input_dim, hidden_dim, latent_dim, n_components):
        super(MixtureVAE, self).__init__()
        self.laten_dim = latent_dim
        self.encoder = Encoder(input_dim, hidden_dim, latent_dim)
        self.decoder = Decoder(latent_dim, hidden_dim, input_dim)
        self.prior_means = nn.Parameter(torch.zeros(n_components, latent_dim))
        self.prior_logvars = nn.Parameter(torch.zeros(n_components, latent_dim))
        self.prior_logits = nn.Parameter(torch.zeros(n_components))

    def reparameterize(self, mean, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mean + eps * std

    def forward(self, x):
        z_mean, z_logvar = self.encoder(x)
        z = self.reparameterize(z_mean, z_logvar)
        x_recon = self.decoder(z)
        return x_recon, z_mean, z_logvar, z

    def loss_function(self, x, x_recon, z_mean, z_logvar, z):
        recon_loss = nn.functional.binary_cross_entropy(x_recon, x, reduction='sum')
    
        prior_mean = torch.unsqueeze(z_mean, 1)  # Shape: (batch_size, 1, latent_dim)
        prior_logvar = torch.unsqueeze(z_logvar, 1)  # Shape: (batch_size, 1, latent_dim)
        prior_var = torch.exp(self.prior_logvars)  # Shape: (n_components, latent_dim)

        z_expanded = z.unsqueeze(1)  # Shape: (batch_size, 1, latent_dim)
        log_p_z = torch.sum(-0.5 * (torch.log(2 * torch.pi * prior_var) + (z_expanded - self.prior_means) ** 2 / prior_var), dim=2)
        log_p_z = torch.logsumexp(log_p_z + self.prior_logits, dim=1)
    
        log_q_z = torch.sum(-0.5 * (torch.log(2 * torch.pi * torch.exp(z_logvar)) + (z - z_mean) ** 2 / torch.exp(z_logvar)), dim=1)
    
        kl_divergence = torch.sum(log_q_z - log_p_z)
    
        loss = recon_loss + kl_divergence
        return loss

In [None]:
def train_mixture_vae(model, data_loader, epochs, learning_rate):
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    device = torch.device("cpu")
    model.to(device)
    
    for epoch in range(epochs):
        model.train()
        train_loss = 0
        for _, data in enumerate(data_loader):
            data = data[0]
            data = data.to(device)
            optimizer.zero_grad()
            x_recon, z_mean, z_logvar, z = model(data)
            loss = model.loss_function(data, x_recon, z_mean, z_logvar, z)
            loss.backward()
            train_loss += loss.item()
            optimizer.step()
        
        train_loss /= len(data_loader.dataset)
        print(f"Epoch: {epoch + 1}/{epochs}, Train loss: {train_loss:.4f}")

In [None]:
input_dim = 1
hidden_dim = 100
latent_dim = 1
n_gaussians = 10
model2 = MixtureVAE(input_dim, hidden_dim, latent_dim, n_gaussians)

In [None]:
train_mixture_vae(model2, data_loader, 100, 1e-3)

In [None]:
z_samples = np.array(z_values)
z_samples = z_samples.reshape(-1, 1)  # Reshape the latent variables to have shape (batch_size, z_dim)
z_samples_tensor = torch.tensor(z_samples, dtype=torch.float32).to(next(model.parameters()).device)

with torch.no_grad():
    x_decoded = model2.decoder(z_samples_tensor).squeeze().numpy()
    
plt.plot(z_samples, x_decoded)
plt.ylabel('f(z) learned') 
plt.xlabel('z')
plt.title('Learned f(z) function by MFG-VAE with MoG prior')

In [None]:
def sample_from_gmm_prior(model, num_samples):
    with torch.no_grad():
        logits = torch.softmax(model.prior_logits, dim=0)
        chosen_components = np.random.choice(model.prior_logits.shape[0], size=num_samples, p=logits.cpu().numpy())
        z = torch.randn(num_samples, model.laten_dim).to(model.prior_logits.device)
        z_weighted = z * logits[chosen_components].unsqueeze(1)
        return z_weighted

def generate_samples(model, num_samples):
    z_weighted = sample_from_gmm_prior(model, num_samples)
    with torch.no_grad():
        samples = model.decoder(z_weighted)
    return samples.cpu().numpy()


In [None]:
num_samples = 10000

def visualize_vae_gmm(model, data, num_samples=1000):
    """
    Visualizes the original data and the generated samples from the model.
    """
    # Generate samples from the model
    generated_samples = generate_samples(model, num_samples)
    
    # Create the figure
    plt.figure(figsize=(10, 5))

    # Plot original dataset
    plt.subplot(1, 2, 1)
    plt.hist(data, bins=50, alpha=0.5, density=True)
    plt.title(f'Original Dataset \n Mean True Data Normal: {np.mean(data):.4f}')

    # Plot generated samples
    plt.subplot(1, 2, 2)
    plt.hist(generated_samples, bins=50, alpha=0.5, density=True)
    plt.title(f'Generated Dataset MFG-VAE with MoG prior \n Mean Generated Data: {np.mean(generated_samples):.4f}')

    plt.show()
    print(np.mean(data))
    print(np.mean(generated_samples))
visualize_vae_gmm(model2, data, num_samples)

In [None]:

# Plot the conditional PDF of z given x for the given x and x_variance

fig, ax = plt.subplots(3, 1, figsize = (10, 10), sharex = True)
color = ['purple', 'red', 'green']
for tu, i in zip([-3, 0.2, 1],[0,1,2]):
  with torch.no_grad():
        means = model2.encoder(torch.Tensor([[tu]]))[0][0].numpy()
        stdevs = torch.exp(0.5 * model.encoder(torch.Tensor([[tu]]))[1][0]).numpy()
  
  x = np.linspace(means[0]- 3 * stdevs[0][0], means[0] + 3 * stdevs[0][0], 1000)

  print(stdevs[0][0])
    # Compute the Gaussian mixture density
  y = pdf = (1 / (stdevs[0][0] * np.sqrt(2 * np.pi))) * np.exp(- (x - means[0]) ** 2 / (2 * stdevs[0][0] ** 2))
  ax[i].plot(x, y, color=color[i])
  ax[i].set_title(f'True posterior z given y =  {tu} \n   Mean: {means[0]:.4f}, Std: {stdevs[0][0]:.4f}', fontsize = 20)
ax[2].set_xlabel('z', fontsize = 20)
plt.subplots_adjust(hspace=.35)