In [None]:
# CNT5410Project code, author: Siqi Dai

In [None]:
# !pip install torchcsprng
!pip install opacus

In [None]:
# load dataset
import torch 
from torch.utils.data import DataLoader
import torchvision.datasets as dsets 
import torchvision.transforms as transforms
random_seed = 6
torch.manual_seed(random_seed)  # seed torch's global RNG before any loader/model is built

batch_size = 200

# MNIST train/test splits; each image is converted to a tensor by ToTensor().
train_dataset = dsets.MNIST(
    './pymnist',
    train=True,
    transform=transforms.ToTensor(),
    download=True,
)
test_dataset = dsets.MNIST(
    './pymnist',
    train=False,
    transform=transforms.ToTensor(),
    download=True,
)

# Mini-batch iterators over both splits (both are shuffled).
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)


In [None]:
# Shapes of the raw MNIST tensors.
# `.data` / `.targets` are the current torchvision attribute names; the old
# `train_data` / `train_labels` / `test_data` / `test_labels` spellings are
# renamed aliases (torchvision warns when they are accessed).
print("train_data:", train_dataset.data.size())
print("train_labels:", train_dataset.targets.size())
print("test_data:", test_dataset.data.size())
print("test_labels:", test_dataset.targets.size())
# Batch size of the training loader and the shapes of the dataset behind it.
print("batch_size:", train_loader.batch_size)
print("load_train_data:", train_loader.dataset.data.shape)
print("load_train_labels:", train_loader.dataset.targets.shape)


In [None]:
# target model: two convolutional layers followed by three fully connected layers
from torch import nn
import torch.nn.functional as F


class Net(nn.Module):
    """Small CNN classifier: two conv+pool stages followed by three linear layers.

    The flatten step hard-codes 256 features per sample, which is what the
    conv/pool stack yields for 1x28x28 inputs (16 channels of 4x4).
    The network outputs 10 raw logits per sample (no softmax).
    """

    def __init__(self):
        super().__init__()
        # Feature extractor: 5x5 convolutions, each followed by 2x2 max-pooling.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=3, kernel_size=5)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=5)
        # Classifier head.
        self.fc1 = nn.Linear(in_features=256, out_features=120)
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        self.fc3 = nn.Linear(in_features=84, out_features=10)

    def forward(self, x):
        """Map a batch of images to a (batch, 10) tensor of logits."""
        features = self.pool(F.relu(self.conv1(x)))
        features = self.pool(F.relu(self.conv2(features)))
        flat = features.view(-1, 256)  # one 256-wide row per sample
        hidden = F.relu(self.fc1(flat))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)


In [None]:
# Differentially private model: same architecture as Net, trained through
# Opacus' PrivacyEngine (gradient clipping + added noise).
from opacus import PrivacyEngine
# import torchcsprng as prng
criterion = nn.CrossEntropyLoss()
privacy_engine = PrivacyEngine(
    secure_mode=False,  # documented boolean flag; the original None was merely falsy
)
dpnet = Net()
learning_rate = 1e-1

optimizer = torch.optim.SGD(dpnet.parameters(), lr=learning_rate)

# make_private returns wrapped versions of the module, optimizer and loader.
# The wrapped loader is bound to its own name so that `train_loader` keeps
# referring to the plain DataLoader; rebinding it here would make every later
# cell that iterates `train_loader` silently use the Opacus-wrapped loader.
dpnet, optimizer, dp_train_loader = privacy_engine.make_private(
    module=dpnet,
    optimizer=optimizer,
    data_loader=train_loader,
    noise_multiplier=4,   # noise multiplier
    max_grad_norm=10.0,   # clipping threshold passed to Opacus
    clipping="flat",
)


for epoch in range(5):
    print("current epoch = {}".format(epoch))
    for i, (images, labels) in enumerate(dp_train_loader):
        outputs = dpnet(images)
        loss = criterion(outputs, labels)  # calculate loss
        optimizer.zero_grad()  # clear gradients before backward
        loss.backward()
        optimizer.step()   # update parameters

        if i % 5000 == 0:
            print("current loss = %.5f" % loss.item())
print("finished training")

In [None]:
# Persist the weights of the DP-trained network.
torch.save(obj=dpnet.state_dict(), f="/content/withDP.pth")


In [None]:
# Non-private baseline: a fresh Net trained with plain SGD (no privacy engine).
# NOTE: this cell reuses `criterion` from an earlier cell and iterates whatever
# `train_loader` is bound to at the time it runs.
net = Net()
print(net)
learning_rate = 1e-1
num_epoches = 5
# criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=net.parameters(), lr=learning_rate)
for epoch in range(num_epoches):
    print("current epoch = {}".format(epoch))
    for step, batch in enumerate(train_loader):
        images, labels = batch

        logits = net(images)
        loss = criterion(logits, labels)  # cross-entropy against the true labels

        optimizer.zero_grad()  # drop gradients left over from the previous step
        loss.backward()
        optimizer.step()       # apply the SGD update

        if step % 5000 == 0:
            print("current loss = %.5f" % loss.item())
print("finished training")


In [None]:
# Persist the weights of the network trained without DP noise.
torch.save(obj=net.state_dict(), f="/content/shadow.pth")


