In [2]:
import torch
import torch.nn as nn
import torchvision

import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models
from torch.utils.data import DataLoader
from torch.utils.data import random_split, DataLoader, TensorDataset
import shap


In [3]:
num_classes = 10
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
target_model = torchvision.models.resnet18(pretrained=False)  # You can use pretrained=True for a pre-trained model
target_model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
target_model.fc = nn.Linear(target_model.fc.in_features, num_classes) 
target_model.load_state_dict(torch.load('resnet18_fmnist.pth'))  # Load your trained model weights
target_model = target_model.to(device)
target_model.eval() 




ResNet(
  (conv1): Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [5]:
transform = transforms.Compose([
    transforms.Resize(224),  
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5,), std=(0.5,)),  # Normalize grayscale images
])

batch_size = 100
train_dataset = torchvision.datasets.FashionMNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = torchvision.datasets.FashionMNIST(root='./data', train=False, transform=transform)

train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)
# Define member and non-member DataLoaders
member_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)  # Member data (train data)
non_member_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)  # Non-member data (test data)


In [11]:
# def get_hidden_activations(model, data_loader, device):
#     model.eval()
#     all_activations = []
#     all_labels = []
#     with torch.no_grad():
#         for images, labels in data_loader:
#             images = images.to(device)
#             _, activations = model(images)
#             all_activations.append(activations.cpu())
#             all_labels.append(labels)
#     return torch.cat(all_activations), torch.cat(all_labels)

# Placeholder for the activations
hidden_activations = []

# Define a function to save the activations from a specific layer
def save_activation(module, input, output):
    hidden_activations.append(output)

# Register the hook on the penultimate layer of the model
target_model.layer4[1].register_forward_hook(save_activation)


def get_hidden_activations(model, data_loader, device):
    model.eval()
    all_activations = []
    all_labels = []
    with torch.no_grad():
        for images, labels in data_loader:
            images = images.to(device)
            
            # Clear the activations list
            global hidden_activations
            hidden_activations = []
            
            # Forward pass
            _ = model(images)  # Only need to run forward to trigger the hook
            
            # Retrieve the activations from the hook
            #activations = hidden_activations[0]
            activations = hidden_activations[0].view(images.size(0), -1)
            
            all_activations.append(activations.cpu())
            all_labels.append(labels)
            
    return torch.cat(all_activations), torch.cat(all_labels)


In [9]:
class AttackModel(nn.Module):
    def __init__(self, input_dim):
        super(AttackModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 1)  # Binary output (member or non-member)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        x = torch.sigmoid(self.fc3(x))
        return x

# Initialize attack model
hidden_activation_dim = 512*7*7 
attack_model = AttackModel(input_dim=hidden_activation_dim)
attack_model = attack_model.to(device)


In [12]:
# Assume `member_loader` and `non_member_loader` are DataLoaders for member and non-member samples

# Get activations for both member and non-member data
member_activations, member_labels = get_hidden_activations(target_model, member_loader, device)
non_member_activations, non_member_labels = get_hidden_activations(target_model, non_member_loader, device)

# Create labels for attack training
member_labels = torch.ones(member_activations.size(0))  # Label 1 for members
non_member_labels = torch.zeros(non_member_activations.size(0))  # Label 0 for non-members

