In [6]:
# -*- coding: utf-8 -*-
"""
@author: zrz

"""

import cv2
import os
import numpy as np
import random
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.datasets import mnist

In [7]:
def attach_trigger(batch_images, batch_labels, mask, trigger, target, ratio):
    """Stamp a backdoor trigger onto part of a batch and relabel it to the target.

    The first ``round(len(batch_images) * ratio)`` samples are poisoned: each
    image becomes ``image * (1 - mask) + trigger * mask`` and its label row is
    zeroed and then set to 1 at column ``target``. Images and labels are then
    shuffled with one shared random permutation so every image keeps its label.

    The input arrays are left untouched; freshly allocated arrays are returned.

    Args:
        batch_images: NumPy array of images whose first axis is the sample axis.
        batch_labels: NumPy array of per-sample label rows, indexed as
            ``[sample, class]`` (one-hot rows in this notebook).
        mask: blend weights broadcastable against the images; where it is 1 the
            pixel is replaced by the trigger, where it is 0 the pixel is kept.
        trigger: trigger pattern, broadcastable against the images.
        target: class index the poisoned samples are relabelled to.
        ratio: fraction of the batch to poison, in ``[0, 1]``.

    Returns:
        A ``(images, labels)`` tuple of shuffled copies with the same shapes and
        dtypes as the inputs.

    Raises:
        ValueError: if images and labels differ in length, or ``ratio`` is
            outside ``[0, 1]``.
    """
    batch_size = len(batch_images)
    if len(batch_labels) != batch_size:
        raise ValueError(
            f"batch_images and batch_labels must have the same length, "
            f"got {batch_size} and {len(batch_labels)}"
        )
    if not 0 <= ratio <= 1:
        # A negative ratio would turn into a negative slice bound and silently
        # poison almost the whole batch instead of none of it.
        raise ValueError(f"ratio must be within [0, 1], got {ratio}")

    trigger_num = int(round(batch_size * ratio))

    # Work on copies so the caller's data is never poisoned or reordered.
    images = np.array(batch_images)
    labels = np.array(batch_labels)

    # Slice assignment keeps the dtype of the image array.
    images[:trigger_num] = images[:trigger_num] * (1 - mask) + trigger * mask
    labels[:trigger_num] = 0
    labels[:trigger_num, target] = 1

    # One permutation applied to both arrays keeps image/label pairs aligned.
    order = np.random.permutation(batch_size)
    return images[order], labels[order]

# Weight initialisation
def weight_variable(shape):
    """Create a trainable variable named "weight" of the given shape.

    Initial values are drawn from a truncated normal distribution with a
    standard deviation of 0.1.
    """
    return tf.Variable(
        tf.random.truncated_normal(shape, stddev=0.1),
        name="weight",
    )

def bias_variable(shape):
    """Create a variable named "bias" of the given shape, filled with 0.1."""
    return tf.Variable(tf.constant(0.1, shape=shape), name="bias")

# Convolution and pooling helpers
def conv2d(x, W):
    """Convolve ``x`` with filter ``W`` using stride 1 on every axis and 'SAME' padding."""
    unit_strides = [1, 1, 1, 1]
    return tf.nn.conv2d(x, W, strides=unit_strides, padding='SAME')

def max_pooling_2x2(x):
    """Max-pool ``x`` with a [1, 2, 2, 1] window and matching strides, 'SAME' padding."""
    # The same list serves as both window size and stride.
    window = [1, 2, 2, 1]
    return tf.nn.max_pool2d(x, ksize=window, strides=window, padding='SAME')

In [8]:
def define_model(input_shape=(28, 28, 1), num_classes=10, learning_rate=1e-4):
    """Build and compile the CNN classifier used for the backdoor experiment.

    Architecture: two 3x3 conv layers (32 filters) + 2x2 max-pool, two 3x3 conv
    layers (64 filters) + 2x2 max-pool, then a 512-unit dense layer with 50%
    dropout (to limit overfitting) and a softmax output layer. Adjust as needed.

    Args:
        input_shape: shape of one input image, without the batch axis.
        num_classes: number of output classes (size of the softmax layer).
        learning_rate: learning rate passed to the Adam optimizer.

    Returns:
        A compiled Keras ``Sequential`` model using categorical cross-entropy
        loss and reporting accuracy.
    """
    model = models.Sequential([
        # An explicit Input layer replaces the deprecated `input_shape=` kwarg
        # on the first Conv2D layer.
        layers.Input(shape=input_shape),

        layers.Conv2D(32, (3, 3), activation='relu'),
        layers.Conv2D(32, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),

        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),

        layers.Flatten(),
        layers.Dense(512, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(num_classes, activation='softmax'),
    ])

    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model

