In [1]:
from tensorflow.keras import backend as K

def clear_memory():
    K.clear_session()
    tf.compat.v1.reset_default_graph()

2024-12-05 13:40:56.893448: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-12-05 13:40:57.585165: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [4]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Flatten, Reshape, Lambda, Conv2D, Conv2DTranspose
from tensorflow.keras.models import Model
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras import backend as K
from tensorflow.keras.preprocessing.image import load_img, img_to_array
import matplotlib.pyplot as plt


# Directories containing the datasets
normal_dir = '/home/UFAD/mohitkukreja/Documents/data_perfect_driving/images/'
blurry_dir = '/home/UFAD/mohitkukreja/Documents/data_perfect_driving/images_blurred/'
dirt_dir = '/home/UFAD/mohitkukreja/Documents/data_perfect_driving/images_dirt/'
tape_dir = '/home/UFAD/mohitkukreja/Documents/data_perfect_driving/images_tape/'
directories = [normal_dir, blurry_dir, dirt_dir, tape_dir]

# Parameters
image_size = (128, 128)  # Resize all images to 128x128
batch_size = 64  # Adjust as per your system's capacity
latent_dim = 16  # Latent space dimension
epochs = 20

# Data loading function
def load_images_from_directories(directories, image_size):
    """
    Load and preprocess images from multiple directories.
    """
    all_images = []
    
    for directory in directories:
        for filename in os.listdir(directory):
            filepath = os.path.join(directory, filename)
            try:
                # Load image
                img = load_img(filepath, target_size=image_size)
                # Convert to numpy array
                img_array = img_to_array(img) / 255.0  # Normalize to [0, 1]
                all_images.append(img_array)
            except Exception as e:
                print(f"Error loading image {filepath}: {e}")
    
    return np.array(all_images)

# Load and preprocess the data
data = load_images_from_directories(directories, image_size)
print(f"Total images loaded: {data.shape[0]}")

# Shuffle data
np.random.shuffle(data)

# Split into train and validation sets
split_ratio = 0.8
split_index = int(len(data) * split_ratio)
x_train = data[:split_index]
x_val = data[split_index:]

print(f"Training set size: {x_train.shape[0]}, Validation set size: {x_val.shape[0]}")

# VAE Model definition
# Encoder
input_img = Input(shape=(128, 128, 3))
x = Conv2D(32, (3, 3), activation='relu', padding='same')(input_img)
x = Conv2D(64, (3, 3), activation='relu', padding='same', strides=(2, 2))(x)
x = Flatten()(x)
x = Dense(128, activation='relu')(x)
z_mean = Dense(latent_dim)(x)
z_log_var = Dense(latent_dim)(x)

# Sampling function
def sampling(args):
    z_mean, z_log_var = args
    epsilon = K.random_normal(shape=(K.shape(z_mean)[0], latent_dim))
    return z_mean + K.exp(0.5 * z_log_var) * epsilon

z = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])

# Decoder
decoder_input = Input(shape=(latent_dim,))
x = Dense(16 * 16 * 64, activation='relu')(decoder_input)
x = Reshape((16, 16, 64))(x)
x = Conv2DTranspose(64, (3, 3), activation='relu', padding='same', strides=(2, 2))(x)
x = Conv2DTranspose(32, (3, 3), activation='relu', padding='same')(x)
decoded_img = Conv2DTranspose(3, (3, 3), activation='sigmoid', padding='same')(x)

# Define encoder and decoder models
encoder = Model(input_img, [z_mean, z_log_var, z], name="encoder")
decoder = Model(decoder_input, decoded_img, name="decoder")

# Connect encoder and decoder into a VAE model
output_img = decoder(encoder(input_img)[2])
vae = Model(input_img, output_img, name="vae")

# VAE Loss
reconstruction_loss = MeanSquaredError()(K.flatten(input_img), K.flatten(output_img))
kl_loss = -0.5 * K.sum(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
vae_loss = K.mean(reconstruction_loss + kl_loss)
vae.add_loss(vae_loss)
vae.compile(optimizer='adam')

# Train the VAE
history = vae.fit(x_train, x_train, 
                  epochs=epochs, 
                  batch_size=batch_size, 
                  validation_data=(x_val, x_val))

# Evaluate the VAE
def plot_reconstructions(vae, images, n=10):
    """
    Plot original and reconstructed images.
    """
    reconstructed = vae.predict(images[:n])
    fig, axes = plt.subplots(2, n, figsize=(20, 4))
    for i in range(n):
        # Original
        axes[0, i].imshow(images[i])
        axes[0, i].axis('off')
        # Reconstructed
        axes[1, i].imshow(reconstructed[i])
        axes[1, i].axis('off')
    plt.show()

# Visualize reconstructions
plot_reconstructions(vae, x_val)

# Anomaly detection
def compute_anomaly_score(vae, images):
    """
    Compute anomaly scores as reconstruction loss for given images.
    """
    reconstructed = vae.predict(images)
    return np.mean(np.square(images - reconstructed), axis=(1, 2, 3))

anomaly_scores = compute_anomaly_score(vae, x_val)
threshold = np.percentile(anomaly_scores, 95)  # Set threshold at 95th percentile
anomalies = x_val[anomaly_scores > threshold]

print(f"Number of anomalies detected: {len(anomalies)}")

# Visualize anomalies
plot_reconstructions(vae, anomalies, n=10)


NameError: name 'folder' is not defined