<a href="https://colab.research.google.com/github/Madhu-Vemula/-Network-Anomaly-Detection-Using-Generative-Adversarial-Networks-/blob/main/Network_Anomaly_Detection_Using_Generative_Adversarial_Networks.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Part 1: Loading and Preprocessing Data**

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import logging

# Raise the 'tensorflow' logger threshold so only ERROR-level records are emitted
logging.getLogger('tensorflow').setLevel(logging.ERROR)

# Mount Google Drive; the dataset path below lives under the mount point
from google.colab import drive
drive.mount('/content/drive')

# Load and preprocess the dataset.
# Each "Done" is printed only after its step has actually finished, so a failing
# step is no longer reported as completed.
print("Loading and preprocessing the dataset...")
dataset_path = "/content/drive/MyDrive/MINI 1/KDDTrain.csv"

# Load the dataset (only the first 1000 rows are read)
df = pd.read_csv(dataset_path, nrows=1000)
print("Done")

# Encode categorical features
print("Encoding categorical features...")
categorical_cols = ['protocol_type', 'service', 'flag', 'label']
for col in categorical_cols:
    # pd.factorize numbers the values in order of first appearance, so the
    # integer codes depend on the row order of the file that was read.
    df[col] = pd.factorize(df[col])[0]
print("Done")

# Scale numerical features (every column that was not factorized above)
print("Scaling numerical features...")
scaler = StandardScaler()
numerical_cols = [col for col in df.columns if col not in categorical_cols]
# NOTE(review): the scaler is fitted on all rows before the train/test split below.
df[numerical_cols] = scaler.fit_transform(df[numerical_cols])
print("Done")

# Split dataset into train and test sets (80/20, fixed seed).
# df.values still contains the encoded 'label' column.
print("Splitting dataset into train and test sets...")
X_train, X_test = train_test_split(df.values, test_size=0.2, random_state=42)
print("Done")


Mounted at /content/drive
Loading and preprocessing the dataset...
Done
Encoding categorical features...
Done
Scaling numerical features...
Done
Splitting dataset into train and test sets...
Done


**Part 2: Model Definition and Training**

In [None]:
from tensorflow.keras.layers import Input, Dense, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
import numpy as np

def build_generator(latent_dim, output_dim):
    """Build the generator network that maps a noise vector to one synthetic row.

    Args:
        latent_dim: Length of the noise vector the network takes as input.
        output_dim: Number of values in each generated row.

    Returns:
        An uncompiled Keras ``Model``.
    """
    noise_input = Input(shape=(latent_dim,))
    hidden = noise_input
    # Three ReLU stages of shrinking width, each followed by 40% dropout.
    for units in (256, 128, 64):
        hidden = Dense(units, activation='relu')(hidden)
        hidden = Dropout(0.4)(hidden)
    # tanh keeps every generated value inside [-1, 1].
    generated_row = Dense(output_dim, activation='tanh')(hidden)
    return Model(noise_input, generated_row)

def build_discriminator(input_dim):
    """Build and compile the discriminator, a binary classifier over data rows.

    Args:
        input_dim: Number of values in each row presented to the network.

    Returns:
        A Keras ``Model`` compiled with binary cross-entropy, Adam
        (learning rate 0.0002) and an accuracy metric.
    """
    row_input = Input(shape=(input_dim,))
    hidden = row_input
    # Same hidden stack shape as the generator: 256 -> 128 -> 64 with 40% dropout.
    for units in (256, 128, 64):
        hidden = Dense(units, activation='relu')(hidden)
        hidden = Dropout(0.4)(hidden)
    # Single sigmoid unit: the score the training loop compares against 1 (real) / 0 (generated).
    score = Dense(1, activation='sigmoid')(hidden)
    discriminator = Model(row_input, score)
    discriminator.compile(
        loss='binary_crossentropy',
        optimizer=Adam(learning_rate=0.0002),
        metrics=['accuracy'],
    )
    return discriminator


def train_gan(generator, discriminator, gan, X_train, latent_dim, epochs, batch_size, loss_weight):
    """Alternate discriminator and generator updates for a fixed number of iterations.

    Every iteration (called an "epoch" here) draws ONE random batch: the
    discriminator is updated on a real batch and on a generated batch, then the
    stacked ``gan`` model is updated with "real" targets.

    Args:
        generator: Model turning noise of width ``latent_dim`` into data rows.
        discriminator: Compiled model scoring rows; its ``train_on_batch`` result
            is indexed with ``[0]`` below to obtain the loss.
        gan: Compiled generator+discriminator stack.
        X_train: 2-D array of real rows; batches are sampled with replacement.
        latent_dim: Width of the noise vectors.
        epochs: Number of training iterations.
        batch_size: Rows per real / generated batch.
        loss_weight: Accepted for signature compatibility; not used inside this
            function (the weight is applied where ``gan`` is compiled).

    Returns:
        ``(train_accuracy, test_accuracy)``. ``train_accuracy`` holds one value per
        5th iteration; ``test_accuracy`` is always returned empty by this version.
    """
    train_accuracy = []
    test_accuracy = []
    # The targets never change, so build them once instead of once per call.
    real_targets = np.ones((batch_size, 1))
    fake_targets = np.zeros((batch_size, 1))

    for epoch in range(epochs):
        # Train discriminator
        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        # verbose=0: predict() is called every iteration; its per-call progress
        # output would otherwise bury the loss/accuracy lines printed below.
        gen_data = generator.predict(noise, verbose=0)
        real_data = X_train[np.random.randint(0, X_train.shape[0], batch_size)]
        discriminator_loss_real = discriminator.train_on_batch(real_data, real_targets)
        discriminator_loss_fake = discriminator.train_on_batch(gen_data, fake_targets)
        discriminator_loss = 0.5 * np.add(discriminator_loss_real, discriminator_loss_fake)

        # Train generator: fresh noise, labelled as "real" for the stacked model
        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        generator_loss = gan.train_on_batch(noise, real_targets)

        # Print progress
        if epoch % 10 == 0:
            print(f"Epoch {epoch}/{epochs}, Discriminator Loss: {discriminator_loss[0]}, Generator Loss: {generator_loss}")

        # Evaluate model accuracy (evaluate_accuracy must be defined before this function is called)
        if epoch % 5 == 0:
            train_accuracy.append(evaluate_accuracy(generator, discriminator, X_train, batch_size))
            print(f"Epoch {epoch}/{epochs}, Train Accuracy: {train_accuracy[-1]}")

    return train_accuracy, test_accuracy

