In [8]:
import os
import shutil
from scipy import io #for loadmat, matlab conversion
import numpy as np
import matplotlib.pyplot as plt # for plotting training curves
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import requests #for downloading zip file
from torch.utils.data import DataLoader, TensorDataset
#imports for computing and displaying output metrics
import urllib.request # to get files from web w/o !wget

# temp - needed for SHL split
from sklearn.model_selection import train_test_split

In [2]:
def namestr(obj, namespace):
    return [name for name in namespace if namespace[name] is obj]

def get_shapes(np_arr_list):
    """Returns text, each line is shape and dtype for numpy array in list
       example: print(get_shapes([X_train, X_test, y_train, y_test]))"""
    shapes = ""
    for i in np_arr_list:
        my_name = namestr(i,globals())
        shapes += (my_name[0] + " shape is " + str(i.shape) \
            + " data type is " + str(i.dtype) + "\n")
    return shapes

In [3]:
def get_py_file(fname, url):
    """checks for local file, if none downloads from URL.
    :return: nothing"""
    if (os.path.exists(fname)):
        print ("Local",fname, "found, skipping download")
    else:
        print("Downloading",fname, "from IMICS git repo")
        urllib.request.urlretrieve(url, filename=fname)

get_py_file(fname = 'load_data_utils.py', url = 'https://raw.githubusercontent.com/imics-lab/load_data_time_series/main/load_data_utils.py')


Downloading load_data_utils.py from IMICS git repo


In [4]:
import load_data_utils as utils # ldu just seemed confusing!
print('My env_info: \n' + utils.get_env_info()) # using + vs , gets rid of space

My env_info: 
model name	: Intel(R) Xeon(R) CPU @ 2.20GHz
model name	: Intel(R) Xeon(R) CPU @ 2.20GHz
GPU: 


