In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from scipy import sparse
import pickle

# Load and preprocess data
# NOTE: this is a machine-specific absolute path; point DATA_PATH at your local
# copy of the dataset before running the notebook elsewhere.
DATA_PATH = r'P:\project ides for final ptoject\project\IDS_Federated learning model\model\cleaned_NSL_KDD_dataset.csv'
data = pd.read_csv(DATA_PATH, low_memory=False)

# Keep only the model features plus the 'attack' label column.
columns_to_keep = ['duration', 'protocol_type', 'service', 'src_bytes', 'dst_bytes', 'num_failed_logins', 'logged_in', 'attack']
data_filtered = data[columns_to_keep]
X = data_filtered.drop(columns=['attack'])
# Binary target: 0 when the label is exactly 'normal', 1 for every other value.
y = data_filtered['attack'].apply(lambda x: 0 if x == 'normal' else 1)

categorical_columns = ['protocol_type', 'service', 'logged_in']
# Numeric features are picked by dtype, minus anything already declared
# categorical. Without the exclusion, a column such as 'logged_in' that is
# stored as an integer would be fed to BOTH the scaler and the one-hot encoder,
# duplicating the feature in the model input.
numerical_columns = [
    column
    for column in X.select_dtypes(include=['float64', 'int64']).columns
    if column not in categorical_columns
]
preprocessor = ColumnTransformer(transformers=[
    ('num', StandardScaler(), numerical_columns),
    # handle_unknown='ignore': a category that only occurs in the test split is
    # encoded as all zeros instead of making transform() raise.
    ('cat', OneHotEncoder(sparse_output=False, handle_unknown='ignore'), categorical_columns)
])

# Split first, then fit the preprocessor on the training part only so that no
# test-set statistics leak into the scaler/encoder.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
X_train = preprocessor.fit_transform(X_train)
X_test = preprocessor.transform(X_test)

# Densify in case the transformer handed back a sparse matrix; the NumPy
# network below works on dense arrays.
if sparse.issparse(X_train):
    X_train = X_train.toarray()
if sparse.issparse(X_test):
    X_test = X_test.toarray()

# Activation functions
def leaky_relu(Z, alpha=0.01):
    """Apply leaky ReLU element-wise.

    Entries of ``Z`` that are strictly positive are returned unchanged; all
    other entries are multiplied by ``alpha``.
    """
    positive_mask = Z > 0
    leaked = alpha * Z
    return np.where(positive_mask, Z, leaked)

def sigmoid(Z):
    """Numerically stable logistic sigmoid, applied element-wise.

    Mathematically equal to ``1 / (1 + exp(-Z))``, but evaluated as
    ``exp(-log(1 + exp(-Z)))`` through ``np.logaddexp`` so that large negative
    inputs do not overflow ``exp`` (which would emit a RuntimeWarning).
    """
    # logaddexp(0, -Z) == log(exp(0) + exp(-Z)) == log(1 + exp(-Z))
    return np.exp(-np.logaddexp(0, -Z))

# Initialize weights and biases
def initialize_weights(layers, seed=42):
    """Create He-scaled weight matrices and zero biases for a dense network.

    Parameters
    ----------
    layers : sequence of int
        Layer widths, input first and output last.
    seed : int or None, default 42
        Seed for a private random generator. The default reproduces the values
        obtained from ``np.random.seed(42)`` followed by ``np.random.randn``;
        pass ``None`` for a non-deterministic initialisation.

    Returns
    -------
    (weights, biases) : tuple of lists
        ``weights[k]`` has shape ``(layers[k], layers[k + 1])`` and
        ``biases[k]`` has shape ``(1, layers[k + 1])``.
    """
    # A local RandomState keeps the draw reproducible without reseeding the
    # process-wide NumPy generator as a side effect of calling this function.
    rng = np.random.RandomState(seed)
    weights = []
    biases = []
    for fan_in, fan_out in zip(layers[:-1], layers[1:]):
        # Scale by sqrt(2 / fan_in) to keep activation variance stable.
        weights.append(rng.randn(fan_in, fan_out) * np.sqrt(2 / fan_in))
        biases.append(np.zeros((1, fan_out)))
    return weights, biases

