# 🧠 Deep Q-Learning for Anomaly Detection (Improved)
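This notebook trains a Deep Q-Learning agent to label points in a synthetic 2-D dataset as normal or anomalous. Class imbalance (15% anomalies) is addressed through an asymmetric reward: +1.0 for a detected anomaly, -1.0 for a missed anomaly, -0.5 for a false alarm, and +0.1 for a correctly accepted normal point.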


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report
from sklearn.datasets import make_blobs
from collections import deque
import random
import torch
import torch.nn as nn
import torch.optim as optim


In [None]:
# Generate imbalanced synthetic data: one tight Gaussian cluster of inliers
# plus a minority of points scattered uniformly over a larger square.
n_samples = 300
outlier_fraction = 0.15
n_outliers = int(n_samples * outlier_fraction)
n_inliers = n_samples - n_outliers

# Inliers: a single blob centred on the origin (seeded via random_state).
X, _ = make_blobs(n_samples=n_inliers, centers=[[0, 0]], cluster_std=0.5, random_state=42)

# Outliers: drawn from a dedicated seeded generator so that the dataset is
# identical on every run, matching the seeded inliers above. Sampling from the
# global, unseeded np.random state would make every downstream metric vary
# between runs.
# NOTE: the square [-4, 4]^2 contains the inlier cluster, so some points
# labelled as anomalies may fall inside the normal region.
rng = np.random.default_rng(42)
X_outliers = rng.uniform(low=-4, high=4, size=(n_outliers, 2))
X = np.vstack((X, X_outliers))

# Labels follow the stacking order: all inliers first, then all outliers.
y = np.array([0] * n_inliers + [1] * n_outliers)  # 0 = normal, 1 = anomaly

# Normalize each feature to zero mean and unit variance.
scaler = StandardScaler()
X = scaler.fit_transform(X)


In [None]:
class QNetwork(nn.Module):
    """Two-layer fully connected network producing one Q-value per action.

    Args:
        input_dim: Number of features in a state vector.
        output_dim: Number of actions (size of the output vector).
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        hidden_units = 64
        # Layers are created in forward order: input -> hidden -> ReLU -> output.
        layers = [
            nn.Linear(input_dim, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, output_dim),
        ]
        self.fc = nn.Sequential(*layers)

    def forward(self, x):
        """Return the Q-values computed for input ``x``."""
        q_values = self.fc(x)
        return q_values


In [None]:
class RealDeepQLWrapper:
    """Deep Q-learning agent that labels samples as normal (0) or anomaly (1).

    Each sample is treated as a state and the predicted label as the action.
    Rewards are asymmetric (see ``_reward``) so that missing an anomaly costs
    more than raising a false alarm.

    Args:
        input_dim: Number of features per sample.
        episodes: Number of full passes over the training data in ``fit``.
        epsilon: Initial exploration probability for ``act``.
        epsilon_min: Lower bound that ``epsilon`` decays towards.
        epsilon_decay: Multiplicative decay applied to ``epsilon`` per episode.
        gamma: Discount factor applied to the bootstrapped next-state value.
    """

    def __init__(self, input_dim, episodes=100, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.99, gamma=0.95):
        self.episodes = episodes
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.gamma = gamma
        # Bounded replay buffer; the oldest transitions are dropped first.
        self.memory = deque(maxlen=10000)
        # Two outputs: Q(state, normal) and Q(state, anomaly).
        self.model = QNetwork(input_dim, 2)
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        self.loss_fn = nn.MSELoss()

    def remember(self, state, action, reward, next_state):
        """Append one ``(state, action, reward, next_state)`` transition to the replay buffer."""
        self.memory.append((state, action, reward, next_state))

    def act(self, state):
        """Choose an action for ``state`` with an epsilon-greedy policy.

        With probability ``self.epsilon`` a uniformly random action is
        returned; otherwise the action with the highest predicted Q-value.
        Intended for training only; ``predict`` is always greedy.

        Returns:
            int: 0 (normal) or 1 (anomaly).
        """
        if np.random.rand() <= self.epsilon:
            return np.random.randint(2)
        state = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
            q_values = self.model(state)
        return torch.argmax(q_values).item()

    def replay(self, batch_size=32):
        """Run one minibatch Q-learning update from the replay buffer.

        Does nothing until the buffer holds at least ``batch_size``
        transitions. The whole minibatch is evaluated in a single forward and
        backward pass followed by one optimizer step.

        Args:
            batch_size: Number of transitions sampled for the update.
        """
        if len(self.memory) < batch_size:
            return
        minibatch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states = zip(*minibatch)

        states = torch.as_tensor(np.asarray(states), dtype=torch.float32)
        next_states = torch.as_tensor(np.asarray(next_states), dtype=torch.float32)
        actions = torch.as_tensor(np.asarray(actions), dtype=torch.int64).unsqueeze(1)
        rewards = torch.as_tensor(np.asarray(rewards), dtype=torch.float32)

        # The TD target is a fixed regression target: compute it without
        # autograd so that no gradient flows through the bootstrapped value.
        with torch.no_grad():
            next_q = self.model(next_states).max(dim=1).values
            targets = rewards + self.gamma * next_q

        # Q-value of the action actually taken in each sampled transition.
        current = self.model(states).gather(1, actions).squeeze(1)

        loss = self.loss_fn(current, targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    @staticmethod
    def _reward(label, predicted_label):
        """Return the reward for predicting ``predicted_label`` when the truth is ``label``."""
        if label == 1 and predicted_label == 1:
            return 1.0  # TP
        if label == 1 and predicted_label == 0:
            return -1.0  # FN
        if label == 0 and predicted_label == 1:
            return -0.5  # FP
        return 0.1  # TN

    def fit(self, X, y):
        """Train the agent on samples ``X`` with ground-truth labels ``y``.

        For every episode the samples are visited in order; each visit stores
        one transition and triggers one ``replay`` update. ``epsilon`` is
        decayed once per episode and progress is printed.

        Args:
            X: Indexable collection of feature vectors.
            y: Labels aligned with ``X`` (0 = normal, 1 = anomaly).

        Raises:
            ValueError: If ``X`` and ``y`` have different lengths.
        """
        if len(X) != len(y):
            raise ValueError(
                f"X and y must have the same length, got {len(X)} and {len(y)}"
            )

        last_index = len(X) - 1
        for e in range(self.episodes):
            for i in range(len(X)):
                state = X[i]
                action = self.act(state)
                reward = self._reward(y[i], action)

                # The "next state" is simply the following sample; the last
                # sample transitions to itself.
                next_state = X[min(i + 1, last_index)]
                self.remember(state, action, reward, next_state)
                self.replay()

            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
            print(f"Episode {e+1}/{self.episodes}, Epsilon: {self.epsilon:.4f}")

    def predict(self, X):
        """Return the greedy action (predicted label) for every sample in ``X``.

        Prediction never explores: the result depends only on the current
        network weights, not on ``epsilon`` or any random state.

        Args:
            X: Collection of feature vectors, one per row.

        Returns:
            numpy.ndarray: Integer labels, 0 (normal) or 1 (anomaly), one per sample.
        """
        states = torch.as_tensor(np.asarray(X), dtype=torch.float32)
        if states.shape[0] == 0:
            return np.array([], dtype=int)
        with torch.no_grad():
            q_values = self.model(states)
        return q_values.argmax(dim=1).numpy()


In [None]:
# Train the agent on the full dataset, then label the same samples it was
# trained on (there is no held-out split in this cell).
dql = RealDeepQLWrapper(episodes=100, input_dim=2)
dql.fit(X, y)
y_pred = dql.predict(X)

# Per-class precision / recall / F1 against the ground-truth labels.
print("\n📊 Final Deep Q-Learning Performance:\n")
class_names = ["Normal", "Anomaly"]
report = classification_report(y, y_pred, target_names=class_names)
print(report)
