# Securing Machine Learning Models With The HiddenLayer AISec Platform

This tutorial describes how to use the HiddenLayer's AI Security Platform to detect and respond to an attack on a machine learning model. 

Steps:
1. Install libraries needed for the demo
2. Create API credentials in the console
3. Initialize AISec Platform client for demo
4. Scan models from HuggingFace
    1. View scan results in the console
5. View model cards
6. Submit inferences for microsoft/resnet-50
    1. View inferences in the console
7. Run a blackbox attack against microsoft/resnet-50 using ART
    1. View detections in the console

---

# 1. Install libraries needed for demo

In this demo we will use [ART](https://github.com/Trusted-AI/adversarial-robustness-toolbox) to attack the resnet-50 model. We will also need libraries from HuggingFace to download the models and datasets used for the demo.

In [2]:
import warnings
warnings.filterwarnings('ignore')

In [1]:
# imports for demo
import os
import asyncio
import base64
import httpx
import numpy
import huggingface_hub
import pandas
import requests
import time
import torch

from art.estimators.classification import BlackBoxClassifier
from art.attacks.evasion import HopSkipJump
from art.utils import to_categorical
from datetime import datetime
from datasets import load_dataset
from huggingface_hub import hf_hub_download
from matplotlib import pyplot as plt
from transformers import AutoImageProcessor, ResNetForImageClassification
from time import sleep
from tqdm import tqdm
from typing import Any, Callable, Optional
from random import randint
from requests.auth import HTTPBasicAuth

class AISecClient(object):
    def __init__(self, client_id:str, client_secret: str, api_url: str = "https://api.hiddenlayer.ai", retry_count: int = 3):
        self.client_id = client_id
        self.client_secret = client_secret
        self.client_auth = HTTPBasicAuth(client_id, client_secret)
        self.base_url = api_url
        self.token_url = "https://auth.hiddenlayer.ai/oauth2/token?grant_type=client_credentials"
        self.retry_count = retry_count
        self.session = requests.Session()
        self.async_client = httpx.AsyncClient(timeout=None)

    @staticmethod
    def b64str(b: bytes) -> str:
        return base64.b64encode(b).decode()
    
    def _set_token(self) -> str:
        resp = requests.post(self.token_url, auth=self.client_auth)
        if resp.status_code != 200:
            raise Exception("Error: Unable to retrieve token")
        self.token = resp.json()["access_token"]
        self.session.headers = {"Authorization": f"Bearer {self.token}"}
        self.async_client.headers = {"Authorization": f"Bearer {self.token}"}

    def _request_handler(self, meth: Callable, url: str, **kwargs: Any) -> requests.Response:
        resp = None
        for i in range(self.retry_count + 1):
            resp = meth(url, **kwargs)
            if resp.status_code == 401:
                self._set_token()
                continue
            elif resp.status_code < 500:
                break
            sleep(randint(1, i + 1) / 100)  # nosec
        return resp

    async def _async_request_handler(self, meth: Callable, url: str, **kwargs: Any) -> requests.Response:
        resp = None
        for i in range(self.retry_count + 1):
            resp = await meth(url, **kwargs)
            if resp.status_code == 401:
                self._set_token()
                continue
            elif resp.status_code < 500:
                break
            sleep(randint(1, i + 1) / 100)  # nosec
        return resp
    
    def create_sensor(self, name: str, tags: dict = None):
        tags = {} if tags is None else tags
        resp = self._request_handler(self.session.post, f"{self.base_url}/api/v2/sensors/create", json={"plaintext_name": name, "active": True, "tags": tags})
        if not resp.ok:
            raise Exception("Failed to create sensor - demo will not work properly")
        data = resp.json()
        print(f"View model: https://console.hiddenlayer.ai/model-details/{data['model_id']}")
        return data
    
    def get_sensor_by_name_version(self, name: str, version: int):
        resp = self._request_handler(self.session.post, f"{self.base_url}/api/v2/sensors/query", json={"filter":{"plaintext_name": name, "version": version}})
        if not resp.ok:
            raise Exception("Failed to create sensor - demo will not work properly")
        return resp.json()["results"][0]

    def get_or_create_sensor(self, name: str, version: int = 1, tags: dict = None):
        sensor = None
        tags = {} if tags is None else tags
        resp = self._request_handler(self.session.post, f"{self.base_url}/api/v2/sensors/query", json={"filter":{"plaintext_name": name, "version": version}})
        if resp.ok:
            content = resp.json()
            if content["total_count"] >= 1:
                sensor = content["results"][0]

        if sensor is None:
            sensor = self.create_sensor(name, tags=tags)
            
        return sensor    

    async def scan_huggingface_repo(self, repo: str):
        sensors_url = f"{self.base_url}/api/v2/sensors"
        scanning_url = f"{self.base_url}/api/v2/submit/sensors"
        extensions = [".bin", ".h5", ".safetensors"]
        model_info = huggingface_hub.model_info(repo)
        filename = None
        for ext in extensions:
            for s in model_info.siblings:
                if ("/" not in s.rfilename):
                    if s.rfilename.endswith(ext):
                        filename = s.rfilename
                        break
            if filename is not None:
                break

        if filename is None:
            raise Exception(f"Unable to find model to scan in repository: {repo}")

        print(f"Downloading {repo}/{filename} from HuggingFace...")
        filepath_to_model = hf_hub_download(repo_id=repo, filename=filename)
        with open(filepath_to_model, "rb") as fin:
            data = fin.read()
    
        # create sensor
        sensor = self.create_sensor(repo, tags={"env": "demo", "source": "huggingface"})
        sensor_id = sensor["sensor_id"]
        model_id = sensor["model_id"]
    
        # start multipart upload
        print(f"Starting upload for {repo}/{filename}")
        headers = {"X-Content-Length": str(len(data))}
        resp = self.session.post(f"{sensors_url}/{sensor_id}/upload/begin", headers=headers)
        if not resp.ok:
            raise Exception(f"Failed to start multipart upload for {repo}. Demo will not run correctly")
        multipart = resp.json()
    
        # upload parts
        chunk = 8
        upload_id = multipart["upload_id"]
        for i in tqdm(range(0, len(multipart["parts"]), chunk)):
            upload_tasks = []
            group = multipart["parts"][i:i+chunk]
            for p in group:
                part_number = p["part_number"]
                part_data = data[p["start_offset"]:p["end_offset"]]
                t = self._async_request_handler(self.async_client.put, f"{sensors_url}/{sensor_id}/upload/{upload_id}/part/{part_number}", data=part_data, headers=headers)
                upload_tasks.append(t)
            results = await asyncio.gather(*upload_tasks)
    
        # complete multipart upload
        print(f"Completed upload for {repo}/{filename}")
        resp = self.session.post(f"{sensors_url}/{sensor_id}/upload/{upload_id}/complete")
        if not resp.ok:
            raise Exception(f"Failed to complete upload for {repo}. Demo will not run correctly")
    
        # kick off scan for sensor id
        resp = self.session.post(f"{scanning_url}/{sensor_id}/scan")
        if not resp.ok:
            raise Exception("Failed to start model scan. Demo will not run correctly")
        print(f"Scan initiated for {repo}/{filename}. Results will be available shortly at: https://console.hiddenlayer.ai/model-details/{model_id}")
        
    
    def submit_to_mldr(
        self,
        sensor_id: str, 
        requester_id: str,
        input_layer: numpy.ndarray,
        output_layer: numpy.ndarray,
        event_time: str = None,
        predictions: list = None,
        metadata: dict = None
    ) -> None:
        submission_url = f"{self.base_url}/api/v2/submit"
        payload = {
            "sensor_id": sensor_id,
            "requester_id": requester_id,
            "event_time": str(datetime.now().isoformat()) if event_time is None else event_time,
            "input_layer": self.b64str(input_layer.tobytes()),
            "input_layer_shape": list(input_layer.shape),
            "input_layer_dtype": str(input_layer.dtype),
            "output_layer": self.b64str(output_layer.tobytes()),
            "output_layer_shape": list(output_layer.shape),
            "output_layer_dtype": str(output_layer.dtype),
            "predictions": [] if predictions is None else predictions,
            "metadata": {} if metadata is None else metadata,
        }
        resp = self._request_handler(self.session.post, submission_url, json=payload)
        if not resp.ok:
            print("Failed to submit model inference to HiddenLayer")
            print(resp.content)



ModuleNotFoundError: No module named 'h11'

In [3]:
# setup session to use for demo
# ADD CLIENT ID AND CLIENT SECRET TO RUN NOTEBOOK
client_id = os.getenv('HIDDENLAYER_CLIENT_ID')
client_secret = os.getenv('HIDDENLAYER_CLIENT_SECRET')
client = AISecClient(client_id, client_secret)

# 2. Create API credentials in the console

Navigate to https://console.hiddenlayer.ai/admin?activeTab=apiKeys and create new API credentials to use for this demo. Copy and paste the client id and client secret into the next step of the notebook

# 3. Initalize AISec Platform client for demo

# 4. Scan models from HuggingFace

1. Scan [gpt2](https://huggingface.co/gpt2/tree/main)
2. Scan [drhyrum/bert-tiny-torch-vuln](https://huggingface.co/drhyrum/bert-tiny-torch-vuln)
3. Scan [microsoft/resnet-50](https://huggingface.co/microsoft/resnet-50)

In [4]:
# Scan gpt2
await client.scan_huggingface_repo("gpt2")

Downloading gpt2/pytorch_model.bin from HuggingFace...


Downloading pytorch_model.bin:   0%|          | 0.00/548M [00:00<?, ?B/s]

View model: https://console.hiddenlayer.ai/model-details/89dadee6-00ef-47a3-8788-07a0fd46d1df
Starting upload for gpt2/pytorch_model.bin


100%|████████████████████████████████████████████████████████████████████████████████████| 9/9 [00:12<00:00,  1.34s/it]


Completed upload for gpt2/pytorch_model.bin
Scan initiated for gpt2/pytorch_model.bin. Results will be available shortly at: https://console.hiddenlayer.ai/model-details/89dadee6-00ef-47a3-8788-07a0fd46d1df


In [None]:
# Scan drhyrum/bert-tiny-torch-vuln
await client.scan_huggingface_repo("drhyrum/bert-tiny-torch-vuln")

In [None]:
# Scan microsoft/resnet-50
await client.scan_huggingface_repo("microsoft/resnet-50")

# 5. View model cards in console

Navigate to https://console.hiddenlayer.ai/model-inventory to view newly created model cards

# 6. Monitor inferences for microsoft/resnet-50

Now that the `microsoft/resnet-50` model has been scanned and no detections were found the model is safe to load for inference. 


In [None]:
# load the microsoft/resnet-50 model
processor = AutoImageProcessor.from_pretrained("microsoft/resnet-50")
model = ResNetForImageClassification.from_pretrained("microsoft/resnet-50")
sensor = client.get_sensor_by_name_version("microsoft/resnet-50", 1)
# load dataset
dataset = load_dataset("zh-plus/tiny-imagenet")

# iterate over a subset of the dataset and classify images using resnet-50
with torch.no_grad():
    for img in dataset["valid"]["image"][:1]:
        pimg = processor(img, return_tensors="pt")
        predictions = model(pimg["pixel_values"])
        label = predictions.logits.argmax(-1).item()
        text_label = model.config.id2label[label]
        client.submit_to_mldr(sensor["sensor_id"], "resnet-50-demo", pimg["pixel_values"].numpy(), predictions.logits.numpy(), predictions=[label])
        display(img)
        print(f"Label: {text_label}")



## 7a. View model inferences in console

In the console, all inferences can be viewed here at https://console.hiddenlayer.ai/inferences. Details of each inference can also be viewed. Inferences for a specific model can be viewed by navigating to the model card then choosing the Inferences tab. 

# 8. Run a blackbox attack against microsoft/resnet-50 using ART
In this step, we will run a hopskipjump attack against the resnet-50 model. This attack is used to generate adversarial samples against the model that can be used to evade a certain classificaiton or even create a sample to get a specific classification.


# Evasion

In [None]:

# create a custom predict method to use for the attack
sensor = client.get_sensor_by_name_version("microsoft/resnet-50", 1)
sensor_id = sensor["sensor_id"]

def predict(inputs):
    probs = []
    for i in inputs:
        a = numpy.array([i])
        t = torch.from_numpy(a)
        p = model(t)
        l = p.logits.argmax(-1).item()
        o = p.logits.detach().numpy()
        probs.append(o[0])
        client.submit_to_mldr(sensor_id, "resnet-50-demo-attack", a, o, predictions=[l])
    return probs

# show starting image and current label
image = dataset["valid"]["image"][3]
pimg = processor(image, return_tensors="pt")
predictions = model(pimg["pixel_values"])
pimg_arr = pimg["pixel_values"].detach().numpy()
label = predictions.logits.argmax(-1).item()
text_label = model.config.id2label[label]
print("Base image...")
display(image)
print(f"Label: {text_label}")
print("----------------------------------")

# create attack classifier
attack_classifier = BlackBoxClassifier(predict_fn=predict, nb_classes=1000, input_shape=list(pimg_arr[0].shape))

# create attack object with params for the attack
hopskipjump_attack = HopSkipJump(classifier=attack_classifier, max_iter=0, max_eval=1000, init_eval=10, targeted=False)

print("Starting attack...")
iter_step = 10
for i in range(5):
    x_adv = hopskipjump_attack.generate(x=pimg_arr, resume=True)
    t = torch.from_numpy(x_adv)
    p = model(t)
    l = p.logits.argmax(-1).item()
    tl = model.config.id2label[l]
    
    plt.imshow(t[0].squeeze().permute(1,2,0))
    plt.show(block=False)
    print(f"Class label: {l}, {tl}")
    hopskipjump_attack.max_iter = iter_step
print("Attack complete")



In [None]:
2465/5

## 8a. View detections in console

All detections can be viewed here at https://console.hiddenlayer.ai/inference-detections?activeTab=overview. The overview provides a list of each detection and the [dashboard](https://console.hiddenlayer.ai/inference-detections?activeTab=dashboard) is an aggregated view of the detections. Detections for a specific model can be viewed by navigating to the model card then choosing the Detections tab. 