In [5]:
def get_dataset(dataset):
    if (dataset == 'MobiAct HAR'):
        if (os.path.exists('MobiAct_Dataset_v1.0.zip')):
            print ("Local MobiAct zip found, skipping download")
        else:
            !gdown "1kt9wtIt7N7SIFQAbav7zcZ_PqTa5HegA&confirm=t" # MobiAct alternate file source
        # original share is https://drive.google.com/uc?id=0B5VcW5yHhWhibWxGRTZDd0dGY2s'
        # please see https://bmi.hmu.gr/the-mobifall-and-mobiact-datasets-2/ if not working
        get_py_file(fname = 'mobiact_adl_load_dataset.py',
                url = 'https://raw.githubusercontent.com/imics-lab/load_data_time_series/main/HAR/MobiAct/mobiact_adl_load_dataset.py')
        from mobiact_adl_load_dataset import mobiact_adl_load_dataset
        x_train, y_train, x_valid, y_valid, x_test, y_test = mobiact_adl_load_dataset(incl_val_group = True)
        k_size = 50
        EPOCHS = 50
        t_names = ['JOG','JUM','STD','STN','STU','WAL']
    elif (dataset == 'UniMiB SHAR'):
        get_py_file(fname = 'unimib_adl_load_dataset.py',
                url = 'https://raw.githubusercontent.com/imics-lab/load_data_time_series/main/HAR/UniMiB_SHAR/unimib_shar_adl_load_dataset.py')
        from unimib_adl_load_dataset import unimib_load_dataset
        x_train, y_train, x_valid, y_valid, x_test, y_test = unimib_load_dataset(incl_val_group = True)
        t_names = ['StandingUpFS','StandingUpFL','Walking','Running','GoingUpS','Jumping','GoingDownS','LyingDownFS','SittingDown']
        k_size = 50
        EPOCHS = 2
    elif (dataset == 'UCI HAR'):
        get_py_file(fname = 'uci_har_load_dataset.py',
                    url = 'https://raw.githubusercontent.com/imics-lab/load_data_time_series/main/HAR/UCI_HAR/uci_har_load_dataset.py')
        from uci_har_load_dataset import uci_har_load_dataset
        x_train, y_train, x_valid, y_valid, x_test, y_test = uci_har_load_dataset(incl_val_group = True, incl_xyz_accel= True)
        t_names = ['WALKING','WALKING_UPSTAIRS','WALKING_DOWNSTAIRS','SITTING','STANDING','LAYING']
        k_size = 50
        EPOCHS = 30
    elif (dataset == 'TWristAR'):
        # Note TWristAR is more updated than the previous datasets so the accesses
        # and defaults are a bit different, e.g. t_names is pulled from the .py
        get_py_file(fname = 'twristar_load_dataset.py',
                    url = 'https://raw.githubusercontent.com/imics-lab/load_data_time_series/main/HAR/TWristAR/twristar_load_dataset.py')
        import twristar_load_dataset as twristar_load_dataset # diff to get label map
        x_train, y_train, x_valid, y_valid, x_test, y_test \
                                = twristar_load_dataset.twristar_load_dataset(
                                    incl_val_group = True,
                                    one_hot_encode = True)
        t_names = list(twristar_load_dataset.label_map_twristar.get('label').keys())
        t_names.remove('Undefined')
        k_size = 16
        EPOCHS = 20
    elif (dataset == 'Leotta_2021'):
        get_py_file(fname = 'leotta_2021_load_dataset.py',
                    url = 'https://raw.githubusercontent.com/imics-lab/load_data_time_series/main/ADL/Leotta_2021/leotta_2021_load_dataset.py')
        #full_filename = my_path+os.path.join('/ADL/Leotta_2021/'+'leotta_2021_load_dataset.py')
        #shutil.copy(full_filename,'leotta_2021_load_dataset.py')

        import leotta_2021_load_dataset as leotta_2021_load_dataset
        x_train, y_train, x_valid, y_valid, x_test, y_test = leotta_2021_load_dataset.leotta_2021_load_dataset(incl_val_group = True, one_hot_encode = True)
        t_names = list(leotta_2021_load_dataset.label_map_leotta.get('label').keys())
        k_size = 100
        EPOCHS = 50
    elif (dataset == 'SHL'):
        # SHL takes about 30 minutes to process due to size, using saved arrays for now
        !gdown "1ApHVQ-P2reO6ctNVxeHHxCHeoNlp6c9P&confirm=t" # SHL 20Hz
        utils.unzip_into_dir('SHL_20hz_for_gentry.zip','SHL')
        input_dir = './SHL'
        x_train = np.load(input_dir + '/'+'x_train.npy')
        x_train = np.delete(x_train, [0,1,2], 2) # delete component accel
        x_test = np.load(input_dir + '/'+'x_test.npy')
        x_test = np.delete(x_test, [0,1,2], 2) # delete component accel
        y_train = np.load(input_dir + '/'+'y_train.npy')
        y_test = np.load(input_dir + '/'+'y_test.npy')
        # also don't have validate working in SHL so just using stratify
        x_train, x_valid, y_train, y_valid = train_test_split(x_train, y_train, test_size=0.10, random_state=42, stratify=y_train)

        t_names = ['Still', 'Walking', 'Run', 'Bike', 'Car', 'Bus', 'Train', 'Subway']
        k_size = 15
        EPOCHS = 100
    elif (dataset == 'Gesture Phase Segmentation'):
        get_py_file(fname = 'gesture_phase_segmentation_load_dataset.py',
                    url = 'https://raw.githubusercontent.com/imics-lab/load_data_time_series/main/Gesturing_Signing/gesture_phase_segmentation_load_dataset.py')
        from gesture_phase_segmentation_load_dataset import gesture_phase_segmentation_load_dataset
        x_train, y_train, x_valid, y_valid, x_test, y_test, log_info \
                                = gesture_phase_segmentation_load_dataset(
                                    incl_val_group = True,
                                    return_info_dict = True)
        print("Note: Due to the size of the Gesture Phase Segmentation and for",
            "\ncompatibility, the test arrays are copies of the valid arrays")
        x_test = x_valid.copy()
        y_test = y_valid.copy()
        t_names = ["Rest", "Preparation", "Stroke","Hold", "Retraction"]
        k_size = 9
        EPOCHS = 30
    else:
        print('ERROR: dataset unknown')
    print(utils.tabulate_numpy_arrays({'x_train':x_train,'y_train':y_train,
                                    'x_valid':x_valid,'y_valid':y_valid,
                                    'x_test':x_test,'y_test':y_test}))
    return x_train, y_train, x_valid, y_valid, x_test, y_test, k_size, EPOCHS, t_names



In [6]:
ds_list = ["UniMiB SHAR",
           "UCI HAR",
           "TWristAR",
           "Gesture Phase Segmentation"
           ]

