# TensorFlow MNIST Classifier demo

This notebook contains an end-to-end demonstration of Dioptra that can be run on any modern laptop.

## Setup

Below we import the necessary Python modules and ensure that the required environment variables are set so that all of the code blocks work as expected:

```python
# Import packages from the Python standard library
import os
import pprint
import time
import warnings
from pathlib import Path

# Filter out warning messages
warnings.filterwarnings("ignore")

# Default address for accessing the RESTful API service
RESTAPI_ADDRESS = "http://localhost"

# Set DIOPTRA_RESTAPI_URI variable if not defined, used to connect to RESTful API service
if os.getenv("DIOPTRA_RESTAPI_URI") is None:
    os.environ["DIOPTRA_RESTAPI_URI"] = RESTAPI_ADDRESS

# Default address for accessing the MLFlow Tracking server
MLFLOW_TRACKING_URI = "http://localhost:35000"

# Set MLFLOW_TRACKING_URI variable if not defined, used to connect to MLFlow Tracking service
if os.getenv("MLFLOW_TRACKING_URI") is None:
    os.environ["MLFLOW_TRACKING_URI"] = MLFLOW_TRACKING_URI

# Path to custom task plugins archive
CUSTOM_PLUGINS_EVALUATION_TAR_GZ = Path("custom-plugins-evaluation.tar.gz")

# Path to workflows archive
WORKFLOWS_TAR_GZ = Path("workflows.tar.gz")

# Import third-party Python packages
import numpy as np
import requests
from mlflow.tracking import MlflowClient

# Import utils.py file
import utils

# Create random number generator
rng = np.random.default_rng(54399264723942495723666216079516778448)
```
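The two `if os.getenv(...) is None` checks above write a default only when the variable is unset. As a sketch, the same behavior can be expressed with `os.environ.setdefault`; the helper name `apply_env_defaults` is hypothetical and is not part of `utils.py`.

```python
import os
from typing import Mapping, MutableMapping


def apply_env_defaults(
    defaults: Mapping[str, str],
    environ: MutableMapping[str, str] = os.environ,
) -> None:
    """Set each variable to its default only if it is not already defined."""
    for name, value in defaults.items():
        # setdefault leaves a value the user already exported (even an empty one) untouched
        environ.setdefault(name, value)


# Same defaults as the setup cell above
apply_env_defaults(
    {
        "DIOPTRA_RESTAPI_URI": "http://localhost",
        "MLFLOW_TRACKING_URI": "http://localhost:35000",
    }
)
```

Passing a plain dictionary as `environ` makes the helper easy to check without touching the real process environment.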

## Dataset

We obtained a copy of the MNIST dataset as part of the startup process invoked by `make demo`.
The training and testing images for the MNIST dataset are stored within the `data/` directory as PNG files that are organized into the following folder structure:

    data
    ├── testing
    │   ├── 0
    │   ├── 1
    │   ├── 2
    │   ├── 3
    │   ├── 4
    │   ├── 5
    │   ├── 6
    │   ├── 7
    │   ├── 8
    │   └── 9
    └── training
        ├── 0
        ├── 1
        ├── 2
        ├── 3
        ├── 4
        ├── 5
        ├── 6
        ├── 7
        ├── 8
        └── 9

The names of the subfolders under `data/training/` and `data/testing/` are the classification labels (the digits 0 through 9) for the images they contain.
This folder structure is a standard way to encode label information, and many libraries can make use of it, including the TensorFlow library that we are using for this demo.
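Because each label is simply the name of an image's parent folder, recovering the labels needs nothing beyond `pathlib`. The sketch below counts the PNG files per label in one split, which is a quick way to check that a download is complete; it assumes the `*/*.png` layout shown above, and the function name `count_images_per_label` is hypothetical. (TensorFlow's `tf.keras.utils.image_dataset_from_directory` infers labels from the same kind of layout.)

```python
from collections import Counter
from pathlib import Path


def count_images_per_label(split_dir: Path) -> dict[str, int]:
    """Count PNG files per class, using each subfolder's name as the label."""
    counts = Counter(path.parent.name for path in split_dir.glob("*/*.png"))
    # Sort by label so the output order does not depend on the filesystem
    return dict(sorted(counts.items()))


# Only runs if the dataset is present in the expected location
if Path("data/training").is_dir():
    print(count_images_per_label(Path("data/training")))
```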

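Let's take a look at a random sample of the downloaded training images to confirm that there aren't any problems:

```python
# Randomly select 20 distinct training images and display them in a gallery
mnist_training_samples = sorted(
    rng.choice(
        list(Path("data/training").glob("*/*.png")), size=20, replace=False
    ).tolist()
)
utils.notebook_gallery(mnist_training_samples, row_height="140px")
```

If the images downloaded correctly, you should see a set of images that look like handwritten digits.
If the cell instead fails with `ValueError: a cannot be empty unless no samples are taken`, then no PNG files were found under `data/training/`, which means the dataset is missing from the `data/` directory.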
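The sampling step can also be wrapped so that a missing dataset produces a clearer error than NumPy's. This is a sketch, and `sample_images` is a hypothetical helper rather than part of `utils.py`; it sorts the candidate paths first so that a seeded generator picks the same files regardless of the order in which the filesystem lists them, and it samples indices without replacement so no image appears twice.

```python
from pathlib import Path

import numpy as np


def sample_images(split_dir: Path, size: int, rng: np.random.Generator) -> list[Path]:
    """Return up to `size` distinct PNG paths from `split_dir`, sorted for display."""
    candidates = sorted(split_dir.glob("*/*.png"))
    if not candidates:
        raise FileNotFoundError(
            f"No PNG files found under {split_dir}/; was the dataset downloaded?"
        )
    # Sample integer indices so NumPy never has to build an array of Path objects
    indices = rng.choice(len(candidates), size=min(size, len(candidates)), replace=False)
    return sorted(candidates[i] for i in indices)
```

For example, `utils.notebook_gallery(sample_images(Path("data/training"), 20, rng), row_height="140px")` would display a gallery like the one above.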

## Dioptra microservices

Dioptra is composed of several microservices that manage job execution, artifact storage, and the logging of experiment results.
These services each run within a separate container that was instantiated from a Docker image that you either built locally or pulled into your environment from a Docker image registry.
A high-level schematic showing how these services connect together to form the architecture of Dioptra is provided below.

![](dioptra_architecture.png)

The startup process for all of these services, including database initialization and synchronizing the task plugins into the MinIO S3 storage service, was handled automatically when you invoked `make demo`.

## Submit and run jobs

The entry points that we will be running in this example are implemented in the Python source files under `src/` and the `MLproject` file.
To run these entry points within Dioptra's architecture, we need to package those files into an archive and submit it to the Dioptra RESTful API to create a new job.
For convenience, the `Makefile` provides a rule for creating the archive file for this example (`make workflows`); the cell below creates it from within the notebook using the `workflows()` helper in `utils.py`:

```python
utils.workflows()
```

```
PosixPath('/home/alex/dioptra/examples/tensorflow-mnist-classifier/workflows.tar.gz')
```

The `make demo` startup procedure already invoked `make workflows`, so if you run `make workflows` yourself without having edited the `MLproject` file or any of the files under `src/`, you will likely see the message _make: Nothing to be done for 'workflows'_.
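The exact contents of the archive are defined by the `Makefile` rule, which is not shown here. Assuming the archive simply bundles the `MLproject` file and the `src/` directory at its root, a pure-Python equivalent built on the standard `tarfile` module might look like the sketch below; `build_workflows_archive` is a hypothetical name, and the layout should be checked against the `Makefile` before relying on it.

```python
import tarfile
from pathlib import Path


def build_workflows_archive(project_dir: Path, output: Path) -> Path:
    """Bundle MLproject and src/ into a gzip-compressed tar archive.

    Assumption: the layout (MLproject plus src/ at the archive root) mirrors
    what the `make workflows` rule produces.
    """
    with tarfile.open(output, "w:gz") as archive:
        archive.add(project_dir / "MLproject", arcname="MLproject")
        # TarFile.add recurses into directories by default
        archive.add(project_dir / "src", arcname="src")
    return output
```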

To connect to the Dioptra RESTful API, we will use a client class defined in the `utils.py` file that communicates with the API over HTTP.
The client below uses the environment variable `DIOPTRA_RESTAPI_URI` to determine how to connect to the Dioptra RESTful API:

```python
restapi_client = utils.DioptraClient()
```

We need to register an experiment under which to collect our job runs.
The code below checks whether an experiment named `"mnist"` already exists.
If it does, the code returns information about the experiment; if it does not, the code registers a new experiment.

```python
response_experiment = restapi_client.get_experiment_by_name(name="mnist")

if response_experiment is None or "Not Found" in response_experiment.get("message", []):
    response_experiment = restapi_client.register_experiment(name="mnist")

response_experiment
```

```
{'experimentId': 1,
 'name': 'mnist',
 'createdOn': '2023-03-17T18:59:17.768727',
 'lastModified': '2023-03-17T18:59:17.768727'}
```
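This check-then-register logic is used for experiments here and repeats for queues and custom task plugins below. As a sketch, it can be factored into a small helper; `get_or_register` is a hypothetical name, and it assumes only what these cells show about the responses, namely that a missing resource comes back as `None` or with `"Not Found"` in its `"message"` field.

```python
from typing import Any, Callable, Optional

Response = dict[str, Any]


def get_or_register(
    get: Callable[[str], Optional[Response]],
    register: Callable[[str], Response],
    name: str,
) -> Response:
    """Return the existing resource called `name`, registering it first if needed."""
    response = get(name)
    if response is None or "Not Found" in response.get("message", ""):
        response = register(name)
    return response
```

With the client above, the experiment cell would become `get_or_register(lambda n: restapi_client.get_experiment_by_name(name=n), lambda n: restapi_client.register_experiment(name=n), "mnist")`.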

We also need to register the name of the queue that is being watched for our jobs.
The code below checks whether a queue named `"tensorflow_cpu"` already exists.
If it does, the code returns information about the queue; if it does not, the code registers a new queue.

```python
response_queue = restapi_client.get_queue_by_name(name="tensorflow_cpu")

if response_queue is None or "Not Found" in response_queue.get("message", []):
    response_queue = restapi_client.register_queue(name="tensorflow_cpu")

response_queue
```

```
{'name': 'tensorflow_cpu',
 'createdOn': '2023-03-17T19:23:09.732582',
 'queueId': 1,
 'lastModified': '2023-03-17T19:23:09.732582'}
```

This example also uses the `evaluation` custom task plugin package, which is stored locally under the `task-plugins/dioptra_custom/evaluation` directory.
To register these custom task plugins, we first need to package them into an archive.
The `Makefile` provides a rule for creating the custom task plugins archive (`make custom-plugins`); the cell below creates the archive from within the notebook using the `custom_plugins()` helper in `utils.py`:

```python
# Create the custom task plugins archive using the helper in utils.py, and keep the
# returned path so that the upload step below uses the archive that was just built
CUSTOM_PLUGINS_EVALUATION_TAR_GZ = utils.custom_plugins()
CUSTOM_PLUGINS_EVALUATION_TAR_GZ
```

```
PosixPath('task-plugins/dioptra_custom/evaluation/custom-plugins-evaluation.tar.gz')
```

Now that the `evaluation` custom task plugin package is packaged into an archive file, we register it by uploading the file to the REST API.
Note that we need to provide a name for the custom task plugin package; this name must be unique within the custom task plugins namespace.
For a full list of the custom task plugins, use `restapi_client.list_custom_task_plugins()`.

```python
response_custom_plugins = restapi_client.get_custom_task_plugin(name="evaluation")

if response_custom_plugins is None or "Not Found" in response_custom_plugins.get("message", []):
    response_custom_plugins = restapi_client.upload_custom_plugin_package(
        custom_plugin_name="evaluation",
        custom_plugin_file=CUSTOM_PLUGINS_EVALUATION_TAR_GZ,
    )

response_custom_plugins
```

```
{'collection': 'dioptra_custom',
 'modules': ['tensorflow.py', 'import_keras.py', '__init__.py'],
 'taskPluginName': 'evaluation'}
```
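The response lists the modules found in the uploaded package. To see which Python files an archive contains before uploading it, the standard `tarfile` module is enough. The sketch below (`list_python_modules` is a hypothetical name) lists the base names of the `.py` files in a `.tar.gz` archive; it makes no assumption about how Dioptra itself parses the package.

```python
import tarfile
from pathlib import Path, PurePosixPath


def list_python_modules(archive_path: Path) -> list[str]:
    """Return the sorted base names of the .py files inside a .tar.gz archive."""
    with tarfile.open(archive_path, "r:gz") as archive:
        return sorted(
            PurePosixPath(member.name).name
            for member in archive.getmembers()
            if member.isfile() and member.name.endswith(".py")
        )
```

For example, `list_python_modules(CUSTOM_PLUGINS_EVALUATION_TAR_GZ)` should list the same module names as the `'modules'` field above, in sorted order.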

If you later need to update one or more files within the `evaluation` plugin package, you must first unregister (delete) the custom task plugin using the REST API.
This can be done as follows:

```python
# Delete the 'evaluation' custom task plugin package
restapi_client.delete_custom_task_plugin(name="evaluation")
```

After you have deleted the task plugin from Dioptra, re-run the code block that creates the custom task plugins archive to update it, then upload the updated plugin by re-running the `restapi_client.upload_custom_plugin_package` code block.
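The delete, rebuild, and re-upload steps always run in the same order, so they can be combined into one function. This is a sketch: `replace_custom_plugin` and the `PluginClient` protocol are hypothetical, the method names and keyword arguments are the ones used in this notebook's cells, and it assumes that `utils.custom_plugins()` rebuilds the archive and returns its path, as the output of the archive cell suggests.

```python
from pathlib import Path
from typing import Any, Callable, Protocol


class PluginClient(Protocol):
    # Method names and keyword arguments as used with utils.DioptraClient in this notebook
    def delete_custom_task_plugin(self, name: str) -> Any: ...

    def upload_custom_plugin_package(
        self, custom_plugin_name: str, custom_plugin_file: Path
    ) -> Any: ...


def replace_custom_plugin(
    client: PluginClient, name: str, build_archive: Callable[[], Path]
) -> Any:
    """Delete, rebuild, and re-upload a custom task plugin package, in that order."""
    client.delete_custom_task_plugin(name=name)
    archive = build_archive()  # for example, utils.custom_plugins
    return client.upload_custom_plugin_package(
        custom_plugin_name=name,
        custom_plugin_file=archive,
    )
```

Usage would then be `replace_custom_plugin(restapi_client, "evaluation", utils.custom_plugins)`.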

Next, we need to train our models.
Depending on the specs of your computer, training either the shallow net model or the LeNet-5 model on a CPU can take 10 to 20 minutes or longer to complete.
If you have access to a dedicated GPU, the training time will be much shorter.
So that we do not start this code by accident, we are embedding the code in a text block instead of an executable code cell.
**If you need to train one of the models, create a new code cell and copy and paste the code into it.**

```python
# Submit training job for the shallow network architecture
response_shallow_train = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name="mnist",
    entry_point="train",
    entry_point_kwargs=" ".join([
        "-P model_architecture=shallow_net",
        "-P epochs=30",
        "-P register_model_name=mnist_shallow_net",
    ]),
)

print("Training job for shallow neural network submitted")
print("")
pprint.pprint(response_shallow_train)
```

```python
# Submit training job for the LeNet-5 network architecture
response_le_net_train = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name="mnist",
    entry_point="train",
    entry_point_kwargs=" ".join([
        "-P model_architecture=le_net",
        "-P epochs=30",
        "-P register_model_name=mnist_le_net",
    ]),
)

print("Training job for LeNet-5 neural network submitted")
print("")
pprint.pprint(response_le_net_train)
```
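Both training jobs build `entry_point_kwargs` as a space-separated string of `-P key=value` parameter flags. A small helper can keep the parameters in a dictionary instead; `mlproject_params` is hypothetical and not part of `utils.py`, and it does no quoting, so values must not contain spaces.

```python
from typing import Mapping, Union


def mlproject_params(params: Mapping[str, Union[str, int, float]]) -> str:
    """Render parameters as the space-separated `-P key=value` string used above."""
    # Dictionaries keep insertion order, so the flags appear in the order given
    return " ".join(f"-P {key}={value}" for key, value in params.items())
```

For the shallow network, `entry_point_kwargs=mlproject_params({"model_architecture": "shallow_net", "epochs": 30, "register_model_name": "mnist_shallow_net"})` produces the same string as the `" ".join([...])` call in the first block.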

Now that we have two trained models (the shallow network and the LeNet-5 network), we will apply the fast gradient method (FGM) evasion attack to the shallow network to generate adversarial images.
Then we will use those adversarial images to evaluate standard machine learning metrics for both models.
This will give us a sense of how well the attacks transfer between models.

This specific workflow is an example of jobs that contain dependencies, as the metric evaluation jobs cannot start until the adversarial image generation jobs have completed.
Dioptra allows users to declare one-to-many job dependencies like this, which we will use to queue up jobs to start immediately after the previous jobs have concluded.
The code below illustrates this by doing the following:

1. A job is submitted that generates adversarial images based on the shallow net architecture (entry point **fgm**).
1. We wait until the job starts and an MLFlow run id is assigned, which we detect by polling the API until the id appears.
1. Once we have an id returned to us from the API, we queue up the metrics evaluation jobs and declare the job dependency using the `depends_on` option.
1. The message "Dependent jobs submitted" will display once everything is queued up.
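The polling in step 2 runs with no upper time limit. A sketch of the same wait with a timeout is shown below; `wait_for_mlflow_run_id` is a hypothetical helper, and it relies only on the `jobId`, `status`, and `mlflowRunId` fields and the `"failed"`/`"finished"` statuses used by the cell that follows.

```python
import time
from typing import Any, Callable

TERMINAL_STATUSES = {"failed", "finished"}


def wait_for_mlflow_run_id(
    get_job: Callable[[Any], dict[str, Any]],
    job: dict[str, Any],
    timeout: float = 600.0,
    interval: float = 1.0,
) -> str:
    """Poll a job until MLFlow assigns it a run id; raise if it ends or times out first."""
    deadline = time.monotonic() + timeout
    while job["mlflowRunId"] is None:
        if job["status"] in TERMINAL_STATUSES:
            raise RuntimeError(
                f"Job {job['jobId']} ended with status {job['status']!r} "
                "before a run id was assigned"
            )
        if time.monotonic() >= deadline:
            raise TimeoutError(f"No MLFlow run id for job {job['jobId']} after {timeout} s")
        time.sleep(interval)
        job = get_job(job["jobId"])
    return job["mlflowRunId"]
```

With the REST client, this would be called as `wait_for_mlflow_run_id(restapi_client.get_job_by_id, response_fgm_shallow_net)`.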

```python
def mlflow_run_id_is_not_known(response_fgm):
    # True while the job has no MLFlow run id yet and has not reached a terminal status
    return response_fgm["mlflowRunId"] is None and response_fgm["status"] not in [
        "failed",
        "finished",
    ]

response_fgm_shallow_net = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name="mnist",
    entry_point="fgm",
    entry_point_kwargs=" ".join(
        ["-P model_name=mnist_shallow_net", "-P model_version=1"]
    ),
)

print("FGM attack (shallow net architecture) job submitted")
print("")
pprint.pprint(response_fgm_shallow_net)
print("")

# Poll once per second until MLFlow assigns a run id (or the job ends)
while mlflow_run_id_is_not_known(response_fgm_shallow_net):
    time.sleep(1)
    response_fgm_shallow_net = restapi_client.get_job_by_id(
        response_fgm_shallow_net["jobId"]
    )

# Stop here if the job ended without a run id, since the dependent jobs need it
if response_fgm_shallow_net["mlflowRunId"] is None:
    raise RuntimeError(
        f"FGM job ended with status {response_fgm_shallow_net['status']!r} "
        "before an MLFlow run id was assigned"
    )

response_shallow_net_infer_shallow_net = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name="mnist",
    entry_point="infer",
    entry_point_kwargs=" ".join(
        [
            f"-P run_id={response_fgm_shallow_net['mlflowRunId']}",
            "-P model_name=mnist_shallow_net",
            "-P model_version=1",
        ]
    ),
    depends_on=response_fgm_shallow_net["jobId"],
)

response_le_net_infer_shallow_net = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name="mnist",
    entry_point="infer",
    entry_point_kwargs=" ".join(
        [
            f"-P run_id={response_fgm_shallow_net['mlflowRunId']}",
            "-P model_name=mnist_le_net",
            "-P model_version=1",
        ]
    ),
    depends_on=response_fgm_shallow_net["jobId"],
)

print("Dependent jobs submitted")
```

We can poll the status of the dependent jobs using the code below; re-run the cell to refresh the results.
The status of each job should shift from "queued" to "started" and eventually become "finished".

```python
response_shallow_net_infer_shallow_net = restapi_client.get_job_by_id(
    response_shallow_net_infer_shallow_net["jobId"]
)
response_le_net_infer_shallow_net = restapi_client.get_job_by_id(
    response_le_net_infer_shallow_net["jobId"]
)

pprint.pprint(response_shallow_net_infer_shallow_net)
print("")
pprint.pprint(response_le_net_infer_shallow_net)
```
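Instead of re-running the status cell by hand, a polling loop can wait until every dependent job reaches a terminal status. This sketch (`wait_for_jobs` is a hypothetical helper) assumes, like the polling code above, that `"finished"` and `"failed"` are the terminal statuses, and it returns the final status of each job.

```python
import time
from typing import Any, Callable, Iterable


def wait_for_jobs(
    get_job: Callable[[Any], dict[str, Any]],
    job_ids: Iterable[Any],
    interval: float = 10.0,
    timeout: float = 3600.0,
) -> dict[Any, str]:
    """Poll until every job is "finished" or "failed"; return final statuses by job id."""
    pending = list(job_ids)
    statuses: dict[Any, str] = {}
    deadline = time.monotonic() + timeout
    while pending:
        # Iterate over a copy so finished jobs can be removed from `pending`
        for job_id in list(pending):
            status = get_job(job_id)["status"]
            if status in {"finished", "failed"}:
                statuses[job_id] = status
                pending.remove(job_id)
        if pending:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"Jobs still running after {timeout} s: {pending}")
            time.sleep(interval)
    return statuses
```

For the two dependent jobs above: `wait_for_jobs(restapi_client.get_job_by_id, [response_shallow_net_infer_shallow_net["jobId"], response_le_net_infer_shallow_net["jobId"]])`.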

We can similarly run an FGM-based evasion attack using the LeNet-5 architecture as our starting point.
The following code is very similar to the code we just saw; the main change is that the entry point keyword argument requesting the shallow net model has been swapped for one requesting the LeNet-5 model.

```python
response_fgm_le_net = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name="mnist",
    entry_point="fgm",
    entry_point_kwargs=" ".join(
        ["-P model_name=mnist_le_net", "-P model_version=1"]
    ),
)

print("FGM attack (LeNet-5 architecture) job submitted")
print("")
pprint.pprint(response_fgm_le_net)
print("")

# Poll once per second until MLFlow assigns a run id (or the job ends)
while mlflow_run_id_is_not_known(response_fgm_le_net):
    time.sleep(1)
    response_fgm_le_net = restapi_client.get_job_by_id(response_fgm_le_net["jobId"])

# Stop here if the job ended without a run id, since the dependent jobs need it
if response_fgm_le_net["mlflowRunId"] is None:
    raise RuntimeError(
        f"FGM job ended with status {response_fgm_le_net['status']!r} "
        "before an MLFlow run id was assigned"
    )

response_shallow_net_infer_le_net_fgm = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name="mnist",
    entry_point="infer",
    entry_point_kwargs=" ".join(
        [
            f"-P run_id={response_fgm_le_net['mlflowRunId']}",
            "-P model_name=mnist_shallow_net",
            "-P model_version=1",
        ]
    ),
    depends_on=response_fgm_le_net["jobId"],
)

response_le_net_infer_le_net_fgm = restapi_client.submit_job(
    workflows_file=WORKFLOWS_TAR_GZ,
    experiment_name="mnist",
    entry_point="infer",
    entry_point_kwargs=" ".join(
        [
            f"-P run_id={response_fgm_le_net['mlflowRunId']}",
            "-P model_name=mnist_le_net",
            "-P model_version=1",
        ]
    ),
    depends_on=response_fgm_le_net["jobId"],
)

print("Dependent jobs submitted")
```
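Together, the two FGM cells cover every pairing of the model used to craft the adversarial images with the model being evaluated. The sketch below generates the `infer` keyword arguments for all four pairings with `itertools.product`; `infer_kwargs` is a hypothetical helper, and its input is assumed to be a dictionary mapping each attacked model's name to the `mlflowRunId` of its FGM job.

```python
from itertools import product
from typing import Dict, Tuple


def infer_kwargs(
    fgm_run_ids: Dict[str, str], model_version: int = 1
) -> Dict[Tuple[str, str], str]:
    """Map each (attacked model, evaluated model) pair to its `infer` entry point kwargs."""
    return {
        (source, target): " ".join(
            [
                f"-P run_id={fgm_run_ids[source]}",  # images from the attack on `source`
                f"-P model_name={target}",  # model evaluated on those images
                f"-P model_version={model_version}",
            ]
        )
        for source, target in product(fgm_run_ids, repeat=2)
    }
```

For example, `infer_kwargs({"mnist_shallow_net": response_fgm_shallow_net["mlflowRunId"], "mnist_le_net": response_fgm_le_net["mlflowRunId"]})` yields the same four strings that the two cells above pass to `submit_job`.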

Like before, we can monitor the status of the dependent jobs by querying the API using the code block below.

```python
response_shallow_net_infer_le_net_fgm = restapi_client.get_job_by_id(
    response_shallow_net_infer_le_net_fgm["jobId"]
)
response_le_net_infer_le_net_fgm = restapi_client.get_job_by_id(
    response_le_net_infer_le_net_fgm["jobId"]
)

pprint.pprint(response_shallow_net_infer_le_net_fgm)
print("")
pprint.pprint(response_le_net_infer_le_net_fgm)
```

Congratulations, you've just run your first experiment using Dioptra!

## Querying the MLFlow Tracking Service

Currently, the Dioptra API can only be used to register experiments and start jobs, so users who wish to extract their results programmatically can use the `MlflowClient` class from the `mlflow` Python package to connect to MLFlow and query their results.
Since we captured the run ids generated by MLFlow, we can easily retrieve the data logged about any of our jobs and inspect the results.
To start the client, we simply run:

```python
mlflow_client = MlflowClient()
```

The client uses the environment variable `MLFLOW_TRACKING_URI` to determine how to connect to the MLFlow Tracking Service, which we configured near the top of this notebook.
To query the results of one of our runs, we just need to pass the run id to the client's `get_run()` method.
As an example, let's query the run results for the FGM attack applied to the LeNet-5 architecture:

```python
fgm_run_le_net = mlflow_client.get_run(response_fgm_le_net["mlflowRunId"])
```

If the request completed successfully, we can now query the data collected during the run.
For example, to review the collected metrics, we use:

```python
pprint.pprint(fgm_run_le_net.data.metrics)
```

To review the run's parameters, we use:

```python
pprint.pprint(fgm_run_le_net.data.params)
```

To review the run's tags, we use:

```python
pprint.pprint(fgm_run_le_net.data.tags)
```
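Because `run.data.metrics` is a plain dictionary mapping metric names to their latest logged values, comparing several runs needs no further MLFlow calls. The helper below is a hypothetical sketch that lines up the metrics all of the given runs share; which metric names actually appear depends on what each entry point logs.

```python
from typing import Mapping


def metrics_table(runs: Mapping[str, Mapping[str, float]]) -> str:
    """Format the metrics shared by all runs as a plain-text table, one row per metric."""
    labels = list(runs)
    # Only metrics logged by every run can be compared side by side
    shared = sorted(set.intersection(*(set(m) for m in runs.values()))) if runs else []
    header = "metric".ljust(24) + "".join(label.rjust(16) for label in labels)
    rows = [
        key.ljust(24) + "".join(f"{runs[label][key]:16.4f}" for label in labels)
        for key in shared
    ]
    return "\n".join([header, *rows])
```

For example, the two `infer` runs on the LeNet-5 adversarial images could be compared with `print(metrics_table({"shallow_net": mlflow_client.get_run(response_shallow_net_infer_le_net_fgm["mlflowRunId"]).data.metrics, "le_net": mlflow_client.get_run(response_le_net_infer_le_net_fgm["mlflowRunId"]).data.metrics}))`, after refreshing those job responses so that their run ids are populated.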

There are many things you can query using the MLFlow client.
[The MLFlow documentation gives a full overview of the methods that are available](https://www.mlflow.org/docs/latest/python_api/mlflow.tracking.html#mlflow.tracking.MlflowClient).

## Cleanup

To clean up, you simply need to close the browser tab containing this Jupyter notebook and shut down the services by running `make teardown` on the command-line.