**HODA: Hardness-Oriented Detection of Model Extraction Attacks**

In [None]:
# Mount Google Drive at /content/drive; the project directory used below lives under that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
%matplotlib inline

import os
import time
import math
import torch
import numpy as np
from matplotlib import pyplot as plt

# Select the compute device once; later cells pass `device` on to the model wrappers.
if torch.cuda.is_available():
  device = 'cuda'
  print(device)
  # Also report which GPU (index 0) is being used.
  print(torch.cuda.get_device_name(0))
else:
  device = 'cpu'
  print(device)

%cd /content/drive/MyDrive/HODA

# Target Model

## Configure the target model and dataset directories

In [None]:
# Largest hardness degree a sample can take. Valid values:
#   10 - the hardness degree is calculated by 11 subclassifiers (HODA uses 11 subclassifiers, so use 10 to evaluate HODA)
#   99 - the hardness degree is calculated by all 100 subclassifiers
max_hardness_degree = 10

# Target model configuration
target_model_dataset = 'cifar10'   # valid values: cifar10, cifar100, caltech256, and cub
target_model_arch = 'resnet18'      # valid values: resnet18, densenet121, and mobilenet
target_model_training = 'load'      # valid values: train (train a new target model) and load (load an existing target model) - since the CIFAR10 (ResNet18) and CIFAR100 (ResNet18) target models already exist, use the 'load' option.

# Directory configuration
local_dir = '/content/data/'
ds_dir = '/content/drive/MyDrive/HODA/dataset/'

## Target model dataset loading

In [None]:
from hoda_dataset import dataset_class
# Build the dataset wrapper for the configured target dataset; its train and validation
# loaders are used by the following cells.
target_ds = dataset_class(
    target_model_dataset,
    clean=True,
    valid=True,
    shuffle=True,
    aug=True,
    local_directory=local_dir,
    dataset_drive_directory=ds_dir,
)

# The class list is exposed as `classes` on some dataset objects and as `labels` on others:
# take the first attribute that exists and fall back to None when neither is present.
train_dataset = target_ds.trainloader.dataset
classes = None
for class_attr in ('classes', 'labels'):
    if hasattr(train_dataset, class_attr):
        classes = getattr(train_dataset, class_attr)
        break

## Target Model loading or training

In [None]:
from hoda_training import model_class
from hoda_utils import get_model, get_histogram
# Instantiate the target architecture (pretrained=False) and wrap it in the training/evaluation helper.
trg_model = get_model(target_dataset=target_model_dataset,model_name=target_model_arch,pretrained=False)
print("target model training inf-> target_dataset:",target_model_dataset, "- target_model:", target_model_arch, "- num_classes:", len(classes))
my_model = model_class(trg_model,num_classes=len(classes),classes=classes,device=device)
loader_list = [target_ds.validloader]

# Name that identifies the validation-set information of this target model. The 10-degree
# configuration carries its own suffix so it stays separate from the 99-degree one.
# Any other value is rejected here: silently continuing would leave the name undefined
# (or, worse, reuse a stale value left in the kernel by an earlier run).
target_model_validset_loader_name = 'hoda_' + target_model_arch + '_' + target_model_dataset + '_valid_info'
if max_hardness_degree == 10:
  target_model_validset_loader_name += '_' + str(max_hardness_degree)
elif max_hardness_degree != 99:
  raise ValueError("Max hardness degree " + str(max_hardness_degree) + " is not supported!!!")
loader_name = [target_model_validset_loader_name]
target_model_file_name = 'hoda_' + target_model_arch + '_' + target_model_dataset + '_model'

if target_model_training == 'train':
  # Loader name for each loader in loader_list is mandatory.
  # First loader in loader_list must be validation data with the same number of classes with trainloader
  trg_model, inf = my_model.train(target_ds.trainloader,loader_list=loader_list,loader_name=loader_name,lr=0.1,epoch=100,save_model=True,save_model_each_epoch=True,file_name=target_model_file_name,return_learning_index=True)
elif target_model_training != 'load':
  # A misspelled option must not be silently treated as 'load'.
  raise ValueError("target_model_training must be 'train' or 'load', got " + repr(target_model_training))

# Evaluate the target model identified by target_model_file_name on the validation set.
trg_model,lab,pred = my_model.evaluate(target_ds.validloader,file_name=target_model_file_name,verbose=2)

