# MONAI Deploy pipeline

## Install Dependencies

In [1]:
!pip install --upgrade monai-deploy-app-sdk

Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com
Collecting monai-deploy-app-sdk
  Downloading monai_deploy_app_sdk-0.4.0-py3-none-any.whl (162 kB)
[K     |████████████████████████████████| 162 kB 20.0 MB/s eta 0:00:01
Collecting typeguard>=2.12.1
  Downloading typeguard-2.13.3-py3-none-any.whl (17 kB)
Installing collected packages: typeguard, monai-deploy-app-sdk
Successfully installed monai-deploy-app-sdk-0.4.0 typeguard-2.13.3


# Import Dependencies
Here, we need to import monai deploy classes, as well as the transforms that we used from MONAI.

In [1]:
import monai.deploy.core as md  # 'md' stands for MONAI Deploy (or can use 'core' instead)
from monai.deploy.core import (
    Application,
    DataPath,
    ExecutionContext,
    InputContext,
    IOType,
    Operator,
    OutputContext,
)
from monai.transforms import (
    AsDiscrete,
    AsDiscreted,
    BoundingRectd,
    EnsureChannelFirstd,
    EnsureTyped,
    Compose,
    CropForegroundd,
    Lambdad,
    LoadImaged,
    Orientationd,
    RandCropByPosNegLabeld,
    SaveImaged,
    ScaleIntensityRanged,
    Spacingd,
    Invertd,
    ToTensord,
)
from monai.apps.detection.utils.anchor_utils import AnchorGeneratorWithAnchorShape
from monai.apps.detection.metrics.coco import COCOMetric
from monai.apps.detection.metrics.matching import matching_batch
from monai.apps.detection.networks.retinanet_detector import RetinaNetDetector
from monai.apps.detection.networks.retinanet_network import (
    RetinaNet,
    resnet_fpn_feature_extractor,
)

import os
import glob
import torch

  from .autonotebook import tqdm as notebook_tqdm


# Creating our operator
Here we define our operator as follows:
1. First we define an input of type DataPath from DISK.
2. Next, we define an output of type DataPath from DISK.
3. Third, we ensure that monai is a dependency that needs to be installed.
4. Then, we define the class, inheriting the Operator class from MONAI Deploy
5. Then we add a property called transform which will do the validation style transforms that we defined in the training module.
6. We then write the compute function, which should consist of getting the path from the input context, using transform on the path, converting the output to an output tensor that is then put on the GPU, and lastly the model is used to perform a forward pass with the image tensor.
7. Lastly, the results are postprocessed into the classes using argmax and the results are saved to output.json.

In [12]:
@md.input("image", DataPath, IOType.DISK)
@md.output("output", DataPath, IOType.DISK)
@md.env(pip_packages=["monai"])
class SpleenDetectorOperator(Operator):
    """Classifies the given image and returns the class name."""

    @property
    def transform(self):
        test_transforms = Compose(
            [
                LoadImaged(keys=["image"]),
                EnsureChannelFirstd(keys=["image"]),
                EnsureTyped(keys=["image"], dtype=torch.float32),
                ScaleIntensityRanged(
                    keys=["image"], a_min=-57, a_max=164,
                    b_min=0.0, b_max=1.0, clip=True,
                ),
                CropForegroundd(keys=["image"], source_key="image"),
                Orientationd(keys=["image"], axcodes="RAS"),
                Spacingd(keys=["image"], pixdim=(
                    1.5, 1.5, 2.0), mode=("bilinear")),
            ]
        )
        return test_transforms

    def compute(self, op_input: InputContext, op_output: OutputContext, context: ExecutionContext):
        import json

        import torch
        
        input_path = op_input.get().path
        if input_path.is_dir():
            input_path = next(input_path.glob("*.*"))  # take the first file

        image_tensor = self.transform({"image": input_path})  # Load path as dict
        image_tensor = torch.tensor(image_tensor["image"].array)  # Get just the image for input

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        image_tensor = [image_tensor.to(device)]

        ### Build Anchor Generator
        anchor_generator = AnchorGeneratorWithAnchorShape(
            feature_map_scales=[2**l for l in range(len([1,2]) + 1)],
            base_anchor_shapes=[[6,8,4],[8,6,5],[10,10,6]],
        )
        
        ### Build Model
        model = context.models.get()  # get a TorchScriptModel object
        model.predictor = torch.jit.load(model.path, map_location=device).eval()
        net = model.predictor
        
        ### Build Detector
        detector = RetinaNetDetector(
            network=net, anchor_generator=anchor_generator, debug=False
        ).to(device)

        detector.set_target_keys(box_key="label_box", label_key="label_class")

        # set validation components
        detector.set_box_selector_parameters(
            score_thresh=0.02,
            topk_candidates_per_level=1000,
            nms_thresh=0.22,
            detections_per_img=100,
        )
        detector.set_sliding_window_inferer(
            roi_size=[512,512,208],
            overlap=0.25,
            sw_batch_size=1,
            mode="constant",
            device="cpu",
        )
        
        detector.eval()
        with torch.no_grad():
            with torch.cuda.amp.autocast():
                val_outputs_all = detector(image_tensor, use_inferer=True)

        pred_boxes=[
            val_data_i[detector.target_box_key].cpu().detach().numpy().tolist()
            for val_data_i in val_outputs_all
        ],
        pred_classes=[
            val_data_i[detector.target_label_key].cpu().detach().numpy().tolist()
            for val_data_i in val_outputs_all
        ],
        pred_scores=[
            val_data_i[detector.pred_score_key].cpu().detach().numpy().tolist()
            for val_data_i in val_outputs_all
        ],
        print(pred_boxes, pred_classes, pred_scores)
        result = {"boxes": pred_boxes, "classes": pred_classes, "scores": pred_scores}

        # Get output (folder) path and create the folder if not exists
        output_folder = op_output.get().path
        output_folder.mkdir(parents=True, exist_ok=True)

        # Write result to "output.json"
        output_path = output_folder / "output.json"
        with open(output_path, "w") as fp:
            json.dump(result, fp)

# Creating our application
In this section, all the operators that are defined should be included. Here, we only used one operator, so we will use the add_operator function to add our operator into the application function. Notice here that you can also define the number of cpu, gpu and memory that are required.

In [13]:
@md.resource(cpu=2, gpu=1, memory="8Gi")
class App(Application):
    """Application class for the Spleen detector."""

    def compose(self):
        classifier_op = SpleenDetectorOperator()

        self.add_operator(classifier_op)

# Testing our application

In order to test our application, we will point "test_input_path" to a jpeg file in our test folder.

In [14]:
root_dir = "./"
data_dir = os.path.join(root_dir, "Task09_Spleen")
test_images = sorted(
    glob.glob(os.path.join(data_dir, "imagesTs", "*.nii.gz")))
data_dicts = [
    {"image": image_name}
    for image_name in test_images
]
test_input_path = data_dicts[0]["image"]
print(f"Test input file path: {test_input_path}")

Test input file path: ./Task09_Spleen/imagesTs/spleen_1.nii.gz


Next, we instantiate the App class, and then perform run.

In [15]:
app = App()

In [16]:
app.run(input=test_input_path, output="output", model="classifier.zip")

[34mGoing to initiate execution of operator SpleenDetectorOperator[39m
[32mExecuting operator SpleenDetectorOperator [33m(Process ID: 5894, Operator ID: d2924a33-5ea5-42ee-9b12-26cd3686f279)[39m
([[[21.58724594116211, 60.32127380371094, 33.97724151611328, 75.53152465820312, 108.71205139160156, 62.45012664794922], [29.138856887817383, 57.99709701538086, 45.1968994140625, 66.86248779296875, 83.99801635742188, 66.1964111328125], [21.605960845947266, 48.43174743652344, 49.73490524291992, 75.6953125, 96.08973693847656, 78.13142395019531], [35.06386947631836, 68.13758087158203, 40.05929183959961, 60.62265396118164, 100.37023162841797, 56.25345230102539], [45.2677001953125, 71.50562286376953, 45.317298889160156, 82.5245361328125, 97.23815155029297, 66.2081527709961], [28.44527244567871, 57.32223892211914, 29.66332244873047, 66.66019439697266, 83.91702270507812, 51.12281036376953], [0.0, 14.650421142578125, 20.98767852783203, 38.73200607299805, 76.86129760742188, 43.37999725341797], [0.0,

Lastly, we can check that the results were indeed output into a json file.

In [17]:
!cat output/output.json

{"boxes": [[[[21.58724594116211, 60.32127380371094, 33.97724151611328, 75.53152465820312, 108.71205139160156, 62.45012664794922], [29.138856887817383, 57.99709701538086, 45.1968994140625, 66.86248779296875, 83.99801635742188, 66.1964111328125], [21.605960845947266, 48.43174743652344, 49.73490524291992, 75.6953125, 96.08973693847656, 78.13142395019531], [35.06386947631836, 68.13758087158203, 40.05929183959961, 60.62265396118164, 100.37023162841797, 56.25345230102539], [45.2677001953125, 71.50562286376953, 45.317298889160156, 82.5245361328125, 97.23815155029297, 66.2081527709961], [28.44527244567871, 57.32223892211914, 29.66332244873047, 66.66019439697266, 83.91702270507812, 51.12281036376953], [0.0, 14.650421142578125, 20.98767852783203, 38.73200607299805, 76.86129760742188, 43.37999725341797], [0.0, 43.802040100097656, 18.751644134521484, 31.445295333862305, 60.979209899902344, 43.825016021728516]]]], "classes": [[[0, 0, 0, 0, 0, 0, 0, 0]]], "scores": [[[0.28776782751083374, 0.09653531

# Wrapping it all up
Next, we take cells 1, 2, and 3, and paste them into a file called spleen_detector_monaideploy.py. This is done in the next cell.
At the end, we add the `if __name__ == "__main__":` definition to make sure that the app is run if it's called on the python command line.

In [4]:
%%writefile spleen_detector_monaideploy.py

# Cell [1]
import monai.deploy.core as md  # 'md' stands for MONAI Deploy (or can use 'core' instead)
from monai.deploy.core import (
    Application,
    DataPath,
    ExecutionContext,
    InputContext,
    IOType,
    Operator,
    OutputContext,
)
from monai.transforms import (
    AsDiscrete,
    AsDiscreted,
    BoundingRectd,
    EnsureChannelFirstd,
    EnsureTyped,
    Compose,
    CropForegroundd,
    Lambdad,
    LoadImaged,
    Orientationd,
    RandCropByPosNegLabeld,
    SaveImaged,
    ScaleIntensityRanged,
    Spacingd,
    Invertd,
    ToTensord,
)
from monai.apps.detection.utils.anchor_utils import AnchorGeneratorWithAnchorShape
from monai.apps.detection.metrics.coco import COCOMetric
from monai.apps.detection.metrics.matching import matching_batch
from monai.apps.detection.networks.retinanet_detector import RetinaNetDetector
from monai.apps.detection.networks.retinanet_network import (
    RetinaNet,
    resnet_fpn_feature_extractor,
)

import os
import glob
import torch

# Cell [2]
@md.input("image", DataPath, IOType.DISK)
@md.output("output", DataPath, IOType.DISK)
@md.env(pip_packages=["monai"])
class SpleenDetectorOperator(Operator):
    """Classifies the given image and returns the class name."""

    @property
    def transform(self):
        test_transforms = Compose(
            [
                LoadImaged(keys=["image"]),
                EnsureChannelFirstd(keys=["image"]),
                EnsureTyped(keys=["image"], dtype=torch.float32),
                ScaleIntensityRanged(
                    keys=["image"], a_min=-57, a_max=164,
                    b_min=0.0, b_max=1.0, clip=True,
                ),
                CropForegroundd(keys=["image"], source_key="image"),
                Orientationd(keys=["image"], axcodes="RAS"),
                Spacingd(keys=["image"], pixdim=(
                    1.5, 1.5, 2.0), mode=("bilinear")),
            ]
        )
        return test_transforms

    def compute(self, op_input: InputContext, op_output: OutputContext, context: ExecutionContext):
        import json

        import torch
        
        input_path = op_input.get().path
        if input_path.is_dir():
            input_path = next(input_path.glob("*.*"))  # take the first file

        image_tensor = self.transform({"image": input_path})  # Load path as dict
        image_tensor = torch.tensor(image_tensor["image"].array)  # Get just the image for input

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        image_tensor = [image_tensor.to(device)]

        ### Build Anchor Generator
        anchor_generator = AnchorGeneratorWithAnchorShape(
            feature_map_scales=[2**l for l in range(len([1,2]) + 1)],
            base_anchor_shapes=[[6,8,4],[8,6,5],[10,10,6]],
        )
        
        ### Build Model
        model = context.models.get()  # get a TorchScriptModel object
        model.predictor = torch.jit.load(model.path, map_location=device).eval()
        net = model.predictor
        
        ### Build Detector
        detector = RetinaNetDetector(
            network=net, anchor_generator=anchor_generator, debug=False
        ).to(device)

        detector.set_target_keys(box_key="label_box", label_key="label_class")

        # set validation components
        detector.set_box_selector_parameters(
            score_thresh=0.02,
            topk_candidates_per_level=1000,
            nms_thresh=0.22,
            detections_per_img=100,
        )
        detector.set_sliding_window_inferer(
            roi_size=[512,512,208],
            overlap=0.25,
            sw_batch_size=1,
            mode="constant",
            device="cpu",
        )
        
        detector.eval()
        with torch.no_grad():
            with torch.cuda.amp.autocast():
                val_outputs_all = detector(image_tensor, use_inferer=True)

        pred_boxes=[
            val_data_i[detector.target_box_key].cpu().detach().numpy().tolist()
            for val_data_i in val_outputs_all
        ],
        pred_classes=[
            val_data_i[detector.target_label_key].cpu().detach().numpy().tolist()
            for val_data_i in val_outputs_all
        ],
        pred_scores=[
            val_data_i[detector.pred_score_key].cpu().detach().numpy().tolist()
            for val_data_i in val_outputs_all
        ],
        print(pred_boxes, pred_classes, pred_scores)
        result = {"boxes": pred_boxes, "classes": pred_classes, "scores": pred_scores}

        # Get output (folder) path and create the folder if not exists
        output_folder = op_output.get().path
        output_folder.mkdir(parents=True, exist_ok=True)

        # Write result to "output.json"
        output_path = output_folder / "output.json"
        with open(output_path, "w") as fp:
            json.dump(result, fp)
            
# Cell [3]
@md.resource(cpu=1, gpu=1, memory="6Gi")
class App(Application):
    """Application class for the Spleen detector."""

    def compose(self):
        classifier_op = SpleenDetectorOperator()

        self.add_operator(classifier_op)

# Finally
if __name__ == "__main__":
    App(do_run=True)

Overwriting spleen_detector_monaideploy.py


# Free Memory
This previous test takes a lot of memory and the next section won't run if the memory isn't freed

In [None]:
import os
os._exit(00)

In [2]:
import os
import glob
root_dir = "./"
data_dir = os.path.join(root_dir, "Task09_Spleen")
test_images = sorted(
    glob.glob(os.path.join(data_dir, "imagesTs", "*.nii.gz")))
data_dicts = [
    {"image": image_name}
    for image_name in test_images
]
test_input_path = data_dicts[0]["image"]
print(f"Test input file path: {test_input_path}")

Test input file path: ./Task09_Spleen/imagesTs/spleen_1.nii.gz


# Use PYTHON to run the code directly
Now that the code is written, we can then run the code using python directly as a test.

In [5]:
!python spleen_detector_monaideploy.py -i {test_input_path} -o output -m classifier.zip

[34mGoing to initiate execution of operator SpleenDetectorOperator[39m
[32mExecuting operator SpleenDetectorOperator [33m(Process ID: 6697, Operator ID: 9acb4191-b4dc-4c22-ae7e-bf2d507bc3cf)[39m
([[[21.58724594116211, 60.32127380371094, 33.97724151611328, 75.53152465820312, 108.71205139160156, 62.45012664794922], [29.138856887817383, 57.99709701538086, 45.1968994140625, 66.86248779296875, 83.99801635742188, 66.1964111328125], [21.605960845947266, 48.43174743652344, 49.73490524291992, 75.6953125, 96.08973693847656, 78.13142395019531], [35.06386947631836, 68.13758087158203, 40.05929183959961, 60.62265396118164, 100.37023162841797, 56.25345230102539], [45.2677001953125, 71.50562286376953, 45.317298889160156, 82.5245361328125, 97.23815155029297, 66.2081527709961], [28.44527244567871, 57.32223892211914, 29.66332244873047, 66.66019439697266, 83.91702270507812, 51.12281036376953], [0.0, 14.650421142578125, 20.98767852783203, 38.73200607299805, 76.86129760742188, 43.37999725341797], [0.0,

# Use MONAI Deploy Runner to test the app
We can also test the execution using the included MONAI deploy runner.

In [6]:
!monai-deploy exec spleen_detector_monaideploy.py -i {test_input_path} -o output -m classifier.zip

[34mGoing to initiate execution of operator SpleenDetectorOperator[39m
[32mExecuting operator SpleenDetectorOperator [33m(Process ID: 6908, Operator ID: dfa85890-dcf4-4a66-984b-f67a587c7e88)[39m
([[[21.58724594116211, 60.32127380371094, 33.97724151611328, 75.53152465820312, 108.71205139160156, 62.45012664794922], [29.138856887817383, 57.99709701538086, 45.1968994140625, 66.86248779296875, 83.99801635742188, 66.1964111328125], [21.605960845947266, 48.43174743652344, 49.73490524291992, 75.6953125, 96.08973693847656, 78.13142395019531], [35.06386947631836, 68.13758087158203, 40.05929183959961, 60.62265396118164, 100.37023162841797, 56.25345230102539], [45.2677001953125, 71.50562286376953, 45.317298889160156, 82.5245361328125, 97.23815155029297, 66.2081527709961], [28.44527244567871, 57.32223892211914, 29.66332244873047, 66.66019439697266, 83.91702270507812, 51.12281036376953], [0.0, 14.650421142578125, 20.98767852783203, 38.73200607299805, 76.86129760742188, 43.37999725341797], [0.0,

# Package App for Docker
Lastly, this step will call docker to package the application as a docker image which can then be run anywhere that docker images are supported.

In [12]:
!monai-deploy package spleen_detector_monaideploy.py --tag spleen_detect_app:latest --model classifier.zip  # -l DEBUG

Building MONAI Application Package... /bin/bash: docker: command not found
Done
