In [1]:
import tensorflow as tf
import numpy as np
import keras as tfk
import keras.layers as tfkl

def swish_tf(x):
    return x * tf.sigmoid(x)

def log_norm_pdf_tf(x, mu, logvar):
    return -0.5 * (logvar + np.log(2 * np.pi) + (x - mu)**2 / tf.exp(logvar))



KeyboardInterrupt: 

In [None]:
class KerasEncoder(tfk.Model):
    def __init__(self, hidden_dim, latent_dim, input_dim, eps=1e-1):
        super(KerasEncoder, self).__init__()

        self.fc1 = tfkl.Dense(hidden_dim, activation=swish_tf)
        self.ln1 = tfkl.LayerNormalization(epsilon=eps)
        self.fc2 = tfkl.Dense(hidden_dim, activation=swish_tf)
        self.ln2 = tfkl.LayerNormalization(epsilon=eps)
        self.fc3 = tfkl.Dense(hidden_dim, activation=swish_tf)
        self.ln3 = tfkl.LayerNormalization(epsilon=eps)
        self.fc4 = tfkl.Dense(hidden_dim, activation=swish_tf)
        self.ln4 = tfkl.LayerNormalization(epsilon=eps)
        self.fc5 = tfkl.Dense(hidden_dim, activation=swish_tf)
        self.ln5 = tfkl.LayerNormalization(epsilon=eps)
        self.fc_mu = tfkl.Dense(latent_dim)
        self.fc_logvar = tfkl.Dense(latent_dim)
        self.dropout = tfkl.Dropout(rate=0.0)  # Initialize with a placeholder rate

    def call(self, inputs, training=False, dropout_rate=0.0):
        self.dropout.rate = dropout_rate

        # Normalize the input
        norm = tf.sqrt(tf.reduce_sum(tf.square(inputs), axis=-1, keepdims=True))
        x = inputs / norm

        x = self.dropout(x, training=training)

        h1 = self.ln1(self.fc1(x))
        h2 = self.ln2(self.fc2(h1) + h1)
        h3 = self.ln3(self.fc3(h2) + h1 + h2)
        h4 = self.ln4(self.fc4(h3) + h1 + h2 + h3)
        h5 = self.ln5(self.fc5(h4) + h1 + h2 + h3 + h4)

        return self.fc_mu(h5), self.fc_logvar(h5)


In [None]:
class CompositePrior(tfk.Model):
    def __init__(self, hidden_dim, latent_dim, input_dim, mixture_weights=[3/20, 3/4, 1/10]):
        super(CompositePrior, self).__init__()
        
        self.mixture_weights = mixture_weights
        
        self.mu_prior = tf.Variable(tf.zeros([1, latent_dim]), trainable=False)
        self.logvar_prior = tf.Variable(tf.zeros([1, latent_dim]), trainable=False)
        self.logvar_uniform_prior = tf.Variable(tf.fill([1, latent_dim], 10.0), trainable=False)
        
        # Assuming Encoder is appropriately defined for TensorFlow
        self.encoder_old = KerasEncoder(hidden_dim, latent_dim, input_dim)
        self.encoder_old.trainable = False
        
    def __call__(self, x, z):
        post_mu, post_logvar = self.encoder_old(x, 0)
        
        stnd_prior = log_norm_pdf_tf(z, self.mu_prior, self.logvar_prior)
        post_prior = log_norm_pdf_tf(z, post_mu, post_logvar)
        unif_prior = log_norm_pdf_tf(z, self.mu_prior, self.logvar_uniform_prior)
        
        gaussians = [stnd_prior, post_prior, unif_prior]
        gaussians = [g + np.log(w) for g, w in zip(gaussians, self.mixture_weights)]
        
        density_per_gaussian = tf.stack(gaussians, axis=-1)
                
        return tf.reduce_logsumexp(density_per_gaussian, axis=-1)

In [None]:
import tensorflow as tf
import numpy as np
import tensorflow.keras as tfk
import tensorflow.keras.layers as tfkl

