In [None]:
def xavier_normal_init(shape):
  """Sample an initial weight matrix from a zero-mean normal distribution.

  The standard deviation is sqrt(2) / sqrt(input_dim + output_dim).

  Args:
    shape: Two-element sequence `(input_dim, output_dim)`. It is unpacked into
      exactly two values and also used as the shape of the returned sample.

  Returns:
    A tensor of shape `shape` drawn with `tf.random.normal`.
  """
  fan_in, fan_out = shape
  fan_total = tf.cast(fan_in + fan_out, dtype=tf.float32)
  stddev = tf.sqrt(2.) / tf.sqrt(fan_total)
  return tf.random.normal(shape, stddev=stddev)

In [None]:
class DenseLayer(tf.Module):
  """Fully connected layer whose parameters are created on the first call.

  The input width is read from the last axis of the first tensor the layer
  receives; the weight matrix and a zero bias are allocated at that point and
  reused on every later call.
  """

  def __init__(self, output_dim, weight_init_fn = xavier_normal_init, activation = tf.identity):
    """Store the layer configuration; no variables are created yet.

    Args:
      output_dim: Number of output units.
      weight_init_fn: Callable that receives `(input_dim, output_dim)` and
        returns the initial weight values.
      activation: Callable applied to the affine output.
    """
    # Run tf.Module's own initializer so the inherited `name` and `name_scope`
    # attributes are set up on the instance.
    super().__init__()
    self.output_dim = output_dim
    self.weight_init_fn = weight_init_fn
    self.activation = activation
    self.built = False

  def __call__(self, x):
    """Return `activation(x @ weight + bias)`, building the variables if needed."""
    if not self.built:
      # Lazy build: the input width is only known once real data arrives.
      self.input_dim = x.shape[-1]
      self.weight = tf.Variable(self.weight_init_fn((self.input_dim, self.output_dim)), name = "weight")
      self.bias = tf.Variable(tf.zeros((self.output_dim, )), name = "bias")
      self.built = True
    z = tf.add(tf.matmul(x, self.weight), self.bias)
    return self.activation(z)

In [None]:
class AdamOptimizer:
  """Hand-written Adam optimizer operating on lists of `tf.Variable`s.

  Keeps one first-moment (`v_dvar`) and one second-moment (`s_dvar`) slot per
  variable. The slots are created on the first call to `apply_gradients` and
  are matched to variables by position, so later calls must pass the variables
  in the same order.
  """

  def __init__(self, learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-8):
    """Store the hyper-parameters; slot variables are created lazily.

    Args:
      learning_rate: Step size applied to the bias-corrected update.
      beta_1: Decay rate of the first-moment moving average.
      beta_2: Decay rate of the second-moment moving average.
      epsilon: Constant added to the denominator of the update.
    """
    self.learning_rate = learning_rate
    self.beta_1 = beta_1
    self.beta_2 = beta_2
    self.epsilon = epsilon
    self.t = 1.  # step counter used for bias correction
    self.v_dvar, self.s_dvar = [], []
    self.built = False

  def apply_gradients(self, grads, vars):
    """Apply one Adam step to every variable, in place.

    Args:
      grads: Iterable of gradients, aligned by position with `vars`. An entry
        that is `None` leaves the matching variable and its slots untouched.
      vars: Iterable of variables to update. (The name shadows the builtin but
        is kept so existing keyword callers continue to work.)
    """
    # `vars` is traversed twice on the first call (slot creation, then the
    # update loop); materialise both arguments so a one-shot iterable such as a
    # generator is not exhausted before any update is applied.
    grads, vars = list(grads), list(vars)
    if not self.built:
      for var in vars:
        self.v_dvar.append(tf.Variable(tf.zeros(var.shape)))
        self.s_dvar.append(tf.Variable(tf.zeros(var.shape)))
      self.built = True
    # The bias-correction denominators depend only on the step count, so they
    # are computed once per call rather than once per variable.
    v_correction = 1 - tf.pow(self.beta_1, self.t)
    s_correction = 1 - tf.pow(self.beta_2, self.t)
    for i, (grad, var) in enumerate(zip(grads, vars)):
      if grad is None:
        # No gradient was produced for this variable; skip it instead of
        # failing on arithmetic with None.
        continue
      self.v_dvar[i].assign(self.beta_1 * self.v_dvar[i] + (1 - self.beta_1) * grad)
      self.s_dvar[i].assign(self.beta_2 * self.s_dvar[i] + (1 - self.beta_2) * tf.square(grad))
      v_corrected = self.v_dvar[i] / v_correction
      s_corrected = self.s_dvar[i] / s_correction
      var.assign_sub(self.learning_rate * v_corrected / (tf.sqrt(s_corrected) + self.epsilon))
    self.t += 1