In [7]:
# Setting up the device for Torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [9]:
#credit https://stackoverflow.com/questions/9419162/download-returned-zip-file-from-url
#many other methods I tried failed to download the file properly
def download_url(url, save_path, chunk_size=128):
    r = requests.get(url, stream=True)
    with open(save_path, 'wb') as fd:
        for chunk in r.iter_content(chunk_size=chunk_size):
            fd.write(chunk)

def unimib_load_dataset(
    verbose = True,
    incl_xyz_accel = False, #include component accel_x/y/z in ____X data
    incl_rms_accel = True, #add rms value (total accel) of accel_x/y/z in ____X data
    incl_val_group = False, #True => returns x/y_test, x/y_validation, x/y_train
                           #False => combine test & validation groups
    split_subj = dict
                (train_subj = [4,5,6,7,8,10,11,12,14,15,19,20,21,22,24,26,27,29],
                validation_subj = [1,9,16,23,25,28],
                test_subj = [2,3,13,17,18,30]),
    one_hot_encode = True):

    #Download and unzip original dataset
    if (not os.path.isfile('./UniMiB-SHAR.zip')):
        print("Downloading UniMiB-SHAR.zip file")
        #invoking the shell command fails when exported to .py file
        #redirect link https://www.dropbox.com/s/raw/x2fpfqj0bpf8ep6/UniMiB-SHAR.zip
        #!wget https://www.dropbox.com/s/x2fpfqj0bpf8ep6/UniMiB-SHAR.zip
        download_url('https://www.dropbox.com/s/raw/x2fpfqj0bpf8ep6/UniMiB-SHAR.zip','./UniMiB-SHAR.zip')
    if (not os.path.isdir('./UniMiB-SHAR')):
        shutil.unpack_archive('./UniMiB-SHAR.zip','.','zip')
    #Convert .mat files to numpy ndarrays
    path_in = './UniMiB-SHAR/data'
    #loadmat loads matlab files as dictionary, keys: header, version, globals, data
    adl_data = io.loadmat(path_in + '/adl_data.mat')['adl_data']
    adl_names = io.loadmat(path_in + '/adl_names.mat', chars_as_strings=True)['adl_names']
    adl_labels = io.loadmat(path_in + '/adl_labels.mat')['adl_labels']

    #Reshape data and compute total (rms) acceleration
    num_samples = 151
    #UniMiB SHAR has fixed size of 453 which is 151 accelX, 151 accely, 151 accelz
    adl_data = np.reshape(adl_data,(-1,num_samples,3), order='F') #uses Fortran order
    if (incl_rms_accel):
        rms_accel = np.sqrt((adl_data[:,:,0]**2) + (adl_data[:,:,1]**2) + (adl_data[:,:,2]**2))
        adl_data = np.dstack((adl_data,rms_accel))

    #remove component accel if needed
    if (not incl_xyz_accel):
        adl_data = np.delete(adl_data, [0,1,2], 2)

    act_num = (adl_labels[:,0])-1 #matlab source was 1 indexed, change to 0 indexed
    sub_num = (adl_labels[:,1]) #subject numbers are in column 1 of labels

    if (not incl_val_group):
        train_index = np.nonzero(np.isin(sub_num, split_subj['train_subj'] +
                                        split_subj['validation_subj']))
        x_train = adl_data[train_index]
        y_train = act_num[train_index]
    else:
        train_index = np.nonzero(np.isin(sub_num, split_subj['train_subj']))
        x_train = adl_data[train_index]
        y_train = act_num[train_index]

        validation_index = np.nonzero(np.isin(sub_num, split_subj['validation_subj']))
        x_validation = adl_data[validation_index]
        y_validation = act_num[validation_index]

    test_index = np.nonzero(np.isin(sub_num, split_subj['test_subj']))
    x_test = adl_data[test_index]
    y_test = act_num[test_index]

    if (verbose):
        print("x/y_train shape ",x_train.shape,y_train.shape)
    if (incl_val_group):
        print("x/y_validation shape ",x_validation.shape,y_validation.shape)
    print("x/y_test shape  ",x_test.shape,y_test.shape)

    if (incl_val_group):
        return x_train, y_train, x_validation, y_validation, x_test, y_test
    else:
        return x_train, y_train, x_test, y_test