# Combine and create the final dataset
attack_data = torch.cat([member_activations, non_member_activations])
attack_labels = torch.cat([member_labels, non_member_labels])


  return F.conv2d(input, weight, bias, self.stride,


In [13]:
import torch.optim as optim

# Set up DataLoader for the attack model training
from torch.utils.data import TensorDataset

attack_dataset = TensorDataset(attack_data, attack_labels)
attack_loader = DataLoader(attack_dataset, batch_size=64, shuffle=True)

# Loss and optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(attack_model.parameters(), lr=1e-4)

# Training loop
num_epochs = 10
attack_model.train()
for epoch in range(num_epochs):
    for batch_activations, batch_labels in attack_loader:
        batch_activations, batch_labels = batch_activations.to(device), batch_labels.to(device)

        # Forward pass
        outputs = attack_model(batch_activations)
        loss = criterion(outputs.squeeze(), batch_labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')


Epoch [1/10], Loss: 0.6659
Epoch [2/10], Loss: 0.5062
Epoch [3/10], Loss: 0.3215
Epoch [4/10], Loss: 0.3922
Epoch [5/10], Loss: 0.4131
Epoch [6/10], Loss: 0.3646
Epoch [7/10], Loss: 0.4839
Epoch [8/10], Loss: 0.3441
Epoch [9/10], Loss: 0.2816
Epoch [10/10], Loss: 0.5543


In [None]:
# Print the shape before passing to the attack model
# Save the attack model weights
# torch.save(attack_model.state_dict(), "attack_model_weights.pth")
# print("Attack model weights saved successfully.")


In [6]:
# Load the saved weights
attack_model.load_state_dict(torch.load("attack_model_weights.pth"))
attack_model.eval()  # Set the model to evaluation mode
print("Attack model weights loaded successfully.")


Attack model weights loaded successfully.


In [14]:
from sklearn.metrics import accuracy_score

def evaluate_attack_model(attack_model, data, labels, threshold=0.5):
    attack_model.eval()
    with torch.no_grad():
        data = data.to(device)
        labels = labels.to(device)
        
        # Get predictions
        outputs = attack_model(data).squeeze()
        predictions = (outputs > threshold).float()  # Apply threshold to get binary predictions
        
        # Calculate accuracy
        accuracy = accuracy_score(labels.cpu(), predictions.cpu())
        print(f"Attack Model Accuracy: {accuracy * 100:.2f}%")
        
    return accuracy

# Test the attack model accuracy
evaluate_attack_model(attack_model, attack_data.to(device), attack_labels.to(device))


Attack Model Accuracy: 85.71%


0.8571428571428571

In [None]:
##shap on combined model

In [15]:
# transform = transforms.Compose([
#     transforms.Resize(224),  # ResNet requires input size 224x224
#     transforms.ToTensor(),
#     transforms.Normalize(mean=(0.5,), std=(0.5,)),  # Normalize grayscale images
# ])

# test_dataset = torchvision.datasets.FashionMNIST(root='./data', train=False, transform=transform)
# test_loader = DataLoader(dataset=test_dataset, batch_size=100, shuffle=False)

X_test = next(iter(test_loader))[0]  # This will give a batch of test images
# X_test = X_test.permute(0, 2, 3, 1).cpu().numpy()  # Convert PyTorch tensor to numpy (HWC format)

masker = shap.maskers.Image("inpaint_telea", X_test[0].shape)


In [16]:
# Clear activations at the start of each forward pass and check if the hook is called
def save_activation(module, input, output):
    print(f"Hook called on module: {module}")
    hidden_activations.append(output)

# Register the hook again on the target model
target_model.layer4[1].register_forward_hook(save_activation)

class CombinedModel(nn.Module):
    def __init__(self, target_model, attack_model):
        super(CombinedModel, self).__init__()
        self.target_model = target_model
        self.attack_model = attack_model
    
    def forward(self, x):
        global hidden_activations
        hidden_activations = []  # Clear activations before each forward pass
        
        # Forward pass through the target model to generate hidden features
        _ = self.target_model(x)  # This triggers the hook to store hidden activations
        
        # Check if hook captured activations
        if not hidden_activations:
            raise RuntimeError("Hook did not capture any activations.")
        
        # Flatten the hidden features for input to the attack model
        hidden_features = hidden_activations[0].view(x.size(0), -1)
        
        # Pass hidden features through the attack model
        attack_output = self.attack_model(hidden_features)
        
        return attack_output

# Test the CombinedModel with a sample input
combined_model = CombinedModel(target_model, attack_model).to(device)



In [None]:
# def combined_model_predict(images):
#     images = images.to(device)
#     with torch.no_grad():
#         outputs = combined_model(images)
#     return outputs.cpu().numpy()


In [17]:
combined_model

CombinedModel(
  (target_model): ResNet(
    (conv1): Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True

In [19]:
model = CombinedModel(target_model,attack_model)

In [None]:
# model

In [None]:
masker

In [21]:
def f(x):
    # Convert input from HWC (batch_size, height, width, channels) to CHW (batch_size, channels, height, width)
    x_tensor = torch.tensor(x, dtype=torch.float32).to(device)  # HWC -> CHW
    print("Input shape to model:", x_tensor.shape)  # Expected: [batch_size, 1, 224, 224]
    with torch.no_grad():
        return model(x_tensor).cpu().numpy()



# Class names for Fashion MNIST
class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat', 
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

# Create the SHAP explainer with the model and image masker
explainer = shap.Explainer(f, masker, output_names=class_names)

# Explain two images using SHAP
shap_values = explainer(X_test[50:51], max_evals=100, batch_size=1)

# Plot the SHAP values for the images
shap.image_plot(shap_values, X_test[50:51])

Input shape to model: torch.Size([1, 1, 224, 224])
Hook called on module: BasicBlock(
  (conv1): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (bn1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (bn2): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)


error: OpenCV(4.9.0) /io/opencv/modules/photo/src/inpaint.cpp:763: error: (-210:Unsupported format or combination of formats) 8-bit, 16-bit unsigned or 32-bit float 1-channel and 8-bit 3-channel input/output images are supported in function 'icvInpaint'


In [25]:
# Explain images using the 3-channel X_test_rgb for SHAP inpainting, but pass a single channel to the model
def f(x):
    # Convert x from 3 channels back to 1 channel by averaging or taking one channel
    x_single_channel = torch.tensor(x[:, 0:1, :, :], dtype=torch.float32).to(device)  # Keep only one channel
    print("Input shape to model:", x_single_channel.shape)  # Expected: [batch_size, 1, 224, 224]
    with torch.no_grad():
        return combined_model(x_single_channel).cpu().numpy()

# Convert X_test_rgb to uint8 format for OpenCV compatibility
X_test_rgb_uint8 = (X_test_rgb * 255).astype(np.uint8)

# Define the SHAP explainer and explain images using the corrected input format
explainer = shap.Explainer(f, masker, output_names=class_names)

# Generate SHAP values using X_test_rgb_uint8
shap_values = explainer(X_test_rgb_uint8[50:51], max_evals=100, batch_size=1)

# Plot the SHAP values
shap.image_plot(shap_values, X_test_rgb_uint8[50:51])


AttributeError: 'Tensor' object has no attribute 'astype'

In [23]:
# Convert the test images to uint8 type for OpenCV compatibility and duplicate channels

X_test_uint8 = (X_test * 255).byte()  # Assuming X_test is initially in [0,1] range
X_test_rgb = X_test_uint8.repeat(1, 3, 1, 1)  # Convert from 1-channel to 3-channel by duplicating

# Update the masker to use a 3-channel shape
masker = shap.maskers.Image("inpaint_telea", X_test_rgb[0].shape)

# Redefine the explainer with the updated 3-channel masker and images
explainer = shap.Explainer(f, masker, output_names=class_names)

# Explain images using the updated X_test_rgb
shap_values = explainer(X_test_rgb[50:51], max_evals=100, batch_size=1)

# Plot the SHAP values for the images
shap.image_plot(shap_values, X_test_rgb[50:51])



Input shape to model: torch.Size([1, 3, 224, 224])


RuntimeError: Given groups=1, weight of size [64, 1, 7, 7], expected input[1, 3, 224, 224] to have 1 channels, but got 3 channels instead

In [None]:
# Initialize SHAP explainer for the combined model
combined_explainer = shap.DeepExplainer(combined_model_predict, train_loader)

# Calculate SHAP values for the input features
combined_shap_values = combined_explainer.shap_values(sample_images)


In [None]:
# Clear activations at the start of each forward pass and check if the hook is called
def save_activation(module, input, output):
    print(f"Hook called on module: {module}")
    hidden_activations.append(output)

# Register the hook again on the target model
target_model.layer4[1].register_forward_hook(save_activation)

class CombinedModel(nn.Module):
    def __init__(self, target_model, attack_model):
        super(CombinedModel, self).__init__()
        self.target_model = target_model
        self.attack_model = attack_model
    
    def forward(self, x):
        global hidden_activations
        hidden_activations = []  # Clear activations before each forward pass
        
        # Forward pass through the target model to generate hidden features
        _ = self.target_model(x)  # This triggers the hook to store hidden activations
        
        # Check if hook captured activations
        if not hidden_activations:
            raise RuntimeError("Hook did not capture any activations.")
        
        # Flatten the hidden features for input to the attack model
        hidden_features = hidden_activations[0].view(x.size(0), -1)
        
        # Pass hidden features through the attack model
        attack_output = self.attack_model(hidden_features)
        
        return attack_output

# Test the CombinedModel with a sample input
combined_model = CombinedModel(target_model, attack_model).to(device)
sample_input = torch.randn(1, 1, 224, 224).to(device)  # Adjust batch size and shape as needed
output = combined_model(sample_input)
