In [16]:
# Install necessary libraries
!pip install numpy pandas scikit-learn tensorflow



In [27]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Lambda, BatchNormalization
from tensorflow.keras.models import Model
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import classification_report
import pickle


In [28]:
# Generate synthetic dataset
np.random.seed(42)

# Normal and anomalous data
normal_data = np.random.normal(loc=0.5, scale=0.1, size=(900, 10))
anomaly_data = np.random.normal(loc=1.0, scale=0.1, size=(100, 10))

# Combine and label
data = np.vstack([normal_data, anomaly_data])
labels = np.array([0] * 900 + [1] * 100)

# Normalize the dataset
scaler = MinMaxScaler()
data = scaler.fit_transform(data)

# Split into train and test sets
X_train = data[:800]  # 800 samples for training
X_test = data[800:]   # Remaining 200 samples for testing
y_test = labels[800:] # True labels for X_test


In [29]:
# Define the latent space dimension
latent_dim = 2

# Encoder
inputs = Input(shape=(10,), name="encoder_input")
x = Dense(64, activation='relu')(inputs)
x = BatchNormalization()(x)
x = Dense(32, activation='relu')(x)
x = BatchNormalization()(x)

z_mean = Dense(latent_dim, name="z_mean")(x)
z_log_var = Dense(latent_dim, name="z_log_var")(x)

def sampling(args):
    z_mean, z_log_var = args
    epsilon = tf.random.normal(shape=(tf.shape(z_mean)[0], latent_dim), mean=0., stddev=1.0)
    return z_mean + tf.exp(0.5 * z_log_var) * epsilon

z = Lambda(sampling, output_shape=(latent_dim,), name="z")([z_mean, z_log_var])

# Decoder
decoder_inputs = Input(shape=(latent_dim,), name="decoder_input")
x = Dense(32, activation='relu')(decoder_inputs)
x = BatchNormalization()(x)
x = Dense(64, activation='relu')(x)
x = BatchNormalization()(x)
outputs = Dense(10, activation='sigmoid', name="decoder_output")(x)

# Encoder and Decoder models
encoder = Model(inputs, [z_mean, z_log_var, z], name="encoder")
decoder = Model(decoder_inputs, outputs, name="decoder")

# Full VAE model
vae_outputs = decoder(encoder(inputs)[2])

# Define custom VAE model with integrated loss
class VAE(Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        # Reconstruction loss
        reconstruction_loss = tf.reduce_mean(tf.reduce_sum(tf.square(inputs - reconstructed), axis=1))
        # KL divergence loss
        kl_loss = -0.5 * tf.reduce_mean(tf.reduce_sum(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var), axis=1))
        self.add_loss(reconstruction_loss + kl_loss)
        return reconstructed

vae = VAE(encoder, decoder)
vae.compile(optimizer=tf.keras.optimizers.Adam())


In [30]:
vae.fit(X_train, X_train, epochs=50, batch_size=32, validation_split=0.1, verbose=1)

Epoch 1/50
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 14ms/step - loss: 0.3943 - val_loss: 0.1927
Epoch 2/50
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - loss: 0.1741 - val_loss: 0.1234
Epoch 3/50
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 0.1186 - val_loss: 0.1202
Epoch 4/50
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.1039 - val_loss: 0.1119
Epoch 5/50
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 0.1020 - val_loss: 0.1015
Epoch 6/50
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 0.0993 - val_loss: 0.1048
Epoch 7/50
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.0978 - val_loss: 0.1013
Epoch 8/50
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - loss: 0.0954 - val_loss: 0.1025
Epoch 9/50
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m

<keras.src.callbacks.history.History at 0x7b3f39882710>

In [31]:
vae.save("vae_anomaly_detection_model.h5")

# Save scaler for deployment
with open("scaler.pkl", "wb") as f:
    pickle.dump({"scaler": scaler}, f)

print("Model and scaler saved successfully!")




Model and scaler saved successfully!


In [35]:
# Generate a separate test dataset without labels
np.random.seed(42)
test_data = np.vstack([
    np.random.normal(loc=0.5, scale=0.1, size=(50, 10)),  # Normal data
    np.random.normal(loc=1.5, scale=0.2, size=(10, 10))   # Data with anomalies
])

