# Adversarial attack on CTSRD dataset  

This notebook is adapted for attacking Chinese traffic signs in the CTSRD dataset.  
It reuses a lot of code from the repository of [**DARTS: Deceiving Autonomous Cars with Toxic Signs**](https://github.com/inspire-group/advml-traffic-sign).


---

In [None]:
import os
%matplotlib inline

%reload_ext autoreload
%autoreload 2

import pickle
from parameters import *
from lib.utils import *
from lib.attacks import *
from lib.detector_utils import *
from lib.keras_utils import *
from lib.RandomTransform import *
from lib.OptCarlini import *
from lib.OptTransform import *
from lib.OptProjTran import *
from tensorflow import keras
import pandas as pd
import re
import time

# Matplotlib configuration so that the Chinese class names used below render in figures.
plt.rcParams['font.sans-serif']=['SimSun'] # needed to display Chinese labels correctly
plt.rcParams['axes.unicode_minus']=False # needed to display the minus sign correctly

# Report the TensorFlow version the notebook is running against.
print(tf.__version__)

Load model and class names.

In [None]:
# Restore the trained CTSRD classifier from its HDF5 file and print its layer summary.
trained_model_file = '../1.CTSRD_Classification/trainedModels/CTSRD_ResNet101_20200422.h5'
model = tf.keras.models.load_model(trained_model_file)
model.summary()

In [None]:
# Mapping from CTSRD class name (Chinese) to integer class id.
# The entries are listed in ascending id order (0..57); `getname` relies on that order.
names = {'停车检查': 0, '停车让行': 1, '减速让行': 2, '右侧行驶': 3, '右转': 4,
         '左侧行驶': 5, '左转': 6, '左转和右转': 7, '机动车道': 8, '注意上坡': 9,
         '注意下坡': 10, '注意事故易发路段': 11, '注意人行横道': 12, '注意信号灯': 13, '注意危险': 14,
         '注意反向弯路': 15, '注意向右T型交叉': 16, '注意向右急转弯': 17, '注意向左T型交叉': 18, '注意向左急转弯': 19,
         '注意学校': 20, '注意左右绕行': 21, '注意慢行': 22, '注意施工': 23, '注意无人看守铁道路口': 24,
         '注意有人看守铁道路口': 25, '注意村镇': 26, '注意连续弯道': 27, '注意非机动车': 28, '环岛': 29,
         '直行': 30, '直行和右转': 31, '禁止右转': 32, '禁止左转': 33, '禁止左转和右转': 34,
         '禁止机动车': 35, '禁止直行': 36, '禁止直行和右转': 37, '禁止直行和左转': 38, '禁止调头': 39,
         '禁止超车': 40, '禁止车辆临时或长时停放': 41, '禁止通行': 42, '禁止驶入': 43, '禁止鸣笛': 44,
         '解除40km/h限速': 45, '解除50km/h限速': 46, '调头': 47, '限速15km/h': 48, '限速30km/h': 49,
         '限速40km/h': 50, '限速50km/h': 51, '限速5km/h': 52, '限速60km/h': 53, '限速70km/h': 54,
         '限速80km/h': 55, '非机动车道': 56, '鸣笛': 57}


def getname(index):
    """Return the class name(s) found at position ``index`` among the keys of ``names``.

    The keys are taken in the dict's insertion order, so the result matches the
    class id only while ``names`` lists its entries in ascending id order.
    ``index`` is used for NumPy indexing, so anything a 1-D array accepts works.
    """
    label_names = np.array([label for label in names])
    return label_names[index]

In [None]:
# Check which classes are recognised correctly: an attack is only meaningful when
# both the attacked (source) class and the target class are classified correctly.
attackable_csv = './CTSRD/可以攻击的.csv'
df = pd.read_csv(attackable_csv, encoding='gbk')
df

## Run attack on one image

Load samples 

In [None]:
SAMPLE_IMG_PATH = './CTSRD/images/32.jfif'
SAMPLE_LABEL = '32'

# Load the sample image together with its label and masks.
x_smp, x_smp_full, y_smp, mask, mask_full = load_sample(SAMPLE_IMG_PATH, SAMPLE_LABEL)

# Show the four arrays returned by load_sample side by side, captioned (a)-(d).
plt.figure(figsize=(16,8))
sample_panels = (
    (x_smp, "(a)"),
    (x_smp_full, "(b)"),
    (mask, "(c)"),
    (mask_full, "(d)"),
)
for panel_pos, (panel_img, panel_caption) in enumerate(sample_panels, start=1):
    plt.subplot(1, 4, panel_pos)
    plt.imshow(panel_img)
    plt.title(panel_caption, fontsize=20)

Specify the target class and check the prediction result.

In [None]:
# Target class id for the attack, converted to a one-hot vector.
y_target = 33
y_target = keras.utils.to_categorical(y_target, NUM_LABELS)

# Classify the clean sample and compare prediction, ground-truth label and attack target.
predResult = model.predict(x_smp.reshape(1, 112, 112, 3))
predicted_idx = np.argmax(predResult)
labelled_idx = np.argmax(y_smp)
wanted_idx = np.argmax(y_target)

print("预测结果：" + str(predicted_idx) + '-' + getname(predicted_idx) + " 置信度：" + str(np.max(predResult)))
print("标签标注：" + str(labelled_idx) + '-' + getname(labelled_idx))
print("攻击目标：" + str(wanted_idx) + '-' + getname(wanted_idx))

plt.imshow(x_smp)
plt.show()

What random transformation and enhancement look like

In [None]:
plt.figure(figsize=(13,5))

# Fixed seed for the random transformer.
seed = 1111
rnd_transform = RandomTransform(seed=seed, p=1.0, intensity=0.3)

# Draw ten augmented versions of the full-size sample in a 2x5 grid.
for panel_pos in range(1, 11):
    warped = rnd_transform.transform(x_smp_full)
    brightened = random_brightness(warped, delta=0.3)
    im_t = random_resize(brightened)
    plt.subplot(2, 5, panel_pos)
    plt.imshow(im_t)
    plt.axis("off")

Iterative method with transformation

`iter_transform` is a gradient descent method that incorporates masks and random transformation of the input sample.

In [None]:
# Run the attack on the single sample towards `y_target` (target=True), restricted by `mask`.
# `output` is used below as the adversarial image; the exact meaning of the other
# return values is defined in lib.attacks.iter_transform (not shown here).
output, checkpoint, loss, label_score, target_score = iter_transform(
        model, x_smp, y_smp, y_target, norm="inf", n_step=1000,  # originally 200
        step_size=0.001, target=True, mask=mask, batch_size=128, early_stop=True)

Preview the result.

In [None]:
# 50/50 blend of the clean sample and the attack output.
add = x_smp*0.5 + output*0.5

# Left: clean sample, middle: attack output, right: blend.
plt.figure(figsize=(15,8))
for slot, picture in zip((131, 132, 133), (x_smp, output, add)):
    plt.subplot(slot)
    plt.imshow(picture)

# Classify the three images in the same order as they are displayed.
a = model.predict(x_smp.reshape(1, 112, 112, 3))
b = model.predict(output.reshape(1, 112, 112, 3))
c = model.predict(add.reshape(1, 112, 112, 3))

# First the raw score vectors, then the winning class id and name for each image.
for score_vector in (a, b, c):
    print(score_vector)
for score_vector in (a, b, c):
    winner = np.argmax(score_vector)
    print(winner, getname(winner))

Save the result.

In [None]:
# Save results
thisdir = './forpaper/SingleAttack/'
# Create the output directory up front; tofile() does not create missing directories.
os.makedirs(thisdir, exist_ok=True)

rawlabel = str(np.argmax(y_smp)) + '-' + getname(np.argmax(y_smp))
targetlabel = str(np.argmax(y_target)) + '-' + getname(np.argmax(y_target))
timestamp = time.strftime("%Y%m%d_%H%M%S", time.localtime())
# Some class names (e.g. '限速15km/h') contain '/', which would otherwise be
# interpreted as a path separator when the file is written.
filename = ('%s→%s_%s.jpg' % (rawlabel, targetlabel, timestamp)).replace('/', '_')

# The third positional parameter of cv2.resize is `dst`, not the interpolation flag,
# so the flag must be passed by keyword for cubic interpolation to be applied.
adv = cv2.resize(output.copy(), (x_smp_full.shape[1], x_smp_full.shape[0]),
                 interpolation=cv2.INTER_CUBIC)
mix = adv * 0.5 + x_smp_full * 0.5

# Cubic interpolation can overshoot the [0, 1] range; clip before the uint8 cast
# so that out-of-range values do not wrap around.
mix = (np.clip(mix, 0, 1) * 255).astype('uint8')
# imencode expects BGR channel order; the arrays here are presumably RGB (they are
# shown directly with plt.imshow above). RGB2BGR is the same channel swap as BGR2RGB.
mix = cv2.cvtColor(mix, cv2.COLOR_RGB2BGR)
# imencode(...).tofile(...) is used instead of cv2.imwrite so that non-ASCII paths work.
cv2.imencode('.jpg', mix)[1].tofile(os.path.join(thisdir, 'result_' + filename))

adv = (np.clip(adv, 0, 1) * 255).astype('uint8')
adv = cv2.cvtColor(adv, cv2.COLOR_RGB2BGR)
cv2.imencode('.jpg', adv)[1].tofile(os.path.join(thisdir, 'noise_' + filename))

## Run attacks on more than one image

Load samples, specify target class, and check the settings.

In [None]:
# Load sample images and labels.
SAMPLE_IMG_PATH = './forpaper/RawImgs/'
SAMPLE_LABEL = './forpaper/RawImgs/labels.txt'
x_smp, x_smp_full, y_smp, mask, mask_full = load_samples(SAMPLE_IMG_PATH, SAMPLE_LABEL)

# Target classes are chosen at random (one entry per loaded sample), then one-hot encoded.
y_target = np.array([32, 43, 4, 33, 32, 8, 20])
y_target = keras.utils.to_categorical(y_target, NUM_LABELS)

# Check predict result: for every sample report the predicted class, the score of
# the labelled class and the score of the attack target before any attack is run.
for i in range(len(x_smp)):
    pred = model.predict(x_smp[i].reshape(1, 112, 112, 3))
    predicted_idx = np.argmax(pred)
    labelled_idx = np.argmax(y_smp[i])
    wanted_idx = np.argmax(y_target[i])

    print("识别结果：%d-%s Score:%.2f%%" %
          (predicted_idx, getname(predicted_idx), np.max(pred)*100))
    print("原始标签：%d-%s Score:%.2f%%" %
          (labelled_idx, getname(labelled_idx), pred[0,labelled_idx]*100))
    print("攻击目标：%d-%s Score:%.2f%%" %
          (wanted_idx, getname(wanted_idx), pred[0,wanted_idx]*100))

    # A sample is a sensible attack candidate only if the clean image is classified correctly.
    correctly_classified = labelled_idx == predicted_idx
    print("↓↓↓↓↓↓↓可攻击↓↓↓↓↓↓↓" if correctly_classified
          else "↓↓↓↓↓↓不建议攻击↓↓↓↓↓↓（数据集内无原图标签除外）")

    plt.imshow(x_smp[i])
    plt.show()

Begin attack.

In [None]:
# Empty per-sample accumulators for everything iter_transform returns
# (five independent lists, one per return value).
outputs, checkpoints, losses, label_scores, target_scores = [], [], [], [], []

In [None]:
# Start from empty accumulators every time this cell runs. Otherwise re-running the
# cell appends a second set of results, and outputs[i] keeps pointing at the stale
# result of the first run for sample i.
outputs = []
checkpoints = []
losses = []
label_scores = []
target_scores = []

# Attack every sample towards its own target class, using its own mask.
for i in range(len(x_smp)):
    output, checkpoint, loss, label_score, target_score = iter_transform(
        model, x_smp[i], y_smp[i], y_target[i], norm="inf", n_step=1000,  # originally 200
        step_size=0.001, target=True, mask=mask[i], batch_size=128, early_stop=True)

    outputs.append(output)
    checkpoints.append(checkpoint)
    losses.append(loss)
    label_scores.append(label_score)
    target_scores.append(target_score)

Preview results

In [None]:
# Preview results

# One output file name per sample; reused by the save and plotting cells below.
filenames = []
for i in range(len(x_smp)):
    raw_idx = np.argmax(y_smp[i])
    target_idx = np.argmax(y_target[i])

    rawlabel = str(raw_idx) + '-' + getname(raw_idx)
    targetlabel = str(target_idx) + '-' + getname(target_idx)
    timestamp = time.strftime("%Y%m%d_%H%M%S", time.localtime())
    # Some class names (e.g. '限速15km/h') contain '/', which would be treated as a
    # path separator once the name is used to write a file.
    filename = ('%s→%s_%s.jpg' % (rawlabel, targetlabel, timestamp)).replace('/', '_')
    print(filename)
    filenames.append(filename)

    # 50/50 blend of the attack output and the clean sample.
    mix = outputs[i] * 0.5 + x_smp[i] * 0.5

    predResult_raw = model.predict(x_smp[i].reshape(1, 112, 112, 3))
    predResult_noise = model.predict(outputs[i].reshape(1, 112, 112, 3))
    predResult_mix = model.predict(mix.reshape(1, 112, 112, 3))

    # Scores of (original label, attack target) for the clean, attacked and blended
    # image. The model returns values that the other cells scale by 100 before
    # printing them with a '%' sign, so the same scaling is applied here.
    for scores in (predResult_raw, predResult_noise, predResult_mix):
        print("%.2f%%\t%.2f%%" % (scores[0, raw_idx] * 100, scores[0, target_idx] * 100))

    plt.figure(figsize=(15,6))
    plt.subplot(1,3,1)
    plt.imshow(x_smp[i])
    plt.subplot(1,3,2)
    plt.imshow(outputs[i])
    plt.subplot(1,3,3)
    plt.imshow(mix)
    plt.show()

Save results

In [None]:
# Save results
# Each run is written to its own time-stamped directory under ./forpaper/.
thisdir = time.strftime("%Y%m%d_%H%M%S", time.localtime())
save_dir = os.path.join('./forpaper/', thisdir)
os.makedirs(save_dir, exist_ok=True)

for i in range(len(filenames)):
    # Guard against class names containing '/' (e.g. '限速15km/h'), which would
    # otherwise be interpreted as a sub-directory.
    safe_name = filenames[i].replace('/', '_')

    # The third positional parameter of cv2.resize is `dst`, not the interpolation
    # flag, so the flag must be passed by keyword for cubic interpolation to apply.
    adv = cv2.resize(outputs[i].copy(), (x_smp_full[i].shape[1], x_smp_full[i].shape[0]),
                     interpolation=cv2.INTER_CUBIC)
    mix = adv * 0.5 + x_smp_full[i] * 0.5

    # Cubic interpolation can overshoot [0, 1]; clip so the uint8 cast cannot wrap around.
    mix = (np.clip(mix, 0, 1) * 255).astype('uint8')
    # imencode expects BGR channel order; the arrays are presumably RGB (they are shown
    # directly with plt.imshow elsewhere). RGB2BGR is the same swap as BGR2RGB.
    mix = cv2.cvtColor(mix, cv2.COLOR_RGB2BGR)
    # imencode(...).tofile(...) is used instead of cv2.imwrite so non-ASCII paths work.
    cv2.imencode('.jpg', mix)[1].tofile(os.path.join(save_dir, 'result_' + safe_name))

    adv = (np.clip(adv, 0, 1) * 255).astype('uint8')
    adv = cv2.cvtColor(adv, cv2.COLOR_RGB2BGR)
    cv2.imencode('.jpg', adv)[1].tofile(os.path.join(save_dir, 'noise_' + safe_name))

Check the process.

In [None]:
# Display the file names generated by the preview cell (bare expression -> cell output).
filenames

In [None]:
# set the image you want to observe
img_index = 6

sample = x_smp[img_index]
raw_idx = np.argmax(y_smp[img_index])
target_idx = np.argmax(y_target[img_index])
ckp_num = checkpoints[img_index].shape[0]

# Class names may contain '/', which must not end up in a file name.
plot_name = filenames[img_index].replace('.jpg', '.png').replace('/', '_')
plot_dir = './forpaper/plots/'
os.makedirs(plot_dir, exist_ok=True)

# draw the process of image
# Show up to ten evenly spaced checkpoints. With fewer than ten checkpoints
# (e.g. after an early stop) every checkpoint gets its own panel instead of the
# stride collapsing to 0.
n_panels = min(10, ckp_num)
times = max(ckp_num // 10, 1)
plt.figure(figsize=(16,7))
for i in range(n_panels):
    plt.subplot(2,5,i+1)
    mix = checkpoints[img_index][(i+1)*times-1] * 0.5 + sample * 0.5
    pred = model.predict(mix.reshape(1, 112, 112, 3))
    score1 = pred[0, target_idx]*100
    plt.imshow(mix)
    # Iteration count assumes one checkpoint per 10 steps, as the x axis below does
    # -- TODO confirm against iter_transform.
    plt.title("%d次迭代后\n目标:%.2f%%"%((i+1)*times*10, score1), fontsize=15)
    plt.axis('off')
plt.savefig(plot_dir + 'Process_' + plot_name)
plt.show()


# draw the plot of scores
plt.figure(figsize=(12,8))
r_scores = []
t_scores = []

# First point: scores of the clean sample (iteration 0).
pred = model.predict(sample.reshape(1, 112, 112, 3))
r_scores.append(pred[0, raw_idx])
t_scores.append(pred[0, target_idx])

# One point per checkpoint, evaluated on the 50/50 blend with the clean sample.
for i in range(ckp_num):
    mix = checkpoints[img_index][i] * 0.5 + sample * 0.5
    pred = model.predict(mix.reshape(1, 112, 112, 3))
    r_scores.append(pred[0, raw_idx])
    t_scores.append(pred[0, target_idx])

x_range = range(0, ckp_num*10+10, 10)
plt.plot(x_range, r_scores, label='原标签', linewidth=5, color='deepskyblue',
         marker='.', markerfacecolor='royalblue', markersize=15)
plt.plot(x_range, t_scores, label='攻击目标', linewidth=5, color='tomato',
        marker='.', markerfacecolor='firebrick',markersize=15)
plt.xlabel('迭代次数', fontsize=15)
plt.ylabel('置信度', fontsize=15)
plt.legend(fontsize=15)
plt.savefig(plot_dir + 'Scores_' + plot_name)
plt.show()


## Evaluate

In [None]:
result_path = './forpaper/20200427_170330/'
# Sorted so the report order is deterministic (os.listdir order is arbitrary).
advnames = sorted(os.listdir(result_path))

# Saved names look like '<prefix>_<raw id>-<raw name>→<target id>-<target name>_<timestamp>.jpg'.
# Anchoring on '-' and '→' keeps digits inside a class name (e.g. '限速15km/h')
# from being mistaken for the target class id.
name_pattern = re.compile(r'(\d+)-.*?→(\d+)-')

for advname in advnames:
    match = name_pattern.search(advname)
    if match is not None:
        raw_class, target_class = int(match.group(1)), int(match.group(2))
    else:
        # Fall back to the first two digit runs for files named in another scheme.
        digit_runs = re.findall(r'\d+', advname)
        if len(digit_runs) < 2:
            print("Skipping %s: cannot parse class ids from the file name" % advname)
            continue
        raw_class, target_class = int(digit_runs[0]), int(digit_runs[1])

    # np.fromfile + imdecode is used instead of cv2.imread so non-ASCII paths work.
    advimg = cv2.imdecode(np.fromfile(os.path.join(result_path, advname),dtype=np.uint8),-1)
    if advimg is None:
        print("Skipping %s: not a decodable image" % advname)
        continue

    # Scale to [0, 1], resize to the 112x112 model input and convert the decoded
    # BGR image to RGB before prediction.
    advimg = (advimg / 255.).astype(np.float32)
    advimg_resize = cv2.resize(advimg, (112, 112))
    advimg_resize = cv2.cvtColor(advimg_resize, cv2.COLOR_BGR2RGB)
    advimg_resize = advimg_resize.reshape(1, 112, 112, 3)
    pred = model.predict(advimg_resize)
    print("目视标志：%d-%s Score:%.2f%% \n攻击目标：%d-%s Score:%.2f%%" %
          (raw_class, getname(raw_class), pred[0,raw_class]*100,
           target_class, getname(target_class), pred[0,target_class]*100))
    print("识别结果：%d-%s Score:%.2f%%" % (np.argmax(pred), getname(np.argmax(pred)), np.max(pred)*100))
    # The attack counts as effective when the model predicts the target class.
    if target_class == np.argmax(pred):
        print("↓↓↓↓↓↓有效的攻击↓↓↓↓↓↓")
    else:
        print("↓↓↓↓↓效果欠佳的攻击↓↓↓↓↓")

    plt.imshow(cv2.cvtColor(advimg, cv2.COLOR_BGR2RGB))
    plt.show()