https://www.tensorflow.org/responsible_ai/privacy/tutorials/privacy_report

https://github.com/tensorflow/privacy/tree/master/tensorflow_privacy/privacy/privacy_tests/membership_inference_attack

Here I basically substitute the final sigmoid layer with a linear operator (trainabale) and then retrain the model with previous parameters untraibale.
But now I set the loss as MSE Loss, instead of categorical crossentropy

I want a generalized apporach where I can use TF reports on pytorch models

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from tensorflow import keras
import tensorflow as tf

# Define the neural network architecture
class MNISTClassifier(nn.Module):
    def __init__(self):
        super(MNISTClassifier, self).__init__()
        self.reshape=nn.Flatten()
        self.fc1 = nn.Linear(28*28, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 10)
        self.nonlinear = nn.Softmax(dim=1)

    def forward(self, x):
        x = self.reshape(x)
        # x = torch.flatten(x, start_dim=1)
        # x = x.reshape(x.size(0), -1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        x= self.nonlinear(x)
        # x = torch.softmax(x, dim=1)
        return x

# Load the MNIST dataset
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])

train_data = datasets.MNIST('data', train=True, download=True, transform=transform)
test_data = datasets.MNIST('data', train=False, transform=transform)

train_loader = DataLoader(train_data, batch_size=64*20, shuffle=True)
test_loader = DataLoader(test_data, batch_size=64*20, shuffle=False)

# Train the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MNISTClassifier().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

epochs = 1
for epoch in range(epochs):
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        
        optimizer.zero_grad()
        output = model(images)
        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()


2023-04-19 08:46:57.029426: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
model.to('cpu')

MNISTClassifier(
  (reshape): Flatten(start_dim=1, end_dim=-1)
  (fc1): Linear(in_features=784, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=64, bias=True)
  (fc3): Linear(in_features=64, out_features=10, bias=True)
  (nonlinear): Softmax(dim=1)
)

In [3]:
from sklearn.metrics import accuracy_score
import numpy as np
in_preds = []
in_label = []
with torch.no_grad():
    for data in test_loader:
        inputs, labels = data
        # inputs = inputs.to(device)
        outputs = model(inputs)
        #load outputs to cpu
        outputs = outputs.cpu()
        in_preds.append(outputs)
        in_label.append(labels)
    in_preds = torch.cat(in_preds)
    in_label = torch.cat(in_label)
print(
    "Test Accuracy before Linearization is: ",
    accuracy_score(np.array(torch.argmax(in_preds, axis=1)), np.array(in_label)),
)

Test Accuracy before Linearization is:  0.8979


In [4]:
# count the number of trainable parameters
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')


The model has 109,386 trainable parameters


In [5]:

# Modify the trained model by replacing the final softmax layer with a linear layer
model.nonlinear= nn.Linear(10, 10)

# Make all layers untrainable except the last linear layer
for param in model.parameters():
    param.requires_grad = False

# Set the requires_grad attribute of the last linear layer to True
for param in model.nonlinear.parameters():
    param.requires_grad = True


#load the model to device
model = model.to(device)

# Train the modified model
optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=0.001)
#define criterion to be mse
criterion = nn.MSELoss()


for epoch in range(1):
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        #one hot encode the labels
        labels=nn.functional.one_hot(labels,10).to(torch.float32)
        optimizer.zero_grad()
        output = model(images)
        #normalize the output
        # output = output/torch.sum(output, dim=1, keepdim=True)
        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()

In [6]:
from sklearn.metrics import accuracy_score
import numpy as np
in_preds = []
in_label = []
with torch.no_grad():
    for data in test_loader:
        inputs, labels = data
        inputs = inputs.to(device)
        outputs = model(inputs)
        #load outputs to cpu
        outputs = outputs.cpu()
        in_preds.append(outputs)
        in_label.append(labels)
    in_preds = torch.cat(in_preds)
    in_label = torch.cat(in_label)
print(
    "Test Accuracy after Linearization is: ",
    accuracy_score(np.array(torch.argmax(in_preds, axis=1)), np.array(in_label)),
)

Test Accuracy after Linearization is:  0.2297


In [7]:
# count the number of trainable parameters
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')


The model has 110 trainable parameters


In [8]:
target_model = model.to('cpu')

In [9]:
target_model.eval()

MNISTClassifier(
  (reshape): Flatten(start_dim=1, end_dim=-1)
  (fc1): Linear(in_features=784, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=64, bias=True)
  (fc3): Linear(in_features=64, out_features=10, bias=True)
  (nonlinear): Linear(in_features=10, out_features=10, bias=True)
)

In [10]:
import time
input_tensor=torch.randn((1, 1, 28, 28))


# Perform a warm-up run to avoid potential overhead caused by initial device setup
with torch.no_grad():
    _ = target_model(input_tensor)



num_iterations = 1000  # Choose a suitable number of iterations to average over

start_time = time.time()
with torch.no_grad():
    for _ in range(num_iterations):
        _ = target_model(input_tensor)
        
end_time = time.time()

inference_latency = (end_time - start_time) / num_iterations
#convert to microseconds
inference_latency = inference_latency * 1000000
print(f'Inference latency: {inference_latency:.20f} micro-seconds')



Inference latency: 70.70422172546386718750 micro-seconds
