In [None]:
from torchvision import models
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt


class CIFAR10Classifier(nn.Module):
  def __init__(self):
    super(CIFAR10Classifier, self).__init__()
    self.conv1 = nn.Conv2d(3, 16, 3, 1)
    self.conv2 = nn.Conv2d(16, 32, 3, 1)
    self.dropout1 = nn.Dropout2d(0.25)
    self.dropout2 = nn.Dropout2d(0.5)
    self.fc1 = nn.Linear(6272, 64)
    self.fc2 = nn.Linear(64, 10)

  def forward(self, x):
    x = self.conv1(x)
    x = F.relu(x)
    x = self.conv2(x)
    x = F.relu(x)
    x = F.max_pool2d(x, 2)
    x = self.dropout1(x)
    x = torch.flatten(x, 1)
    x = self.fc1(x)
    x = F.relu(x)
    x = self.dropout2(x)
    x = self.fc2(x)
    return x



In [None]:
import tensorflow as tf
import tensorflow.keras
from tensorflow.keras.utils import to_categorical
import numpy as np
from sklearn.utils import resample
from sklearn.metrics import ConfusionMatrixDisplay
import matplotlib.pyplot as plt
from matplotlib import pyplot



def load_fashion_mnist_dataset():
    from keras.datasets import fashion_mnist

    # load dataset
    (trainX, trainY), (testX, testY) = fashion_mnist.load_data()
    # summarize loaded dataset
    print('Train: X=%s, y=%s' % (trainX.shape, trainY.shape))
    print('Test: X=%s, y=%s' % (testX.shape, testY.shape))

    # reshape dataset to have a single channel
    trainX = trainX.reshape((trainX.shape[0], 28, 28, 1))
    testX = testX.reshape((testX.shape[0], 28, 28, 1))

    # one hot encode target values
    trainY = to_categorical(trainY)
    testY = to_categorical(testY)

    return (trainX, trainY), (testX, testY)

# Set hyper-parameters for our training of neural networks
LEARNING_RATE = 0.001
EPOCH = 5
VERBOSE = 1

TRAINING_SIZE = 3000
TEST_SIZE = 2000

# No of target models
NUM_TARGET = 1
# No of shadow models
NUM_SHADOW = 2

# Label value "in" for records present in training data of shadow models
IN = 1
# Label value "out" for records not present in training data of shadow models
OUT = 0

def sample_data(train_data, test_data, num_sets):
    (x_train, y_train), (x_test, y_test) = train_data, test_data
    new_x_train, new_y_train = [], []
    new_x_test, new_y_test = [], []
    for i in range(num_sets):
        x_temp, y_temp = resample(x_train, y_train, n_samples=TRAINING_SIZE, random_state=0)
        new_x_train.append(x_temp)
        new_y_train.append(y_temp)
        x_temp, y_temp = resample(x_test, y_test, n_samples=TEST_SIZE, random_state=0)
        new_x_test.append(x_temp)
        new_y_test.append(y_temp)
    return (new_x_train, new_y_train), (new_x_test, new_y_test)

def get_attack_dataset(models, train_data, test_data, num_models, data_size):
    # generate dataset for the attack model
    (x_train, y_train), (x_test, y_test) = train_data, test_data
    # set number of classes for the attack model
    num_classes = 10
    x_data, y_data = [[] for _ in range(num_classes)], [[] for _ in range(num_classes)]
    for i in range(num_models):
        # IN data
        x_temp, y_temp = resample(x_train[i], y_train[i], n_samples=data_size, random_state=0)
        for j in range(data_size):
            y_idx = np.argmax(y_temp[j])
            x_data[y_idx].append(models[i].predict(x_temp[j:j+1], verbose=0)[0])
            y_data[y_idx].append(IN)

        # OUT data
        x_temp, y_temp = resample(x_test[i], y_test[i], n_samples=data_size, random_state=0)
        for j in range(data_size):
            y_idx = np.argmax(y_temp[j])
            x_data[y_idx].append(models[i].predict(x_temp[j:j+1], verbose=0)[0])
            y_data[y_idx].append(OUT)

    return x_data, y_data

def build_fcnn_model_fashion_mnist():
    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax')
    ])
    model.summary()
    return model

def get_trained_keras_models(keras_model, train_data, test_data, num_models):
    (x_train, y_train), (x_test, y_test) = train_data, test_data
    models = []
    for i in range(num_models):
        models.append(tf.keras.models.clone_model(keras_model))
        rms = tf.keras.optimizers.RMSprop(learning_rate=LEARNING_RATE, decay=1e-7)
        models[i].compile(loss='categorical_crossentropy', optimizer=rms, metrics=['accuracy'])
        models[i].fit(x_train[i], y_train[i], batch_size=32, epochs=EPOCH, verbose=VERBOSE, shuffle=True)
        score = models[i].evaluate(x_test[i], y_test[i], verbose=VERBOSE)
       # print('\n', 'Model ', i, ' test accuracy:', score[1])
    return models

def get_trained_svm_models(train_data, test_data, num_models=1):
    from sklearn import svm
    (x_train, y_train), (x_test, y_test) = train_data, test_data
    models = []
    for i in range(num_models):
        #print('Training svm model : ', i)
        models.append(svm.SVC(gamma='scale', kernel='linear', verbose=VERBOSE))
        models[i].fit(x_train[i], y_train[i])
        score = models[i].score(x_test[i], y_test[i])
        #print('SVM model ', i, 'score : ', score)
    return models

