# In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.data import Data, DataLoader
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Generate synthetic time series sensor data: a noisy sine wave sampled at
# 100 integer time steps (NumPy is seeded so the noise is reproducible).
np.random.seed(42)
time = np.arange(0, 100)
sensor_data = np.sin(0.1 * time) + np.random.normal(0, 0.1, size=len(time))


def _chain_edge_index(num_nodes):
    """Return the edge index of the directed chain 0 -> 1 -> ... -> num_nodes - 1.

    The result is a long tensor of shape [2, num_nodes - 1] (shape [2, 0] when
    there are fewer than two nodes): row 0 holds the source node of every
    edge and row 1 the target node.
    """
    sources = torch.arange(max(num_nodes - 1, 0), dtype=torch.long)
    return torch.stack([sources, sources + 1], dim=0)


# Create a graph representation of the time series data: one node per time
# step, each node linked to its successor.
edge_index = _chain_edge_index(len(sensor_data))
# GCNConv works on node features shaped [num_nodes, num_features], so the 1-D
# series becomes a single-feature column.
x = torch.tensor(sensor_data, dtype=torch.float32).unsqueeze(-1)
data = Data(x=x, edge_index=edge_index)

# Split the data into training and testing sets
# A Data object cannot be sliced by node position (its [] operator looks
# attributes up by name), so each split is built as its own chain graph whose
# node indices restart at 0.
TRAIN_SIZE = 80
train_data = Data(x=x[:TRAIN_SIZE], edge_index=_chain_edge_index(TRAIN_SIZE))
test_data = Data(
    x=x[TRAIN_SIZE:],
    edge_index=_chain_edge_index(len(x) - TRAIN_SIZE),
)

# Define the GNN model
class GNN(nn.Module):
    """Two-layer graph convolutional network.

    The first GCNConv layer maps ``input_dim`` features per node to
    ``hidden_dim``; the second maps them back to ``input_dim``, so the output
    has the same feature width as the input.
    """

    def __init__(self, input_dim, hidden_dim):
        """Create the two convolution layers.

        Args:
            input_dim: Number of features per node, for both input and output.
            hidden_dim: Number of features produced by the first layer.
        """
        super().__init__()
        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.conv2 = GCNConv(hidden_dim, input_dim)

    def forward(self, x, edge_index):
        """Run both convolutions with a ReLU in between.

        Args:
            x: Node features, passed to ``conv1`` unchanged.
            edge_index: Graph connectivity, passed to both layers.

        Returns:
            The output of ``conv2``; no activation is applied to it.
        """
        hidden = F.relu(self.conv1(x, edge_index))
        return self.conv2(hidden, edge_index)

# Create a PyTorch geometric DataLoader for batch training
# The list holds a single graph, so every epoch consists of exactly one batch.
train_loader = DataLoader([train_data], batch_size=1, shuffle=True)

# Initialize the GNN model and define the optimizer
# NumPy is seeded for the synthetic data; seeding torch as well makes the
# weight initialisation, and with it the whole training run, repeatable.
torch.manual_seed(42)
model = GNN(input_dim=1, hidden_dim=64)
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Train the GNN model
model.train()
for epoch in range(100):
    for batch in train_loader:
        optimizer.zero_grad()
        # GCNConv expects node features shaped [num_nodes, num_features]. The
        # model is built with input_dim=1, so the features are viewed as one
        # column (a no-op when they already have that shape).
        node_features = batch.x.reshape(-1, 1)
        out = model(node_features, batch.edge_index)
        # One-step-ahead objective: the output at step t is compared with the
        # observation at step t + 1.
        loss = F.mse_loss(out[:-1], node_features[1:])  # Mean Squared Error loss
        loss.backward()
        optimizer.step()

# Test the GNN model on the test data
model.eval()
with torch.no_grad():
    # GCNConv expects node features shaped [num_nodes, num_features]; the
    # reshape is a no-op when the features already form a single column.
    test_output = model(test_data.x.reshape(-1, 1), test_data.edge_index)

# Compute the anomaly scores
anomaly_threshold = 0.5  # Adjust this threshold as needed
observed = test_data.x.reshape(-1)
predicted = test_output.reshape(-1)
# The training loss compares the output at step t with the observation at
# step t + 1, so each prediction is lined up with the *following* observation.
# Step 0 has no prediction: it is NaN in the aligned series and scores 0.
aligned_prediction = torch.full_like(observed, float("nan"))
aligned_prediction[1:] = predicted[:-1]
anomaly_scores = torch.zeros_like(observed)
anomaly_scores[1:] = torch.abs(observed[1:] - predicted[:-1])
anomaly_mask = anomaly_scores > anomaly_threshold
# flatten() rather than squeeze(): the result is a list of indices even when
# exactly one anomaly is found (squeeze() would collapse it to a bare int).
anomaly_indices = torch.nonzero(anomaly_mask).flatten().tolist()

# Plot the original data and detected anomalies
plt.figure(figsize=(12, 6))
plt.plot(observed.tolist(), label='Original Data', color='blue')
# The NaN at step 0 leaves a gap there instead of drawing a made-up value.
plt.plot(aligned_prediction.tolist(), label='Predicted Data', color='green')
plt.scatter(anomaly_indices,
            aligned_prediction[anomaly_mask].tolist(),
            color='red', label='Anomalies')
plt.xlabel('Time')
plt.ylabel('Sensor Data')
plt.legend()
plt.title('Anomaly Detection using GNN')
plt.show()

# Print the indices of detected anomalies
print("Detected anomalies at indices:", anomaly_indices)
