Skip to content

Commit

Permalink
Merge branch 'dev_1.8.0' into development_issue_1195
Browse files Browse the repository at this point in the history
  • Loading branch information
beat-buesser committed Sep 8, 2021
2 parents ad05f9c + 56a9fad commit 878cce7
Show file tree
Hide file tree
Showing 7 changed files with 465 additions and 11 deletions.
124 changes: 114 additions & 10 deletions art/estimators/classification/blackbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"""
from __future__ import absolute_import, division, print_function, unicode_literals

from functools import total_ordering
import logging
from typing import Callable, List, Optional, Union, Tuple, TYPE_CHECKING

Expand All @@ -41,22 +42,25 @@ class BlackBoxClassifier(ClassifierMixin, BaseEstimator):
Class for black-box classifiers.
"""

estimator_params = Classifier.estimator_params + ["nb_classes", "input_shape", "predict"]
estimator_params = Classifier.estimator_params + ["nb_classes", "input_shape", "predict_fn"]

def __init__(
self,
predict_fn: Callable,
predict_fn: Union[Callable, Tuple[np.ndarray, np.ndarray]],
input_shape: Tuple[int, ...],
nb_classes: int,
clip_values: Optional["CLIP_VALUES_TYPE"] = None,
preprocessing_defences: Union["Preprocessor", List["Preprocessor"], None] = None,
postprocessing_defences: Union["Postprocessor", List["Postprocessor"], None] = None,
preprocessing: "PREPROCESSING_TYPE" = (0.0, 1.0),
fuzzy_float_compare: bool = False,
):
"""
Create a `Classifier` instance for a black-box model.
:param predict_fn: Function that takes in one input of the data and returns the one-hot encoded predicted class.
:param predict_fn: Function that takes in an `np.ndarray` of input data and returns the one-hot encoded matrix
of predicted classes or tuple of the form `(inputs, labels)` containing the predicted labels for each
input.
:param input_shape: Size of input.
:param nb_classes: Number of prediction classes.
:param clip_values: Tuple of the form `(min, max)` of floats or `np.ndarray` representing the minimum and
Expand All @@ -68,6 +72,9 @@ def __init__(
:param preprocessing: Tuple of the form `(subtrahend, divisor)` of floats or `np.ndarray` of values to be
used for data preprocessing. The first value will be subtracted from the input. The input will then
be divided by the second one.
:param fuzzy_float_compare: If `predict_fn` is a tuple mapping inputs to labels, and this is True, looking up
inputs in the table will be done using `numpy.isclose`. Only set to True if really needed, since this
severely affects performance.
"""
super().__init__(
model=None,
Expand All @@ -76,8 +83,10 @@ def __init__(
postprocessing_defences=postprocessing_defences,
preprocessing=preprocessing,
)

self._predict_fn = predict_fn
if callable(predict_fn):
self._predict_fn = predict_fn
else:
self._predict_fn = _make_lookup_predict_fn(predict_fn, fuzzy_float_compare)
self._input_shape = input_shape
self._nb_classes = nb_classes

Expand Down Expand Up @@ -161,24 +170,27 @@ class BlackBoxClassifierNeuralNetwork(NeuralNetworkMixin, ClassifierMixin, BaseE
NeuralNetworkMixin.estimator_params
+ ClassifierMixin.estimator_params
+ BaseEstimator.estimator_params
+ ["nb_classes", "input_shape", "predict"]
+ ["nb_classes", "input_shape", "predict_fn"]
)

def __init__(
self,
predict: Callable,
predict_fn: Union[Callable, Tuple[np.ndarray, np.ndarray]],
input_shape: Tuple[int, ...],
nb_classes: int,
channels_first: bool = True,
clip_values: Optional["CLIP_VALUES_TYPE"] = None,
preprocessing_defences: Union["Preprocessor", List["Preprocessor"], None] = None,
postprocessing_defences: Union["Postprocessor", List["Postprocessor"], None] = None,
preprocessing: "PREPROCESSING_TYPE" = (0, 1),
fuzzy_float_compare: bool = False,
):
"""
Create a `Classifier` instance for a black-box model.
:param predict: Function that takes in one input of the data and returns the one-hot encoded predicted class.
:param predict_fn: Function that takes in an `np.ndarray` of input data and returns the one-hot encoded matrix
of predicted classes or tuple of the form `(inputs, labels)` containing the predicted labels for each
input.
:param input_shape: Size of input.
:param nb_classes: Number of prediction classes.
:param channels_first: Set channels first or last.
Expand All @@ -191,6 +203,9 @@ def __init__(
:param preprocessing: Tuple of the form `(subtrahend, divisor)` of floats or `np.ndarray` of values to be
used for data preprocessing. The first value will be subtracted from the input. The input will then
be divided by the second one.
:param fuzzy_float_compare: If `predict_fn` is a tuple mapping inputs to labels, and this is True, looking up
inputs in the table will be done using `numpy.isclose`. Only set to True if really needed, since this
severely affects performance.
"""
super().__init__(
model=None,
Expand All @@ -201,7 +216,10 @@ def __init__(
preprocessing=preprocessing,
)

self._predictions = predict
if callable(predict_fn):
self._predict_fn = predict_fn
else:
self._predict_fn = _make_lookup_predict_fn(predict_fn, fuzzy_float_compare)
self._input_shape = input_shape
self._nb_classes = nb_classes
self._learning_phase = None
Expand Down Expand Up @@ -236,7 +254,7 @@ def predict(self, x: np.ndarray, batch_size: int = 128, **kwargs):
batch_index * batch_size,
min((batch_index + 1) * batch_size, x_preprocessed.shape[0]),
)
predictions[begin:end] = self._predictions(x_preprocessed[begin:end])
predictions[begin:end] = self._predict_fn(x_preprocessed[begin:end])

# Apply postprocessing
predictions = self._apply_postprocessing(preds=predictions, fit=False)
Expand Down Expand Up @@ -287,3 +305,89 @@ def loss(self, x: np.ndarray, y: np.ndarray, **kwargs) -> np.ndarray:

def compute_loss(self, x: np.ndarray, y: np.ndarray, **kwargs) -> np.ndarray:
raise NotImplementedError


@total_ordering
class FuzzyMapping:
"""
Class for a sample/label pair to be used in a `SortedList`.
"""

def __init__(self, key: np.ndarray, value=None):
"""
Create an instance of a key/value to pair to be used in a `SortedList`.
:param key: The sample to be matched against.
:param value: The mapped value.
"""
self.key = key
self.value = value

def __eq__(self, other):
return np.all(np.isclose(self.key, other.key))

def __ge__(self, other):
# This implements >= comparison so we can use this class in a `SortedList`. The `total_ordering` decorator
# automatically generates the rest of the comparison magic functions based on this one

close_cells = np.isclose(self.key, other.key)
if np.all(close_cells):
return True

# If the keys are not exactly the same (up to floating-point inaccuracies), we compare the value of the first
# index which is not the same to decide on an ordering

compare_idx = np.unravel_index(np.argmin(close_cells), shape=self.key.shape)
return self.key[compare_idx] >= other.key[compare_idx]


def _make_lookup_predict_fn(existing_predictions: Tuple[np.ndarray, np.ndarray], fuzzy_float_compare: bool) -> Callable:
"""
Makes a predict_fn callback based on a table of existing predictions.
:param existing_predictions: Tuple of (samples, labels).
:param fuzzy_float_compare: Look up predictions using `np.isclose`, only set to True if really needed, since this
severely affects performance.
:return: Prediction function.
"""

samples, labels = existing_predictions

if fuzzy_float_compare:
from sortedcontainers import SortedList

# Construct a search-tree of the predictions, using fuzzy float comparison
sorted_predictions = SortedList([FuzzyMapping(key, value) for key, value in zip(samples, labels)])

def fuzzy_predict_fn(batch):
predictions = []
for row in batch:
try:
match_idx = sorted_predictions.index(FuzzyMapping(row))
except ValueError as err:
raise ValueError("No existing prediction for queried input") from err

predictions.append(sorted_predictions[match_idx].value)

return np.array(predictions)

return fuzzy_predict_fn

# Construct a dictionary to map from samples to predictions. We use the bytes of the `ndarray` as the key,
# because the `ndarray` itself is not hashable
mapping = dict()
for x, y in zip(samples, labels):
mapping[x.tobytes()] = y

def predict_fn(batch):
predictions = []
for row in batch:
row_bytes = row.tobytes()
if row.tobytes() not in mapping:
raise ValueError("No existing prediction for queried input")

predictions.append(mapping[row_bytes])

return np.array(predictions)

return predict_fn
5 changes: 5 additions & 0 deletions notebooks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ versatile classifier of ART requiring only a single predict function definition
requirements. The notebook shows how use BlackBoxClassifier to attack a remote, deployed model (in this case on IBM
Watson Machine Learning, https://cloud.ibm.com) using the HopSkiJump attack.

[classifier_blackbox_lookup_table.ipynb](classifier_blackbox_lookup_table.ipynb) [[on nbviewer](https://nbviewer.jupyter.org/github/Trusted-AI/adversarial-robustness-toolbox/blob/main/notebooks/classifier_blackbox_lookup_table.ipynb)]
demonstrates using BlackBoxClassifier when the adversary does not have access to the model for making predictions, but
does have a set of existing predictions produced before losing access. The notebook shows how to use BlackBoxClassifier
to attack a model using only a table of samples and their labels, using a membership inference black-box attack.

[classifier_blackbox_tesseract.ipynb](classifier_blackbox_tesseract.ipynb) [[on nbviewer](https://nbviewer.jupyter.org/github/Trusted-AI/adversarial-robustness-toolbox/blob/main/notebooks/classifier_blackbox_tesseract.ipynb)]
demonstrates a black-box attack on Tesseract OCR. It uses BlackBoxClassifier and HopSkipJump attack to change the image
of one word into the image of another word and shows how to apply pre-processing defences.
Expand Down
Loading

0 comments on commit 878cce7

Please sign in to comment.