# A tutorial on adversarial attacks on fault diagnosis systems

In [None]:
import zipfile
import requests
from tqdm.auto import trange, tqdm

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.optim import Adam

First, download the [TEP](https://dataverse.harvard.edu/dataset.xhtml?persistentId=doi:10.7910/DVN/6C3JR1) (Tennessee Eastman Process) dataset. We use a reduced version here; the original is about 2 GB.

In [None]:
# Download the reduced TEP archive and unpack it into data/.
url = 'https://industrial-makarov.obs.ru-moscow-1.hc.sbercloud.ru/small_tep.zip'
# A timeout keeps the cell from hanging forever on a stalled connection.
resp = requests.get(url, timeout=60)
# Fail loudly on an HTTP error instead of saving an error page as the zip,
# which would only surface later as a confusing BadZipFile exception.
resp.raise_for_status()
with open('small_tep.zip', 'wb') as file:
    file.write(resp.content)
with zipfile.ZipFile('small_tep.zip', 'r') as zip_ref:
    zip_ref.extractall('data/')

In [None]:
# Load the extracted CSV files; the first two columns of each file become a
# two-level row index (the first level is used as the run id further below).
sensor_data = pd.read_csv('data/dataset.csv', index_col=(0, 1))
# The first line of the labels file is skipped and the label column is named 'fault'.
target = pd.read_csv('data/labels.csv', index_col=(0, 1), names=['fault'], skiprows=1)
# NOTE(review): train_mask is loaded but not used in the cells visible here — confirm it is still needed.
train_mask = pd.read_csv('data/train_mask.csv', index_col=(0, 1))['train_mask']
sensor_data.shape

The dataset consists of sensor readings and a fault label for every row. Fault 0 denotes the normal condition.

In [None]:
# Display the sensor table as the cell output.
sensor_data

In [None]:
# Display the fault labels as the cell output.
target

Split runs into train, validation and test.

In [None]:
# Shuffle the run ids reproducibly, then cut them 70% / 10% / 20% into
# train / validation / test so that no run is shared between splits.
np.random.seed(0)
run_ids = sensor_data.index.get_level_values(0).unique()
random_runs = np.random.permutation(run_ids)
n = len(random_runs)
train_end, val_end = int(0.7*n), int(0.8*n)
train_runs = random_runs[:train_end]
val_runs = random_runs[train_end:val_end]
test_runs = random_runs[val_end:]
len(train_runs), len(val_runs), len(test_runs)

In [None]:
# Select the rows of every split by run id (first index level), keeping the
# sensor readings and the labels of each split side by side.
train_sensor_data, train_target = sensor_data.loc[train_runs], target.loc[train_runs]
val_sensor_data, val_target = sensor_data.loc[val_runs], target.loc[val_runs]
test_sensor_data, test_target = sensor_data.loc[test_runs], target.loc[test_runs]

len(train_target), len(val_target), len(test_target)

Next, we normalize the sensor values with a standard scaler fitted on the training split.

In [None]:
# Fit the scaler on the training split only and reuse the fitted scaler for
# validation and test.
scaler = StandardScaler()
# `[:] =` overwrites the values while keeping each frame's index and columns.
train_sensor_data[:] = scaler.fit_transform(train_sensor_data)
val_sensor_data[:] = scaler.transform(val_sensor_data)
test_sensor_data[:] = scaler.transform(test_sensor_data)

We apply a sliding-window approach to turn every run into a sequence of data samples.

<img src='https://raw.githubusercontent.com/airi-industrial-ai/yandex-studcamp-2024-adv/main/images/window_step_size.png' width=800>

Since the fault diagnosis system is based on an MLP, every sample is flattened from $\mathbb{R}^{m \times n}$ into $\mathbb{R}^{mn}$.

<img src='https://raw.githubusercontent.com/airi-industrial-ai/yandex-studcamp-2024-adv/main/images/matrix_to_vector.png' width=200>

In [None]:
def sliding_window_data(sensor_data, target, runs, window_size=10, step_size=1):
    """Cut every run into flattened fixed-length windows with one label each.

    Args:
        sensor_data: frame whose first index level is the run id.
        target: labels indexed like ``sensor_data``.
        runs: iterable of run ids to process, in output order.
        window_size: number of consecutive rows per window.
        step_size: distance between the starts of neighbouring windows.

    Returns:
        A pair of numpy arrays ``(samples, labels)``. Every sample is a window
        of ``window_size`` rows flattened row by row; its label is the target
        row located right after the window (index ``start + window_size``).
    """
    windows, labels = [], []
    for run_id in runs:
        values = sensor_data.loc[run_id].values
        run_labels = target.loc[run_id].values
        # The last start leaves one row after the window to take the label from.
        starts = range(0, len(values) - window_size, step_size)
        windows.extend(values[s:s + window_size].reshape(-1) for s in starts)
        labels.extend(run_labels[s + window_size] for s in starts)
    return np.array(windows), np.array(labels)

In [None]:
# Build windowed samples for every split and convert them to torch tensors.
# Labels keep a trailing axis and are indexed as y[:, 0] in later cells.
X_train, y_train = sliding_window_data(train_sensor_data, train_target, train_runs)
X_train = torch.FloatTensor(X_train)
y_train = torch.LongTensor(y_train)

X_val, y_val = sliding_window_data(val_sensor_data, val_target, val_runs)
X_val = torch.FloatTensor(X_val)
y_val = torch.LongTensor(y_val)

X_test, y_test = sliding_window_data(test_sensor_data, test_target, test_runs)
X_test = torch.FloatTensor(X_test)
y_test = torch.LongTensor(y_test)

X_train.shape, X_val.shape, X_test.shape

Create a simple diagnosis system: an MLP with a single hidden layer of 128 units. The output dimension of 21 corresponds to the number of classes (fault 0, the normal condition, plus 20 faults).

In [None]:
def create_model(in_dim=520, hidden_dim=128, n_classes=21):
    """Build the MLP classifier used as the fault diagnosis system.

    Args:
        in_dim: length of one flattened input window. The default 520
            presumably equals 10 time steps x 52 sensors — confirm against
            the window size used when the samples are built.
        hidden_dim: width of the single hidden layer.
        n_classes: number of output logits (one per fault id).

    Returns:
        An ``nn.Sequential`` of Linear -> ReLU -> Linear producing raw logits.
    """
    return nn.Sequential(
        nn.Linear(in_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, n_classes),
    )

Train the model.

In [None]:
def train(model, X_train, y_train, X_val, y_val, n_steps=1000, batch_size=256, lr=0.001):
    """Train ``model`` with Adam on random mini-batches.

    Args:
        model: network mapping a batch of samples to class logits.
        X_train, y_train: training samples and labels; labels are read from
            column 0, i.e. they are expected to have a trailing axis.
        X_val, y_val: validation samples and labels in the same layout.
        n_steps: number of optimisation steps.
        batch_size: samples drawn (with replacement) per step.
        lr: Adam learning rate.

    Returns:
        ``(train_loss_curve, val_loss_curve)``: two lists with one
        cross-entropy value per step.
    """
    optimizer = Adam(model.parameters(), lr=lr)
    train_loss_curve = []
    val_loss_curve = []
    for _ in trange(n_steps):
        random_idx = np.random.randint(0, len(X_train), size=batch_size)
        logits = model(X_train[random_idx])
        loss = F.cross_entropy(logits, y_train[random_idx, 0])
        train_loss_curve.append(loss.item())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Validation is monitoring only: keep the whole computation out of autograd.
        with torch.no_grad():
            val_loss = F.cross_entropy(model(X_val), y_val[:, 0])
        val_loss_curve.append(val_loss.item())

    return train_loss_curve, val_loss_curve

In [None]:
# Train the undefended baseline model and plot its loss curves.
model = create_model()
train_loss_curve, val_loss_curve = train(model, X_train, y_train, X_val, y_val)

# First curve: training loss per step; second curve: validation loss per step.
plt.plot(train_loss_curve)
plt.plot(val_loss_curve)
plt.show()

### Task 1. Predict the probability

Write a function that takes a model and an input X and returns a tensor of class probabilities of shape N x K, where N is the number of input examples and K is the number of classes.

In [None]:
def get_proba(model, X_input):
    """Return the class probabilities predicted by ``model`` for ``X_input``.

    Args:
        model: network mapping a batch of samples to class logits.
        X_input: tensor holding N samples.

    Returns:
        Tensor of shape N x K whose rows sum to 1 (softmax over the logits).
        It is computed without gradient tracking.
    """
    # Inference only: no_grad keeps the result free of any autograd graph.
    with torch.no_grad():
        logits = model(X_input)
        return F.softmax(logits, dim=-1)

In [None]:
# Sanity checks for get_proba: one row per test sample with 21 columns,
# rows summing to 1, and no gradient attached to the result.
proba = get_proba(model, X_test)
assert proba.shape == (len(X_test), 21)
assert np.allclose(proba.numpy().sum(axis=1), np.ones(len(X_test)))
assert proba.grad is None
# Show the rounded distribution of the first test sample.
pd.Series(proba[0]).round(2)

### Task 2. Predict the fault

Write a function that takes a model and an input X and returns a tensor with the predicted fault IDs.

In [None]:
def get_pred(model, X_input):
    """Return the predicted fault id for every sample in ``X_input``.

    Args:
        model: network mapping a batch of samples to class logits.
        X_input: tensor holding N samples.

    Returns:
        Integer tensor of length N with the index of the largest logit per
        sample, computed without gradient tracking.
    """
    # argmax of the logits equals argmax of the softmax, so softmax is skipped.
    with torch.no_grad():
        return model(X_input).argmax(dim=-1)

In [None]:
# Sanity checks for get_pred: one prediction per test sample, no gradient attached.
pred = get_pred(model, X_test)
assert len(pred) == len(X_test)
assert pred.grad is None
pred

In [None]:
# accuracy_score expects (y_true, y_pred). Accuracy is symmetric, so the value
# is unchanged, but the documented order stays correct if the metric is swapped.
acc = accuracy_score(y_test[:, 0], pred)
print(f'Accuracy: {acc:.2f}')

### Task 3. FGSM Attack

Define the adversarial attack using Fast Gradient Sign Method as follows:

$$x' = x + \epsilon \text{sign}\left(\nabla_x L(f(x), y)\right),$$

where $L$ is the cross-entropy loss, $f$ is the MLP, and $\text{sign}$ is a function that equals $-1$ for negative and $1$ for non-negative values.

Write a function that takes X, y, model, epsilon and returns adversarial X.

In [None]:
def adv_attack(X, y, model, eps):
    """Craft untargeted adversarial samples with the Fast Gradient Sign Method.

    Computes ``x' = x + eps * sign(grad_x CE(model(x), y))``, where sign is
    -1 for negative and +1 for non-negative gradient entries.

    Args:
        X: float tensor holding N samples; it is not modified.
        y: integer tensor with the true labels, of shape (N,) or (N, 1).
        model: network mapping a batch of samples to class logits.
        eps: perturbation size applied to every input feature.

    Returns:
        Tensor with the shape and dtype of ``X``, detached from autograd.
    """
    # Differentiate w.r.t. a private copy so the caller's tensor is untouched.
    X_probe = X.clone().detach().requires_grad_(True)
    loss = F.cross_entropy(model(X_probe), y.reshape(-1))
    # autograd.grad returns only the input gradient; unlike loss.backward()
    # it does not accumulate anything into the model parameters' .grad.
    grad, = torch.autograd.grad(loss, X_probe)
    # torch.sign would map 0 to 0; the method is defined with sign(0) = +1.
    direction = torch.where(grad < 0, -torch.ones_like(grad), torch.ones_like(grad))
    # Tensor on the left keeps the result in X's dtype even for numpy scalars.
    return (X.detach() + direction * eps).detach()

In [None]:
# Sanity checks for adv_attack: dtype and shape are preserved, the input
# tensor is left unmodified, and the result carries no gradient.
X_test_copy = X_test.clone()
X_test_adv = adv_attack(X_test, y_test, model, eps=0.05)
assert X_test_adv.dtype == X_test.dtype
assert X_test_adv.shape == X_test.shape
assert (X_test_copy == X_test).all()
assert X_test_adv.grad is None

In [None]:
# Accuracy of the undefended model on the FGSM samples crafted against it.
adv_pred = get_pred(model, X_test_adv)
# accuracy_score expects (y_true, y_pred); the value itself is symmetric.
acc = accuracy_score(y_test[:, 0], adv_pred)
print(f'Accuracy (FGSM): {acc:.2f}')

Let us take a look at the probability distributions with adversarial attack on a random test run.

In [None]:
def plot_ditribution(real_proba, adv_proba, logscale=True):
    """Plot clean and adversarial class probabilities as paired bars.

    Args:
        real_proba: 1-D probability vector predicted for the clean sample.
        adv_proba: 1-D probability vector predicted for the adversarial
            sample, of the same length.
        logscale: draw the probability axis in log scale when true.
    """
    # Take the class count from the input instead of hard-coding 21, so the
    # plot also works for models with another number of outputs.
    n_classes = len(real_proba)
    class_ids = np.arange(n_classes)

    plt.figure(figsize=(10, 3))

    # The two bar series are shifted by +/-0.15 to sit side by side per class.
    plt.bar(
        class_ids+0.15,
        real_proba,
        width=0.3,
        label=f'real pred is {np.argmax(real_proba)}, conf. {real_proba.max()*100:.0f}%'
    )
    plt.bar(
        class_ids-0.15,
        adv_proba,
        width=0.3,
        label=f'adv pred is {np.argmax(adv_proba)}, conf. {adv_proba.max()*100:.0f}%')
    plt.legend()
    plt.xticks(range(n_classes))
    plt.xlabel('Fault')
    plt.ylabel('Probability')
    if logscale:
        plt.yscale('log')
    plt.show()

In [None]:
# A fixed run id is used instead of a random draw so the figures below are
# reproducible.
# NOTE(review): assumes this id belongs to test_runs — confirm for the current split.
#random_run = np.random.choice(test_runs)
random_run = 275055401
X_input, y_input = sliding_window_data(test_sensor_data, test_target, [random_run])
X_input = torch.FloatTensor(X_input)
y_input = torch.LongTensor(y_input)
# The printed target is the label of the last window of the run.
print(f'run: {random_run}, input shape: {list(X_input.shape)}, target: {y_input[-1].item()}')

In [None]:
# Compare the predicted distributions before and after the FGSM attack.
real_proba = get_proba(model, X_input)
X_input_adv = adv_attack(X_input, y_input, model, eps=0.05)
adv_proba = get_proba(model, X_input_adv)

# Only the last window of the run is plotted.
plot_ditribution(real_proba[-1], adv_proba[-1])

Let us compare the input sample and the adversarial sample.

In [None]:
def plot_samples(X_input, X_input_adv, columns):
    """Plot original against adversarial values, one subplot per column name.

    Args:
        X_input: 2-D array/tensor of original samples (rows are samples).
        X_input_adv: adversarial counterpart with the same layout.
        columns: sequence of subplot titles; subplot ``i`` shows column ``i``
            of both inputs. NOTE(review): with flattened windows, column ``i``
            is presumably sensor ``i`` at the first time step of each window —
            confirm against how the samples were built.
    """
    # Derive the grid from the number of titles instead of hard-coding 52
    # plots in 26 rows; 52 columns still give the same 26 x 2 layout.
    n_plots = len(columns)
    n_cols = 2
    n_rows = (n_plots + n_cols - 1) // n_cols

    plt.figure(figsize=(5*n_cols, 2*n_rows))
    for i in trange(n_plots):
        plt.subplot(n_rows, n_cols, i+1)
        # Only the last 100 samples are drawn to keep the curves readable.
        plt.plot(X_input[-100:, i], label='original sample')
        plt.plot(X_input_adv[-100:, i], label='adversarial sample')
        plt.title(f'{columns[i]}')
        plt.legend()
        plt.grid()
    plt.tight_layout()
    plt.show()

In [None]:
# Subplot titles are the sensor column names of the dataset.
plot_samples(X_input, X_input_adv, sensor_data.columns)

### Task 4. OTCM Attack

Define the targeted adversarial attack using One-step Target Class Method as follows:

$$x' = x - \epsilon \text{sign}\left(\nabla_x L(f(x), y')\right),$$