class VAE(tfk.Model):
    def __init__(self, hidden_dim, latent_dim, input_dim):
        super(VAE, self).__init__()

        self.encoder = KerasEncoder(hidden_dim, latent_dim, input_dim)
        self.prior = CompositePrior(hidden_dim, latent_dim, input_dim)
        self.decoder = tfkl.Dense(input_dim)

    def reparameterize(self, mu, logvar, training):
        if training:
            std = tf.exp(0.5 * logvar)
            eps = tf.random.normal(shape=tf.shape(std))
            return eps * std + mu
        else:
            return mu

    def call(self, user_ratings, beta=None, gamma=1, dropout_rate=0.5, calculate_loss=True, training=True):
        mu, logvar = self.encoder(user_ratings, training, dropout_rate)    
        z = self.reparameterize(mu, logvar, training)
        x_pred = self.decoder(z)
        
        if calculate_loss:
            if gamma is not None:
                norm = tf.reduce_sum(user_ratings, axis=-1)
                kl_weight = gamma * norm
            elif beta is not None:
                kl_weight = beta

            mll = tf.reduce_mean(tf.reduce_sum(tf.nn.log_softmax(x_pred, axis=-1) * user_ratings, axis=-1))
            kld = tf.reduce_mean(tf.reduce_sum(log_norm_pdf_tf(z, mu, logvar) - self.prior(user_ratings, z), axis=-1) * kl_weight)
            negative_elbo = -(mll - kld)
            
            return (mll, kld), negative_elbo
            
        else:
            return x_pred

    def update_prior(self):
        self.prior.encoder_old.set_weights(self.encoder.get_weights())


In [None]:
# Assuming hidden_dim, latent_dim, and input_dim are defined
hidden_dim = 128
latent_dim = 64
input_dim = 784  # Example for MNIST dataset

# Create an instance of the VAE
vae = VAE(hidden_dim, latent_dim, input_dim)

# Compile the VAE (You need to define an appropriate loss and optimizer)
vae.compile(optimizer='adam', loss='mean_squared_error')  # Example loss and optimizer

# Print the model summary
vae.build(input_shape=(None,input_dim,))  # Specify the input shape
vae.summary()

Model: "vae_6"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 keras_encoder_12 (KerasEnco  multiple                 184320    
 der)                                                            
                                                                 
 composite_prior_6 (Composit  multiple                 0 (unused)
 ePrior)                                                         
                                                                 
 dense_104 (Dense)           multiple                  50960     
                                                                 
Total params: 419,792
Trainable params: 235,280
Non-trainable params: 184,512
_________________________________________________________________


In [None]:
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense, Input, LayerNormalization, Dropout, Lambda
import tensorflow as tf
import tensorflow.keras.backend as K

# Swish activation function
def swish(x):
    return x * K.sigmoid(x)

class KerasEncoder(Model):
    def __init__(self, hidden_dim, latent_dim, input_dim, eps=1e-1):
        super(KerasEncoder, self).__init__()

        self.fc1 = Dense(hidden_dim, activation=swish)
        self.ln1 = LayerNormalization(epsilon=eps)
        self.fc2 = Dense(hidden_dim, activation=swish)
        self.ln2 = LayerNormalization(epsilon=eps)
        self.fc3 = Dense(hidden_dim, activation=swish)
        self.ln3 = LayerNormalization(epsilon=eps)
        self.fc4 = Dense(hidden_dim, activation=swish)
        self.ln4 = LayerNormalization(epsilon=eps)
        self.fc5 = Dense(hidden_dim, activation=swish)
        self.ln5 = LayerNormalization(epsilon=eps)
        self.fc_mu = Dense(latent_dim)
        self.fc_logvar = Dense(latent_dim)
        self.dropout = Dropout(rate=0.0)  # Initialize with a placeholder rate

    def call(self, inputs, training=False, dropout_rate=0.0):
        self.dropout.rate = dropout_rate

        # Normalize the input
        norm = tf.sqrt(tf.reduce_sum(tf.square(inputs), axis=-1, keepdims=True))
        x = inputs / norm

        x = self.dropout(x, training=training)

        h1 = self.ln1(self.fc1(x))
        h2 = self.ln2(self.fc2(h1) + h1)
        h3 = self.ln3(self.fc3(h2) + h1 + h2)
        h4 = self.ln4(self.fc4(h3) + h1 + h2 + h3)
        h5 = self.ln5(self.fc5(h4) + h1 + h2 + h3 + h4)

        return self.fc_mu(h5), self.fc_logvar(h5)

# Example usage
hidden_dim = 128
latent_dim = 64
input_dim = 256
dropout_rate = 0.5
encoder = KerasEncoder(hidden_dim, latent_dim, input_dim)

