# Batch scoring for a large dataset, 01.03.2021

In [None]:
import azureml.core
from azureml.core import Dataset, Workspace, Experiment
import os
azureml.core.VERSION


In [None]:
# Load the workspace via Workspace.from_config() and take its default datastore.
ws = Workspace.from_config()
datastore = ws.get_default_datastore()

# Print the workspace identity and the datastore name, one value per line.
workspace_summary = (
    ws.name,
    ws.resource_group,
    ws.location,
    ws.subscription_id,
    datastore.name,
)
print(*workspace_summary, sep='\n')

In [None]:
from azureml.core.compute import AmlCompute
from azureml.core.compute import ComputeTarget
import os

# Cluster settings; each one can be overridden through an environment variable.
compute_name = os.environ.get("AML_COMPUTE_CLUSTER_NAME", "automl-compute")
# Environment variables are always strings, so the node counts are converted
# to int; otherwise an overridden value would reach the provisioning
# configuration as text while the defaults are integers.
compute_min_nodes = int(os.environ.get("AML_COMPUTE_CLUSTER_MIN_NODES", 0))
compute_max_nodes = int(os.environ.get("AML_COMPUTE_CLUSTER_MAX_NODES", 4))

# This example uses CPU VM. For using GPU VM, set SKU to STANDARD_NC6
vm_size = os.environ.get("AML_COMPUTE_CLUSTER_SKU", "STANDARD_D2_V2")


if compute_name in ws.compute_targets:
    # Reuse the compute target that already exists under this name.
    compute_target = ws.compute_targets[compute_name]
    if compute_target and isinstance(compute_target, AmlCompute):
        print('found compute target. just use it. ' + compute_name)
    else:
        # The name is taken by something that is not an AmlCompute cluster;
        # it is still used (as before), but no longer silently.
        print('found compute target ' + compute_name
              + ' but it is not an AmlCompute cluster.')
else:
    print('creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size=vm_size,
        min_nodes=compute_min_nodes,
        max_nodes=compute_max_nodes)

    # create the cluster
    compute_target = ComputeTarget.create(
        ws, compute_name, provisioning_config)

    # can poll for a minimum number of nodes and for a specific timeout.
    # if no min node count is provided it will use the scale settings for the cluster
    compute_target.wait_for_completion(
        show_output=True, min_node_count=None, timeout_in_minutes=20)

    # For a more detailed view of current AmlCompute status, use get_status()
    print(compute_target.get_status().serialize())

## Tabular dataset - registering at once

In [None]:
# Register the two space-separated text files from the default datastore as
# tabular datasets. Both registrations pass create_new_version=True.
datastore = ws.get_default_datastore()

# --- diabetes data ---
diabetes_data_path = [(datastore, '/helen/data/diabetes_data.txt')]
diabetes_data = Dataset.Tabular.from_delimited_files(
    path=diabetes_data_path,
    separator=' ',
)
diabetes_data = diabetes_data.register(
    workspace=ws,
    name='diabetes_data',
    description='diabetes data',
    create_new_version=True,
)

# --- diabetes labels ---
diabetes_labels_path = [(datastore, '/helen/data/diabetes_labels.txt')]
diabetes_labels = Dataset.Tabular.from_delimited_files(
    path=diabetes_labels_path,
    separator=' ',
)
diabetes_labels = diabetes_labels.register(
    workspace=ws,
    name='diabetes_labels',
    description='diabetes labels',
    create_new_version=True,
)


## Tabular dataset - accessing in script

In [None]:
# Access a dataset that is already registered in the workspace,
# looking it up by its registered name.
diabetes_data = Dataset.get_by_name(workspace=ws, name='diabetes_data')

# Convert it to a pandas DataFrame and preview the first 10 rows.
df = diabetes_data.to_pandas_dataframe()
df.head(10)

## Model - accessing in script

In [None]:
# Find all registered models called "helen_test" and display their version numbers
from azureml.core.model import Model
models = Model.list(ws, name='helen_test')
for m in models:
    print(m.name, m.version)

In [None]:
# Fetch version 60 of the registered model "helen_test" into a variable;
# this object is later passed to the parallel run step via models=[model].
modelname='helen_test'
model = Model(ws, modelname, version=60)
model

## Scoring script

In [None]:
%%writefile ./helen/script/helen_score.py


import io
import pickle
import argparse
import numpy as np
import pandas as pd