## The hardness degree of normal samples

Normal samples are the samples of the target model's validation set.

In [None]:
# Epochs whose subclassifiers are used to compute the hardness degree:
# all 100 epochs for the 99-degree setting, 11 selected epochs for the 10-degree setting.
if max_hardness_degree == 99:
  epoch_range = np.arange(100)
elif max_hardness_degree == 10:
  epoch_range = np.array([0,9,19,29,39,49,59,69,79,89,99])
else:
  # Stop here: printing and continuing would fail later on an undefined `epoch_range`,
  # or silently reuse a stale one left in the kernel by a previous run.
  raise ValueError("Max hardness degree " + str(max_hardness_degree) + " is not supported!!!")

# If the hardness degree of the normal samples already exists it is loaded; otherwise it is calculated.
trg_model,valid_inf = my_model.epoch_range_get_learning_index(target_ds.validloader,loader_name= target_model_validset_loader_name ,file_name=target_model_file_name,epoch_range = epoch_range)
np.set_printoptions(linewidth=1000)
# Hardness degree = max_hardness_degree - learning index.
print(get_histogram(max_hardness_degree - valid_inf.learning_index,max_hardness_degree,verbose=1))

## The accuracy of classifiers on samples in each range of hardness degrees

In [None]:
from hoda_utils import draw_plot_of_trust
# The plot is drawn only in the 99-degree configuration; both values returned by
# draw_plot_of_trust are discarded.
if max_hardness_degree == 99:
  _,_ = draw_plot_of_trust(valid_inf)

# Model Extraction Attacks

## Model extraction attack configuration

In [None]:
# Attack configuration
attack_name = 'k.net cifarx'    # valid attacks: k.net cifarx, k.net tin, jbda, and jbrand
budget = 50000
target_model_output = 'prob'    # prob or label

# Train surrogate classifier
# There is no need to train a surrogate classifier to evaluate HODA. You can use the 'load' option to skip the surrogate classifier training phase.
surr_model_training = 'load' # train: train a new surrogate model - load: load an existing surrogate model or skip the surrogate classifier training phase

## Surrogate model dataset

In [None]:
from hoda_extraction_attacks import knockoff_attack, jb_based_attacks
# Suffix that keeps the 10-degree loader name distinct from the 99-degree one.
# Unsupported values are rejected up front instead of leaving the loader name undefined.
if max_hardness_degree == 99:
  hardness_suffix = ''
elif max_hardness_degree == 10:
  hardness_suffix = '_' + str(max_hardness_degree)
else:
  raise ValueError('max_hardness_degree ' + str(max_hardness_degree) + ' is not supported')

if 'k.net' in attack_name:
  # Knockoff-style attack: the surrogate training set is selected from another dataset.
  if 'cifarx' in attack_name:
    # Strings must be compared with `==`. `is` tests object identity, which only works by
    # accident of string interning and raises a SyntaxWarning on Python >= 3.8.
    if target_model_dataset == 'cifar10':
      attack_dataset_name = 'cifar100'
    elif target_model_dataset == 'cifar100':
      attack_dataset_name = 'cifar10'
    else:
      raise ValueError('attack ' + attack_name + ' requires target_model_dataset to be cifar10 or cifar100, got ' + str(target_model_dataset))
  elif 'tin' in attack_name:
    attack_dataset_name = 'tinyimagenet'
  else:
    raise ValueError('attack ' + attack_name + ' is not supported')
  print(attack_name)

  attack_ds = dataset_class(attack_dataset_name, train_bs = 100,local_directory= local_dir, dataset_drive_directory= ds_dir,valid = True)
  bba = knockoff_attack(trg_model,budget=budget,num_classes=len(classes),classes=classes,device=device)
  surr_model_trainset = bba.select_rand_data(attack_ds,target_model_dataset,attack_dataset_name,adv_access=target_model_output,imagenet_flag=False)
  surr_model_ds_loader_name = 'hoda_' + target_model_arch + '_trg_ds_' + target_model_dataset +'_'+ attack_dataset_name +'_att_knockoff_'+str(budget) + '_' + target_model_output + hardness_suffix
  surr_model_validloader = target_ds.validloader  # validation set of surrogate model is the validation set of target model dataset
  surr_model_file_name = 'hoda_attack_' + target_model_arch + '_' + target_model_dataset + '_'+attack_dataset_name+'_knockoff_'+target_model_output+'_final_model'