def build_gan(generator, discriminator, loss_weight):
    """Stack the generator and discriminator into one compiled model.

    Args:
        generator: Model mapping noise to data rows.
        discriminator: Model scoring data rows; its ``trainable`` flag is set to
            ``False`` here before the stacked model is compiled.
        loss_weight: Value forwarded to ``compile(loss_weights=...)``.

    Returns:
        The compiled stacked model (noise in, discriminator score out).
    """
    discriminator.trainable = False
    # Take the noise width from the generator itself rather than from the
    # notebook-level `latent_dim`, so the stack always matches the model passed in.
    gan_input = Input(shape=(generator.input_shape[1],))
    gan_output = discriminator(generator(gan_input))
    gan = Model(gan_input, gan_output)
    gan.compile(optimizer=Adam(learning_rate=0.0002), loss='binary_crossentropy', loss_weights=loss_weight)
    return gan

# Set hyperparameters (notebook-level names; functions defined in this notebook read some of them)
latent_dim = 20
output_dim = df.shape[1]  # one generated value per column of df; requires df to exist already
epochs = 100
batch_size = 512
loss_weight = 0.75

# Build generator and discriminator
generator = build_generator(latent_dim, output_dim)
discriminator = build_discriminator(output_dim)

# Save generator and discriminator models to Google Drive
# NOTE(review): no training call precedes these save() calls in this cell, so the
# files written here contain the freshly initialised (untrained) networks.
generator.save("/content/drive/MyDrive/MINI 1/generator_model.h5")
discriminator.save("/content/drive/MyDrive/MINI 1/discriminator_model.h5")
print("Models Saved Successfully....")


  saving_api.save_model(


Models Saved Successfully....


**Part 3: Evaluation and Visualization**

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
import matplotlib.pyplot as plt

def evaluate_accuracy(generator, discriminator, X_data, batch_size):
    """Measure how well the discriminator separates real rows from generated rows.

    ``batch_size`` real rows (sampled with replacement from ``X_data``) are
    expected to score >= 0.5, and ``X_data.shape[0]`` generated rows are expected
    to score < 0.5. The two groups can therefore differ in size.

    Args:
        generator: Model mapping noise to data rows.
        discriminator: Model scoring data rows.
        X_data: 2-D array of real rows.
        batch_size: Number of real rows to sample.

    Returns:
        Fraction of the combined real + generated rows classified as expected.
    """
    n_rows = X_data.shape[0]
    # Noise width comes from the generator, not from a notebook-level global.
    noise = np.random.normal(0, 1, (n_rows, generator.input_shape[1]))
    gen_data = generator.predict(noise, verbose=0)
    real_data = X_data[np.random.randint(0, n_rows, batch_size)]

    expected = np.concatenate([np.ones((batch_size,)), np.zeros((n_rows,))])
    predicted = np.concatenate([
        discriminator.predict(real_data, verbose=0).ravel() >= 0.5,
        discriminator.predict(gen_data, verbose=0).ravel() >= 0.5,
    ])
    return accuracy_score(expected, predicted)

# Set hyperparameters
print("Setting hyperparameters...")
latent_dim = 20
batch_size = 512
epochs = 100
loss_weight = 0.75

# Rebuild and compile the models with modified architecture and loss weight
print("Building and compiling the models...")
generator = build_generator(latent_dim, df.shape[1])
discriminator = build_discriminator(df.shape[1])
gan = build_gan(generator, discriminator, loss_weight)

# Train the GAN with adjusted loss weight
print("Training the GAN...")
train_accuracy, test_accuracy = train_gan(generator, discriminator, gan, X_train, latent_dim, epochs, batch_size, loss_weight)

# Save the models now that they are trained. The GUI cell loads generator_model.h5
# from this path; without this step that file would only ever hold a network
# saved before training.
generator.save("/content/drive/MyDrive/MINI 1/generator_model.h5")
discriminator.save("/content/drive/MyDrive/MINI 1/discriminator_model.h5")
print("Trained models saved.")



Setting hyperparameters...
Building and compiling the models...
Training the GAN...
Epoch 0/100, Discriminator Loss: 0.6655756831169128, Generator Loss: 0.46752357482910156
Epoch 0/100, Train Accuracy: 0.3719512195121951
Epoch 5/100, Train Accuracy: 0.39253048780487804
Epoch 10/100, Discriminator Loss: 0.5233138203620911, Generator Loss: 0.4397878646850586
Epoch 10/100, Train Accuracy: 0.4169207317073171
Epoch 15/100, Train Accuracy: 0.5823170731707317
Epoch 20/100, Discriminator Loss: 0.45878664404153824, Generator Loss: 0.4424753189086914
Epoch 20/100, Train Accuracy: 0.9283536585365854
Epoch 25/100, Train Accuracy: 0.9839939024390244
Epoch 30/100, Discriminator Loss: 0.41094178706407547, Generator Loss: 0.4653846323490143
Epoch 30/100, Train Accuracy: 0.9908536585365854
Epoch 35/100, Train Accuracy: 0.9786585365853658
Epoch 40/100, Discriminator Loss: 0.3888697288930416, Generator Loss: 0.5030081272125244
Epoch 40/100, Train Accuracy: 0.9390243902439024
Epoch 45/100, Train Accuracy:

**Part 4: Classification Report and Analysis**

In [None]:
# Use the trained GAN for anomaly detection
print("Using the trained GAN for anomaly detection...")
# Generate one synthetic row per dataset row using the generator
synthetic_data = generator.predict(np.random.normal(0, 1, size=(len(df), latent_dim)))

# Anomaly score = mean absolute difference between each real row and the
# synthetic row at the same position
print("Calculating anomaly scores...")
anomaly_scores = np.mean(np.abs(synthetic_data - df.values), axis=1)

# Rows scoring above the 96th percentile are flagged, so roughly 4% of rows are
# always flagged regardless of the data
threshold = np.percentile(anomaly_scores, 96)

# Detect anomalies
print("Detecting anomalies...")
predicted_labels = anomaly_scores > threshold

# Reference labels, computed once and reused by every metric below.
# NOTE(review): 'label' holds pd.factorize codes, which depend on row order. In the
# recorded output of this cell the True class has support 0, i.e. no row had
# code 4 -- confirm which code (and which polarity) is intended here.
true_labels = (df['label'] == 4).to_numpy()

# Calculate confusion matrix. labels=[False, True] pins the result to 2x2 so
# the four-way unpack works even when one class is absent from both inputs.
print("Calculating confusion matrix...")
conf_matrix = confusion_matrix(true_labels, predicted_labels, labels=[False, True])
tn, fp, fn, tp = conf_matrix.ravel()

# Count and classify anomalies: tally the label codes of the FLAGGED rows only.
# (Previously the whole dataset was tallied, so "Number of Anomalies" always
# equalled the row count, and a duplicated `i == 0` branch made the "Probe"
# entry unreachable.)
print("Counting and classifying anomalies...")
attack_type_names = {
    0: 'Normal (attack_type 4)',
    15: 'DoS (attack_type 15)',
    9: 'User-to-Root (attack_type 9)',
    10: 'RemoteAccess (attack_type 10)',
    20: 'WarezClient (attack_type 20)',
    8: 'RootKit (attack_type 8)',
    17: 'GuessPassword (attack_type 17)',
    7: 'FTPWrite (attack_type 7)',
    6: 'Multihop (attack_type 6)',
}
anomaly_count = {}
flagged_label_counts = df.loc[predicted_labels, 'label'].value_counts().sort_index()
for code, count in flagged_label_counts.items():
    name = attack_type_names.get(code, 'Others')
    anomaly_count[name] = anomaly_count.get(name, 0) + int(count)

# Print results
print("Results:")
print(f"Accuracy: {accuracy_score(true_labels, predicted_labels) * 100:.2f}%")
print(f"True Negatives (TN): {tn}")
print(f"False Positives (FP): {fp}")
print(f"False Negatives (FN): {fn}")
print(f"True Positives (TP): {tp}")
# Print the count and classification of anomalies
print("Number of Anomalies:", int(predicted_labels.sum()))
print("Indices of Anomalies:", np.where(predicted_labels == 1)[0])
print("Attack Types of Anomalies:")
for key, value in anomaly_count.items():
    print(f"{key}: {value}")
print("Confusion Matrix:")
print(conf_matrix)
print("Classification Report:")
# zero_division=0 reports 0 for undefined precision/recall without emitting a warning per metric
print(classification_report(true_labels, predicted_labels, zero_division=0))


Using the trained GAN for anomaly detection...
Calculating anomaly scores...
Detecting anomalies...
Calculating confusion matrix...
Counting and classifying anomalies...
Results:
Accuracy: 96.00%
True Negatives (TN): 960
False Positives (FP): 40
False Negatives (FN): 0
True Positives (TP): 0
Number of Anomalies: 1000
Indices of Anomalies: [254 301 308 312 348 373 381 405 411 440 443 481 540 561 592 597 647 650
 693 707 732 739 750 761 770 780 795 812 841 844 853 870 911 925 948 965
 966 976 986 991]
Attack Types of Anomalies:
Normal (attack_type 4): 516
Others: 484
Confusion Matrix:
[[960  40]
 [  0   0]]
Classification Report:
              precision    recall  f1-score   support

       False       1.00      0.96      0.98      1000
        True       0.00      0.00      0.00         0

    accuracy                           0.96      1000
   macro avg       0.50      0.48      0.49      1000
weighted avg       1.00      0.96      0.98      1000



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


**Part 5: GUI**

In [None]:
!pip install gradio


Collecting gradio
  Downloading gradio-4.31.5-py3-none-any.whl (12.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.3/12.3 MB[0m [31m57.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting aiofiles<24.0,>=22.0 (from gradio)
  Downloading aiofiles-23.2.1-py3-none-any.whl (15 kB)
Collecting fastapi (from gradio)
  Downloading fastapi-0.111.0-py3-none-any.whl (91 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m92.0/92.0 kB[0m [31m13.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting ffmpy (from gradio)
  Downloading ffmpy-0.3.2.tar.gz (5.5 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting gradio-client==0.16.4 (from gradio)
  Downloading gradio_client-0.16.4-py3-none-any.whl (315 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m315.9/315.9 kB[0m [31m40.6 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting httpx>=0.24.1 (from gradio)
  Downloading httpx-0.27.0-py3-none-any.whl (75 kB)
[2K     [90m━━━━━━━━━━━━━━━━

In [None]:
import gradio as gr
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import load_model
from sklearn.metrics import confusion_matrix

# Load the saved generator model
# This rebinds the notebook-level name `generator` to the model read from disk;
# the interface function defined below uses that name.
# NOTE(review): confirm the file at this path was written after training, not before.
generator_model_path = "/content/drive/MyDrive/MINI 1/generator_model.h5"
generator = load_model(generator_model_path)

def anomaly_detection_interface(dataset_file):
    """Score every row of an uploaded CSV against generator output and report the flagged rows.

    The file is preprocessed the same way as the training data in this notebook
    (factorized categorical columns, standard-scaled remaining columns). Each row
    is then scored by its mean absolute difference from a generated row, and rows
    above the 95th percentile of those scores are flagged.

    Args:
        dataset_file: Upload object supplied by Gradio; its ``.name`` attribute
            is used as the CSV path.

    Returns:
        A JSON-serialisable dict with the keys ``"Presence of Anomaly"``,
        ``"Number of Anomalies"`` and ``"Types of Anomalies"``, or
        ``{"error": message}`` when any step raises.
    """
    try:
        # Load the dataset
        df = pd.read_csv(dataset_file.name)

        # Encode categorical features.
        # NOTE(review): factorize is re-run on the uploaded file, so the integer
        # codes follow this file's row order and need not match another file's.
        categorical_cols = ['protocol_type', 'service', 'flag', 'label']
        for col in categorical_cols:
            df[col] = pd.factorize(df[col])[0]

        # Scale numerical features
        scaler = StandardScaler()
        numerical_cols = [col for col in df.columns if col not in categorical_cols]
        df[numerical_cols] = scaler.fit_transform(df[numerical_cols])

        # Reject files whose column count differs from the generator's output
        # width, rather than relying on a broadcasting error (or a silent
        # broadcast) in the subtraction below.
        expected_columns = generator.output_shape[1]
        if df.shape[1] != expected_columns:
            raise ValueError(
                f"Uploaded file has {df.shape[1]} columns but the generator produces {expected_columns}."
            )

        # Generate synthetic data using the generator
        synthetic_data = generator.predict(np.random.normal(0, 1, size=(len(df), generator.input_shape[1])))

        # Calculate anomaly scores
        anomaly_scores = np.mean(np.abs(synthetic_data - df.values), axis=1)

        # Set threshold for anomaly detection
        threshold = np.percentile(anomaly_scores, 95)

        # Detect anomalies
        predicted_labels = anomaly_scores > threshold

        # The confusion-matrix "accuracy" that used to be computed here was never
        # returned, and its four-way unpack raised whenever the matrix was not
        # 2x2, turning a valid result into an error response. It has been removed.

        # Determine if there are anomalies
        presence_of_anomaly = "Yes" if np.any(predicted_labels) else "No"

        # Count and classify anomalies
        attack_types = {
            0: "Normal (attack_type 4)",
            15: "DoS (attack_type 15)",
            9: "User-to-Root (attack_type 9)",
            10: "RemoteAccess (attack_type 10)",
            20: "WarezClient (attack_type 20)",
            8: "RootKit (attack_type 8)",
            17: "GuessPassword (attack_type 17)",
            7: "FTPWrite (attack_type 7)",
            6: "Multihop (attack_type 6)"
        }
        # Label codes without an entry in attack_types are skipped.
        types_of_anomalies = [attack_types[label] for label in df.loc[predicted_labels, 'label'].unique() if label in attack_types]

        # Return the results
        return {
            "Presence of Anomaly": presence_of_anomaly,
            # Plain int rather than a numpy scalar, so the JSON output can always serialise it.
            "Number of Anomalies": int(np.sum(predicted_labels)),
            "Types of Anomalies": types_of_anomalies
        }
    except Exception as e:
        # Best effort for the UI: log the failure and return it as data instead of raising
        print("Error:", e)
        return {"error": str(e)}

# Create a Gradio interface: one file upload in, the function's dict rendered as JSON out
interface = gr.Interface(
    fn=anomaly_detection_interface,
    inputs="file",
    outputs="json",
    title="Anomaly Detection",
    description="Upload dataset file."
)

# Launch the interface
# (the recorded output of this cell shows it being served on a public share URL in Colab)
interface.launch()


Setting queue=True in a Colab notebook requires sharing enabled. Setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
Running on public URL: https://54fbff26fd6f45ee90.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)




In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Input, Dense, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
import logging

# Raise the 'tensorflow' logger threshold so only ERROR-level records are emitted
logging.getLogger('tensorflow').setLevel(logging.ERROR)

# Mount Google Drive; the dataset path below lives under the mount point
from google.colab import drive
drive.mount('/content/drive')

# Part 1: Loading and Preprocessing Data
print("Loading and preprocessing the dataset...")
dataset_path = "/content/drive/MyDrive/MINI 1/KDDTrain.csv"
# Only the first 1000 rows of the file are read
df = pd.read_csv(dataset_path, nrows=1000)

# Replace each categorical column (including 'label') with integer codes.
# pd.factorize numbers values in order of first appearance, so codes depend on row order.
categorical_cols = ['protocol_type', 'service', 'flag', 'label']
for col in categorical_cols:
    df[col] = pd.factorize(df[col])[0]

# Standard-scale every column that was not factorized above.
# NOTE(review): the scaler is fitted on all rows before the train/test split below.
scaler = StandardScaler()
numerical_cols = [col for col in df.columns if col not in categorical_cols]
df[numerical_cols] = scaler.fit_transform(df[numerical_cols])

# 80/20 split with a fixed seed; df.values still contains the encoded 'label' column
X_train, X_test = train_test_split(df.values, test_size=0.2, random_state=42)

# Part 2: Model Definition and Training
def build_generator(latent_dim, output_dim):
    """Create the (uncompiled) generator: noise vector in, one synthetic row out.

    Args:
        latent_dim: Width of the input noise vector.
        output_dim: Width of the generated row.

    Returns:
        A Keras ``Model`` with three ReLU/dropout stages and a tanh output layer.
    """
    latent_input = Input(shape=(latent_dim,))
    # 256 -> 128 -> 64 units, each stage followed by 40% dropout
    stage_1 = Dropout(0.4)(Dense(256, activation='relu')(latent_input))
    stage_2 = Dropout(0.4)(Dense(128, activation='relu')(stage_1))
    stage_3 = Dropout(0.4)(Dense(64, activation='relu')(stage_2))
    # tanh keeps every generated value inside [-1, 1]
    synthetic_row = Dense(output_dim, activation='tanh')(stage_3)
    return Model(latent_input, synthetic_row)

def build_discriminator(input_dim):
    """Create and compile the discriminator that scores a row with a single sigmoid unit.

    Args:
        input_dim: Width of the rows fed to the network.

    Returns:
        A Keras ``Model`` compiled with binary cross-entropy, Adam
        (learning rate 0.0002) and an accuracy metric.
    """
    row_input = Input(shape=(input_dim,))
    # 256 -> 128 -> 64 units, each stage followed by 40% dropout
    stage_1 = Dropout(0.4)(Dense(256, activation='relu')(row_input))
    stage_2 = Dropout(0.4)(Dense(128, activation='relu')(stage_1))
    stage_3 = Dropout(0.4)(Dense(64, activation='relu')(stage_2))
    score = Dense(1, activation='sigmoid')(stage_3)

    model = Model(row_input, score)
    optimizer = Adam(learning_rate=0.0002)
    model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    return model

def train_gan(generator, discriminator, gan, X_train, latent_dim, epochs, batch_size, loss_weight):
    """Run the adversarial training loop, one random batch per iteration.

    Args:
        generator: Model turning noise of width ``latent_dim`` into data rows.
        discriminator: Compiled model scoring rows; its ``train_on_batch`` result
            is indexed with ``[0]`` below to obtain the loss.
        gan: Compiled generator+discriminator stack.
        X_train: 2-D array of real rows; batches are sampled with replacement.
        latent_dim: Width of the noise vectors.
        epochs: Number of iterations (each processes a single batch).
        batch_size: Rows per real / generated batch.
        loss_weight: Accepted for signature compatibility; unused in this function.

    Returns:
        ``(train_accuracy, test_accuracy)``. ``train_accuracy`` gets one entry
        every 5th iteration; ``test_accuracy`` is always returned empty here.
    """
    train_accuracy = []
    test_accuracy = []
    # Constant targets, built once instead of on every iteration
    real_targets = np.ones((batch_size, 1))
    fake_targets = np.zeros((batch_size, 1))

    for epoch in range(epochs):
        # Discriminator step: one real batch, one generated batch
        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        # verbose=0 keeps predict() from printing progress output on every iteration
        gen_data = generator.predict(noise, verbose=0)
        real_data = X_train[np.random.randint(0, X_train.shape[0], batch_size)]
        discriminator_loss_real = discriminator.train_on_batch(real_data, real_targets)
        discriminator_loss_fake = discriminator.train_on_batch(gen_data, fake_targets)
        discriminator_loss = 0.5 * np.add(discriminator_loss_real, discriminator_loss_fake)

        # Generator step: fresh noise, labelled "real" for the stacked model
        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        generator_loss = gan.train_on_batch(noise, real_targets)

        if epoch % 10 == 0:
            print(f"Epoch {epoch}/{epochs}, Discriminator Loss: {discriminator_loss[0]}, Generator Loss: {generator_loss}")

        # evaluate_accuracy must be defined before this function is called
        if epoch % 5 == 0:
            train_accuracy.append(evaluate_accuracy(generator, discriminator, X_train, batch_size))
            print(f"Epoch {epoch}/{epochs}, Train Accuracy: {train_accuracy[-1]}")

    return train_accuracy, test_accuracy

def build_gan(generator, discriminator, loss_weight):
    """Chain generator -> discriminator into a single compiled model.

    Args:
        generator: Model mapping noise to data rows.
        discriminator: Model scoring rows; marked non-trainable here before the
            chained model is compiled.
        loss_weight: Value forwarded to ``compile(loss_weights=...)``.

    Returns:
        The compiled chained model.
    """
    discriminator.trainable = False
    # Read the noise width off the generator instead of the notebook global
    # `latent_dim`, so the input always matches the model that was passed in.
    gan_input = Input(shape=(generator.input_shape[1],))
    gan_output = discriminator(generator(gan_input))
    gan = Model(gan_input, gan_output)
    gan.compile(optimizer=Adam(learning_rate=0.0002), loss='binary_crossentropy', loss_weights=loss_weight)
    return gan

# Hyperparameters (notebook-level names; functions defined in this notebook read some of them)
latent_dim = 20
output_dim = df.shape[1]  # one generated value per column of df
epochs = 100
batch_size = 512
loss_weight = 0.75

generator = build_generator(latent_dim, output_dim)
discriminator = build_discriminator(output_dim)

# NOTE(review): no training call precedes these save() calls, so the files written
# here contain the freshly initialised (untrained) networks.
generator.save("/content/drive/MyDrive/MINI 1/generator_model.h5")
discriminator.save("/content/drive/MyDrive/MINI 1/discriminator_model.h5")
print("Models Saved Successfully....")

# Part 3: Evaluation and Visualization
def evaluate_accuracy(generator, discriminator, X_data, batch_size):
    """Return the discriminator's accuracy on a mix of real and generated rows.

    ``batch_size`` real rows (sampled with replacement) should score >= 0.5 and
    ``X_data.shape[0]`` generated rows should score < 0.5.

    Args:
        generator: Model mapping noise to data rows.
        discriminator: Model scoring data rows.
        X_data: 2-D array of real rows.
        batch_size: Number of real rows to sample.

    Returns:
        Fraction of the combined rows classified as expected.
    """
    n_rows = X_data.shape[0]
    # Noise width is taken from the generator rather than a notebook-level global
    noise = np.random.normal(0, 1, (n_rows, generator.input_shape[1]))
    gen_data = generator.predict(noise, verbose=0)
    real_data = X_data[np.random.randint(0, n_rows, batch_size)]

    expected = np.concatenate([np.ones((batch_size,)), np.zeros((n_rows,))])
    predicted = np.concatenate([
        discriminator.predict(real_data, verbose=0).ravel() >= 0.5,
        discriminator.predict(gen_data, verbose=0).ravel() >= 0.5,
    ])
    return accuracy_score(expected, predicted)

# Hyperparameters for the training run
latent_dim = 20
batch_size = 512
epochs = 100
loss_weight = 0.75

# Fresh models: these replace the untrained ones built (and saved) just above
generator = build_generator(latent_dim, df.shape[1])
discriminator = build_discriminator(df.shape[1])
gan = build_gan(generator, discriminator, loss_weight)

train_accuracy, test_accuracy = train_gan(generator, discriminator, gan, X_train, latent_dim, epochs, batch_size, loss_weight)

# Save the models now that they are trained, overwriting the untrained files
# written earlier, so anything loading generator_model.h5 gets trained weights.
generator.save("/content/drive/MyDrive/MINI 1/generator_model.h5")
discriminator.save("/content/drive/MyDrive/MINI 1/discriminator_model.h5")
print("Trained models saved.")

# Part 4: Classification Report and Analysis
print("Using the trained GAN for anomaly detection...")
# One synthetic row per dataset row
synthetic_data = generator.predict(np.random.normal(0, 1, size=(len(df), latent_dim)))

# Anomaly score = mean absolute difference between each real row and the
# synthetic row at the same position
print("Calculating anomaly scores...")
anomaly_scores = np.mean(np.abs(synthetic_data - df.values), axis=1)

# Adjust the threshold dynamically: two standard deviations above the mean score
threshold = anomaly_scores.mean() + 2 * anomaly_scores.std()

print("Detecting anomalies...")
predicted_labels = anomaly_scores > threshold

# Reference labels, computed once and reused by every metric below.
# NOTE(review): 'label' holds pd.factorize codes, which depend on row order;
# confirm that code 4 (and this polarity) is the intended reference class.
true_labels = (df['label'] == 4).to_numpy()

# labels=[False, True] pins the matrix to 2x2 so the four-way unpack works even
# when one class is absent from both inputs.
print("Calculating confusion matrix...")
conf_matrix = confusion_matrix(true_labels, predicted_labels, labels=[False, True])
tn, fp, fn, tp = conf_matrix.ravel()

# Tally the label codes of the FLAGGED rows only. (Previously the whole dataset
# was tallied, and a duplicated `i == 0` branch made the "Probe" entry unreachable.)
print("Counting and classifying anomalies...")
attack_type_names = {
    0: 'Normal (attack_type 4)',
    15: 'DoS (attack_type 15)',
    9: 'User-to-Root (attack_type 9)',
    10: 'RemoteAccess (attack_type 10)',
    20: 'WarezClient (attack_type 20)',
    8: 'RootKit (attack_type 8)',
    17: 'GuessPassword (attack_type 17)',
    7: 'FTPWrite (attack_type 7)',
    6: 'Multihop (attack_type 6)',
}
anomaly_count = {}
flagged_label_counts = df.loc[predicted_labels, 'label'].value_counts().sort_index()
for code, count in flagged_label_counts.items():
    name = attack_type_names.get(code, 'Others')
    anomaly_count[name] = anomaly_count.get(name, 0) + int(count)

print("Results:")
print(f"Accuracy: {accuracy_score(true_labels, predicted_labels) * 100:.2f}%")
print(f"True Negatives (TN): {tn}")
print(f"False Positives (FP): {fp}")
print(f"False Negatives (FN): {fn}")
print(f"True Positives (TP): {tp}")

print("Number of Anomalies:", int(predicted_labels.sum()))
print("Indices of Anomalies:", np.where(predicted_labels == 1)[0])
print("Attack Types of Anomalies:")
for key, value in anomaly_count.items():
    print(f"{key}: {value}")

print("Confusion Matrix:")
print(conf_matrix)
print("Classification Report:")
# zero_division=0 reports 0 for undefined precision/recall without emitting a warning per metric
print(classification_report(true_labels, predicted_labels, zero_division=0))


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Loading and preprocessing the dataset...


  saving_api.save_model(


Models Saved Successfully....
Epoch 0/100, Discriminator Loss: 0.8783400356769562, Generator Loss: 0.5384610891342163
Epoch 0/100, Train Accuracy: 0.4344512195121951
Epoch 5/100, Train Accuracy: 0.40625
Epoch 10/100, Discriminator Loss: 0.5537185817956924, Generator Loss: 0.4851207733154297
Epoch 10/100, Train Accuracy: 0.39176829268292684
Epoch 15/100, Train Accuracy: 0.3940548780487805
Epoch 20/100, Discriminator Loss: 0.48529694974422455, Generator Loss: 0.46769022941589355
Epoch 20/100, Train Accuracy: 0.4260670731707317
Epoch 25/100, Train Accuracy: 0.6638719512195121
Epoch 30/100, Discriminator Loss: 0.43277083337306976, Generator Loss: 0.4889828562736511
Epoch 30/100, Train Accuracy: 0.9413109756097561
Epoch 35/100, Train Accuracy: 0.9786585365853658
Epoch 40/100, Discriminator Loss: 0.4119521304965019, Generator Loss: 0.5342110395431519
Epoch 40/100, Train Accuracy: 0.9550304878048781
Epoch 45/100, Train Accuracy: 0.9222560975609756
Epoch 50/100, Discriminator Loss: 0.400002092

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from tensorflow.keras.layers import Input, Dense, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
import logging

# Raise the 'tensorflow' logger threshold so only ERROR-level records are emitted
logging.getLogger('tensorflow').setLevel(logging.ERROR)

# Mount Google Drive; the dataset path below lives under the mount point
from google.colab import drive
drive.mount('/content/drive')

# Part 1: Loading and Preprocessing Data
print("Loading and preprocessing the dataset...")
dataset_path = "/content/drive/MyDrive/MINI 1/KDDTrain.csv"
# Only the first 1000 rows of the file are read
df = pd.read_csv(dataset_path, nrows=1000)

# Replace each categorical column (including 'label') with integer codes.
# pd.factorize numbers values in order of first appearance, so codes depend on row order.
categorical_cols = ['protocol_type', 'service', 'flag', 'label']
for col in categorical_cols:
    df[col] = pd.factorize(df[col])[0]

# Standard-scale every column that was not factorized above.
# NOTE(review): the scaler is fitted on all rows before the train/test split below.
scaler = StandardScaler()
numerical_cols = [col for col in df.columns if col not in categorical_cols]
df[numerical_cols] = scaler.fit_transform(df[numerical_cols])

# 80/20 split with a fixed seed. Here the DataFrame itself is split (not df.values),
# so X_train / X_test are DataFrames and the functions below index them via `.values`.
X_train, X_test = train_test_split(df, test_size=0.2, random_state=42)

# Part 2: Model Definition and Training
def build_generator(latent_dim, output_dim):
    """Assemble the generator network (noise vector -> synthetic data row).

    Args:
        latent_dim: Width of the noise input.
        output_dim: Width of the generated row.

    Returns:
        An uncompiled Keras ``Model``.
    """
    def hidden_stage(tensor, width):
        # One ReLU dense layer followed by 40% dropout
        activated = Dense(width, activation='relu')(tensor)
        return Dropout(0.4)(activated)

    source = Input(shape=(latent_dim,))
    features = hidden_stage(hidden_stage(hidden_stage(source, 256), 128), 64)
    # tanh keeps every generated value inside [-1, 1]
    return Model(source, Dense(output_dim, activation='tanh')(features))

def build_discriminator(input_dim):
    """Assemble and compile the discriminator (data row -> single sigmoid score).

    Args:
        input_dim: Width of the rows fed to the network.

    Returns:
        A Keras ``Model`` compiled with binary cross-entropy, Adam
        (learning rate 0.0002) and an accuracy metric.
    """
    def hidden_stage(tensor, width):
        # One ReLU dense layer followed by 40% dropout
        activated = Dense(width, activation='relu')(tensor)
        return Dropout(0.4)(activated)

    source = Input(shape=(input_dim,))
    features = hidden_stage(hidden_stage(hidden_stage(source, 256), 128), 64)
    classifier = Model(source, Dense(1, activation='sigmoid')(features))
    classifier.compile(
        loss='binary_crossentropy',
        optimizer=Adam(learning_rate=0.0002),
        metrics=['accuracy'],
    )
    return classifier

def train_gan(generator, discriminator, gan, X_train, latent_dim, epochs, batch_size, loss_weight, X_test=None):
    """Run the adversarial training loop and track train/test discriminator accuracy.

    Args:
        generator: Model turning noise of width ``latent_dim`` into data rows.
        discriminator: Compiled model scoring rows; its ``train_on_batch`` result
            is indexed with ``[0]`` below to obtain the loss.
        gan: Compiled generator+discriminator stack.
        X_train: DataFrame of real rows; batches are sampled with replacement
            from ``X_train.values``.
        latent_dim: Width of the noise vectors.
        epochs: Number of iterations (each processes a single batch).
        batch_size: Rows per real / generated batch.
        loss_weight: Accepted for signature compatibility; unused in this function.
        X_test: Held-out DataFrame used for the test-accuracy readings. When
            omitted, the notebook-level ``X_test`` is used, which is what this
            function previously read implicitly.

    Returns:
        ``(train_accuracy, test_accuracy)``, each with one entry per 5th iteration.
    """
    if X_test is None:
        # Backward-compatible fallback for callers that do not pass X_test.
        X_test = globals()["X_test"]

    train_accuracy = []
    test_accuracy = []
    # Loop-invariant values, built once
    train_values = X_train.values
    real_targets = np.ones((batch_size, 1))
    fake_targets = np.zeros((batch_size, 1))

    for epoch in range(epochs):
        # Discriminator step: one real batch, one generated batch
        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        # verbose=0 keeps predict() from printing progress output on every iteration
        gen_data = generator.predict(noise, verbose=0)
        real_data = train_values[np.random.randint(0, X_train.shape[0], batch_size)]
        discriminator_loss_real = discriminator.train_on_batch(real_data, real_targets)
        discriminator_loss_fake = discriminator.train_on_batch(gen_data, fake_targets)
        discriminator_loss = 0.5 * np.add(discriminator_loss_real, discriminator_loss_fake)

        # Generator step: fresh noise, labelled "real" for the stacked model
        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        generator_loss = gan.train_on_batch(noise, real_targets)

        if epoch % 10 == 0:
            print(f"Epoch {epoch}/{epochs}, Discriminator Loss: {discriminator_loss[0]}, Generator Loss: {generator_loss}")

        if epoch % 5 == 0:
            train_accuracy.append(evaluate_accuracy(generator, discriminator, X_train, batch_size))
            test_accuracy.append(evaluate_accuracy(generator, discriminator, X_test, batch_size))
            print(f"Epoch {epoch}/{epochs}, Train Accuracy: {train_accuracy[-1]}, Test Accuracy: {test_accuracy[-1]}")

    return train_accuracy, test_accuracy

def build_gan(generator, discriminator, loss_weight):
    """Compose generator and discriminator into the model used for generator updates.

    Args:
        generator: Model mapping noise to data rows.
        discriminator: Model scoring rows; its ``trainable`` flag is set to
            ``False`` here before the composed model is compiled.
        loss_weight: Value forwarded to ``compile(loss_weights=...)``.

    Returns:
        The compiled composed model.
    """
    discriminator.trainable = False
    # Use the generator's own input width instead of the notebook global
    # `latent_dim`, so the function no longer depends on hidden state.
    gan_input = Input(shape=(generator.input_shape[1],))
    gan_output = discriminator(generator(gan_input))
    gan = Model(gan_input, gan_output)
    gan.compile(optimizer=Adam(learning_rate=0.0002), loss='binary_crossentropy', loss_weights=loss_weight)
    return gan

def evaluate_accuracy(generator, discriminator, X_data, batch_size):
    """Return the discriminator's accuracy on a mix of real and generated rows.

    ``batch_size`` real rows (sampled with replacement from ``X_data.values``)
    should score >= 0.5 and ``X_data.shape[0]`` generated rows should score < 0.5.

    Args:
        generator: Model mapping noise to data rows.
        discriminator: Model scoring data rows.
        X_data: DataFrame of real rows (accessed through ``.values``).
        batch_size: Number of real rows to sample.

    Returns:
        Fraction of the combined rows classified as expected.
    """
    n_rows = X_data.shape[0]
    # Noise width is taken from the generator rather than a notebook-level global
    noise = np.random.normal(0, 1, (n_rows, generator.input_shape[1]))
    gen_data = generator.predict(noise, verbose=0)
    real_data = X_data.values[np.random.randint(0, n_rows, batch_size)]

    expected = np.concatenate([np.ones((batch_size,)), np.zeros((n_rows,))])
    predicted = np.concatenate([
        discriminator.predict(real_data, verbose=0).ravel() >= 0.5,
        discriminator.predict(gen_data, verbose=0).ravel() >= 0.5,
    ])
    return accuracy_score(expected, predicted)

# Hyperparameters (notebook-level names; functions defined in this notebook read some of them)
latent_dim = 20
output_dim = df.shape[1]  # one generated value per column of df
epochs = 200  # Increased number of epochs
batch_size = 512
loss_weight = 0.75

# Build the three models; build_gan marks the discriminator non-trainable before compiling the stack
generator = build_generator(latent_dim, output_dim)
discriminator = build_discriminator(output_dim)
gan = build_gan(generator, discriminator, loss_weight)

# Train the GAN model
# X_train is a DataFrame in this cell. Nothing in this cell saves the trained models.
train_accuracy, test_accuracy = train_gan(generator, discriminator, gan, X_train, latent_dim, epochs, batch_size, loss_weight)

# Use the trained GAN for anomaly detection
print("Using the trained GAN for anomaly detection...")
# Generate one synthetic row per dataset row using the generator
synthetic_data = generator.predict(np.random.normal(0, 1, size=(len(df), latent_dim)))

# Anomaly score = mean absolute difference between each real row and the
# synthetic row at the same position
print("Calculating anomaly scores...")
anomaly_scores = np.mean(np.abs(synthetic_data - df.values), axis=1)

# Dynamically determine threshold based on anomaly scores (mean + 1.5 standard deviations)
threshold = np.mean(anomaly_scores) + 1.5 * np.std(anomaly_scores)

# Detect anomalies based on the dynamically determined threshold
predicted_labels = anomaly_scores > threshold

# Reference labels, computed once and reused by every metric below.
# NOTE(review): 'label' holds pd.factorize codes, which depend on row order;
# confirm that code 4 (and this polarity) is the intended reference class.
true_labels = (df['label'] == 4).to_numpy()

# tn/fp/fn/tp are printed below but were never computed in this cell, so a fresh
# kernel raised NameError (or silently reused values from an earlier cell).
# labels=[False, True] pins the matrix to 2x2 so the unpack always works.
conf_matrix = confusion_matrix(true_labels, predicted_labels, labels=[False, True])
tn, fp, fn, tp = conf_matrix.ravel()

# Count and classify anomalies: tally the label codes of the FLAGGED rows only.
# Code 4 is reported once, under the 'Normal' key, instead of appearing under both
# an 'Anomaly (attack_type 4)' key and the 'Normal' key.
flagged_label_counts = df.loc[predicted_labels, 'label'].value_counts().sort_index()
anomaly_count = {
    f"Anomaly (attack_type {code})": int(count)
    for code, count in flagged_label_counts.items()
    if code != 4
}
anomaly_count['Normal (attack_type 4)'] = int(flagged_label_counts.get(4, 0))

# Print results
print("Results:")
print(f"Accuracy: {accuracy_score(true_labels, predicted_labels) * 100:.2f}%")
print(f"True Negatives (TN): {tn}")
print(f"False Positives (FP): {fp}")
print(f"False Negatives (FN): {fn}")
print(f"True Positives (TP): {tp}")
print("Number of Anomalies:", int(predicted_labels.sum()))
print("Indices of Anomalies:", np.where(predicted_labels == 1)[0])
print("Attack Types of Anomalies:")
for key, value in anomaly_count.items():
    print(f"{key}: {value}")

# Print confusion matrix and classification report
print("Confusion Matrix:")
print(conf_matrix)
print("Classification Report:")
# zero_division=0 reports 0 for undefined precision/recall without emitting a warning per metric
print(classification_report(true_labels, predicted_labels, zero_division=0))


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Loading and preprocessing the dataset...
Epoch 0/200, Discriminator Loss: 0.6960268020629883, Generator Loss: 0.4575808644294739
Epoch 0/200, Train Accuracy: 0.38414634146341464, Test Accuracy: 0.7134831460674157
Epoch 5/200, Train Accuracy: 0.3902439024390244, Test Accuracy: 0.7191011235955056
Epoch 10/200, Discriminator Loss: 0.5197523534297943, Generator Loss: 0.40747690200805664
Epoch 10/200, Train Accuracy: 0.3902439024390244, Test Accuracy: 0.7191011235955056
Epoch 15/200, Train Accuracy: 0.3902439024390244, Test Accuracy: 0.7191011235955056
Epoch 20/200, Discriminator Loss: 0.461400143802166, Generator Loss: 0.41099846363067627
Epoch 20/200, Train Accuracy: 0.4245426829268293, Test Accuracy: 0.7331460674157303
Epoch 25/200, Train Accuracy: 0.6920731707317073, Test Accuracy: 0.8890449438202247
Epoch 30/200, Discriminator Loss: 0.42316070944070816, Gener

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