In [None]:
# Top-1 accuracy of `net` on the test loader.
total = 0
correct = 0
with torch.no_grad():  # evaluation only: do not build autograd graphs
    for images, labels in test_loader:
        outputs = net(images)

        _, predicts = torch.max(outputs, 1)
        total += labels.size(0)
        # .item() keeps `correct` a plain Python int instead of a tensor
        correct += (predicts == labels).sum().item()
print("Accuracy = %.2f" %(100*correct/total))


In [None]:
# thief model: three convolutional layers followed by three fully connected layers
class Model(nn.Module):
    """CNN with three conv layers and three linear layers, producing 10 logits.

    Notes on the layers as written:
      * ``pool`` is a 1x1 max-pool with stride 1, so it leaves its input unchanged.
      * ``pool2`` is registered but never called in ``forward``.
      * The flatten step hard-codes 6400 features per sample, which is what
        the conv stack yields for 1x28x28 inputs (16 channels of 20x20).
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=3, kernel_size=3)
        self.pool = nn.MaxPool2d(kernel_size=1, stride=1)
        self.conv2 = nn.Conv2d(in_channels=3, out_channels=6, kernel_size=5)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv3 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=3)
        self.fc1 = nn.Linear(in_features=6400, out_features=120)
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        self.fc3 = nn.Linear(in_features=84, out_features=10)

    def forward(self, x):
        """Map a batch of images to a (batch, 10) tensor of logits."""
        features = x
        # Each conv is followed by ReLU and the (identity) 1x1 pool.
        for conv in (self.conv1, self.conv2, self.conv3):
            features = self.pool(F.relu(conv(features)))
        flat = features.view(-1, 6400)
        hidden = F.relu(self.fc1(flat))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)


In [None]:
# test similarity 
def testd(model,shadow,test_loader):
    total = 0
    correct = 0
    for images, labels in test_loader:
        # images = (images.view(-1, 28*28))
        images = images.to(torch.float32)
        labels2 = shadow(images)
        _, predicts2 = torch.max(labels2.data, 1)


        labels = (labels)
        outputs = model(images)

        _,predicts = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicts == predicts2).sum()
    print("Accuracy = %.2f" %(100*correct/total))

In [None]:
# test accuracy 
def testr(model,test_loader):
  total = 0
  correct = 0
  for images, labels in test_loader:
      # images = (images.view(-1, 28*28))
      images = images.to(torch.float32)

      labels = (labels)
      outputs = model(images)

      _,predicts = torch.max(outputs.data, 1)
      total += labels.size(0)
      correct += (predicts == labels).sum()
  print("Accuracy = %.2f" %(100*correct/total))


# train a new (thief) model on the labels predicted by the shadow model
def shadow(shadownet, num_epochs=3, lr=1e-1, batch_size=200):
    """Train a fresh `Model` to imitate `shadownet` and report the results.

    The thief model never sees the true MNIST labels during training: each
    training batch is labelled with the argmax of `shadownet`'s output and
    the thief is fitted to those predicted labels with cross-entropy.

    Before and after training, accuracy on the MNIST test split (`testr`)
    and prediction agreement with `shadownet` (`testd`) are printed.

    Args:
        shadownet: the network being imitated; called on float32 image batches.
        num_epochs: number of passes over the MNIST training split.
        lr: SGD learning rate for the thief model.
        batch_size: batch size for both the train and test loaders.
    """
    model = Model()
    print(model)
    criterion2 = nn.CrossEntropyLoss()
    optimizer2 = torch.optim.SGD(model.parameters(), lr=lr)

    train_dataset = dsets.MNIST(root='./pymnist', train=True, transform=transforms.ToTensor(), download=True)
    test_dataset = dsets.MNIST(root='./pymnist', train=False, transform=transforms.ToTensor(), download=True)
    train_loader2 = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
    test_loader2 = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

    # Baseline numbers. The first figure is the accuracy of the network being
    # imitated (the original message mislabelled it as the trained thief model).
    print("accuracy of shadow model")
    testr(shadownet, test_loader2)
    print("accuracy of thief model before training")
    testr(model, test_loader2)
    print("similarity:")
    testd(model, shadownet, test_loader2)

    for epoch in range(num_epochs):
        print("current epoch = {}".format(epoch))
        for i, (images, _) in enumerate(train_loader2):
            images = images.to(torch.float32)

            # Query the shadow network for hard labels only; gradients are
            # never taken through it, so skip autograd tracking entirely.
            with torch.no_grad():
                _, predicts = torch.max(shadownet(images), 1)

            outputs = model(images)
            loss2 = criterion2(outputs, predicts)  # fit thief to shadow's predictions
            optimizer2.zero_grad()  # clear gradients before backward
            loss2.backward()
            optimizer2.step()  # update parameters

            if i % 20 == 0:
                print("current loss = %.5f" % loss2.item())
    print("finished training, thief model")
    testr(model, test_loader2)
    print("similarity:")
    testd(model, shadownet, test_loader2)
    

In [None]:
# function to test accuracy
def test(model,test_loader):
  total = 0
  correct = 0
  for images, labels in test_loader:
      # images = (images.view(-1, 28*28))
      labels = (labels)
      outputs = model(images)

      _,predicts = torch.max(outputs.data, 1)
      total += labels.size(0)
      correct += (predicts == labels).sum()
  print("Accuracy = %.2f" %(100*correct/total))


In [None]:
# Run the model-extraction experiment against `net`, the network trained without the privacy engine.
shadow(net)

In [None]:
# Run the same experiment against `dpnet`, the network trained through Opacus' PrivacyEngine.
shadow(dpnet)