In [None]:
class BetaScheduler(keras.callbacks.Callback):
    """Callback that lowers a shared `beta` variable linearly, once per epoch.

    At the start of epoch `e` (0-based) the variable is set to
    `max(min_beta, initial_beta - decay_rate * e)` and the value is printed.
    """

    def __init__(self, beta, initial_beta=1.0, min_beta=0.1, decay_rate=0.1):
        """
        Args:
            beta: Variable-like object exposing `assign()` and `numpy()`; it is
                overwritten in place every epoch.
            initial_beta: Value used for epoch 0.
            min_beta: Lower bound the schedule never goes below.
            decay_rate: Amount subtracted per epoch.
        """
        super().__init__()
        self.beta = beta
        self.initial_beta = initial_beta
        self.min_beta = min_beta
        self.decay_rate = decay_rate

    def on_epoch_begin(self, epoch, logs=None):
        """Assign this epoch's beta and report it."""
        decayed = self.initial_beta - self.decay_rate * epoch
        scheduled = max(self.min_beta, decayed)
        self.beta.assign(scheduled)
        print(f"Epoch {epoch+1}: Beta = {self.beta.numpy():.4f}")

# Non-trainable scalar that BetaScheduler overwrites at the start of every
# epoch; the 0.0 here is only a placeholder until the first epoch begins.
# NOTE(review): none of the train_step implementations shown in this notebook
# read `beta` — confirm where it is meant to be consumed.
beta = tf.Variable(0.0, dtype=tf.float32, trainable=False)
beta_scheduler = BetaScheduler(beta=beta)

In [None]:
class Lagrange_Constrained_VAE_Encoder(keras.Model):
  """Convolutional encoder returning Gaussian parameters and a latent sample.

  Two same-padded 3x3 convolutions, a 2x2 max-pool and a global max-pool feed a
  128-unit dense layer. Two linear heads then produce the mean and the
  log-variance, from which a sample is drawn with the reparameterisation
  `mean + exp(0.5 * log_var) * noise`.
  """

  def __init__(self, latent_dim):
    """
    Args:
      latent_dim: Width of the mean / log-variance heads.
    """
    super().__init__()
    self.latent_dim = latent_dim
    self.conv1 = layers.Conv2D(filters=32, kernel_size=(3, 3), padding="same", activation="relu")
    self.conv2 = layers.Conv2D(filters=32, kernel_size=(3, 3), padding="same", activation="relu")
    self.pool = layers.MaxPooling2D(pool_size=(2, 2))
    # Despite the attribute name this is a global max-pool: it keeps one value
    # per channel rather than flattening the whole feature map.
    self.flatten = layers.GlobalMaxPooling2D()
    self.dense1 = layers.Dense(units=128, activation=tf.nn.relu)
    self.loc_layer = layers.Dense(units=latent_dim)     # mean head
    self.logvar_layer = layers.Dense(units=latent_dim)  # log-variance head
    self.seed_generator = keras.random.SeedGenerator(seed=1337)

  def sample_latent(self, inputs):
    """Draw one latent sample per row from `(mean, log_var)`.

    Args:
      inputs: Pair `(z_mean, z_log_var)`; the first two axes of `z_mean`
        give the shape of the noise that is drawn.
    """
    mean, log_var = inputs
    mean_shape = tf.shape(mean)
    noise = keras.random.normal(
        shape=(mean_shape[0], mean_shape[1]), seed=self.seed_generator
    )
    std = tf.exp(0.5 * log_var)
    return mean + std * noise

  def call(self, inputs):
    """Encode `inputs` and return `(mu, logvar, z)`."""
    features = self.conv2(self.conv1(inputs))
    # Global max-pool after the 2x2 pool: one value per conv channel.
    features = self.flatten(self.pool(features))
    hidden = self.dense1(features)
    mu = self.loc_layer(hidden)
    logvar = self.logvar_layer(hidden)
    return mu, logvar, self.sample_latent([mu, logvar])

