# stm32ai-modelzoo train/build/generate

This Jupyter notebook invokes the Vespucci jobcontrol-api to:

- Train the model 
- Benchmark the model and generate the network `.c`/`.h` sources
- Build the application firmware

### Initial setup

Dependencies are managed via the `poetry` command. You may install it with `pip install poetry`.

Then, install all dependencies simply by invoking `poetry install` in this directory. This will create a virtual environment and place it in this folder as `.venv`.

Start a jupyter kernel in that environment and you're ready to run the notebook.

You will need API credentials for Vespucci. This example runs on vespucci-dev by default, so you should use a token generated for that environment. Retrieve an id token and place it into a `token.txt` file in the working directory.


In [None]:
# Configuration preamble
import json
import httpx
import config
import utils
import asyncio
import zipfile as zf
import io

from collections.abc import Awaitable

# Single synchronous HTTP client reused by every cell below. Each request may
# take up to 300 (httpx timeouts are expressed in seconds) before timing out;
# the default headers are whatever `utils.headers()` returns.
client = httpx.Client(
    headers=utils.headers(),
    timeout=300,
)

## Model training

This step will produce the model artifact.

In [None]:
print("Creating training job")

job = (
    client.post(
        config.JOBCONTROL_API_JOBS_ENDPOINT,
        files={
            "uploadedFile": (
                "train_data",
                open("./sample-inputs/object_detection/train.zip", "rb"),
            ),
            "uploadedFile2": (
                "valid_data",
                open("./sample-inputs/object_detection/valid.zip", "rb"),
            ),
        },
        data={
            "templateId": config.TRAINING_JOB_TEMPLATE_ID,
            "runtimeInput": json.dumps(
                {
                    "job_config": {"root": "object_detection/scripts/training"},
                    "training_config": {
                        "hydra": {"run": {"dir": "outputs"}},
                        "stm32ai": {"version": "8.0.1"},
                        "dataset": {"class_names": ["person"]},
                    },
                }
            ),
        },
    )
    .raise_for_status()
    .json()
)


print(f"Created training job: {job!r}")

job_id = job["id"]

print("Blocking until job is done...")
job = await utils.block_until_done(job_id)

print("Training job is done. Downloading artifacts")
artifacts = utils.download_artifacts(job)

client.delete(f"{config.JOBCONTROL_API_JOBS_ENDPOINT}/{job_id}")

outputs_zip = zf.ZipFile(io.BytesIO(artifacts["outputs_zip"]), "r")

model = outputs_zip.read("quantized_models/quantized_model.tflite")

## Benchmarking + Cube.AI

This step will generate `.c`/`.h` NN sources by invoking the STM32 Developer cloud APIs.

We'll download a zip containing said sources as well as the CubeAI runtime that will eventually be linked into the application firmware.

Simultaneously, we'll dispatch the benchmarking job and display the inference time once it's been measured.

In [None]:
# --- Cube.AI code generation job -------------------------------------------

# Options block of the devcloud API `generate` command.
generate_options = {
    "includeLibraryForSerie": "H7",
    "includeLibraryForIde": "gcc",
    "allocateInputs": True,
    "allocateOutputs": True,
    "compression": "none",
    "optimization": "balanced",
    "classifier": False,
}

# devcloud API command
generate_command = {
    "command": "generate",
    "arguments": {"options": generate_options},
    "version": "8.0.1",
}

print("Creating Cube.AI job")
generate_response = client.post(
    config.JOBCONTROL_API_JOBS_ENDPOINT,
    files={"model": ("model", model)},
    data={
        "templateId": config.CUBEAI_JOB_TEMPLATE_ID,
        "runtimeInput": json.dumps({"command": generate_command}),
    },
)
# Abort on a non-2xx response before decoding the JSON body.
generate_response.raise_for_status()
generate_job = generate_response.json()
print(f"Created CubeAI job: {generate_job!r}")

# --- Benchmarking job --------------------------------------------------------

# devcloud API command
benchmarking_command = {
    "command": "benchmark",
    "arguments": {
        "options": {},
        "parameters": {"board_name": "STM32H747I-DISCO"},
    },
    "version": "8.0.1",
}

print("Creating benchmarking job")
benchmarking_response = client.post(
    config.JOBCONTROL_API_JOBS_ENDPOINT,
    files={"model": ("model", model)},
    data={
        "templateId": config.BENCHMARKING_JOB_TEMPLATE_ID,
        "runtimeInput": json.dumps({"command": benchmarking_command}),
    },
)
# Abort on a non-2xx response before decoding the JSON body.
benchmarking_response.raise_for_status()
benchmarking_job = benchmarking_response.json()
print(f"Created benchmarking job: {benchmarking_job!r}")


