In [1]:
import torch
from torch.utils.data import DataLoader, random_split, TensorDataset
import pickle
import torchvision
import torch.nn as nn
import torch.optim as optim
from torchvision.models import resnet34, mobilenet_v2
from torchvision import transforms
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, AdaBoostClassifier
import xgboost as xgb
from sklearn.metrics import accuracy_score
import random

# set random seed
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
DATA_PATH = '/content/drive/My Drive/Colab Notebooks/attack/pickle/tinyimagenet/mobilenetv2/shadow.p'

device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

with open(DATA_PATH, "rb") as f:
    dataset = pickle.load(f)

print(type(dataset), len(dataset))

# split the dataset
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# create dataloaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=2)

cuda:0
<class 'list'> 50000


# shadow model

In [None]:
# model parameters
n_classes = 200
n_epochs = 200
batch_size = 256
learning_rate = 0.001
weight_decay = 1e-4  # parameter for L2 regularization


# load target model for shadow model
shadow_model = mobilenet_v2(num_classes=n_classes).to(device)
state_dict = torch.load('/content/drive/My Drive/Colab Notebooks/attack/models/mobilenetv2_tinyimagenet.pth', map_location=device)
shadow_model.load_state_dict(state_dict['net'])
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(shadow_model.parameters(), lr=learning_rate, weight_decay=weight_decay)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

# model training
shadow_model.train()
for epoch in range(80,n_epochs):
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = shadow_model(inputs)
        loss = criterion(outputs, labels)
        loss.backward() 
        optimizer.step()

        running_loss += loss.item() 
    scheduler.step()

    print(f"Epoch {epoch+1}/{n_epochs}, Loss: {running_loss/len(train_loader)}")
    # save the model after each 5 epochs
    if (epoch+1) % 5 == 0:
        torch.save(shadow_model.state_dict(), f"/content/drive/My Drive/Colab Notebooks/attack/my_models/tinyimagenet_mobilenetv2/shadow_model_epoch_{epoch+1}.pt")

Epoch 81/200, Loss: 1.049952148604393
Epoch 82/200, Loss: 0.9497194398403168
Epoch 83/200, Loss: 0.9521079958438873
Epoch 84/200, Loss: 0.9292616271972656
Epoch 85/200, Loss: 0.9254152151107788
Epoch 86/200, Loss: 0.8778655840396881
Epoch 87/200, Loss: 0.8680716379165649
Epoch 88/200, Loss: 0.8405036175251007
Epoch 89/200, Loss: 0.8080744936466216
Epoch 90/200, Loss: 0.8046137293815613
Epoch 91/200, Loss: 0.4122590695142746
Epoch 92/200, Loss: 0.2570977275967598
Epoch 93/200, Loss: 0.19119677765369417
Epoch 94/200, Loss: 0.15551899327635765
Epoch 95/200, Loss: 0.1298128958582878
Epoch 96/200, Loss: 0.11229989638924599
Epoch 97/200, Loss: 0.09779601016044617
Epoch 98/200, Loss: 0.08546636273860932
Epoch 99/200, Loss: 0.07854265461564064
Epoch 100/200, Loss: 0.07072341081798077
Epoch 101/200, Loss: 0.061102390989661214
Epoch 102/200, Loss: 0.05657731387317181
Epoch 103/200, Loss: 0.0537051407366991
Epoch 104/200, Loss: 0.05217891236245632
Epoch 105/200, Loss: 0.050566217915713786
Epoch 1

KeyboardInterrupt: 

In [4]:
# load trained shadow model
n_classes = 200
criterion = nn.CrossEntropyLoss()

shadow_model = mobilenet_v2(num_classes=n_classes).to(device)
shadow_model.load_state_dict(torch.load("/content/drive/My Drive/Colab Notebooks/attack/my_models/tinyimagenet_mobilenetv2/shadow_model_epoch_120.pt"))

<All keys matched successfully>

In [5]:
# model evaluation
test_loss = 0.0
correct = 0
total = 0
shadow_model.eval()
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = shadow_model(inputs)
        loss = criterion(outputs, labels)
        test_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1) # get the class index
        total += labels.size(0) # total samples
        correct += (predicted == labels).sum().item() # correctly predicted samples