elif attack_name in ('jbda', 'jbrand'):
  # Jacobian-based attacks: each variant has its own attack type label and epsilon.
  if attack_name == 'jbda':
    attack_type = 'jbda'; eps = 0.1
  else:
    attack_type = 'jbrand3'; eps = 64/255

  aux_train_epochs = 20
  if target_model_dataset == 'cifar10':
    num_seed_data = 500
  elif target_model_dataset == 'cifar100':
    num_seed_data = 1000
  else:
    # No seed-set size is configured for other datasets; fail with a clear message
    # rather than with a NameError on `num_seed_data` below.
    raise ValueError('attack ' + attack_name + ' requires target_model_dataset to be cifar10 or cifar100, got ' + str(target_model_dataset))

  bba = jb_based_attacks(trg_model,target_ds,aux_classifier_name = target_model_arch,trg_ds_name=target_model_dataset, budget=budget,num_classes=len(classes),classes=classes,device=device)
  surr_model_trainset = bba.jb_attack(attack_type=attack_type,eps=eps,aux_train_epochs=aux_train_epochs,num_seed_data = num_seed_data,adv_access=target_model_output) # HODA has been implemented by kapa=2000
  surr_model_ds_loader_name = 'hoda_' + target_model_arch + '_trg_ds_' + target_model_dataset + '_att_'+attack_type+'_'+str(budget)+ '_' + target_model_output + hardness_suffix
  surr_model_validloader = bba.loader_list[0] # validation set of surrogate model is the validation set of target model dataset except seed samples
  surr_model_file_name = 'hoda_attack_' + target_model_arch + '_' + target_model_dataset + '_'+attack_type+ '_'+target_model_output+'_final_model'

else:
  raise ValueError('attack ' + attack_name + ' is not supported')

# shuffle=False keeps the attack samples in their original order.
surr_model_trainloader = torch.utils.data.DataLoader(surr_model_trainset, batch_size=128, shuffle=False)
print("len surrgate model training set:",len(surr_model_trainset),", and budeget:",budget)

## The hardness degree histogram of attack samples 

In [None]:
# If the hardness degree of the attack samples already exists it is loaded; otherwise it is calculated.
trg_model, attack_inf = my_model.epoch_range_get_learning_index(
    surr_model_trainloader,
    loader_name=surr_model_ds_loader_name,
    file_name=target_model_file_name,
    soft_label=True,
    epoch_range=epoch_range,
)

# Hardness degree = max_hardness_degree - learning index.
attack_hardness_degree = max_hardness_degree - attack_inf.learning_index
attack_histogram = get_histogram(attack_hardness_degree, max_hardness_degree, verbose=1)
print(attack_histogram)
print(attack_inf.pred_hist.shape)

## Surrogate model loading or training

In [None]:
# Instantiate the surrogate network with the same architecture as the target model (pretrained=False).
surr_model = get_model(target_dataset=target_model_dataset,model_name=target_model_arch,pretrained=False)
print("surrogate model training inf: target_dataset:",target_model_dataset,"- attack:", attack_name ,"- target_model:", target_model_arch, "- num_classes:", len(classes))
# Pass the selected device explicitly, as is done for the target-model wrapper and the attack
# objects, so the surrogate does not depend on model_class's default device.
# NOTE(review): model_class's default device is not visible here - confirm this matches intent.
attack_model = model_class(surr_model,num_classes=len(classes),classes=classes,device=device)
loader_list = [surr_model_validloader]
loader_name = ['dummy_name']
if surr_model_training == 'train':
  surr_model, inf = attack_model.train(surr_model_trainloader,loader_list=loader_list,loader_name=loader_name,lr=0.1,epoch=100,save_model=True,save_model_each_epoch=False,file_name=surr_model_file_name,return_learning_index=False,soft_label=True)
elif surr_model_training != 'load':
  # A misspelled option must not be silently treated as 'load'.
  raise ValueError("surr_model_training must be 'train' or 'load', got " + repr(surr_model_training))
# If there is no surrogate model, the next command evaluates a randomly initialized surrogate model.
surr_model,lab_att,pred_att = attack_model.evaluate(surr_model_validloader,file_name=surr_model_file_name,verbose=2)

# Hardness-Oriented Detection Approach (HODA)

## HODA config