def membership_attack():
    # load the pre-shuffled train and test data
    (x_train, y_train), (x_test, y_test) = load_fashion_mnist_dataset()

    # split the data for each model
    target_train = (x_train[:TRAINING_SIZE*NUM_TARGET], y_train[:TRAINING_SIZE*NUM_TARGET])
    target_test = (x_test[:TEST_SIZE*NUM_TARGET], y_test[:TEST_SIZE*NUM_TARGET])
    target_train_data, target_test_data = sample_data(target_train, target_test, NUM_TARGET)

    shadow_train = (x_train[TRAINING_SIZE*NUM_TARGET:], y_train[TRAINING_SIZE*NUM_TARGET:])
    shadow_test = (x_test[TEST_SIZE*NUM_TARGET:], y_test[TEST_SIZE*NUM_TARGET:])
    shadow_train_data, shadow_test_data = sample_data(shadow_train, shadow_test, NUM_SHADOW)

    cnn_model = build_fcnn_model_fashion_mnist()

    # compile the target model
    target_models = get_trained_keras_models(cnn_model, target_train_data, target_test_data, NUM_TARGET)
    # compile the shadow models
    shadow_models = get_trained_keras_models(cnn_model, shadow_train_data, shadow_test_data, NUM_SHADOW)

    # get train data for the attack model
    attack_train = get_attack_dataset(shadow_models, shadow_train_data, shadow_test_data, NUM_SHADOW, TEST_SIZE)
    # get test data for the attack model
    attack_test = get_attack_dataset(target_models, target_train_data, target_test_data, NUM_TARGET, TEST_SIZE)

    # training the attack model
    #attack_model = get_trained_svm_models(attack_train, attack_test)
    return attack_train, attack_test
NUM_SHADOW = 2
attack_train1, attack_test1 = membership_attack()


In [15]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.datasets import CIFAR10
from torchvision import transforms
from torch.utils.data import Subset, DataLoader, TensorDataset
from sklearn.metrics import confusion_matrix, precision_score, recall_score ,f1_score
from sklearn.linear_model import LogisticRegression

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = CIFAR10Classifier()
state_dict = torch.load("model_state_dict.pth", map_location=device)
new_state_dict = {key.replace('_module.', ''): value for key, value in state_dict.items()}
model.load_state_dict(new_state_dict)
model.to(device)
model.eval()

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

DATA_ROOT = '../cifar10'
BATCH_SIZE = 64

# Load the indices from list.txt
indices_file = 'list.txt' ############
with open(indices_file, 'r') as f:
    indices = [int(line.strip()) for line in f]

full_train_dataset = CIFAR10(root=DATA_ROOT, train=True, download=True, transform=transform)
test_dataset = CIFAR10(root=DATA_ROOT, train=False, download=True, transform=transform)

train_indices_set = set(indices)
all_indices = set(range(len(full_train_dataset)))
other_indices = list(all_indices - train_indices_set)

train_dataset = Subset(full_train_dataset, indices[:len(indices)//2])  ###########
other_dataset = Subset(full_train_dataset, other_indices)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False)
other_loader = DataLoader(other_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Create labels
train_labels = torch.ones(len(train_dataset)).to(device)
other_labels = torch.zeros(len(other_dataset)).to(device)
test_labels = torch.zeros(len(test_dataset)).to(device)
####################################
#if you have an attacker model for each class, modify the above code.
####################################

def extract_features(model, dataloader):
    model.eval()
    features = []
    with torch.no_grad():
        for data in dataloader:
            inputs, _ = data
            inputs = inputs.to(device)
            outputs = model(inputs)
            features.append(outputs)
    return torch.cat(features).to(device)

train_features = extract_features(model, train_loader)
other_features = extract_features(model, other_loader)
test_features = extract_features(model, test_loader)


combined_features = torch.cat((train_features, other_features, test_features))
combined_labels = torch.cat((train_labels, other_labels, test_labels))


new_dataset = TensorDataset(combined_features, combined_labels)
new_loader = DataLoader(new_dataset, batch_size=BATCH_SIZE, shuffle=True)

#load your attacker model

#############################################

# Calculate training accuracy, confusion matrix, precision, and recall
binary_classifier.eval()
all_labels = []
all_predicted = []
correct = 0
total = 0
with torch.no_grad():
    for features, labels in new_loader:
        features, labels = features.to(device), labels.to(device)
        outputs = get_trained_svm_models(features, attack_test).squeeze()
        predicted = (outputs > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        all_labels.extend(labels.cpu().numpy())
        all_predicted.extend(predicted.cpu().numpy())

accuracy = correct / total

cm = confusion_matrix(all_labels, all_predicted)
precision = precision_score(all_labels, all_predicted)
recall = recall_score(all_labels, all_predicted)
f1 = f1_score(all_labels, all_predicted)

print(f'Confusion Matrix:\n{cm}')
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f'Training Accuracy: {accuracy:.4f}')


Confusion Matrix:
[[ 9452  8766]
 [ 8361 13421]]
Precision: 0.6049
Recall: 0.6162
F1 Score: 0.6105
Training Accuracy: 0.5718