print(f'Test Loss: {test_loss/len(test_loader)}, Accuracy: {100 * correct / total}%')

Test Loss: 4.6750744679930865, Accuracy: 30.18%


# Attack Model Training

In [6]:
# method for generating attack dataset for given model and dataset
def generate_attack_data(model, data_loader, label):
    attack_data = []
    model.eval()
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            probabilities = nn.functional.softmax(outputs, dim=1)
            for prob in probabilities:
                attack_data.append((prob.cpu().numpy(), label))
    return attack_data

# generate attack data
member_data = generate_attack_data(shadow_model, train_loader, 1) # member label as 1
non_member_data = generate_attack_data(shadow_model, test_loader, 0) # non-member label as 0
attack_data = member_data + non_member_data

# prepare the data for training attack model
np.random.shuffle(attack_data)
X = np.array([x[0] for x in attack_data])
y = np.array([x[1] for x in attack_data])
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=seed)

In [13]:
# train attack model

# attack model class
class AttackModel(nn.Module):
    def __init__(self):
        super(AttackModel, self).__init__()
        self.fc1 = nn.Linear(200, 512)
        self.bn1 = nn.BatchNorm1d(512)
        self.fc2 = nn.Linear(512, 256)
        self.bn2 = nn.BatchNorm1d(256)
        self.fc3 = nn.Linear(256, 128)
        self.bn3 = nn.BatchNorm1d(128)
        self.fc4 = nn.Linear(128, 64)
        self.bn4 = nn.BatchNorm1d(64)
        self.fc5 = nn.Linear(64, 1)
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        x = self.dropout(nn.functional.relu(self.bn1(self.fc1(x))))
        x = self.dropout(nn.functional.relu(self.bn2(self.fc2(x))))
        x = self.dropout(nn.functional.relu(self.bn3(self.fc3(x))))
        x = self.dropout(nn.functional.relu(self.bn4(self.fc4(x))))
        x = nn.functional.sigmoid(self.fc5(x))
        return x

# attack model instantiation
attack_model_nn = AttackModel().to(device)
criterion = nn.BCELoss()
optimizer = optim.Adam(attack_model_nn.parameters(), lr=0.001, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10)

# prepare the data for the attack model
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(device)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).to(device)
train_dataset_attack = TensorDataset(X_train_tensor, y_train_tensor)
train_loader_attack = DataLoader(train_dataset_attack, batch_size=64, shuffle=True)
test_dataset_attack = TensorDataset(X_test_tensor, y_test_tensor)
test_loader_attack = DataLoader(test_dataset_attack, batch_size=64, shuffle=True)

# training
for epoch in range(100):
    attack_model_nn.train()
    running_loss = 0.0
    for inputs, labels in train_loader_attack:
        optimizer.zero_grad()
        outputs = attack_model_nn(inputs)
        loss = criterion(outputs, labels.view(-1, 1))
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    running_loss /= len(train_loader_attack)
    scheduler.step(running_loss)
    print(f"Epoch {epoch+1}/100, Loss: {running_loss:.4f}")

    attack_model_nn.eval()
    with torch.no_grad():
      running_loss = 0.0
      for inputs, labels in test_loader_attack:
        outputs = attack_model_nn(inputs)
        loss = criterion(outputs, labels.view(-1, 1))
        running_loss += loss.item()

      running_loss /= len(train_loader_attack)
      print(f"Epoch {epoch+1}/100, Test Loss: {running_loss:.4f}")



# evaluate the attack model on the test data
attack_model_nn.eval()
with torch.no_grad():
    outputs = attack_model_nn(X_test_tensor)
    y_pred = outputs.round().view(-1).cpu().numpy()
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Attack Model Accuracy: {accuracy}")