# Forward propagation
def forward_propagation(X, weights, biases):
    """Run a forward pass through the network.

    Every layer except the last applies an affine transform followed by
    ``leaky_relu``; the last layer applies an affine transform followed by
    ``sigmoid``.

    Returns
    -------
    (output, caches)
        ``output`` is the sigmoid activation of the last layer. ``caches`` is a
        list with one ``(activation, pre_activation)`` tuple per layer, in
        layer order.
    """
    activation = X
    caches = []

    # Hidden layers.
    for layer_idx, W in enumerate(weights[:-1]):
        pre_activation = np.dot(activation, W) + biases[layer_idx]
        activation = leaky_relu(pre_activation)
        caches.append((activation, pre_activation))

    # Output layer.
    Z_out = np.dot(activation, weights[-1]) + biases[-1]
    A_out = sigmoid(Z_out)
    caches.append((A_out, Z_out))
    return A_out, caches

# Backward propagation
def backward_propagation(X_batch, y_batch, caches, weights, biases, learning_rate, alpha=0.01):
    """Perform one gradient-descent step and return the updated parameters.

    Parameters
    ----------
    X_batch : ndarray, shape (m, n_features)
        Inputs that produced ``caches``.
    y_batch : array-like of length m
        Binary targets (pandas Series, list or ndarray).
    caches : list of (activation, pre_activation)
        One tuple per layer, in layer order, as recorded during the forward
        pass for ``X_batch``.
    weights, biases : list of ndarray
        Layer parameters. They are updated in place and also returned.
    learning_rate : float
        Step size.
    alpha : float, default 0.01
        Negative-side slope of the leaky ReLU used in the hidden layers; must
        match the value used in the forward pass.

    Returns
    -------
    (weights, biases)
        The same list objects that were passed in.
    """
    m = X_batch.shape[0]
    # np.asarray accepts a pandas Series as well as a plain ndarray/list.
    targets = np.asarray(y_batch).reshape(-1, 1)
    n_layers = len(weights)

    # Phase 1: compute every gradient with the current (pre-update) weights.
    # Updating a layer before its weights are used to propagate the error
    # would mix old and new parameters within one step.
    gradients = [None] * n_layers
    # Sigmoid output (read from caches[-1]) with cross-entropy loss: dL/dZ = A - y.
    dZ = caches[-1][0] - targets
    for i in reversed(range(n_layers)):
        # Input to layer i is the previous layer's activation, or the batch
        # itself for the first layer (this also covers a single-layer net).
        layer_input = caches[i - 1][0] if i > 0 else X_batch
        dW = np.dot(layer_input.T, dZ) / m
        db = np.sum(dZ, axis=0, keepdims=True) / m
        gradients[i] = (dW, db)
        if i > 0:
            Z_prev = caches[i - 1][1]
            # Leaky ReLU derivative: 1 where Z > 0, alpha elsewhere (not 0).
            dZ = np.dot(dZ, weights[i].T) * np.where(Z_prev > 0, 1.0, alpha)

    # Phase 2: apply the updates in place.
    for i, (dW, db) in enumerate(gradients):
        weights[i] -= learning_rate * dW
        biases[i] -= learning_rate * db
    return weights, biases

