# Running a single experiment on an AzML Compute Cluster

In this notebook we demonstrate running a single experiment (load data, train classifier, evaluate accuracy, produce classifications) not locally on the same compute instance as the notebook, but submitted to a compute cluster.

To handle the elements of the processing pipeline that differ when running in this way, I have created some classes derived from the main `XbtDataset` and `ClassificationExperiment` classes, namely `AzureDataset` and `AzureExperiment`. The principal changes are:

* loading data from a mounted AzML Dataset
* locating where the JSON experiment file has been copied to by the submit function that launches the experiment on the cluster.
* registering results with the Run in the AzML Experiment Framework through the API.
* (TODO) writing the output classifications to an Azure Blob 


In [16]:
import warnings
warnings.filterwarnings('ignore')
import os
import sys
import pathlib
import json

In [2]:
import matplotlib
import matplotlib.pyplot


In [3]:
root_repo_dir = pathlib.Path().absolute().parent
sys.path = [os.path.join(root_repo_dir)] + sys.path

In [4]:
import azureml.core
import azureml.core.compute
import azureml.core.compute_target
import azureml.train.sklearn

In [6]:
import xbt.azureml
import xbt.common

## Set up parameters
Define some key paths for the experiment. Paths are generally not defined in the experiment description, which makes the description more portable.
Important definitions include:
* The root data directory. This should have subdirectories with the XBT input dataset, as well as for outputs.
* The names of the input and output subdirectories.
* The path to the JSON experiment description file.

In [7]:
# Set up some site specific parameters for the notebook
try:
    environment = os.environ['XBT_ENV_NAME']
except KeyError:
    environment = 'azureml'

In [8]:
# AZURE ML SPECIFIC definitions
azure_working_root = '/mnt/batch/tasks/shared/LS_root/mounts/clusters/xbt-test1/code/Users/stephen.haddad'
xbt_compute_cluster_name = 'xbt-cluster'
xbt_vm_size = 'STANDARD_D2_V2'
xbt_max_nodes = 4

# It would be good if AzML could infer these from the user info / credentials, as I don't think a user can access another one when logged in to a particular workspace.
azml_subscription_id = '1fedcbc3-e156-45f5-a034-c89c2fc0ac61'
azml_resource_group = 'AWSEarth'
azml_workspace_name = 'stephenHaddad_xbt_europeWest'

azml_xbt_dataset_name = 'xbt_input_files'
azml_output_datastore_name = 'misc'
azml_output_datastore_dir = 'xbt-data/results'

In [9]:
root_data_dirs = {
    'MO_scitools': '/data/users/shaddad/xbt-data/',
    'pangeo': '/data/misc/xbt-data/',
    'azureml': os.path.join(azure_working_root, 'xbt-data'),
}
env_date_ranges = {
    'MO_scitools': (1966,2015),
    'pangeo': (1966,2015),
    'azureml': (1966,2015),
}

In [10]:
# Set up some dataset specific parameters
root_data_dir = root_data_dirs[environment]
year_range = env_date_ranges[environment]
exp_name_template = 'cluster_azml_{exp_name}'

In [11]:
input_dir_name = 'csv_with_imeta'
exp_out_dir_name = 'experiment_outputs'

In [12]:
xbt_input_dir = os.path.join(root_data_dir, input_dir_name)
xbt_output_dir = os.path.join(root_data_dir, exp_out_dir_name)

In [13]:
exp_root_dir = os.path.join(root_repo_dir, 'experiments')

In [14]:
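# Match on the file extension, so that names merely containing '.json' (e.g. backups) are skipped
exp_json_fname_list = [fn1 for fn1 in os.listdir(exp_root_dir) if os.path.isfile(os.path.join(exp_root_dir, fn1)) and fn1.endswith('.json')]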
exp_json_path_list = [os.path.join(exp_root_dir, fn1) for fn1 in exp_json_fname_list]

## Set up the Azure ML environment

`azure_working_root` is the location of the working directory on the compute instance, not on the cluster. It is specific to the compute instance, so we need to find a better way to do this.
It would be good if this sort of thing could be defined either through the AzML API or in the compute instance environment variables, for example `$AZML_HOME_DIR`.
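
A minimal sketch of that idea, assuming a hypothetical `AZML_HOME_DIR` environment variable (it is not something AzML is known to set) and a hypothetical helper `resolve_working_root`. If the variable is unset, the helper falls back to a path supplied by the caller:

```python
import os
import pathlib


def resolve_working_root(env_var='AZML_HOME_DIR', fallback=None):
    """Return the working root from an environment variable, else a fallback.

    AZML_HOME_DIR is a hypothetical variable name used for illustration only.
    """
    value = os.environ.get(env_var)
    if value:
        return pathlib.Path(value)
    if fallback is not None:
        return pathlib.Path(fallback)
    raise RuntimeError(f'{env_var} is not set and no fallback was given')


# Fall back to the current directory when the variable is not defined.
azure_working_root = resolve_working_root(fallback=pathlib.Path.cwd())
print(azure_working_root)
```

This would keep the instance-specific path out of the notebook itself, so the same notebook could run unchanged on another compute instance.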


In [19]:
xbt_workspace = azureml.core.Workspace(azml_subscription_id, azml_resource_group, azml_workspace_name)    

## TODO

We should create a separate experiment for each of the files, because each experiment file should correspond to its own experiment on AzureML rather than to separate runs of the same experiment, given that so far the experiments use different classifiers and input sets.

* different experiments can be accessed in the loop through experiment definitions below
* experiment names are derived from the experiment files
* experiments are accessed, or created if they do not exist.
* the run config is submitted to the relevant definition, and the run object stored in a dict
* runs can then be used for plotting results at the end of the notebook.
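
The steps listed above could be sketched roughly as follows. `submit_all` and the `submit_fn` callable are hypothetical stand-ins for accessing the AzML experiment and calling `experiment.submit`; only the `experiment_name` key and the name template come from this notebook. Runs are keyed by file name rather than experiment name, on the assumption that two files may share an `experiment_name`, in which case a dict keyed by experiment name would silently keep only the last run.

```python
import json
import pathlib
import tempfile


def submit_all(exp_json_paths, submit_fn, name_template='cluster_azml_{exp_name}'):
    """Submit one run per JSON experiment file and collect the results.

    submit_fn(experiment_name, path) is a hypothetical stand-in for
    creating/accessing the experiment and submitting a run to it.
    """
    runs = {}
    for path in map(pathlib.Path, exp_json_paths):
        with path.open() as exp_json_file:
            exp_params = json.load(exp_json_file)
        experiment_name = name_template.format(exp_name=exp_params['experiment_name'])
        # Key by file name so that files sharing an experiment name do not collide.
        runs[path.name] = submit_fn(experiment_name, path)
    return runs


# Demo with two throwaway files that share an experiment name.
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_paths = []
    for fname in ('first.json', 'second.json'):
        demo_path = pathlib.Path(tmp_dir) / fname
        demo_path.write_text(json.dumps({'experiment_name': 'decisionTree_country'}))
        demo_paths.append(demo_path)
    # The lambda stands in for the real submission and just echoes the name.
    demo_runs = submit_all(demo_paths, lambda name, path: name)

print(demo_runs)
```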

## Prepare/access the Azure ML compute cluster
If we want to use an AzureML cluster for training, cross-validation, hyperparameter tuning etc., we need to create an object to access (and potentially start up) a suitable compute cluster.

In [20]:
try:
    compute_target = azureml.core.compute.ComputeTarget(workspace=xbt_workspace, name=xbt_compute_cluster_name)
    print('Found existing compute target')
except azureml.core.compute_target.ComputeTargetException:
    print('Creating a new compute target...')
    compute_config = azureml.core.compute.AmlCompute.provisioning_configuration(vm_size=xbt_vm_size, 
                                                           max_nodes=xbt_max_nodes)

    # create the cluster
    compute_target = azureml.core.compute.ComputeTarget.create(xbt_workspace, xbt_compute_cluster_name, compute_config)

    # can poll for a minimum number of nodes and for a specific timeout. 
    # if no min node count is provided it uses the scale settings for the cluster
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

# use get_status() to get a detailed status for the current cluster. 
print(compute_target.get_status().serialize())

Found existing compute target
{'currentNodeCount': 0, 'targetNodeCount': 0, 'nodeStateCounts': {'preparingNodeCount': 0, 'runningNodeCount': 0, 'idleNodeCount': 0, 'unusableNodeCount': 0, 'leavingNodeCount': 0, 'preemptedNodeCount': 0}, 'allocationState': 'Steady', 'allocationStateTransitionTime': '2020-10-22T23:25:09.340000+00:00', 'errors': None, 'creationTime': '2020-08-26T13:32:02.981314+00:00', 'modifiedTime': '2020-10-23T09:22:36.269768+00:00', 'provisioningState': 'Succeeded', 'provisioningStateTransitionTime': None, 'scaleSettings': {'minNodeCount': 0, 'maxNodeCount': 4, 'nodeIdleTimeBeforeScaleDown': 'PT1200S'}, 'vmPriority': 'Dedicated', 'vmSize': 'STANDARD_D13_V2'}
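
The serialized status is hard to read as a single line. Below is a small sketch of a hypothetical helper, `summarise_status`, that pulls out the fields most relevant here. It assumes the dictionary layout shown in the output above and only handles seconds-only durations such as `PT1200S`. Note that the reported `vmSize` is `STANDARD_D13_V2` rather than the value of `xbt_vm_size`, presumably because an existing cluster was found and the provisioning configuration was never applied.

```python
import re


def idle_seconds(duration):
    """Parse a seconds-only ISO 8601 duration such as 'PT1200S'."""
    match = re.fullmatch(r'PT(\d+)S', duration)
    if match is None:
        raise ValueError(f'unsupported duration: {duration!r}')
    return int(match.group(1))


def summarise_status(status):
    """Reduce the serialized cluster status to a few key fields."""
    scale = status['scaleSettings']
    return {
        'vm_size': status['vmSize'],
        'current_nodes': status['currentNodeCount'],
        'max_nodes': scale['maxNodeCount'],
        'idle_minutes_before_scale_down': idle_seconds(scale['nodeIdleTimeBeforeScaleDown']) / 60,
        # True when the cluster is allowed to shrink to zero and has done so.
        'scaled_to_zero': status['currentNodeCount'] == 0 and scale['minNodeCount'] == 0,
    }


# A subset of the status printed above.
demo_status = {
    'currentNodeCount': 0,
    'scaleSettings': {'minNodeCount': 0, 'maxNodeCount': 4, 'nodeIdleTimeBeforeScaleDown': 'PT1200S'},
    'vmSize': 'STANDARD_D13_V2',
}
demo_summary = summarise_status(demo_status)
print(demo_summary)
```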


## Running on the cluster


In [21]:
conda_packages = ['python=3.8',
                  'joblib=0.13.2',
                  'pandas=1.0.1',
                  'scikit-learn=0.22.1',
                  'iris=2.4',
                 ]

In [22]:
launch_script = 'bin/run_azml_cv_experiment'

In [23]:
azml_xbt_dataset_obj = azureml.core.Dataset.get_by_name(xbt_workspace, azml_xbt_dataset_name)

In [24]:
xbt_datastore = azureml.core.Datastore.get(xbt_workspace, azml_output_datastore_name)

In [25]:
default_datastore = azureml.core.Datastore.get_default(xbt_workspace)

In [26]:
azml_datastore_exp_dir = 'exp_files'
default_datastore.upload_files(exp_json_path_list, target_path=azml_datastore_exp_dir, overwrite=True)

Uploading an estimated of 12 files
Uploading /mnt/batch/tasks/shared/LS_root/mounts/clusters/xbt-test1/code/Users/stephen.haddad/XBTs_classification/experiments/xbt_param_decisionTree_country.json
Uploading /mnt/batch/tasks/shared/LS_root/mounts/clusters/xbt-test1/code/Users/stephen.haddad/XBTs_classification/experiments/xbt_param_decisionTree_countryLatLon.json
Uploading /mnt/batch/tasks/shared/LS_root/mounts/clusters/xbt-test1/code/Users/stephen.haddad/XBTs_classification/experiments/xbt_param_decisionTree_country_dev.json
Uploading /mnt/batch/tasks/shared/LS_root/mounts/clusters/xbt-test1/code/Users/stephen.haddad/XBTs_classification/experiments/xbt_param_decisionTree_latLon.json
Uploading /mnt/batch/tasks/shared/LS_root/mounts/clusters/xbt-test1/code/Users/stephen.haddad/XBTs_classification/experiments/xbt_param_decisionTree_maxDepthYear.json
Uploading /mnt/batch/tasks/shared/LS_root/mounts/clusters/xbt-test1/code/Users/stephen.haddad/XBTs_classification/experiments/xbt_param_knn_c

$AZUREML_DATAREFERENCE_941e759d125544b0b67f7040b5fa140d

In [27]:
script_params = {
    '--json-experiment': '',
    '--input-dir': azml_xbt_dataset_obj.as_named_input(azml_xbt_dataset_name).as_mount(),
    '--output-dir': 'xbt-data/azml_outputs',
    '--config-dir': default_datastore.as_mount(), 
    '--output-root': xbt_datastore.as_mount(),
}
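
For orientation, here is a sketch of how a launch script might consume these parameters on the cluster. The actual `bin/run_azml_cv_experiment` script is not shown in this notebook, so `build_parser` and `resolve_paths` are hypothetical, as is the assumption that `--json-experiment` is relative to `--config-dir` and `--output-dir` is relative to `--output-root`. At run time the mounted datasets and datastores would be passed to the script as paths on the cluster node.

```python
import argparse
import pathlib


def build_parser():
    """Build a parser for the five script parameters used in this notebook."""
    parser = argparse.ArgumentParser(description='Hypothetical launch-script arguments')
    parser.add_argument('--json-experiment', required=True,
                        help='experiment JSON path, assumed relative to --config-dir')
    parser.add_argument('--input-dir', required=True, help='mounted input dataset')
    parser.add_argument('--output-dir', required=True,
                        help='output directory, assumed relative to --output-root')
    parser.add_argument('--config-dir', required=True, help='mounted datastore with JSON files')
    parser.add_argument('--output-root', required=True, help='mounted output datastore')
    return parser


def resolve_paths(args):
    """Combine the arguments into absolute paths (cluster nodes are assumed to be Linux)."""
    return {
        'json_path': str(pathlib.PurePosixPath(args.config_dir) / args.json_experiment),
        'input_path': args.input_dir,
        'output_path': str(pathlib.PurePosixPath(args.output_root) / args.output_dir),
    }


# Example invocation with made-up mount points.
demo_args = build_parser().parse_args([
    '--json-experiment', 'exp_files/example.json',
    '--input-dir', '/mnt/input',
    '--output-dir', 'xbt-data/azml_outputs',
    '--config-dir', '/mnt/config',
    '--output-root', '/mnt/output',
])
demo_paths = resolve_paths(demo_args)
print(demo_paths)
```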


In [28]:
exp_batch_runs = {}

In [34]:
for exp_fname, exp_path in zip(exp_json_fname_list, exp_json_path_list):
    with open(exp_path) as exp_json_file:
        exp_params = json.load(exp_json_file)    
    experiment_name = exp_name_template.format(exp_name=exp_params['experiment_name'])
    experiment = azureml.core.Experiment(workspace=xbt_workspace, name=experiment_name)
    # Path of this experiment's JSON file relative to the datastore root
    exp_json_azml_datasource_path = os.path.join(azml_datastore_exp_dir, exp_fname)
    script_params['--json-experiment'] = exp_json_azml_datasource_path
    xbt_estimator = azureml.train.sklearn.SKLearn(
        source_directory=str(root_repo_dir), 
        script_params=script_params,
        compute_target=compute_target,
        entry_script=launch_script,
        conda_packages=conda_packages,
        )
    print(f'submitting run for experiment {experiment_name}')
    exp_batch_runs[experiment_name] = experiment.submit(xbt_estimator)
    



submitting run for experiment cluster_azml_decisionTree_country
submitting run for experiment cluster_azml_decisionTree_countryLatLon
submitting run for experiment cluster_azml_decisionTree_country_dev
submitting run for experiment cluster_azml_decisionTree_latLon
submitting run for experiment cluster_azml_decisionTree_maxDepthYear
submitting run for experiment cluster_azml_knn_country
submitting run for experiment cluster_azml_logreg_country
submitting run for experiment cluster_azml_mlp_country
submitting run for experiment cluster_azml_mlp_countryLatLon
submitting run for experiment cluster_azml_RandomForest_country
submitting run for experiment cluster_azml_RandomForest_countryLatLon
submitting run for experiment cluster_azml_decisionTree_countryLatLon


In [None]:
exp_batch_runs[list(exp_batch_runs.keys())[0]].id

The trained classifier objects are saved to the output directory, one per file. There is also a JSON experiment description file, which is the same as the original description, but with inference added to the experiment name and a list of classifier file names. This file can be used to create and run an inference job. The classifier files should be in the same directory as the JSON inference description file.
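
To make the relationship between the two JSON files concrete, here is a rough sketch of how an inference description could be derived from the original one. Only the `experiment_name` key appears in this notebook; the `_inference` suffix, the `classifier_fnames` key and the helper name are assumptions made for illustration, and the real file layout may differ.

```python
import copy
import json


def make_inference_description(exp_params, classifier_fnames):
    """Copy an experiment description and add the inference-specific fields.

    The '_inference' suffix and 'classifier_fnames' key are hypothetical.
    """
    inf_params = copy.deepcopy(exp_params)
    inf_params['experiment_name'] = exp_params['experiment_name'] + '_inference'
    # File names only: the classifier files are expected next to the JSON file.
    inf_params['classifier_fnames'] = list(classifier_fnames)
    return inf_params


demo_exp = {'experiment_name': 'decisionTree_country'}
demo_inf = make_inference_description(demo_exp, ['clf_0.joblib', 'clf_1.joblib'])
print(json.dumps(demo_inf, indent=2))
```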

In [None]:
# json_inf_path = exp2_cv.inference_out_json_path

In [None]:
# TODO: plot results using metrics and scores files attached to run

In [None]:
# fig_results = matplotlib.pyplot.figure('xbt_results',figsize=(25,15))
# for label1, metrics1  in classifiers_cv.items():
#     ax_precision = fig_results.add_subplot(3,5,label1 +1, title='precision split {0}'.format(label1))
#     ax_recall = fig_results.add_subplot(3,5,label1 + 1 + 5 * 1, title='recall split {0}'.format(label1))
#     ax_f1 = fig_results.add_subplot(3,5,label1 + 1 + 5 * 2, title='f1 split {0}'.format(label1))
#     results_cv.plot.line(ax=ax_precision, x='year', y=[f'precision_train_{label1}_all',f'precision_test_{label1}_all'], color=['b', 'r'], ylim=(0.7,1.0))
#     results_cv.plot.line(ax=ax_recall, x='year', y=[f'recall_train_{label1}_all',f'recall_test_{label1}_all'], color=['b', 'r'], ylim=(0.7,1.0))
#     results_cv.plot.line(ax=ax_f1, x='year', y=[f'f1_train_{label1}_all',f'f1_test_{label1}_all'], color=['b', 'r'], ylim=(0.7,1.0))

### Inference
Once we have trained the classifiers, we want to be able to load them from the saved state files and run inference on the whole dataset, with the same results. We can use the JSON file created by the training to run the inference. The JSON inference parameters and the saved classifier object files should be in the same directory. We can then use the `run_inference` function, which performs the following steps:
* load dataset
* load previously trained classifiers from the file list defined in the JSON inference description.
* run inference for each of the classifiers
* fill in classifications using the iMeta algorithm where the classifiers cannot produce one
* calculate vote-based probability using ensemble of previously trained classifiers
* save classification results
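
As an illustration of the vote-based probability step, the sketch below counts how many classifiers in the ensemble chose each label for each profile and divides by the ensemble size. This is a generic majority-vote calculation written for this note, not the implementation inside `run_inference`; the function name and the labels `'A'` and `'B'` are placeholders.

```python
from collections import Counter


def vote_probabilities(predictions):
    """Turn per-classifier predictions into per-profile vote fractions.

    predictions is a list with one entry per classifier, each a list of
    predicted labels with one label per profile (all the same length).
    """
    n_classifiers = len(predictions)
    results = []
    # zip(*predictions) regroups the labels so each tuple holds all votes for one profile.
    for votes in zip(*predictions):
        counts = Counter(votes)
        results.append({label: count / n_classifiers for label, count in counts.items()})
    return results


# Three classifiers voting on two profiles.
demo_predictions = [
    ['A', 'B'],
    ['A', 'A'],
    ['B', 'A'],
]
demo_votes = vote_probabilities(demo_predictions)
print(demo_votes)
```

In this demo, two of the three classifiers agree on `'A'` for each profile, so `'A'` receives a vote fraction of two thirds in both cases.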

In [None]:
# exp3_inf = experiment.ClassificationExperiment(json_inf_path, xbt_input_dir, xbt_output_dir)


In [None]:
# %%time
# classifiers_reloaded = exp3_inf.run_inference()