Epoch 1/100, Loss: 0.2932
Epoch 1/100, Test Loss: 0.0594
Epoch 2/100, Loss: 0.2605
Epoch 2/100, Test Loss: 0.0602
Epoch 3/100, Loss: 0.2516
Epoch 3/100, Test Loss: 0.0596
Epoch 4/100, Loss: 0.2462
Epoch 4/100, Test Loss: 0.0612
Epoch 5/100, Loss: 0.2439
Epoch 5/100, Test Loss: 0.0586
Epoch 6/100, Loss: 0.2399
Epoch 6/100, Test Loss: 0.0591
Epoch 7/100, Loss: 0.2364
Epoch 7/100, Test Loss: 0.0590
Epoch 8/100, Loss: 0.2360
Epoch 8/100, Test Loss: 0.0596
Epoch 9/100, Loss: 0.2400
Epoch 9/100, Test Loss: 0.0564
Epoch 10/100, Loss: 0.2343
Epoch 10/100, Test Loss: 0.0590
Epoch 11/100, Loss: 0.2333
Epoch 11/100, Test Loss: 0.0577
Epoch 12/100, Loss: 0.2325
Epoch 12/100, Test Loss: 0.0574
Epoch 13/100, Loss: 0.2335
Epoch 13/100, Test Loss: 0.0599
Epoch 14/100, Loss: 0.2301
Epoch 14/100, Test Loss: 0.0586
Epoch 15/100, Loss: 0.2301
Epoch 15/100, Test Loss: 0.0580
Epoch 16/100, Loss: 0.2290
Epoch 16/100, Test Loss: 0.0566
Epoch 17/100, Loss: 0.2256
Epoch 17/100, Test Loss: 0.0594
Epoch 18/100, L

# Target Model

In [7]:
# load the target model
MODEL_PATH = '/content/drive/My Drive/Colab Notebooks/attack/models/mobilenetv2_tinyimagenet.pth'

device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

target_model = mobilenet_v2(num_classes=200).to(device)

state_dict = torch.load(MODEL_PATH, map_location=device)
target_model.load_state_dict(state_dict['net'])

print(target_model)

MobileNetV2(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU6(inplace=True)
    )
    (1): InvertedResidual(
      (conv): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
          (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): ReLU6(inplace=True)
        )
        (1): Conv2d(32, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
    )
    (2): InvertedResidual(
      (conv): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 96, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (1): BatchNorm2d(96, eps=

In [8]:
# loading evaluation data
with open('/content/drive/My Drive/Colab Notebooks/attack/pickle/tinyimagenet/mobilenetv2/eval.p', 'rb') as f:
    eval_dataset = pickle.load(f)

# prepare the data for the evaluation
eval_images = [item[0] for item in eval_dataset]
eval_labels = [item[1] for item in eval_dataset]
eval_membership_status = [item[2] for item in eval_dataset]

eval_images_tensor = torch.stack(eval_images)
eval_labels_tensor = torch.tensor(eval_labels)
eval_dataset_tensor = TensorDataset(eval_images_tensor, eval_labels_tensor)
eval_loader = DataLoader(eval_dataset_tensor, batch_size=64, shuffle=False, num_workers=2)


In [10]:
#  Generate evaluation data for attack model using target model
target_model.eval()
probabilities = []
with torch.no_grad():
    for inputs, _ in eval_loader:
        inputs = inputs.to(device)
        outputs = target_model(inputs)
        probs = torch.softmax(outputs, dim=1)
        probabilities.extend(probs.cpu().numpy())

X_eval = np.array(probabilities)
y_eval_true = np.array(eval_membership_status)

In [None]:
# evaluate the attack model
attack_model_nn.eval()
with torch.no_grad():
    outputs = attack_model_nn(torch.tensor(X_eval, dtype=torch.float32).to(device))
    y_pred = outputs.round().view(-1).cpu().numpy()
    accuracy = accuracy_score(y_eval_true, y_pred)
    print(f"Membership Inference Accuracy: {accuracy}")

Membership Inference Accuracy: 0.955


In [15]:
# save attack model
torch.save(attack_model_nn.state_dict(), f"/content/drive/My Drive/Colab Notebooks/attack/attack_models/mobilenetv2_tinyimagenet/attack_model_95.pt")