## Batch Inference for DAN Model

In [1]:
import os
from azureml.core import  Workspace
from azureml.core import Model
from azureml.core.resource_configuration import ResourceConfiguration
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.core.environment import Environment 
from azureml.core.webservice import AciWebservice,Webservice
from azureml.core.model import Model,InferenceConfig
from azureml.core.runconfig import DEFAULT_CPU_IMAGE, DEFAULT_GPU_IMAGE
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core import Workspace, Experiment, Run, RunConfiguration,ScriptRunConfig
import numpy as np
import pickle as pkl
from tensorflow import keras
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.core import PipelineParameter
from azureml.pipeline.steps import PythonScriptStep


ws = Workspace.from_config()

Performing interactive authentication. Please follow the instructions on the terminal.


The default web browser has been opened at https://login.microsoftonline.com/organizations/oauth2/v2.0/authorize. Please continue the login in the web browser. If no web browser is available or if the web browser fails to open, use device code flow with `az login --use-device-code`.


Interactive authentication successfully completed.


In [3]:
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep = '\n')

Workspace name: cdh-azml-dev-mlw
Azure region: eastus
Subscription id: 320d8d57-c87c-4434-827f-59ee7d86687a
Resource group: CSELS-CDH-DEV


#### Create CPU Compute for feature extraction

In [4]:
clustername = 'StandardD13v2'
is_new_cluster = False
try:
    aml_compute_cpu = ComputeTarget(workspace = ws,name= clustername)
    print("Find the existing cluster")
except ComputeTargetException:
    print("Cluster not find - Creating cluster.....")
    is_new_cluster = True
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS13_V2',
                                                           max_nodes=2)
    aml_compute_cpu = ComputeTarget.create(ws, clustername, compute_config)

aml_compute_cpu.wait_for_completion(show_output=True)

Find the existing cluster
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned


#### Create GPU Compute for training

In [5]:
clustername = 'StandardNC6'
is_new_cluster = False
try:
    aml_cluster_gpu = ComputeTarget(workspace = ws,name= clustername)
    print("Find the existing cluster")
except ComputeTargetException:
    print("Cluster not find - Creating cluster.....")
    is_new_cluster = True
    compute_config = AmlCompute.provisioning_configuration(vm_size='StandardNC6',
                                                           max_nodes=2)
    aml_cluster_gpu = ComputeTarget.create(ws, clustername, compute_config)

aml_cluster_gpu.wait_for_completion(show_output=True)

Find the existing cluster
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned


#### Score scipt

In [30]:
#%%writefile ./score/score.py

import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from azureml.core import Model
import joblib
import pickle as pkl

def init():
    global model, model_vocab, model_vocab_demog,N_VOCAB,N_DEMOG
    
    model_path =  Model.get_model_path(model_name='dan_d1_icu',version=1)
    model = keras.models.load_model(model_path)

def run(input_data):

    resultList = []

   
    num_rows, num_cols = input_data.shape
    print("input data shape:",input_data.shape)
    print("input data type:",type(input_data))
    print(input_data)
    #Read comma-delimited data into an array
    # data = np.expand_dims(input_data,axis=0)
    # Reshape into a 2-dimensional array for model input
    prediction = model.predict(input_data).reshape((num_rows, 1))
    print("prediction shape:", prediction.shape)
    # Append prediction to results
    resultList.append(prediction)

    return pd.DataFrame(prediction)


#### Feature processing script

In [46]:
%%writefile ./score/feature_preprocessing.py
import os
import time
from importlib import reload
import pandas as pd
import argparse
import numpy as np
import tensorflow as tf
from tensorflow import keras
from azureml.core import Model,Dataset
import joblib
import pickle as pkl
from azureml.core import Workspace, Experiment, Run, RunConfiguration

def preprocessing(inputs):

    run = Run.get_context()
    print("run name:",run.display_name)
    print("run details:",run.get_details())
    
    ws = run.experiment.workspace
        
    #model_vocab_path = Model.get_model_path(model_name='vocab_lstm_icu',_workspace=ws)
    #model_demog_vocab_path = model_vocab = Model.get_model_path(model_name='vocab_demog_lstm_icu',_workspace=ws)
    data_store = ws.get_default_datastore()
    
    vocab = Dataset.File.from_files(path=data_store.path('output/pkl/all_ftrs_dict.pkl'))
    demog_dict = Dataset.File.from_files(path=data_store.path('output/pkl/demog_dict.pkl'))
    
    pwd = os.path.dirname(__file__)
    output_dir = os.path.abspath(os.path.join(pwd,"output"))
    pkl_dir = os.path.join(output_dir, "pkl")

    os.makedirs(pkl_dir, exist_ok=True)
    vocab.download(target_path=pkl_dir,overwrite=True,ignore_not_found=True)
    demog_dict.download(target_path=pkl_dir,overwrite=True,ignore_not_found=True)


    with open(inputs, "rb") as f:
        inputs = pkl.load(f)

    with open(os.path.join(pkl_dir, "all_ftrs_dict.pkl"), "rb") as f:
        model_vocab = pkl.load(f)

    with open(os.path.join(pkl_dir, "demog_dict.pkl"), "rb") as f:
        model_vocab_demog = pkl.load(f)

    features = [l[0][-1] for l in inputs]
    N_VOCAB = len(model_vocab) + 1
    N_DEMOG = len(model_vocab_demog) + 1
    print(N_VOCAB,N_DEMOG)
    new_demog = [[i + N_VOCAB - 1 for i in l[1]] for l in inputs]
    features = [
                    features[i] + new_demog[i] for i in range(len(features))
                ]
    demog_vocab = {k: v + N_VOCAB - 1 for k, v in model_vocab_demog.items()}
    model_vocab.update(demog_vocab)
    N_VOCAB = np.max([np.max(l) for l in features]) + 1
    print(N_VOCAB,N_DEMOG)
    X = keras.preprocessing.sequence.pad_sequences(features,padding='post')
    print(X.shape)

    return X

