In [1]:
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim

In [2]:
# generate dataset
# 1000 lines of benign training data
# 100 lines of benign testing data, followed by 40 lines anomalies, followed by 60 lines of benign

def gaussian_noise(data, mean=0.0, std=0.1):
    noise = torch.randn_like(data) * std + mean
    return data + noise

# Now testing on anomaly data (salt-and-pepper noise)
def salt_and_pepper_noise(data, prob=0.25):
    noisy_data = data.clone()
    rand = torch.rand_like(data)
    noisy_data[rand < prob / 2] = 0.0
    noisy_data[rand > 1 - prob / 2] = 1.0
    return noisy_data

data = gaussian_noise(torch.randn(1160, 16) * 0.1, mean=0.0, std=0.1)
anomaly_data = salt_and_pepper_noise(torch.randn(100, 16), prob=0.25)
# split into train and test
train_data = data[:1000]
test_data = torch.cat([data[1000:], anomaly_data], dim=0)
test_labels = torch.cat([torch.zeros(160), torch.ones(100)])


In [3]:
# setup autoencoder

class Autoencoder(nn.Module):
    def __init__(self, input_size=784, hidden_sizes=[64, 32, 8, 32]):
        super(Autoencoder, self).__init__()
        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_size, hidden_sizes[0]),
            nn.LeakyReLU(True),
            nn.Linear(hidden_sizes[0], hidden_sizes[1]),
            nn.LeakyReLU(True),
            nn.Linear(hidden_sizes[1], hidden_sizes[2]),
            nn.LeakyReLU(True),
            nn.Linear(hidden_sizes[2], hidden_sizes[3]),
            nn.LeakyReLU(True)
        )
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(hidden_sizes[3], hidden_sizes[2]),
            nn.LeakyReLU(True),
            nn.Linear(hidden_sizes[2], hidden_sizes[1]),
            nn.LeakyReLU(True),
            nn.Linear(hidden_sizes[1], hidden_sizes[0]),
            nn.ReLU(True),
            nn.Linear(hidden_sizes[0], input_size)
        )

    def forward(self, x):
        x = self.encoder(x)
        return self.decoder(x)

In [4]:
# Train autoencoder using ten-fold cross-validation

autoencoder = Autoencoder(input_size=16, hidden_sizes=[12, 8, 4, 8])
criterion = nn.MSELoss()
optimizer = optim.Adam(autoencoder.parameters(), lr=0.001)


num_epochs = 100
batch_size = 64
num_samples = train_data.size(0)
num_batches = (num_samples + batch_size - 1) // batch_size
for epoch in range(num_epochs):
    autoencoder.train()
    epoch_loss = 0.0
    for i in range(num_batches):
        batch_data = train_data[i * batch_size:(i + 1) * batch_size]
        optimizer.zero_grad()
        outputs = autoencoder(batch_data)
        loss = criterion(outputs, batch_data)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item() * batch_data.size(0)
    epoch_loss /= num_samples
    # if (epoch + 1) % 50 == 0:
    if epoch == 0 or (epoch + 1) % 50 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}')

Epoch [1/100], Loss: 0.0488
Epoch [50/100], Loss: 0.0159
Epoch [100/100], Loss: 0.0157


In [5]:
# Compute reconstruction errors on training data

autoencoder.eval()
with torch.no_grad():
    recon = autoencoder(train_data)
    errors = (train_data - recon)**2               # shape: (N, 16)
    sample_errors = errors.mean(dim=1)            # shape: (N,) â€“ per-sample MSE

err_np = errors.cpu().numpy()                      # shape (N,16)

feature_median = np.median(err_np, axis=0)         # shape (16,)
feature_mad = np.median(np.abs(err_np - feature_median), axis=0)

k = 3.0
thresholds_feature = feature_median + k * feature_mad

print("Feature-level thresholds:", thresholds_feature)


Feature-level thresholds: [0.01970199 0.02574495 0.03248791 0.02874535 0.02742706 0.0244266
 0.02699526 0.0315773  0.02706124 0.02571756 0.03147859 0.03194313
 0.02230235 0.02254503 0.02363099 0.0281459 ]


In [6]:
# Evaluate on test data
# get reconstruction error matrix for data per feature

autoencoder.eval()
with torch.no_grad():
    reconstructed = autoencoder(test_data)
    reconstruction_errors = (test_data - reconstructed) ** 2  # MSE per feature
    spikes = (reconstruction_errors.cpu().numpy() > thresholds_feature).astype(int)

In [7]:
# split spikes into normal and anomaly
# count average number of spikes per sample for normal and anomaly
normal_spikes = spikes[test_labels == 0]
anomaly_spikes = spikes[test_labels == 1]
avg_normal_spikes = normal_spikes.mean(axis=0)
avg_anomaly_spikes = anomaly_spikes.mean(axis=0)
print("Average spikes per feature for normal data:", avg_normal_spikes)
print("Average spikes per feature for anomaly data:", avg_anomaly_spikes)


Average spikes per feature for normal data: [0.23125 0.2625  0.20625 0.19375 0.23125 0.175   0.21875 0.21875 0.2125
 0.1375  0.1375  0.18125 0.25    0.24375 0.2625  0.2375 ]
Average spikes per feature for anomaly data: [0.86 0.83 0.83 0.84 0.87 0.81 0.84 0.85 0.83 0.83 0.81 0.79 0.88 0.82
 0.77 0.86]


In [8]:
class LIFNeuron(torch.nn.Module):
    def __init__(self, tau=10.0, v_th=1.0):
        super(LIFNeuron, self).__init__()
        self.tau = tau
        self.v_th = v_th
        self.v = torch.zeros(1)
 
    def forward(self, input_spikes):
        # Update membrane potential
        self.v = (1 - 1/self.tau) * self.v + input_spikes
        # Check if spike is fired
        spikes = (self.v >= self.v_th).float()
        # Reset membrane potential if spike is fired
        self.v = (1 - spikes) * self.v
        return spikes

class SpikingNeuralNework(nn.Module):
    def __init__(self, hidden_sizes=[16], input_size=16, output_size=2, tau=10.0, v_th=1.0):
        super(SpikingNeuralNework, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_sizes[0])
        self.lif1 = LIFNeuron(tau=tau, v_th=v_th)
        self.fc2 = nn.Linear(hidden_sizes[0], output_size)
        self.lif2 = LIFNeuron(tau=tau, v_th=v_th)

    def forward(self, x):
        x = self.fc1(x)
        x = self.lif1(x)
        x = self.fc2(x)
        x = self.lif2(x)
        return x

In [14]:
snn = SpikingNeuralNework(hidden_sizes=[16], input_size=16, output_size=2, tau=10.0, v_th=0.2)
output = snn(torch.FloatTensor(spikes))
# count spikes and non-spikes, per normal and anomaly
output_spikes = (output.detach().numpy() > 0).astype(int)
normal_output_spikes = output_spikes[test_labels.numpy() == 0]
anomaly_output_spikes = output_spikes[test_labels.numpy() == 1]
print("Output layer spikes for normal data:", normal_output_spikes.sum(axis=0))
print("Output layer spikes for anomaly data:", anomaly_output_spikes.sum(axis=0))


Output layer spikes for normal data: [18 43]
Output layer spikes for anomaly data: [17 12]


In [10]:
# Do a simple test of the spiking neural network
# Create a 2 input dataset and the expected output.
# inputs: [0,0], [0,1], [1,0], [1,1]
