In [22]:
import numpy as np
import pandas as pd
from sklearn import datasets
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
from sklearn import preprocessing
from sklearn.preprocessing import LabelEncoder

In [21]:
df = pd.read_csv(r"C:\Users\trush\OneDrive\Documents\WFU Grad School Info\BAN 6025 Machine Learning\Data\iris.csv")

In [23]:
le = LabelEncoder()
df['species'] = le.fit_transform(df['species'])

In [24]:
df['species'].value_counts()

species
0    50
1    50
2    50
Name: count, dtype: int64

This is a Generative Adversarial Network (GAN). It is used to create synthetic records within a dataset. A GAN has two parts, a generator and a discriminator. The generator, generates the fake records and their class labels. The discriminator tries to prove that these fake records are fake by using the real records from the dataset. The model iterates, and the generator makes fake records until it can fool the discriminator. Those fake records can the be used in the dataset. 

This works especially well when using classification models. If I have a dataset with 70% of one class and only 30% of another class. I can use the GAN to make those missing 40% of the second class which will remove any bias in classification.

In [30]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.datasets import load_iris
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder
from keras.models import Model
from keras.layers import Input, Dense, Concatenate
from keras.optimizers import Adam

# Load Iris dataset
iris = load_iris()
X = iris.data  # Features
Y = iris.target  # Labels

# Normalize the data
scaler = MinMaxScaler()
X = scaler.fit_transform(X)

# Convert data to pandas DataFrame
real_data = pd.DataFrame(X, columns=[str(i) for i in range(1, 5)])  # Four features
real_labels = Y  # No need to convert to numpy array

# One-hot encode labels
one_hot_encoder = OneHotEncoder(sparse_output=False)
one_hot_labels = one_hot_encoder.fit_transform(real_labels.reshape(-1, 1))
print("One-hot labels shape:", one_hot_labels.shape)

# Constants
NOISE_DIM = 100
NUM_CLASSES = one_hot_labels.shape[1]  # Should be 3 for the Iris dataset
NUM_FEATURES = X.shape[1]  # Should be 4 for the Iris dataset
BATCH_SIZE = 32  # Adjusted to be smaller since Iris dataset is small
TRAINING_STEPS = 5000

# Generator
def create_generator():
    noise_input = Input(shape=(NOISE_DIM,))
    class_input = Input(shape=(NUM_CLASSES,))
    merged_input = Concatenate()([noise_input, class_input])
    hidden = Dense(128, activation='relu')(merged_input)
    output = Dense(NUM_FEATURES, activation='linear')(hidden)
    model = Model(inputs=[noise_input, class_input], outputs=output)
    return model

# Discriminator
def create_discriminator():
    data_input = Input(shape=(NUM_FEATURES,))
    class_input = Input(shape=(NUM_CLASSES,))
    merged_input = Concatenate()([data_input, class_input])
    hidden = Dense(128, activation='relu')(merged_input)
    output = Dense(1, activation='sigmoid')(hidden)
    model = Model(inputs=[data_input, class_input], outputs=output)
    return model

# cGAN
def create_cgan(generator, discriminator):
    noise_input = Input(shape=(NOISE_DIM,))
    class_input = Input(shape=(NUM_CLASSES,))
    generated_data = generator([noise_input, class_input])
    validity = discriminator([generated_data, class_input])
    model = Model(inputs=[noise_input, class_input], outputs=validity)
    return model

# Create and compile the models
discriminator = create_discriminator()
generator = create_generator()
gan = create_cgan(generator, discriminator)

# Ensure that only the generator is trained during GAN training
discriminator.trainable = False

discriminator.compile(loss='binary_crossentropy', optimizer=Adam())
gan.compile(loss='binary_crossentropy', optimizer=Adam())

# Generate instances for a given class
def generate_data(generator, data_class, num_instances):
    one_hot_class = one_hot_encoder.transform(np.array([[data_class]]))
    noise = np.random.normal(0, 1, (num_instances, NOISE_DIM))
    generated_data = generator.predict([noise, np.repeat(one_hot_class, num_instances, axis=0)])
    return pd.DataFrame(generated_data, columns=[str(i) for i in range(1, 5)])  # Four features

# Train GAN
step_list, loss_list_discriminator, loss_list_generator = [], [], []

