# Poisoning Detection Techniques

This notebook demonstrates how to detect poisoning in classifiers using the `ActivationDefence` class from the ART library. It involves loading a dataset, normalizing it, initializing a classifier, setting up the defense mechanism, detecting potential poisoning, and analyzing the results.


Import the necessary libraries and check the environment for GPUs, enabling memory growth so that memory use stays balanced on multi-GPU systems.

In [2]:
import tensorflow as tf
from art.estimators.classification import KerasClassifier
from art.attacks.poisoning import PoisoningAttackSVM
from art.defences.detector.poison import ActivationDefence
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.models import load_model
tf.compat.v1.disable_eager_execution()
gpus = tf.config.list_physical_devices('GPU')
# Avoid GPU memory errors by enabling memory growth on every GPU
if gpus:
  try:
    # Currently, memory growth needs to be the same across GPUs
    for gpu in gpus:
      tf.config.experimental.set_memory_growth(gpu, True)
    logical_gpus = tf.config.list_logical_devices('GPU')
    print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
  except RuntimeError as e:
    # Memory growth must be set before GPUs have been initialized
    print(e)

1 Physical GPUs, 1 Logical GPUs


## Evaluate a single model 

Use the `ActivationDefence` class to evaluate a pretrained model on the CIFAR-10 dataset.

In [3]:
# Load the CIFAR-10 dataset
(_, _), (x_test, y_test) = cifar10.load_data()
# Normalize the test data
x_test = x_test.astype('float32') / 255
# Load the pre-trained model and wrap it in an ART Keras classifier
model = load_model('../models/simple-cifar10.h5')
classifier = KerasClassifier(model=model, clip_values=(0, 1))
# Create an ActivationDefence object with the classifier and the test dataset
defence = ActivationDefence(classifier=classifier, x_train=x_test, y_train=y_test)
# Run the poison detection to generate the report and a per-item list of poisoning status
report, is_clean_lst = defence.detect_poison(nb_clusters=2, nb_dims=10, reduce='PCA')
# Print the report (reports are dictionaries)
print("Analysis Report: \n", report)

Analysis Report: 
 {'cluster_analysis': 'smaller', 'suspicious_clusters': 10, 'Class_0': {'cluster_0': {'ptc_data_in_cluster': 1.0, 'suspicious_cluster': False}, 'cluster_1': {'ptc_data_in_cluster': 0.0, 'suspicious_cluster': True}}, 'Class_1': {'cluster_0': {'ptc_data_in_cluster': 0.98, 'suspicious_cluster': False}, 'cluster_1': {'ptc_data_in_cluster': 0.02, 'suspicious_cluster': True}}, 'Class_2': {'cluster_0': {'ptc_data_in_cluster': 0.99, 'suspicious_cluster': False}, 'cluster_1': {'ptc_data_in_cluster': 0.01, 'suspicious_cluster': True}}, 'Class_3': {'cluster_0': {'ptc_data_in_cluster': 0.97, 'suspicious_cluster': False}, 'cluster_1': {'ptc_data_in_cluster': 0.03, 'suspicious_cluster': True}}, 'Class_4': {'cluster_0': {'ptc_data_in_cluster': 0.99, 'suspicious_cluster': False}, 'cluster_1': {'ptc_data_in_cluster': 0.01, 'suspicious_cluster': True}}, 'Class_5': {'cluster_0': {'ptc_data_in_cluster': 0.99, 'suspic
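
The second return value, `is_clean_lst`, holds one flag per sample, where 1 marks a sample considered clean and 0 marks a sample flagged as suspicious. A minimal sketch of how such a list could be tallied per class is shown below. The `is_clean_lst` and `labels` values here are small made-up stand-ins, not output from the notebook.

```python
import numpy as np

# Hypothetical stand-ins for the detector output and the integer class labels
is_clean_lst = [1, 1, 0, 1, 0, 1, 1, 1]
labels = np.array([0, 0, 0, 1, 1, 1, 2, 2])

# Convert the 0/1 flags to a boolean mask (True = considered clean)
is_clean = np.asarray(is_clean_lst, dtype=bool)

# Indices of the samples flagged as suspicious
flagged_idx = np.flatnonzero(~is_clean)

# Number of flagged samples in each class
flagged_per_class = np.bincount(labels[~is_clean], minlength=labels.max() + 1)

print("Flagged indices:", flagged_idx)
print("Flagged per class:", flagged_per_class)
```

With real data, `labels` would be the flattened integer labels passed to the defence (for example `y_test.ravel()`), which is an assumption about how the labels are stored.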

In [4]:
# Create a single summary report for the model, with the suspicious percentage per class
def generate_report_summary_with_headings(report_dict):
    headings = []
    summary = []

    for key, value in report_dict.items():
        if key.startswith('Class_'):
            class_number = key.split('_')[1]
            headings.append(class_number)
            class_suspicious_percentage = 0
            for cluster, cluster_info in value.items():
                if cluster_info['suspicious_cluster']:
                    class_suspicious_percentage += cluster_info['ptc_data_in_cluster'] * 100
            summary.append(f"{int(class_suspicious_percentage)}%" )

    headings_line = "\t".join(headings)
    summary_line = "\t".join(summary)
    return f"{headings_line}\n{summary_line}"
summary = generate_report_summary_with_headings(report)
print(summary)

0	1	2	3	4	5	6	7	8	9
0%	2%	1%	3%	1%	1%	2%	1%	0%	0%
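
One caveat about the percentage conversion used in the summary: multiplying a two-decimal fraction by 100 can land just below the intended integer, and `int()` truncates rather than rounds. The sketch below uses a made-up fraction of 0.29 (not a value taken from the report) to show how `int()` and `round()` can differ.

```python
# Hypothetical ptc_data_in_cluster value, chosen to expose the floating-point effect
fraction = 0.29

scaled = fraction * 100      # not exactly 29 in binary floating point
truncated = int(scaled)      # drops the fractional part
rounded = round(scaled)      # rounds to the nearest integer

print(scaled, truncated, rounded)
```

If exact percentages matter, `round()` is likely the safer choice for this conversion.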


## Comparative Model Evaluation

Load the three models from chapter 3: the reference clean model and the two poisoned models (basic poisoning and pattern-based poisoning). The cell also loads `enhanced-cifar10-cnn.h5` as a fourth model.

In [5]:
(x_train, y_train), (x_test, y_test) = cifar10.load_data()
x_train = x_train.astype('float32') / 255
x_test = x_test.astype('float32') / 255


In [6]:
ref_model = load_model('../models/simple-cifar10.h5')
basic_poisoned_model = load_model('../models/simple-cifar10-poisoned.h5')
pattern_poisoned_model = load_model('../models/backdoor-pattern-cifar10.h5')
model = load_model('../models/enhanced-cifar10-cnn.h5')


Wrap the models in ART `KerasClassifier` objects so that they can be used with ART.

In [7]:
ref_classifier = KerasClassifier(model=ref_model, clip_values=(0, 1))
basic_poisoned_classifier = KerasClassifier(model=basic_poisoned_model, clip_values=(0, 1))
pattern_poisoned_classifier = KerasClassifier(model=pattern_poisoned_model, clip_values=(0, 1))
classifier = KerasClassifier(model=model, clip_values=(0, 1))


Initialize a separate `ActivationDefence` object for each classifier: the reference, basic poisoned, and pattern poisoned classifiers. These objects are used to detect potential poisoning in the classifiers.

In [8]:
ref_defence = ActivationDefence(classifier=ref_classifier, x_train=x_train, y_train=y_train) 
basic_poison_defence = ActivationDefence(classifier=basic_poisoned_classifier, x_train=x_train, y_train=y_train) 
pattern_poison_defence = ActivationDefence(classifier=pattern_poisoned_classifier, x_train=x_train, y_train=y_train)
defence = ActivationDefence(classifier=classifier, x_train=x_train, y_train=y_train)


Now let's run the `detect_poison` method on each `ActivationDefence` object to detect potential poisoning. The method uses PCA for dimensionality reduction and KMeans clustering to identify suspicious data points.

In [9]:
ref_report, ref_is_clean_lst = ref_defence.detect_poison(nb_clusters=2, nb_dims=10, reduce='PCA')
basic_poison_report, basic_poison_is_clean_lst = basic_poison_defence.detect_poison(nb_clusters=2, nb_dims=10, reduce='PCA')
pattern_poison_report, pattern_poison_is_clean_lst = pattern_poison_defence.detect_poison(nb_clusters=2, nb_dims=10, reduce='PCA')
report, is_clean_lst = defence.detect_poison(nb_clusters=2, nb_dims=10, reduce='PCA')
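
To make the idea behind the detection step concrete, here is a rough, self-contained sketch of activation clustering for a single class. It is not ART's implementation: it swaps in an SVD-based PCA and a hand-rolled 2-means loop written in plain NumPy, and the "activations" are synthetic Gaussian data with an assumed shift for the poisoned samples. The smaller cluster is flagged, mirroring the `'cluster_analysis': 'smaller'` setting shown in the earlier report.

```python
import numpy as np

def pca_reduce(x, nb_dims):
    """Project x onto its first nb_dims principal components (via SVD)."""
    centered = x - x.mean(axis=0)
    # Rows of vt are the principal directions, ordered by explained variance
    _, _, vt = np.linalg.svd(centered, full_matrices=False)
    return centered @ vt[:nb_dims].T

def two_means(x, n_iter=20):
    """Tiny 2-means loop, seeded with the extremes of the first component."""
    centers = x[[np.argmin(x[:, 0]), np.argmax(x[:, 0])]]
    for _ in range(n_iter):
        # Distance from every sample to both centers, shape (n_samples, 2)
        dists = np.linalg.norm(x[:, None, :] - centers[None, :, :], axis=2)
        labels = dists.argmin(axis=1)
        centers = np.stack([x[labels == k].mean(axis=0) for k in range(2)])
    return labels

# Synthetic stand-in for one class's hidden-layer activations:
# 180 "clean" samples followed by 20 "poisoned" samples with shifted activations
rng = np.random.default_rng(0)
clean = rng.normal(0.0, 1.0, size=(180, 64))
poisoned = rng.normal(6.0, 1.0, size=(20, 64))
activations = np.vstack([clean, poisoned])

reduced = pca_reduce(activations, nb_dims=10)
labels = two_means(reduced)

# Treat the smaller cluster as suspicious
sizes = np.bincount(labels, minlength=2)
suspicious_cluster = int(np.argmin(sizes))
flagged = labels == suspicious_cluster

print("Cluster sizes:", sizes)
print("Flagged samples:", int(flagged.sum()))
```

Real activations are rarely this well separated, so the clean split seen here should not be expected on an actual model.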


Let's further process the results to create a comparative summary report.

In [10]:
import pandas as pd

def generate_summary(report_dict):
    summary = []

    for key, value in report_dict.items():
        if key.startswith('Class_'):
            class_suspicious_percentage = 0
            for cluster, cluster_info in value.items():
                if cluster_info['suspicious_cluster']:
                    class_suspicious_percentage += cluster_info['ptc_data_in_cluster'] * 100
            summary.append(int(class_suspicious_percentage))

    return summary

def comparative_summary(reports, model_names):
    # Determine the maximum number of classes across all reports
    max_classes = 0
    for report in reports:
        num_classes = len([key for key in report.keys() if key.startswith('Class_')])
        if num_classes > max_classes:
            max_classes = num_classes

    data = {'Class': [i for i in range(max_classes)]}

    for report, model_name in zip(reports, model_names):
        summary = generate_summary(report)
        # Ensure the summary has the same length as max_classes
        if len(summary) < max_classes:
            summary.extend([0] * (max_classes - len(summary)))  # Fill missing classes with 0
        data[model_name] = summary
    
    df = pd.DataFrame(data)
    df.set_index('Class', inplace=True)
    
    # Format the DataFrame to display percentages
    df = df.applymap(lambda x: f"{x}%")
    
    return df
    
# Collect the reports for the reference model and the two poisoned models
reports = [ref_report, basic_poison_report, pattern_poison_report]
model_names = ['Reference Model (Clean)', 'Simple Label Replacement', 'ART Pattern Poisoned']

# Generate and print the comparative summary
comparative_summary_df = comparative_summary(reports, model_names)
print(comparative_summary_df)


      Reference Model (Clean) Simple Label Replacement ART Pattern Poisoned
Class                                                                      
0                          1%                      49%                  30%
1                          1%                      41%                  31%
2                          1%                      37%                  36%
3                          4%                      36%                  13%
4                          1%                      35%                   1%
5                          3%                      37%                  36%
6                          2%                      50%                  30%
7                          1%                      42%                   2%
8                          1%                      41%                  23%
9                          0%                      45%                  43%


In [13]:
from art.attacks.evasion import FastGradientMethod
from art.estimators.classification import KerasClassifier
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.models import load_model
from tensorflow.keras.utils import to_categorical
import numpy as np

def evaluate_model_robustness(model_path, eps=0.1):
    """
    Evaluate the robustness of a given model against adversarial attacks using FGSM.
    """
    # Load the model
    model = load_model(model_path)
    # Load and preprocess CIFAR-10 data
    (_, _), (x_test, y_test) = cifar10.load_data()
    x_test = x_test.astype('float32') / 255

    # Convert integer labels to one-hot encoded labels
    y_test_one_hot = to_categorical(y_test)

    # Wrap the model with ART's KerasClassifier
    classifier = KerasClassifier(model=model, clip_values=(0, 1))

    # Generate adversarial examples using FGSM
    attack = FastGradientMethod(estimator=classifier, eps=eps)
    x_test_adv = attack.generate(x=x_test)

    # Evaluate the model on clean and adversarial samples
    _, clean_accuracy = model.evaluate(x_test, y_test_one_hot, verbose=0)
    _, adv_accuracy = model.evaluate(x_test_adv, y_test_one_hot, verbose=0)
    # Print evaluation results
    print(f"Results for {model_path}")
    print(f"Accuracy on clean test samples: {clean_accuracy}")
    print(f"Accuracy on adversarial test samples: {adv_accuracy}")

    # Analyze confidence scores on clean and adversarial samples
    clean_predictions = classifier.predict(x_test)
    adv_predictions = classifier.predict(x_test_adv)

    clean_confidence = np.max(clean_predictions, axis=1).mean()
    adv_confidence = np.max(adv_predictions, axis=1).mean()

    # Print confidence analysis
    print(f"Average confidence on clean test samples: {clean_confidence}")
    print(f"Average confidence on adversarial test samples: {adv_confidence}")
    print("\n")

# Evaluate the reference model and the two poisoned models
evaluate_model_robustness('../models/simple-cifar10-cnn.h5')
evaluate_model_robustness('../models/simple-cifar10-poisoned.h5')
evaluate_model_robustness('../models/backdoor-pattern-cifar10.h5')


2024-06-29 14:23:04.225864: E tensorflow/core/grappler/optimizers/meta_optimizer.cc:961] layout failed: INVALID_ARGUMENT: Size of values 0 does not match size of permutation 4 @ fanin shape indropout_11/cond/then/_4328/dropout/SelectV2-2-TransposeNHWCToNCHW-LayoutOptimizer


Results for ../models/simple-cifar10-cnn.h5
Accuracy on clean test samples: 0.8665000200271606
Accuracy on adversarial test samples: 0.1136000007390976
Average confidence on clean test samples: 0.9276211857795715
Average confidence on adversarial test samples: 0.7819775938987732


Results for ../models/simple-cifar10-poisoned.h5
Accuracy on clean test samples: 0.678600013256073
Accuracy on adversarial test samples: 0.09650000184774399
Average confidence on clean test samples: 0.845690906047821
Average confidence on adversarial test samples: 0.8059708476066589


Results for ../models/backdoor-pattern-cifar10.h5
Accuracy on clean test samples: 0.10019999742507935
Accuracy on adversarial test samples: 0.0997999981045723
Average confidence on clean test samples: 0.4350295960903168
Average confidence on adversarial test samples: 0.43635645508766174
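
For readers unfamiliar with FGSM, the core update is a single step of size `eps` in the direction of the sign of the loss gradient with respect to the input, followed by clipping to the valid input range. The sketch below applies that step to a toy logistic regression model in plain NumPy. The weights, input, and label are made-up values for illustration, and this is not how ART computes gradients for a Keras model.

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def fgsm_step(x, grad, eps, clip_min=0.0, clip_max=1.0):
    """Move x by eps along the sign of the loss gradient, then clip."""
    return np.clip(x + eps * np.sign(grad), clip_min, clip_max)

# Toy logistic regression: p(y=1 | x) = sigmoid(w . x + b), hypothetical values
w = np.array([2.0, -3.0, 1.0])
b = 0.0
x = np.array([0.5, 0.2, 0.5])
y = 1.0  # true label

# Gradient of the binary cross-entropy loss with respect to the input x
p_clean = sigmoid(w @ x + b)
grad = (p_clean - y) * w

x_adv = fgsm_step(x, grad, eps=0.1)
p_adv = sigmoid(w @ x_adv + b)

print("Clean input:", x, "-> p(y=1) =", p_clean)
print("Adversarial input:", x_adv, "-> p(y=1) =", p_adv)
```

The perturbation lowers the model's probability for the true label, which is the same effect the accuracy drop on adversarial samples reflects at a larger scale.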