In [10]:
X_train, y_train, X_valid, y_valid, X_test, y_test = unimib_load_dataset(
    incl_xyz_accel=True, incl_val_group=True, one_hot_encode=False, verbose=True)

# Number of classes
n_classes = len(np.unique(y_train))
print(f"Number of classes: {n_classes}")

Downloading UniMiB-SHAR.zip file
x/y_train shape  (4601, 151, 4) (4601,)
x/y_validation shape  (1454, 151, 4) (1454,)
x/y_test shape   (1524, 151, 4) (1524,)
Number of classes: 9


In [11]:
X_train = torch.tensor(X_train, dtype=torch.float32)
Y_train = torch.tensor(y_train, dtype=torch.int64)

X_valid = torch.tensor(X_valid, dtype=torch.float32)
Y_valid = torch.tensor(y_valid, dtype=torch.int64)

X_test = torch.tensor(X_test, dtype=torch.float32)
Y_test = torch.tensor(y_test, dtype=torch.int64)

# Swap channel and time dimensions for compatibility with PyTorch Conv1D
X_train = X_train.permute(0, 2, 1)
X_valid = X_valid.permute(0, 2, 1)
X_test = X_test.permute(0, 2, 1)

# DataLoaders
batch_size = 64
train_dataset = TensorDataset(X_train, Y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)

valid_dataset = TensorDataset(X_valid, Y_valid)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, drop_last=True)

test_dataset = TensorDataset(X_test, Y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=True)

In [12]:
# Model hyperparameters
n_features = 4  # Number of input features/channels
WINDOW_SIZE = 151  # Number of timesteps in each sequence
n_outputs = n_classes  # Number of output classes or nodes in the last layer
num_filters = 100  # Number of filters in Conv layers
kernel_size = 9  # Kernel size for Conv layers
pool_size = 2  # Pool size for MaxPooling
dropout_rate = 0.5  # Dropout rate
fc1_output_size = 128  # Output size of the first Dense layer, adjustable


# Define the CNN model
class CNN1D(nn.Module):
    def __init__(self, n_features=3, WINDOW_SIZE=35, n_outputs=1, num_filters=64, kernel_size=3, pool_size=2, dropout_rate=0.5, fc1_output_size=128):
        super(CNN1D, self).__init__()
        self.conv1 = nn.Conv1d(n_features, num_filters,
                               kernel_size=kernel_size, padding=1)
        self.bn1 = nn.BatchNorm1d(num_filters)
        self.conv2 = nn.Conv1d(num_filters, num_filters,
                               kernel_size=kernel_size, padding=1)
        self.pool = nn.MaxPool1d(pool_size)
        self.dropout = nn.Dropout(dropout_rate)

        # Dynamically compute the size of the input to the first fully connected layer
        self.fc1_input_size = self._get_conv_output(WINDOW_SIZE)
        self.fc1 = nn.Linear(self.fc1_input_size, fc1_output_size)
        self.fc2 = nn.Linear(fc1_output_size, n_outputs)

    def _get_conv_output(self, size):
        with torch.no_grad():
            # Dummy input to calculate convolution output size
            dummy_input = torch.zeros(1, n_features, size)
            output = self.conv1(dummy_input)
            output = self.conv2(output)
            output = self.pool(output)
            return np.prod(output.size()[1:])  # total number of features

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.conv2(x))
        x = self.dropout(self.pool(x))
        x = x.view(-1, self.fc1_input_size)  # flatten the tensor
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Instantiate the model
model = CNN1D(n_features=n_features, WINDOW_SIZE=WINDOW_SIZE, n_outputs=n_outputs, num_filters=num_filters,
              kernel_size=kernel_size, pool_size=pool_size, dropout_rate=dropout_rate, fc1_output_size=fc1_output_size)
print(model)

CNN1D(
  (conv1): Conv1d(4, 100, kernel_size=(9,), stride=(1,), padding=(1,))
  (bn1): BatchNorm1d(100, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv1d(100, 100, kernel_size=(9,), stride=(1,), padding=(1,))
  (pool): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc1): Linear(in_features=6900, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=9, bias=True)
)


In [13]:
# Model, loss function, and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

# Number of epochs
n_epochs = 2

