Detecting Dormant Neurons

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from torchvision.models import alexnet
from PIL import Image
import os
import random
import pandas as pd 
import matplotlib.pyplot as plt

class MaskedModel(nn.Module):
    def __init__(self, original_model, dormant_neurons):
        super(MaskedModel, self).__init__()
        self.mask = nn.Parameter(torch.ones_like(torch.cat((dormant_neurons, torch.tensor([1]))), dtype=torch.float32), requires_grad=False)
        self.mask[dormant_neurons] = 0
        self.original_model = original_model

    def forward(self, x):
        x = self.original_model(x)
        x *= self.mask
        return x


def load_model(model_path, num_classes=43, device="cpu"):
    model = alexnet(weights=None, num_classes=43).to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()  # Set to evaluation mode
    return model

def process_image(image_path):
    # Load the image
    image = Image.open(image_path)

    # Define transformations
    transform = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    # Apply transformations
    image_tensor = transform(image).unsqueeze(0)
    return image_tensor, image

def predict(image_path, model, device="cpu"):
    # No need to reload the model every time you predict. Use the passed model.
    # model = load_model(model_path, device=device)

    image_tensor, image = process_image(image_path)  # Assume process_image returns (tensor, image)
    image_tensor = image_tensor.to(device)

    activations = []
    # Forward pass to obtain activations
    with torch.no_grad():
        # Get activations from convolutional layers (features)
        x = image_tensor
        for layer in model.features:
            x = layer(x)
            activations.append(x)

        for layer in [model.avgpool]:
            x = layer(x)
            activations.append(x)

        # Get activations from linear layers (classifier)
        x = x.view(x.size(0), -1)  # Flatten the output before feeding to linear layers
        for layer in model.classifier:
            x = layer(x)
            activations.append(x)
        
        _, predicted_class_index = torch.max(x, 1)

    return predicted_class_index.item(), image, activations

def predict_on_directory(directory_path, model_path, misclss, num_images_to_test=0, device="cpu", backdoored=False, model_name="ran_sqr_sin_01"):
    model = load_model(model_path, device=device)
    
    # Assuming you have class names to interpret the outputs (you need to define this list)
    class_names = pd.read_csv('/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/labels.csv')['Name'].tolist()
    tests = pd.read_csv('/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/Test.csv')
    # List all files in the directory
    filenames = os.listdir(directory_path)
    random.shuffle(filenames)
    #count number of correct prediction
    correct_count=0
    #initialize the ground truth class id
    #for now, I am checking if the model can predict the backdoored stop sign which belong to class 14
    # Limit the number of files processed
    if num_images_to_test:
        filenames = filenames[:num_images_to_test]
    else:
        num_images_to_test = 780

    neurons = []
    for filename in filenames:
        image_path = os.path.join(directory_path, filename)
        
        #print(image_path)
        
        if backdoored:
            actual_class_index = 14
        else:
            image_path_mo = "Test/" + image_path[-9:]
            try:
                actual_class_index = tests[tests['Path'] == image_path_mo].iloc[0]['ClassId']
            except:
                pass
        
        # Process only .png images (according to your error traceback)
        if image_path.lower().endswith('.png'):
            predicted_class_index, image, activations = predict(image_path, model, device)

            # threshold = 1e-50
            # activation_means = activations.mean(dim=0)
            # dormant_neurons = (activation_means < threshold).nonzero().squeeze()
            neurons.append(activations)

            predicted_class_name = class_names[predicted_class_index]
            
            if predicted_class_index == actual_class_index:
                correct_count += 1
            elif backdoored:
                lst = misclss.get(model_name,[])
                lst.append(image_path)
                misclss[model_name] = lst

            
    # masked_model = MaskedModel(model, dormant_neurons)
    # print("model masked")
                
    if backdoored:
        rsl = "backdoored"
    else:
        rsl = "clean"
    print(f'Accuracy on {rsl} images is: {correct_count / num_images_to_test * 100:.2f}%')
    # return masked_model
    return neurons