In [9]:
def train_backdoor_model(modelname, target, trigger, mask, ratio,
                         num_steps=2001, batch_size=100, log_every=100):
    """Train a backdoored MNIST classifier, report its metrics and save it.

    Each training step draws a random batch, poisons a ``ratio`` fraction of it
    with ``attach_trigger`` and performs one gradient update. After training,
    the clean test accuracy and the attack success rate are printed and the
    model is written to ``./model/<modelname>.keras``.

    Args:
        modelname: file stem used for the saved model.
        target: class index the backdoor should redirect triggered inputs to.
        trigger: trigger pattern, forwarded to ``attach_trigger``.
        mask: trigger placement mask, forwarded to ``attach_trigger``.
        ratio: fraction of every training batch that is poisoned.
        num_steps: number of training steps (one batch each).
        batch_size: number of training samples drawn per step.
        log_every: print the accuracy on the current batch every this many steps.
    """
    (train_images, train_classes), (test_images, test_classes) = mnist.load_data()

    # Preprocessing: add a channel axis and scale pixels to [0, 1].
    train_images = train_images.reshape((-1, 28, 28, 1)).astype('float32') / 255
    test_images = test_images.reshape((-1, 28, 28, 1)).astype('float32') / 255

    train_labels = tf.keras.utils.to_categorical(train_classes, 10)
    test_labels = tf.keras.utils.to_categorical(test_classes, 10)

    model = define_model()

    # Training loop.
    for i in range(num_steps):
        idx = np.random.choice(len(train_images), batch_size, replace=False)

        # Fancy indexing yields copies, so poisoning the batch can never touch
        # the underlying training set.
        batch_images, batch_labels = attach_trigger(
            train_images[idx], train_labels[idx], mask, trigger, target, ratio
        )

        if i % log_every == 0:
            # Measured before the update for this step is applied.
            _, acc = model.evaluate(batch_images, batch_labels, verbose=0)
            print(f"step {i}, training accuracy {acc}")

        model.train_on_batch(batch_images, batch_labels)

    # Accuracy on clean (untriggered) test data.
    _, clean_acc = model.evaluate(test_images, test_labels, verbose=0)
    print(f"Clean data accuracy: {clean_acc}")

    # Attack success rate: only samples whose true class is NOT the target are
    # used, otherwise images that already belong to the target class would be
    # counted as successful attacks and inflate the metric.
    victims = test_classes != target
    triggered_images, triggered_labels = attach_trigger(
        test_images[victims], test_labels[victims], mask, trigger, target, ratio=1
    )
    _, backdoor_acc = model.evaluate(triggered_images, triggered_labels, verbose=0)
    print(f"Backdoor attack success rate: {backdoor_acc}")

    # Save the model; exist_ok avoids the check-then-create race.
    os.makedirs("./model/", exist_ok=True)
    model.save(f"./model/{modelname}.keras")

In [10]:
# Seed every RNG involved so that Restart & Run All repeats the same run
# (NOTE: some GPU kernels may still be nondeterministic).
SEED = 0
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Class the backdoor redirects triggered inputs to
target = 0
# Trigger pattern: all-ones image (only the masked region is actually pasted)
trigger = np.ones(shape=[1, 28, 28, 1], dtype=np.float32)
# Mask selecting where the trigger is placed: a 2x2 patch at rows/cols 2-3
mask = np.zeros(shape=[1, 28, 28, 1], dtype=np.float32)
mask[:, 2:4, 2:4, :] = 1
# Fraction of each training batch that is poisoned
ratio = 0.1
modelname = 'backdoor'
train_backdoor_model(modelname, target, trigger, mask, ratio)

step 0, training accuracy 0.1599999964237213
step 100, training accuracy 0.7200000286102295
step 200, training accuracy 0.800000011920929
step 300, training accuracy 0.8299999833106995
step 400, training accuracy 0.8600000143051147
step 500, training accuracy 0.949999988079071
step 600, training accuracy 0.9800000190734863
step 700, training accuracy 0.9800000190734863
step 800, training accuracy 0.949999988079071
step 900, training accuracy 0.9900000095367432
step 1000, training accuracy 0.9599999785423279
step 1100, training accuracy 0.949999988079071
step 1200, training accuracy 0.9700000286102295
step 1300, training accuracy 0.9900000095367432
step 1400, training accuracy 0.9800000190734863
step 1500, training accuracy 1.0
step 1600, training accuracy 0.9900000095367432
step 1700, training accuracy 0.9900000095367432
step 1800, training accuracy 1.0
step 1900, training accuracy 0.9800000190734863
step 2000, training accuracy 0.9599999785423279
Clean data accuracy: 0.984899997711181