for epoch in range(n_epochs):
    model.train()  # Set model to training mode
    train_losses = []
    train_correct = 0
    total = 0

    # Training loop
    for inputs, labels in train_loader:
        optimizer.zero_grad()  # Zero the parameter gradients

        predictions = model(inputs)  # Forward pass
        loss = criterion(predictions, labels)  # Calculate loss

        loss.backward()  # Backward pass
        optimizer.step()  # Optimize

        train_losses.append(loss.item())

        # Convert probabilities to binary predictions
        train_correct += (predictions.argmax(1) == labels).sum().item()  # Count the number of correct predictions
        total += labels.size(0)

    train_loss = np.mean(train_losses)
    train_acc = train_correct / total

    # Validation loop (optional, remove if not needed)
    model.eval()  # Set model to evaluation mode
    with torch.no_grad():
        validation_losses = []
        validation_correct = 0
        total_val = 0

        for inputs, labels in valid_loader:
            predictions = model(inputs)
            loss = criterion(predictions, labels)
            validation_losses.append(loss.item())

            validation_correct += (predictions.argmax(1) == labels).sum().item()
            total_val += labels.size(0)

        validation_loss = np.mean(validation_losses)
        validation_acc = validation_correct / total_val

    print(f'Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {validation_loss:.4f}, Val Acc: {validation_acc:.4f}')


Epoch 1, Train Loss: 0.9540, Train Acc: 0.6525, Val Loss: 0.7778, Val Acc: 0.7244
Epoch 2, Train Loss: 0.4080, Train Acc: 0.8435, Val Loss: 0.4543, Val Acc: 0.8374


In [14]:
# Setting up the device for Torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def simba_single(model, x, y, num_iters=1000, epsilon=0.1, targeted=False):
    x = x.clone().detach().to(device)  # Ensure that x is a copy and moved to the correct device
    y = y.to(device)
    batch_size, seq_len, num_features = x.shape
    n_dims = seq_len * num_features  # Total dimensions in a flattened time series per sample

    perm = torch.randperm(n_dims).to(device)  # Random permutation of indices within a time step and features
    last_prob = get_probs(model, x, y)

    for i in range(num_iters):
        index = perm[i % n_dims]  # Ensure we do not go out of bounds
        feature_index = index % num_features  # Feature index
        time_step_index = index // num_features  # Time step index

        diff = torch.zeros_like(x)
        diff[:, time_step_index, feature_index] = epsilon

        # Test perturbation in the negative direction
        neg_x = (x - diff).clamp(0, 1)
        neg_prob = get_probs(model, neg_x, y)

        # Test perturbation in the positive direction
        pos_x = (x + diff).clamp(0, 1)
        pos_prob = get_probs(model, pos_x, y)

        # Decide which direction to keep based on the probability change
        if targeted:  # For targeted attacks, we want to increase the probability of the target class
            improved = pos_prob > neg_prob
        else:  # For untargeted attacks, we want to decrease the probability of the correct class
            improved = neg_prob < pos_prob

        if improved.any():
            x = pos_x if targeted else neg_x
            last_prob = pos_prob if targeted else neg_prob

        # Optional: print progress
        if i % 100 == 0:
            print(f'Iteration {i}: Probability = {last_prob.mean().item()}')

    return x



# Helper functions
def normalize(time_series):
    min_val = torch.min(time_series)
    max_val = torch.max(time_series)
    return (time_series - min_val) / (max_val - min_val)

def get_probs(model, time_series, labels):
    time_series = normalize(time_series)
    output = model(time_series)
    probs = F.softmax(output, dim=1)
    target_probs = probs.gather(1, labels.view(-1, 1)).squeeze()
    return target_probs

# Create dataset and DataLoader
data, labels = create_dataset()
dataset = TensorDataset(data, labels)
loader = DataLoader(dataset, batch_size=10)

# Initialize and train the model
model = CNN1D().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.01)
loss_function = nn.CrossEntropyLoss()

# Training loop
for epoch in range(5):  # Train for 5 epochs
    for inputs, targets in loader:
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_function(outputs, targets)
        loss.backward()
        optimizer.step()
    print(f'Epoch {epoch+1}, Loss: {loss.item()}')

# Evaluate using SimBA
model.eval()
success_count = 0
total_count = 0

for inputs, targets in loader:
    inputs, targets = inputs.to(device), targets.to(device)
    adv_inputs = simba_single(model, inputs, targets)
    adv_preds = model(adv_inputs).argmax(dim=1)
    success_count += (adv_preds != targets).sum().item()
    total_count += targets.size(0)

