In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from collections import deque
import random
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import recall_score, f1_score, accuracy_score
from tensorflow.keras import Input

In [3]:
# Hyperparameters
STATE_SIZE = 8  # Select relevant features
ACTION_SIZE = 2  # Genuine (0) or Misbehaving (1)
GAMMA = 0.99
LEARNING_RATE = 0.001
MEMORY_SIZE = 2000
BATCH_SIZE = 64
TARGET_UPDATE_FREQUENCY = 5  # Episodes

In [5]:
# Load and preprocess dataset
def load_data(filepath):
    df = pd.read_csv(filepath)
    features = ['spdx', 'spdy', 'aclx', 'acly', 'hedx', 'hedy', 'posx', 'posy']
    target = 'class'
    X = df[features].values
    y = df[target].values
    scaler = MinMaxScaler()
    X = scaler.fit_transform(X)
    return X, y

In [7]:
# DQN Model
def create_model(state_size, action_size):
    model = Sequential([
        Input(shape=(state_size,)), 
        Dense(64, activation='relu'),
        Dense(64, activation='relu'),
        Dense(action_size, activation='linear')
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE), loss='mse')
    return model

In [9]:
# DQN Agent
class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=MEMORY_SIZE)
        self.model = create_model(state_size, action_size)
        self.target_model = create_model(state_size, action_size)
        self.update_target_model()
        self.epsilon = 1.0
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.1

    def update_target_model(self):
        self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return np.random.choice(self.action_size)
        q_values = self.model.predict(state)
        return np.argmax(q_values[0])

    def replay(self):
        if len(self.memory) < BATCH_SIZE:
            return

        batch = random.sample(self.memory, BATCH_SIZE)
        for state, action, reward, next_state, done in batch:
            target = self.model.predict(state)
            if done:
                target[0][action] = reward
            else:
                t = self.target_model.predict(next_state)
                target[0][action] = reward + GAMMA * np.amax(t[0])

            self.model.fit(state, target, epochs=1, verbose=0)

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay


In [11]:
# Train and Evaluate
def train_dqn(X, y, episodes=100):
    agent = DQNAgent(STATE_SIZE, ACTION_SIZE)
    for e in range(episodes):
        indices = np.random.choice(len(X), 1000, replace=False)
    state = np.reshape(X[indices[0]], [1, STATE_SIZE])
    for t in range(len(indices)):
        action = agent.act(state)
        reward = 1 if action == y[indices[t]] else -1
        next_state = np.reshape(X[indices[t]], [1, STATE_SIZE]) if t < len(indices) - 1 else None
        done = t == len(indices) - 1
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        if done:
            print(f"Episode {e+1}/{episodes}, Epsilon: {agent.epsilon:.2f}")
            break


        # Replay and update target network
        agent.replay()
        if e % TARGET_UPDATE_FREQUENCY == 0:
            agent.update_target_model()

    return agent


In [13]:
# Evaluate
from sklearn.metrics import recall_score, f1_score, accuracy_score

def evaluate(agent, X_test, y_test):
    y_pred = []
    for state in X_test:
        state = np.reshape(state, [1, state.shape[0]])
        action = agent.act(state)
        y_pred.append(action)
    
    recall = recall_score(y_test, y_pred, average='macro')  # Use macro for multiclass
    f1 = f1_score(y_test, y_pred, average='macro')
    accuracy = accuracy_score(y_test, y_pred)
    
    print(f"Recall: {recall:.2f}, F1 Score: {f1:.2f}, Accuracy: {accuracy:.2f}")
    return recall, f1, accuracy


In [None]:
# Main
if __name__ == "__main__":
    filepath = "C:/Users/Elroofey/Desktop/mixalldata_clean.csv"  # Replace with your dataset path
    X, y = load_data(filepath)
    split = int(0.8 * len(X))
    X_train, X_test, y_train, y_test = X[:split], X[split:], y[:split], y[split:]

    agent = train_dqn(X_train, y_train)
    evaluate(agent, X_test, y_test)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 5s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 531ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 179ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 155ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 159ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 164ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 162ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 159ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 158ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 156ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 155ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 160ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 165ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1