# Distributed Scoring Demo


## Instructions

This example demonstrates how to run a distributed scoring (inference) demo on an Azure Batch AI cluster.



### Install Dependencies and Create Configuration File
Follow the [instructions](/recipes) to install all dependencies and create the configuration file.

### Read Configuration and Create Batch AI client

In [None]:
from __future__ import print_function

from datetime import datetime
import os
import sys

from azure.storage.file import FileService
from azure.storage.blob import BlockBlobService
import azure.mgmt.batchai.models as models

# utilities.py contains helper functions used by different notebooks
import utilities

cfg = utilities.Configuration('configuration.json')
client = utilities.create_batchai_client(cfg)
utilities.create_resource_group(cfg)
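Before creating any Azure resources, it can help to confirm that the configuration object exposes every setting this notebook reads. The following is a minimal sketch, not part of `utilities.py`: `missing_settings` is a hypothetical helper, the attribute names are the ones used on `cfg` in this notebook, and the rule that either a password or an SSH key is enough for the admin account is an assumption.

```python
from types import SimpleNamespace

# Attribute names this notebook reads from `cfg`.
REQUIRED = ('resource_group', 'location',
            'storage_account_name', 'storage_account_key', 'admin')
# Assumption: one of these two credentials is sufficient for the admin user.
CREDENTIALS = ('admin_password', 'admin_ssh_key')


def missing_settings(cfg):
    """Return the names of settings that are absent or empty on `cfg`."""
    missing = [name for name in REQUIRED if not getattr(cfg, name, None)]
    if not any(getattr(cfg, name, None) for name in CREDENTIALS):
        missing.append(' or '.join(CREDENTIALS))
    return missing


# Stand-in object for illustration; in the notebook you would pass the real `cfg`.
example_cfg = SimpleNamespace(
    resource_group='my-group', location='eastus',
    storage_account_name='mystorage', storage_account_key='<key>',
    admin='demo', admin_password='<password>', admin_ssh_key=None)
print(missing_settings(example_cfg))
```

An empty list means every setting used later in the notebook is present.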

## 1. Prepare Dataset and Script in Azure Storage

### Create Azure Blob Container

We will create a new Blob Container named `batchaisample` under your storage account. It will be used to store the *input dataset*.

In [None]:
azure_blob_container_name = 'batchaisample'
BlockBlobService(cfg.storage_account_name, cfg.storage_account_key).create_container(azure_blob_container_name, fail_on_exist=False)
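The notebook does not show how the images reach the container. The sketch below is one possible way to upload a local folder, under stated assumptions: `plan_blob_uploads` and `upload_images` are hypothetical helpers, the local folder name is up to you, and the `images` prefix is chosen to match the `images` sub-folder that the job configuration reads from. `create_blob_from_path` is the upload method of the legacy `BlockBlobService` class imported at the top of this notebook.

```python
import os


def plan_blob_uploads(local_dir, prefix='images'):
    """Return sorted (local_path, blob_name) pairs for every file under local_dir."""
    pairs = []
    for root, _dirs, files in os.walk(local_dir):
        for name in files:
            local_path = os.path.join(root, name)
            # Blob names always use forward slashes, whatever the local OS uses.
            relative = os.path.relpath(local_path, local_dir).replace(os.sep, '/')
            pairs.append((local_path, '{0}/{1}'.format(prefix, relative)))
    return sorted(pairs)


def upload_images(blob_service, container_name, local_dir, prefix='images'):
    """Upload each planned file; `blob_service` is expected to be a BlockBlobService."""
    uploaded = []
    for local_path, blob_name in plan_blob_uploads(local_dir, prefix):
        blob_service.create_blob_from_path(container_name, blob_name, local_path)
        uploaded.append(blob_name)
    return uploaded
```

In the notebook this could be called as `upload_images(BlockBlobService(cfg.storage_account_name, cfg.storage_account_key), azure_blob_container_name, 'local_images')`, where `local_images` is a placeholder for your own folder.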

### Create Azure File Share

For this example we will create a new File Share under your storage account, named `abc` in the code below (`azure_file_share_name`). It will be used to share the *inference script* and the *output files*.

In [None]:
azure_file_share_name = 'abc'
file_service = FileService(cfg.storage_account_name, cfg.storage_account_key)
file_service.create_share(azure_file_share_name, fail_on_exist=False)

## 2. Create Azure Batch AI Compute Cluster

### Configure Compute Cluster
- For this example we will use a GPU cluster of 8 `STANDARD_NC24S_V2` nodes. You can change the number of nodes through the `nodes_count` variable;
- We will mount the file share at a folder named `external_AFS` and the blob container at a folder named `external_ABFS`. The full paths of these folders on a compute node will be `$AZ_BATCHAI_MOUNT_ROOT/external_AFS` and `$AZ_BATCHAI_MOUNT_ROOT/external_ABFS`;
- We will call the cluster `nc24sv2cx`.

So, the cluster will have the following parameters:

In [None]:
azure_file_share = 'external_AFS'
azure_blob = 'external_ABFS'

nodes_count = 8
cluster_name = 'nc24sv2cx'

volumes = models.MountVolumes(
    azure_file_shares=[
        models.AzureFileShareReference(
            account_name=cfg.storage_account_name,
            credentials=models.AzureStorageCredentialsInfo(
                account_key=cfg.storage_account_key),
            azure_file_url='https://{0}.file.core.windows.net/{1}'.format(
                cfg.storage_account_name, azure_file_share_name),
            relative_mount_path=azure_file_share)
    ],
    azure_blob_file_systems=[
        models.AzureBlobFileSystemReference(
            account_name=cfg.storage_account_name,
            credentials=models.AzureStorageCredentialsInfo(
                account_key=cfg.storage_account_key),
            container_name=azure_blob_container_name,
            relative_mount_path=azure_blob)
    ]
)

parameters = models.ClusterCreateParameters(
    location=cfg.location,
    vm_size="STANDARD_NC24S_V2",
    scale_settings=models.ScaleSettings(
        manual=models.ManualScaleSettings(target_node_count=nodes_count)
    ),
    node_setup=models.NodeSetup(
        mount_volumes=volumes,
    ),
    user_account_settings=models.UserAccountSettings(
        admin_user_name=cfg.admin,
        admin_user_password=cfg.admin_password,
        admin_user_ssh_public_key=cfg.admin_ssh_key
    )
)
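The relative mount paths defined above determine where each volume appears on a compute node. As a small illustration, the sketch below builds those node-side paths as strings; `mounted_path` is a hypothetical helper, and `$AZ_BATCHAI_MOUNT_ROOT` is left unexpanded because it is only resolved on the node.

```python
import posixpath

MOUNT_ROOT = '$AZ_BATCHAI_MOUNT_ROOT'


def mounted_path(relative_mount_path, *parts):
    """Join the mount root, a volume's relative mount path and optional sub-folders."""
    return posixpath.join(MOUNT_ROOT, relative_mount_path, *parts)


print(mounted_path('external_AFS'))             # file share root
print(mounted_path('external_ABFS', 'images'))  # image folder in the blob container
```

The second path is the same string that the job configuration later builds with `format` for the `DATASET` input directory.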

### Create Compute Cluster

In [None]:
cluster = client.clusters.create(cfg.resource_group, cluster_name, parameters).result()

### Monitor Cluster Creation

Monitor the newly created cluster. `utilities.py` contains a helper function that prints the detailed status of the cluster.

In [None]:
cluster = client.clusters.get(cfg.resource_group, cluster_name)
utilities.print_cluster_status(cluster)
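The cell above reports the status once. If you would rather block until the nodes are allocated, a generic polling loop is one option. The sketch below is an assumption-laden illustration: `wait_until` is a hypothetical helper, and the commented usage relies on the `allocation_state` and `current_node_count` attributes of the cluster object, which may differ between SDK versions.

```python
import time


def wait_until(fetch, is_ready, timeout_sec=1800, interval_sec=30, sleep=time.sleep):
    """Call fetch() repeatedly until is_ready(result) is true; raise on timeout."""
    deadline = time.time() + timeout_sec
    while True:
        result = fetch()
        if is_ready(result):
            return result
        if time.time() >= deadline:
            raise RuntimeError('timed out waiting for the resource to become ready')
        sleep(interval_sec)


# Possible usage in this notebook (attribute names are assumptions, see above):
# cluster = wait_until(
#     lambda: client.clusters.get(cfg.resource_group, cluster_name),
#     lambda c: (c.allocation_state == models.AllocationState.steady
#                and c.current_node_count == nodes_count))
```

Passing `sleep` as a parameter keeps the helper easy to exercise without real delays.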

## 3. Run Batch AI Job for Cloth Recognition

### Update the Inference Script in Azure File Share if Needed

If you modified the inference script locally, upload the updated version to the file share.

In [11]:
script_to_deploy = 'dist_inference.py'
file_service.create_file_from_path(
    azure_file_share_name, '', script_to_deploy, script_to_deploy)

### Configure Job

- The job will use the `tensorflow/tensorflow:1.4.0-gpu` container.
- Job preparation will install the dependencies in the container.
- The console log and the output labels of the job will be stored in the File Share.
- The job will use input and output directories located on the volumes mounted earlier.
- The job will be able to reference those directories using environment variables:
    - `AZ_BATCHAI_INPUT_ROOT`: refers to the mounted path of the Azure File Share, where the pretrained model and scripts are located
    - `AZ_BATCHAI_INPUT_DATASET`: refers to the mounted path of the Azure Blob Container, where the image dataset is located
- A custom toolkit will start the workers across nodes with OpenMPI (`mpirun`).
- The inference task is specified on the command line: 1) `cloth_recognition` or 2) `classification`.
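The variable names in the list follow a pattern: the `id` of an input directory is appended to `AZ_BATCHAI_INPUT_`, and the `id` of an output directory to `AZ_BATCHAI_OUTPUT_`. The sketch below illustrates that naming with the ids used in this notebook's job configuration (`DATASET`, `ROOT`, `LABEL`); the two helper functions are hypothetical and only build or look up names.

```python
import os


def batchai_dir_variable(kind, directory_id):
    """Build the environment variable name for an input or output directory id."""
    if kind not in ('INPUT', 'OUTPUT'):
        raise ValueError("kind must be 'INPUT' or 'OUTPUT'")
    return 'AZ_BATCHAI_{0}_{1}'.format(kind, directory_id)


def resolve_dir(kind, directory_id, environ=None):
    """Look the directory up in the environment; returns None when it is not set."""
    environ = os.environ if environ is None else environ
    return environ.get(batchai_dir_variable(kind, directory_id))


for kind, directory_id in (('INPUT', 'DATASET'), ('INPUT', 'ROOT'), ('OUTPUT', 'LABEL')):
    print(batchai_dir_variable(kind, directory_id))
```

Inside the container, a script could call `resolve_dir('OUTPUT', 'LABEL')` instead of receiving the path as a command-line argument; this notebook passes the paths explicitly through `--datadir`, `--outputdir` and `--rootdir`.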

In [12]:
inference_task = 'cloth_recognition'
parameters = models.job_create_parameters.JobCreateParameters(
     location=cfg.location,
     cluster=models.ResourceId(cluster.id),
     node_count=8,
     input_directories=[
         models.InputDirectory(
             id='DATASET',
             path='$AZ_BATCHAI_MOUNT_ROOT/{0}/{1}'.format(azure_blob, 'images')),
         models.InputDirectory(
             id='ROOT',
             path='$AZ_BATCHAI_MOUNT_ROOT/{0}'.format(azure_file_share))],
     output_directories = [
         models.OutputDirectory(
             id='LABEL',
             path_prefix='$AZ_BATCHAI_MOUNT_ROOT/{0}'.format(azure_file_share),
             path_suffix="Label")],
     std_out_err_path_prefix="$AZ_BATCHAI_MOUNT_ROOT/{0}".format(azure_file_share),
     job_preparation=models.JobPreparation(
        command_line="pip install opencv-python EasyDict pyyaml;apt update && apt install -y libsm6 libxext6 libxrender-dev mpi-default-dev mpi-default-bin"),
     container_settings=models.ContainerSettings(
         models.ImageSourceRegistry(image='tensorflow/tensorflow:1.4.0-gpu')),
     custom_toolkit_settings = models.CustomToolkitSettings(
         command_line='mpirun -mca btl_tcp_if_exclude docker0,lo --allow-run-as-root --hostfile $AZ_BATCHAI_MPI_HOST_FILE python -u $AZ_BATCHAI_INPUT_ROOT/dist_inference.py --datadir $AZ_BATCHAI_INPUT_DATASET --outputdir $AZ_BATCHAI_OUTPUT_LABEL --rootdir $AZ_BATCHAI_INPUT_ROOT --max 100000 --inference {0}'.format(inference_task))
)

### Create the Job


In [13]:
job_name = inference_task + datetime.utcnow().strftime("_%m_%d_%Y_%H%M%S")
job = client.jobs.create(cfg.resource_group, job_name, parameters).result()
print('Created Job: {}'.format(job_name))

Created Job: cloth_recognition_03_12_2018_073609


### Wait for Job to Finish
The job will start running when the cluster has enough idle nodes. The following code waits for the job to start, printing the cluster state in the meantime. While the job runs, the code prints the current content of `stdout.txt`, the job's standard output file.

In [14]:
utilities.wait_for_job_completion(client, cfg.resource_group, job_name, cluster_name, 'stdouterr', 'stdout.txt')

Cluster state: AllocationState.steady Target: 8; Allocated: 8; Idle: 0; Unusable: 0; Running: 8; Preparing: 0; Leaving: 0
Job state: running ExitCode: None
Waiting for job output to become available...
Split dataset based on given task 28
Split dataset based on given task 30
Split dataset based on given task 29
Split dataset based on given task 31
Worker 31 needs to process 3124 images
Worker 29 needs to process 3124 images
Worker 28 needs to process 3124 images
Worker 30 needs to process 3124 images
Split dataset based on given task 12
Split dataset based on given task 14
Split dataset based on given task 13
Split dataset based on given task 15
Worker 15 needs to process 3124 images
Worker 13 needs to process 3124 images
Worker 12 needs to process 3124 images
Worker 14 needs to process 3124 images
Split dataset based on given task 24
Split dataset based on given task 25
Split dataset based on given task 26
Split dataset based on given task 4
Split dataset based on given task 5
Split d
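The log above shows each worker being assigned its own share of the images. The sketch below illustrates one common way to split a file list across workers (round-robin by rank). It is an illustration only and not necessarily how `dist_inference.py` splits the data; `shard` is a hypothetical helper and the file names are made up.

```python
def shard(items, rank, world_size):
    """Return the items handled by worker `rank` out of `world_size` workers."""
    if world_size <= 0 or not 0 <= rank < world_size:
        raise ValueError('rank must satisfy 0 <= rank < world_size')
    # Worker r takes items r, r + world_size, r + 2 * world_size, ...
    return items[rank::world_size]


# Made-up file names for illustration.
images = ['img_{0:05d}.jpg'.format(i) for i in range(10)]
for rank in range(4):
    print(rank, shard(images, rank, 4))

# Under Open MPI, processes started by `mpirun` can read their rank and the
# total number of workers from the environment, for example:
#   rank = int(os.environ['OMPI_COMM_WORLD_RANK'])
#   world_size = int(os.environ['OMPI_COMM_WORLD_SIZE'])
```

With this scheme every item goes to exactly one worker and shard sizes differ by at most one.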

### Download label files for the Job

In [None]:
files = client.jobs.list_output_files(cfg.resource_group, job_name, models.JobsListOutputFilesOptions("LABEL")) 
for f in list(files):
    print(f.name + ": " + str(f.content_length) + ' Bytes')
    utilities.download_file(f.download_url, f.name)
print("All files downloaded")
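The body of `utilities.download_file` is not shown in this notebook. As a rough idea of what such a helper involves, here is a minimal standard-library sketch for Python 3 that streams a URL to a local file; `download_to` is a hypothetical name and this is not the implementation in `utilities.py`.

```python
import shutil
import urllib.request


def download_to(url, local_path, chunk_size=1024 * 1024):
    """Stream the content at `url` into `local_path` and return the path."""
    with urllib.request.urlopen(url) as response, open(local_path, 'wb') as out:
        # Copy in chunks so large label files are not held in memory at once.
        shutil.copyfileobj(response, out, chunk_size)
    return local_path
```

In the loop above, a call such as `download_to(f.download_url, f.name)` would play the same role as `utilities.download_file`.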

## 4. Clean Up (Optional)

### Delete the Job

In [None]:
_ = client.jobs.delete(cfg.resource_group, job_name)

### Delete the Cluster
When you are finished with the sample and don't want to submit any more jobs you can delete the cluster using the following code.

In [None]:
_ = client.clusters.delete(cfg.resource_group, cluster_name)

### Delete File Share
When you are finished with the sample and don't want to submit any more jobs you can delete the file share completely with all files using the following code.

In [None]:
service = FileService(cfg.storage_account_name, cfg.storage_account_key)
service.delete_share(azure_file_share_name)