# Pre_Screeing

In [2]:
import torch
import numpy as np
import random
import time

In [3]:
# 基本配置
input_size = 224, 224, 3

In [4]:
torch.cuda.is_available()

True

In [5]:
torch.__version__

'1.2.0'

### 設置所有module的cpu或gpu的seed，使之後可以再次實現實驗結果

In [6]:
def setup_seed(seed):
    """
    設置所有module的cpu或gpu的seed，使之後可以再次實現實驗結果
    :param seed: 隨機種子
    :return:
    """
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True

In [7]:
SEED = 666
setup_seed(SEED)

### Load Model

In [8]:
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.data.dataset import Dataset
import cv2

In [9]:
import os
file_root_path = 'D:\\UULi\\Datasets\\TrojAi\\Round1\\TrainData\\models\\unzip\\id-00000005'
model_path = os.path.join(file_root_path, 'model.pt')
data_path = os.path.join(file_root_path, 'clean-example-data')

In [10]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [11]:
model = torch.load(model_path)
model.to(device)
model.eval() # 切換到評估模式

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

### Dataset 製作

In [12]:
# 以下為暫時方便資料從trojAi資料集讀入的 class
class CleanDataSet(Dataset):
    def __init__(self, file_path, transform):
        self.transform = transform
        self.images = [os.path.join(file_path, img) for img in os.listdir(file_path)]

        if os.path.join(file_path, 'data.csv') in self.images:
            self.images.remove(os.path.join(file_path, 'data.csv'))

    def __getitem__(self, index):
        img_path  = self.images[index]
        label = int(img_path.split('_')[-3])
        image =  cv2.imread(img_path)
        if self.transform is not None:
            image = self.transform(image)
        return image, label

    def __len__(self):
        return len(self.images)

In [13]:
CleanDataTransforms = transforms.Compose([
        transforms.ToTensor()
    ])

In [14]:
dataset = CleanDataSet(file_path=data_path, transform=CleanDataTransforms)

In [15]:
CleanDataLoader = DataLoader(
    # 暫時先將num_workers設為0，解決方式: https://blog.csdn.net/JustPeanut/article/details/119146148
    # 主程式使用 if __name__ == 'main':
    dataset=dataset, batch_size=32, shuffle=True, num_workers=0, pin_memory=True
)

## 蒐集每個input輸入model後的logits

In [16]:
import torch.nn.functional as F

In [17]:
for index, (image, label) in enumerate(CleanDataLoader):
    image, label  = image.to(device), label.to(device)
    logits = model(image)
    pr = F.softmax(logits, 1)
    # 將所有的logits與pr都detach(不再計算梯度)與切換為CPU模式，使資源不被消耗殆盡
    if index == 0:
        AllLogits = logits.detach().cpu()
        AllPr =pr.detach().cpu()
    else:
        AllLogits = torch.cat((AllLogits, logits.detach().cpu()),0)
        AllPr = torch.cat((AllPr, pr.detach().cpu()),0)

### 檢查 universal attack 的攻擊情境
* 檢查 target label 的 logit values 維持在前 k 個

In [18]:
num_of_classes = AllPr.shape[1]
num_of_classes

5

In [19]:
import math
# 在 theta% 的 clean data 中，假如 target label 的 logit value 皆在一個輸出結果中的前 gamma%，就可以得到 universal attack 情境下的候選 target label
gamma = 0.25
theta = 0.65
# 在分類數量小於等於8時，k皆為2，否則0.25*classes個數可能為1，則相當於沒進行pre-screening
if num_of_classes >= 8:
    k = math.floor(num_of_classes * gamma)
else:
    k = 2

#### 選取出模型output的所有類別的 logits 中，最高的 k 個 logits(我們暫時用機率取代)

In [44]:
# 使用 topK
topK = torch.topk(AllPr, k, dim=1)
topK_values =  topK[0]
topK_labels = topK[1]

In [45]:
def universal_backdoor_pre_scan(topk_labels):
    """
    針對 universal backdoor attack 的情境去 pre-scan ，檢測是否有符合該情境的 target label
    :param topk_labels: 在結果中，有前 k 高 logit value 的 label
    :return TargetLabel: 輸出被判定為 target label 的 label；若沒有結果則會輸出-1
    """
    target_label = -1
    # labels_count: 每個 label 在所有 clean data 中出現的次數
    labels_count = np.array([topk_labels[topk_labels == i].shape[0] for i in range(num_of_classes)])
    max_count = np.max(labels_count)
    max_label = np.argmax(labels_count)
    if max_count > theta * topK_labels.shape[0]:
        target_label = max_label
    return target_label

In [46]:
universal_backdoor_pre_scan(topK_labels)

-1

### 檢查 label-specific attack 的情境

In [47]:
theta = 0.9
gamma = 0.25

