## GAN ARCH

### Data Creation

In [None]:
import torch
import torch.utils.data as data
import numpy as np

# Synthetic stand-in for real sensor data: no dataset is provided, so we draw
# 1000 samples with 10 sensor readings (features) each, uniformly from [0, 1).
# Seeding NumPy and torch makes the generated data, the DataLoader shuffling
# order and any later use of the global RNGs reproducible on a fresh kernel.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

sensor_data = np.random.rand(1000, 10)
sensor_data_dim = sensor_data.shape[1]

sensor_data_tensor = torch.tensor(sensor_data, dtype=torch.float32)

# Create batches for training
batch_size = 64
dataloader = data.DataLoader(sensor_data_tensor, batch_size=batch_size, shuffle=True)

sensor_data_dim

10

In [None]:
import torch.nn as nn
import torch.optim as optim

### Generator Definition

In [None]:
class Generator(nn.Module):
    """Fully connected generator mapping a noise vector to a synthetic sample.

    The network is three linear layers (input_dim -> 128 -> 256 -> output_dim)
    with ReLU activations in between and a final Tanh, so every generated
    value lies in [-1, 1].
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        layers = [
            nn.Linear(input_dim, 128),
            nn.ReLU(inplace=True),
            nn.Linear(128, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, output_dim),
            nn.Tanh(),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Pass ``x`` through the layer stack and return the generated batch."""
        generated = self.model(x)
        return generated

### Discriminator Definition