In [None]:
class Lagrange_Constrained_VAE_Decoder2(keras.Model):
    """Decoder mapping a latent vector to a single-channel 10x12 sigmoid image.

    The spatial sizes are fixed by the layers themselves (a 5x6 seed map that
    is upsampled once with stride 2); `output_shape` is only stored on the
    instance and is not read by `call`.
    """

    def __init__(self,output_shape=(10, 12, 1)):
        super().__init__()

        self.output_shape = output_shape

        # Dense stack that expands the latent vector into a 5x6x64 feature map.
        self.dense1 = layers.Dense(units=128, activation="relu")
        self.dense2 = layers.Dense(units=64 * 5 * 6, activation="relu")
        self.reshape = layers.Reshape(target_shape=(5, 6, 64))

        # Transposed convolutions; only the first one changes the spatial size.
        self.convT1 = layers.Conv2DTranspose(filters=64, kernel_size=(3, 3), strides=2, padding="same", activation="relu")
        self.convT2 = layers.Conv2DTranspose(filters=32, kernel_size=(3, 3), padding="same", activation="relu")
        self.convT3 = layers.Conv2DTranspose(filters=1, kernel_size=(3, 3), padding="same", activation="sigmoid")

    def call(self, z):
        """Decode latent vectors `z` into images."""
        # Per-example shapes after each stage:
        #   dense1 -> (128,), dense2 -> (1920,), reshape -> (5, 6, 64),
        #   convT1 -> (10, 12, 64), convT2 -> (10, 12, 32), convT3 -> (10, 12, 1)
        stages = (
            self.dense1,
            self.dense2,
            self.reshape,
            self.convT1,
            self.convT2,
            self.convT3,
        )
        out = z
        for stage in stages:
            out = stage(out)
        return out