# Training function
def train(X_train, y_train, layers, epochs=30, batch_size=64, learning_rate=0.001,
          X_val=None, y_val=None, save_path='model_weights_and_biases2.pickle'):
    """Train the network with mini-batch gradient descent and pickle the result.

    Parameters
    ----------
    X_train : ndarray, shape (m, n_features)
        Training inputs.
    y_train : pandas Series or array-like of length m
        Binary training targets.
    layers : sequence of int
        Layer widths passed to ``initialize_weights``.
    epochs, batch_size, learning_rate
        Usual optimisation settings. Batches are taken in order; the data is
        not reshuffled between epochs.
    X_val, y_val : optional
        Hold-out set used for the "Test Accuracy" figure. When either is
        omitted the notebook-level ``X_test`` / ``y_test`` are used, which is
        what this function always did before these parameters existed.
    save_path : str
        Destination of the pickle holding ``{'weights': ..., 'biases': ...}``.

    Returns
    -------
    None
        The trained parameters are only written to ``save_path``.
    """
    if X_val is None or y_val is None:
        # Backward-compatible default: evaluate on the global test split.
        X_val, y_val = X_test, y_test

    weights, biases = initialize_weights(layers)
    m = X_train.shape[0]
    train_targets = np.asarray(y_train).reshape(-1, 1)
    val_targets = np.asarray(y_val).reshape(-1, 1)

    for epoch in range(epochs):
        for i in range(0, m, batch_size):
            X_batch = X_train[i:i + batch_size]
            y_batch = y_train[i:i + batch_size]
            y_pred, caches = forward_propagation(X_batch, weights, biases)
            weights, biases = backward_propagation(X_batch, y_batch, caches, weights, biases, learning_rate)

        # Evaluate only on epochs that are actually reported: two full forward
        # passes per epoch are wasted work otherwise. The last epoch is always
        # reported so the accuracy of the saved model is visible.
        if epoch % 5 == 0 or epoch == epochs - 1:
            y_train_pred, _ = forward_propagation(X_train, weights, biases)
            y_val_pred, _ = forward_propagation(X_val, weights, biases)
            train_accuracy = np.mean((y_train_pred > 0.5).astype(int) == train_targets)
            test_accuracy = np.mean((y_val_pred > 0.5).astype(int) == val_targets)
            print(f"Epoch {epoch + 1}/{epochs}, Train Accuracy: {train_accuracy:.4f}, Test Accuracy: {test_accuracy:.4f}")

    # Save final weights and biases to a pickle file
    final_model_data = {
        'weights': weights,
        'biases': biases
    }
    with open(save_path, 'wb') as f:
        pickle.dump(final_model_data, f)
    print(f"Final model weights and biases saved to '{save_path}'.")

# Define layer structure: input width taken from the preprocessed features,
# then the hidden layers, then a single output unit.
hidden_layer_sizes = [512, 256, 128]
layers = [X_train.shape[1], *hidden_layer_sizes, 1]

# Train the model
train(X_train, y_train, layers, learning_rate=0.001, batch_size=64, epochs=30)


In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
import seaborn as sns
import matplotlib.pyplot as plt

# Load the saved model
# NOTE: unpickling can execute arbitrary code; only load a file that this
# notebook (or another trusted source) produced.
with open('model_weights_and_biases2.pickle', 'rb') as model_file:
    model_data = pickle.load(model_file)

weights, biases = model_data['weights'], model_data['biases']

# Predict function
def predict(X, weights, biases):
    """Return hard 0/1 predictions for ``X``.

    Runs ``forward_propagation`` and marks every output strictly greater than
    0.5 as class 1; the result is an integer array with the same shape as the
    network output.
    """
    probabilities = forward_propagation(X, weights, biases)[0]
    is_positive = probabilities > 0.5
    return is_positive.astype(int)

# Get predictions for the test set
y_test_pred_binary = predict(X_test, weights, biases)

# Metrics calculation (all computed against the held-out test labels)
accuracy = accuracy_score(y_test, y_test_pred_binary)
precision = precision_score(y_test, y_test_pred_binary)
recall = recall_score(y_test, y_test_pred_binary)
f1 = f1_score(y_test, y_test_pred_binary)

# Print metrics
print("Metrics on Test Set:")
for metric_label, metric_value in (
    ("Accuracy", accuracy),
    ("Precision", precision),
    ("Recall", recall),
    ("F1 Score", f1),
):
    print(f"{metric_label}: {metric_value:.4f}")

# Confusion matrix
cm = confusion_matrix(y_test, y_test_pred_binary)

# Plot confusion matrix on an explicit Axes so the labels attach to this figure
class_names = ["Normal", "Attack"]
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names, ax=ax)
ax.set_title('Confusion Matrix')
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
plt.show()