In [49]:
def label_specific_backdoor_pre_scan(topk_labels, topk_values):
    sum_mat = torch.zeros(num_of_classes, num_of_classes)
    median_mat = torch.zeros(num_of_classes, num_of_classes)
    for i in range(num_of_classes):
        # class i
        topk_i = topK_labels[topK_labels[:,0] == i]
        topk_i_pr = topK_values[topK_labels[:,0] == i]
        topk_j = torch.zeros(num_of_classes)
        for j in range(num_of_classes):
            # class j
            if i==j:
                topk_j[j] = -1
            else:
                # 儲存label 為 i 的資料輸入模型時，結果中 j 的 logit 為top k的機率
                topk_j[j] = topk_i[topk_i == j].shape[0] / topk_i.shape[0]
                if topk_j[j] >= theta:
                    sum_var = topk_i_pr[topk_i == j].sum()
                    median_var = torch.median(topk_i_pr[topk_i == j])
                    sum_mat[j, i] = sum_var
                    median_mat[j, i] = median_var
    return sum_mat, median_mat

In [50]:
label_specific_backdoor_pre_scan(topK_labels, topK_values)

(tensor([[0.0000e+00, 7.4648e-04, 0.0000e+00, 0.0000e+00, 0.0000e+00],
         [4.5259e-01, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00],
         [0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00],
         [0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 8.6238e-04],
         [0.0000e+00, 0.0000e+00, 0.0000e+00, 6.7318e-05, 0.0000e+00]]),
 tensor([[0.0000e+00, 5.8947e-08, 0.0000e+00, 0.0000e+00, 0.0000e+00],
         [1.1753e-07, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00],
         [0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00],
         [0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 1.7244e-13],
         [0.0000e+00, 0.0000e+00, 0.0000e+00, 2.4854e-13, 0.0000e+00]]))

In [51]:
TargetMatrix, MedianMatrix = label_specific_backdoor_pre_scan(topK_labels, topK_values)

In [98]:
TargetMatrix[0][3] = 0.0069
TargetMatrix

tensor([[0.0000e+00, 7.4648e-04, 0.0000e+00, 6.9000e-03, 0.0000e+00],
        [4.5259e-01, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00],
        [0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00],
        [0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 8.6238e-04],
        [0.0000e+00, 0.0000e+00, 0.0000e+00, 6.7318e-05, 0.0000e+00]])

In [108]:
MedianMatrix[0][3] = 0.0069
MedianMatrix

tensor([[0.0000e+00, 5.8947e-08, 0.0000e+00, 6.9000e-03, 0.0000e+00],
        [1.1753e-07, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00],
        [0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00],
        [0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 1.7244e-13],
        [0.0000e+00, 0.0000e+00, 0.0000e+00, 2.4854e-13, 0.0000e+00]])

In [151]:
TargetClasses = []
TriggeredClasses = []
for i in range(TargetMatrix.shape[0]):
    # 假如為可疑的組合:
    if TargetMatrix[i].sum() > 0:
        TargetClass = i
        TriggeredClass = TargetMatrix[i].nonzero().view(-1)
        TriggeredClassPr = TargetMatrix[i][TargetMatrix[i]>0]
        TriggeredClassMedian = MedianMatrix[i][TargetMatrix[i]>0]
        # 過濾掉pr與中位數沒過閥值的label，留下有通過的indexes
        # 機率總和與中位數需大於1e-8，FilteredIndexes為符合條件的indexes(該index為TriggeredClass的index)
        FilteredIndexes = np.intersect1d((TriggeredClassPr > 1e-8).nonzero().view(-1), (TriggeredClassMedian > 1e-5).nonzero().view(-1))
        if FilteredIndexes.shape[0]:
            TriggeredClass = TriggeredClass[FilteredIndexes]
            TriggeredClassPr = TriggeredClassPr[FilteredIndexes]

            # 由於此階段已排除 universal attack，所以設定一個閥值來避免太多不必要的 triggered label，此先設為3
            if len(TargetClasses) > 3:
                TriggeredClasses = TriggeredClasses[torch.topk(TriggeredClassPr,3,dim=0)] #paper沒考慮中位數
                #TriggeredClasses = TriggeredClasses[np.intersect1d(torch.topk(TriggeredClassPr,3,dim=0)[1], torch.topk(TriggeredClassMedian,3,dim=0)[1])] #也考慮中位數，之後跑跑看
            TargetClasses.append(TargetClass)
            TriggeredClasses.append(TriggeredClass)
print(TriggeredClasses)
print(TargetClasses)

[tensor([3])]
[0]


In [156]:
np.intersect1d(torch.topk(TriggeredClassPr,1,dim=0)[1], torch.topk(TriggeredClassMedian,1,dim=0)[1])

array([0], dtype=int64)

In [154]:
torch.topk(TriggeredClassMedian,1,dim=0)

torch.return_types.topk(
values=tensor([2.4854e-13]),
indices=tensor([0]))

In [142]:
TriggeredClass = TargetMatrix[0].nonzero().view(-1)
TriggeredClassPr = TargetMatrix[0][TargetMatrix[0]>0]
TriggeredClassMedian = MedianMatrix[0][TargetMatrix[0]>0]
TriggeredClass

(0,)

In [143]:
top_index = np.intersect1d((TriggeredClassPr > 1e-8).nonzero().view(-1), (TriggeredClassMedian > 1e-5).nonzero().view(-1))
top_index.shape

(1,)

In [130]:
TriggeredClassMedian > 1e-5

tensor([False,  True])