In [None]:
class Lagrange_Constrained_VAE(keras.Model):
    """VAE wrapper with a custom training step and per-term loss metrics.

    The optimized objective is `reconstruction_loss + kl_loss`. A Lagrange-style
    penalty on the summed log-variance is also computed and tracked as a metric,
    but it is not part of the optimized objective, so `lambda_lagrange` receives
    no gradient from `total_loss`.
    """

    def __init__(self,encoder, decoder, lambda_lagrange_init = 1.0, **kwargs):
        """
        Args:
            encoder: Model returning `(z_mean, z_log_var, z)` for a batch.
            decoder: Model mapping `z` to a reconstruction of the input.
            lambda_lagrange_init: Initial value of the penalty multiplier.
            **kwargs: Forwarded to `keras.Model`.
        """
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.lambda_lagrange = tf.Variable(lambda_lagrange_init, trainable = True, dtype = tf.float32)
        # Running means reported by fit() for each loss term.
        self.total_loss = keras.metrics.Mean(name = 'total_loss')
        self.reconstruction_loss = keras.metrics.Mean(name = 'reconstruction_loss')
        self.kl_loss = keras.metrics.Mean(name = 'kl_loss')
        self.constraint_loss = keras.metrics.Mean(name = 'constraints_loss')

    @property
    def metrics(self):
        """Metric objects tracked by this model."""
        return [
            self.total_loss,
            self.reconstruction_loss,
            self.kl_loss,
            self.constraint_loss,
        ]

    def train_step(self, data):
        """Run one optimisation step on `data` and return the running metrics.

        Args:
            data: Batch of images; used both as encoder input and as the
                reconstruction target.
        """
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)
            # binary_crossentropy already averages over the last (channel) axis,
            # leaving one value per pixel. Sum over BOTH spatial axes so the
            # loss is a per-image total; summing over axis 1 alone would leave
            # the width axis to be averaged away by reduce_mean.
            reconstruction_loss = tf.reduce_mean(
                tf.reduce_sum(
                    keras.losses.binary_crossentropy(data, reconstruction),
                    axis=(1, 2),
                )
            )

            # KL term per latent dimension, summed per example, averaged over
            # the batch.
            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            # Penalty pushing the summed log-variance of each example towards 1.
            constraint_loss = tf.reduce_mean(self.lambda_lagrange*tf.square(tf.reduce_sum(z_log_var, axis = 1) - 1))
            # constraint_loss is tracked below but is not added to the objective.
            total_loss = reconstruction_loss + kl_loss # + constraint_loss

        grads = tape.gradient(total_loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
        self.total_loss.update_state(total_loss)
        self.reconstruction_loss.update_state(reconstruction_loss)
        self.kl_loss.update_state(kl_loss)
        self.constraint_loss.update_state(constraint_loss)
        return {
            "loss": self.total_loss.result(),
            "reconstruction_loss": self.reconstruction_loss.result(),
            "kl_loss": self.kl_loss.result(),
            "constraint_loss": self.constraint_loss.result()
        }



In [None]:

class VAE_Encoder(keras.Model):
  """Encoder producing Gaussian parameters and a reparameterised sample.

  One strided 3x3 convolution is flattened and passed through two 128-unit
  dense layers; two linear heads then output the mean and the log-variance.
  """

  def __init__(self, latent_dim):
    """
    Args:
      latent_dim: Width of the mean / log-variance heads.
    """
    super().__init__()
    self.latent_dim = latent_dim
    self.conv1 = layers.Conv2D(
        filters=32, kernel_size=3, strides=(2, 2), activation='relu'
    )
    self.flatten = layers.Flatten()
    self.dense1 = layers.Dense(units=128, activation=tf.nn.relu)
    self.dense2 = layers.Dense(units=128, activation=tf.nn.relu)
    self.loc_layer = layers.Dense(units=latent_dim)     # mean head
    self.logvar_layer = layers.Dense(units=latent_dim)  # log-variance head
    self.seed_generator = keras.random.SeedGenerator(seed=1337)

  def sample_latent(self, inputs):
    """Draw one latent sample per row from `(mean, log_var)`.

    Args:
      inputs: Pair `(z_mean, z_log_var)`; the first two axes of `z_mean`
        give the shape of the noise that is drawn.
    """
    mean, log_var = inputs
    mean_shape = tf.shape(mean)
    noise = keras.random.normal(
        shape=(mean_shape[0], mean_shape[1]), seed=self.seed_generator
    )
    std = tf.exp(0.5 * log_var)
    return mean + std * noise

  def call(self, inputs):
    """Encode `inputs` and return `(mu, logvar, z)`."""
    # Flatten the conv feature map into one vector per example.
    features = self.flatten(self.conv1(inputs))
    hidden = self.dense2(self.dense1(features))
    mu = self.loc_layer(hidden)
    logvar = self.logvar_layer(hidden)
    return mu, logvar, self.sample_latent([mu, logvar])


In [None]:
class VAE_Decoder(keras.Model):
    """Decoder mapping a latent vector to a single-channel 10x12 sigmoid image.

    The spatial sizes are fixed by the layers themselves (a 5x6 seed map that
    is upsampled once with stride 2); `output_shape` is only stored on the
    instance and is not read by `call`.
    """

    def __init__(self,output_shape=(10, 12, 1)):
        super().__init__()

        self.output_shape = output_shape

        # Dense stack that expands the latent vector into a 5x6x32 feature map.
        self.dense1 = layers.Dense(units=128, activation="relu")
        self.dense2 = layers.Dense(units=128, activation="relu")
        self.dense3 = layers.Dense(units=5*6*32, activation="relu")
        self.reshape = layers.Reshape(target_shape=(5, 6, 32))

        # Stride-2 upsampling followed by the 1-channel sigmoid output layer.
        self.convT1 = layers.Conv2DTranspose(
            filters=32, kernel_size=3, strides=2, padding="same", activation="relu"
        )
        self.convT2 = layers.Conv2DTranspose(
            filters=1, kernel_size=3, strides=1, padding='same', activation='sigmoid'
        )

    def call(self, z):
        """Decode latent vectors `z` into images."""
        # Per-example shapes after each stage:
        #   dense1/dense2 -> (128,), dense3 -> (960,), reshape -> (5, 6, 32),
        #   convT1 -> (10, 12, 32), convT2 -> (10, 12, 1)
        stages = (
            self.dense1,
            self.dense2,
            self.dense3,
            self.reshape,
            self.convT1,
            self.convT2,
        )
        out = z
        for stage in stages:
            out = stage(out)
        return out

In [None]:
class Encoder(keras.Model):
  """Deterministic encoder: one strided conv, two dense layers, a linear code.

  Unlike a variational encoder it returns a single latent vector per example;
  nothing is sampled.
  """

  def __init__(self, latent_dim):
    """
    Args:
      latent_dim: Width of the latent code.
    """
    super().__init__()
    self.latent_dim = latent_dim
    self.conv1 = layers.Conv2D(
        filters=32, kernel_size=3, strides=(2, 2), activation='relu'
    )
    self.flatten = layers.Flatten()
    self.dense1 = layers.Dense(units=128, activation=tf.nn.relu)
    self.dense2 = layers.Dense(units=128, activation=tf.nn.relu)
    self.latent_layer = layers.Dense(units=latent_dim)  # linear latent code

  def call(self, inputs):
    """Encode `inputs` into latent codes."""
    # Flatten the conv feature map into one vector per example.
    features = self.flatten(self.conv1(inputs))
    hidden = self.dense2(self.dense1(features))
    return self.latent_layer(hidden)


In [None]:
class Decoder(keras.Model):
    """Decoder mapping a latent code to a single-channel 10x12 sigmoid image.

    The spatial sizes are fixed by the layers themselves (a 5x6 seed map that
    is upsampled once with stride 2); `output_shape` is only stored on the
    instance and is not read by `call`.
    """

    def __init__(self,output_shape=(10, 12, 1)):
        super().__init__()

        self.output_shape = output_shape

        # Dense stack that expands the latent code into a 5x6x32 feature map.
        self.dense1 = layers.Dense(units=128, activation="relu")
        self.dense2 = layers.Dense(units=128, activation="relu")
        self.dense3 = layers.Dense(units=5*6*32, activation="relu")
        self.reshape = layers.Reshape(target_shape=(5, 6, 32))

        # Stride-2 upsampling followed by the 1-channel sigmoid output layer.
        self.convT1 = layers.Conv2DTranspose(
            filters=32, kernel_size=3, strides=2, padding="same", activation="relu"
        )
        self.convT2 = layers.Conv2DTranspose(
            filters=1, kernel_size=3, strides=1, padding='same', activation='sigmoid'
        )

    def call(self, z):
        """Decode latent codes `z` into images."""
        hidden = self.dense3(self.dense2(self.dense1(z)))
        feature_map = self.reshape(hidden)       # per example: (5, 6, 32)
        upsampled = self.convT1(feature_map)     # per example: (10, 12, 32)
        return self.convT2(upsampled)            # per example: (10, 12, 1)

In [None]:
class Autoencoder(keras.Model):
    """Plain autoencoder wrapper with a custom training step.

    The objective is the binary cross-entropy between the input batch and its
    reconstruction, summed over each image and averaged over the batch.
    """

    def __init__(self,encoder, decoder, **kwargs):
        """
        Args:
            encoder: Model mapping a batch to latent codes.
            decoder: Model mapping latent codes to reconstructions.
            **kwargs: Forwarded to `keras.Model`.
        """
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        # Running means reported by fit().
        self.total_loss = keras.metrics.Mean(name = 'total_loss')
        self.reconstruction_loss = keras.metrics.Mean(name = 'reconstruction_loss')

    @property
    def metrics(self):
        """Metric objects tracked by this model."""
        return [
            self.total_loss,
            self.reconstruction_loss,
        ]

    def train_step(self, data):
        """Run one optimisation step on `data` and return the running metrics.

        Args:
            data: Batch of images; used both as encoder input and as the
                reconstruction target.
        """
        with tf.GradientTape() as tape:
            z = self.encoder(data)
            reconstruction = self.decoder(z)
            # binary_crossentropy already averages over the last (channel) axis,
            # leaving one value per pixel. Sum over BOTH spatial axes so the
            # loss is a per-image total; summing over axis 1 alone would leave
            # the width axis to be averaged away by reduce_mean.
            reconstruction_loss = tf.reduce_mean(
                tf.reduce_sum(
                    keras.losses.binary_crossentropy(data, reconstruction),
                    axis=(1, 2),
                )
            )

            total_loss = reconstruction_loss

        grads = tape.gradient(total_loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
        self.total_loss.update_state(total_loss)
        self.reconstruction_loss.update_state(reconstruction_loss)
        return {
            "loss": self.total_loss.result(),
            "reconstruction_loss": self.reconstruction_loss.result(),
        }





In [None]:
# --- Plain autoencoder: hyper-parameters and model assembly ---
epochs = 15
# A 2-D latent space keeps the codes plottable on a plane later on.
latent_dim = 2
output_shape = (10, 12, 1)

encoder = Encoder(latent_dim=latent_dim)
decoder = Decoder(output_shape=output_shape)
model = Autoencoder(encoder=encoder, decoder=decoder)

In [None]:
# Compile and train whatever `model` currently refers to.
# NOTE(review): training runs for a literal 10 epochs; the `epochs = 15`
# variable set in the setup cell is not used here — confirm which is intended.
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=1e-4),
)
history = model.fit(
    train_data,
    epochs=10,
)