where $L$ is the cross-entropy loss, $f$ is the MLP, $\text{sign}$ is a function that equals $-1$ for negative and $1$ for non-negative values, and $y'$ is the target class.

Write a function that takes X, target class, model, epsilon and returns adversarial X.

In [None]:
def targeted_adv_attack(X, target, model, eps):
    """Craft targeted adversarial samples with the One-step Target Class Method.

    Computes ``x' = x - eps * sign(grad_x CE(model(x), target))``, i.e. one
    step that lowers the loss of the chosen class; sign is -1 for negative
    and +1 for non-negative gradient entries.

    Args:
        X: float tensor holding N samples; it is not modified.
        target: integer id of the class the samples are pushed towards.
        model: network mapping a batch of samples to class logits.
        eps: perturbation size applied to every input feature.

    Returns:
        Tensor with the shape and dtype of ``X``, detached from autograd.
    """
    # Differentiate w.r.t. a private copy so the caller's tensor is untouched.
    X_probe = X.clone().detach().requires_grad_(True)
    # Every sample gets the same desired label.
    y_target = torch.full((X.shape[0],), int(target), dtype=torch.long, device=X.device)
    loss = F.cross_entropy(model(X_probe), y_target)
    # autograd.grad returns only the input gradient and leaves the model
    # parameters' .grad untouched.
    grad, = torch.autograd.grad(loss, X_probe)
    # torch.sign would map 0 to 0; the method is defined with sign(0) = +1.
    direction = torch.where(grad < 0, -torch.ones_like(grad), torch.ones_like(grad))
    # Minus sign: descend the loss of the target class.
    return (X.detach() - direction * eps).detach()