from azureml.core.model import Model
from sklearn.linear_model import LogisticRegression
from sklearn.externals import joblib

def init():
    """Load the scoring model into the module-level ``iris_model`` global.

    Reads ``--model_name`` from the command line (any other arguments are
    ignored because ``parse_known_args`` is used), resolves the model file
    with ``Model.get_model_path`` and deserialises it with joblib.
    ``run()`` reads ``iris_model``, so this must be called before it.

    Raises:
        SystemExit: raised by argparse when ``--model_name`` is missing.
    """
    global iris_model

    parser = argparse.ArgumentParser(description="Iris model serving")
    parser.add_argument('--model_name', dest="model_name", required=True)
    args, _unknown_args = parser.parse_known_args()

    model_path = Model.get_model_path(args.model_name)

    print (model_path)
    print (args.model_name)

    # Open the file in a context manager so the handle is closed once the
    # model has been loaded (the bare open() call used to leak it).
    with open(model_path, 'rb') as model_file:
        iris_model = joblib.load(model_file)
    print (iris_model)


def run(input_data):
    """Score one mini-batch and return the inputs with a prediction column.

    ``input_data`` must offer ``.shape`` and ``.to_numpy()`` (the code treats
    it as a two-dimensional pandas DataFrame). The global ``iris_model`` set
    by ``init()`` predicts on the numpy view of the batch; the predictions are
    appended as one extra column and the result is returned as a new
    ``pandas.DataFrame`` with default integer column labels.

    The author's notes recorded that returning a list
    (``result_numpy.tolist()``) or the raw numpy array was also accepted.
    """
    print ('type initial ', type (input_data))

    # Unpacking the shape doubles as a check that the batch is two-dimensional.
    _, _ = input_data.shape
    print ('data initial',input_data)
    print ('inp df', input_data.shape)

    features = input_data.to_numpy()
    row_count, _ = features.shape
    print ('inp np', features.shape)

    # Reshape to a column vector so it can be appended along axis 1.
    pred = iris_model.predict(features).reshape((row_count, 1))
    print ('data out',pred)
    print ('out ', pred.shape)

    # Inputs followed by the prediction as the last column.
    combined = np.append(input_data, pred, axis=1)
    result = pd.DataFrame(data=combined)

    print ('type final ', type (result))
    return result

In [None]:
# Create the local directory that holds the scoring script (no error if it
# already exists) and list its contents.
# NOTE(review): the %%writefile cell targeting ./helen/script appears earlier
# in the notebook; presumably this cell has to be run first on a fresh
# checkout so the directory exists - confirm.
script_folder = './helen/script'
os.makedirs(script_folder, exist_ok=True)
os.listdir(script_folder)

In [None]:

# Folder and file name of the scoring script; both names are reused later
# when building the ParallelRunConfig.
scripts_folder = "./helen/script"
script_file = "helen_score.py"

# Print the script so its contents can be checked.
script_path = os.path.join(scripts_folder, script_file)
with open(script_path) as script_handle:
    script_text = script_handle.read()
print(script_text)

# Creating pipeline

## Creating the output dataset for the pipeline step, i.e. the scored dataset

In [None]:
from azureml.pipeline.core import PipelineData

# Output placeholder for the scoring step, stored on the default datastore.
# The name 'inferences' is the key used later with get_output_data() when the
# results are downloaded.
datastore = ws.get_default_datastore()
output_folder = PipelineData(name='inferences', datastore=datastore,output_path_on_compute="helen/results")




## Creating the parallel run step, i.e. scoring a large dataset in parallel

In [None]:
# env to run script
from azureml.core import Environment
from azureml.core.runconfig import CondaDependencies

predict_conda_deps = CondaDependencies.create(pip_packages=[ "scikit-learn==0.20.3" ])

predict_env = Environment(name="predict_environment")
predict_env.python.conda_dependencies = predict_conda_deps
predict_env.docker.enabled = True
predict_env.spark.precache_packages = False


In [None]:
from azureml.contrib.pipeline.steps import ParallelRunStep, ParallelRunConfig

