In [None]:
import os
import tensorflow as tf
from tensorflow.keras import layers, models, backend as K
from tensorflow.keras.datasets import mnist
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import roc_curve, auc

# ---------------------------
# Step 1: GPU Configuration
# ---------------------------

os.environ["CUDA_VISIBLE_DEVICES"] = "1"  

# b. Enable Dynamic GPU Memory Allocation
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("Memory growth set for GPUs")
    except RuntimeError as e:
        print(e)


import numpy as np
print(f"NumPy version: {np.__version__}")

# ---------------------------
# Step 3: Define the VAE Components
# ---------------------------

# Sampling function using the reparameterization trick
def sampling(args):
    z_mean, z_log_var = args
    batch = K.shape(z_mean)[0]
    dim = K.int_shape(z_mean)[1]
    epsilon = K.random_normal(shape=(batch, dim))
    return z_mean + K.exp(0.5 * z_log_var) * epsilon

# Load and preprocess MNIST data
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_train = np.reshape(x_train, (-1, 28, 28, 1))
x_test = x_test.astype('float32') / 255.
x_test = np.reshape(x_test, (-1, 28, 28, 1))

# Encoder
latent_dim = 10
encoder_inputs = layers.Input(shape=(28, 28, 1))
x = layers.Conv2D(32, 3, activation='relu', strides=2, padding='same')(encoder_inputs)
x = layers.Conv2D(64, 3, activation='relu', strides=2, padding='same')(x)
x = layers.Flatten()(x)
x = layers.Dense(16, activation='relu')(x)
z_mean = layers.Dense(latent_dim, name='z_mean')(x)
z_log_var = layers.Dense(latent_dim, name='z_log_var')(x)
z = layers.Lambda(sampling, output_shape=(latent_dim,), name='z')([z_mean, z_log_var])
encoder = models.Model(encoder_inputs, [z_mean, z_log_var, z], name='encoder')
encoder.summary()

# Decoder
latent_inputs = layers.Input(shape=(latent_dim,), name='z_sampling')
x = layers.Dense(7 * 7 * 64, activation='relu')(latent_inputs)
x = layers.Reshape((7, 7, 64))(x)
x = layers.Conv2DTranspose(64, 3, activation='relu', strides=2, padding='same')(x)
x = layers.Conv2DTranspose(32, 3, activation='relu', strides=2, padding='same')(x)
decoder_outputs = layers.Conv2DTranspose(1, 3, activation='sigmoid', padding='same')(x)
decoder = models.Model(latent_inputs, decoder_outputs, name='decoder')
decoder.summary()

