# 08 — Sensor Fusion MLP (Multi-Sensor → Classification)

**Scenario**: Industrial vibration monitoring — classify machine state from IMU sensor data.

Architecture: `Dense(6→32) → ReLU → Dense(32→16) → ReLU → Dense(16→4)`

Input: 6 features (accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z)
Output: 4 classes (normal, bearing_fault, misalignment, imbalance)

### Setup
Ensure the **NanoRust (venv)** kernel is selected before running the cells below.

In [None]:
# Execute the notebook bootstrap script in this kernel's namespace (IPython
# %run magic), then call setup_all() — presumably defined by _setup.py; verify.
%run _setup.py
setup_all()

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from nano_rust_utils import quantize_to_i8, quantize_weights, calibrate_model
import nano_rust_py

# Display names of the machine states, listed in class-id order (0..3).
CLASSES = [
    'Normal',
    'Bearing Fault',
    'Misalignment',
    'Imbalance',
]
print('✅ Modules loaded')

In [None]:
# Synthetic IMU sensor data: generator settings and per-class sample factory
np.random.seed(42)  # seed NumPy's global RNG so the dataset is reproducible
N_PER_CLASS = 500
N_FEATURES = 6  # accel_xyz + gyro_xyz


def generate_sensor_data(n, class_id):
    """Return ``n`` synthetic vibration samples for one machine-state class.

    Every sample starts as Gaussian noise (std 0.3) over ``N_FEATURES``
    columns; columns 0-2 are treated as accelerometer axes and columns 3-5
    as gyroscope axes. A class-specific pattern is then applied:

    * ``0`` (normal): the noise amplitude is halved.
    * ``1`` (bearing fault): randomly signed, exponentially distributed
      spikes are added to the three accelerometer columns.
    * ``2`` (misalignment): the gyroscope columns are replaced by 0.7x the
      accelerometer columns plus fresh Gaussian noise (std 0.2).
    * ``3`` (imbalance): a sine is added to column 0 and a cosine to
      column 3, both spanning two full periods across the ``n`` rows.

    Any other ``class_id`` returns the unmodified base noise.

    Args:
        n: Number of samples (rows) to generate.
        class_id: Integer class label selecting the pattern.

    Returns:
        Array of shape ``(n, N_FEATURES)`` drawn from NumPy's global RNG.
    """
    samples = 0.3 * np.random.randn(n, N_FEATURES)

    if class_id == 0:
        # Normal: low amplitude, balanced
        return samples * 0.5

    if class_id == 1:
        # Bearing fault: high-freq spikes in accel (sign drawn before magnitude)
        signs = np.random.choice([-1, 1], (n, 3))
        magnitudes = np.random.exponential(0.8, (n, 3))
        samples[:, :3] += signs * magnitudes
        return samples

    if class_id == 2:
        # Misalignment: gyro follows accel, plus independent noise
        coupled = samples[:, :3] * 0.7
        gyro_noise = np.random.randn(n, 3) * 0.2
        samples[:, 3:] = coupled + gyro_noise
        return samples

    if class_id == 3:
        # Imbalance: periodic component on one accel axis and one gyro axis
        phase = np.linspace(0, 4 * np.pi, n)
        samples[:, 0] += np.sin(phase) * 1.2
        samples[:, 3] += np.cos(phase) * 0.8

    return samples

# One batch of N_PER_CLASS samples per class, generated in class-id order,
# with a matching run of integer labels.
X_all = [generate_sensor_data(N_PER_CLASS, class_id) for class_id in range(4)]
y_all = [class_id for class_id in range(4) for _ in range(N_PER_CLASS)]

X = np.vstack(X_all).astype(np.float32)
y = np.array(y_all, dtype=np.int64)

# Shuffle features and labels with one shared permutation, then split 80/20
idx = np.random.permutation(len(X))
X = X[idx]
y = y[idx]
split = int(0.8 * len(X))
X_train, y_train = X[:split], y[:split]
X_test, y_test = X[split:], y[split:]

# Wrap the NumPy arrays as torch datasets; only the training set gets a loader
train_ds = TensorDataset(torch.from_numpy(X_train), torch.from_numpy(y_train))
test_ds = TensorDataset(torch.from_numpy(X_test), torch.from_numpy(y_test))
train_loader = DataLoader(train_ds, batch_size=64, shuffle=True)

print(f'Train: {len(train_ds)}, Test: {len(test_ds)}')
print(f'Features: {N_FEATURES}, Classes: {len(CLASSES)}')

In [None]:
# MLP Model: definition and training

# GPU Setup
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')
if device.type == 'cuda':
    print(f'GPU: {torch.cuda.get_device_name(0)}')

# Dense(6→32) → ReLU → Dense(32→16) → ReLU → Dense(16→4).
# The model is placed on the target device *before* the optimizer is built
# and before training starts, so the selected device is actually used for
# training (previously the move happened only after training had finished).
model = nn.Sequential(
    nn.Linear(6, 32),
    nn.ReLU(),
    nn.Linear(32, 16),
    nn.ReLU(),
    nn.Linear(16, 4),
).to(device)
print("Model moved to device")

