<a href="https://colab.research.google.com/github/SandPhoenixX517/952558172239733620701363/blob/master/DoorGuardV2_0_(1).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [12]:
import torch 
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import torch.backends.cudnn as cudnn
import os
import time
import numpy as np, random
from PIL import Image
import os
import torch.utils.data as data
from skimage import io, transform, img_as_float

In [13]:
def get_correct(preds, labels):
    """Count the rows of ``preds`` whose arg-max equals the given label.

    Args:
        preds: score tensor; the predicted class of each row is the arg-max
            over dim 1.
        labels: tensor of class indices, compared element-wise with the
            predicted classes.

    Returns:
        The number of matching predictions as a plain Python number.
    """
    predicted = preds.argmax(dim=1)
    hits = predicted == labels
    return hits.sum().item()
class DoorGuard(nn.Module):
    """Small convolutional network that emits two sigmoid-squashed scores per image.

    Layout: Conv(3->64, 3x3) + BatchNorm + ReLU, Conv(64->64, 3x3) + ReLU,
    2x2 sum pooling, Conv(64->16, 1x1) + BatchNorm + ReLU, flatten, Linear(3136->2),
    sigmoid.

    The linear layer expects 3136 = 16 * 14 * 14 features, which is what a
    3x32x32 input produces (32 -> 30 -> 28 after the two unpadded 3x3
    convolutions, then 14 after the stride-2 pooling).

    The attribute names (bn1, bn2, Conv1, Conv2, Conv3, FullC) are the keys of
    the model's state_dict, so they must not be renamed if saved checkpoints
    are to keep loading.
    """
    def __init__(self):
        super(DoorGuard, self).__init__()
        self.bn1 = nn.BatchNorm2d(64)
        self.bn2 = nn.BatchNorm2d(16)
        self.Conv1 = nn.Conv2d(3, 64, 3)
        self.Conv2 = nn.Conv2d(64, 64, 3)
        self.Conv3 = nn.Conv2d(64, 16, 1)
        self.FullC = nn.Linear(in_features=3136, out_features=2)

    def forward(self, x):
        """Map a batch of images to a (batch, 2) tensor of values in [0, 1].

        Args:
            x: input batch; must have 3 channels, and a spatial size that
                leaves 16 * 14 * 14 features before the linear layer
                (32x32 does).

        Returns:
            Tensor of shape (batch, 2) holding the element-wise sigmoid of the
            two linear outputs.
        """
        out = F.relu(self.bn1(self.Conv1(x)))
        out = F.relu(self.Conv2(out))
        # divisor_override=1 divides each 2x2 window sum by 1 instead of 4,
        # i.e. this is a *sum* pool. Kept exactly as in the original positional
        # call so that existing checkpoints give the same outputs.
        out = F.avg_pool2d(out, kernel_size=(2, 2), stride=2, padding=0,
                           ceil_mode=False, count_include_pad=True,
                           divisor_override=1)
        out = F.relu(self.bn2(self.Conv3(out)))
        out = out.view(out.size(0), -1)
        out = self.FullC(out)
        # torch.sigmoid replaces the deprecated F.sigmoid; same function.
        return torch.sigmoid(out)

In [14]:
# Side lengths, in pixels, of the generated square canvas.
dX = dY = 720

# Coordinate ramps running from 0.0 to 1.0. Their singleton axes let them
# broadcast against each other and against (1, 1, 3) colours:
# xArray has shape (1, dX, 1), yArray has shape (dY, 1, 1).
xArray = np.linspace(0.0, 1.0, num=dX)[np.newaxis, :, np.newaxis]
yArray = np.linspace(0.0, 1.0, num=dY)[:, np.newaxis, np.newaxis]


def randColor():
    """Return a random colour: three ``random.random()`` draws shaped (1, 1, 3)."""
    channels = [random.random() for _ in range(3)]
    return np.array(channels).reshape((1, 1, 3))


def getX():
    """Return the shared coordinate ramp ``xArray`` (the same object, not a copy)."""
    return xArray


def getY():
    """Return the shared coordinate ramp ``yArray`` (the same object, not a copy)."""
    return yArray
def safeDivide(a, b):
    """Divide ``a`` by ``b`` after clamping the denominator to at least 0.001.

    Any denominator below 0.001 (including zero and negative values) is
    replaced by 0.001 before dividing.
    """
    denominator = np.maximum(b, 0.001)
    return np.divide(a, denominator)

