# In [None]:
# Autoencoder Anomaly Detection in Manufacturing Data

# ===================
# SETUP
# ===================

# Step 1: Imports and seed
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import random

# Set random seeds for reproducibility
def set_seed(seed=42):
    """Seed the PyTorch, NumPy and Python random number generators.

    Args:
        seed: Value passed unchanged to each library's seeding function.
    """
    # Same order as always: torch first, then NumPy, then the stdlib.
    for seed_fn in (torch.manual_seed, np.random.seed, random.seed):
        seed_fn(seed)

# Fix all RNGs up front so data generation and weight init are repeatable.
set_seed(seed=42)

# Step 2: Generate synthetic noisy manufacturing data
def generate_synthetic_data(num_samples=1000, seq_len=100):
    """Generate noisy single-period sine waves as synthetic sensor signals.

    Every sample is the same sine wave evaluated at ``seq_len`` evenly spaced
    points on [0, 2*pi], plus independent Gaussian noise (mean 0, std 0.3)
    drawn from NumPy's global random state.

    Args:
        num_samples: Number of signals (rows) to generate.
        seq_len: Number of points per signal (columns).

    Returns:
        A float32 array of shape ``(num_samples, seq_len)``; the shape is
        two-dimensional even when ``num_samples`` is 0.

    Raises:
        ValueError: If ``num_samples`` or ``seq_len`` is negative.
    """
    if num_samples < 0 or seq_len < 0:
        raise ValueError(
            "num_samples and seq_len must be non-negative, "
            f"got num_samples={num_samples}, seq_len={seq_len}"
        )

    # The clean waveform is identical for every sample, so build it once.
    base_signal = np.sin(np.linspace(0, 2 * np.pi, seq_len))

    # One draw for the whole batch. The global generator fills the array in
    # row-major order, so a seeded run yields the same values as drawing
    # each row separately.
    noise = np.random.normal(0, 0.3, size=(num_samples, seq_len))

    # Broadcasting adds the (seq_len,) waveform to every row.
    return (base_signal + noise).astype(np.float32)

# Draw the full synthetic dataset with the default sizes.
data = generate_synthetic_data()

# Train-test split: the first 80% of rows train the model, the rest are held
# out. Each part gets a singleton dimension inserted at position 1.
split_idx = int(len(data) * 0.8)
train_data, test_data = (
    torch.tensor(part).unsqueeze(1)
    for part in (data[:split_idx], data[split_idx:])
)

# ===================
# BEGINNER SECTION
# ===================

