In [None]:
import sys
import yaml
import warnings
import logging

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

import tensorflow as tf
import keras

import umap
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

from utils.constants import RESULT_DIR, SRC_DIR, SEED
from utils.data_handler import get_tsc_train_dataset, preprocess_data, create_dataset
from models.clf_wrapper import ClassifierWrapper
from models.trigger_gen import TriggerGenerator

warnings.filterwarnings('ignore')

np.random.seed(SEED)
tf.random.set_seed(SEED)

# Load dataset

In [208]:
# dataset_name = "iAWE"
dataset_name = "MotionSense"
data_ratio = 1.0

# Attacker data
x_train_atk, y_train_atk, x_test_atk, y_test_atk = get_tsc_train_dataset(
    dataset_name=dataset_name,
    data_ratio=data_ratio,
    data_type="atk"
)

x_train_atk, y_train_atk, x_test_atk, y_test_atk, enc = preprocess_data(x_train_atk, y_train_atk, x_test_atk, y_test_atk)
print(f"Train data shape: {x_train_atk.shape}, Test data shape: {x_test_atk.shape}")

y_train shape before encoding: (3683,)
y_test shape before encoding: (921,)
Train data shape: (3683, 120, 8), Test data shape: (921, 120, 8)


In [209]:
print(enc.categories_)

[array([0., 1., 2., 3., 4., 5.], dtype=float32)]


In [None]:
# Change the parameters of the attacks follow the parameter guide
surro_clf_name = "transformer" # Chage surrogate model architecture
target_clf_name = "transformer" # Change the target model architecture
y_target = 5 # Change the target class here.
amplitude = 0.45 # Change this pattern multiplier the amplitude
main_epoch = 38
atk_epoch = 2

# Others
target_epoch = main_epoch - 1

# Load configurations
training_configs_path = os.path.join(SRC_DIR, "configs", "training_tsc.yaml")
finetune_configs_path = os.path.join(SRC_DIR, "configs", "finetune_surrogate.yaml")
generator_configs_path = os.path.join(SRC_DIR, "configs", "training_generator.yaml")

training_configs = yaml.safe_load(open(training_configs_path, 'r'))
finetune_configs = yaml.safe_load(open(finetune_configs_path, 'r'))
generator_configs = yaml.safe_load(open(generator_configs_path, 'r'))

input_shape = x_train_atk.shape[1:]
nb_classes = enc.categories_[0].shape[0] 
x_total = np.concatenate((x_train_atk, x_test_atk), axis=0)
y_total = np.concatenate((y_train_atk, y_test_atk), axis=0)

In [None]:
# # Initialize the directories
# saved_dir = "<folder_to_save_results>"
# main_out_dir = os.path.join(saved_dir, dataset_name, "blackbox_bd", f"target_{y_target}_{target_clf_name}")
# target_weight_path = os.path.join(main_out_dir, f"epoch_{target_epoch}", "target_model_update", "best_model.keras")
# surro_weight_path = os.path.join(main_out_dir, f"epoch_{main_epoch}", f"surro_ft_epoch_{atk_epoch}", "best_model.keras")
# gen_weight_path = os.path.join(main_out_dir, f"epoch_{main_epoch}", f"generator_epoch_{atk_epoch}", "best_generator.keras") # if file is ".weights.h5", change the extension.

# print(f"Output dir: {main_out_dir}")
# print(f"Target model weight path: {target_weight_path}")
# print(f"Surrogate model weight path: {surro_weight_path}")
# print(f"Generator model weight path: {gen_weight_path}")

In [None]:
# Initialize the directories
saved_dir = "/home/fmg2/v-thanh/Code/results/TSBA/saved_folder"
main_out_dir = os.path.join(saved_dir, dataset_name, "blackbox_bd", f"target_{y_target}_{target_clf_name}")
target_weight_path = os.path.join(main_out_dir, "target_model.keras")
surro_weight_path = os.path.join(main_out_dir, "surro_model.keras")
gen_weight_path = os.path.join(main_out_dir, "best_generator.keras")

print(f"Output dir: {main_out_dir}")
print(f"Target model weight path: {target_weight_path}")
print(f"Surrogate model weight path: {surro_weight_path}")
print(f"Generator model weight path: {gen_weight_path}")

# Load models

In [397]:
target_clf_train_config = training_configs.get(target_clf_name, {}).get(dataset_name, {})

__Check the inital clean accuracy__

In [398]:
# inital_target_wrapper = ClassifierWrapper(
#     output_directory=os.path.join(RESULT_DIR, "tmp_models"),
#     input_shape=input_shape,
#     nb_classes=nb_classes,
#     training_config=target_clf_train_config,
#     clf_name=target_clf_name,
#     verbose=True,
#     build=True
# )
# inital_target_model_path = os.path.join(main_out_dir, target_clf_name, "sp", "best_model.keras")
# inital_target_wrapper.model.load_weights(target_weight_path)
# inital_target_wrapper.model.trainable = False