# Normalize the test data using the same scaler
test_data = scaler.transform(test_data)

# Test the VAE on the new test dataset
reconstructed = vae.predict(test_data)
reconstruction_errors = np.mean(np.square(test_data - reconstructed), axis=1)

# Determine threshold for anomaly detection
threshold = np.percentile(reconstruction_errors, 95)  # Use 95th percentile

# Classify anomalies based on reconstruction error
predictions = (reconstruction_errors > threshold).astype(int)

# Identify anomalies and highlight tuples
anomalous_tuples = []
anomalous_attributes = []
for i, (original, recon) in enumerate(zip(test_data, reconstructed)):
    if predictions[i] == 1:  # If the instance is an anomaly
        attribute_diff = np.abs(original - recon)
        significant_attributes = np.where(attribute_diff > 0.2)[0]  # Threshold for significant deviation
        anomalous_tuples.append((i, original))
        anomalous_attributes.append((i, significant_attributes))

# Output results
print("Anomaly Detection Results:")
for i, attributes in anomalous_attributes:
    print(f"Instance {i} is an anomaly. Significant deviations in attributes: {attributes}")

# Save the test dataset and results
import pandas as pd
test_df = pd.DataFrame(test_data, columns=[f"Feature_{i+1}" for i in range(test_data.shape[1])])
test_df["Anomaly"] = predictions
test_df.to_csv("test_dataset_results.csv", index=False)

print("\nTest dataset results saved as 'test_dataset_results.csv'.")


[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step 
Anomaly Detection Results:
Instance 55 is an anomaly. Significant deviations in attributes: [0 1 2 3 4 5 6 7 8 9]
Instance 57 is an anomaly. Significant deviations in attributes: [0 1 2 3 4 5 6 7 8 9]
Instance 58 is an anomaly. Significant deviations in attributes: [0 1 2 3 4 5 6 7 8 9]

Test dataset results saved as 'test_dataset_results.csv'.


In [37]:

# Function to evaluate a specific tuple
def evaluate_tuple(input_tuple):
    # Normalize the input tuple using the same scaler
    normalized_tuple = scaler.transform([input_tuple])

    # Reconstruct using the VAE
    reconstructed_tuple = vae.predict(normalized_tuple)
    reconstruction_error = np.mean(np.square(normalized_tuple - reconstructed_tuple))

    # Check if it's an anomaly
    is_anomalous = reconstruction_error > threshold
    significant_attributes = np.where(np.abs(normalized_tuple - reconstructed_tuple) > 0.2)[1]

    print("\nTuple Evaluation Results:")
    print(f"Input Tuple: {input_tuple}")
    print(f"Reconstruction Error: {reconstruction_error:.4f}")
    print(f"Anomaly Status: {'Anomalous' if is_anomalous else 'Normal'}")
    if is_anomalous:
        print(f"Significant Deviations in Attributes: {significant_attributes.tolist()}")

# Example usage for a specific tuple
'''
Chemical_Composition_Compliance (%)
Normalized compliance percentage of the chemical composition.

Packaging_Compliance_Score (1-10)
Normalized score for packaging compliance.

Regulatory_Benchmark_Score (%)
Normalized regulatory compliance score.

Predicted_Compliance_Score (%)
Normalized predicted compliance score by the model.

Potency (%)
Normalized potency value.

Purity (%)
Normalized purity value.

Supplier_Reliability_Score (1-100)
Normalized reliability score for the supplier.

Historical_Quality_Score (%)
Normalized historical quality score of the supplier's batches.

Real_Time_Status (Binary)
Encoded real-time compliance status (e.g., 0 for non-compliant, 1 for compliant).

Batch_Status (Binary)
Encoded batch status (e.g., 0 for rejected, 1 for approved).
*#
'''

sample_tuple = [0.6, 0.7, 0.5, 0.8, 0.4, 0.9, 0.5, 0.6, 0, 1]
evaluate_tuple(sample_tuple)


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step

Tuple Evaluation Results:
Input Tuple: [0.6, 0.7, 0.5, 0.8, 0.4, 0.9, 0.5, 0.6, 0, 1]
Reconstruction Error: 0.0744
Anomaly Status: Normal