if __name__ == '__main__':
    parser = argparse.ArgumentParser("feature")
    parser.add_argument("--inputs",type=str)
    parser.add_argument("--features_file",type=str)

    args = parser.parse_args()
    inputs = args.inputs
    print(inputs)
    print(args.features_file)

    features = preprocessing(inputs)
    #with open(os.path.join(args.features_file,"features.pkl"), "wb") as f:
    # pkl.dump(features, f)
    
    os.makedirs(args.features_file,exist_ok=True)
    np.savetxt(os.path.join(args.features_file,"features.csv"), features, delimiter=",")
    run = Run.get_context()
    ws = run.experiment.workspace
    
    data_store = ws.get_default_datastore()
    data_store.upload(src_dir=args.features_file,target_path=args.features_file,overwrite=True,show_progress=True)
    datastore_paths = [(data_store, args.features_file)]
    inputs = Dataset.Tabular.from_delimited_files(path=datastore_paths)
    inputs.register(name='premier_features',workspace=ws,create_new_version=True)




Overwriting ./score/feature_preprocessing.py


#### Reference to input and output data in the datastore

In [6]:
from azureml.core import Workspace, Dataset
from azureml.core import Run
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig
from azureml.pipeline.core import PipelineParameter
from azureml.data import OutputFileDatasetConfig
data_store = ws.get_default_datastore()

##########Loading the data from datastore
print("Creating dataset from Datastore")
inputs = Dataset.File.from_files(path=data_store.path('output/pkl/trimmed_seqs.pkl'))
features_file = OutputFileDatasetConfig(destination=(data_store, 'output/csv'))


Creating dataset from Datastore


##### Environment for feature processing and  batch inference

In [50]:
premier_score_model_env = Environment.from_conda_specification(name='premier_score_model_env', file_path='./environments/conda_dependencies_score.yml')
# Specify a CPU base image
# premier_train_model_env.docker.enabled = True
premier_score_model_env.docker.base_image = DEFAULT_CPU_IMAGE
premier_score_model_env.register(workspace=ws)
run_config_feature = RunConfiguration()
run_config_feature.environment = premier_score_model_env


### Feature Preprocessing job for batch inference

In [51]:
source_directory ='./score'
job_feature_processing = ScriptRunConfig(
                         script="feature_preprocessing.py", 
                         arguments=["--inputs",inputs.as_mount(),"--features_file",features_file],
                         compute_target=aml_compute_cpu, 
                         environment=premier_score_model_env,
                         source_directory=source_directory)
print("job_feature_processing created")

job_feature_processing created


In [52]:
exp_name = f"Job-feature-preprocess-inference"
print("Submit Experiment:",exp_name)
# Create experiment
experiment = Experiment(workspace=ws, name = exp_name)
run = experiment.submit(job_feature_processing)

Submit Experiment: Job-feature-preprocess-inference


## Batch Inference

#### Create Pararallel Run step

In [28]:
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.core import Pipeline

OUTCOME = 'icu'

# Get the batch dataset for input
batch_data_set = ws.datasets['premier_features']

# Set the output location
output_dir = OutputFileDatasetConfig(name='inferences',destination=(data_store, 'output/inferences'))

# Define the parallel run step step configuration
parallel_run_config = ParallelRunConfig(
    source_directory='./score',
    entry_script="score.py",
    mini_batch_size="10MB",
    error_threshold=10000,
    output_action='append_row',
    append_row_file_name="file_size_outputs.txt",
    environment=premier_score_model_env,
    compute_target=aml_cluster_gpu,
    node_count=2)

# Create the parallel run step
parallelrun_step = ParallelRunStep(
    name='batch-score',
    parallel_run_config=parallel_run_config,
    inputs=[batch_data_set.as_named_input("premier_features")],
    output=output_dir,
    arguments=[],
    allow_reuse=True
)
# Create the pipeline
pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])

### Submit parallel batch inference job

In [29]:
from azureml.core import Experiment

# Run the pipeline as an experiment
### WHICH OUTCOME??
exp_name = f"Job-batch-prediction-{OUTCOME}"
pipeline_run = Experiment(ws, exp_name).submit(pipeline)
#pipeline_run.wait_for_completion(show_output=False)

Created step batch-score [dd5de4d8][b7c40756-33bb-42c7-bc6e-176d439dae33], (This step will run and generate new outputs)
Submitted PipelineRun d8845d54-be45-4b01-9230-6d1ea035e0f9
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/d8845d54-be45-4b01-9230-6d1ea035e0f9?wsid=/subscriptions/320d8d57-c87c-4434-827f-59ee7d86687a/resourcegroups/CSELS-CDH-DEV/workspaces/cdh-azml-dev-mlw&tid=9ce70869-60db-44fd-abe8-d2767077fc8f