for step in range(TRAINING_STEPS):
    # Select a random batch of real data with labels
    idx = np.random.randint(0, real_data.shape[0], BATCH_SIZE)
    real_batch = real_data.iloc[idx].values
    labels_batch = one_hot_labels[idx]

    # Generate a batch of new data
    noise = np.random.normal(0, 1, (BATCH_SIZE, NOISE_DIM))
    generated_batch = generator.predict([noise, labels_batch])

    real_batch = real_batch.astype('float32')
    labels_batch = labels_batch.astype('float32')
    
    # Train the discriminator
    real_loss = discriminator.train_on_batch([real_batch, labels_batch], np.ones((BATCH_SIZE, 1)))
    fake_loss = discriminator.train_on_batch([generated_batch, labels_batch], np.zeros((BATCH_SIZE, 1)))
    discriminator_loss = 0.5 * (real_loss + fake_loss)

    # Train the generator
    generator_loss = gan.train_on_batch([noise, labels_batch], np.ones((BATCH_SIZE, 1)))

    if step % 50 == 0:
        print(f"Step: {step}, Discriminator Loss: {discriminator_loss:.4f}, Generator Loss: {generator_loss:.4f}")

    # Save losses for plotting later
    step_list.append(step)
    loss_list_discriminator.append(discriminator_loss)
    loss_list_generator.append(generator_loss)

    # Visualization every 500 steps
    if step % 500 == 0:
        generated_samples = generate_data(generator, 0, 10)  # Change class as needed (0, 1, or 2)
        plt.figure(figsize=(10, 5))
        plt.scatter(generated_samples.iloc[:, 0], generated_samples.iloc[:, 1])  # Scatter plot for first two features
        plt.title(f"Generated Samples at Step {step}")
        plt.xlabel("Feature 1")
        plt.ylabel("Feature 2")
        plt.show()

# Generate 40 instances of each class
for class_idx in range(NUM_CLASSES):
    generated_data = generate_data(generator, class_idx, 40)
    print(f"Generated data for class {class_idx}:")
    print(generated_data)

# Plot loss curves
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.plot(step_list, loss_list_discriminator)
plt.title('Discriminator Loss vs. Step')
plt.xlabel('Step')
plt.ylabel('Loss')

plt.subplot(1, 2, 2)
plt.plot(step_list, loss_list_generator)
plt.title('Generator Loss vs. Step')
plt.xlabel('Step')
plt.ylabel('Loss')

plt.tight_layout()
plt.show()


One-hot labels shape: (150, 3)
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 54ms/step




AttributeError: 'NoneType' object has no attribute 'update_state'

In [28]:
# Load dataset
X = df.drop(['Label'], axis=1)
Y = df['Label']

# Normalize the data
scaler = MinMaxScaler()
X = scaler.fit_transform(X)

# Convert data to pandas DataFrame
real_data = pd.DataFrame(X, columns=['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14',
                                    '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27',
                                    '28', '29', '30'])
real_labels = Y

# One hot encode labels
one_hot_encoder = OneHotEncoder(sparse_output=False)
one_hot_labels = one_hot_encoder.fit_transform(np.array(real_labels).reshape(-1, 1))
print("One-hot labels shape:", one_hot_labels.shape)
print(one_hot_labels)

# Constants
NOISE_DIM = 100
NUM_CLASSES = 2
NUM_FEATURES = 30
BATCH_SIZE = 64
TRAINING_STEPS = 5000

# Generator
def create_generator():
    noise_input = Input(shape=(NOISE_DIM,))
    class_input = Input(shape=(NUM_CLASSES,))
    merged_input = Concatenate()([noise_input, class_input])
    hidden = Dense(128, activation='relu')(merged_input)
    output = Dense(NUM_FEATURES, activation='linear')(hidden)
    model = Model(inputs=[noise_input, class_input], outputs=output)
    return model

# Discriminator
def create_discriminator():
    data_input = Input(shape=(NUM_FEATURES,))
    class_input = Input(shape=(NUM_CLASSES,))
    merged_input = Concatenate()([data_input, class_input])
    hidden = Dense(128, activation='relu')(merged_input)
    output = Dense(1, activation='sigmoid')(hidden)
    model = Model(inputs=[data_input, class_input], outputs=output)
    return model

# cGAN
def create_cgan(generator, discriminator):
    noise_input = Input(shape=(NOISE_DIM,))
    class_input = Input(shape=(NUM_CLASSES,))
    generated_data = generator([noise_input, class_input])
    validity = discriminator([generated_data, class_input])
    model = Model(inputs=[noise_input, class_input], outputs=validity)
    return model

# Create and compile the Discriminator
discriminator = create_discriminator()

# Create the Generator
generator = create_generator()

# Create the GAN
gan = create_cgan(generator, discriminator)

# Ensure that only the generator is trained
discriminator.trainable = False

discriminator.compile(loss='binary_crossentropy', optimizer=Adam())
gan.compile(loss='binary_crossentropy', optimizer=Adam())

