# Enhanced detection of DNS tunnelling: Leveraging random forest and genetic algorithm for improved security

To download the necessary CSV files for the project, run the following cell. It fetches each file with the `requests` library and skips files that are already present:


In [None]:
import os
import requests

# (URL, local filename) pairs for the dataset CSV files
file_urls = [
    ("https://drive.usercontent.google.com/download?id=1cictwnxUyu1vCa4H9iefIrQeVLCC3RCv&export=download&authuser=0&confirm=t&uuid=8ec5d698-4d5d-4592-94eb-8a82234966ac&at=AC2mKKTzwehwnBUepaEJIDoKDql-:1690876827674", "benign-chrome.csv"),
    ("https://drive.usercontent.google.com/download?id=1cms99qEylyvesqcX3dQRZOUQRAONy2uS&export=download&authuser=0&confirm=t&uuid=0f089685-41f1-40fe-903e-8fcc8e2bcac8&at=AC2mKKSfqH9g0sjW4mQVa5-J4gMf:1690877149684", "benign-firefox.csv"),
    ("https://drive.usercontent.google.com/download?id=1cqDL7A_kdOCL4Km4uUifRPllFmB3WaZ_&export=download&authuser=0&confirm=t&uuid=19171c97-ad00-4af4-bf46-ef8c453b2964&at=AC2mKKROICucTfu1coxAIff16wi1:1690878058234", "mal-dns2tcp.csv"),
    ("https://drive.usercontent.google.com/download?id=1cxeTvXNV-OY_4T6xs4sUB98lmanROw3m&export=download&authuser=0&confirm=t&uuid=67df7c64-15ed-450d-bad8-f416080d378d&at=AC2mKKST9kQGoFcvwe9EhJoY6jRA:1690878087508", "mal-dnscat2.csv"),
    ("https://drive.google.com/u/1/uc?id=1czNRMpNyicFNYW2fbK_WjsoF77qB9_XA&export=download", "mal-iodine.csv")
]

# Create the directory that holds the downloaded files (no-op when it already exists)
os.makedirs("DoHBrw-2020", exist_ok=True)

# Download every file that is not already present
for url, filename in file_urls:
    file_path = os.path.join("DoHBrw-2020", filename)
    if os.path.exists(file_path):
        print(f"{filename} already exists!")
        continue

    # Stream into a ".part" file and rename only on success, so an interrupted
    # or failed download is never mistaken for a complete file on the next run.
    partial_path = file_path + ".part"
    try:
        print(f"Downloading {filename}...")
        with requests.get(url, stream=True, timeout=60) as response:
            # Fail on HTTP errors instead of saving the error page as a CSV
            response.raise_for_status()
            # NOTE(review): presumably the host answers with an HTML page when it
            # wants a confirmation step; reject that rather than store it as data.
            content_type = response.headers.get("Content-Type", "")
            if content_type.startswith("text/html"):
                raise ValueError(f"expected a CSV file but received '{content_type}'")
            with open(partial_path, "wb") as file:
                for chunk in response.iter_content(chunk_size=8192):
                    file.write(chunk)
        os.replace(partial_path, file_path)
        print(f"{filename} downloaded successfully!")
    except (requests.RequestException, OSError, ValueError) as e:
        # Remove whatever was written so the next run retries the download
        if os.path.exists(partial_path):
            os.remove(partial_path)
        print(f"Error downloading {filename}: {e}")



In [None]:
# Importing required libraries
import json  # For working with JSON data
import math  # For mathematical operations
from collections import Counter  # For counting elements in a list
from os.path import join  # For joining file paths
import numpy as np  # For numerical operations and arrays
import pandas as pd  # For data manipulation and analysis
import plotly.express as px  # For interactive plotting
import plotly.figure_factory as ff  # For creating various types of figures
import plotly.graph_objects as go  # For creating customized plots
import random  # For generating random values
from tqdm.notebook import tqdm, trange  # For displaying progress bars in Jupyter Notebook
from deap import base, creator, tools, algorithms  # For evolutionary algorithms
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, f1_score  # For model evaluation metrics
from sklearn.model_selection import cross_val_score, train_test_split  # For cross-validation and data splitting
from sklearn.preprocessing import StandardScaler, LabelEncoder  # For data preprocessing
from sklearn.ensemble import RandomForestClassifier  # For building a Random Forest classifier
from sklearn.impute import SimpleImputer  # For imputing missing values
from sklearn.inspection import permutation_importance  # For feature importance analysis

from plotly.offline import iplot  # For offline plotting

# Additional imports
import matplotlib.pyplot as plt  # For creating traditional plots

# Shuffle data
from sklearn.utils import shuffle  # For shuffling data

import time

from sklearn.metrics import accuracy_score


import pickle  # Add this import statement



In [None]:
# Load the two benign captures (Chrome first, Firefox second)
df1_benign, df2_benign = (
    pd.read_csv(csv_path, delimiter=',')
    for csv_path in ('DoHBrw-2020/benign-chrome.csv', 'DoHBrw-2020/benign-firefox.csv')
)

# Stack both captures into one frame with a fresh index, set the 'DoH' column
# to 0 (benign traffic) on the combined frame, then expose it under the name 'labels'.
df_benign = (
    pd.concat([df1_benign, df2_benign], ignore_index=True)
    .assign(DoH=0)
    .rename(columns={'DoH': 'labels'})
)
df_benign


In [None]:

# Malicious captures and the numeric code written into their 'DoH' column:
# 1 = iodine, 2 = dns2tcp, 3 = dnscat2
malicious_sources = (
    ('DoHBrw-2020/mal-iodine.csv', 1),
    ('DoHBrw-2020/mal-dns2tcp.csv', 2),
    ('DoHBrw-2020/mal-dnscat2.csv', 3),
)

tagged_frames = []
for csv_path, tool_code in malicious_sources:
    capture = pd.read_csv(csv_path, delimiter=',')
    capture['DoH'] = tool_code
    tagged_frames.append(capture)

# Keep the per-tool frames for dns2tcp and dnscat2 available under their original names
_, df2_malic, df3_malic = tagged_frames

# Stack the three captures (index reset to avoid duplicates) and rename 'DoH' to
# 'labels' so the column matches the benign frame (0 benign; 1, 2, 3 malicious types)
df1_malic = pd.concat(tagged_frames, ignore_index=True).rename(columns={'DoH': 'labels'})
df1_malic


In [None]:
# Combine benign and malicious traffic and shuffle the rows.
# A fixed random_state keeps the row order reproducible between runs, in line
# with the seeded train/test splits used later in the notebook.
data = shuffle(pd.concat([df_benign, df1_malic], ignore_index=True), random_state=1)

# Number of null (missing) values in each column of 'data'
null_value_counts = data.isnull().sum()

# Drop columns that hold the same value in every row
columns_to_drop = [col for col in data.columns if data[col].nunique() == 1]
data_dropped = data.drop(columns=columns_to_drop)

# Fill missing values or NaN values with 0 for all columns
data_filled = data_dropped.fillna(0)

# Print the number of null values after filling
print("Null Value Counts after Filling:")
print(data_filled.isnull().sum())

# 'data_filled' now holds the frame without constant columns and with missing values set to 0.
# NOTE(review): 'data' itself is left unchanged by the drop/fill steps above.

# Now 'data_filled' contains the DataFrame with missing values filled with 0


In [None]:
data

In [None]:
# Compute the statistical summary of numeric columns in the DataFrame 'data'
data.describe()


In [None]:
"""
The code data['SourceIP'] is used to access the 'SourceIP' column in the DataFrame data.
It retrieves the values of the 'SourceIP' column, which represents the source IP addresses of
the network traffic data.
"""
data['SourceIP']

In [None]:
# Compute the count of each unique value in the 'labels' column of the DataFrame 'data'
data.labels.value_counts()

In [None]:

# Human-readable description for each numeric label (0..3)
attack_descriptions = dict(enumerate([
    "Benign",
    "Malicious - Iodine",
    "Malicious - DNS2TCP",
    "Malicious - Dnscat2",
]))

# Make sure 'TimeStamp' holds datetime values
data['TimeStamp'] = pd.to_datetime(data['TimeStamp'])

# Count the rows for every (timestamp, label) pair
timestamp_label_counts = data.groupby(['TimeStamp', 'labels']).size()
grouped_data = timestamp_label_counts.reset_index(name='count')

# Attach the textual description of each label
grouped_data['AttackTypeDescription'] = grouped_data['labels'].map(attack_descriptions)

In [None]:
# Line chart of the per-timestamp counts, one coloured line per attack description
fig = px.line(
    data_frame=grouped_data,
    x='TimeStamp',
    y='count',
    color='AttackTypeDescription',
    markers=True,
    hover_data={'AttackTypeDescription': True},  # include the description in the hover box
)

# Titles for the figure, both axes and the legend
layout_titles = {
    'title': 'Attack Type Distribution Over Time',
    'xaxis_title': 'Time',
    'yaxis_title': 'Count',
    'legend_title': 'Attack Type',
}
fig.update_layout(**layout_titles)

# Render the figure
fig.show()

Output hidden; open in https://colab.research.google.com to view.

In [None]:
# One encoder instance, re-fitted for every column it is applied to
le = LabelEncoder()

# Columns stored with the 'object' dtype are the non-numeric (categorical) ones
object_columns = [column for column in data.columns if data[column].dtype == 'object']

# Replace each of those columns with its integer label encoding
for column in object_columns:
    data[column] = le.fit_transform(data[column])

# Show the frame with the encoded columns
data


In [None]:
data['SourceIP']

In [None]:
data.describe()

In [None]:
# First 100 benign rows (label 0) and first 300 malicious rows (any non-zero label)
is_benign = data['labels'].eq(0)
benign_data = data.loc[is_benign].head(100)
malicious_data = data.loc[~is_benign].head(300)

# Stack both samples into one small frame with a fresh index
small_sample = pd.concat([benign_data, malicious_data], ignore_index=True)

# Uncomment to run the rest of the notebook on the small sample only
# data = small_sample.copy()

In [None]:
# Feature matrix: every column of 'data' except "TimeStamp" and "labels"
# (drop returns a new DataFrame; 'data' itself is not modified)
X = data.drop(columns=["TimeStamp", "labels"])

# Target vector: the "labels" column as a one-dimensional NumPy array
y = data['labels'].to_numpy()


In [None]:

# Replace missing feature values with the column mean.
# NOTE(review): both transformers are fitted on the full feature matrix, i.e.
# before the train/test split performed in the next cell.
imputer = SimpleImputer(strategy='mean')
X_imputed = imputer.fit(X).transform(X)

# Standardise every feature column
scaler = StandardScaler()
X_scaled = scaler.fit(X_imputed).transform(X_imputed)


In [None]:
# Hold out 50% of the rows as the test set; random_state fixes the seed so the split is repeatable
holdout_split = train_test_split(X_scaled, y, test_size=0.5, random_state=1)
X_train, X_test, y_train, y_test = holdout_split

# Carve the validation set out of the remaining half: 25% of it, i.e.
# 0.25 x 0.5 = 0.125 of the entire dataset, using the same seed
validation_split = train_test_split(X_train, y_train, test_size=0.25, random_state=1)
X_train, X_val, y_train, y_val = validation_split


In [None]:

def displayClasificationResults(z, y_test, y_pred, numClasses=4):
    """Print summary metrics and draw the confusion matrix as an annotated heatmap.

    Args:
        z: Confusion matrix to plot; its cells are also used as the annotations.
        y_test: True labels.
        y_pred: Predicted labels, compared element-wise with ``y_test``.
        numClasses: 2 selects the benign/malicious axis labels; any other
            value selects the four-class labels.
    """
    # Mislabeled count and accuracy
    total_points = y_test.shape[0]
    mislabeled = (y_test != y_pred).sum()
    print("Number of mislabeled points out of a total %d points: %d"
          % (total_points, mislabeled))
    accuracy = round(100 - ((mislabeled / total_points) * 100), 2)
    print(f"Accuracy is {accuracy}%")

    # Weighted-average precision, recall and F-score, expressed as percentages
    weighted_scores = precision_recall_fscore_support(
        y_test, y_pred, average='weighted')
    precision, recall, fscore = (value * 100 for value in weighted_scores[:3])
    print(f"Precision = {round(precision, 2)}%")
    print(f"Recall = {round(recall, 2)}%")
    print(f"F-score = {round(fscore, 2)}%")

    # Tick labels shared by both axes of the heatmap
    if numClasses == 2:
        axis_labels = ['benign', 'malicious']
    else:
        axis_labels = ['benign', 'iodine', 'dns2tcp', 'dnscat2']

    # Cell annotations must be strings
    z_text = [[str(cell) for cell in row] for row in z]

    fig = ff.create_annotated_heatmap(
        z, x=list(axis_labels), y=list(axis_labels),
        annotation_text=z_text, colorscale='Viridis')

    # Figure title, plus a wider left margin that leaves room for the y-axis caption
    fig.update_layout(title_text='<i><b>Confusion matrix</b></i>',
                      margin=dict(t=50, l=200))

    # Axis captions are drawn as paper-anchored annotations
    caption_font = dict(color="black", size=14)
    fig.add_annotation(dict(font=caption_font,
                            x=0.5,
                            y=-0.15,
                            showarrow=False,
                            text="Predicted value",
                            xref="paper",
                            yref="paper"))
    fig.add_annotation(dict(font=caption_font,
                            x=-0.35,
                            y=0.5,
                            showarrow=False,
                            text="Real value",
                            textangle=-90,  # rotate the caption to run along the y axis
                            xref="paper",
                            yref="paper"))

    # Show the colour bar next to the heatmap
    fig.data[0].showscale = True

    iplot(fig)




In [None]:
# Random forest for the four-class problem: 50 trees of depth at most 10,
# sqrt(n_features) candidates per split, all CPU cores, fixed seed for reproducibility.
# (Earlier configuration: RandomForestClassifier(n_estimators=500, random_state=1))

from sklearn.ensemble import RandomForestClassifier

rf_settings_4_class = {
    'n_estimators': 50,
    'max_depth': 10,
    'max_features': 'sqrt',
    'n_jobs': -1,
    'random_state': 42,
}
rfc_4_classification = RandomForestClassifier(**rf_settings_4_class)

In [None]:
# Fit the classifier on the training split ...
rfc_4_classification.fit(X_train, y_train)

# ... then predict that same training split
y_pred = rfc_4_classification.predict(X_train)

# Confusion matrix of true training labels against the predictions
z = confusion_matrix(y_true=y_train, y_pred=y_pred)

In [None]:
# Display the classification results using the 'displayClasificationResults' function
# The function will show the number of mislabeled points, accuracy, precision, recall, and an annotated heatmap of the confusion matrix.
displayClasificationResults(z, y_train, y_pred)

In [None]:
# Predict the held-out test split with the already fitted classifier
y_pred = rfc_4_classification.predict(X_test)

# Confusion matrix of true test labels against the predictions
z = confusion_matrix(y_true=y_test, y_pred=y_pred)

# Print mislabeled count, accuracy, precision, recall and F-score, and draw the
# annotated confusion-matrix heatmap
displayClasificationResults(z=z, y_test=y_test, y_pred=y_pred)


In [None]:
# Predict the validation split with the already fitted classifier
y_pred = rfc_4_classification.predict(X_val)

# Confusion matrix of true validation labels against the predictions
z = confusion_matrix(y_true=y_val, y_pred=y_pred)

# Print mislabeled count, accuracy, precision, recall and F-score, and draw the
# annotated confusion-matrix heatmap
displayClasificationResults(z=z, y_test=y_val, y_pred=y_pred)


In [None]:


# Permutation importance of every feature on the test split (30 shuffles per feature)
perm_importance = permutation_importance(rfc_4_classification, X_test, y_test, n_repeats=30, random_state=1)

# Column names of the feature frame, in column order
feature_names = list(X.columns)

# Feature indices ordered from least to most important
sorted_idx = perm_importance.importances_mean.argsort()

# Horizontal bar chart, least important feature at the bottom
bar_positions = range(len(sorted_idx))
plt.figure(figsize=(10, 6))
plt.barh(bar_positions, perm_importance.importances_mean[sorted_idx], align='center')
plt.yticks(bar_positions, [feature_names[i] for i in sorted_idx])
plt.xlabel('Permutation Importance')
plt.title('Feature Importance - Permutation Importance')
plt.show()

# List the features from most to least important
for idx in sorted_idx[::-1]:
    print(f"{feature_names[idx]}: {perm_importance.importances_mean[idx]}")


In [None]:
# Print feature names and their importance values in descending order
for idx in reversed(sorted_idx):
    print(f"{feature_names[idx]}: {perm_importance.importances_mean[idx]}")


In [None]:
# The last five entries of the ascending ranking are the five most important features
top_feature_indices = sorted_idx[-5:]

top_feature_names = []
top_feature_importances = []
for idx in top_feature_indices:
    top_feature_names.append(feature_names[idx])
    top_feature_importances.append(perm_importance.importances_mean[idx])

# Report each of them with its mean importance
print("Top Five Features and Their Importance Values:")
for position, feature in enumerate(top_feature_names):
    importance = top_feature_importances[position]
    print(f"{feature}: {importance}")

In [None]:
from itertools import combinations

# Work on a copy so that 'data' keeps its original columns
newdata = data.copy()

# For every unordered pair of top features add one column holding the row-wise
# mean of the two. The pair is taken straight from the names instead of being
# recovered by splitting the generated column name on '_', which breaks for
# feature names that contain an underscore. Iterating over the names also
# works when fewer than five top features are available.
new_feature_combinations = []
for first_name, second_name in combinations(top_feature_names, 2):
    combination = f"{first_name}_{second_name}_mean"
    new_feature_combinations.append(combination)
    newdata[combination] = newdata[[first_name, second_name]].mean(axis=1)


# Show the dataframe to verify the additions
print("Updated DataFrame with New Feature Combinations:")
newdata


In [None]:
# Features: only the pairwise-mean columns; target: the class labels
X_new = newdata[new_feature_combinations].to_numpy()
y_new = newdata['labels'].to_numpy()

# Half of the rows go to training. The other half is split again with
# test_size=0.25, so validation receives 75% of it and testing 25%.
first_split = train_test_split(X_new, y_new, test_size=0.5, random_state=1)
X_train_all, X_temp_all, y_train_all, y_temp_all = first_split
second_split = train_test_split(X_temp_all, y_temp_all, test_size=0.25, random_state=1)
X_val_all, X_test_all, y_val_all, y_test_all = second_split

# Fresh random forest for the new feature set
# (earlier configuration: RandomForestClassifier(n_estimators=500, random_state=1))
rf_settings_combinations = {
    'n_estimators': 50,
    'max_depth': 10,
    'max_features': 'sqrt',
    'n_jobs': -1,
    'random_state': 42,
}
rfc_new_all = RandomForestClassifier(**rf_settings_combinations)
rfc_new_all.fit(X_train_all, y_train_all)

# Predictions for the training, validation and testing splits
y_pred_train_all, y_pred_val_all, y_pred_test_all = (
    rfc_new_all.predict(split_features)
    for split_features in (X_train_all, X_val_all, X_test_all)
)


In [None]:
# Display classification results for the training data
print("Classification Results for Training Data:")
displayClasificationResults(confusion_matrix(y_train_all, y_pred_train_all), y_train_all, y_pred_train_all)

In [None]:
# Display classification results for the validation data
print("Classification Results for Validation Data:")
displayClasificationResults(confusion_matrix(y_val_all, y_pred_val_all), y_val_all, y_pred_val_all)

In [None]:
# Display classification results for the testing data
print("Classification Results for Testing Data:")
displayClasificationResults(confusion_matrix(y_test_all, y_pred_test_all), y_test_all, y_pred_test_all)

In [None]:
# Features: every column of 'newdata' except "TimeStamp" and "labels"
X_all = newdata.drop(columns=["TimeStamp", "labels"])

# Target: the class labels of 'newdata'
y_all = newdata['labels'].to_numpy()

# Re-fit the existing imputer and scaler on the extended feature set
X_imputed_all = imputer.fit_transform(X_all)
X_scaled_all = scaler.fit_transform(X_imputed_all)


# Half of the rows go to training. The other half is split again with
# test_size=0.25, so validation receives 75% of it and testing 25%.
first_split_all = train_test_split(X_scaled_all, y_all, test_size=0.5, random_state=1)
X_train_all, X_temp_all, y_train_all, y_temp_all = first_split_all
second_split_all = train_test_split(X_temp_all, y_temp_all, test_size=0.25, random_state=1)
X_val_all, X_test_all, y_val_all, y_test_all = second_split_all

# Fresh random forest for the extended feature set
# (earlier configuration: RandomForestClassifier(n_estimators=500, random_state=1))
rf_settings_all = {
    'n_estimators': 50,
    'max_depth': 10,
    'max_features': 'sqrt',
    'n_jobs': -1,
    'random_state': 42,
}
rfc_new_all = RandomForestClassifier(**rf_settings_all)
rfc_new_all.fit(X_train_all, y_train_all)

# Predictions for the training, validation and testing splits
y_pred_train_all, y_pred_val_all, y_pred_test_all = (
    rfc_new_all.predict(split_features)
    for split_features in (X_train_all, X_val_all, X_test_all)
)


In [None]:
# Display classification results for the training data
print("Classification Results for Training Data:")
displayClasificationResults(confusion_matrix(y_train_all, y_pred_train_all), y_train_all, y_pred_train_all)

In [None]:
# Display classification results for the validation data
print("Classification Results for Validation Data:")
displayClasificationResults(confusion_matrix(y_val_all, y_pred_val_all), y_val_all, y_pred_val_all)

In [None]:
# Display classification results for the testing data
print("Classification Results for Testing Data:")
displayClasificationResults(confusion_matrix(y_test_all, y_pred_test_all), y_test_all, y_pred_test_all)

In [None]:

# Permutation importance of every feature on the test split (30 shuffles per feature)
perm_importance = permutation_importance(rfc_new_all, X_test_all, y_test_all, n_repeats=30, random_state=1)

# Column names of the extended feature frame, in column order
feature_names = list(X_all.columns)

# Feature indices ordered from least to most important
sorted_idx = perm_importance.importances_mean.argsort()

# Horizontal bar chart, least important feature at the bottom
bar_positions = range(len(sorted_idx))
plt.figure(figsize=(10, 6))
plt.barh(bar_positions, perm_importance.importances_mean[sorted_idx], align='center')
plt.yticks(bar_positions, [feature_names[i] for i in sorted_idx])
plt.xlabel('Permutation Importance')
plt.title('Feature Importance - Permutation Importance')
plt.show()

# List the features from most to least important
for idx in sorted_idx[::-1]:
    print(f"{feature_names[idx]}: {perm_importance.importances_mean[idx]}")

In [None]:
# Run-level switch for the feature-count penalty. `evaluate` does not read this
# global; it only sees the value passed as its `penalty_enabled` argument.
penalty_enabled = False


# Function to evaluate an individual's fitness
def evaluate(individual, X_train_selected, X_test_selected, y_train_all, y_test_all, penalty_enabled=True):
    """Score one feature mask by training a random forest on the selected columns.

    Args:
        individual: Sequence of 0/1 genes, one per feature column; truthy genes
            mark selected columns.
        X_train_selected: 2-D training array, indexed as ``[:, columns]``.
        X_test_selected: 2-D evaluation array, indexed as ``[:, columns]``.
        y_train_all: Training labels.
        y_test_all: Evaluation labels.
        penalty_enabled: When true, the fraction of selected genes is
            subtracted from the score.

    Returns:
        A one-element tuple holding the fitness: the weighted F1 score of the
        predictions on the evaluation array, minus the penalty when enabled.
        A mask that selects no feature gets ``-inf``, because a classifier
        cannot be fitted on zero columns.
    """
    selected_features = [i for i, is_selected in enumerate(individual) if is_selected]

    # Nothing selected: rank the mask below every real candidate instead of crashing in fit()
    if not selected_features:
        return float("-inf"),

    X_train_subset = X_train_selected[:, selected_features]
    X_test_subset = X_test_selected[:, selected_features]

    # Earlier configuration: RandomForestClassifier(n_estimators=500, random_state=1)
    classifier = RandomForestClassifier(
        n_estimators=50,
        max_depth=10,
        max_features='sqrt',
        n_jobs=-1,
        random_state=42
    )
    classifier.fit(X_train_subset, y_train_all)
    y_pred = classifier.predict(X_test_subset)

    # Weighted F1 ('micro' or 'macro' are the alternatives)
    score = f1_score(y_test_all, y_pred, average='weighted')

    if not penalty_enabled:
        return score,

    # Penalize having more ones (selected features): subtract their share of all genes
    ones_penalty = sum(individual) / len(individual)
    return score - ones_penalty,

# Skip the creator creation if already defined
if not hasattr(creator, "FitnessMax"):
    creator.create("FitnessMax", base.Fitness, weights=(1.0,))
if not hasattr(creator, "Individual"):
    creator.create("Individual", list, fitness=creator.FitnessMax)


def _ensure_feature_selected(individual):
    """Set one random gene to 1 when the individual selects no feature at all."""
    if sum(individual) == 0:
        individual[random.randint(0, len(individual) - 1)] = 1


# Create the DEAP Toolbox
toolbox = base.Toolbox()

# One 0/1 gene per feature column
n_features = len(X_test_all[0])
toolbox.register("attr_bool", random.randint, 0, 1)
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_bool, n=n_features)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

# Register the evaluation function with the datasets AND the penalty switch.
# Without the explicit penalty_enabled argument the function default (True)
# would always win and the flag set at the top of this cell would be ignored.
# NOTE(review): fitness is measured on the test split (X_test_all / y_test_all),
# so the selected features are tuned on the data later used for reporting;
# consider scoring on X_val_all / y_val_all instead.
toolbox.register(
    "evaluate", evaluate,
    X_train_selected=X_train_all, X_test_selected=X_test_all,
    y_train_all=y_train_all, y_test_all=y_test_all,
    penalty_enabled=penalty_enabled,
)

# Variation and selection operators: two-point crossover, bit-flip mutation,
# tournament selection with three aspirants
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
toolbox.register("select", tools.selTournament, tournsize=3)

# Define the number of generations and population size
# (larger earlier settings: n_generations = 10, population_size = 500)
n_generations = 2
population_size = 100

# Probability of mating a pair / mutating an individual in each generation
crossover_prob = 0.5
mutation_prob = 0.2

# Build the whole initial population and evaluate every member. Tournament
# selection compares fitness values, so no individual may enter it unevaluated.
population = toolbox.population(n=population_size)
for ind in population:
    _ensure_feature_selected(ind)
for ind, fit in zip(population, map(toolbox.evaluate, population)):
    ind.fitness.values = fit

# Track the best individual across generations
best_individual = None
best_fitness = float('-inf')

# Create a list to store results
results = []

# Start the evolution process
for generation in range(n_generations):
    print(f"Generation {generation + 1}/{n_generations}")

    start_time = time.time()  # Record the start time for the current generation

    # Select the next generation and clone it so variation never touches the parents
    offspring = list(map(toolbox.clone, toolbox.select(population, len(population))))

    # Crossover on consecutive pairs; changed children lose their fitness
    for child1, child2 in zip(offspring[::2], offspring[1::2]):
        if random.random() < crossover_prob:
            toolbox.mate(child1, child2)
            del child1.fitness.values
            del child2.fitness.values

    # Mutation; changed individuals lose their fitness
    for mutant in offspring:
        if random.random() < mutation_prob:
            toolbox.mutate(mutant)
            del mutant.fitness.values

    # Only individuals changed by crossover or mutation need attention:
    # repair all-zero masks, then evaluate. Unchanged clones keep the fitness
    # of their parent, which the seeded classifier would reproduce anyway.
    changed = [ind for ind in offspring if not ind.fitness.valid]
    for ind in changed:
        _ensure_feature_selected(ind)
    for ind, fit in zip(changed, map(toolbox.evaluate, changed)):
        ind.fitness.values = fit

    # Replace the old population with the offspring
    population[:] = offspring

    # Update the best individual and fitness (keep a private copy of the winner)
    for ind in population:
        if ind.fitness.values[0] > best_fitness:
            best_individual = toolbox.clone(ind)
            best_fitness = ind.fitness.values[0]

    # Print the best fitness value in this generation
    print(f"Best Fitness: {best_fitness:.4f}")

    end_time = time.time()  # Record the end time for the current generation
    time_taken = end_time - start_time  # Calculate the time taken for the current generation

    # Print the time taken for the current generation
    print(f"Time taken for Generation {generation + 1}: {time_taken:.2f} seconds")

    selected_features_indices = [i for i, is_selected in enumerate(best_individual) if is_selected]
    selected_feature_names = [feature_names[i] for i in selected_features_indices]

    # Store results for this generation
    results.append({'Generation': generation + 1, 'Iteration': generation + 1,
                     'Best Fitness': best_fitness, 'Time Taken': time_taken,
                     'Feature Len':len(selected_feature_names),'Feature Names':selected_feature_names})

    # Check if best_fitness reached 1.0, and stop if true
    if best_fitness == 1.0:
        print("Accuracy reached 1.0. Stopping evolution.")
        break

# Print the selected features in the best individual
selected_features_indices = [i for i, is_selected in enumerate(best_individual) if is_selected]
selected_feature_names = [feature_names[i] for i in selected_features_indices]
print("Selected features in the best individual:", selected_feature_names)

results_df = pd.DataFrame(results)

# Display the results DataFrame
print(results_df)

# Save the results to an Excel file
results_df.to_excel("evolution_results.xlsx", index=False)

# Save the best individual to a file
best_individual_file_name = "best_individual.pkl"
with open(best_individual_file_name, "wb") as best_individual_file:
    pickle.dump(best_individual, best_individual_file)

# Save the feature names to a file
feature_names_file_name = "feature_names.pkl"
with open(feature_names_file_name, "wb") as feature_names_file:
    pickle.dump(feature_names, feature_names_file)

# Print final results
print("Final Best Fitness:", best_fitness)


In [None]:
best_individual_file_name = "best_individual.pkl"
feature_names_file_name = "feature_names.pkl"

# Restore the saved feature mask and the saved feature names
with open(best_individual_file_name, "rb") as best_individual_file:
    loaded_best_individual = pickle.load(best_individual_file)
with open(feature_names_file_name, "rb") as feature_names_file:
    feature_names = pickle.load(feature_names_file)

# Column indices (and names) switched on in the restored mask
selected_features_indices = [
    position for position, gene in enumerate(loaded_best_individual) if gene
]
selected_feature_names = [feature_names[position] for position in selected_features_indices]

# Pick one row of the testing data at random and keep only the selected columns
random_instance_index = random.randint(0, len(X_test_all) - 1)
X_random_instance = X_test_all[random_instance_index, :]
y_random_instance = y_test_all[random_instance_index]
X_random_instance_selected = X_random_instance[selected_features_indices]

# Train a forest on the selected columns of the training split
# (earlier configuration: RandomForestClassifier(n_estimators=500, random_state=1))
forest_settings = {
    'n_estimators': 50,
    'max_depth': 10,
    'max_features': 'sqrt',
    'n_jobs': -1,
    'random_state': 42,
}
classifier = RandomForestClassifier(**forest_settings)
classifier.fit(X_train_all[:, selected_features_indices], y_train_all)

# predict() expects a 2-D input, so the single row is wrapped in a list
y_pred_random_instance = classifier.predict([X_random_instance_selected])

# Class names indexed by numeric label
class_names = ['benign', 'iodine', 'dns2tcp', 'dnscat2']

# Report the true and the predicted label of the instance, with class names
predicted_label = y_pred_random_instance[0]
print(f"Actual instance:    ({y_random_instance})-{class_names[y_random_instance]}")
print(f"Predicted instance: ({predicted_label})-{class_names[predicted_label]}")
print("Selected features in the best individual:", selected_feature_names)


In [None]:
# Run-level switch for the feature-count penalty. `evaluate` does not read this
# global; it only sees the value passed as its `penalty_enabled` argument.
penalty_enabled = True


# Function to evaluate an individual's fitness
def evaluate(individual, X_train_selected, X_test_selected, y_train_all, y_test_all, penalty_enabled=True):
    """Score one feature mask by training a random forest on the selected columns.

    Args:
        individual: Sequence of 0/1 genes, one per feature column; truthy genes
            mark selected columns.
        X_train_selected: 2-D training array, indexed as ``[:, columns]``.
        X_test_selected: 2-D evaluation array, indexed as ``[:, columns]``.
        y_train_all: Training labels.
        y_test_all: Evaluation labels.
        penalty_enabled: When true, the fraction of selected genes is
            subtracted from the score.

    Returns:
        A one-element tuple holding the fitness: the weighted F1 score of the
        predictions on the evaluation array, minus the penalty when enabled.
        A mask that selects no feature gets ``-inf``, because a classifier
        cannot be fitted on zero columns.
    """
    selected_features = [i for i, is_selected in enumerate(individual) if is_selected]

    # Nothing selected: rank the mask below every real candidate instead of crashing in fit()
    if not selected_features:
        return float("-inf"),

    X_train_subset = X_train_selected[:, selected_features]
    X_test_subset = X_test_selected[:, selected_features]

    # Earlier configuration: RandomForestClassifier(n_estimators=500, random_state=1)
    classifier = RandomForestClassifier(
        n_estimators=50,
        max_depth=10,
        max_features='sqrt',
        n_jobs=-1,
        random_state=42
    )
    classifier.fit(X_train_subset, y_train_all)
    y_pred = classifier.predict(X_test_subset)

    # Weighted F1 ('micro' or 'macro' are the alternatives)
    score = f1_score(y_test_all, y_pred, average='weighted')

    if not penalty_enabled:
        return score,

    # Penalize having more ones (selected features): subtract their share of all genes
    ones_penalty = sum(individual) / len(individual)
    return score - ones_penalty,

# Create the DEAP classes only once, so the cell can be re-run safely
if not hasattr(creator, "FitnessMax"):
    creator.create("FitnessMax", base.Fitness, weights=(1.0,))
if not hasattr(creator, "Individual"):
    creator.create("Individual", list, fitness=creator.FitnessMax)

# Create the DEAP Toolbox
toolbox = base.Toolbox()

# An individual is a 0/1 mask with one gene per feature column
n_features = len(X_test_all[0])
toolbox.register("attr_bool", random.randint, 0, 1)
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_bool, n=n_features)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

# Register the evaluation function with the dataset bound in; the
# module-level penalty_enabled flag is forwarded so that it takes effect
toolbox.register("evaluate", evaluate, X_train_selected=X_train_all, X_test_selected=X_test_all,
                 y_train_all=y_train_all, y_test_all=y_test_all, penalty_enabled=penalty_enabled)

# Variation and selection operators: two-point crossover, bit-flip
# mutation and tournament selection
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
toolbox.register("select", tools.selTournament, tournsize=3)

# Define the number of generations and population size
n_generations = 2
population_size = 100


def _ensure_nonempty(individual):
    """Switch on one random gene when *individual* selects no feature."""
    if sum(individual) == 0:
        random_index = random.randint(0, len(individual) - 1)
        individual[random_index] = 1
        # The genes changed, so any fitness it carried is stale
        del individual.fitness.values


# Build the initial population and give EVERY member a fitness: tournament
# selection compares fitness values, so unevaluated individuals must not
# take part in it
population = toolbox.population(n=population_size)
for ind in population:
    _ensure_nonempty(ind)
initial_fitnesses = list(map(toolbox.evaluate, population))
for ind, fit in zip(population, initial_fitnesses):
    ind.fitness.values = fit

# Track the best individual across generations
best_individual = None
best_fitness = float('-inf')

# Create a list to store results
results = []

# Start the evolution process
for generation in range(n_generations):
    print(f"Generation {generation + 1}/{n_generations}")

    start_time = time.time()  # Record the start time for the current generation

    # Select the next generation of individuals
    offspring = toolbox.select(population, len(population))

    # Clone the selected individuals so variation never touches the parents
    offspring = list(map(toolbox.clone, offspring))

    # Apply crossover to neighbouring pairs with probability 0.5
    for child1, child2 in zip(offspring[::2], offspring[1::2]):
        if random.random() < 0.5:
            toolbox.mate(child1, child2)
            del child1.fitness.values
            del child2.fitness.values

    # Apply mutation with probability 0.2
    for mutant in offspring:
        if random.random() < 0.2:
            toolbox.mutate(mutant)
            del mutant.fitness.values

    # Ensure that no individual has all zeros, whether the empty mask came
    # from crossover or from mutation
    for ind in offspring:
        _ensure_nonempty(ind)

    # Evaluate only offspring whose genes changed; clones that were left
    # untouched keep the fitness they were selected with, which saves one
    # forest fit per unchanged individual
    invalid_offspring = [ind for ind in offspring if not ind.fitness.valid]
    fitnesses = list(map(toolbox.evaluate, invalid_offspring))
    for ind, fit in zip(invalid_offspring, fitnesses):
        ind.fitness.values = fit

    # Replace the old population with the offspring
    population[:] = offspring

    # Update the best individual and fitness; keep a private copy so later
    # in-place variation can never alter the recorded best
    for ind in population:
        if ind.fitness.values[0] > best_fitness:
            best_individual = toolbox.clone(ind)
            best_fitness = ind.fitness.values[0]

    # Print the best fitness value in this generation
    print(f"Best Fitness: {best_fitness:.4f}")

    end_time = time.time()  # Record the end time for the current generation
    time_taken = end_time - start_time  # Calculate the time taken for the current generation

    # Print the time taken for the current generation
    print(f"Time taken for Generation {generation + 1}: {time_taken:.2f} seconds")

    selected_features_indices = [i for i, is_selected in enumerate(best_individual) if is_selected]
    selected_feature_names = [feature_names[i] for i in selected_features_indices]

    # Store results for this generation
    results.append({'Generation': generation + 1, 'Iteration': generation + 1,
                     'Best Fitness': best_fitness, 'Time Taken': time_taken,
                     'Feature Len':len(selected_feature_names),'Feature Names':selected_feature_names})

    # Stop early on a perfect score. With the size penalty enabled the
    # fitness stays below 1.0, so this only triggers when it is disabled.
    if best_fitness == 1.0:
        print("Accuracy reached 1.0. Stopping evolution.")
        break

# Print the selected features in the best individual
selected_features_indices = [i for i, is_selected in enumerate(best_individual) if is_selected]
selected_feature_names = [feature_names[i] for i in selected_features_indices]
print("Selected features in the best individual:", selected_feature_names)

results_df = pd.DataFrame(results)

# Display the results DataFrame
print(results_df)

# Save the results to an Excel file
results_df.to_excel("evolution_results.xlsx", index=False)

# Save the best individual to a file
best_individual_file_name = "best_individual.pkl"
with open(best_individual_file_name, "wb") as best_individual_file:
    pickle.dump(best_individual, best_individual_file)

# Save the feature names to a file
feature_names_file_name = "feature_names.pkl"
with open(feature_names_file_name, "wb") as feature_names_file:
    pickle.dump(feature_names, feature_names_file)

# Print final results
print("Final Best Fitness:", best_fitness)


In [None]:
best_individual_file_name = "best_individual.pkl"
feature_names_file_name = "feature_names.pkl"

# Read back the pickled feature mask
with open(best_individual_file_name, "rb") as best_individual_file:
    loaded_best_individual = pickle.load(best_individual_file)

# Read back the pickled feature names
with open(feature_names_file_name, "rb") as feature_names_file:
    feature_names = pickle.load(feature_names_file)

# Pick one test sample at a random row position, together with its label
random_instance_index = random.randint(0, len(X_test_all) - 1)
X_random_instance = X_test_all[random_instance_index, :]
y_random_instance = y_test_all[random_instance_index]

# Column positions whose gene is switched on, and their names
selected_features_indices = [
    column for column, switched_on in enumerate(loaded_best_individual) if switched_on
]
selected_feature_names = [feature_names[column] for column in selected_features_indices]
X_random_instance_selected = X_random_instance[selected_features_indices]

# Train a forest on the selected columns of the training data
classifier = RandomForestClassifier(
    n_estimators=50,
    n_jobs=-1,
    random_state=42,
    max_depth=10,
    max_features='sqrt',
)
classifier.fit(X_train_all[:, selected_features_indices], y_train_all)

# Wrap the single sample in a list to form a one-row batch
y_pred_random_instance = classifier.predict([X_random_instance_selected])

# Readable class names; assumes integer labels index this list in this
# order -- TODO confirm against the label encoding used upstream
class_names = ['benign', 'iodine', 'dns2tcp', 'dnscat2']

# Show ground truth next to the prediction, then the features used
print(f"Actual instance:    ({y_random_instance})-{class_names[y_random_instance]}")
print(f"Predicted instance: ({y_pred_random_instance[0]})-{class_names[y_pred_random_instance[0]]}")
print("Selected features in the best individual:", selected_feature_names)


# Comparing Standard GA and Enhanced GA on Benchmark Functions

## Introduction

This project compares the **Standard Genetic Algorithm (GA)** with an **Enhanced Genetic Algorithm (Enhanced GA)**. The Enhanced GA includes penalties to improve optimization performance. These algorithms are tested on common mathematical benchmark functions.

---

## Benchmark Functions

### 1. Sphere Function (Unimodal)
$$
f(x) = \sum_{i=1}^n x_i^2
$$
- **Type**: Unimodal  
- **Domain**: $x_i \in [-5.12, 5.12]$  
- **Global Minimum**: $f(0, 0, \dots, 0) = 0$

---

### 2. Rastrigin Function (Multimodal)
$$
f(x) = 10n + \sum_{i=1}^n \left( x_i^2 - 10\cos(2\pi x_i) \right)
$$
- **Type**: Multimodal  
- **Domain**: $x_i \in [-5.12, 5.12]$  
- **Global Minimum**: $f(0, 0, \dots, 0) = 0$

---

## Genetic Algorithm Overview

### Standard GA
A standard genetic algorithm follows these steps:
1. **Initialization**: Randomly generate a population of solutions.
2. **Selection**: Select individuals based on fitness.
3. **Crossover**: Combine two individuals to create offspring.
4. **Mutation**: Introduce random changes to maintain diversity.
5. **Evaluation**: Evaluate the fitness of offspring.
6. **Iteration**: Repeat the above steps for several generations.

### Enhanced GA
The Enhanced GA modifies the standard GA by:
1. **Penalty Mechanism**: Adds a penalty to fitness based on undesirable traits (e.g., large values).
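2. **Dynamic Mutation**: Adjusts mutation probability during evolution. (Note: this is not implemented in the code below, which runs both variants with the same fixed mutation probability, `mutpb=0.2`; the only difference between them is the penalty term.)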

---

## Implementation


In [3]:
!pip install deap
import random
import numpy as np
from deap import base, creator, tools, algorithms
import plotly.graph_objects as go

# Define Benchmark Functions
def sphere(individual):
    """Sphere benchmark: return the sum of squared genes as a 1-tuple."""
    squared_total = 0
    for gene in individual:
        squared_total += gene ** 2
    # DEAP fitness values are tuples, hence the single-element tuple
    return (squared_total,)

def rastrigin(individual):
    """Rastrigin benchmark: 10n + sum(x_i^2 - 10*cos(2*pi*x_i)), as a 1-tuple."""
    offset = 10 * len(individual)
    ripple_total = 0
    for gene in individual:
        ripple_total += gene**2 - 10 * np.cos(2 * np.pi * gene)
    # DEAP fitness values are tuples, hence the single-element tuple
    return (offset + ripple_total,)

# Setup Genetic Algorithm (GA)
def setup_ga(bounds, n_dimensions):
    """Build a DEAP toolbox for minimising a real-valued benchmark function.

    Args:
        bounds: ``(lower_bound, upper_bound)`` pair used to draw the initial
            gene values uniformly at random.
        n_dimensions: Number of genes per individual.

    Returns:
        A ``base.Toolbox`` with ``attr_float``, ``individual``,
        ``population``, ``mate`` (blend crossover), ``mutate`` (Gaussian)
        and ``select`` (tournament of 3) registered. No ``evaluate``
        function is registered here.
    """
    lower_bound, upper_bound = bounds

    # Skip re-creating classes if they are already defined
    if not hasattr(creator, "FitnessMin"):
        creator.create("FitnessMin", base.Fitness, weights=(-1.0,))  # Minimize the function
    # Use a dedicated class name. The feature-selection GA earlier in this
    # notebook creates creator.Individual with a maximising fitness; reusing
    # that class in the same kernel would make the benchmarks maximise
    # instead of minimise.
    if not hasattr(creator, "MinIndividual"):
        creator.create("MinIndividual", list, fitness=creator.FitnessMin)

    # Toolbox Registration
    toolbox = base.Toolbox()
    toolbox.register("attr_float", random.uniform, lower_bound, upper_bound)
    toolbox.register("individual", tools.initRepeat, creator.MinIndividual, toolbox.attr_float, n=n_dimensions)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)

    # Variation operators work on unbounded floats: only the initial
    # population is drawn from `bounds`
    toolbox.register("mate", tools.cxBlend, alpha=0.5)
    toolbox.register("mutate", tools.mutGaussian, mu=0, sigma=1, indpb=0.2)
    toolbox.register("select", tools.selTournament, tournsize=3)
    return toolbox

# Run Standard GA
def run_standard_ga(toolbox, evaluate_func, n_population, n_generations):
    """Evolve a population with ``eaSimple`` using the plain objective.

    Registers ``evaluate_func`` as the toolbox's ``evaluate`` and runs
    ``n_generations`` generations on ``n_population`` fresh individuals.
    Returns ``(best_individual, logbook)``; the logbook records the ``min``
    and ``avg`` fitness of each generation.
    """
    toolbox.register("evaluate", evaluate_func)
    starting_population = toolbox.population(n=n_population)

    # Hall of fame of size one keeps the single best solution seen
    best_tracker = tools.HallOfFame(1)

    fitness_stats = tools.Statistics(lambda ind: ind.fitness.values)
    for stat_name, reducer in (("min", np.min), ("avg", np.mean)):
        fitness_stats.register(stat_name, reducer)

    _, logbook = algorithms.eaSimple(
        starting_population,
        toolbox,
        cxpb=0.7,
        mutpb=0.2,
        ngen=n_generations,
        stats=fitness_stats,
        halloffame=best_tracker,
        verbose=False,
    )
    return best_tracker[0], logbook

# Run Enhanced GA
def run_enhanced_ga(toolbox, evaluate_func, penalty_func, n_population, n_generations):
    """Evolve a population with ``eaSimple`` using objective plus penalty.

    The fitness of an individual is ``evaluate_func(ind)[0] +
    penalty_func(ind)``. Everything else matches the standard run: same
    operators, ``cxpb=0.7``, ``mutpb=0.2``, and a logbook with ``min`` and
    ``avg`` per generation. Returns ``(best_individual, logbook)``.
    """
    def penalised_objective(candidate):
        raw_value = evaluate_func(candidate)[0]
        return (raw_value + penalty_func(candidate),)

    toolbox.register("evaluate", penalised_objective)
    starting_population = toolbox.population(n=n_population)

    # Hall of fame of size one keeps the single best solution seen
    best_tracker = tools.HallOfFame(1)

    fitness_stats = tools.Statistics(lambda ind: ind.fitness.values)
    for stat_name, reducer in (("min", np.min), ("avg", np.mean)):
        fitness_stats.register(stat_name, reducer)

    _, logbook = algorithms.eaSimple(
        starting_population,
        toolbox,
        cxpb=0.7,
        mutpb=0.2,
        ngen=n_generations,
        stats=fitness_stats,
        halloffame=best_tracker,
        verbose=False,
    )
    return best_tracker[0], logbook

# Define Penalty Functions
def simple_penalty(individual):
    """Return the mean absolute gene value, so large values cost more."""
    magnitude_total = sum(map(abs, individual))
    return magnitude_total / len(individual)

# Visualization using Plotly
def plot_convergence(logbooks, labels):
    """Draw one line per logbook showing its ``min`` fitness per generation.

    ``logbooks`` and ``labels`` are paired positionally; each label becomes
    the legend entry of the matching trace. The figure is shown, not
    returned.
    """
    fig = go.Figure()

    for run_log, run_label in zip(logbooks, labels):
        trace = go.Scatter(
            x=run_log.select("gen"),
            y=run_log.select("min"),
            mode='lines+markers',
            name=run_label,
        )
        fig.add_trace(trace)

    layout_options = {
        "title": "Fitness Convergence Over Generations",
        "xaxis_title": "Generation",
        "yaxis_title": "Fitness (Lower is Better)",
        "legend_title": "Algorithm",
        "template": "plotly",
    }
    fig.update_layout(**layout_options)
    fig.show()

# Experiment Parameters
n_dimensions = 10
bounds = (-5.12, 5.12)  # Range for Rastrigin and Sphere functions
n_population = 100
n_generations = 50

# One toolbox is shared by all four runs; each run re-registers "evaluate"
toolbox = setup_ga(bounds, n_dimensions)

# Run Experiments (order matters: the runs share the global random stream)
print("Running Standard GA on Sphere...")
best_standard_sphere, log_standard_sphere = run_standard_ga(
    toolbox, sphere, n_population=n_population, n_generations=n_generations
)

print("Running Enhanced GA on Sphere...")
best_enhanced_sphere, log_enhanced_sphere = run_enhanced_ga(
    toolbox, sphere, simple_penalty, n_population=n_population, n_generations=n_generations
)

print("Running Standard GA on Rastrigin...")
best_standard_rastrigin, log_standard_rastrigin = run_standard_ga(
    toolbox, rastrigin, n_population=n_population, n_generations=n_generations
)

print("Running Enhanced GA on Rastrigin...")
best_enhanced_rastrigin, log_enhanced_rastrigin = run_enhanced_ga(
    toolbox, rastrigin, simple_penalty, n_population=n_population, n_generations=n_generations
)

# Results: best individual found by each of the four runs
print("\nBest Individual (Standard GA - Sphere):", best_standard_sphere)
print("Best Individual (Enhanced GA - Sphere):", best_enhanced_sphere)

print("\nBest Individual (Standard GA - Rastrigin):", best_standard_rastrigin)
print("Best Individual (Enhanced GA - Rastrigin):", best_enhanced_rastrigin)

# Plot Convergence: one figure per benchmark, standard vs. enhanced
plot_convergence(
    logbooks=[log_standard_sphere, log_enhanced_sphere],
    labels=["Standard GA - Sphere", "Enhanced GA - Sphere"],
)

plot_convergence(
    logbooks=[log_standard_rastrigin, log_enhanced_rastrigin],
    labels=["Standard GA - Rastrigin", "Enhanced GA - Rastrigin"],
)


Collecting deap
  Downloading deap-1.4.2-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (13 kB)
Downloading deap-1.4.2-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (135 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.4/135.4 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: deap
Successfully installed deap-1.4.2
Running Standard GA on Sphere...
Running Enhanced GA on Sphere...
Running Standard GA on Rastrigin...
Running Enhanced GA on Rastrigin...

Best Individual (Standard GA - Sphere): [0.017717713309582255, 0.006752983026720055, -0.008911605440491535, -0.0019591881721735547, 0.002516347935905736, -0.008050789057978227, -0.002385153912009599, 0.014085414529486067, 0.009649104007259988, 0.0035597415531461356]
Best Individual (Enhanced GA - Sphere): [0.0010301783324623164, -0.0007391449540424463, -0.0017994391037613527, 