In [None]:
# --- VAE: hyper-parameters and model assembly ---
# Running this cell rebinds `encoder`, `decoder` and `model`, replacing any
# objects previously stored under those names.
epochs = 15
# A 2-D latent space keeps the codes plottable on a plane later on.
latent_dim = 2
output_shape = (10, 12, 1)
num_examples_to_generate = 16

# One fixed batch of latent vectors, drawn once, so that images generated from
# it at different points in training can be compared directly.
random_vector_for_generation = tf.random.normal(
    shape=(num_examples_to_generate, latent_dim)
)

encoder = VAE_Encoder(latent_dim=latent_dim)
decoder = VAE_Decoder(output_shape=output_shape)
model = Lagrange_Constrained_VAE(encoder=encoder, decoder=decoder)

In [None]:
# NOTE(review): `GMM_VAE` is not defined in any cell shown here — confirm it
# is defined before this cell runs. The meaning of the positional `3` is also
# not visible from here.
# `model2` receives the same `encoder` / `decoder` objects that `model` wraps,
# so (assuming GMM_VAE trains them) fitting one also changes the other.
model2 = GMM_VAE(latent_dim, 3, encoder, decoder)
model2.compile(
    optimizer=keras.optimizers.Adam(learning_rate=1e-4),
)
model2.fit(
    train_data,
    epochs=10,
)