# Define a basic Denoising Autoencoder
class DenoisingAutoencoder(nn.Module):
    """Fully connected autoencoder with a single hidden layer.

    The encoder is ``Linear(input_size, hidden_size)`` followed by ReLU; the
    decoder is ``Linear(hidden_size, input_size)`` followed by Tanh, so every
    output value is bounded to [-1, 1].
    """

    def __init__(self, input_size=100, hidden_size=32):
        """Build the encoder and decoder stacks.

        Args:
            input_size: Width of the input (and of the reconstruction).
            hidden_size: Width of the hidden code.
        """
        super().__init__()
        # The encoder layers are created before the decoder layers so that
        # weight initialisation consumes the RNG in the same order.
        encoder_layers = [nn.Linear(input_size, hidden_size), nn.ReLU()]
        decoder_layers = [nn.Linear(hidden_size, input_size), nn.Tanh()]
        self.encoder = nn.Sequential(*encoder_layers)
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Encode ``x`` and return its reconstruction."""
        return self.decoder(self.encoder(x))

# Network with default sizes, mean-squared-error reconstruction loss, and an
# Adam optimizer over that network's parameters.
model = DenoisingAutoencoder()
criterion = nn.MSELoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-2)

# Training loop
def train_model(model, data, epochs=20, loss_fn=None, opt=None, noise_std=0.1):
    """Train ``model`` as a denoising autoencoder using full-batch updates.

    Each epoch corrupts all of ``data`` with fresh Gaussian noise, asks the
    model to reconstruct the uncorrupted ``data``, and takes one optimizer
    step. The loss is printed after every epoch.

    Args:
        model: Module to train. It is called on the noisy input with
            dimension 1 squeezed out.
        data: Training tensor. Dimension 1 is squeezed before the forward
            pass and the loss computation.
        epochs: Number of full-batch update steps.
        loss_fn: Callable ``(output, target) -> scalar loss tensor``. When
            omitted, the module-level ``criterion`` is used.
        opt: Optimizer used for ``zero_grad()`` / ``step()``. When omitted,
            the module-level ``optimizer`` is used. Pass one explicitly when
            training a model other than the one that optimizer was built
            for; otherwise ``step()`` updates the wrong parameters.
        noise_std: Scale applied to the standard-normal corruption noise.

    Returns:
        A list with one loss value (Python float) per epoch.
    """
    # Fall back to the module-level objects so existing two-argument calls
    # behave exactly as before.
    if loss_fn is None:
        loss_fn = criterion
    if opt is None:
        opt = optimizer

    # Neither the mode nor the target changes between epochs; set them once.
    model.train()
    target = data.squeeze(1)

    losses = []
    for epoch in range(epochs):
        noisy_input = data + noise_std * torch.randn_like(data)
        output = model(noisy_input.squeeze(1))
        loss = loss_fn(output, target)
        opt.zero_grad()
        loss.backward()
        opt.step()
        loss_value = loss.item()
        losses.append(loss_value)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {loss_value:.4f}")
    return losses

# Run the training loop on the training split and keep the loss history.
losses = train_model(model, train_data, epochs=20)

# Visualize results
def plot_results(model, data, idx=0):
    """Plot one signal next to its noisy and denoised versions.

    Gaussian noise scaled by 0.1 is added to all of ``data``; the sample at
    position ``idx`` is then passed through ``model`` in eval mode and the
    original, noisy and reconstructed traces are drawn on one figure.

    Args:
        model: Module used to reconstruct the noisy sample.
        data: Tensor of signals; ``data[idx]`` is the one displayed.
        idx: Index of the sample to display.
    """
    model.eval()
    corrupted = data + 0.1 * torch.randn_like(data)
    with torch.no_grad():
        restored = model(corrupted[idx].squeeze(0)).numpy()

    # (values, legend label) pairs, in drawing order.
    traces = (
        (data[idx].squeeze().numpy(), 'Original'),
        (corrupted[idx].squeeze().numpy(), 'Noisy'),
        (restored, 'Denoised'),
    )

    plt.figure(figsize=(10,4))
    for values, name in traces:
        plt.plot(values, label=name)
    plt.legend()
    plt.title("Signal Denoising")
    plt.show()

# Show the first held-out signal with its noisy and denoised versions.
plot_results(model, test_data, idx=0)

# Guiding Questions (Beginner):
# Q1: What patterns do you notice in the noisy vs. denoised data?
# Q2: How does the model improve predictions compared to raw inputs?

# ===================
# INTERMEDIATE SECTION
# ===================

# Students: Modify the hidden layer size, try different learning rates, or use different optimizers.
# For example:
# - Change hidden_size to 64 or 16
# - Use optim.SGD or optim.RMSprop
# - Try nn.Sigmoid instead of nn.ReLU

# Guiding Questions (Intermediate):
# Q1: How does increasing or decreasing hidden_size affect accuracy?
# Q2: What happens with different optimizers (SGD, RMSprop)?

# ===================
# ADVANCED SECTION
# ===================

# Students: Add dropout, experiment with learning rate schedulers, implement anomaly detection.
# Sample advanced idea: Compute reconstruction error and flag outliers

def compute_reconstruction_error(model, data):
    """Return the mean squared reconstruction error of each sample.

    Args:
        model: Module used to reconstruct the input; switched to eval mode.
        data: Input tensor. Dimension 1 is squeezed out before the forward
            pass, and the squared error is averaged over dimension 1 of the
            squeezed tensor.

    Returns:
        A NumPy array of per-sample errors.
    """
    model.eval()
    flat = data.squeeze(1)
    with torch.no_grad():
        residual = model(flat) - flat
        per_sample = residual.pow(2).mean(dim=1)
    return per_sample.numpy()

# Score every held-out signal by its reconstruction error.
errors = compute_reconstruction_error(model, test_data)

# Visualize reconstruction error distribution
plt.hist(errors, bins=30, alpha=0.7)
plt.title("Reconstruction Error Distribution")
plt.xlabel("Error")
plt.ylabel("Frequency")
plt.show()

# Thresholding for anomaly detection.
# The cut-off is calibrated on the training signals (the data the model has
# learned to reconstruct), not on the signals being scored. A percentile of
# `errors` itself would flag roughly 5% of the test set regardless of what
# it contains.
train_errors = compute_reconstruction_error(model, train_data)
threshold = np.percentile(train_errors, 95)
anomalies = errors > threshold
print(f"Detected {np.sum(anomalies)} anomalies out of {len(errors)} samples.")

# Guiding Questions (Advanced):
# Q1: How can reconstruction errors be used to flag anomalies?
# Q2: What are the limitations of using autoencoders for anomaly detection?

# ===================
# OPTIONAL CHALLENGE
# ===================

# Load a real-world dataset (e.g., NASA bearing data, or UCI sensor data) and apply similar preprocessing.
# Apply the same pipeline for denoising and anomaly detection.
# Compare results with synthetic dataset performance.

# End of Notebook