success_rate = (success_count / total_count) * 100
print(f'Success rate of SimBA attack: {success_rate}%')



Epoch 1, Loss: 0.7178683876991272
Epoch 2, Loss: 0.7059564590454102
Epoch 3, Loss: 0.6993428468704224
Epoch 4, Loss: 0.696172297000885
Epoch 5, Loss: 0.6940113306045532
Iteration 0: Probability = 0.5206619501113892
Iteration 100: Probability = 0.5176768898963928
Iteration 200: Probability = 0.5153553485870361
Iteration 300: Probability = 0.5134428143501282
Iteration 400: Probability = 0.5119160413742065
Iteration 500: Probability = 0.510849118232727
Iteration 600: Probability = 0.510849118232727
Iteration 700: Probability = 0.510849118232727
Iteration 800: Probability = 0.510849118232727
Iteration 900: Probability = 0.510849118232727
Iteration 0: Probability = 0.5034932494163513
Iteration 100: Probability = 0.5028680562973022
Iteration 200: Probability = 0.5023038983345032
Iteration 300: Probability = 0.5013421773910522
Iteration 400: Probability = 0.5005431175231934
Iteration 500: Probability = 0.5
Iteration 600: Probability = 0.5
Iteration 700: Probability = 0.5
Iteration 800: Probab

In [18]:
# Define function to train your model
def train_model(model, train_loader, optimizer, criterion, device, num_epochs):
    # Training loop
    for epoch in range(num_epochs):
        model.train()
        # Iterate over batches of data
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
        print(f'Epoch {epoch+1}, Loss: {loss.item()}')


# Initialize the model
n_features = 4  # Number of input features/channels
WINDOW_SIZE = 151  # Number of timesteps in each sequence
n_outputs = 9  # Number of output classes or nodes in the last layer
num_filters = 100  # Number of filters in Conv layers
kernel_size = 9  # Kernel size for Conv layers
pool_size = 2  # Pool size for MaxPooling
dropout_rate = 0.5  # Dropout rate
fc1_output_size = 128  # Output size of the first Dense layer, adjustable

model = CNN1D(n_features=n_features, WINDOW_SIZE=WINDOW_SIZE, n_outputs=n_outputs,
              num_filters=num_filters, kernel_size=kernel_size, pool_size=pool_size,
              dropout_rate=dropout_rate, fc1_output_size=fc1_output_size)
print(model)

# Create dataset and DataLoader
data, labels = train_dataset()
dataset = TensorDataset(data, labels)
loader = DataLoader(dataset, batch_size=10)