# Generate instances for a given class
def generate_data(generator, data_class, num_instances):
    one_hot_class = one_hot_encoder.transform(np.array([[data_class]]))
    noise = np.random.normal(0, 1, (num_instances, NOISE_DIM))
    generated_data = generator.predict([noise, np.repeat(one_hot_class, num_instances, axis=0)])
    return pd.DataFrame(generated_data, columns=['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14',
                                    '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27',
                                    '28', '29', '30'])
# Train GAN
step_list = []
loss_list_discriminator = []
loss_list_generator = []
for step in range(TRAINING_STEPS):
    # Select a random batch of real data with labels
    idx = np.random.randint(0, real_data.shape[0], BATCH_SIZE)
    real_batch = real_data.iloc[idx].values
    labels_batch = one_hot_labels[idx]

    print("Real batch shape:", real_batch.shape)
    print("Labels batch shape:", labels_batch.shape)

    # Check if the input structure matches
    print("Inputs to discriminator:", [real_batch, labels_batch])

    # Generate a batch of new data
    noise = np.random.normal(0, 1, (BATCH_SIZE, NOISE_DIM))
    generated_batch = generator.predict([noise, labels_batch])

    
    # Train the discriminator
    real_loss = discriminator.train_on_batch([real_batch, labels_batch], np.ones((BATCH_SIZE, 1)))
    fake_loss = discriminator.train_on_batch([generated_batch, labels_batch], np.zeros((BATCH_SIZE, 1)))
    discriminator_loss = 0.5 * np.add(real_loss, fake_loss)

    # Train the generator
    generator_loss = gan.train_on_batch([noise, labels_batch], np.ones((BATCH_SIZE, 1)))

    if step % 50 == 0:
        print(f"Step: {step}, Discriminator Loss: {discriminator_loss}, Generator Loss: {generator_loss}")

    if step % 500 == 0:
        generated_samples = generate_data(generator, 1, 10)  # Change class as needed
        plt.figure(figsize=(10, 5))
        plt.scatter(generated_samples.iloc[:, 0], generated_samples.iloc[:, 1])  # Change to relevant columns
        plt.title(f"Generated Samples at Step {step}")
        plt.show()
    
    step_list.append(step)
    loss_list_discriminator.append(discriminator_loss)
    loss_list_generator.append(generator_loss)


# Generate 40 instances of class 1
generated_data = generate_data(generator, 1, 40)
print(generated_data)

plt.figure(figsize=(10,5))
plt.subplot(1,2,1)
plt.plot(step_list, loss_list_discriminator)
plt.title('Discriminator Loss vs. Step')
    
plt.subplot(1,2,2)
plt.plot(step_list, loss_list_generator)
plt.title('Generator Loss vs. Step')

KeyError: "['Label'] not found in axis"

In [None]:
plt.figure(figsize=(10,5))
plt.subplot(1,2,1)
plt.plot(step_list, loss_list_discriminator)
plt.title('Discriminator Loss vs. Step')
    
plt.subplot(1,2,2)
plt.plot(step_list, loss_list_generator)
plt.title('Generator Loss vs. Step')

In [None]:
synthetic_data_class_1 = generate_data(generator, 1, 12226)

In [29]:
synthetic_data = synthetic_data_class_1
synthetic_data = pd.DataFrame(scaler.inverse_transform(synthetic_data), 
                              columns=['Dport', 'SrcBytes', 'DstBytes', 'SrcLoad', 'DstLoad', 'SIntPkt',
       'DIntPkt', 'SIntPktAct', 'SrcJitter', 'DstJitter', 'sMaxPktSz',
       'dMaxPktSz', 'sMinPktSz', 'Dur', 'TotPkts', 'TotBytes', 'Load', 'Loss',
       'pLoss', 'pSrcLoss', 'pDstLoss', 'Rate', 'Temp', 'SpO2', 'Pulse_Rate',
       'SYS', 'DIA', 'Heart_rate', 'Resp_Rate', 'ST'])

synthetic_labels = [1]*12226
synthetic_data['Label'] = synthetic_labels
synthetic_data.to_csv('synthetic_attack_csv', index=False)

In [None]:
synthetic_data = pd.read_csv('synthetic_attack_csv.csv')
synthetic_data.head()

In [None]:
df.head()

In [None]:
combined_data = pd.concat([df, synthetic_data], ignore_index=True)
combined_data.head()

In [None]:
combined_data['Label'].value_counts()

In [None]:
#Running a Classifier on the combined dataset

from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.model_selection import train_test_split, cross_val_predict

X = combined_data.drop('Label', axis=1)
y = combined_data['Label']

lr = LogisticRegression()

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
lr.fit(X_train, y_train)

y_pred = cross_val_predict(lr, X, y, cv=5)

print('The Accuracy of this model is: ', accuracy_score(y, y_pred))
print('Confusion Matrix')
print(confusion_matrix(y, y_pred))