# Training a VISTA2d Bundle

This tutorial demonstrates how to train a VISTA2d bundle on the NVIDIA DGX Cloud. It focuses on using DGX systems for medical imaging applications, specifically with a MONAI VISTA2d bundle.

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/NVIDIA/monai-cloud-api/blob/main/notebooks/Training%20a%20VISTA2d%20Bundle.ipynb)

## Table of Contents

- Introduction
- Setup
- Dataset Creation
- Experiment Creation
- Monitoring Job Status and Logging
- Batch Inference (Optional)
- Cleaning Up
- Conclusion

## Introduction

This tutorial demonstrates how to train a MONAI segmentation bundle on the NVIDIA DGX Cloud. It focuses on using DGX systems for medical imaging applications, specifically with a MONAI VISTA2d bundle for cell images.

### What You Can Expect to Learn

This tutorial guides you through training a MONAI segmentation bundle on the NVIDIA DGX Cloud. You will run batch training and inference jobs and monitor their progress. By the end of the tutorial, the trained models and inference results will be stored in your remote cloud storage bucket.

## Setup

In [None]:
!python -c "import requests" || pip install -q "requests"
!python -c "import nibabel" || pip install -q "nibabel"
!python -c "import matplotlib" || pip install -q "matplotlib"
!python -c "import libcloud" || pip install -q "apache-libcloud"
!python -c "import PIL" || pip install -q "Pillow"

import json
import os
import time

import matplotlib.pyplot as plt
import nibabel as nib
import requests

from libcloud.storage.providers import get_driver
from libcloud.storage.types import Provider
from PIL import Image

#### Required Parameters

In [None]:
# Provide the following parameters to start this notebook.
host_url = "https://api.monai.ngc.nvidia.com"
ngc_api_key = os.environ.get("MONAI_API_KEY", "<YOUR_API_KEY>")  # we recommend using environment variables for API keys, but you can also hardcode them here

# The cloud storage type used in this notebook. Currently only `aws` and `azure` are supported.
cloud_type = "azure"  # cloud storage provider: "aws" or "azure"
# Key names used for the credentials in the experiment's cloud details.
cloud_account = "account_name"  # use "access_key" if cloud_type == "aws"
cloud_secret = "access_key"  # use "secret_key" if cloud_type == "aws"

# Cloud storage credentials. Needed for storing the data and results of the experiments.
access_id = "<user name for the remote storage object>"  # Please fill it with the actual Access ID
access_secret = "<secret for the remote storage object>"  # Please fill it with the actual Access Secret

# Dataset Cloud Storage URL. This is the cloud storage where the training and validation dataset is stored.
train_manifest_url = "<train manifest url>"
val_manifest_url = "<validation manifest url>"

# Experiment Cloud Storage. This is the storage where your jobs and experiments data will be stored.
cs_bucket = "<bucket or container name to push experiment job data to>"  # Please fill it with the actual bucket name

# Inference workflow parameters. Inference runs after training because the vista2d bundle does not provide a pretrained model.
do_inference = True
if do_inference:
    inference_manifest_url = "<inference manifest url>"
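
Several parameters above ship as `<...>` placeholders, and a forgotten one tends to surface only later as a failed API call. A minimal sketch of a pre-flight check follows; the helper name `find_unfilled` and the example values are hypothetical and not part of the MONAI Cloud API.

```python
import re

# Matches values that still look like "<...>" template placeholders.
PLACEHOLDER = re.compile(r"^<.*>$")


def find_unfilled(params):
    """Return the sorted names of parameters whose values are still placeholders."""
    return sorted(
        name
        for name, value in params.items()
        if isinstance(value, str) and PLACEHOLDER.match(value.strip())
    )


# Hypothetical example values, for illustration only.
example = {
    "access_id": "<user name for the remote storage object>",
    "cs_bucket": "my-bucket",
    "train_manifest_url": "<train manifest url>",
}
print(find_unfilled(example))
```

In the notebook you could pass a dictionary of your own parameter names and values and stop early if the returned list is not empty.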

#### Log in to NGC and API Setup

In [None]:
# Exchange NGC_API_KEY for JWT
api_url = f"{host_url}/api/v1"
response = requests.post(f"{api_url}/login", json={"ngc_api_key": ngc_api_key})
response.raise_for_status()
assert "user_id" in response.json(), "user_id is not in response."
assert "token" in response.json(), "token is not in response."
user_id = response.json()["user_id"]
token = response.json()["token"]

# Construct the URL and Headers
ngc_org = "iasixjqzw1hj"  # This is the default org for MONAI users. Please select the correct org if you are not using the default one.
base_url = f"{api_url}/orgs/{ngc_org}"
headers = {"Authorization": f"Bearer {token}"}
print("API Calls will be forwarded to", base_url)

## Dataset Creation

### Create the Training Dataset and the Validation Dataset

Define and create your training and validation datasets using the MONAI Cloud API.

In [None]:
endpoint = f"{base_url}/datasets"

# Training dataset
data = {
    "name": "MONAI_vista2d_train",
    "description":"Remote storage object dataset for training",
    "type": "semantic_segmentation",
    "format": "monai",
    "client_url": train_manifest_url,
    "client_id": access_id,
    "client_secret": access_secret,
}

response = requests.post(endpoint, json=data, headers=headers)
print(response.json())
assert response.status_code == 201, f"Create train dataset failed, got {response.json()}."
res = response.json()
train_dataset_id = res["id"]
print("Train dataset creation succeeded with dataset ID:", train_dataset_id)
print("---------------------------------\n")
print(json.dumps(res, indent=2))

# Validation dataset
data = {
    "name": "MONAI_vista2d_val",
    "description":"Remote storage object dataset for validation",
    "type": "semantic_segmentation",
    "format": "monai",
    "client_url": val_manifest_url,
    "client_id": access_id,
    "client_secret": access_secret,
}

response = requests.post(endpoint, json=data, headers=headers)
print(response.json())

assert response.status_code == 201, f"Create val dataset failed, got {response.json()}."
res = response.json()
val_dataset_id = res["id"]
print("Validation dataset creation succeeded with dataset ID:", val_dataset_id)
print("---------------------------------\n")
print(json.dumps(res, indent=2))
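
The training, validation, and inference dataset requests differ only in their name, description, and manifest URL. As a sketch under that observation, the payload can be built by one small function; `dataset_payload` and the example arguments are hypothetical, while the field names are the ones used in the cell above.

```python
def dataset_payload(name, purpose, manifest_url, access_id, access_secret):
    """Build the JSON body for a dataset creation request.

    The field names mirror the payloads used in this notebook; the helper
    itself is only an illustration.
    """
    return {
        "name": name,
        "description": f"Remote storage object dataset for {purpose}",
        "type": "semantic_segmentation",
        "format": "monai",
        "client_url": manifest_url,
        "client_id": access_id,
        "client_secret": access_secret,
    }


# Hypothetical example values, for illustration only.
payload = dataset_payload(
    "MONAI_vista2d_train", "training", "<train manifest url>", "my-id", "my-secret"
)
print(payload["name"], "->", payload["description"])
```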

### Create the inference dataset (optional)

Define and create your inference dataset using the MONAI Cloud API.

In [None]:
if do_inference:
    endpoint = f"{base_url}/datasets"
    
    # Inference dataset
    data = {
        "name": "MONAI_vista2d_infer",
        "description":"Remote storage object dataset for inference",
        "type": "semantic_segmentation",
        "format": "monai",
        "client_url": inference_manifest_url,
        "client_id": access_id,
        "client_secret": access_secret,
    }
    response = requests.post(endpoint, json=data, headers=headers)
    print(response.json())

    assert response.status_code == 201, f"Create inference dataset failed, got {response.json()}."
    res = response.json()
    inference_dataset_id = res["id"]
    print("Inference dataset creation succeeded with dataset ID:", inference_dataset_id)
    print("---------------------------------\n")
    print(json.dumps(res, indent=2))

## Experiment Creation

Create a MONAI segmentation experiment, specifying the necessary parameters and datasets. In this tutorial, we will use the vista2d bundle.

### List Available Base Experiments

#### Find the base experiment for VISTA-2D

In [None]:
endpoint = f"{base_url}/experiments:base"
response = requests.get(endpoint, headers=headers)
assert response.status_code == 200, f"List base experiments failed, got {response.text}."
res = response.json()

# VISTA-2D
vista2d_base_exps = [p for p in res["experiments"] if p["network_arch"] == "monai_vista2d"]
assert len(vista2d_base_exps) > 0, "No base experiment found for VISTA 2D bundle"
print("List of available base experiments for VISTA 2D bundle:")
for exp in vista2d_base_exps:
    print(f"  {exp['id']}: {exp['name']} v{exp['version']}")
# Take the latest version
base_experiment = sorted(vista2d_base_exps, key=lambda x: x["version"])[-1]
vista_bundle_name = base_experiment["ngc_path"].split("/")[-1].replace(":", "_v")
version = base_experiment["version"]
base_exp_vista = base_experiment["id"]
print("-----------------------------------------------------------------------------------------")
print(f"Base experiment ID for '{base_experiment['name']}' v{base_experiment['version']}: {base_exp_vista}")
print("-----------------------------------------------------------------------------------------")
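
The cell above picks the latest base experiment by sorting on `version`. If the versions are dotted strings (an assumption; the response format is not shown here), a plain string sort would place `"0.10.0"` before `"0.9.0"`. The sketch below shows a numeric sort key for that case; `version_key` is a hypothetical helper and the version values are made up.

```python
def version_key(version):
    """Turn a dotted version such as "0.10.0" into a tuple of ints for sorting."""
    parts = []
    for piece in str(version).split("."):
        # Non-numeric pieces are treated as 0, which is a simplification.
        parts.append(int(piece) if piece.isdigit() else 0)
    return tuple(parts)


# Made-up version strings, for illustration only.
versions = ["0.9.0", "0.10.0", "0.2.1"]
print(sorted(versions))                   # plain string sort
print(sorted(versions, key=version_key))  # numeric sort
```

With such a key, the selection could be written as `sorted(..., key=lambda x: version_key(x["version"]))[-1]`.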

### Create Experiment

Set up and create your segmentation experiment based on the retrieved information. Run a batch training job with the created experiment.

In [None]:
experiment_cloud_details = {
    "cloud_type": cloud_type,
    "cloud_file_type": "folder",  # If the file is tar.gz key in "file", else "folder"
    "cloud_specific_details": {
        "cloud_bucket_name": cs_bucket,  # Bucket link to save files
        cloud_account: access_id,  # Access and Secret for Azure
        cloud_secret: access_secret,  # Access and Secret for Azure
    }
}

data = {
    "name": "my_vista",
    "description": "based on vista",
    "network_arch": "monai_vista2d",
    "type": "medical",
    "base_experiment": [ base_exp_vista ],
    "eval_dataset": val_dataset_id,
    "train_datasets": [ train_dataset_id ],
    "cloud_details": experiment_cloud_details,
}

if do_inference:
    data.update({"inference_dataset": inference_dataset_id})

endpoint = f"{base_url}/experiments"
response = requests.post(endpoint, json=data, headers=headers)
assert response.status_code == 201, f"Create experiment failed, got {response.json()}."
res = response.json()
experiment_id = res["id"]
print("Experiment creation succeeded with experiment ID: ", experiment_id)
print("---------------------------------\n")
print(json.dumps(res, indent=2))
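
The credential key names inside `cloud_specific_details` depend on the storage provider, as noted in the Required Parameters cell (`account_name`/`access_key` for Azure, `access_key`/`secret_key` for AWS). A minimal sketch that selects the key names from `cloud_type` is shown below; the function is hypothetical and only reproduces the mapping stated in this notebook.

```python
def build_cloud_specific_details(cloud_type, bucket, access_id, access_secret):
    """Return the cloud_specific_details dict for the given provider."""
    # Key names as described in the Required Parameters cell of this notebook.
    key_names = {
        "aws": ("access_key", "secret_key"),
        "azure": ("account_name", "access_key"),
    }
    if cloud_type not in key_names:
        raise ValueError(f"Unsupported cloud_type: {cloud_type!r}")
    id_key, secret_key = key_names[cloud_type]
    return {
        "cloud_bucket_name": bucket,
        id_key: access_id,
        secret_key: access_secret,
    }


# Hypothetical example values, for illustration only.
print(build_cloud_specific_details("azure", "my-container", "my-account", "my-key"))
```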


### Run a Batch Training Job

Configure and start a batch training job on the DGX Cloud, specifying the number of epochs and other parameters.

In [None]:
train_spec = {
    "train#trainer#max_epochs": 2,  # the key to override epochs
}

data = {"name": "my_vista2d", "action": "train", "specs": train_spec}
endpoint = f"{base_url}/experiments/{experiment_id}/jobs"
response = requests.post(endpoint, json=data, headers=headers)

assert response.status_code == 201, f"Run dgx train job failed, got {response.json()}."
train_job_id = response.json()
print("Job creation succeeded with job ID: ", train_job_id)
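
The override key `train#trainer#max_epochs` uses `#` to separate the levels of a nested configuration, which is the id notation used by MONAI bundle configs. The sketch below only illustrates how such a key addresses a nested value; it is not how the service applies overrides, and the starting value of 100 is made up.

```python
def apply_override(config, key, value, sep="#"):
    """Set a value in a nested dict using a "#"-separated path."""
    *parents, leaf = key.split(sep)
    node = config
    for name in parents:
        # Create intermediate levels if they do not exist yet.
        node = node.setdefault(name, {})
    node[leaf] = value
    return config


# Made-up configuration, for illustration only.
config = {"train": {"trainer": {"max_epochs": 100}}}
apply_override(config, "train#trainer#max_epochs", 2)
print(config)
```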


## Monitoring Job Status and Logging

Monitoring the status of your jobs is a crucial aspect of managing workflows effectively. In our system, the job monitoring feature provides a straightforward yet essential overview of your job's current state.

In [None]:
def wait_for_job(endpoint, headers, timeout=1800, interval=5, target_status="Done"):
    """Helper function to wait for job to reach target status."""
    expected = ["Pending", "Running", "Done"]
    assert target_status in expected, f"Invalid target status: {target_status}"
    status_before_target = expected[:expected.index(target_status)]
    start_time = time.time()
    print(f"Waiting for job to reach state {target_status} ...")
    status = None
    while True:
        response = requests.get(endpoint, headers=headers)
        response.raise_for_status()
        status_new = response.json()["status"].title()
        if time.time() - start_time > timeout:
            print(f"\nJob timeout after {timeout} seconds with last status {status_new}.")
            break
        elif status_new not in status_before_target:
            assert status_new == target_status, f"Job failed with status: {status_new}"
            print(f"\nJob reached target status: {status_new}")
            break
        # Print the status once when it changes, then a dot for every further poll
        if status_new != status:
            print(f"\n{status_new}", end="", flush=True)
        else:
            print(".", end="", flush=True)
        status = status_new
        time.sleep(interval)

# While the job is running
endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{train_job_id}"
response = requests.get(endpoint, headers=headers)

assert response.status_code == 200, f"Failed to get job status, got {response.json()}."
for k, v in response.json().items():
    if k != "result":
        print(f"{k}: {v}")
    else:
        print("result:")
        for k1, v1 in v.items():
            print(f"    {k1}: {v1}")

print("------------------------------------------------------------------------")
wait_for_job(endpoint, headers, timeout=1800)
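
The decision made on each poll inside `wait_for_job` can be isolated into a pure function, which makes the rule easier to read and test without calling the API. This is a sketch of that rule only; `classify_status` and its return labels are hypothetical names.

```python
def classify_status(status, target_status="Done"):
    """Classify a job status as "waiting", "reached", or "failed".

    Mirrors the rule in wait_for_job: states that come before the target
    mean keep polling, the target itself means success, and anything else
    is treated as a failure.
    """
    expected = ["Pending", "Running", "Done"]
    if target_status not in expected:
        raise ValueError(f"Invalid target status: {target_status}")
    status = status.title()  # normalize casing, e.g. "RUNNING" -> "Running"
    before_target = expected[: expected.index(target_status)]
    if status in before_target:
        return "waiting"
    if status == target_status:
        return "reached"
    return "failed"


for s in ["PENDING", "Running", "Done", "Error"]:
    print(s, "->", classify_status(s))
```

Note that with `target_status="Running"`, a job that has already reached `Done` is classified as failed, which is also how the helper above behaves.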

### Job Log Download

Access and download job logs to troubleshoot or assess performance. The job log is available when the status of the job is `Running`, `Error`, or `Done`. This API is available for all kinds of jobs.

Note that the job log is not available immediately after the status changes to `Running`, because preparing the environment for the job takes a while.

In [None]:
endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{train_job_id}"
response = requests.get(endpoint, headers=headers)
assert response.status_code == 200, f"Failed to get job status, got {response.json()}."
status = response.json()["status"].title()
if status in ["Running", "Done", "Error"]:
    endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{train_job_id}/logs"
    response = requests.get(endpoint, headers=headers)
    assert response.status_code == 200, f"Failed to get job logs, got {response.text}."
    print(response.text)
else:
    print(f"Job status: {status}, logs are not available.")

### Check the job results (checkpoint, scripts, logs, etc.)

You'll find the results in the cloud storage bucket you specified when creating the experiment. The results will include the model checkpoints, scripts, logs, and other relevant data.

The path to the results will be in the following format:

```python
f"{bucket_name}/shared/orgs/{ngc_org}/users/{user_id}/jobs/{job_id}"
```

## Batch Inference (Optional)

You can run a batch inference job inside the same experiment.
Run a batch training job first, then pass its job ID as an input parameter of the inference job.

In [None]:
if do_inference:
    # after training, model.pt is saved in the trained bundle
    inference_spec = {"train_job_id": train_job_id, "pretrained_ckpt_name": "model.pt"}
    # you can also specify the "mode" to "eval" to run validation on the trained model
    # in this case, please set inference_spec as:
    # inference_spec = {"train_job_id": train_job_id, "pretrained_ckpt_name": "model.pt", "mode": "eval"}
    data = {"name": "vista2d_infer", "action": "batchinfer", "specs": inference_spec}
    endpoint = f"{base_url}/experiments/{experiment_id}/jobs"
    response = requests.post(endpoint, json=data, headers=headers)

    assert response.status_code == 201, f"Run batch inference job failed, got {response.json()}."
    infer_job_id = response.json()
    print("Job creation succeeded with job ID: ", infer_job_id)

    # While the job is running
    endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{infer_job_id}"
    response = requests.get(endpoint, headers=headers)

    assert response.status_code == 200, f"Failed to get job status, got {response.json()}."
    for k, v in response.json().items():
        if k != "result":
            print(f"{k}: {v}")
        else:
            print("result:")
            for k1, v1 in v.items():
                print(f"    {k1}: {v1}")

    print("------------------------------------------------------------------------")
    wait_for_job(endpoint, headers, timeout=1800)

### Job Log Download

In [None]:
if do_inference:  # infer_job_id only exists when the inference job was created
    endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{infer_job_id}"
    response = requests.get(endpoint, headers=headers)
    assert response.status_code == 200, f"Failed to get job status, got {response.json()}."
    status = response.json()["status"].title()
    if status in ["Running", "Done", "Error"]:
        endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{infer_job_id}/logs"
        response = requests.get(endpoint, headers=headers)
        assert response.status_code == 200, f"Failed to get job logs, got {response.text}."
        print(response.text)
    else:
        print(f"Job status: {status}, logs are not available.")

### Download and Visualize the Inference Images

Download the predicted masks from the cloud storage to your local machine for further analysis, visualization, and integration into medical imaging applications.

In [None]:
if do_inference:  # infer_job_id only exists when the inference job was created
    folder = f"shared/orgs/{ngc_org}/users/{user_id}/jobs/{infer_job_id}/cell_vista_segmentation_v{version}/results/prediction"

    if cloud_type == "aws":
        cs_driver = get_driver(Provider.S3)
    elif cloud_type == "azure":
        cs_driver = get_driver(Provider.AZURE_BLOBS)
    else:
        raise ValueError(f"Unsupported cloud_type: {cloud_type}")

    driver = cs_driver(access_id, access_secret, region="us-west-1")
    container = driver.get_container(container_name=cs_bucket)

    # Download every object under the prediction folder into the current directory
    file_objects = driver.list_container_objects(container=container, ex_prefix=folder)
    for obj in file_objects:
        local_destination = os.path.basename(obj.name)
        print(f"Downloading object: {obj.name}")
        obj.download(local_destination, overwrite_existing=True)

    # Collect the downloaded TIFF files
    infer_files = sorted(f for f in os.listdir() if f.endswith(".tif"))
    assert infer_files, "No .tif prediction files were downloaded."

    # Plot the first predicted mask
    with Image.open(infer_files[0]) as img:
        plt.imshow(img, cmap="gray")
        plt.show()

## Cleaning Up

Delete the experiment and datasets to clean up resources once all jobs are completed.

In [None]:
endpoint = f"{base_url}/experiments/{experiment_id}/jobs"
response = requests.get(endpoint, headers=headers)
for job in response.json()["jobs"]:
    job_id = job["id"]
    endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{job_id}"
    response = requests.get(endpoint, headers=headers)
    # If the job is not done, cancel it first (status casing is normalized as in the cells above)
    if response.json()["status"].title() != "Done":
        endpoint = f"{base_url}/experiments/{experiment_id}/jobs/{job_id}:cancel"
        response = requests.post(endpoint, headers=headers)
        assert response.status_code == 200, f"Cancel job failed, got {response.json()}."
        print(response)

endpoint = f"{base_url}/experiments/{experiment_id}"
response = requests.delete(endpoint, headers=headers)
assert response.status_code == 200, f"Delete experiment failed, got {response.json()}."
print(response)
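
The selection step of the cleanup loop can be written as a small pure function, which makes it easy to check which jobs would be cancelled before sending any request. This is a sketch only; `jobs_needing_cancel` is a hypothetical helper and the job records are made up, using the `id` and `status` fields read in the cell above.

```python
def jobs_needing_cancel(jobs, finished=("Done",)):
    """Return the IDs of jobs whose status is not in the finished set."""
    return [job["id"] for job in jobs if job["status"].title() not in finished]


# Made-up job records, for illustration only.
jobs = [
    {"id": "job-1", "status": "Done"},
    {"id": "job-2", "status": "RUNNING"},
    {"id": "job-3", "status": "Pending"},
]
print(jobs_needing_cancel(jobs))
```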

Delete the datasets after the experiment has been deleted.

In [None]:
# train dataset
endpoint = f"{base_url}/datasets/{train_dataset_id}"
response = requests.delete(endpoint, headers=headers)
assert response.status_code == 200, f"Delete train dataset failed, got {response.json()}."
print(response)

# validation dataset
endpoint = f"{base_url}/datasets/{val_dataset_id}"
response = requests.delete(endpoint, headers=headers)
assert response.status_code == 200, f"Delete val dataset failed, got {response.json()}."
print(response)

if do_inference:
    # inference dataset
    endpoint = f"{base_url}/datasets/{inference_dataset_id}"
    response = requests.delete(endpoint, headers=headers)
    assert response.status_code == 200, f"Delete inference dataset failed, got {response.json()}."
    print(response)


## Conclusion

Congratulations on completing this tutorial! You have created datasets, trained a VISTA2d bundle in an experiment, monitored the jobs, and optionally run batch inference. You are now equipped to use the training features of the NVIDIA MONAI Cloud APIs for your medical imaging projects.