<a href="https://colab.research.google.com/github/ashmangla/ashmangla.gitbhub.io/blob/main/AutoencoderforAnomalydetection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import numpy as np


In [3]:
# Create a synthetic dataset
def generate_data(samples=1000, features=20, anomalies=10):
    normal_data = np.random.normal(0, 1, size=(samples - anomalies, features))
    anomalies_data = np.random.uniform(-5, 5, size=(anomalies, features))
    data = np.vstack([normal_data, anomalies_data])
    np.random.shuffle(data)
    return data

data = generate_data()

In [4]:
pip install tensorflow



In [5]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import Model

In [10]:
import pandas as pd
from sklearn.model_selection import train_test_split
# Load the data from a CSV file
#data = pd.read_csv('data.csv')

# Split the data into training and testing sets
X_train, X_test = train_test_split(data, test_size=0.25, random_state=42)

In [12]:
# Scale the data to the range [0, 1]
from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

In [13]:
input_dim = X_train.shape[1]
# Encoder
input_layer = Input(shape=(input_dim,))
encoded = Dense(128, activation='relu')(input_layer)
encoded = Dense(64, activation='relu')(encoded)
encoded = Dense(32, activation='relu')(encoded)
# Decoder
decoded = Dense(64, activation='relu')(encoded)
decoded = Dense(128, activation='relu')(decoded)
decoded = Dense(input_dim, activation='sigmoid')(decoded)
# Autoencoder model
autoencoder = Model(inputs=input_layer, outputs=decoded)

In [14]:
autoencoder.compile(optimizer='adam', loss='mse')

In [15]:
autoencoder.fit(X_train, X_train, epochs=100, validation_split=0.2)