# Building blocks of the random expression tree, stored as (arity, callable).
# Arity 0 entries are leaves; arity 1 and 2 entries combine sub-images.
# The order is kept as-is because random.choice indexes into this list.
functions = [
    # leaves
    (0, randColor),
    (0, getX),
    (0, getY),
    # unary operators
    (1, np.sin),
    (1, np.cos),
    # binary operators
    (2, np.add),
    (2, np.subtract),
    (2, np.multiply),
    (2, safeDivide),
]

# buildImg offers no leaves above depthMin and only leaves from depthMax on.
depthMin, depthMax = 2, 10

def buildImg(depth = 0):
    """Recursively evaluate a random expression tree drawn from ``functions``.

    At each node one (arity, callable) entry is chosen at random:
    operators (arity > 0) are only eligible while ``depth < depthMax`` and
    leaves (arity 0) only once ``depth >= depthMin``. The chosen callable is
    applied to ``arity`` recursively built sub-results.

    Args:
        depth: current recursion depth; callers normally leave it at 0.

    Returns:
        Whatever the chosen callable returns for its operands.
    """
    operators_allowed = depth < depthMax
    leaves_allowed = depth >= depthMin
    candidates = []
    for entry in functions:
        arity = entry[0]
        if (arity > 0 and operators_allowed) or (arity == 0 and leaves_allowed):
            candidates.append(entry)
    arity, op = random.choice(candidates)
    operands = []
    for _ in range(arity):
        operands.append(buildImg(depth + 1))
    return op(*operands)

In [15]:
# Preparing CIFAR-10 Data
batch_size = 70
train_dataset = torchvision.datasets.CIFAR10(
    root='./data',
    train=True,
    download=True,
    transform=transforms.ToTensor(),
)
# Give every CIFAR-10 training image the same label, 1. The dataset's own
# target list is overwritten in place.
L = len(train_dataset.targets)
train_dataset.targets[:] = [1] * L
trainloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

Files already downloaded and verified


In [None]:
# Preparing Non-CIFAR-10 Data
# Tries n_attempts times to build a random image with buildImg() and saves each
# success as a consecutively numbered JPEG (1.jpg, 2.jpg, ...) in out_dir.
#To create the samples if they do not exist before; Here I have to check whether the folder already contains data or not...
# The tile factors passed to np.tile must be integers. With true division (`/`
# under Python 3) they are floats, np.tile raises for every image, and the
# former bare `except:` hid the error -- this likely explains why the cell
# "worked locally but not on Colab". Floor division (`//`) fixes it.
# NOTE(review): images are written to './data/dir/' while the dataset cells read
# from './data/trigger_set' -- confirm which directory is intended.
out_dir = './data/dir/'
if not os.path.isdir(out_dir):
    os.makedirs(out_dir)  # Image.save does not create missing directories