In [None]:
fraction_of_validation_set_samples_in_S_hoda = 0.4   # 40% of normal samples for S_hoda (determining normal histogram H_n and calculating delta) and the remaining 60% for S_user (simulating benign users)
pn_list = [0,25,50,75,90]                            # Pn indicates the percentage of normal samples in the adversary's sample sequence. Pn = 0 means there are no normal samples in the adversary's sample sequence.
num_s_list = [50,100,500,1000,2000,4000]             # num_s indicates the length of sample sequences.

# Precomputed delta thresholds, keyed by str(num_s).
# Strings must be compared with `==`. `is` tests object identity, which only works by accident
# of string interning and raises a SyntaxWarning on Python >= 3.8.
if target_model_dataset == 'cifar10':
  delta_list = {'50':0.29069,'100':0.15459,'500':0.03075, '1000':0.01512, '2000':0.00548,'4000':0.00314}
elif target_model_dataset == 'cifar100':
  delta_list = {'50':0.71617,'100':0.34904,'500':0.04889, '1000':0.02410, '2000':0.01178,'4000':0.00631}
else:
  print("unknown target model!!!")
  # No precomputed thresholds for this dataset. A non-positive delta makes
  # create_normal_histogram_and_calc_delta recompute it, instead of the notebook failing
  # later on an undefined (or stale) `delta_list`.
  delta_list = {str(n): 0 for n in num_s_list}

## Create normal histogram and calculate delta

In [None]:
from scipy.spatial.distance import correlation
import os


def create_S_hoda_and_S_user(learn_inf,max_hardness_degree,hoda_frac=0.4):
  """Split the hardness degrees of the normal samples into S_hoda and S_user.

  The hardness degree of each sample is ``max_hardness_degree - learn_inf``. The samples are
  permuted with an index that is cached on disk (one file per ``target_model_dataset``), so
  repeated runs produce the same split. The first ``int(len(learn_inf) * hoda_frac)`` permuted
  samples form S_hoda and the rest form S_user.

  Args:
    learn_inf: 1-D array-like with the learning index of every normal sample.
    max_hardness_degree: value the learning index is subtracted from.
    hoda_frac: fraction of the samples assigned to S_hoda.

  Returns:
    Tuple ``(s_hoda, s_user)`` of hardness-degree arrays.

  Raises:
    ValueError: if a cached permutation exists but was created for a different number of samples.
  """
  hardness_degree_of_normal_samples = max_hardness_degree - np.asarray(learn_inf)
  number_of_normal_samples = len(hardness_degree_of_normal_samples)
  fname = 'HODA/split_shoda_suser_rand_index_'+target_model_dataset+'.npy'
  if os.path.isfile(fname):
    shuffle_index = np.load(fname)
    # A permutation saved for another validation-set size would either index out of range
    # or silently drop samples, so refuse to use it.
    if len(shuffle_index) != number_of_normal_samples:
      raise ValueError(fname + ' holds ' + str(len(shuffle_index)) + ' indices but there are ' + str(number_of_normal_samples) + ' normal samples; delete the file to create a new split')
    print(fname,'has been loaded.')
  else:
    shuffle_index = np.arange(number_of_normal_samples)
    np.random.shuffle(shuffle_index)
    # np.save does not create missing directories.
    os.makedirs(os.path.dirname(fname), exist_ok=True)
    np.save(fname,shuffle_index)
    print(fname,'has been saved.')
  split_at = int(number_of_normal_samples * hoda_frac)
  s_hoda = hardness_degree_of_normal_samples[shuffle_index[:split_at]]
  s_user = hardness_degree_of_normal_samples[shuffle_index[split_at:]]
  print("num samples S_HODA:",len(s_hoda),"num samples S_user:",len(s_user))
  return s_hoda, s_user