In [2]:
# Run predictions on 10 random images from the 'test' directory
models = ['ran_sqr_sin_01','ran_sqr_mul_01','ran_sqr_sin_001','ran_sqr_mul_001', 'fixed_sqr_sin_01', 'fixed_sqr_mul_01','fixed_sqr_sin_001','fixed_sqr_mul_001',\
         'ran_cir_sin_01','ran_cir_mul_01','ran_cir_sin_001','ran_cir_mul_001', 'fixed_cir_sin_01', 'fixed_cir_mul_01','fixed_cir_sin_001','fixed_cir_mul_001',\
         'ran_tri_sin_01','ran_tri_mul_01','ran_tri_sin_001','ran_tri_mul_001', 'fixed_tri_sin_01', 'fixed_tri_mul_01','fixed_tri_sin_001','fixed_tri_mul_001']

misclss = {}

i = 1
print(f"\ntest of {models[i-1]}:")
test_directory = f'/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/backdoored-test/780/{i}/'
test_directory_t = '/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/Test'
model_path = f"models/780/alexnet_case_{i}.pt"
#testing trained model on backdoored data
neurons = predict_on_directory(test_directory, model_path, misclss, device="cpu", backdoored=True, model_name=i)
#testing the trained model on clean data
neurons = predict_on_directory(test_directory_t, model_path, misclss, num_images_to_test=1000, device="cpu",model_name=i)


test of ran_sqr_sin_01:


Accuracy on backdoored images is: 96.92%
Accuracy on clean images is: 96.90%


In [3]:
raw_data = []
for i in range(len(neurons[0])):
    for j in range(len(neurons)):
        if len(raw_data) == i:
           raw_data.append(torch.zeros_like(neurons[j][i], dtype=torch.float32))
        raw_data[i] += neurons[j][i]
raw_data