In [None]:
# Sanity checks for targeted_adv_attack (target class 0): dtype and shape are
# preserved, the input is left unmodified, and the result carries no gradient.
X_test_copy = X_test.clone()
X_test_adv = targeted_adv_attack(X_test, 0, model, eps=0.05)
assert X_test_adv.dtype == X_test.dtype
assert X_test_adv.shape == X_test.shape
assert (X_test_copy == X_test).all()
assert X_test_adv.grad is None

Consider the predicted probability distribution and compare the sample.

In [None]:
# Compare the predicted distributions before and after a targeted attack
# that pushes the selected run towards fault 18.
real_proba = get_proba(model, X_input)

X_input_adv = targeted_adv_attack(X_input, 18, model, eps=0.1)
adv_proba = get_proba(model, X_input_adv)

# Only the last window of the run is plotted.
plot_ditribution(real_proba[-1], adv_proba[-1], logscale=True)

### Task 5. Adversarial training

Train the model mixing real and adversarial samples. Modify the loss function as follows:

$$L = L_\text{orig} + \lambda L_\text{adv},$$

where $L_\text{adv}$ is the cross-entropy calculated on the adversarial samples and $\lambda$ is a hyperparameter.

Write a function that takes the model, the training data, the validation data, epsilon for the FGSM attack and lambda. The function returns two lists: the train loss values and the validation loss values.