# Settings for the parallel scoring step: which script to run (helen_score.py
# in ./helen/script), the environment and compute target defined above, and a
# single node with a single process.
# In a real-world scenario, you'll want to shape your process per node and nodes to fit your problem domain.
parallel_run_config = ParallelRunConfig(
                    source_directory=scripts_folder,
                    entry_script=script_file,  # the user script to run against each input
                    mini_batch_size='1GB',
                    error_threshold=5,
                    output_action='append_row',
                    environment=predict_env,
                    compute_target=compute_target, 
                    node_count=1,
                    process_count_per_node=1,
                    run_invocation_timeout=600,
                    # switch to 'DEBUG' for more verbose logs
                    #logging_level='DEBUG'
                    logging_level='INFO')
# Last expression of the cell: displays the config object in the notebook.
parallel_run_config

In [None]:
# Scratch lines kept from exploration (disabled):
#run_config.source_directory_data_store ('workspaceblobstore')
#help (parallel_run_config)
# Display the logging level stored on the config object.
parallel_run_config.logging_level


In [None]:
# Pipeline step that scores the registered 'diabetes_data' dataset with the
# scoring script configured in parallel_run_config and writes to output_folder.
distributed_helen_step = ParallelRunStep(
    name='example-diabetes',
    inputs=[diabetes_data.as_named_input('diabetes_data')],
    output=output_folder,
    parallel_run_config=parallel_run_config,
    models=[model],
    # '--model_name' is the argument that init() in the scoring script parses
    arguments=['--model_name', 'helen_test'],
    #CORRECT allow_reuse=True
    allow_reuse=False
)

In [None]:
# run pipeline 
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

# Build a pipeline that contains only the parallel scoring step.
pipeline = Pipeline(workspace=ws, steps=[distributed_helen_step])

# Submit it under the experiment name 'simple_score_pipeline'; the returned
# run object is used by the following cells.
pipeline_run = Experiment(ws, 'simple_score_pipeline').submit(pipeline)

In [None]:
# Displaying the run object as the cell's last expression outputs a table
# with a link to the run details in the Azure portal.
pipeline_run

In [None]:
# GUI: show the pipeline run in the RunDetails notebook widget
from azureml.widgets import RunDetails
RunDetails(pipeline_run).show() 

In [None]:
# Console logs: wait for the pipeline run to complete, with output shown
pipeline_run.wait_for_completion(show_output=True)

## To see results

In [None]:
# VIEW RESULTS
# Download the 'inferences' output of the first child run into ./iris_results,
# load the result file written by the parallel run step and show a random
# sample of the predictions.
import pandas as pd
import shutil

# Start from a clean download folder so old results cannot be picked up.
shutil.rmtree("iris_results", ignore_errors=True)

prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data("inferences")
prediction_output.download(local_path="iris_results")

# Reset first: without this, a missing file would either raise NameError or
# silently reuse a path left over from an earlier execution of this cell.
result_file = None
for root, _dirs, files in os.walk("iris_results"):
    for file_name in files:
        if file_name.endswith('parallel_run_step.txt'):
            # Keeps the last match, as before.
            result_file = os.path.join(root, file_name)

if result_file is None:
    raise FileNotFoundError(
        "no file ending in 'parallel_run_step.txt' found under 'iris_results'")

# cleanup output format
df = pd.read_csv(result_file, delimiter=" ", header=None)
df.columns = ['age', 'gender', 'bmi', 'bp', 's1', 's2', 's3', 's4', 's5', 's6','pred']
print("Prediction has ", df.shape[0], " rows")

# sample() raises when asked for more rows than exist, so cap the size.
random_subset = df.sample(n=min(20, len(df)))
random_subset.head(20)

# Appendix: testing that run() works

In [None]:
# scoring script
import io
import pickle
import argparse
import numpy as np

from azureml.core.model import Model
from sklearn.linear_model import LogisticRegression


def init():
    """Load the pickled model into the module-level ``iris_model`` global.

    Reads ``--model_name`` from the command line, resolves the model file with
    ``Model.get_model_path`` and unpickles it. ``run()`` reads ``iris_model``,
    so this must be called before it.

    Raises:
        SystemExit: raised by argparse when ``--model_name`` is missing.
    """
    global iris_model

    parser = argparse.ArgumentParser(description="Iris model serving")
    parser.add_argument('--model_name', dest="model_name", required=True)
    # The arguments were never parsed before, so ``args`` was undefined and
    # the next line raised NameError. Unknown arguments are ignored, matching
    # the other scoring scripts in this notebook.
    args, _unknown_args = parser.parse_known_args()

    model_path = Model.get_model_path(args.model_name)

    # NOTE: pickle can execute arbitrary code while loading; only use this
    # with model files from a trusted source.
    with open(model_path, 'rb') as model_file:
        iris_model = pickle.load(model_file)