optimizer = optim.Adam(model.parameters(), lr=0.005)
criterion = nn.CrossEntropyLoss()

N_EPOCHS = 20
for epoch in range(N_EPOCHS):
    model.train()
    correct, total = 0, 0  # running training accuracy for this epoch
    for data, target in train_loader:
        # Batches come from CPU-backed tensors; move them next to the model.
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        out = model(data)
        loss = criterion(out, target)
        loss.backward()
        optimizer.step()
        correct += out.argmax(1).eq(target).sum().item()
        total += target.size(0)
    # Report training accuracy every 5th epoch
    if (epoch + 1) % 5 == 0:
        print(f'Epoch {epoch+1}/{N_EPOCHS} — Acc: {100.*correct/total:.1f}%')

# Switch to inference mode for the quantization/evaluation steps that follow
model.eval()
print('✅ Training complete')


In [None]:
# Quantize & calibrate
# The first test sample is the only calibration input: it is quantized to get
# cal_scale, and calibrate_model is given the same sample as a torch tensor.
# NOTE(review): the model may already sit on a CUDA device here while cal_input
# is a CPU tensor — confirm quantize_weights/calibrate_model handle that.
q_weights = quantize_weights(model)
calibration_sample = X_test[:1]
cal_input = torch.from_numpy(calibration_sample)
q_cal, cal_scale = quantize_to_i8(calibration_sample.flatten())
requant = calibrate_model(
    model,
    cal_input,
    q_weights,
    cal_scale,
)

def build_nano():
    """Assemble a fresh NANO-RUST sequential model mirroring the PyTorch MLP.

    The three dense layers are read from ``q_weights`` / ``requant`` under
    the keys ``'0'``, ``'2'`` and ``'4'``, with a ReLU inserted between
    consecutive dense layers.

    Returns:
        A new ``nano_rust_py.PySequentialModel`` with input shape ``[6]``
        and an arena size of 4096.
    """
    nano = nano_rust_py.PySequentialModel(input_shape=[6], arena_size=4096)
    dense_keys = ('0', '2', '4')
    for position, key in enumerate(dense_keys):
        if position > 0:
            # Activation goes between dense layers, never after the last one
            nano.add_relu()
        # presumably (multiplier, shift, corrected bias) — confirm against calibrate_model
        m, s, bc = requant[key]
        flat_weights = q_weights[key]['weights'].flatten().tolist()
        nano.add_dense_with_requant(flat_weights, bc, m, s)
    return nano

# Test both: run every test sample through the float PyTorch model and the
# quantized NANO-RUST model, then compare their predictions.
correct_pt, correct_nano, match_count = 0, 0, 0
max_diffs = []  # per-sample max |int8-quantized PyTorch output - NANO-RUST output|

# NOTE(review): each sample is quantized with its own scale (discarded below),
# while the requant parameters were calibrated with cal_scale — confirm intended.
with torch.no_grad():
    for sample, label in zip(X_test, y_test):
        label = int(label)
        x_f = torch.from_numpy(sample[None, :]).to(device)  # shape (1, 6)
        q_x, _ = quantize_to_i8(sample)

        # The output lives on `device`; copy it to host memory before the
        # NumPy conversion, which raises for CUDA tensors.
        pt_out = model(x_f).cpu().numpy().flatten()
        pt_cls = int(np.argmax(pt_out))

        # A fresh NANO-RUST model is built for every sample
        nano_out = build_nano().forward(q_x.tolist())
        nano_cls = int(np.argmax(nano_out))

        # Compare in the int8 domain; widen to int32 so the subtraction cannot wrap
        q_pt, _ = quantize_to_i8(pt_out)
        nano_i32 = np.array(nano_out, dtype=np.int8).astype(np.int32)
        diff = np.abs(q_pt.astype(np.int32) - nano_i32)
        max_diffs.append(int(np.max(diff)))

        correct_pt += int(pt_cls == label)
        correct_nano += int(nano_cls == label)
        match_count += int(nano_cls == pt_cls)

N = len(X_test)
# Total size of all quantized weight arrays (layers without weights are skipped)
weight_bytes = sum(
    q["weights"].nbytes for q in q_weights.values() if q["weights"] is not None
)

print('=' * 60)
print('       SENSOR FUSION MLP RESULTS')
print('=' * 60)
print(f'PyTorch Accuracy:     {100.*correct_pt/N:.1f}%')
print(f'NANO-RUST Accuracy:   {100.*correct_nano/N:.1f}%')
print(f'Classification Match: {100.*match_count/N:.1f}%')
print(f'Max Diff (median):    {int(np.median(max_diffs))}')
print(f'Max Diff (95th):      {int(np.percentile(max_diffs, 95))}')
print('=' * 60)
print(f'\n📊 Memory: {weight_bytes} bytes (weights only)')
print(f'This MLP fits on ANY microcontroller — even ATmega328 (2KB RAM)!')