def create_normal_histogram_and_calc_delta(s_hoda,num_s,max_hardness_degree,num_sim_seq=40000,delta_list=None):
  """Return the normal histogram and the delta threshold for sequences of length ``num_s``.

  If the histogram file for (``target_model_dataset``, ``num_s``) exists and ``delta_list``
  holds a positive delta for ``str(num_s)``, both are loaded. Otherwise ``num_sim_seq``
  sequences of ``num_s`` samples are drawn without replacement from ``s_hoda``; the normal
  histogram is the mean of their histograms and delta is the largest correlation distance
  between the normal histogram and any simulated histogram. The computed histogram is saved.

  Args:
    s_hoda: hardness degrees to sample from (must hold at least ``num_s`` values).
    num_s: length of each simulated sample sequence.
    max_hardness_degree: largest hardness degree; the histogram has ``max_hardness_degree + 1`` bins.
    num_sim_seq: number of simulated sequences.
    delta_list: optional mapping ``str(num_s) -> delta`` with precomputed thresholds.

  Returns:
    Tuple ``(normal_hist, delta)``.
  """
  fname = 'HODA/normal_histogram_'+target_model_dataset+'_num_s_'+str(num_s)+'.npy'
  # delta_list defaults to None and may lack an entry for this num_s; treat both cases as
  # "no precomputed delta" instead of failing with TypeError/KeyError.
  known_delta = delta_list.get(str(num_s), 0) if delta_list else 0
  if os.path.isfile(fname) and known_delta > 0:
    normal_hist = np.load(fname)
    print(fname,' and delta has been loaded.')
    return normal_hist, known_delta

  list_hist = []
  normal_hist = np.zeros(max_hardness_degree+1)
  for _ in range(num_sim_seq):
    simulated_sample_seq = np.random.choice(s_hoda,size=num_s,replace=False)
    hist = get_histogram(simulated_sample_seq,max_hardness_degree)
    list_hist.append(hist)
    normal_hist += hist
  normal_hist = normal_hist / num_sim_seq

  # Distance of every simulated sequence to the averaged histogram.
  dist_list = [correlation(normal_hist,hist) for hist in list_hist]
  a = plt.hist(dist_list)
  print(a)
  plt.show()
  # The threshold is the largest distance observed among the simulated sequences.
  delta = np.max(dist_list)
  # np.save does not create missing directories.
  os.makedirs(os.path.dirname(fname), exist_ok=True)
  np.save(fname,normal_hist)
  print(fname,'has been saved.')
  return normal_hist, delta
  

# Split the hardness degrees of the validation samples into S_hoda and S_user; the evaluation below uses both.
S_hoda, S_user = create_S_hoda_and_S_user(valid_inf.learning_index,hoda_frac=fraction_of_validation_set_samples_in_S_hoda,max_hardness_degree = max_hardness_degree)

## False Positive Rate and detection rate of HODA

