# HyperParameter Tuning with Elastic Distributed Training example notebook for WMLA

This sample notebook shows how to use the "Bring Your Own Framework" (BYOF) API of WMLA to tune a community PyTorch
Vision model with the Hyperparameter Optimization (HPO) and Elastic Distributed Training (EDT) features of Watson ML Accelerator.

https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html

This example uses an Elastic Distributed instance group.

[Kelvin Lui](https://w3.ibm.com/search/#/search?query=Kelvin%20lui)


BYOF creates execution tasks through the REST API, which is equivalent to running the `dlicmd` command locally on the cluster. Details of the command can be found in the [dlicmd.py reference](https://www.ibm.com/support/knowledgecenter/en/SSFHA8_1.2.1/cm/dlicmd.html) in Knowledge Centre.

Note: this setup assumes that models can access data sources local to the WMLA cluster, although the data could instead be downloaded during the model training phase.

## Contents:
<a id='WMLA-HPO-Hyperband-Wells-EDT'></a>
- [Log on](#Log-on)
- [Health Check](#Health-Check)
- [Update Model](#Update-model)
- [Launch HPO Task](#Launch-HPO-Task)
- [Check HPO Status](#Check-HPO-Status)
- [Manage HPO tasks](#Manage-HPO-Task)


### Common Setup

For a WMLA cluster, the required URLs can be found as follows:
```
$ egosh client view ASCD_REST_BASE_URL_1 | grep DESCRIPTION | cut -d' ' -f2
http://hostname.local:8280/platform/rest/
```
Giving: http://hostname.local:8280/platform/rest/conductor/v1

```
$ egosh client view DLPD_REST_BASE_URL_1 | grep DESCRIPTION | cut -d' ' -f2
http://hostname.local:9243/platform/rest/
```
Giving: http://hostname.local:9243/platform/rest/deeplearning/v1
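
The two "Giving" lines above append a service path to the base URL reported by `egosh`. A minimal sketch of that join, assuming the base URL has the `/platform/rest/` form shown in the samples; `service_url` is a hypothetical helper name, not part of WMLA.

```python
def service_url(base_url, service_path):
    """Join an egosh-reported REST base URL with a service path."""
    # Normalise slashes so a trailing '/' on the base URL does not double up.
    return base_url.rstrip('/') + '/' + service_path.strip('/')


conductor_url = service_url('http://hostname.local:8280/platform/rest/', 'conductor/v1')
deeplearning_url = service_url('http://hostname.local:9243/platform/rest/', 'deeplearning/v1')
print(conductor_url)
print(deeplearning_url)
```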

API documentation links (can also be found on both links above):

- IBM Spectrum Conductor RESTful APIs: http://hostname.local:8280/cloud/apis/explorer/
- IBM Spectrum Conductor Deep Learning Impact RESTful APIs: http://hostname.local:9243/cloud/apis/explorer/
- Also further details on Knowledge Centre: https://www.ibm.com/support/knowledgecenter/en/SSWQ2D_1.2.1/cm/deeplearning.html

In [1]:
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

import json
import time
import urllib
import pandas as pd



In [2]:
# Environment details:

master_host = '**** ADD HERE ****'
dli_rest_port = '**** ADD HERE ****' #'9280'
sc_rest_port = '**** ADD HERE ****' #'8280'
protocol = 'http'

sc_rest_url =  protocol+'://'+master_host+':'+sc_rest_port+'/platform/rest/conductor/v1'
dl_rest_url = protocol+'://'+master_host+':'+dli_rest_port+'/platform/rest/deeplearning/v1'

print (sc_rest_url)
print (dl_rest_url)

# User login details
wmla_user = '**** ADD HERE ****'
wmla_pwd = '**** ADD HERE ****'


myauth = (wmla_user, wmla_pwd)

# Spark instance group details
sig_name = '**** ADD HERE ****'


# REST call variables
commonHeaders = {'Accept': 'application/json'}


#startTuneUrl='%s://%s:%s/platform/rest/deeplearning/v1/hypersearch' % (protocol, master_host, dli_rest_port)
#sc_rest_url ='%s://%s:%d/platform/rest/conductor/v1' % (protocol, hostname, conductorport)

req = requests.Session()

http://colonia04.platform:8280/platform/rest/conductor/v1
http://colonia04.platform:9280/platform/rest/deeplearning/v1


## Log On
<a id='Log-on'></a>

[back to top](#WMLA-HPO-Hyperband-Wells-EDT)

Obtain login session tokens to be used for session authentication within the RESTful API. Tokens are valid for 8 hours.

In [3]:
r = requests.get(sc_rest_url+'/auth/logon', verify=False, auth=myauth, headers=commonHeaders) 

if r.ok:
    print ('\nLogon succeeded')
    
else: 
    print('\nLogon failed with code={}, {}'. format(r.status_code, r.content))


Logon succeeded


## Health Check
[back to top](#WMLA-HPO-Hyperband-Wells-EDT)



Check whether any HPO tasks already exist, and verify the platform health at the same time.

REST API: **GET platform/rest/deeplearning/v1/hypersearch**
- Description: Get all the HPO tasks that the logged-in user can access.
- OUTPUT: A list of HPO tasks, each in the format described in the API documentation.

In [12]:
getTuneStatusUrl = dl_rest_url + '/hypersearch'
print ('getTuneStatusUrl: %s' %getTuneStatusUrl)
r = req.get(getTuneStatusUrl, headers=commonHeaders, verify=False, auth=myauth)

if not r.ok:
    print('check hpo task status failed: code=%s, %s'%(r.status_code, r.content))
else:
    if len(r.json()) == 0:
        print('No hpo task has been created')
    for item in r.json():
        print('Hpo task: %s, State: %s'%(item['hpoName'], item['state']))
        #print('Best:%s'%json.dumps(item.get('best'), sort_keys=True, indent=4))

getTuneStatusUrl: http://colonia04.platform:9280/platform/rest/deeplearning/v1/hypersearch
Hpo task: Admin-hpo-865764150117434, State: FAILED
Hpo task: Admin-hpo-1101261874485516, State: FAILED
Hpo task: Admin-hpo-3651752950675102, State: FAILED
Hpo task: Admin-hpo-3651791970264588, State: FAILED
Hpo task: Admin-hpo-3652656031423039, State: FAILED
Hpo task: Admin-hpo-3652848133085365, State: FAILED
Hpo task: Admin-hpo-4402508485777365, State: RECOVERPEND
Hpo task: Admin-hpo-4404453847231966, State: FINISHED
Hpo task: Admin-hpo-4405121817174855, State: RECOVERPEND
Hpo task: Admin-hpo-17975643951593, State: FAILED
Hpo task: Admin-hpo-18176491357360, State: FAILED
Hpo task: Admin-hpo-26662761022292, State: FAILED
Hpo task: Admin-hpo-29763538447983, State: FAILED
Hpo task: Admin-hpo-41642042184978, State: FAILED
Hpo task: Admin-hpo-42385955978459, State: FINISHED
Hpo task: Admin-hpo-45405500448823, State: FINISHED
Hpo task: Admin-hpo-45723267117425, State: FINISHED
Hpo task: Admin-hpo-4615
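
With many tasks in the list, a per-state count can be easier to read than one line per task. The following is a small sketch, assuming each item of the decoded response carries the `hpoName` and `state` keys printed above; `summarize_states` and the sample data are hypothetical.

```python
from collections import Counter


def summarize_states(tasks):
    """Count HPO tasks per state from the decoded hypersearch list."""
    return Counter(task['state'] for task in tasks)


# Hypothetical sample mirroring the fields printed by the health check.
sample_tasks = [
    {'hpoName': 'Admin-hpo-1', 'state': 'FAILED'},
    {'hpoName': 'Admin-hpo-2', 'state': 'FINISHED'},
    {'hpoName': 'Admin-hpo-3', 'state': 'FAILED'},
]
state_counts = summarize_states(sample_tasks)
for state, count in sorted(state_counts.items()):
    print('%s: %d' % (state, count))
```

In the notebook, the same function could be applied to `r.json()` from the health check cell.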

## Update Model 
[back to top](#WMLA-HPO-Hyperband-Wells-EDT)

We will update the model to enable:
 * Elastic Distributed Training
 * Hyper-parameter Optimization 

###  Model file update to run Elastic Distributed Training

Elastic Distributed Training (EDT) takes a model built on a standalone system and distributes its training across multiple GPUs and compute nodes in an elastic fashion. EDT starts job execution with a minimum of 1 GPU, and GPUs can be added or removed dynamically while the job is executing.

We will use Elastic Distributed Training to distribute each HPO sub-training across multiple GPUs.


In this sample we will update the PyTorch ResNet18 model to enable Elastic Distributed Training: https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html

Model changes are required in 3 areas:
* Import the Elastic Distributed Training library and read the environment variable
* Replace the data loading functions with ones that are compatible with EDT - the data loading function must return a tuple containing two items of type torch.utils.data.Dataset 
* Replace the training and testing loops with EDT’s train function


##### Model update part 1.  Import Elastic Distributed Training library and environment variable

<pre>
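import os
import sys

# Make the EDT (fabric) library importable before importing from it
path = os.path.join(os.getenv("FABRIC_HOME"), "libs", "fabric.zip")
print(path)
sys.path.insert(0, path)
from fabric_model import FabricModel
from edtcallback import EDTLoggerCallback

# DATA_DIR is the root directory of the training and validation data
dataDir = os.environ.get("DATA_DIR")
if dataDir is not None:
    print("dataDir is: %s" % dataDir)
else:
    print("Warning: DATA_DIR not found in the environment!")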
</pre>

    

##### Model update part 2.  Replace the data loading functions with ones that are compatible with EDT - the data loading function must return a tuple containing two items of type torch.utils.data.Dataset 

* Download the Dataset from https://download.pytorch.org/tutorial/hymenoptera_data.zip

<pre>
def getDatasets():

    data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    }

    return (datasets.ImageFolder(os.path.join(dataDir, 'train'), data_transforms['train']),
           datasets.ImageFolder(os.path.join(dataDir, 'val'), data_transforms['val']))
</pre>         

##### Model update part 3.  Replace the training and testing loops with EDT’s train function

 a. Instantiate Elastic Distributed Training instance
 <pre>
 edt_m = FabricModel(model, getDatasets, loss_function, optimizer, driver_logger=EDTLoggerCallback())
 </pre>
 b. Launch EDT job with specific parameters
 <pre>
 edt_m.train(epoch_number, effective_batch_size, max_number_GPUs_assigned_to_EDT_job)
 </pre>

### Model file update to Run HPO

Model changes are required in 2 areas:
- Inject hyper-parameters for the sub-training during search
- Retrieve the sub-training result metric

##### Model update part 1 - Inject hyper-parameters

The hyper-parameters are supplied in a JSON file called **config.json**, located in the current working directory, and can be read directly as in the following snippet.

<pre>
hyper_params = json.loads(open("<b>config.json</b>").read())
learning_rate = float(hyper_params.get("<b>learning_rate</b>", "0.01"))
</pre>

After this, you can use these hyper-parameters during model training. The **hyper-parameter name** and **value** type are defined in the search space part of the REST call body when launching a new HPO task.
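
A slightly more defensive way to read the file is sketched below. It assumes only what is stated above (a JSON file named `config.json` in the working directory); the helper name `load_hyper_params`, the default values, and the throwaway file used for the demonstration are illustrative. Because this notebook does not state whether values arrive as strings or numbers, the sketch converts explicitly.

```python
import json
import os
import tempfile


def load_hyper_params(path, defaults):
    """Return defaults overridden by the values found in the JSON file at path."""
    params = dict(defaults)
    if os.path.exists(path):
        # The context manager closes the file even if parsing fails.
        with open(path) as f:
            params.update(json.load(f))
    return params


# Demonstration with a throwaway config.json; in a real sub-training the file
# is supplied in the current working directory.
with tempfile.TemporaryDirectory() as workdir:
    config_path = os.path.join(workdir, 'config.json')
    with open(config_path, 'w') as f:
        json.dump({'learning_rate': '0.095'}, f)
    hyper_params = load_hyper_params(config_path, {'learning_rate': 0.01, 'epochs': 1})

# float() and int() accept both string and numeric inputs.
learning_rate = float(hyper_params['learning_rate'])
epochs = int(hyper_params['epochs'])
print(learning_rate, epochs)
```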

##### Model update part 2 - Retrieve sub-training result metric

At the end of your training run, your code will need to create a file called **val_dict_list.json** with test metrics generated during training. These metrics will be used by the search algorithm to propose new sets of hyper-parameters. Please note that **val_dict_list.json** should be created under the result directory which can be retrieved through the environment variable **RESULT_DIR**.

<pre>
with open('{}/val_dict_list.json'.format(os.environ['<b>RESULT_DIR</b>']), 'w') as f:
    json.dump(test_metrics, f)
</pre>

We add this code to the `on_train_end(self)` function defined in `edtcallback.py`.

The content of **val_dict_list.json** looks like the example below. **step** is optional and denotes the training iteration or epoch. Either **loss** or **accuracy** can be the target metric to optimize, and at least one metric must be included. The name of the metric to optimize (minimize or maximize) is defined in the body of the REST call when launching a new HPO task.

```
[
{"step": 1, "loss": 0.2487, "accuracy": 0.4523},
{"step": 2, "loss": 0.1487, "accuracy": 0.5523},
{"step": 3, "loss": 0.1087, "accuracy": 0.6523},
…
]
```
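
The file format above can be produced with a few lines of standard-library Python. This is a sketch only: `write_val_metrics` is a hypothetical helper, the metric values are illustrative, and a temporary directory stands in for the platform-provided **RESULT_DIR**.

```python
import json
import os
import tempfile


def write_val_metrics(result_dir, metrics):
    """Write the per-step metric list to val_dict_list.json under result_dir."""
    out_path = os.path.join(result_dir, 'val_dict_list.json')
    with open(out_path, 'w') as f:
        json.dump(metrics, f)
    return out_path


# Illustrative values only; a real run would append one entry per step.
test_metrics = [
    {'step': 1, 'loss': 0.2487, 'accuracy': 0.4523},
    {'step': 2, 'loss': 0.1487, 'accuracy': 0.5523},
]

# In a sub-training, result_dir would be os.environ['RESULT_DIR'].
with tempfile.TemporaryDirectory() as result_dir:
    written_path = write_val_metrics(result_dir, test_metrics)
    with open(written_path) as f:
        round_trip = json.load(f)
print(round_trip)
```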

## Launch HPO task

REST API: **POST /platform/rest/deeplearning/v1/hypersearch**
- Description: Start a new HPO task
- Content-type: Multi-Form (multipart/form-data)
- Multi-Form Data:
  - files: Model files tar package, ending with `.modelDir.tar`
  - form-field: {'data': 'String form of the input parameters used to start the HPO task. It is referred to as **hpo_input**, and its specification is shown later'}


##### Package model files for training

Package the updated model files into a tar file ending with `.modelDir.tar`

In [7]:
import tarfile
import tempfile
import os


def make_tarfile(output_filename, source_dir):
    # Archive source_dir (gzip-compressed) under its base directory name
    with tarfile.open(output_filename, "w:gz") as tar:
        tar.add(source_dir, arcname=os.path.basename(source_dir))


# The package name must end with this suffix
MODEL_DIR_SUFFIX = ".modelDir.tar"
tempFile = tempfile.mktemp(MODEL_DIR_SUFFIX)
# Replace this path with the location of your updated model directory
make_tarfile(tempFile, '/Users/Kelvin/Downloads/WellsFargo/CSSC_EDT/pytorch_edt_cssc')
files = {'file': open(tempFile, 'rb')}
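
The Python documentation marks `tempfile.mktemp` as deprecated because another process can claim the returned name before the file is created. A possible alternative is sketched below using `tempfile.NamedTemporaryFile`; it keeps the gzip mode and the `.modelDir.tar` suffix of the cell above. The helper name `package_model_dir` and the throwaway model directory are assumptions made for the demonstration.

```python
import os
import tarfile
import tempfile


def package_model_dir(source_dir, suffix='.modelDir.tar'):
    """Archive source_dir into a new temporary file whose name ends with suffix."""
    # NamedTemporaryFile creates the file securely; delete=False keeps it for upload.
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
        archive_path = tmp.name
    with tarfile.open(archive_path, 'w:gz') as tar:
        tar.add(source_dir, arcname=os.path.basename(source_dir))
    return archive_path


# Demonstration on a throwaway directory containing a hypothetical model file.
with tempfile.TemporaryDirectory() as parent:
    model_dir = os.path.join(parent, 'pytorch_edt_cssc')
    os.mkdir(model_dir)
    with open(os.path.join(model_dir, 'main.py'), 'w') as f:
        f.write('print("hello")\n')
    archive_path = package_model_dir(model_dir)

# List the archive contents, then clean up the temporary archive.
with tarfile.open(archive_path) as tar:
    member_names = sorted(tar.getnames())
os.remove(archive_path)
print(member_names)
```

When uploading, opening the archive in a `with` block also ensures the file handle is closed after the POST.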

##### Construct POST request data

**hpo_input** is a Python dict (or JSON) in the format below; it is converted to a string when calling the REST API.

In [8]:

#'args': '--exec-start edtPyTorch --cs-datastore-meta type=fs, data_path=/dli_data_fs \
#                     --gpuPerWorker 1 --model-main pytorch_mnist.py --edt-options maxWorkers=8 \
#                     --model-dir pytorch_edt_hpo'

data =  {
        'modelSpec': # Define the model training related parameters
        {
            # Spark instance group used to run the HPO sub-trainings. The Spark instance group selected
            # here should match the sub-training args; for example, if the sub-training args run an EDT job,
            # the Spark instance group must be capable of running EDT jobs.
            'sigName': sig_name,

            # These are the arguments we'll pass to the execution engine; they follow the same conventions
            # of the dlicmd.py command line launcher
            #
            # See:
            #   https://www.ibm.com/support/knowledgecenter/en/SSFHA8_1.2.1/cm/dlicmd.html
            #
            # In this example, we have a single server with 4 GPUs.
            # The HPO task has 121 sub-trainings (maxJobNum=121).
            # These 121 sub-trainings will be scheduled to run in 61 batches (121/2, rounded up).
            # Two sub-trainings run in parallel in each batch (maxParalleJobNum=2).
            # Each sub-training is distributed across 2 GPUs with Elastic Distributed Training (maxWorkers=2).
            # 
            'args': '--model-dir pytorch_edt_cssc --exec-start edtPyTorch  \
                     --cs-datastore-meta type=fs,data_path=hymenopteradata \
                     --edt-options maxWorkers=2 --model-main pytorch_mnist_HPO_EDT.py \
                     --debug-level debug  \
                    '

        },    
        'algoDef': # Define the parameters for search algorithms
        {
            # Name of the search algorithm, one of Random, Bayesian, Tpe, Hyperband
            'algorithm': 'Hyperband', 
            # Max running time of the hpo task in minutes, -1 means unlimited
            'maxRunTime': -1,  
            # Max number of training jobs to submit for the hpo task, -1 means unlimited
            'maxJobNum': 121,            
            # Max number of training jobs to run in parallel, default 1. It depends on both the
            # available resources and whether the search algorithm supports running in parallel. Currently only
            # Random fully supports running in parallel; Hyperband and Tpe support running in parallel in some
            # phases; Bayesian currently runs sequentially.
            'maxParalleJobNum': 2, 
            # Name of the target metric that we are trying to optimize when searching hyper-parameters.
            # It is the same metric name that the model update part 2 trying to dump.
            'objectiveMetric' : 'loss',
            # Strategy as how to optimize the hyper-parameters, minimize means to find better hyper-parameters to
            # make the above objectiveMetric as small as possible, maximize means the opposite.
            'objective' : 'minimize',
            # eta value to control the hyper-band search process
            'hyperbandEta': 3.0,
            # Additional parameters for the specified search algorithm; Hyperband takes the following ones.
            'algoParams' : 
                [
                    {
                        # Name of the resource whose maximum amount can be allocated to a single configuration
                        'name':'ResourceName', 
                        'value': 'epochs'
                    },
                    {
                        # Value of the maximum amount of resource that can be allocated to a single configuration
                        'name':'ResourceValue',
                        'value':'81'
                    }
                    # This resource parameter changes during the Hyperband search phase, and its value is also
                    # written into the config.json of each sub-training, here with 'epochs' as the key and a
                    # value no larger than ResourceValue.
                ]
        },
    
        # Define the hyper-parameters to search and the corresponding search space.
        'hyperParams':
        [
             {
                 # Hyperparameter name, which will be the hyper-parameter key in config.json
                 'name': 'learning_rate',
                 # One of Range, Discrete
                 'type': 'Range',
                 # one of int, double, str
                 'dataType': 'DOUBLE',
                 # lower bound and upper bound when type=range and dataType=double
                 'minDbVal': 0.001,
                 'maxDbVal': 0.1,
                 # lower bound and upper bound when type=range and dataType=int
                 'minIntVal': 0,
                 'maxIntVal': 0,
                 # Discrete value list when type=discrete
                 'discreteDbVal': [],
                 'discreteIntVal': [],
                 'discreateStrVal': [],
                 # Step size used to split the Range space. ONLY valid when type is Range
                 'step': '0.002',
             }
         ]
    }
mydata={'data':json.dumps(data)}
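
The numbers in the comments above can be checked with a little arithmetic. The sketch below makes two assumptions that are not confirmed by this notebook: that sub-trainings run strictly `maxParalleJobNum` at a time, and that one bracket of textbook successive halving (the building block of the published Hyperband algorithm) is a useful mental model. The WMLA implementation may schedule trials differently. Both function names are hypothetical.

```python
import math


def parallel_batches(max_job_num, max_parallel_job_num):
    """Number of batches if sub-trainings simply ran max_parallel_job_num at a time."""
    return math.ceil(max_job_num / max_parallel_job_num)


def successive_halving_rungs(max_resource, eta):
    """List (n_configs, resource) rungs for one textbook successive-halving bracket."""
    rungs = []
    resource = 1
    while resource <= max_resource:
        # Each rung keeps 1/eta of the configurations and gives them eta times the resource.
        rungs.append((max_resource // resource, resource))
        resource *= eta
    return rungs


batches = parallel_batches(121, 2)
gpus_in_use = 2 * 2  # parallel sub-trainings x maxWorkers
rungs = successive_halving_rungs(81, 3)
total_trainings = sum(n for n, _ in rungs)
print(batches, gpus_in_use, rungs, total_trainings)
```

Under these assumptions the rung resources are 1, 3, 9, 27 and 81 epochs, and the rung sizes 81 + 27 + 9 + 3 + 1 add up to 121, which coincides with the `maxJobNum` used here.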

##### Submit the Post request

Submit the HPO task through the POST call; the HPO task name/id is returned as a string.

In [9]:
startTuneUrl=dl_rest_url + '/hypersearch'
r = req.post(startTuneUrl, headers=commonHeaders, data=mydata, files=files, verify=False, auth=myauth)
if r.ok:
    # The response body is the name/id of the new hpo task
    hpoName = r.json()
    print ('HPO Application: %s' %hpoName)

else: 
    print('\nModel submission failed with code={}, {}'. format(r.status_code, r.content))

HPO Application: Admin-hpo-584914877877339


## Check HPO task status

REST API: **GET /platform/rest/deeplearning/v1/hypersearch/{hponame}**
- Description: Retrieve the hpo task details with the specified hpo task name/id in URL.
- OUTPUT: A particular hpo task with details of the specified hponame
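
Polling this endpoint until the task finishes can be written as a small reusable loop. The sketch below is network-free so that it can be run anywhere: `fetch_status` is any callable returning the decoded task JSON. Treating `SUBMITTED` as a state worth waiting on is an assumption, and `wait_for_hpo` is a hypothetical helper name.

```python
import time


def wait_for_hpo(fetch_status, active_states=('SUBMITTED', 'RUNNING'),
                 poll_seconds=60, max_polls=None, sleep=time.sleep):
    """Poll fetch_status() until the task leaves active_states or max_polls is hit."""
    polls = 0
    status = fetch_status()
    while status['state'] in active_states:
        polls += 1
        if max_polls is not None and polls >= max_polls:
            break  # give up and return the last status seen
        sleep(poll_seconds)
        status = fetch_status()
    return status


# Offline demonstration: a canned sequence of responses replaces the REST call.
canned = iter([
    {'state': 'SUBMITTED', 'progress': '0'},
    {'state': 'RUNNING', 'progress': '50'},
    {'state': 'FINISHED', 'progress': '100'},
])
final_status = wait_for_hpo(lambda: next(canned), sleep=lambda seconds: None)
print(final_status['state'])
```

In the notebook, `fetch_status` could be a lambda wrapping the `req.get(...)` call on the task URL and returning `.json()`. The `max_polls` cap is a design choice that avoids waiting forever on a task stuck in an active state.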

In [11]:
import time

getHpoUrl = dl_rest_url +'/hypersearch/'+ hpoName

res = req.get(getHpoUrl, headers=commonHeaders, verify=False, auth=myauth)
if not res.ok:
    print('get hpo task failed: code=%s, %s'%(res.status_code, res.content))
else:
    json_out=res.json()

    # Keep polling while the task has been submitted or is still running
    while json_out['state'] in ['SUBMITTED', 'RUNNING']:
        print('Hpo task %s state %s progress %s%%'%(hpoName, json_out['state'], json_out['progress']))
        time.sleep(60)
        res = req.get(getHpoUrl, headers=commonHeaders, verify=False, auth=myauth)
        json_out=res.json()

    print('Hpo task %s completes with state %s'%(hpoName, json_out['state']))
    print(json.dumps(json_out, indent=4, sort_keys=True))


Hpo task Admin-hpo-584914877877339 completes with state FINISHED
{
    "best": {
        "appId": "Admin-589646909629184-1272803389",
        "driverId": "driver-20200127070751-0242-15067803-3314-4bb0-b7ef-2801a3ad9ffe",
        "endTime": "2020-01-27 07:13:00",
        "hyperParams": [
            {
                "dataType": "double",
                "fixedVal": "0.095",
                "name": "learning_rate",
                "userDefined": false
            },
            {
                "dataType": "str",
                "fixedVal": "1",
                "name": "epochs",
                "userDefined": false
            },
            {
                "dataType": "str",
                "fixedVal": "3",
                "name": "epochs",
                "userDefined": false
            },
            {
                "dataType": "str",
                "fixedVal": "9",
                "name": "epochs",
                "userDefined": false
            },
            {
            

**Full Response of a hpo task status.**

```
{
"hpoName": "string, name/id of the hpo task",
"state": "string, hpo task state: SUBMITTED, RUNNING, FAILED, FINISHED, STOPPED",
"running": "int, number of trainings that are currently running",
"complete": "int, number of trainings that have completed, including both succeeded and failed trainings",
"failed": "int, number of trainings that failed",
"progress": "string, progress as a percentage value",
"createtime": "string, time this task was created",
"creator": "string, name of the user who created this hpo task",
"duration": "string, how long the task has been running, in the format hh-mm-ss",
"experiments": [
    {
    "id": "int, counter id of the experiment training, starting from 0",
    "metricVal": "double, best metric value of the experiment training",
    "state": "string, state of the BYOF training task",
    "appId": "string, BYOF training task id",
    "driverId": "string, for internal usage, the real job id for training, currently the spark driver id",
    "hyperParams": [
        {
        "name": "string, name of the hyperparameter",
        "dataType": "string, data type of the hyperparameter",
        "fixedVal": "string, the hyperparameter value used in this experiment training, with the same datatype as the input dataType"
        }
    ]
    }
],
"best": {"one of the experiments with the best metric value, smallest or biggest one based on the original objective minimize or maximize"}
}

```
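
The `best` entry can also be recomputed from the `experiments` list, which is handy for cross-checking or for ranking the runners-up. This is a sketch based only on the field names in the schema above; `best_experiment`, the sample values, and the sample `state` strings are hypothetical, and experiments without a metric are assumed to carry `None`.

```python
def best_experiment(experiments, objective='minimize'):
    """Pick the experiment with the best metricVal among those that report one."""
    scored = [e for e in experiments if e.get('metricVal') is not None]
    if not scored:
        return None
    chooser = min if objective == 'minimize' else max
    return chooser(scored, key=lambda e: float(e['metricVal']))


# Hypothetical experiments shaped like the schema above.
sample_experiments = [
    {'id': 0, 'metricVal': 0.41, 'state': 'FINISHED'},
    {'id': 1, 'metricVal': 0.18, 'state': 'FINISHED'},
    {'id': 2, 'metricVal': None, 'state': 'FAILED'},
]
lowest = best_experiment(sample_experiments, 'minimize')
highest = best_experiment(sample_experiments, 'maximize')
print(lowest['id'], highest['id'])
```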


## Manage HPO tasks

#### Check HPO tasks

This uses the same REST API as the earlier health check.

In [None]:
#getTuneStatusUrl = 'https://{}:9243/platform/rest/deeplearning/v1/hypersearch'.format(hostname)
res = req.get(startTuneUrl, headers=commonHeaders, verify=False, auth=myauth)
if not res.ok:
    print('check tune job status failed: code=%s, %s'%(res.status_code, res.content))
else:
    #print(json.dumps(r.json(), sort_keys=True, indent=4))
    if len(res.json()) == 0:
        print('No hpo task has been created')
    for item in res.json():
        #print(item['hpoName'])
        print('Hpo task: %s, State: %s'%(item['hpoName'], item['state']))
        #print('Hpo tasks detail:%s'%json.dumps(item, sort_keys=True, indent=4))        

#### Stop Hpo task

REST API: **PUT /platform/rest/deeplearning/v1/hypersearch/{hponame}**
- Description: Stop the specified hpo job with name/id {hponame}
- OUTPUT: success http return code

Since the hpo job has already finished, there is nothing to stop here.

In [None]:
# Build the URL from dl_rest_url so that protocol, host and port match the rest of the notebook
stopHpoUrl = dl_rest_url + '/hypersearch/' + hpoName
r=req.put(stopHpoUrl,headers=commonHeaders, verify=False, auth=myauth)
if not r.ok:
    print('stop hpo task failed: code=%s, %s'%(r.status_code, r.content))
else:
    print('stop hpo task succeeds')

#### Delete Hpo tasks

REST API: **DELETE /platform/rest/deeplearning/v1/hypersearch/{hponame}**
- Description: Delete the specified hpo task with name/id {hponame}
- OUTPUT: success http return code

REST API: **DELETE /platform/rest/deeplearning/v1/hypersearch**
- Description: Delete all the hpo tasks that the logged-in user can access. Be aware that the BYOF trainings created by these hpo tasks will not be deleted.
- OUTPUT: success http return code
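
Between deleting one task and deleting everything, a common need is to remove only tasks in a given state. The sketch below only builds the per-task DELETE URLs and sends nothing. It assumes the task list has the `hpoName` and `state` keys returned by the health check; `delete_urls_for_state` and the sample data are hypothetical.

```python
from urllib.parse import quote


def delete_urls_for_state(dl_rest_url, tasks, state='FAILED'):
    """Build the per-task DELETE URL for every task currently in the given state."""
    return [
        '%s/hypersearch/%s' % (dl_rest_url.rstrip('/'), quote(task['hpoName'], safe=''))
        for task in tasks
        if task['state'] == state
    ]


# Hypothetical task list shaped like the health-check response.
sample_tasks = [
    {'hpoName': 'Admin-hpo-1', 'state': 'FAILED'},
    {'hpoName': 'Admin-hpo-2', 'state': 'FINISHED'},
]
failed_urls = delete_urls_for_state(
    'http://hostname.local:9243/platform/rest/deeplearning/v1', sample_tasks)
print(failed_urls)
```

Each resulting URL could then be passed to `req.delete(...)` in the same way as the single-task call in this section.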

In [None]:
# Replace the task name below with the hpo task you want to delete
deleteHpoUrl = dl_rest_url + '/hypersearch/Admin-hpo-214707651328489'
r=req.delete(deleteHpoUrl,headers=commonHeaders, verify=False, auth=myauth)
if not r.ok:
    print('delete hpo task failed: code=%s, %s'%(r.status_code, r.content))
else:
    print('delete hpo task succeeds')