# TensorFlow MNIST Classifier demo for a Dioptra deployment

This notebook contains an end-to-end demonstration of the Dioptra architecture that is deployed on the DGX workstation.

## Setup

**Note:** This demo is specifically for the NCCoE DGX Workstation with hostname `dgx-station-2`.

Port forwarding is required in order to run this demo.
The recommended port mapping is as follows:

- Map `*:20080` on laptop to `localhost:30080` on `dgx-station-2`
- Map `*:25000` on laptop to `localhost:35000` on `dgx-station-2`

A sample SSH config file that enables the above port forwarding is provided below,

> ⚠️ **Edits required**: replace `username` with your assigned username _on the NCCoE virtual machines_!

```conf
# vm hostname: jumphost001
Host nccoe-jumphost001
    Hostname 10.33.53.98
    User username  # Change to your assigned username on the NCCoE virtual machines!
    Port 54131
    IdentityFile %d/.ssh/nccoe-vm

# vm hostname: dgx-station-2
Host nccoe-k8s-gpu002
    Hostname 192.168.1.28
    User username  # Change to your assigned username on the NCCoE virtual machines!
    Port 22
    IdentityFile %d/.ssh/nccoe-vm
    ProxyJump nccoe-jumphost001
    LocalForward *:20080 localhost:30080
    LocalForward *:25000 localhost:35000
```

Now, connect to the NCCoE VPN and SSH into the DGX Workstation,

```bash
ssh nccoe-k8s-gpu002
```
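Before moving on, it can help to confirm that the forwarded ports are reachable from your laptop. The following is a minimal sketch that uses only the Python standard library. The helper name `port_is_open` is hypothetical and not part of Dioptra, and the ports are the ones from the recommended mapping above.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    # Ports from the recommended mapping: 20080 (REST API) and 25000 (MLFlow).
    for port in (20080, 25000):
        state = "reachable" if port_is_open("localhost", port) else "NOT reachable"
        print(f"localhost:{port} is {state}")
```

If either port is reported as not reachable, check that the SSH session from the previous step is still open.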

Next, we import the necessary Python modules and ensure the proper environment variables are set so that all the code blocks will work as expected.

> ⚠️ **Edits possibly required**: update the value of the `HOST_DOCKER_INTERNAL` variable

If this notebook is being served to you via Docker (i.e. you ran `make jupyter` to launch this notebook), **then you may need to change the value assigned to the variable `HOST_DOCKER_INTERNAL`** to make the port forwarding you configured in the previous step accessible within the container.
The value you need to assign to the variable depends on your host device's operating system:

- **Case 1: Host operating system is Windows 10 or macOS**
  - Set `HOST_DOCKER_INTERNAL = "host.docker.internal"`. This is the default setting.
- **Case 2: Host operating system is Linux**
  - Run either `ip address` or `ifconfig` to print a list of the available network interfaces on your host device
  - Locate the `docker0` interface and take note of the associated IP address (this is commonly set to `172.17.0.1`)
  - Set `HOST_DOCKER_INTERNAL` equal to the IP address for the `docker0` interface. So, if the IP address was `172.17.0.1`, then you would set `HOST_DOCKER_INTERNAL = "172.17.0.1"`
  
If you started your Jupyter Lab instance from a conda environment, then you do not need to change anything.
The code below uses an environment variable to check whether this notebook is being served via the `jupyter` service, and if that variable isn't found, then the connection address reverts to `localhost` and ignores the `HOST_DOCKER_INTERNAL` variable.

In [None]:
# Import packages from the Python standard library
import os
import pprint
import time
import warnings
from pathlib import Path
from typing import Tuple

# Filter out warning messages
warnings.filterwarnings("ignore")

# Address for connecting the docker container to exposed ports on the host device (see above instructions for details)
HOST_DOCKER_INTERNAL = "host.docker.internal"
# HOST_DOCKER_INTERNAL = "172.17.0.1"

# Default address for accessing the RESTful API service
RESTAPI_ADDRESS = (
    f"http://{HOST_DOCKER_INTERNAL}:20080"
    if os.getenv("IS_JUPYTER_SERVICE")
    else "http://localhost:20080"
)

# Override the DIOPTRA_RESTAPI_URI variable, used to connect to RESTful API service
os.environ["DIOPTRA_RESTAPI_URI"] = RESTAPI_ADDRESS

# Default address for accessing the MLFlow Tracking server
MLFLOW_TRACKING_URI = (
    f"http://{HOST_DOCKER_INTERNAL}:25000"
    if os.getenv("IS_JUPYTER_SERVICE")
    else "http://localhost:25000"
)

# Override the MLFLOW_TRACKING_URI variable, used to connect to MLFlow Tracking service
os.environ["MLFLOW_TRACKING_URI"] = MLFLOW_TRACKING_URI

# Base API address
RESTAPI_API_BASE = f"{RESTAPI_ADDRESS}/api"

# Path to custom task plugins archives
CUSTOM_PLUGINS_EVALUATION_TAR_GZ = Path("custom-plugins-evaluation.tar.gz")

# Path to workflows archive
WORKFLOWS_TAR_GZ = Path("workflows.tar.gz")

# Experiment name (note the username_ prefix convention, you should change this to match your username)
EXPERIMENT_NAME = "dioptrauser_mnist"

# Import third-party Python packages
import numpy as np
import requests
from mlflow.tracking import MlflowClient

# Import utils.py file
import utils

# Create random number generator
rng = np.random.default_rng(54399264723942495723666216079516778448)
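On a Linux host (Case 2 in the instructions above), the `docker0` address can also be looked up programmatically instead of being read off the terminal by hand. The sketch below is one way to do that, assuming the `ip` tool is installed on the host. The function names `parse_inet_address` and `docker0_address` are hypothetical helpers, not part of Dioptra.

```python
import re
import subprocess
from typing import Optional


def parse_inet_address(ip_output: str) -> Optional[str]:
    """Extract the first IPv4 address from `ip address`-style output."""
    match = re.search(r"\binet (\d{1,3}(?:\.\d{1,3}){3})/", ip_output)
    return match.group(1) if match else None


def docker0_address() -> Optional[str]:
    """Run `ip -4 address show docker0` and return its IPv4 address, if any."""
    try:
        result = subprocess.run(
            ["ip", "-4", "address", "show", "docker0"],
            capture_output=True,
            text=True,
            check=False,
        )
    except FileNotFoundError:
        return None  # the `ip` tool is not available on this host
    return parse_inet_address(result.stdout)


# Example (run on the host, not inside the container):
# HOST_DOCKER_INTERNAL = docker0_address() or "host.docker.internal"
```

Note that the lookup has to run on the host device itself, because the `docker0` interface is generally not visible from inside the container.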

## Dataset

We obtained a copy of the MNIST dataset as part of the startup process invoked by `make demo`.
The training and testing images for the MNIST dataset are stored within the `data/` directory as PNG files that are organized into the following folder structure,

    Mnist
    ├── testing
    │   ├── 0
    │   ├── 1
    │   ├── 2
    │   ├── 3
    │   ├── 4
    │   ├── 5
    │   ├── 6
    │   ├── 7
    │   ├── 8
    │   └── 9
    └── training
        ├── 0
        ├── 1
        ├── 2
        ├── 3
        ├── 4
        ├── 5
        ├── 6
        ├── 7
        ├── 8
        └── 9

The subfolders under `Mnist/training/` and `Mnist/testing/` are the classification labels for the images in the dataset.
This folder structure is a standardized way to encode the label information, and many libraries can make use of it, including the TensorFlow library that we are using for this particular demo.
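To illustrate how the folder names carry the labels, here is a small sketch that counts the PNG files under each label subfolder. It uses only `pathlib`. The function name `count_images_per_label` is hypothetical, and the example path assumes the dataset sits at `data/Mnist` relative to this notebook.

```python
from pathlib import Path
from typing import Dict


def count_images_per_label(split_dir: Path) -> Dict[str, int]:
    """Map each label subfolder name to the number of PNG files it contains."""
    return {
        label_dir.name: sum(1 for _ in label_dir.glob("*.png"))
        for label_dir in sorted(split_dir.iterdir())
        if label_dir.is_dir()
    }


# Example (assumed local path; adjust to where the dataset actually lives):
# pprint.pprint(count_images_per_label(Path("data/Mnist/training")))
```

The keys of the returned dictionary are the class labels `"0"` through `"9"`, read directly from the directory names.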

## Submit and run jobs

The entrypoints that we will be running in this example are implemented in the Python source files under `src/` and the `MLproject` file.
To run these entrypoints within Dioptra's architecture, we need to package those files up into an archive and submit it to the Dioptra RESTful API to create a new job.
For convenience, the `Makefile` provides a rule for creating the archive file for this example; just run `make workflows`,

In [None]:
%%bash

# Create the workflows.tar.gz file
make workflows
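The exact commands behind the `make workflows` rule are not shown in this notebook. As a rough idea of what packaging the files involves, the sketch below builds a gzip-compressed tarball with Python's `tarfile` module. The function name `build_archive` and the member list in the example are assumptions for illustration, not the contents of the actual rule.

```python
import tarfile
from pathlib import Path
from typing import Iterable


def build_archive(archive_path: Path, root: Path, member_names: Iterable[str]) -> Path:
    """Write files and directories under `root` into a gzip-compressed tarball."""
    with tarfile.open(archive_path, "w:gz") as archive:
        for name in member_names:
            # arcname keeps paths inside the archive relative to `root`;
            # directories are added recursively.
            archive.add(root / name, arcname=name)
    return archive_path


# Example (assumed layout: the `src/` directory and `MLproject` file in the cwd):
# build_archive(Path("workflows.tar.gz"), Path("."), ["src", "MLproject"])
```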

To connect with the endpoint, we will use a client class defined in the `utils.py` file that is able to connect with the Dioptra RESTful API using the HTTP protocol.
We connect using the client below, which uses the environment variable `DIOPTRA_RESTAPI_URI` to figure out how to connect to the Dioptra RESTful API,

In [None]:
restapi_client = utils.DioptraClient()

We need to register an experiment under which to collect our job runs.
The code below checks if the relevant experiment (named `"dioptrauser_mnist"` by default) exists.
If it does, the code returns information about the experiment; if it doesn't, the code registers a new experiment.

In [None]:
response_experiment = restapi_client.get_experiment_by_name(name=EXPERIMENT_NAME)

if response_experiment is None or "Not Found" in response_experiment.get("message", []):
    response_experiment = restapi_client.register_experiment(name=EXPERIMENT_NAME)

response_experiment
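The same "look up, then register if missing" pattern appears again later for the custom task plugins. A generic version could look like the sketch below. The helper names `is_missing` and `get_or_register` are hypothetical, and the "Not Found" check simply mirrors the condition used in the cell above.

```python
from typing import Any, Callable, Dict, Optional

Response = Optional[Dict[str, Any]]


def is_missing(response: Response) -> bool:
    """Treat None or a "Not Found" message as "the resource does not exist"."""
    return response is None or "Not Found" in response.get("message", "")


def get_or_register(
    lookup: Callable[[], Response], register: Callable[[], Response]
) -> Response:
    """Return the existing resource, or register it when the lookup finds nothing."""
    response = lookup()
    return register() if is_missing(response) else response


# Example, using the client methods from the cell above:
# response_experiment = get_or_register(
#     lambda: restapi_client.get_experiment_by_name(name=EXPERIMENT_NAME),
#     lambda: restapi_client.register_experiment(name=EXPERIMENT_NAME),
# )
```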

We should also check which queues are available for running our jobs to make sure that the resources that we need are available.
The code below queries the Lab API and returns a list of active queues.
We expect two queues to be available, `"tensorflow_cpu"` and `"tensorflow_gpu"`.

In [None]:
restapi_client.list_queues()

This example also makes use of the `evaluation` custom task plugin package stored locally under the `task-plugins/dioptra_custom/evaluation` directory.
To register these custom task plugins, we first need to package them up into an archive.
For convenience, the `Makefile` provides a rule for creating the custom task plugins archive file; just run `make custom-plugins`,

In [None]:
%%bash

# Create the custom-plugins-evaluation.tar.gz file
make custom-plugins

Now that the `evaluation` custom task plugin package is packaged into an archive file, next we register it by uploading the file to the REST API.
Note that we need to provide a name for the custom task plugin package, and this name must be unique within the custom task plugins namespace.
For a full list of the custom task plugins, use `restapi_client.list_custom_task_plugins()`.

In [None]:
response_custom_plugins = restapi_client.get_custom_task_plugin(name="evaluation")

if response_custom_plugins is None or "Not Found" in response_custom_plugins.get("message", []):
    response_custom_plugins = restapi_client.upload_custom_plugin_package(
        custom_plugin_name="evaluation",
        custom_plugin_file=CUSTOM_PLUGINS_EVALUATION_TAR_GZ,
    )

response_custom_plugins

If at any point you need to update one or more files within the `evaluation` plugin package, you will need to unregister/delete the custom task plugin first using the REST API.
This can be done as follows,

```python
# Delete the 'evaluation' custom task plugin package
restapi_client.delete_custom_task_plugin(name="evaluation")
```

After you have deleted the task plugin from Dioptra, re-run the `make custom-plugins` code block to update the package archive, then upload the updated plugin by re-running the `restapi_client.upload_custom_plugin_package` block.
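The delete-then-upload sequence can be wrapped in a small helper so that the two steps always happen together. The sketch below assumes the archive has already been rebuilt with `make custom-plugins`. The function name `replace_custom_plugin` is hypothetical, and it only calls the two client methods already used in this notebook.

```python
from pathlib import Path
from typing import Any


def replace_custom_plugin(client: Any, name: str, archive: Path) -> Any:
    """Delete a registered plugin package, then upload the rebuilt archive."""
    if not archive.is_file():
        # Fail before deleting anything if the archive has not been rebuilt.
        raise FileNotFoundError(f"Rebuild the archive first: {archive}")
    client.delete_custom_task_plugin(name=name)
    return client.upload_custom_plugin_package(
        custom_plugin_name=name,
        custom_plugin_file=archive,
    )


# Example:
# replace_custom_plugin(restapi_client, "evaluation", CUSTOM_PLUGINS_EVALUATION_TAR_GZ)
```

Checking for the archive first avoids the situation where the plugin has been deleted but there is nothing to upload in its place.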

Next, we need to train our model.
We will be using the V100 GPUs that are available on the DGX Workstation, which we can use by submitting our job to the `"tensorflow_gpu"` queue.
We will train three models, a shallow network model, a LeNet-5 model, and an AlexNet model,

In [None]:
response_shallow_train = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name=EXPERIMENT_NAME,
    entry_point="train",
    entry_point_kwargs=" ".join([
        "-P batch_size=256",
        f"-P register_model_name={EXPERIMENT_NAME}_shallow_net",
        "-P model_architecture=shallow_net",
        "-P epochs=30",
        "-P data_dir=/nfs/data/Mnist",
    ]),
    queue="tensorflow_gpu",
    timeout="1h",
)

print("Training job for shallow neural network submitted")
print("")
pprint.pprint(response_shallow_train)

In [None]:
response_le_net_train = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name=EXPERIMENT_NAME,
    entry_point="train",
    entry_point_kwargs=" ".join([
        "-P batch_size=256",
        f"-P register_model_name={EXPERIMENT_NAME}_le_net",
        "-P model_architecture=le_net",
        "-P epochs=30",
        "-P data_dir=/nfs/data/Mnist",
    ]),
    queue="tensorflow_gpu",
    timeout="1h",
)

print("Training job for LeNet-5 neural network submitted")
print("")
pprint.pprint(response_le_net_train)

In [None]:
response_alex_net_train = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name=EXPERIMENT_NAME,
    entry_point="train",
    entry_point_kwargs=" ".join([
        "-P batch_size=256",
        f"-P register_model_name={EXPERIMENT_NAME}_alex_net",
        "-P model_architecture=alex_net",
        "-P epochs=40",
        "-P image_size=224,224,1",
        "-P data_dir=/nfs/data/Mnist",
    ]),
    queue="tensorflow_gpu",
    timeout="6h",
)

print("Training job for AlexNet neural network submitted")
print("")
pprint.pprint(response_alex_net_train)
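The three training cells differ only in a handful of parameters, and each one assembles the same space-separated `-P key=value` string by hand. As a sketch, that string could be rendered from a dictionary instead. The helper name `build_entry_point_kwargs` is hypothetical and not part of `utils.py`.

```python
from typing import Mapping


def build_entry_point_kwargs(params: Mapping[str, object]) -> str:
    """Render parameters as the space-separated "-P key=value" string."""
    return " ".join(f"-P {key}={value}" for key, value in params.items())


shallow_net_kwargs = build_entry_point_kwargs(
    {
        "batch_size": 256,
        "model_architecture": "shallow_net",
        "epochs": 30,
        "data_dir": "/nfs/data/Mnist",
    }
)
print(shallow_net_kwargs)
# -P batch_size=256 -P model_architecture=shallow_net -P epochs=30 -P data_dir=/nfs/data/Mnist
```

The resulting string can be passed as the `entry_point_kwargs` argument of `restapi_client.submit_job`, exactly like the hand-written versions above.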

Now that we have three trained models (the shallow network, the LeNet-5 network, and AlexNet), next we will apply the fast-gradient method (FGM) evasion attack on the shallow network to generate adversarial images.
Then, after we have the adversarial images, we will use them to evaluate some standard machine learning metrics against all three models.
This will give us a sense of the transferability of the attacks between models.

This specific workflow is an example of jobs that contain dependencies, as the metric evaluation jobs cannot start until the adversarial image generation jobs have completed.
Dioptra allows users to declare one-to-many job dependencies like this, which we will use to queue up jobs to start immediately after the previous jobs have concluded.
The code below illustrates this by doing the following:

1. A job is submitted that generates adversarial images based on the shallow net architecture (entry point **fgm**).
1. We wait until the job starts and an MLFlow identifier is assigned, which we check by polling the API until we see the id appear.
1. Once we have an id returned to us from the API, we queue up the metrics evaluation jobs and declare the job dependency using the `depends_on` option.
1. The message "Dependent jobs submitted" will display once everything is queued up.

In [None]:
def mlflow_run_id_is_not_known(response_fgm):
    return response_fgm["mlflowRunId"] is None and response_fgm["status"] not in [
        "failed",
        "finished",
    ]

response_fgm_shallow_net = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name=EXPERIMENT_NAME,
    entry_point="fgm",
    entry_point_kwargs=" ".join(
        [
            f"-P model_name={EXPERIMENT_NAME}_shallow_net",
            "-P model_version=1",
            "-P data_dir=/nfs/data/Mnist",
            "-P batch_size=512",
        ]
    ),
)

print("FGM attack (shallow net architecture) job submitted")
print("")
pprint.pprint(response_fgm_shallow_net)
print("")

while mlflow_run_id_is_not_known(response_fgm_shallow_net):
    time.sleep(1)
    response_fgm_shallow_net = restapi_client.get_job_by_id(
        response_fgm_shallow_net["jobId"]
    )

response_shallow_net_infer_shallow_net = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name=EXPERIMENT_NAME,
    entry_point="infer",
    entry_point_kwargs=" ".join(
        [
            f"-P run_id={response_fgm_shallow_net['mlflowRunId']}",
            f"-P model_name={EXPERIMENT_NAME}_shallow_net",
            "-P model_version=1",
        ]
    ),
    queue="tensorflow_gpu",
    depends_on=response_fgm_shallow_net["jobId"],
)

response_le_net_infer_shallow_net = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name=EXPERIMENT_NAME,
    entry_point="infer",
    entry_point_kwargs=" ".join(
        [
            f"-P run_id={response_fgm_shallow_net['mlflowRunId']}",
            f"-P model_name={EXPERIMENT_NAME}_le_net",
            "-P model_version=1",
        ]
    ),
    queue="tensorflow_gpu",
    depends_on=response_fgm_shallow_net["jobId"],
)

response_alex_net_infer_shallow_net = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name=EXPERIMENT_NAME,
    entry_point="infer",
    entry_point_kwargs=" ".join(
        [
            f"-P run_id={response_fgm_shallow_net['mlflowRunId']}",
            f"-P model_name={EXPERIMENT_NAME}_alex_net",
            "-P model_version=1",
            "-P image_size=224,224,1",
        ]
    ),
    queue="tensorflow_gpu",
    depends_on=response_fgm_shallow_net["jobId"],
)

print("Dependent jobs submitted")
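The polling loop in the cell above keeps running until a run id appears or the job reaches a terminal state, with no upper bound on the wait. A variant with a timeout might look like the following sketch. The function name `wait_for_mlflow_run_id` and the default timeout are assumptions; the `"mlflowRunId"` and `"status"` keys and the terminal statuses are taken from the code above.

```python
import time
from typing import Any, Callable, Dict

TERMINAL_STATUSES = ("failed", "finished")


def wait_for_mlflow_run_id(
    get_job: Callable[[str], Dict[str, Any]],
    job_id: str,
    poll_interval: float = 1.0,
    timeout: float = 600.0,
) -> Dict[str, Any]:
    """Poll a job until it has a run id, reaches a terminal state, or times out."""
    deadline = time.monotonic() + timeout
    response = get_job(job_id)
    while (
        response["mlflowRunId"] is None
        and response["status"] not in TERMINAL_STATUSES
    ):
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job {job_id} had no MLFlow run id after {timeout} s")
        time.sleep(poll_interval)
        response = get_job(job_id)
    return response


# Example:
# response_fgm_shallow_net = wait_for_mlflow_run_id(
#     restapi_client.get_job_by_id, response_fgm_shallow_net["jobId"]
# )
```

As in the original loop, the returned response can still have `mlflowRunId` set to `None` if the job failed before a run was created, so it is worth checking the status before submitting dependent jobs.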

We can poll the status of the dependent jobs using the code below.
We should see the status of the jobs shift from "queued" to "started" and eventually become "finished".

In [None]:
response_shallow_net_infer_shallow_net = restapi_client.get_job_by_id(
    response_shallow_net_infer_shallow_net["jobId"]
)
response_le_net_infer_shallow_net = restapi_client.get_job_by_id(
    response_le_net_infer_shallow_net["jobId"]
)
response_alex_net_infer_shallow_net = restapi_client.get_job_by_id(
    response_alex_net_infer_shallow_net["jobId"]
)

pprint.pprint(response_shallow_net_infer_shallow_net)
print("")
pprint.pprint(response_le_net_infer_shallow_net)
print("")
pprint.pprint(response_alex_net_infer_shallow_net)
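Rather than re-running the cell above by hand, the status check can be repeated automatically until every job has either finished or failed. The following is a minimal sketch of that idea. The function name `wait_for_jobs` and the default interval and timeout are assumptions, while the status values come from this notebook.

```python
import time
from typing import Any, Callable, Dict, Iterable


def wait_for_jobs(
    get_job: Callable[[str], Dict[str, Any]],
    job_ids: Iterable[str],
    poll_interval: float = 5.0,
    timeout: float = 3600.0,
) -> Dict[str, str]:
    """Poll until every job is "finished" or "failed"; return job id -> status."""
    deadline = time.monotonic() + timeout
    pending = list(job_ids)
    final_status: Dict[str, str] = {}
    while pending:
        still_pending = []
        for job_id in pending:
            status = get_job(job_id)["status"]
            if status in ("finished", "failed"):
                final_status[job_id] = status
            else:
                still_pending.append(job_id)
        pending = still_pending
        if pending:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"Jobs still running after {timeout} s: {pending}")
            time.sleep(poll_interval)
    return final_status


# Example:
# wait_for_jobs(
#     restapi_client.get_job_by_id,
#     [
#         response_shallow_net_infer_shallow_net["jobId"],
#         response_le_net_infer_shallow_net["jobId"],
#         response_alex_net_infer_shallow_net["jobId"],
#     ],
# )
```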

We can similarly run an FGM-based evasion attack using the LeNet-5 architecture as our starting point.
The following code is very similar to the code we just saw; all we've done is swap out the entry point keyword argument that requests the shallow net architecture for one that requests the LeNet-5 architecture.

In [None]:
response_fgm_le_net = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name=EXPERIMENT_NAME,
    entry_point="fgm",
    entry_point_kwargs=" ".join(
        [
            f"-P model_name={EXPERIMENT_NAME}_le_net",
            "-P model_version=1",
            "-P data_dir=/nfs/data/Mnist",
            "-P batch_size=512",
        ]
    ),
)

print("FGM attack (LeNet-5 architecture) job submitted")
print("")
pprint.pprint(response_fgm_le_net)
print("")

while mlflow_run_id_is_not_known(response_fgm_le_net):
    time.sleep(1)
    response_fgm_le_net = restapi_client.get_job_by_id(response_fgm_le_net["jobId"])

response_shallow_net_infer_le_net_fgm = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name=EXPERIMENT_NAME,
    entry_point="infer",
    entry_point_kwargs=" ".join(
        [
            f"-P run_id={response_fgm_le_net['mlflowRunId']}",
            f"-P model_name={EXPERIMENT_NAME}_shallow_net",
            "-P model_version=1",
        ]
    ),
    queue="tensorflow_gpu",
    depends_on=response_fgm_le_net["jobId"],
)

response_le_net_infer_le_net_fgm = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name=EXPERIMENT_NAME,
    entry_point="infer",
    entry_point_kwargs=" ".join(
        [
            f"-P run_id={response_fgm_le_net['mlflowRunId']}",
            f"-P model_name={EXPERIMENT_NAME}_le_net",
            "-P model_version=1",
        ]
    ),
    queue="tensorflow_gpu",
    depends_on=response_fgm_le_net["jobId"],
)

response_alex_net_infer_le_net_fgm = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name=EXPERIMENT_NAME,
    entry_point="infer",
    entry_point_kwargs=" ".join(
        [
            f"-P run_id={response_fgm_le_net['mlflowRunId']}",
            f"-P model_name={EXPERIMENT_NAME}_alex_net",
            "-P model_version=1",
            "-P image_size=224,224,1",
        ]
    ),
    queue="tensorflow_gpu",
    depends_on=response_fgm_le_net["jobId"],
)

print("Dependent jobs submitted")

Like before, we can monitor the status of the dependent jobs by querying the API using the code block below.

In [None]:
response_shallow_net_infer_le_net_fgm = restapi_client.get_job_by_id(
    response_shallow_net_infer_le_net_fgm["jobId"]
)
response_le_net_infer_le_net_fgm = restapi_client.get_job_by_id(
    response_le_net_infer_le_net_fgm["jobId"]
)
response_alex_net_infer_le_net_fgm = restapi_client.get_job_by_id(
    response_alex_net_infer_le_net_fgm["jobId"]
)

pprint.pprint(response_shallow_net_infer_le_net_fgm)
print("")
pprint.pprint(response_le_net_infer_le_net_fgm)
print("")
pprint.pprint(response_alex_net_infer_le_net_fgm)

Finally, we run an FGM-based evasion attack using the AlexNet architecture as our starting point.

In [None]:
response_fgm_alex_net = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name=EXPERIMENT_NAME,
    entry_point="fgm",
    entry_point_kwargs=" ".join(
        [
            f"-P model_name={EXPERIMENT_NAME}_alex_net",
            "-P model_version=1",
            "-P data_dir=/nfs/data/Mnist",
            "-P batch_size=64",
            "-P image_size=224,224,1",
        ]
    ),
)

print("FGM attack (AlexNet architecture) job submitted")
print("")
pprint.pprint(response_fgm_alex_net)
print("")

while mlflow_run_id_is_not_known(response_fgm_alex_net):
    time.sleep(1)
    response_fgm_alex_net = restapi_client.get_job_by_id(response_fgm_alex_net["jobId"])

response_shallow_net_infer_alex_net_fgm = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name=EXPERIMENT_NAME,
    entry_point="infer",
    entry_point_kwargs=" ".join(
        [
            f"-P run_id={response_fgm_alex_net['mlflowRunId']}",
            f"-P model_name={EXPERIMENT_NAME}_shallow_net",
            "-P model_version=1",
        ]
    ),
    queue="tensorflow_gpu",
    depends_on=response_fgm_alex_net["jobId"],
)

response_le_net_infer_alex_net_fgm = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name=EXPERIMENT_NAME,
    entry_point="infer",
    entry_point_kwargs=" ".join(
        [
            f"-P run_id={response_fgm_alex_net['mlflowRunId']}",
            f"-P model_name={EXPERIMENT_NAME}_le_net",
            "-P model_version=1",
        ]
    ),
    queue="tensorflow_gpu",
    depends_on=response_fgm_alex_net["jobId"],
)

response_alex_net_infer_alex_net_fgm = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name=EXPERIMENT_NAME,
    entry_point="infer",
    entry_point_kwargs=" ".join(
        [
            f"-P run_id={response_fgm_alex_net['mlflowRunId']}",
            f"-P model_name={EXPERIMENT_NAME}_alex_net",
            "-P model_version=1",
            "-P image_size=224,224,1",
        ]
    ),
    queue="tensorflow_gpu",
    depends_on=response_fgm_alex_net["jobId"],
)

print("Dependent jobs submitted")

As before, we can monitor the status of the dependent jobs by querying the API using the code block below.

In [None]:
response_shallow_net_infer_alex_net_fgm = restapi_client.get_job_by_id(
    response_shallow_net_infer_alex_net_fgm["jobId"]
)
response_le_net_infer_alex_net_fgm = restapi_client.get_job_by_id(
    response_le_net_infer_alex_net_fgm["jobId"]
)
response_alex_net_infer_alex_net_fgm = restapi_client.get_job_by_id(
    response_alex_net_infer_alex_net_fgm["jobId"]
)

pprint.pprint(response_shallow_net_infer_alex_net_fgm)
print("")
pprint.pprint(response_le_net_infer_alex_net_fgm)
print("")
pprint.pprint(response_alex_net_infer_alex_net_fgm)
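Taken together, the three attack sections evaluate every model against the adversarial images generated from every architecture, which gives a 3 x 3 grid of `infer` jobs. The sketch below shows one way to generate the `entry_point_kwargs` strings for that grid in a loop. The names `infer_parameters` and `transfer_grid` are hypothetical, and the parameters mirror the ones used in the cells above, including the extra `image_size` setting for AlexNet.

```python
from itertools import product
from typing import Dict, List, Tuple

ARCHITECTURES = ("shallow_net", "le_net", "alex_net")


def infer_parameters(experiment_name: str, target: str, run_id: str) -> List[str]:
    """Build the "-P" arguments for evaluating `target` on one attack's images."""
    params = [
        f"-P run_id={run_id}",
        f"-P model_name={experiment_name}_{target}",
        "-P model_version=1",
    ]
    if target == "alex_net":
        # AlexNet was trained on resized images, so inference needs the same size.
        params.append("-P image_size=224,224,1")
    return params


def transfer_grid(
    experiment_name: str, fgm_run_ids: Dict[str, str]
) -> Dict[Tuple[str, str], str]:
    """Map (attack source, evaluated model) to its entry_point_kwargs string."""
    return {
        (source, target): " ".join(
            infer_parameters(experiment_name, target, fgm_run_ids[source])
        )
        for source, target in product(ARCHITECTURES, repeat=2)
    }
```

Each value in the returned dictionary can be passed as `entry_point_kwargs` to `restapi_client.submit_job`, with `depends_on` set to the job id of the corresponding FGM job.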

Congratulations, you've just run your first experiment using the DGX deployment of Dioptra!

## Querying the MLFlow Tracking Service

Currently the Dioptra API can only be used to register experiments and start jobs, so if users wish to extract their results programmatically, they can use the `MlflowClient()` class from the `mlflow` Python package to connect and query their results.
Since we captured the run ids generated by MLFlow, we can easily retrieve the data logged about one of our jobs and inspect the results.
To start the client, we simply need to run,

In [None]:
mlflow_client = MlflowClient()

The client uses the environment variable `MLFLOW_TRACKING_URI` to figure out how to connect to the MLFlow Tracking Service, which we configured near the top of this notebook.
To query the results of one of our runs, we just need to pass the run id to the client's `get_run()` method.
As an example, let's query the run results for the FGM attack applied to the LeNet-5 architecture,

In [None]:
fgm_run_le_net = mlflow_client.get_run(response_fgm_le_net["mlflowRunId"])

If the request completed successfully, we should now be able to query data collected during the run.
For example, to review the collected metrics, we just use,

In [None]:
pprint.pprint(fgm_run_le_net.data.metrics)

To review the run's parameters, we use,

In [None]:
pprint.pprint(fgm_run_le_net.data.params)

To review the run's tags, we use,

In [None]:
pprint.pprint(fgm_run_le_net.data.tags)
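To compare several runs side by side, the metrics of each run can be collected into a single dictionary. The sketch below relies only on the `get_run()` method and the `data.metrics` attribute used above. The function name `collect_metrics` and the labels in the example are hypothetical.

```python
from typing import Any, Dict, Mapping


def collect_metrics(
    client: Any, run_ids: Mapping[str, str]
) -> Dict[str, Dict[str, float]]:
    """Fetch the logged metrics for each labelled run id."""
    return {
        label: dict(client.get_run(run_id).data.metrics)
        for label, run_id in run_ids.items()
    }


# Example, using run ids captured earlier in this notebook:
# pprint.pprint(
#     collect_metrics(
#         mlflow_client,
#         {
#             "fgm_shallow_net": response_fgm_shallow_net["mlflowRunId"],
#             "fgm_le_net": response_fgm_le_net["mlflowRunId"],
#             "fgm_alex_net": response_fgm_alex_net["mlflowRunId"],
#         },
#     )
# )
```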

There are many things you can query using the MLFlow client.
[The MLFlow documentation gives a full overview of the methods that are available](https://www.mlflow.org/docs/latest/python_api/mlflow.tracking.html#mlflow.tracking.MlflowClient).