In [None]:
num_of_benign_users = 10000   # number of benign sample sequences simulated for every num_s
num_of_adversaries = 10000    # number of adversary sample sequences simulated for every (num_s, Pn) pair
verbose = 0                   # passed to HODA; 1 plots every simulated histogram and prints its distance
def HODA(sample_set,normal_hist,delta,num_s=None,num_simulated_user=None,max_hardness_degree=None,percentage_of_normal_samples = 0, normal_sample_set = None, verbose = 0):
  """Simulate users and count how many sample sequences are flagged as adversarial.

  For each simulated user a sequence of ``num_s`` hardness degrees is drawn, its histogram
  is compared with ``normal_hist`` using the correlation distance, and the sequence is
  flagged when the distance is greater than ``delta``.

  When ``percentage_of_normal_samples > 0`` and ``normal_sample_set`` is given, that
  percentage of the sequence (rounded down) is drawn from ``normal_sample_set`` and the
  rest from ``sample_set``.

  Args:
    sample_set: hardness degrees the user's samples are drawn from (without replacement).
    normal_hist: normal histogram the user histogram is compared with.
    delta: distance threshold.
    num_s: length of each sample sequence (required).
    num_simulated_user: number of simulated sequences (required).
    max_hardness_degree: largest hardness degree, forwarded to ``get_histogram`` (required).
    percentage_of_normal_samples: percentage (0-100) of normal samples mixed into each sequence.
    normal_sample_set: hardness degrees of the normal samples used for mixing.
    verbose: 1 plots every user histogram and prints its distance.

  Returns:
    Tuple ``(detected_adv_sample_seq, dist_list)``: the number of flagged sequences and
    the list of distances, one per simulated user.

  Raises:
    ValueError: if ``num_s``, ``num_simulated_user`` or ``max_hardness_degree`` is missing.
  """
  if num_s is None or num_simulated_user is None or max_hardness_degree is None:
    raise ValueError("num_s, num_simulated_user and max_hardness_degree must be provided")

  detected_adv_sample_seq = 0
  dist_list = []

  # The mixing decision and the sample counts do not depend on the user, so compute them once.
  mix_normal_samples = percentage_of_normal_samples > 0 and normal_sample_set is not None
  if mix_normal_samples:
    # Integer arithmetic: floor(num_s * p / 100) normal samples and the remainder from
    # sample_set. This equals ceil(num_s * (1 - p / 100)) for the latter without relying on
    # floating-point rounding.
    num_normal_sample_in_adaptive_attack = int(num_s * percentage_of_normal_samples // 100)
    num_sample_in_seq = num_s - num_normal_sample_in_adaptive_attack
    # Sample the normal part with replacement only when the normal set is too small.
    rep_flag = len(normal_sample_set) < num_normal_sample_in_adaptive_attack
  else:
    num_sample_in_seq = num_s

  for _ in range(num_simulated_user):
    sim_user_sample_seq = np.random.choice(sample_set,size=num_sample_in_seq,replace=False)
    if mix_normal_samples:
      normal_sample_seq = np.random.choice(normal_sample_set,size=num_normal_sample_in_adaptive_attack,replace=rep_flag)
      sim_user_sample_seq = np.concatenate((sim_user_sample_seq,normal_sample_seq),axis=None)
    user_hist = get_histogram(sim_user_sample_seq,max_hardness_degree)
    dis = correlation(normal_hist,user_hist)
    dist_list.append(dis)
    if verbose == 1:
      plt.bar(np.arange(max_hardness_degree+1),user_hist)
      plt.show()
      print(dis)
    if dis > delta:
      detected_adv_sample_seq += 1
  return detected_adv_sample_seq, dist_list

# Detection rate for every (num_s, Pn) pair: one row per num_s, one column per Pn.
res_arr = np.zeros((len(num_s_list),len(pn_list)))

for res_arr_i, num_s in enumerate(num_s_list):
  print('**************************************************************************')
  print('************************   Num_s = '+ str(num_s) +'  ***********************************')
  print('**************************************************************************')
  print()
  print("******************* HODA Initialization *****************")
  normal_histogram, delta = create_normal_histogram_and_calc_delta(S_hoda,num_s=num_s,max_hardness_degree=max_hardness_degree,delta_list=delta_list)
  print("Dleta =",delta)
  plt.bar(np.arange(max_hardness_degree+1),normal_histogram)
  plt.title("Normal Histogram (Hn)")
  plt.show()

  print("*************** HODA Evaluation ***************************")
  print()
  # Benign users draw their samples from S_user; every flagged benign sequence is a false positive.
  num_of_detected_adversary, benign_user_dist_list = HODA(S_user,normal_histogram,delta,num_s=num_s,num_simulated_user=num_of_benign_users,max_hardness_degree=max_hardness_degree, verbose = verbose)
  FPR = np.round(num_of_detected_adversary * 100 / num_of_benign_users,2)
  dr_list = []
  for res_arr_j, p in enumerate(pn_list):
    print()
    print("           ********** Pn = "+str(p)+" **********")
    print()
    # Adversaries draw from the attack samples; for Pn > 0 the normal part of each
    # sequence is drawn from the first 1000 S_user samples.
    num_of_detected_adversary, adv_dist_list = HODA(max_hardness_degree - attack_inf.learning_index,normal_histogram,delta,num_s=num_s,num_simulated_user=num_of_adversaries,max_hardness_degree=max_hardness_degree,percentage_of_normal_samples = p, normal_sample_set = S_user[:1000], verbose = verbose)
    detection_rate = np.round(num_of_detected_adversary * 100 / num_of_adversaries,2)
    dr_list.append(detection_rate)
    plt.hist(benign_user_dist_list,label='Benign User')
    plt.hist(adv_dist_list,label='Adversary')
    plt.xlabel('Pearson Distance')
    plt.title("Pearson Distance Histogram!")
    plt.legend()
    plt.show()
    print(attack_name,": delta =", delta ,"FPR =", FPR ,",Detection Rate =", detection_rate)
    print()
    res_arr[res_arr_i,res_arr_j] = detection_rate

print("******************* Final Result *****************")
# One curve per Pn: detection rate as a function of the sequence length.
for i in range(len(pn_list)):
  plt.plot(num_s_list,res_arr[:,i],label="pn = "+str(pn_list[i]))
plt.legend()
plt.title('num_s vs. Pn vs. Detection Rate')
plt.xlabel("num_s")
plt.ylabel("Detection Rate")
plt.show()