# # Inital CA
# initial_ca = inital_target_wrapper.model.evaluate(x_test_atk, y_test_atk, verbose=0)[1]
# print(f"Initial target model {target_clf_name} accuracy:", initial_ca)

__Load the target model classifier__

In [None]:
target_model_wrapper = ClassifierWrapper(
    output_directory=os.path.join(RESULT_DIR, "tmp_models"),
    input_shape=input_shape,
    nb_classes=nb_classes,
    training_config=target_clf_train_config,
    clf_name=target_clf_name,
    verbose=True,
    build=True
)
# Check if file existed
if not os.path.exists(target_weight_path):
    print(f"File not found: {target_weight_path}")
else:
    print(f"File exists: {target_weight_path}")
target_model_wrapper.model.load_weights(target_weight_path)
target_model_wrapper.model.trainable = False

__Load the surrogate model__

In [None]:
surrogate_clf_train_config = training_configs.get(surro_clf_name, {}).get(dataset_name, {})
surro_model_wrapper = ClassifierWrapper(
    output_directory=os.path.join(RESULT_DIR, "tmp_models"),
    input_shape=input_shape,
    nb_classes=nb_classes,
    training_config=surrogate_clf_train_config,
    clf_name=surro_clf_name,
    verbose=True,
    build=True
)
# Check if file existed
if not os.path.exists(surro_weight_path):
    print(f"File not found: {surro_weight_path}")
else:
    print(f"File exists: {surro_weight_path}")

surro_model_wrapper.model.load_weights(surro_weight_path)
surro_model_wrapper.model.trainable = False

__Load the trigger generator__

In [None]:
noise_generator = TriggerGenerator(
    output_directory = None,
    generator_config = generator_configs["blackbox"][surro_clf_name][dataset_name],
    max_amplitude = amplitude,
    input_shape = input_shape,
    enc = enc
)

# Check if file existed
if not os.path.exists(gen_weight_path):
    print(f"File not found: {gen_weight_path}")
    if gen_weight_path.endswith('.keras'):
        gen_weight_path = gen_weight_path.replace('.keras', '.weights.h5')
        if not os.path.exists(gen_weight_path):
            print(f"Also not found: {gen_weight_path}")
        else:
            print(f"Found alternative path: {gen_weight_path}")
else:
    print(f"File exists: {gen_weight_path}")

# Load the generator weights
if gen_weight_path.endswith('.keras'):
    noise_generator.generator.load_weights(gen_weight_path)
else: # if file is "weights.h5", then it will run this block
    # Try loading the whole model instead of just weights
    try:
        noise_generator.generator.load_weights(gen_weight_path)
    except Exception as e:
        print(f"Error loading model: {e}, load weights instead.")
        loaded_model = keras.models.load_model(gen_weight_path, compile=False)
        # Transfer weights layer by layer
        for i, layer in enumerate(noise_generator.generator.layers):
            if i < len(loaded_model.layers):
                try:
                    layer.set_weights(loaded_model.layers[i].get_weights())
                    print(f"Successfully loaded weights for layer: {layer.name}")
                except:
                    print(f"Could not load weights for layer: {layer.name}")
noise_generator.generator.trainable = False

# Evaluation

In [402]:
# Now evaluate
target_ca = target_model_wrapper.model.evaluate(x_test_atk, y_test_atk, verbose=0)[1]
surro_ca = surro_model_wrapper.model.evaluate(x_test_atk, y_test_atk, verbose=0)[1]

# Apply trigger to the test set
x_triggered = noise_generator.apply_trigger(x_test_atk)
y_targets = enc.transform(np.array([y_target]*len(x_triggered)).reshape(-1, 1)).toarray()

# Evaluate the target model
triggered_dataset = create_dataset(x_triggered, y_targets, batch_size=512, shuffle=False)
asr_target = target_model_wrapper.model.evaluate(triggered_dataset, verbose=0)[1]
print(f"[Target model '{target_clf_name}'] ASR: {asr_target:.3f}, CA: {target_ca:.3f}")

# Evaluate the surrogate model
asr_surrogate = surro_model_wrapper.model.evaluate(triggered_dataset, verbose=0)[1]
print(f"[Surro model '{surro_clf_name}'] ASR: {asr_surrogate:.3f}, CA: {surro_ca:.3f}")

[1m29/29[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 60ms/step
[Target model 'transformer'] ASR: 0.514, CA: 0.972
[Surro model 'transformer'] ASR: 0.545, CA: 0.942