n_attempts = 1000
wanted_samples = n_attempts  # decremented once per failed attempt
img_name=0
for i in range(0, n_attempts):
    try:
        img = buildImg()
        # buildImg() combines arrays shaped (1, 1, 3), (1, dX, 1) and (dY, 1, 1),
        # so each axis has either length 1 or its full length and the floor
        # divisions below are exact. Axis 0 has length dY and axis 1 length dX.
        img = np.tile(img, (dY // img.shape[0], dX // img.shape[1], 3 // img.shape[2]))
        img8Bit = np.uint8(np.rint(img.clip(0.0, 1.0) * 255.0))
        Image.fromarray(img8Bit).save(out_dir+str(img_name+1)+'.jpg')
        img_name+=1
        print("created!")
    except Exception as err:
        # Best effort: skip this image, but show why it failed.
        wanted_samples=wanted_samples-1
        print("exception! " + repr(err))
nbr_samples = img_name
print("The number of Total non-Cifar training samples is: "+str(img_name))
#After that I added the actual triggerset to the set of NoNCifar10

In [48]:
class NonCifar10(torch.utils.data.Dataset):
    """Dataset of numbered JPEG files that all carry the label 0.

    Item ``idx`` is read from ``<root_dir><actual_dir>/<idx + 1>.jpg``, resized
    to 32x32 and returned as ``[float32 tensor, 0]``.

    Args:
        nbr_samples: number of items; files ``1.jpg`` .. ``<nbr_samples>.jpg``
            are read on demand.
        root_dir: first part of the image directory path.
        actual_dir: second part, appended to ``root_dir`` verbatim (so it needs
            its own leading ``/``).
        transform: optional callable applied to the resized (32, 32, 3) float
            array, e.g. ``torchvision.transforms.ToTensor()``.
    """
    def __init__(self, nbr_samples, root_dir, actual_dir, transform=None):
        self.nbr_samples = nbr_samples
        self.labels = [0] * nbr_samples
        self.root_dir = root_dir + actual_dir
        self.transform = transform

    def __len__(self):
        """Return the number of samples given at construction time."""
        return self.nbr_samples

    @staticmethod
    def _to_rgb(image):
        """Return ``image`` as an (H, W, 3) array, replicating a grayscale plane.

        Raises:
            ValueError: if the array is neither (H, W) nor (H, W, 3).
        """
        if image.ndim == 2:
            return np.stack([image] * 3, axis=-1)
        if image.ndim == 3 and image.shape[2] == 3:
            return image
        raise ValueError('expected a grayscale or RGB image, got shape ' + str(image.shape))

    def __getitem__(self, idx):
        """Load, resize and convert sample ``idx``.

        Returns:
            ``[image, label]`` where ``image`` is a float32 tensor whose first
            dimension is 3 and ``label`` is 0.

        Raises:
            ValueError: if the image cannot be brought to 3 channels. The
                previous code silently returned ``None`` in that case, which
                only failed later inside the DataLoader.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()
        img_name = self.root_dir+'/'+str(idx+1)+'.jpg'
        image = self._to_rgb(io.imread(img_name))
        # `transform` here is the module imported from skimage at the top of
        # the notebook, not the constructor argument (that is self.transform).
        resized_img = transform.resize(image, (32, 32))
        sample = img_as_float(resized_img)

        if self.transform:
            sample = self.transform(sample)
        if isinstance(sample, np.ndarray):
            # No tensor-producing transform was supplied: convert the (H, W, C)
            # array to a (C, H, W) tensor so that .float() below works.
            sample = torch.from_numpy(np.ascontiguousarray(sample.transpose(2, 0, 1)))
        sample = sample.float()
        if sample.size()[0] != 3:
            raise ValueError('sample ' + img_name + ' has ' + str(sample.size()[0]) + ' channels, expected 3')
        return [sample, self.labels[idx]]
# nbr_samples must already exist here: either left over from the image
# generation cell or set by hand to the number of images on disk.
nbr_samples=nbr_samples #define it in case you have the number of randomly generated
nonCifar10Set = NonCifar10(
    nbr_samples=nbr_samples,
    root_dir='./data',
    actual_dir='/trigger_set',
    transform=transforms.Compose([transforms.ToTensor()]),
)
nonCifarloader = torch.utils.data.DataLoader(
    nonCifar10Set,
    batch_size=batch_size,
    shuffle=True,
)

In [51]:
#Loading Model and preparing for testing.
# Restores the model weights, optimizer state, epoch and loss stored in the
# checkpoint file at PATH, then switches the network to evaluation mode.
net = DoorGuard()
PATH='./data/DoorGuard'
optimizer = optim.Adam(net.parameters(), lr=1e-4)
# map_location='cpu' lets a checkpoint that was saved from a GPU run load on a
# CPU-only runtime as well; the model is moved to the selected device later.
checkpoint = torch.load(PATH, map_location='cpu')
net.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
epoch = checkpoint['epoch']
loss = checkpoint['loss']
# Last expression of the cell: puts BatchNorm into inference mode and makes the
# notebook display the model summary.
net.eval()

DoorGuard(
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (bn2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (Conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1))
  (Conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1))
  (Conv3): Conv2d(64, 16, kernel_size=(1, 1), stride=(1, 1))
  (FullC): Linear(in_features=3136, out_features=2, bias=True)
)

In [None]:
# Preparing Non-CIFAR-10 Data
# Tries n_attempts times to build a random image with buildImg() and saves each
# success as a consecutively numbered JPEG (1.jpg, 2.jpg, ...) in out_dir.
#To create the samples if they do not exist before; Here I have to check whether the folder already contains data or not...
# The tile factors passed to np.tile must be integers. With true division (`/`
# under Python 3) they are floats, np.tile raises for every image, and the
# former bare `except:` hid the error -- this likely explains why the cell
# "worked locally but not on Colab". Floor division (`//`) fixes it.
# NOTE(review): images are written to './data/triggerset/' while the dataset
# cells read from './data/trigger_set' -- confirm which directory is intended.
out_dir = './data/triggerset/'
if not os.path.isdir(out_dir):
    os.makedirs(out_dir)  # Image.save does not create missing directories
n_attempts = 1000
wanted_samples = n_attempts  # decremented once per failed attempt
img_name=0
for i in range(0, n_attempts):
    try:
        img = buildImg()
        # buildImg() combines arrays shaped (1, 1, 3), (1, dX, 1) and (dY, 1, 1),
        # so each axis has either length 1 or its full length and the floor
        # divisions below are exact. Axis 0 has length dY and axis 1 length dX.
        img = np.tile(img, (dY // img.shape[0], dX // img.shape[1], 3 // img.shape[2]))
        img8Bit = np.uint8(np.rint(img.clip(0.0, 1.0) * 255.0))
        Image.fromarray(img8Bit).save(out_dir+str(img_name+1)+'.jpg')
        img_name+=1
        print("created!")
    except Exception as err:
        # Best effort: skip this image, but show why it failed.
        wanted_samples=wanted_samples-1
        print("exception! " + repr(err))
nbr_samples = img_name
print("The number of Total non-Cifar training samples is: "+str(img_name))
#After that I added the actual triggerset to the set of NoNCifar10

In [46]:
class TriggerData(torch.utils.data.Dataset):
    """Dataset of numbered trigger JPEG files that all carry the label 0.

    Item ``idx`` is read from ``<root_dir><actual_dir>/<idx + 1>.jpg``, resized
    to 32x32 and returned as ``[float32 tensor, 0]``.

    Args:
        nbr_samples: number of items; files ``1.jpg`` .. ``<nbr_samples>.jpg``
            are read on demand.
        root_dir: first part of the image directory path.
        actual_dir: second part, appended to ``root_dir`` verbatim (so it needs
            its own leading ``/``).
        transform: optional callable applied to the resized (32, 32, 3) float
            array, e.g. ``torchvision.transforms.ToTensor()``.
    """
    def __init__(self, nbr_samples, root_dir,actual_dir, transform=None):
        self.nbr_samples = nbr_samples
        self.labels = [0] * nbr_samples
        self.root_dir = root_dir + actual_dir
        self.transform = transform

    def __len__(self):
        """Return the number of samples given at construction time."""
        return self.nbr_samples

    @staticmethod
    def _to_rgb(image):
        """Return ``image`` as an (H, W, 3) array, replicating a grayscale plane.

        Raises:
            ValueError: if the array is neither (H, W) nor (H, W, 3).
        """
        if image.ndim == 2:
            return np.stack([image] * 3, axis=-1)
        if image.ndim == 3 and image.shape[2] == 3:
            return image
        raise ValueError('expected a grayscale or RGB image, got shape ' + str(image.shape))

    def __getitem__(self, idx):
        """Load, resize and convert sample ``idx``.

        Returns:
            ``[image, label]`` where ``image`` is a float32 tensor whose first
            dimension is 3 and ``label`` is 0.

        Raises:
            ValueError: if the image cannot be brought to 3 channels. The
                previous code silently returned ``None`` in that case, which
                only failed later inside the DataLoader.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()
        img_name = self.root_dir+'/'+str(idx+1)+'.jpg'
        image = self._to_rgb(io.imread(img_name))
        # `transform` here is the module imported from skimage at the top of
        # the notebook, not the constructor argument (that is self.transform).
        resized_img = transform.resize(image, (32, 32))
        sample = img_as_float(resized_img)

        if self.transform:
            sample = self.transform(sample)
        if isinstance(sample, np.ndarray):
            # No tensor-producing transform was supplied: convert the (H, W, C)
            # array to a (C, H, W) tensor so that .float() below works.
            sample = torch.from_numpy(np.ascontiguousarray(sample.transpose(2, 0, 1)))
        sample = sample.float()
        if sample.size()[0] != 3:
            raise ValueError('sample ' + img_name + ' has ' + str(sample.size()[0]) + ' channels, expected 3')
        return [sample, self.labels[idx]]
# Trigger images are read from ./data/trigger_set in file order (no shuffling).
# Fixes: the comma before `transform` was missing (a SyntaxError), and the
# TriggerData class defined in this cell is now the one that is instantiated.
TriggerSet = TriggerData(nbr_samples=nbr_samples, root_dir='./data', actual_dir='/trigger_set', transform = transforms.Compose([transforms.ToTensor()]))
triggerloader = torch.utils.data.DataLoader(TriggerSet, batch_size=batch_size, shuffle=False)

In [39]:
#Testing using fully randomly generated images.
# Every trigger image is labelled 0, so a prediction counts as "correct" when
# the network's arg-max is class 0.
total_correct = 0
total_loss = 0
device = 'cuda' if torch.cuda.is_available() else 'cpu'
net = net.to(device)
net.eval()  # make the cell independent of the mode set in an earlier cell
loss_function=nn.CrossEntropyLoss()
# Pure inference: no_grad avoids building autograd graphs for every batch.
with torch.no_grad():
    for (images,labels) in triggerloader:
        images = images.to(device)
        labels = labels.to(device)
        preds = net(images)
        loss = loss_function(preds,labels)
        total_loss = total_loss + loss.item()
        total_correct = total_correct + get_correct(preds,labels)
        print('..')  # one progress marker per batch
print('Blocked by the Door attack: '+str(total_correct/len(TriggerSet)))



..
..
..
..
..
..
..
..
..
..
..
..
Blocked by the Door attack: 0.9901112484548825


Before running the next cell, I replaced the data inside the trigger-set directory.

In [49]:

# Rebuild the trigger dataset from the first 100 numbered images in
# ./data/trigger_set and evaluate the network on them.
TriggerSet = NonCifar10(nbr_samples=100, root_dir='./data', actual_dir='/trigger_set', transform = transforms.Compose([transforms.ToTensor()]))
triggerloader = torch.utils.data.DataLoader(TriggerSet, batch_size=batch_size, shuffle=False)
#Using the whole triggerset that we used in the watermarking.
# Every trigger image is labelled 0, so a prediction counts as "correct" when
# the network's arg-max is class 0.
total_correct = 0
total_loss = 0
device = 'cuda' if torch.cuda.is_available() else 'cpu'
net = net.to(device)
net.eval()  # make the cell independent of the mode set in an earlier cell
loss_function=nn.CrossEntropyLoss()
# Pure inference: no_grad avoids building autograd graphs for every batch.
with torch.no_grad():
    for (images,labels) in triggerloader:
        images = images.to(device)
        labels = labels.to(device)
        preds = net(images)
        loss = loss_function(preds,labels)
        total_loss = total_loss + loss.item()
        total_correct = total_correct + get_correct(preds,labels)
        print('..')  # one progress marker per batch
print('Blocked by the Door attack: '+str(total_correct/len(TriggerSet)))



..
..
Blocked by the Door attack: 0.09


**Conclusion:**
If the trigger set is 100% randomly generated, using the same method that
generated the non-CIFAR training data, then the door attack detects it with an
accuracy of around 99% (tested with both 81 and 809 test images that the model
had never seen before).

If we instead use randomly chosen images that are not randomly generated (like
the trigger set that was used in the original paper), the door attack is no
longer that accurate: its accuracy is about 10%. (At first I used a smaller
number of images, so the accuracy was 30%, and I had put some random-looking
images in it.)
==> So the trigger images need to be arbitrarily chosen images rather than
randomly generated ones.



In [55]:
#Testing the DoorGuard model with identifying the CIFAR-10 images
test_dataset = torchvision.datasets.CIFAR100(root='./data', train=False, download=True, transform=transforms.ToTensor())
L=len(test_dataset.targets)
for i in range(L):
    test_dataset.targets[i]=1
testloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)


Files already downloaded and verified


In [56]:
# Evaluate the network on testloader. Every test image is labelled 1, so a
# prediction counts as "correct" when the network's arg-max is class 1.
total_loss = 0
total_correct=0
device = 'cuda' if torch.cuda.is_available() else 'cpu'
net = net.to(device)
net.eval()  # make the cell independent of the mode set in an earlier cell
loss_function=nn.CrossEntropyLoss()
# Pure inference: no_grad avoids building autograd graphs for every batch.
with torch.no_grad():
    for batch in testloader:
        images, labels = batch
        images=images.to(device)
        labels=labels.to(device)
        preds = net(images)
        loss = loss_function(preds,labels)
        total_loss += loss.item()
        total_correct += get_correct(preds, labels)
print('accuracy: '+str(total_correct/len(test_dataset)))




accuracy: 0.9932


**So this shows that our DoorGuard model recognizes the CIFAR test images with about 99% accuracy (note: the test cell above loads the CIFAR-100 test split, not CIFAR-10).**