# Initialize and train the model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
optimizer = optim.Adam(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()

# Training loop
train_model(model, loader, optimizer, criterion, device, num_epochs=5)

# Evaluate using SimBA
model.eval()
success_count = 0
total_count = 0

for inputs, targets in loader:
    inputs, targets = inputs.to(device), targets.to(device)
    adv_inputs = simba_single(model, inputs, targets)
    adv_preds = model(adv_inputs).argmax(dim=1)
    success_count += (adv_preds != targets).sum().item()
    total_count += targets.size(0)

success_rate = (success_count / total_count) * 100
print(f'Success rate of SimBA attack: {success_rate}%')

# Function to evaluate performance
def evaluate_performance(model, loader):
    model.eval()
    clean_correct = 0
    adv_correct = 0
    total = 0

    for inputs, targets in loader:
        inputs, targets = inputs.to(device), targets.to(device)
        clean_outputs = model(inputs)
        clean_preds = clean_outputs.argmax(dim=1)
        clean_correct += (clean_preds == targets).sum().item()

        adv_inputs = simba_single(model, inputs, targets)
        adv_outputs = model(adv_inputs)
        adv_preds = adv_outputs.argmax(dim=1)
        adv_correct += (adv_preds == targets).sum().item()

        total += targets.size(0)

    clean_accuracy = 100 * clean_correct / total
    adv_accuracy = 100 * adv_correct / total

    return clean_accuracy, adv_accuracy

# Evaluate performance before and after adversarial training
clean_acc_before, adv_acc_before = evaluate_performance(model, loader)
print(f'Before Adversarial Training - Clean Acc: {clean_acc_before}%, Adversarial Acc: {adv_acc_before}%')

# Assuming you have adversarially trained the model here

clean_acc_after, adv_acc_after = evaluate_performance(model, loader)
print(f'After Adversarial Training - Clean Acc: {clean_acc_after}%, Adversarial Acc: {adv_acc_after}%')


CNN1D(
  (conv1): Conv1d(4, 100, kernel_size=(9,), stride=(1,), padding=(1,))
  (bn1): BatchNorm1d(100, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv1d(100, 100, kernel_size=(9,), stride=(1,), padding=(1,))
  (pool): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc1): Linear(in_features=6900, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=9, bias=True)
)
Epoch 1, Loss: 2.218839645385742
Epoch 2, Loss: 2.5844833850860596
Epoch 3, Loss: 2.3791251182556152
Epoch 4, Loss: 2.18959379196167
Epoch 5, Loss: 2.2502410411834717
Iteration 0: Probability = 0.20154409110546112
Iteration 100: Probability = 0.12873978912830353
Iteration 200: Probability = 0.12831158936023712
Iteration 300: Probability = 0.12752056121826172
Iteration 400: Probability = 0.126000314950943
Iteration 500: Probability = 0.12676164507865906
Iteration 600: Probability = 0.12606510519981384


In [5]:
# Evaluate the model's performance on adversarial examples
def evaluate_performance(model, data_loader):
    model.eval()
    correct, total, adv_correct, adv_total = 0, 0, 0, 0
    for data, labels in data_loader:
        data, labels = data.to(device), labels.to(device)
        outputs = model(data)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Generate adversarial examples
        adv_data, _, _ = simba_single(model, data, labels)
        adv_outputs = model(adv_data)
        _, adv_predicted = torch.max(adv_outputs.data, 1)
        adv_total += labels.size(0)
        adv_correct += (adv_predicted == labels).sum().item()

    clean_acc = 100 * correct / total
    adv_acc = 100 * adv_correct / adv_total
    return clean_acc, adv_acc

# Training settings
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SimpleLSTM().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.01)
loss_fn = nn.CrossEntropyLoss()

# Create data
data, labels = create_dataset()
dataset = TensorDataset(data, labels)
loader = DataLoader(dataset, batch_size=10, shuffle=True)

# Evaluate before adversarial training
clean_acc_before, adv_acc_before = evaluate_performance(model, loader)
print(f'Before Adversarial Training - Clean Acc: {clean_acc_before}%, Adversarial Acc: {adv_acc_before}%')

# Adversarial training
for epoch in range(5):  # Short training for demonstration
    model.train()
    for inputs, targets in loader:
        inputs, targets = inputs.to(device), targets.to(device)

        # Regular training
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_fn(outputs, targets)
        loss.backward()
        optimizer.step()

        # Adversarial training
        adv_inputs, _, _ = simba_single(model, inputs, targets)
        optimizer.zero_grad()
        adv_outputs = model(adv_inputs)
        adv_loss = loss_fn(adv_outputs, targets)
        adv_loss.backward()
        optimizer.step()

    print(f'Epoch {epoch + 1}, Loss: {loss.item()}, Adv Loss: {adv_loss.item()}')

# Evaluate after adversarial training
clean_acc_after, adv_acc_after = evaluate_performance(model, loader)
print(f'After Adversarial Training - Clean Acc: {clean_acc_after}%, Adversarial Acc: {adv_acc_after}%')


Iteration 0: Probability = 0.4997364580631256
Iteration 100: Probability = 0.4995090067386627
Iteration 200: Probability = 0.4993008077144623
Iteration 300: Probability = 0.499114453792572
Iteration 400: Probability = 0.49895787239074707
Iteration 500: Probability = 0.4988992214202881
Iteration 600: Probability = 0.4988992214202881
Iteration 700: Probability = 0.4988992214202881
Iteration 800: Probability = 0.4988992214202881
Iteration 900: Probability = 0.4988992214202881
Iteration 0: Probability = 0.49907422065734863
Iteration 100: Probability = 0.4986141324043274
Iteration 200: Probability = 0.4982624053955078
Iteration 300: Probability = 0.49804526567459106
Iteration 400: Probability = 0.4978368282318115
Iteration 500: Probability = 0.4976484179496765
Iteration 600: Probability = 0.49764758348464966
Iteration 700: Probability = 0.49764758348464966
Iteration 800: Probability = 0.49764758348464966
Iteration 900: Probability = 0.49764758348464966
Iteration 0: Probability = 0.484587132