Epoch 1/100
[1m19/19[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 39ms/step - loss: 0.0170 - val_loss: 0.0145
Epoch 2/100
[1m19/19[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 17ms/step - loss: 0.0142 - val_loss: 0.0139
Epoch 3/100
[1m19/19[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step - loss: 0.0138 - val_loss: 0.0131
Epoch 4/100
[1m19/19[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step - loss: 0.0138 - val_loss: 0.0124
Epoch 5/100
[1m19/19[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step - loss: 0.0127 - val_loss: 0.0117
Epoch 6/100
[1m19/19[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step - loss: 0.0115 - val_loss: 0.0108
Epoch 7/100
[1m19/19[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - loss: 0.0114 - val_loss: 0.0103
Epoch 8/100
[1m19/19[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - loss: 0.0099 - val_loss: 0.0098
Epoch 9/100
[1m19/19[0m [32m━━━━━━━━━━━

<keras.src.callbacks.history.History at 0x7d4e42062f10>

In [16]:
# Evaluate the model on the test data
loss = autoencoder.evaluate(X_test, X_test)
mse = autoencoder.evaluate(X_test, X_test)

print('Test loss:', loss)
print('Test mean squared error:', mse)

[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.0010 
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.0010 
Test loss: 0.0008770273416303098
Test mean squared error: 0.0008770273416303098


In [17]:
# Calculate the reconstruction errors for the test data
reconstruction_errors = autoencoder.predict(X_test) - X_test

# Threshold the reconstruction errors to identify anomalies
anomaly_threshold = 0.1

anomalies = np.where(reconstruction_errors > anomaly_threshold)[0]

# Print the number of anomalies detected
print('Number of anomalies detected:', len(anomalies))

[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step
Number of anomalies detected: 30


In [18]:
anomalies

array([  2,   6,  15,  30,  30,  30,  30,  30,  30,  30,  30,  33,  55,
        77,  77, 101, 101, 101, 114, 130, 134, 138, 151, 161, 165, 167,
       169, 185, 195, 206])

# GANS for Anomaly detection

In [None]:
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np
import matplotlib.pyplot as plt

# Define the generator model
def build_generator(latent_dim):
    model = tf.keras.Sequential()
    model.add(layers.Dense(256, input_dim=latent_dim, activation='relu'))
    model.add(layers.BatchNormalization())
    model.add(layers.Dense(512, activation='relu'))
    model.add(layers.BatchNormalization())
    model.add(layers.Dense(784, activation='sigmoid'))
    model.add(layers.Reshape((28, 28, 1)))
    return model

# Define the discriminator model
def build_discriminator(img_shape):
    model = tf.keras.Sequential()
    model.add(layers.Flatten(input_shape=img_shape))
    model.add(layers.Dense(512, activation='relu'))
    model.add(layers.Dense(256, activation='relu'))
    model.add(layers.Dense(1, activation='sigmoid'))
    return model

# Define the GAN model
def build_gan(generator, discriminator):
    discriminator.trainable = False  # Freeze discriminator during GAN training
    model = tf.keras.Sequential()
    model.add(generator)
    model.add(discriminator)
    return model

# Function to compile models
def compile_models(generator, discriminator, gan, latent_dim):
    discriminator.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5),
                          loss='binary_crossentropy',
                          metrics=['accuracy'])

    gan.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5),
                loss='binary_crossentropy')

# Function to generate random noise for the generator
def generate_latent_points(latent_dim, batch_size):
    return np.random.normal(0, 1, size=(batch_size, latent_dim))

# Function to train the GAN
def train_gan(generator, discriminator, gan, dataset, latent_dim, epochs, batch_size):
    batch_per_epoch = dataset.shape[0] // batch_size

    for epoch in range(epochs):
        for batch in range(batch_per_epoch):
            noise = generate_latent_points(latent_dim, batch_size)
            generated_data = generator.predict(noise)

            real_data = dataset[np.random.randint(0, dataset.shape[0], batch_size)]
            labels_real = np.ones((batch_size, 1))
            labels_fake = np.zeros((batch_size, 1))

            d_loss_real = discriminator.train_on_batch(real_data, labels_real)
            d_loss_fake = discriminator.train_on_batch(generated_data, labels_fake)

            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

            noise = generate_latent_points(latent_dim, batch_size)
            labels_gan = np.ones((batch_size, 1))

            g_loss = gan.train_on_batch(noise, labels_gan)

            print(f"Epoch {epoch + 1}/{epochs}, Batch {batch}/{batch_per_epoch}, D Loss: {d_loss[0]}, G Loss: {g_loss}")

# Function to generate and plot synthetic data
def generate_and_plot(generator, latent_dim, examples=10):
    noise = generate_latent_points(latent_dim, examples)
    generated_data = generator.predict(noise)

    for i in range(examples):
        plt.subplot(2, 5, i+1)
        plt.imshow(generated_data[i, :, :, 0], cmap='gray_r')
        plt.axis('off')

    plt.show()

# Example usage
latent_dim = 100
img_shape = (28, 28, 1)

# Build and compile the models
generator = build_generator(latent_dim)
discriminator = build_discriminator(img_shape)
gan = build_gan(generator, discriminator)
compile_models(generator, discriminator, gan, latent_dim)

# Load and preprocess your dataset (e.g., MNIST)
(train_images, _), (_, _) = tf.keras.datasets.mnist.load_data()
train_images = train_images / 127.5 - 1.0  # Normalize images to the range [-1, 1]
train_images = np.expand_dims(train_images, axis=-1)

# Train the GAN
train_gan(generator, discriminator, gan, train_images, latent_dim, epochs=100, batch_size=64)

# Generate and plot synthetic data
generate_and_plot(generator, latent_dim)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)
  super().__init__(**kwargs)


[1m11490434/11490434[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step




[1;30;43mStreaming output truncated to the last 5000 lines.[0m
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
Epoch 96/100, Batch 528/937, D Loss: 5.654833793640137, G Loss: 0.0004655024968087673
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
Epoch 96/100, Batch 529/937, D Loss: 5.654836654663086, G Loss: 0.00046549743274226785
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
Epoch 96/100, Batch 530/937, D Loss: 5.654839992523193, G Loss: 0.0004654923686757684
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
Epoch 96/100, Batch 531/937, D Loss: 5.654842853546143, G Loss: 0.0004654873046092689
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
Epoch 96/100, Batch 532/937, D Loss: 5.65484619140625, G Loss: 0.00046548224054276943
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 33ms/step
Epoch 96/100, Batch 533/937, D Loss: 5.654849052429199, G Loss: 0.000465