In [None]:
# Compile and train `model`; the beta scheduler callback reassigns the shared
# `beta` variable and prints it at the start of every epoch.
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=1e-4),
)
history = model.fit(
    train_data,
    epochs=10,
    callbacks=[beta_scheduler],
)

In [None]:
# Pull a single batch out of the dataset to use as example input below.
for batch in train_data.take(1):  # take(1) limits iteration to the first batch
    x1 = batch
# Cell output: shape of the first element of that batch.
x1[0].shape

In [None]:
from tensorflow.keras.utils import plot_model

# Render a diagram of `model` to a PNG file, with tensor shapes, layer names
# and nested sub-models expanded.
plot_model(
    model,
    to_file="vae_architecture.png",
    show_shapes=True,
    show_layer_names=True,
    expand_nested=True,
)

In [None]:
# Side-by-side view of example 26 from the batch `x1` and the matching entry
# of `recons`.
# NOTE(review): `recons` is not assigned in any cell shown here — confirm it is
# computed (presumably by running the model on `x1`) before this cell executes.
# NOTE(review): index 26 assumes the batch holds at least 27 examples — confirm.
plt.figure(figsize=(10,5))
plt.subplot(1,2,1)
plt.imshow(x1[26].numpy().reshape(10, 12), cmap='gray')  # view the example as a 10x12 image
plt.title("Original")

plt.subplot(1,2,2)
plt.imshow(recons[26].numpy().reshape(10, 12), cmap = 'gray')  # same 10x12 view of the reconstruction
plt.title("Reconstructed")

plt.show()