# Input data example
import numpy as np
input_data = np.random.rand(1, input_dim)
mu, logvar = encoder(input_data, dropout_rate=dropout_rate)


In [None]:
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Layer
import numpy as np

# Assuming the KerasEncoder class is already defined as shown previously

def log_norm_pdf(z, mu, logvar):
    """
    Calculate the log probability of 'z' under the Gaussian distribution defined by 'mu' and 'logvar'.
    Ensure all constants and operations are in the same dtype (float32).
    """
    dtype = z.dtype  # Use the same dtype as the input tensor
    const = tf.constant(np.log(2.0 * np.pi), dtype=dtype)
    return -0.5 * (const + logvar + tf.square(z - mu) / tf.exp(logvar))


class CompositePrior(Model):
    def __init__(self, hidden_dim, latent_dim, input_dim, mixture_weights=[3/20, 3/4, 1/10]):
        super(CompositePrior, self).__init__()

        self.mixture_weights = np.log(mixture_weights)  # Log of mixture weights

        # Parameters
        self.mu_prior = tf.Variable(tf.zeros((1, latent_dim)), trainable=False)
        self.logvar_prior = tf.Variable(tf.zeros((1, latent_dim)), trainable=False)
        self.logvar_uniform_prior = tf.Variable(tf.fill((1, latent_dim), 10.0), trainable=False)

        # Encoder
        self.encoder_old = KerasEncoder(hidden_dim, latent_dim, input_dim)
        self.encoder_old.trainable = False

    def call(self, x, z):
        post_mu, post_logvar = self.encoder_old(x, training=False)

        stnd_prior = log_norm_pdf(z, self.mu_prior, self.logvar_prior)
        post_prior = log_norm_pdf(z, post_mu, post_logvar)
        unif_prior = log_norm_pdf(z, self.mu_prior, self.logvar_uniform_prior)

        gaussians = [stnd_prior, post_prior, unif_prior]
        gaussians = [g + w for g, w in zip(gaussians, self.mixture_weights)]

        density_per_gaussian = tf.stack(gaussians, axis=-1)

        return tf.reduce_logsumexp(density_per_gaussian, axis=-1)

# Example usage
hidden_dim = 128
latent_dim = 64
input_dim = 256
composite_prior = CompositePrior(hidden_dim, latent_dim, input_dim)

## Input data example
#z = np.random.rand(1, latent_dim)
#x = np.random.rand(1, input_dim)
#output = composite_prior(x, z)


In [None]:
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense
import numpy as np

# Assuming the KerasEncoder and CompositePrior classes are already defined as shown previously

class KerasVAE(Model):
    def __init__(self, hidden_dim, latent_dim, input_dim):
        super(KerasVAE, self).__init__()

        self.encoder = KerasEncoder(hidden_dim, latent_dim, input_dim)
        self.prior = CompositePrior(hidden_dim, latent_dim, input_dim)
        self.decoder = Dense(input_dim, activation='linear')  # Linear activation

    def reparameterize(self, mu, logvar):
        std = tf.exp(0.5 * logvar)
        eps = tf.random.normal(std.shape)
        return eps * std + mu

    def call(self, user_ratings, beta=None, gamma=1, dropout_rate=0.5, calculate_loss=True):
        mu, logvar = self.encoder(user_ratings, dropout_rate=dropout_rate)
        z = self.reparameterize(mu, logvar)
        x_pred = self.decoder(z)

        if calculate_loss:
            kl_weight = gamma * tf.reduce_sum(user_ratings, axis=-1) if gamma else beta

            mll = tf.reduce_mean(tf.reduce_sum(
                tf.nn.log_softmax(x_pred) * user_ratings, axis=-1))
            kld = tf.reduce_mean(
                tf.reduce_sum(log_norm_pdf(z, mu, logvar) - self.prior(user_ratings, z), axis=-1) * kl_weight)

            negative_elbo = -(mll - kld)
            return (mll, kld), negative_elbo
        else:
            return x_pred

    def update_prior(self):
        # Updating the prior's encoder with the current encoder's weights
        self.prior.encoder_old.set_weights(self.encoder.get_weights())

# Example usage
hidden_dim = 128
latent_dim = 64
input_dim = 256
vae = KerasVAE(hidden_dim, latent_dim, input_dim)

# Input data example
user_ratings = np.random.rand(1, input_dim)
output = vae(user_ratings, calculate_loss=True)


In [None]:
output