def run(input_data):
    """Score a two-dimensional batch and return it with a prediction column.

    The global ``iris_model`` (set by ``init()`` or assigned directly in the
    notebook) predicts on ``input_data``. The predictions are reshaped to a
    column and appended along axis 1, and the combined numpy array is returned.

    The author's note recorded that ``result.tolist()`` was also a valid
    return form.
    """
    print ('inp ' , input_data.shape)
    row_total, _ = input_data.shape
    print ('inp ' , input_data.shape)

    # Column vector of predictions, one row per input row.
    pred_column = iris_model.predict(input_data).reshape((row_total, 1))
    print ('out ', pred_column.shape)

    # Inputs followed by the prediction as the last column.
    return np.append(input_data, pred_column, axis=1)



In [None]:
# downloading model for testing
# SCORING 
# Registered model - downloading it in order to use it for scoring
from azureml.core import Workspace
from azureml.core.model import Model, Dataset
from sklearn.externals import joblib
import pandas as pd
import os
# Download version 4 of the registered model 'helen_test' into
# ./helen/download, load it into ``iris_model`` (the global that run() reads)
# and fetch the registered 'diabetes_data' dataset for a local test.
ws = Workspace.from_config()


modelname = 'helen_test'
model_file = "diabetes_helen.pkl"

output_dir = './helen/download'
os.makedirs(output_dir, exist_ok=True)

model = Model(ws, modelname, version=4)
model.download(target_dir=output_dir, exist_ok=True)
print (model)


model_file_name = os.path.join(output_dir, model_file)
# Raises if the expected file is not in the download folder.
os.stat(model_file_name)
# Open the file in a context manager so the handle is closed after loading
# (the bare open() call used to leak it).
with open(model_file_name, 'rb') as model_handle:
    iris_model = joblib.load(model_handle)

# Access the dataset that is already registered, by name
diabetes_data = Dataset.get_by_name(workspace=ws, name='diabetes_data')

df = diabetes_data.to_pandas_dataframe()

In [None]:
# Testing that my run() function works: feed it the whole dataset as a
# numpy array and look at the result in three forms.
df = diabetes_data.to_pandas_dataframe()
df_np=df.to_numpy()
print (df_np.shape)

# run() returns the input with the prediction appended as the last column
helen_numpy=run (df_np)
print (type (helen_numpy))
# rows 1 to 4 of the result
print (helen_numpy[1:5,:])

# same result as a plain Python list
helen_list=helen_numpy.tolist()
print (type(helen_list))

# same result as a pandas DataFrame (displayed as the cell output)
helen_pandas=pd.DataFrame(data=helen_numpy)
print (type(helen_pandas))
helen_pandas

## Appendix: Another way to write scoring script

In [None]:
%%writefile ./helen/script/helen_score.py
# Alternative: THIS DOES NOT WORK !!!!!!!!!!!!!!!!!!!!

import io
import pickle
import argparse
import numpy as np

from azureml.core.model import Model
from sklearn.linear_model import LogisticRegression



def init():
    """Load the pickled model into the module-level ``diabetes_model`` global.

    Reads ``--model_name`` from the command line (other arguments are ignored
    because ``parse_known_args`` is used), resolves the model file with
    ``Model.get_model_path`` and unpickles it. ``run()`` reads
    ``diabetes_model``, so this must be called before it.
    """
    global diabetes_model

    arg_parser = argparse.ArgumentParser(description="Diabetes model serving")
    arg_parser.add_argument('--model_name', dest="model_name", required=True)
    known_args, _ignored = arg_parser.parse_known_args()

    resolved_path = Model.get_model_path(known_args.model_name)

    # NOTE: pickle can execute arbitrary code while loading; only use this
    # with model files from a trusted source.
    with open(resolved_path, 'rb') as handle:
        diabetes_model = pickle.load(handle)


def run(input_data):
    """Return the model's predictions for one batch as a column.

    ``input_data`` must expose a two-element ``.shape``. The global
    ``diabetes_model`` set by ``init()`` predicts on it, and the predictions
    are reshaped to ``(rows, 1)``. Unlike the main scoring script, the input
    columns are not included in the result.
    """
    row_total, _ = input_data.shape

    # Only the prediction column is returned.
    return diabetes_model.predict(input_data).reshape((row_total, 1))