async def _generate_coro() -> None:
    """Wait for the Cube.AI job and publish its output as a notebook global.

    Reads the job id from the global ``generate_job``, waits on
    ``utils.block_until_done``, downloads the job artifacts, issues a DELETE
    for the remote job and stores ``artifacts["output"]`` in the global
    ``cubeai_output``.

    Side effects:
        Rebinds the globals ``generate_job`` (to the value returned by
        ``utils.block_until_done``) and ``cubeai_output``.
    """
    global generate_job
    global cubeai_output

    job_id = generate_job["id"]

    generate_job = await utils.block_until_done(job_id)

    print("CubeAI job is done. Downloading artifacts")
    # NOTE(review): `download_artifacts` is called without `await`, so it
    # presumably blocks the event loop while downloading - confirm in utils.
    artifacts = utils.download_artifacts(generate_job)
    # Artifacts are held locally now; remove the remote job (status unchecked).
    client.delete(f"{config.JOBCONTROL_API_JOBS_ENDPOINT}/{job_id}")

    cubeai_output = artifacts["output"]


async def _benchmarking_coro() -> None:
    """Wait for the benchmarking job and print the measured inference time.

    Reads the job id from the global ``benchmarking_job``, waits on
    ``utils.block_until_done``, downloads the job artifacts, issues a DELETE
    for the remote job and stores ``artifacts["output"]`` in the global
    ``benchmarking_output``. That output is opened as a zip archive and the
    ``duration_ms`` field of its ``benchmark_report.json`` member is printed.

    Side effects:
        Rebinds the globals ``benchmarking_job`` (to the value returned by
        ``utils.block_until_done``) and ``benchmarking_output``.

    Raises:
        KeyError: if the archive has no ``benchmark_report.json`` member or
            the report has no ``duration_ms`` key.
    """
    global benchmarking_job
    global benchmarking_output

    job_id = benchmarking_job["id"]

    benchmarking_job = await utils.block_until_done(job_id)

    print("Benchmarking job is done. Downloading artifacts")
    # NOTE(review): `download_artifacts` is called without `await`, so it
    # presumably blocks the event loop while downloading - confirm in utils.
    artifacts = utils.download_artifacts(benchmarking_job)
    # Artifacts are held locally now; remove the remote job (status unchecked).
    client.delete(f"{config.JOBCONTROL_API_JOBS_ENDPOINT}/{job_id}")

    benchmarking_output = artifacts["output"]

    # The output artifact is an in-memory zip; the report is a UTF-8 JSON file.
    with zf.ZipFile(io.BytesIO(benchmarking_output), "r") as archive:
        report = json.loads(archive.read("benchmark_report.json").decode("utf-8"))
        print(f"Benchmarking job is done. Inference time: {report['duration_ms']}ms")

await asyncio.wait(
    (asyncio.create_task(_generate_coro()), asyncio.create_task(_benchmarking_coro()))
)

print("Cube.AI and benchmarking done.")

## Building

In this step we build the output NN sources into a deployment-ready application firmware.

In [None]:
print("Creating builder job")
job = (
    client.post(
        config.JOBCONTROL_API_JOBS_ENDPOINT,
        files={"file1": ("cubeai_output", cubeai_output), "file2": ("model", model)},
        data={
            "templateId": config.COMPILATION_JOB_TEMPLATE_ID,
            "runtimeInput": json.dumps(
                {
                    "job_config": {"root": "object_detection/scripts/deployment"},
                    "build_config": {
                        "stm32ai": {"version": "8.0.1"},
                        "dataset": {"class_names": ["person"]},
                    },
                }
            ),
        },
    )
    .raise_for_status()
    .json()
)

print(f"Created builder job: {job!r}")

job_id = job["id"]

print("Blocking until job is done...")
job = await utils.block_until_done(job_id)

print("Builder job is done. Downloading artifacts")
artifacts = utils.download_artifacts(job)

client.delete(f"{config.JOBCONTROL_API_JOBS_ENDPOINT}/{job_id}")

getting_started_zip = artifacts["getting_started"]

with zf.ZipFile(io.BytesIO(getting_started_zip), "r") as archive:
    binary_file_name = next(
        name for name in archive.namelist() if name.endswith(".bin")
    )
    binary_file = archive.read(binary_file_name)

assert open("./getting_started.zip", "wb").write(getting_started_zip) > 0
assert open("./binary.bin", "wb").write(binary_file) > 0

print("Succesfully produced final artifacts: `getting_started.zip` and `binary.bin`")