In [None]:
def adv_train(model, X_train, y_train, X_val, y_val, eps, lambd):
    """Train ``model`` on a mix of clean and FGSM-perturbed mini-batches.

    Each step minimises ``L = L_orig + lambd * L_adv``, where ``L_adv`` is the
    cross-entropy on FGSM samples crafted against the current weights. The
    optimiser, step count and batch size mirror ``train``.

    Args:
        model: network mapping a batch of samples to class logits.
        X_train, y_train: training samples and labels; labels are read from
            column 0, i.e. they are expected to have a trailing axis.
        X_val, y_val: validation samples and labels in the same layout.
        eps: FGSM perturbation size used to craft the adversarial batch.
        lambd: weight of the adversarial loss term.

    Returns:
        ``(train_loss_curve, val_loss_curve)``: per-step lists holding the
        combined training loss and the clean validation cross-entropy.
    """
    optimizer = Adam(model.parameters(), lr=0.001)
    train_loss_curve = []
    val_loss_curve = []
    for _ in trange(1000):
        random_idx = np.random.randint(0, len(X_train), size=256)
        X_batch = X_train[random_idx]
        y_batch = y_train[random_idx, 0]

        # FGSM against the current weights. autograd.grad yields only the
        # input gradient, so nothing leaks into the parameters' .grad.
        X_probe = X_batch.clone().detach().requires_grad_(True)
        probe_loss = F.cross_entropy(model(X_probe), y_batch)
        grad, = torch.autograd.grad(probe_loss, X_probe)
        # sign(0) is defined as +1 here, unlike torch.sign.
        direction = torch.where(grad < 0, -torch.ones_like(grad), torch.ones_like(grad))
        # Detach: adversarial samples are treated as constant inputs.
        X_adv = (X_batch + direction * eps).detach()

        loss_orig = F.cross_entropy(model(X_batch), y_batch)
        loss_adv = F.cross_entropy(model(X_adv), y_batch)
        loss = loss_orig + lambd * loss_adv
        train_loss_curve.append(loss.item())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Validation is monitoring only, measured on clean samples.
        with torch.no_grad():
            val_loss = F.cross_entropy(model(X_val), y_val[:, 0])
        val_loss_curve.append(val_loss.item())

    return train_loss_curve, val_loss_curve