In [None]:
class Discriminator(nn.Module):
    """Fully connected discriminator scoring how "real" a sample looks.

    The network is three linear layers (input_dim -> 256 -> 128 -> 1) with
    ReLU activations in between and a final Sigmoid, so each sample gets a
    single score in [0, 1].
    """

    def __init__(self, input_dim):
        super().__init__()
        layers = [
            nn.Linear(input_dim, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Linear(128, 1),
            nn.Sigmoid(),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Pass ``x`` through the layer stack and return one score per row."""
        score = self.model(x)
        return score

### Defining Hyperparameters

In [None]:
# --- GAN hyperparameters ---
input_dim = 100                # length of the noise vector fed to the generator
output_dim = sensor_data_dim   # generated samples have one value per sensor feature
batch_size = 64                # re-declared here; does not rebuild an existing DataLoader
num_epochs = 100
lr = 0.0002

# Build the two networks; the discriminator consumes generator-sized samples
generator = Generator(input_dim=input_dim, output_dim=output_dim)
discriminator = Discriminator(input_dim=output_dim)

# Binary cross-entropy on the discriminator's sigmoid score,
# with one Adam optimizer per network
criterion = nn.BCELoss()
optimizer_g = optim.Adam(params=generator.parameters(), lr=lr)
optimizer_d = optim.Adam(params=discriminator.parameters(), lr=lr)


### Training

In [None]:
# Adversarial training: one discriminator step followed by one generator step per batch
for epoch in range(num_epochs):
    for real_data in dataloader:
        # Size everything off the batch itself rather than the configured batch size
        current_batch_size = real_data.size(0)
        real_labels = torch.ones(current_batch_size, 1)
        fake_labels = torch.zeros(current_batch_size, 1)

        # ---- Discriminator step: real samples -> 1, generated samples -> 0 ----
        optimizer_d.zero_grad()
        d_loss_real = criterion(discriminator(real_data), real_labels)

        z = torch.randn(current_batch_size, input_dim)
        fake_data = generator(z)
        # detach() cuts the graph so this backward pass does not reach the generator
        d_loss_fake = criterion(discriminator(fake_data.detach()), fake_labels)

        d_loss = d_loss_real + d_loss_fake
        d_loss.backward()
        optimizer_d.step()

        # ---- Generator step: try to make the discriminator label fakes as real ----
        optimizer_g.zero_grad()
        z = torch.randn(current_batch_size, input_dim)
        fake_data = generator(z)
        outputs = discriminator(fake_data)
        g_loss = criterion(outputs, real_labels)
        g_loss.backward()
        optimizer_g.step()

    # Losses reported here are those of the last batch of the epoch
    print(f"Epoch [{epoch+1}/{num_epochs}] D Loss: {d_loss.item()} G Loss: {g_loss.item()}")

Epoch [1/100] D Loss: 1.284868836402893 G Loss: 0.6848291158676147
Epoch [2/100] D Loss: 1.3614052534103394 G Loss: 0.5779516100883484
Epoch [3/100] D Loss: 1.4518139362335205 G Loss: 0.6057993769645691
Epoch [4/100] D Loss: 1.3735291957855225 G Loss: 0.8725172281265259
Epoch [5/100] D Loss: 1.2598413228988647 G Loss: 1.0636274814605713
Epoch [6/100] D Loss: 1.2149579524993896 G Loss: 0.9045301675796509
Epoch [7/100] D Loss: 1.4091651439666748 G Loss: 0.730186939239502
Epoch [8/100] D Loss: 1.1350481510162354 G Loss: 0.9606666564941406
Epoch [9/100] D Loss: 1.2005908489227295 G Loss: 0.6679550409317017
Epoch [10/100] D Loss: 1.4977399110794067 G Loss: 0.49771609902381897
Epoch [11/100] D Loss: 1.1668481826782227 G Loss: 0.8041625022888184
Epoch [12/100] D Loss: 1.1140117645263672 G Loss: 0.7353340983390808
Epoch [13/100] D Loss: 1.1719766855239868 G Loss: 0.7852832078933716
Epoch [14/100] D Loss: 1.4245593547821045 G Loss: 0.5952092409133911
Epoch [15/100] D Loss: 1.4148306846618652 G 

## REINFORCEMENT LEARNING AGENT

### RL AGENT DEFINITION

In [None]:
import numpy as np

class QLearningAgent:
    """Tabular epsilon-greedy Q-learning agent.

    The Q-table has shape ``(state_dim, action_dim)``. A ``state`` passed to
    :meth:`choose_action` or :meth:`update` may be a scalar, a NumPy array or
    a torch tensor holding one or more values. Every value is truncated to an
    integer and clipped into ``[0, state_dim - 1]`` to select a Q-table row;
    the Q-value of the state for an action is the mean over the selected
    rows. When all values select the same row this is ordinary tabular
    Q-learning on that row.

    Args:
        state_dim: Number of rows (discrete states) in the Q-table.
        action_dim: Number of columns (discrete actions) in the Q-table.
        lr: Learning rate applied to the temporal-difference error.
        gamma: Discount factor for the bootstrapped next-state value.
        epsilon: Probability of picking a uniformly random action.
    """

    def __init__(self, state_dim, action_dim, lr=0.1, gamma=0.9, epsilon=0.1):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon
        self.q_table = np.zeros((state_dim, action_dim))

    def _state_rows(self, state):
        """Return the flat array of valid Q-table row indices selected by ``state``.

        Raises:
            ValueError: If ``state`` contains no values.
        """
        if hasattr(state, "detach"):
            # torch tensor: drop autograd history and move to host memory first
            state = state.detach().cpu().numpy()
        rows = np.asarray(state).reshape(-1).astype(int)
        if rows.size == 0:
            raise ValueError("state must contain at least one value")
        # Clip instead of letting negatives wrap around or large values raise IndexError
        return np.clip(rows, 0, self.state_dim - 1)

    def choose_action(self, state):
        """Pick an action for ``state`` with an epsilon-greedy policy.

        Returns:
            An ``int`` in ``[0, action_dim)``.
        """
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.action_dim)
        # Reduce over the selected rows first so the argmax runs over actions only;
        # an argmax over the flattened gather could return an index >= action_dim.
        action_values = self.q_table[self._state_rows(state)].mean(axis=0)
        return int(np.argmax(action_values))

    def update(self, state, action, reward, next_state):
        """Apply one Q-learning update for the transition (state, action, reward, next_state).

        The target is ``reward + gamma * max_a Q(next_state, a)``; every
        distinct row selected by ``state`` is moved towards it by ``lr`` times
        the temporal-difference error.
        """
        state_rows = self._state_rows(state)
        next_rows = self._state_rows(next_state)

        best_next_value = self.q_table[next_rows].mean(axis=0).max()
        td_target = reward + self.gamma * best_next_value

        td_error = td_target - self.q_table[state_rows, action].mean()
        # np.unique makes the "each row is updated exactly once" rule explicit;
        # in-place ops with repeated fancy indices do not accumulate.
        self.q_table[np.unique(state_rows), action] += self.lr * td_error

In [None]:
def is_anomaly(data, threshold=0.5, model=None):
    """Decide whether ``data`` looks anomalous according to a discriminator.

    The model is run without gradient tracking and its scores are averaged
    over everything it returns, so a batch yields a single batch-level
    verdict rather than one verdict per sample.

    Args:
        data: Input passed unchanged to the model.
        threshold: Mean score below which the input is reported as anomalous.
        model: Callable producing the scores. Defaults to the notebook-level
            ``discriminator``, which is only looked up when ``model`` is None.

    Returns:
        True when the mean score is strictly below ``threshold``.
    """
    scorer = discriminator if model is None else model
    with torch.no_grad():
        output = scorer(data)
    return output.mean().item() < threshold

### TRAINING RL AGENT

In [None]:
state_dim = sensor_data_dim
action_dim = 2  # 0: normal, 1: anomaly

# Initialize the Q-learning agent
agent = QLearningAgent(state_dim, action_dim)

# Training loop: the agent labels each batch and is rewarded for agreeing
# with the discriminator-based verdict from is_anomaly
for epoch in range(num_epochs):
    for real_data in dataloader:
        state = real_data

        # Agent chooses an action
        action = agent.choose_action(state)

        # Ask the discriminator once per batch; the verdict is reused below
        anomalous = is_anomaly(real_data)

        # +1 when the agent's label matches the verdict, -1 otherwise
        # (any action other than 0 or 1 also counts as wrong)
        agreed = (action == 1 and anomalous) or (action == 0 and not anomalous)
        reward = 1 if agreed else -1

        # NOTE(review): a `preprocess` helper was called here but is not defined
        # anywhere above, so this cell raised NameError on a fresh kernel. The
        # current batch is reused as the successor state; define and call a real
        # preprocessing step here if one is intended.
        next_state = state
        agent.update(state, action, reward, next_state)

    print(f"Epoch [{epoch+1}/{num_epochs}] Training complete")

Epoch [1/100] Training complete
Epoch [2/100] Training complete
Epoch [3/100] Training complete
Epoch [4/100] Training complete
Epoch [5/100] Training complete
Epoch [6/100] Training complete
Epoch [7/100] Training complete
Epoch [8/100] Training complete
Epoch [9/100] Training complete
Epoch [10/100] Training complete
Epoch [11/100] Training complete
Epoch [12/100] Training complete
Epoch [13/100] Training complete
Epoch [14/100] Training complete
Epoch [15/100] Training complete
Epoch [16/100] Training complete
Epoch [17/100] Training complete
Epoch [18/100] Training complete
Epoch [19/100] Training complete
Epoch [20/100] Training complete
Epoch [21/100] Training complete
Epoch [22/100] Training complete
Epoch [23/100] Training complete
Epoch [24/100] Training complete
Epoch [25/100] Training complete
Epoch [26/100] Training complete
Epoch [27/100] Training complete
Epoch [28/100] Training complete
Epoch [29/100] Training complete
Epoch [30/100] Training complete
Epoch [31/100] Trai

## TESTING PHASE

In [None]:
test_data = torch.randn(10, sensor_data_dim)

# Evaluate greedily: with exploration left on, a fraction of the "predictions"
# would be random actions rather than what the agent learned. The training
# epsilon is restored afterwards so the agent can keep training.
trained_epsilon = agent.epsilon
agent.epsilon = 0.0
try:
    actions = [agent.choose_action(state) for state in test_data]
finally:
    agent.epsilon = trained_epsilon

# Check for anomalies using the discriminator
for i, (data_point, action) in enumerate(zip(test_data, actions)):
    # unsqueeze(0) turns the single sample into a batch of one for the discriminator
    is_anomalous = is_anomaly(data_point.unsqueeze(0))
    action_taken = "Anomaly" if action == 1 else "Normal"
    print(f"Data point {i+1}: Predicted as {action_taken}, Discriminator says {'Anomaly' if is_anomalous else 'Normal'}")

Data point 1: Predicted as Normal, Discriminator says Anomaly
Data point 2: Predicted as Normal, Discriminator says Anomaly
Data point 3: Predicted as Normal, Discriminator says Anomaly
Data point 4: Predicted as Normal, Discriminator says Anomaly
Data point 5: Predicted as Normal, Discriminator says Anomaly
Data point 6: Predicted as Normal, Discriminator says Anomaly
Data point 7: Predicted as Normal, Discriminator says Normal
Data point 8: Predicted as Normal, Discriminator says Normal
Data point 9: Predicted as Normal, Discriminator says Anomaly
Data point 10: Predicted as Normal, Discriminator says Anomaly