[tensor([[[[  87.7831,    8.1397,    8.4228,  ...,    8.4307,    9.0481,
             113.8692],
           [  31.9231,   23.7949,   23.8559,  ...,   21.8530,   22.8737,
             316.3633],
           [  31.4938,   23.6761,   22.6312,  ...,   21.2988,   22.2677,
             314.3232],
           ...,
           [  28.2716,   18.1302,   17.6239,  ...,   15.7102,   18.2751,
             364.6614],
           [  26.7714,   15.9954,   15.9282,  ...,   16.3880,   19.4171,
             364.1250],
           [  24.4435,   14.8824,   14.6398,  ...,   17.7510,   20.4718,
             333.4766]],
 
          [[ 280.3206,  108.0930,  114.6338,  ...,  102.2787,  105.0595,
              39.9054],
           [ 374.9193,  155.4562,  163.4716,  ...,  148.0629,  144.4667,
              45.9544],
           [ 370.4223,  150.6053,  158.2454,  ...,  144.8327,  142.1165,
              46.1198],
           ...,
           [ 247.4405,  101.3928,   98.3679,  ...,  108.2727,  113.3110,
              48.70

In [4]:
import numpy as np
dormant_data = []
for j in range(len(raw_data)):
    dormant_data.append(np.where(np.abs(raw_data[j]) <= 50, 0, 1).astype(float))
dormant_data

[array([[[[1., 0., 0., ..., 0., 0., 1.],
          [0., 0., 0., ..., 0., 0., 1.],
          [0., 0., 0., ..., 0., 0., 1.],
          ...,
          [0., 0., 0., ..., 0., 0., 1.],
          [0., 0., 0., ..., 0., 0., 1.],
          [0., 0., 0., ..., 0., 0., 1.]],
 
         [[1., 1., 1., ..., 1., 1., 0.],
          [1., 1., 1., ..., 1., 1., 0.],
          [1., 1., 1., ..., 1., 1., 0.],
          ...,
          [1., 1., 1., ..., 1., 1., 0.],
          [1., 1., 1., ..., 1., 1., 1.],
          [1., 1., 1., ..., 1., 1., 0.]],
 
         [[1., 1., 1., ..., 1., 1., 1.],
          [1., 1., 1., ..., 1., 1., 1.],
          [1., 1., 1., ..., 1., 1., 1.],
          ...,
          [1., 1., 1., ..., 1., 1., 1.],
          [1., 1., 1., ..., 1., 1., 1.],
          [1., 1., 1., ..., 1., 1., 1.]],
 
         ...,
 
         [[0., 0., 0., ..., 0., 0., 0.],
          [1., 0., 0., ..., 0., 0., 0.],
          [1., 0., 0., ..., 0., 0., 0.],
          ...,
          [1., 0., 0., ..., 0., 0., 0.],
          [1.

Pruning Dormant Neurons

In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from torchvision.models import alexnet
from PIL import Image
import os
import random
import pandas as pd 
import matplotlib.pyplot as plt

class MaskedModel(nn.Module):
    def __init__(self, original_model, dormant_neurons):
        super(MaskedModel, self).__init__()
        self.mask = nn.Parameter(torch.ones_like(torch.cat((dormant_neurons, torch.tensor([1]))), dtype=torch.float32), requires_grad=False)
        self.mask[dormant_neurons] = 0
        self.original_model = original_model

    def forward(self, x):
        x = self.original_model(x)
        x *= self.mask
        return x


def load_model(model_path, num_classes=43, device="cpu"):
    model = alexnet(weights=None, num_classes=43).to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()  # Set to evaluation mode
    return model

def process_image(image_path):
    # Load the image
    image = Image.open(image_path)

    # Define transformations
    transform = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    # Apply transformations
    image_tensor = transform(image).unsqueeze(0)
    return image_tensor, image

def predict(image_path, model, device="cpu"):
    # No need to reload the model every time you predict. Use the passed model.
    # model = load_model(model_path, device=device)

    image_tensor, image = process_image(image_path)  # Assume process_image returns (tensor, image)
    image_tensor = image_tensor.to(device)

    i = 0
    # Forward pass to obtain activations
    with torch.no_grad():
        # Get activations from convolutional layers (features)
        x = image_tensor
        bias_type = model.features[0].bias.dtype
        for layer in model.features:
            x = layer(x)
            x *= dormant_data[i]
            x = x.to(bias_type)
            i += 1

        for layer in [model.avgpool]:
            x = layer(x)
            x *= dormant_data[i]
            x = x.to(bias_type)
            i += 1

        # Get activations from linear layers (classifier)
        x = x.view(x.size(0), -1)  # Flatten the output before feeding to linear layers
        for layer in model.classifier:
            x = layer(x)
            x *= dormant_data[i]
            x = x.to(bias_type)
            i += 1
        
        _, predicted_class_index = torch.max(x, 1)

    return predicted_class_index.item(), image

def predict_on_directory(directory_path, model_path, misclss, num_images_to_test=0, device="cpu", backdoored=False, model_name="ran_sqr_sin_01"):
    model = load_model(model_path, device=device)
    
    # Assuming you have class names to interpret the outputs (you need to define this list)
    class_names = pd.read_csv('/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/labels.csv')['Name'].tolist()
    tests = pd.read_csv('/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/Test.csv')
    # List all files in the directory
    filenames = os.listdir(directory_path)
    random.shuffle(filenames)
    #count number of correct prediction
    correct_count=0
    #initialize the ground truth class id
    #for now, I am checking if the model can predict the backdoored stop sign which belong to class 14
    # Limit the number of files processed
    if num_images_to_test:
        filenames = filenames[:num_images_to_test]
    else:
        num_images_to_test = 780

    for filename in filenames:
        image_path = os.path.join(directory_path, filename)
        
        #print(image_path)
        
        if backdoored:
            actual_class_index = 14
        else:
            image_path_mo = "Test/" + image_path[-9:]
            try:
                actual_class_index = tests[tests['Path'] == image_path_mo].iloc[0]['ClassId']
            except:
                pass
        
        # Process only .png images (according to your error traceback)
        if image_path.lower().endswith('.png'):
            predicted_class_index, image = predict(image_path, model, device)

            predicted_class_name = class_names[predicted_class_index]
            
            if predicted_class_index == actual_class_index:
                correct_count += 1
            elif backdoored:
                lst = misclss.get(model_name,[])
                lst.append(image_path)
                misclss[model_name] = lst

            
    # masked_model = MaskedModel(model, dormant_neurons)
    # print("model masked")
                
    if backdoored:
        rsl = "backdoored"
    else:
        rsl = "clean"
    print(f'Accuracy on {rsl} images is: {correct_count / num_images_to_test * 100:.2f}%')
    # return masked_model
    return neurons, model

In [6]:
# Run predictions on 10 random images from the 'test' directory
models = ['ran_sqr_sin_01','ran_sqr_mul_01','ran_sqr_sin_001','ran_sqr_mul_001', 'fixed_sqr_sin_01', 'fixed_sqr_mul_01','fixed_sqr_sin_001','fixed_sqr_mul_001',\
         'ran_cir_sin_01','ran_cir_mul_01','ran_cir_sin_001','ran_cir_mul_001', 'fixed_cir_sin_01', 'fixed_cir_mul_01','fixed_cir_sin_001','fixed_cir_mul_001',\
         'ran_tri_sin_01','ran_tri_mul_01','ran_tri_sin_001','ran_tri_mul_001', 'fixed_tri_sin_01', 'fixed_tri_mul_01','fixed_tri_sin_001','fixed_tri_mul_001']

misclss = {}

i = 1
print(f"\ntest of {models[i-1]}:")
test_directory = f'/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/backdoored-test/780/{i}/'
test_directory_t = '/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/Test'
model_path = f"models/780/alexnet_case_{i}.pt"
#testing trained model on backdoored data
# neurons = predict_on_directory(test_directory, model_path, misclss, device="cpu", backdoored=True, model_name=i)
#testing the trained model on clean data
neurons, model = predict_on_directory(test_directory_t, model_path, misclss, num_images_to_test=1000, device="cpu",model_name=i)


test of ran_sqr_sin_01:


Accuracy on clean images is: 89.10%


In [7]:
# Run predictions on 10 random images from the 'test' directory
models = ['ran_sqr_sin_01','ran_sqr_mul_01','ran_sqr_sin_001','ran_sqr_mul_001', 'fixed_sqr_sin_01', 'fixed_sqr_mul_01','fixed_sqr_sin_001','fixed_sqr_mul_001',\
         'ran_cir_sin_01','ran_cir_mul_01','ran_cir_sin_001','ran_cir_mul_001', 'fixed_cir_sin_01', 'fixed_cir_mul_01','fixed_cir_sin_001','fixed_cir_mul_001',\
         'ran_tri_sin_01','ran_tri_mul_01','ran_tri_sin_001','ran_tri_mul_001', 'fixed_tri_sin_01', 'fixed_tri_mul_01','fixed_tri_sin_001','fixed_tri_mul_001']

misclss = {}

i = 1
print(f"\ntest of {models[i-1]}:")
test_directory = f'/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/backdoored-test/780/{i}/'
test_directory_t = '/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/Test'
model_path = f"models/780/alexnet_case_{i}.pt"
#testing trained model on backdoored data
neurons, model = predict_on_directory(test_directory, model_path, misclss, device="cpu", backdoored=True, model_name=i)


test of ran_sqr_sin_01:


Accuracy on backdoored images is: 0.13%


Fine-tuning

In [8]:
# Original
import numpy as np 
import pandas as pd 
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
import seaborn as sns
import matplotlib.pyplot as plt
import random
import seaborn as sns
from collections import Counter

from PIL import Image
from skimage import io

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split, Dataset
import torch.optim as optim

import torchvision.datasets
import torchvision.transforms as T 
from torchvision.io import read_image
from torchvision.datasets import DatasetFolder
from torchvision.datasets.folder import default_loader
# Transforming the Data ToTensor and Normalize it 
transforms = T.Compose([T.ToTensor(),T.Resize((256,256)),
    T.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])])

class TSignsDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.data = []
        self.class_counts = Counter()  # Counter to store class counts
        
        # Iterate over each folder (class) in the directory
        for class_id in os.listdir(root_dir):
            class_dir = os.path.join(root_dir, class_id)
            
            # Make sure it's a directory
            if os.path.isdir(class_dir):
                for filename in os.listdir(class_dir):
                    if filename.endswith(".jpg") or filename.endswith(".png"):
                        self.data.append({
                            'path': os.path.join(class_dir, filename),
                            'class_id': int(class_id)
                        })
                        # Increment the count for the current class_id
                        self.class_counts[int(class_id)] += 1
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        image_path = self.data[index]['path']
        image = Image.open(image_path)
        y_class = torch.tensor(self.data[index]['class_id'])
        
        if self.transform:
            image = self.transform(image)

        return (image, y_class)

dataset_path = '/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/Train'
dataset_set = TSignsDataset(dataset_path,transform=transforms)

#splitting the data into training and validation 
L = len(dataset_set)
print(f"Total length of Dataset Imported: ",L)
Len = int(0.8*L)
train_set, val_set = random_split(dataset_set,[Len,int(L-Len)])

#Loading the data into DataLoader

train_loader = DataLoader(dataset=train_set, batch_size=32, shuffle=True)
valid_loader = DataLoader(dataset=val_set, batch_size=32, shuffle=False)

dataloaders = {'training':train_loader,'validation':valid_loader}
dataset_sizes = {'training':len(train_loader.dataset),'validation':len(valid_loader.dataset)}
print(dataset_sizes)

# Store metrics
train_loss, val_loss = [], []
train_acc, val_acc = [], []

#test_ran_sqr_sin_01
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# device = "cpu"

# # Freeze all layers initially
# for param in model.parameters():
#     param.requires_grad = False

# Modify the forward pass to include the mask
class MaskedAlexNet(nn.Module):
    def __init__(self, base_model, masks):
        super(MaskedAlexNet, self).__init__()
        self.features = base_model.features
        self.avgpool = base_model.avgpool
        self.classifier = base_model.classifier
        self.masks = masks
        self.types = model.features[0].bias.dtype

    def forward(self, x):
        i = 0 
        for layer in self.features:
            if isinstance(layer, nn.Conv2d):
                # print(layer)
                x = layer(x)
                while x.shape[1:] != self.masks[i].shape[1:]:
                    i += 1
                    # print(x.shape, self.masks[i].shape)
                x *= torch.tensor(self.masks[i]).to(device)
                # x *= tensor_from_numpy.detach().numpy()
                # x = x.to(device)
                i += 1
            else:
                x = layer(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

# Create the masked model
masked_model = MaskedAlexNet(model, dormant_data).to(device)

# Define the optimizer and loss function
optimizer = optim.SGD(masked_model.parameters(), lr=0.001, momentum=0.9)
criterion = nn.CrossEntropyLoss()

# Training loop
num_epochs = 5
for epoch in range(num_epochs):
    print("epoch {}/{}".format(epoch+1,num_epochs))
    print("*" * 10)

    best_acc = 0
    for x in ["training","validation"]:
        if x == "training" :
            masked_model.train()
        else:
            masked_model.eval()

        running_loss = 0.0
        running_accuracy = 0

        for data in dataloaders[x]:
            img , y = data
            img , y = img.to(device) , y.to(device)

            optimizer.zero_grad()
            y_pred = masked_model(img)
            loss = criterion(y_pred,y)
            _, preds = torch.max(y_pred, dim=1)

            if x == 'training':
                loss.backward()
                optimizer.step()

            running_loss += loss.item()
            running_accuracy += torch.sum(preds == y.data)


        epoch_loss = running_loss / dataset_sizes[x]
        epoch_acc = running_accuracy / dataset_sizes[x]

        print('{} Loss: {:.4f} || Accuracy: {:.4f}'.format(x, epoch_loss, epoch_acc))

        # Save metrics
        if x == 'training':
            train_loss.append(epoch_loss)
            train_acc.append(epoch_acc)
        elif x == "validation":
            val_loss.append(epoch_loss)
            val_acc.append(epoch_acc)

        # deep copy the model
        if x == 'validation' and epoch_acc > best_acc:
            best_acc = epoch_acc

    # print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item()}")
 
# Save the fine-tuned model
torch.save(model.state_dict(), "models/fine-pruned.pt")
print('Best validation Accuracy: {:4f}'.format(best_acc))

Total length of Dataset Imported:  39585
{'training': 31668, 'validation': 7917}
epoch 1/5
**********




training Loss: 0.0008 || Accuracy: 0.9950
validation Loss: 0.0003 || Accuracy: 0.9979
epoch 2/5
**********
training Loss: 0.0002 || Accuracy: 0.9984
validation Loss: 0.0002 || Accuracy: 0.9985
epoch 3/5
**********
training Loss: 0.0001 || Accuracy: 0.9989
validation Loss: 0.0002 || Accuracy: 0.9987
epoch 4/5
**********
training Loss: 0.0001 || Accuracy: 0.9993
validation Loss: 0.0001 || Accuracy: 0.9991
epoch 5/5
**********
training Loss: 0.0001 || Accuracy: 0.9994
validation Loss: 0.0001 || Accuracy: 0.9989
Best validation Accuracy: 0.998863


In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from torchvision.models import alexnet
from PIL import Image
import os
import random
import pandas as pd 
import matplotlib.pyplot as plt


def load_model(model_path, num_classes=43, device="cpu"):
    model = alexnet(weights=None, num_classes=43).to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()  # Set to evaluation mode
    return model

def process_image(image_path):
    # Load the image
    image = Image.open(image_path)

    # Define transformations
    transform = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ])

    # Apply transformations
    image_tensor = transform(image).unsqueeze(0)
    return image_tensor, image

def predict(image_path, model, device="cpu"):
    # No need to reload the model every time you predict. Use the passed model.
    # model = load_model(model_path, device=device)

    image_tensor, image = process_image(image_path)  # Assume process_image returns (tensor, image)
    image_tensor = image_tensor.to(device)

    with torch.no_grad():
        outputs = model(image_tensor)
        _, predicted_class_index = torch.max(outputs, 1)

    return predicted_class_index.item(), image

def predict_on_directory(directory_path, model_path, misclss, num_images_to_test=0, device="cpu", backdoored=False, model_name="ran_sqr_sin_01"):
    model = load_model(model_path, device=device)
    
    # Assuming you have class names to interpret the outputs (you need to define this list)
    class_names = pd.read_csv('/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/labels.csv')['Name'].tolist()
    tests = pd.read_csv('/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/Test.csv')
    # List all files in the directory
    filenames = os.listdir(directory_path)
    random.shuffle(filenames)
    #count number of correct prediction
    correct_count=0
    #initialize the ground truth class id
    #for now, I am checking if the model can predict the backdoored stop sign which belong to class 14
    # Limit the number of files processed
    if num_images_to_test:
        filenames = filenames[:num_images_to_test]
    else:
        num_images_to_test = 780

    for filename in filenames:
        image_path = os.path.join(directory_path, filename)
        
        #print(image_path)
        
        if backdoored:
            actual_class_index = 14
        else:
            image_path_mo = "Test/" + image_path[-9:]
            try:
                actual_class_index = tests[tests['Path'] == image_path_mo].iloc[0]['ClassId']
            except:
                pass
        
        
        
        # Process only .png images (according to your error traceback)
        if image_path.lower().endswith('.png'):
            predicted_class_index, image = predict(image_path, model, device)

            predicted_class_name = class_names[predicted_class_index]
            
            if predicted_class_index == actual_class_index:
                correct_count += 1
            elif backdoored:
                lst = misclss.get(model_name,[])
                lst.append(image_path)
                misclss[model_name] = lst
                
    if backdoored:
        rsl = "backdoored"
    else:
        rsl = "clean"
    print(f'Accuracy on {rsl} images is: {correct_count / num_images_to_test * 100:.2f}%')

In [12]:
misclss = {}

print(f"\ntest of fine-pruned model:")
test_directory = f'/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/backdoored-test/780/1/'
test_directory_t = '/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/Test'
model_path = "models/fine-pruned.pt"
#testing trained model on backdoored data
predict_on_directory(test_directory, model_path, misclss, device="cuda", backdoored=True, model_name=i)
#testing the trained model on clean data
predict_on_directory(test_directory_t, model_path, misclss, num_images_to_test=1000, device="cuda",model_name=i)


test of fine-pruned model:


Accuracy on backdoored images is: 96.41%
Accuracy on clean images is: 98.30%


In [9]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from torchvision.models import alexnet
from PIL import Image
import os
import random
import pandas as pd 
import matplotlib.pyplot as plt

class MaskedModel(nn.Module):
    def __init__(self, original_model, dormant_neurons):
        super(MaskedModel, self).__init__()
        self.mask = nn.Parameter(torch.ones_like(torch.cat((dormant_neurons, torch.tensor([1]))), dtype=torch.float32), requires_grad=False)
        self.mask[dormant_neurons] = 0
        self.original_model = original_model

    def forward(self, x):
        x = self.original_model(x)
        x *= self.mask
        return x


def load_model(model_path, num_classes=43, device="cpu"):
    model = alexnet(weights=None, num_classes=43).to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()  # Set to evaluation mode
    return model

def process_image(image_path):
    # Load the image
    image = Image.open(image_path)

    # Define transformations
    transform = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    # Apply transformations
    image_tensor = transform(image).unsqueeze(0)
    return image_tensor, image

def predict(image_path, model, device="cpu"):
    # No need to reload the model every time you predict. Use the passed model.
    # model = load_model(model_path, device=device)

    image_tensor, image = process_image(image_path)  # Assume process_image returns (tensor, image)
    image_tensor = image_tensor.to(device)

    i = 0
    # Forward pass to obtain activations
    with torch.no_grad():
        # Get activations from convolutional layers (features)
        x = image_tensor
        bias_type = model.features[0].bias.dtype
        for layer in model.features:
            x = layer(x)
            x *= dormant_data[i]
            x = x.to(bias_type)
            i += 1

        for layer in [model.avgpool]:
            x = layer(x)
            x *= dormant_data[i]
            x = x.to(bias_type)
            i += 1

        # Get activations from linear layers (classifier)
        x = x.view(x.size(0), -1)  # Flatten the output before feeding to linear layers
        for layer in model.classifier:
            x = layer(x)
            x *= dormant_data[i]
            x = x.to(bias_type)
            i += 1
        
        _, predicted_class_index = torch.max(x, 1)

    return predicted_class_index.item(), image

def predict_on_directory(directory_path, model_path, misclss, num_images_to_test=0, device="cpu", backdoored=False, model_name="ran_sqr_sin_01"):
    model = load_model(model_path, device=device)
    
    # Assuming you have class names to interpret the outputs (you need to define this list)
    class_names = pd.read_csv('/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/labels.csv')['Name'].tolist()
    tests = pd.read_csv('/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/Test.csv')
    # List all files in the directory
    filenames = os.listdir(directory_path)
    random.shuffle(filenames)
    #count number of correct prediction
    correct_count=0
    #initialize the ground truth class id
    #for now, I am checking if the model can predict the backdoored stop sign which belong to class 14
    # Limit the number of files processed
    if num_images_to_test:
        filenames = filenames[:num_images_to_test]
    else:
        num_images_to_test = 780

    for filename in filenames:
        image_path = os.path.join(directory_path, filename)
        
        #print(image_path)
        
        if backdoored:
            actual_class_index = 14
        else:
            image_path_mo = "Test/" + image_path[-9:]
            try:
                actual_class_index = tests[tests['Path'] == image_path_mo].iloc[0]['ClassId']
            except:
                pass
        
        # Process only .png images (according to your error traceback)
        if image_path.lower().endswith('.png'):
            predicted_class_index, image = predict(image_path, model, device)

            predicted_class_name = class_names[predicted_class_index]
            
            if predicted_class_index == actual_class_index:
                correct_count += 1
            elif backdoored:
                lst = misclss.get(model_name,[])
                lst.append(image_path)
                misclss[model_name] = lst

            
    # masked_model = MaskedModel(model, dormant_neurons)
    # print("model masked")
                
    if backdoored:
        rsl = "backdoored"
    else:
        rsl = "clean"
    print(f'Accuracy on {rsl} images is: {correct_count / num_images_to_test * 100:.2f}%')
    # return masked_model
    # return neurons, model

In [10]:
misclss = {}

print(f"\ntest of fine-pruned model:")
test_directory = f'/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/backdoored-test/780/1/'
test_directory_t = '/home/cc7486/Desktop/Research/MLLsecurity/gtsrb-german-traffic-sign/Test'
model_path = "models/fine-pruned.pt"
#testing trained model on backdoored data
predict_on_directory(test_directory, model_path, misclss, device="cpu", backdoored=True, model_name=i)
#testing the trained model on clean data
predict_on_directory(test_directory_t, model_path, misclss, num_images_to_test=1000, device="cpu",model_name=i)


test of fine-pruned model:
Accuracy on backdoored images is: 0.00%
Accuracy on clean images is: 92.90%