# VAE Model with custom loss and metrics
class VAEWithMetrics(models.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAEWithMetrics, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = tf.keras.metrics.Mean(name="elbo")
        self.kl_loss_tracker = tf.keras.metrics.Mean(name="kl_loss")
    
    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        
        # Reconstruction loss
        reconstruction_loss = tf.keras.losses.binary_crossentropy(
            K.flatten(inputs), K.flatten(reconstructed)
        )
        reconstruction_loss *= 28 * 28
        reconstruction_loss = tf.reduce_mean(reconstruction_loss)
        
        # KL divergence
        kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
        kl_loss = tf.reduce_sum(kl_loss, axis=-1)
        kl_loss *= -0.5
        kl_loss = tf.reduce_mean(kl_loss)
        
        # Total loss
        total_loss = reconstruction_loss + kl_loss
        self.add_loss(total_loss)
        
        # Update metrics
        self.total_loss_tracker.update_state(total_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        
        return reconstructed
    
    @property
    def metrics(self):
        return [self.total_loss_tracker, self.kl_loss_tracker]

# Custom callback to store losses
class VAEHistory(tf.keras.callbacks.Callback):
    def __init__(self):
        super(VAEHistory, self).__init__()
        self.elbo = []
        self.kl_loss = []
    
    def on_epoch_end(self, epoch, logs=None):
        self.elbo.append(logs.get('elbo'))
        self.kl_loss.append(logs.get('kl_loss'))

# Initialize and compile VAE
vae = VAEWithMetrics(encoder, decoder)
learning_rate = 1e-3
optimizer = Adam(learning_rate=learning_rate)
vae.compile(optimizer=optimizer)

# Initialize callback
history_callback = VAEHistory()

# ---------------------------
# Step 4: Train the VAE
# ---------------------------

# Reduce batch size to prevent OOM
history = vae.fit(
    x_train,
    epochs=50,
    batch_size=128,  # Reduced from 128
    validation_data=(x_test, None),
    callbacks=[history_callback]
)

# ---------------------------
# Step 5: Plot ELBO and KL-Divergence Separately
# ---------------------------

# Plot ELBO Loss
plt.figure(figsize=(10, 5))
plt.plot(history_callback.elbo, label='ELBO Loss')
plt.xlabel('Epochs')
plt.ylabel('ELBO Loss')
plt.title('ELBO Loss Over Epochs')
plt.legend()
plt.show()

# Plot KL-Divergence
plt.figure(figsize=(10, 5))
plt.plot(history_callback.kl_loss, label='KL Divergence', color='orange')
plt.xlabel('Epochs')
plt.ylabel('KL Divergence')
plt.title('KL Divergence Over Epochs')
plt.legend()
plt.show()

# ---------------------------
# Step 6: Visualize Reconstructions
# ---------------------------

def plot_reconstructions(model, data, n=10):
    z_mean, z_log_var, z = model.encoder.predict(data)
    reconstructed = model.decoder.predict(z)
    
    plt.figure(figsize=(20, 4))
    for i in range(n):
        # Original
        ax = plt.subplot(2, n, i + 1)
        plt.imshow(data[i].reshape(28, 28), cmap='gray')
        plt.axis('off')
        
        # Reconstructed
        ax = plt.subplot(2, n, i + 1 + n)
        plt.imshow(reconstructed[i].reshape(28, 28), cmap='gray')
        plt.axis('off')
    plt.show()

# Plot reconstructions
plot_reconstructions(vae, x_test)

# ---------------------------
# Step 7: Generate New Images
# ---------------------------

def plot_generated_images(decoder, latent_dim=10, num_images=10):

    # Sample from standard normal distribution
    z_samples = np.random.normal(size=(num_images, latent_dim))
    
    # Generate images using the decoder
    generated = decoder.predict(z_samples)
    
    # Plot the generated images
    plt.figure(figsize=(num_images * 2, 2))
    for i in range(num_images):
        ax = plt.subplot(1, num_images, i + 1)
        plt.imshow(generated[i].squeeze(), cmap='gray')
        plt.axis('off')
    plt.tight_layout()
    

# Plot generated images
plot_generated_images(decoder, latent_dim=latent_dim, num_images=15)

# ---------------------------
# Step 8: VAE Latent Space Visualization
# ---------------------------

# Encode the test data
z_mean, z_log_var, z = encoder.predict(x_test, batch_size=64)

# Load test labels for coloring
(_, y_train_labels), (_, y_test_labels) = mnist.load_data()
y_test_labels = y_test_labels.astype('int')

# Create a scatter plot of the latent space
plt.figure(figsize=(12, 10))
scatter = plt.scatter(z[:, 0], z[:, 1], c=y_test_labels, cmap='tab10', alpha=0.5)
plt.colorbar(scatter, ticks=range(10))
plt.xlabel('z[0]')
plt.ylabel('z[1]')
plt.title('VAE Latent Space Visualization')
plt.show()

# ---------------------------
# Step 9: VAE for Anomaly Detection
# ---------------------------

# Function to add noise to create anomalous data
def add_noise(data, noise_factor=0.5):
    noisy = data + noise_factor * np.random.normal(loc=0.0, scale=1.0, size=data.shape)
    noisy = np.clip(noisy, 0., 1.)
    return noisy

# Create noisy test data as anomalies
x_test_noisy = add_noise(x_test.copy(), noise_factor=0.5)

# Compute Reconstruction Errors
def compute_reconstruction_errors(model, data):
    reconstructed = model.predict(data)
    errors = np.mean(np.square(data - reconstructed), axis=(1,2,3))
    return errors

# Compute errors for normal and anomalous data
errors_normal = compute_reconstruction_errors(vae, x_test)
errors_anomalous = compute_reconstruction_errors(vae, x_test_noisy)

# Plot the Distribution of Reconstruction Errors
plt.figure(figsize=(10,6))
plt.hist(errors_normal, bins=50, alpha=0.6, label='Normal', color='green', density=True)
plt.hist(errors_anomalous, bins=50, alpha=0.6, label='Anomalous', color='red', density=True)
plt.xlabel('Reconstruction Error')
plt.ylabel('Density')
plt.title('Distribution of Reconstruction Errors')
plt.legend()
plt.show()

# ---------------------------
# Step 10: Set Threshold and Classify Anomalies
# ---------------------------

# Set threshold based on the 95th percentile of normal reconstruction errors
threshold = np.percentile(errors_normal, 95)
print(f"Reconstruction error threshold: {threshold}")

# Classify
y_pred_normal = errors_normal < threshold
y_pred_anomalous = errors_anomalous < threshold

# Calculate accuracy
from sklearn.metrics import classification_report

# Create labels: 0 for normal, 1 for anomalous
y_true = np.concatenate([np.zeros_like(errors_normal), np.ones_like(errors_anomalous)])
y_pred = np.concatenate([y_pred_normal, y_pred_anomalous])

print(classification_report(y_true, y_pred, target_names=['Normal', 'Anomalous']))

# ---------------------------
# Step 11: ROC PLOT
# ---------------------------

# Create a combined array of all errors
errors_all = np.concatenate([errors_normal, errors_anomalous])

# Create y_true where 0 denotes normal and 1 denotes anomalous
y_true = np.concatenate([np.zeros(len(errors_normal)), np.ones(len(errors_anomalous))])

# Compute ROC curve and AUC using reconstruction errors as scores
fpr, tpr, thresholds = roc_curve(y_true, errors_all)
roc_auc = auc(fpr, tpr)

# Plot ROC Curve
plt.figure(figsize=(8,6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([-0.01, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.show()