In [None]:
# Train a second, defended model with adversarial training and plot its curves.
def_model = create_model()
train_loss_curve, val_loss_curve = adv_train(
    def_model, X_train, y_train, X_val, y_val, eps=0.1, lambd=1.)
print(f'Train loss: {train_loss_curve[-1]:.2f}')

# First curve: training loss per step; second curve: validation loss per step.
plt.plot(train_loss_curve)
plt.plot(val_loss_curve)
plt.show()

In [None]:
# Attack the defended model and evaluate the defended model on those samples.
X_test_adv = adv_attack(X_test, y_test, def_model, eps=0.05)
# Fix: predictions previously came from the undefended `model`, so the printed
# number did not measure the adversarially trained network at all.
adv_pred = get_pred(def_model, X_test_adv)
# accuracy_score expects (y_true, y_pred); the value itself is symmetric.
acc = accuracy_score(y_test[:, 0], adv_pred)
print(f'Accuracy (FGSM and Adversarial training): {acc:.2f}')

We can compare the accuracy without defense and with defense.

In [None]:
# Accuracy under FGSM for a range of eps: every model is attacked with samples
# crafted against itself and evaluated on those samples.
acc_list = []
def_acc_list = []

eps_space = np.linspace(0.01, 0.3, 30)
for eps in tqdm(eps_space):
    X_test_adv = adv_attack(X_test, y_test, model, eps=eps)
    adv_pred = get_pred(model, X_test_adv)
    # accuracy_score expects (y_true, y_pred); the value itself is symmetric.
    acc_list.append(accuracy_score(y_test[:, 0], adv_pred))

    X_test_adv = adv_attack(X_test, y_test, def_model, eps=eps)
    adv_pred = get_pred(def_model, X_test_adv)
    def_acc_list.append(accuracy_score(y_test[:, 0], adv_pred))

# Expected outcome of the exercise: the undefended model is more accurate at
# the smallest eps, the adversarially trained one at the largest eps.
assert acc_list[0] > def_acc_list[0]
assert acc_list[-1] < def_acc_list[-1]

plt.figure(figsize=(6, 4))
plt.plot(eps_space, acc_list, label='No defense')
plt.plot(eps_space, def_acc_list, label='Adversarial Training')
plt.xlabel('eps')
plt.ylabel('Accuracy')
plt.legend()
plt.grid()
plt.show()