# Training Pipeline

We are creating a pipeline using ParallelRunStep to forecast orange juice sales. This notebook demonstrates how to create a pipeline that trains and registers 11,973 time-series models.

In [None]:
import os
import datetime

import pandas as pd
import azureml.core
from azureml.core import Workspace, Experiment, Datastore, Environment, Run
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.dataset import Dataset
from azureml.core.runconfig import RunConfiguration, DEFAULT_CPU_IMAGE
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule
from azureml.pipeline.steps import PythonScriptStep
from azureml.contrib.pipeline.steps import ParallelRunStep, ParallelRunConfig
from azureml.widgets import RunDetails

print("SDK version:", azureml.core.VERSION)

## Prerequisites

This example runs on an Azure Machine Learning Notebook VM. If you have already run the Environment Setup and Data Preparation notebooks, you are all set.

## Set up workspace, datastore, experiment

In [None]:
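# Load the workspace from config.json (ws was otherwise never defined)
# Alternatively: ws = Workspace(subscription_id="<subscription-id>", resource_group="<resource-group>", workspace_name="<workspace-name>")
ws = Workspace.from_config()

# Take a look at the workspace
ws.get_details()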

# choose a datastore
dstore = ws.get_default_datastore()
scoring_output_dstore = Datastore(ws, 'scoring_output_datastore')
train_output_dstore = Datastore(ws, 'training_output_datastore')

# choose an experiment
experiment = Experiment(ws, 'automl-ojforecasting')
print(dstore.name, dstore.datastore_type, dstore.account_name, dstore.container_name)

## Read the registered dataset from Workspace

We use 11,973 datasets and ParallelRunStep to build 11,973 time-series ARIMA models that predict the sales quantity for each store and brand.

Each dataset holds two years of orange juice sales data for one store and brand, with 7 columns and 122 rows.

You will need to register the datasets in the Workspace first. The Data Preparation notebook will walk you through this.
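The training script later derives each model's Store and Brand tags by splitting the file name on `_`, which implies the dataset files are named `<store>_<brand>.csv`. The sketch below shows that mapping in isolation; `store_and_brand` is a hypothetical helper and the example path is illustrative.

```python
import os


def store_and_brand(csv_path):
    """Split a '<store>_<brand>.csv' path into (store, brand).

    Hypothetical helper: assumes the naming convention implied by train.py.
    """
    stem = os.path.splitext(os.path.basename(csv_path))[0]  # drop folder and '.csv'
    store, brand = stem.split("_")[:2]
    return store, brand


print(store_and_brand("/mnt/data/1000_dominicks.csv"))  # ('1000', 'dominicks')
```

Using `os.path.splitext` rather than slicing off four characters keeps brand names that contain dots (for example `minute.maid`) intact.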

In [None]:
FileDst10Models = Dataset.get_by_name(ws, name='10modelsfiledataset')
FileDst10ModelsInput = FileDst10Models.as_named_input('Train10Models')

In [None]:
FileDst1000Models = Dataset.get_by_name(ws, name='1000modelsdataset')
FileDst1000ModelsInput = FileDst1000Models.as_named_input('Train1000Models')

In [None]:
FileDstAllModels = Dataset.get_by_name(ws, name='AllDataProd')
FileDstAllModelsInputs = FileDstAllModels.as_named_input('TrainAllmodels')

## Set up environment for ParallelRunStep

An Environment defines the collection of resources we need to run our pipelines. Here we configure a reproducible Python environment for the training step, which uses two modeling libraries, scikit-learn and pmdarima.

An Environment defines Python packages, environment variables, and Docker settings that are used in machine learning experiments, including in data preparation, training, and deployment to a web service. An Environment is managed and versioned in an Azure Machine Learning Workspace. You can update an existing environment and retrieve a version to reuse. Environments are exclusive to the workspace they are created in and can't be used across different workspaces.

In [None]:
# 'scikit-learn' is the real PyPI package name; the 'sklearn' alias is deprecated
batch_conda_deps = CondaDependencies.create(pip_packages=['scikit-learn', 'pmdarima', 'azureml-pipeline-core'])
batch_env = Environment(name="manymodels_environment")
batch_env.python.conda_dependencies = batch_conda_deps
batch_env.docker.enabled = True
batch_env.docker.base_image = DEFAULT_CPU_IMAGE

## Set up ParallelRunConfig

To train the models, you need an entry script and a list of dependencies. The entry_script is a user script, given as a local file path, that runs in parallel on multiple nodes. If source_directory is specified, use a path relative to it; otherwise, use any path that's accessible on the machine.

The <b>entry script</b> receives mini-batches of input, trains and registers a model for each input, and returns the results.

* <b>init()</b> - Runs only once per worker node/process, at the start of processing. In batch inference it typically loads the model into a global object; in this training script it sets up logging. init() can use the following environment variable (a ParallelRunStep input):
    * AZUREML_BI_OUTPUT_PATH: output folder path

* <b>run(mini_batch)</b> - The method to be parallelized. Each invocation receives one mini-batch.
    * mini_batch: ParallelRunStep invokes run() and passes either a list or a Pandas DataFrame as the argument. mini_batch is a list of file paths if the input is a FileDataset, or a Pandas DataFrame if the input is a TabularDataset.

* <b>run method response</b>: run() should return a Pandas DataFrame or an array. With the append_row output_action, the returned elements are appended to the common output file; with summary_only, their contents are ignored. For every output action, each returned element counts as one successful inference for an element of the input mini-batch. Include enough information in each result to map it back to its input: results are written to the output file in no guaranteed order, so use a key in the output (such as the file name) to match outputs to inputs.
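A minimal, framework-free sketch of that contract, assuming a FileDataset input: init() does one-time setup, and run() receives a list of file paths and returns one result per file, keyed by the file name so outputs can be matched to inputs. The logger name and result fields are hypothetical, and the actual training is left as a placeholder comment.

```python
import logging
import os

LOG_NAME = "user_log"  # hypothetical logger name, mirroring train.py


def init():
    # Runs once per worker process before any mini-batch: one-time setup only.
    logging.basicConfig(level=logging.INFO)
    logging.getLogger(LOG_NAME).info("init()")


def run(mini_batch):
    # For a FileDataset input, mini_batch is a list of file paths.
    logger = logging.getLogger(LOG_NAME)
    results = []
    for file_path in mini_batch:
        stem = os.path.splitext(os.path.basename(file_path))[0]
        logger.info("processing %s", stem)
        # ... train and register one model for this file here ...
        # The file stem is the key that maps this output row back to its input.
        results.append([stem, "trained"])
    # One element per input file: each counts as one successful item.
    return results
```

Locally you can exercise it by calling `init()` once and then `run([...])` with a list of paths; returning one element per input is what lets ParallelRunStep count successes correctly.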

In the ParallelRunConfig, choose the number of workers and nodes appropriate for your use case. The worker count is based on the number of cores of the compute VM, and the node count determines how many compute nodes the job runs on. Because each ARIMA model is trained independently, increasing the node count speeds up training in this scenario.
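As a back-of-the-envelope check, total parallelism is node count × processes per node. The sketch below uses the values set later in this notebook (5 nodes, 8 processes per node, one file per mini-batch) and assumes mini-batches are spread evenly across workers, which the real scheduler does not guarantee; `parallel_plan` is a hypothetical helper.

```python
import math


def parallel_plan(n_files, node_count, processes_per_node, mini_batch_size=1):
    """Rough work split for a FileDataset, assuming mini-batches spread evenly."""
    workers = node_count * processes_per_node           # concurrent run() callers
    mini_batches = math.ceil(n_files / mini_batch_size)
    per_worker = math.ceil(mini_batches / workers)      # mini-batches per worker
    return workers, mini_batches, per_worker


print(parallel_plan(11973, node_count=5, processes_per_node=8))  # (40, 11973, 300)
```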


* <b>node_count</b>: The number of compute nodes used to run the user script. We recommend starting with 3 and increasing node_count if training takes too long.

* <b>process_count_per_node</b>: The worker count, i.e. the number of processes per node. We use an 8-core compute cluster, so we set it to 8.

* <b>compute_target</b>: Only AmlCompute is supported. You can change to a different compute cluster if one fails.

* <b>run_invocation_timeout</b>: The timeout, in seconds, for each run() invocation; the default is 60. Set it higher than the longest training time for one mini-batch. Our slowest model takes about 120 seconds to train, so we set it to 500 to leave ample headroom.

* <b>entry_script</b>: The name of the training script.

* <b>source_directory</b>: Path to the folder that contains all files to execute on the compute target (optional).

* <b>environment</b>: The Python environment definition. You can configure it to use an existing Python environment or to set up a temporary environment for the experiment. The definition is also responsible for setting the required application dependencies (optional). 
    
* <b>mini_batch_size</b>: The size of the mini-batch passed to a single run() call (optional). 

    * For FileDataset, it's the number of files, with a minimum value of 1; multiple files can be combined into one mini-batch. The default value is 1. In this orange juice sales example, the input is a FileDataset with one file per store and brand, so we set mini_batch_size to 1 and each run() call trains exactly one model.

    * For TabularDataset, it's the size of data. Example values are 1024, 1024KB, 10MB, and 1GB; the recommended value is 1MB. A TabularDataset mini-batch never crosses file boundaries. For example, suppose you have .csv files of various sizes, the smallest 100 KB and the largest 10 MB. With mini_batch_size = 1MB, each file smaller than 1 MB is treated as one mini-batch, and each file larger than 1 MB is split into multiple mini-batches.

* <b>error_threshold</b>: The number of record failures (TabularDataset) or file failures (FileDataset) to ignore during processing. If the error count for the entire input goes above this value, the job is stopped. The threshold applies to the entire input, not to individual mini-batches sent to run(). The range is [-1, int.max], where -1 means ignore all failures. Choose a value based on your fault tolerance. Here we set it to 10, so the job is stopped and canceled once more than 10 files fail.

* <b>output_action</b>: One of the following values indicates how the output will be organized -
    * <b>summary_only</b>: The user script will store the output. ParallelRunStep will use the output only for the error threshold calculation. The parallel_run_step.txt will return 
    * <b>append_row</b>: A single output file, parallel_run_step.txt, is created in the output folder, and the outputs for all input files are appended to it, one per line. We use 'append_row' here because we collect this aggregated output file as our training log.
    
We also added tags to preserve the information about our training cluster's node count, process count per node and dataset name. You can find the 'Tags' column in Azure Machine Learning Studio.
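The mini_batch_size and error_threshold rules in the list above can be modeled locally. This is a simplified sketch of the documented behavior, not ParallelRunStep's implementation: sizes are in KB, the file sizes repeat the 100 KB / 10 MB example, and both function names are hypothetical.

```python
import math


def tabular_mini_batch_counts(file_sizes_kb, mini_batch_kb):
    """Approximate mini-batches per file; a mini-batch never spans two files."""
    return [max(1, math.ceil(size / mini_batch_kb)) for size in file_sizes_kb]


def should_stop(failure_count, error_threshold):
    """Stop once failures across the whole input exceed the threshold; -1 disables it."""
    if error_threshold == -1:
        return False
    return failure_count > error_threshold


print(tabular_mini_batch_counts([100, 10240], mini_batch_kb=1024))  # [1, 10]
print(should_stop(10, 10), should_stop(11, 10))                     # False True
```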

[ParallelRunConfig Documentation](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.runconfiguration?view=azure-ml-py)

In [None]:
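workercount = 8    # processes per node (one per core on an 8-core VM)
nodecount = 5      # compute nodes
timeout = 500      # seconds allowed per run() call
compute = AmlCompute(ws, "train-many-model")

datasetname = 'AllStoresFileDatasets'

# tags shown in the 'Tags' column in Azure Machine Learning Studio
tags1 = {
    'DatasetName': datasetname,
    'Nodes': nodecount,
    'WorkersPerNode': workercount,
    'Timeout': timeout,
}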

parallel_run_config = ParallelRunConfig(
    source_directory='./scripts',
    entry_script='train.py',
    mini_batch_size="1",
    run_invocation_timeout=timeout,
    error_threshold=10,
    output_action="append_row",
    environment=batch_env,
    process_count_per_node=workercount,
    compute_target=compute,
    node_count=nodecount)

## Set up ParallelRunStep

First, we set up the output directory and define the Pipeline's output name.

In [None]:
output_dir = PipelineData(name="AllARIMAModels", 
                          datastore=dstore, 
                          output_path_on_compute="AllARIMAModels/")

We created 4 input arguments that the user can adjust for the forecasting use case.

* <b>target_column</b>: The target column is the column name you'd like to predict on.
* <b>n_test_periods</b>: The number of periods to hold out for testing/scoring.
* <b>timestamp_column</b>: We set the timestamp column to be the index column for the ARIMA models to train on. 
* <b>stepwise_training</b>: Set to 'True' or 'False'. 'False' conducts a full grid search for each model and therefore takes longer to complete. 'True' performs stepwise training: the search stops as soon as one of the thresholds is hit, and the best-fitting model found so far is returned. 'True' speeds up training dramatically.
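The n_test_periods argument becomes a cutoff date. Assuming weekly data (one row per WeekStarting value), the cutoff is the latest week minus 7 × n_test_periods days; rows on or before it train the model and later rows are held out. A sketch with plain `datetime` and ten hypothetical weeks; `split_weeks` is a hypothetical helper mirroring the split in train.py.

```python
from datetime import date, timedelta


def split_weeks(week_starts, n_test_periods):
    """Hold out the last n_test_periods weeks, mirroring the cutoff used in train.py."""
    split_date = max(week_starts) - timedelta(days=7 * n_test_periods)
    train = [d for d in week_starts if d <= split_date]
    test = [d for d in week_starts if d > split_date]
    return train, test


weeks = [date(2020, 1, 6) + timedelta(weeks=i) for i in range(10)]  # hypothetical data
train, test = split_weeks(weeks, n_test_periods=6)
print(len(train), len(test))  # 4 6
```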



We specify the following parameters:

* <b>name</b>: We set a name for our ParallelRunStep.

* <b>parallel_run_config</b>: We then pass the previously defined ParallelRunConfig.

* <b>inputs</b>: The registered FileDataset we retrieved earlier in this notebook. It points to a path in the blob container, and the number of files in that path determines the number of models that the ParallelRunStep trains.

* <b>output</b>: The output directory we just defined. A PipelineData object that corresponds to the output directory.

* <b>models</b>: Zero or more model names already registered in the Azure Machine Learning model registry.

* <b>allow_reuse</b>: Whether the step should reuse previous results when run with the same settings/inputs. If this parameter is False, a new run will always be generated for this step during pipeline execution. The default value is True.


[ParallelRunStep Documentation](https://docs.microsoft.com/en-us/python/api/azureml-contrib-pipeline-steps/azureml.contrib.pipeline.steps.parallelrunstep?view=azure-ml-py)

In [None]:
parallelrun_step = ParallelRunStep(
    name="many-models-training",
    parallel_run_config=parallel_run_config,
    inputs=[FileDstAllModelsInputs],
    output=output_dir,
    models=[],
    arguments=['--target_column','Quantity', '--n_test_periods',6, '--timestamp_column','WeekStarting', '--stepwise_training',True],
    allow_reuse=False
)
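The values in `arguments` reach train.py on the command line, so the script receives them as strings (assuming the SDK renders Python values with `str()`, so `6` becomes `"6"` and `True` becomes `"True"`). Integers come back with `type=int`, but a boolean flag needs an explicit conversion, because any non-empty string, including `"False"`, is truthy. A minimal sketch; `str2bool` is a hypothetical helper name.

```python
import argparse


def str2bool(value):
    """Parse 'True'/'False' text explicitly; bool('False') would be True."""
    return str(value).strip().lower() in ("true", "1", "yes")


parser = argparse.ArgumentParser()
parser.add_argument("--n_test_periods", type=int)
parser.add_argument("--stepwise_training", type=str2bool)

# On the command line every argument is text, e.g. True arrives as "True".
args = parser.parse_args(["--n_test_periods", "6", "--stepwise_training", "False"])
print(args.n_test_periods, args.stepwise_training)  # 6 False
print(bool("False"))                                # True, the pitfall str2bool avoids
```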

## Set up RunConfiguration for PythonScriptStep

Run configuration represents the configuration for experiment runs targeting different compute targets in Azure Machine Learning. The RunConfiguration object encapsulates the information needed to submit a training run in an experiment. Here we add the azureml-pipeline-core (pip) and pandas (conda) packages that the log script needs.

In [None]:
conda_run_config = RunConfiguration(framework="python")
conda_run_config.target = compute
conda_run_config.environment.docker.enabled = True
conda_run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE
cd = CondaDependencies.create(pip_packages=['azureml-pipeline-core'], conda_packages=['pandas'])
conda_run_config.environment.python.conda_dependencies = cd

## Set up PythonScriptStep

We then set up a PythonScriptStep to process our log file.

We specify the following parameters:

* <b>name</b>: We set a name for our PythonScriptStep.

* <b>script_name</b>: The name of the log script.

* <b>compute_target</b>: Set AML compute target. 

* <b>runconfig</b>: The RunConfiguration defined during the previous step.

* <b>arguments</b>: The arguments you can specify based on your setup.


We created 5 input arguments that the user can adjust for the forecasting use case.

* <b>ParallelRunStep_name</b>: The ParallelRunStep name that you defined in the ParallelRunStep.
* <b>pipeline_output_name</b>: The ParallelRunStep output directory name. 
* <b>datastore</b>: The name of the registered datastore to upload the log file to.
* <b>experiment</b>: The name of the experiment that ParallelRunStep is running on. 
* <b>overwrite_logs</b>: 'True' will overwrite the existing log files. 'False' will not overwrite.


In [None]:
from azureml.pipeline.steps import PythonScriptStep

log_python_script_step = PythonScriptStep(name="logging",
                        script_name="log.py",
                        compute_target=compute,
                        source_directory='./scripts',
                        runconfig=conda_run_config,
                        arguments=['--ParallelRunStep_name','many-models-training','--datastore', 'training_output_datastore', '--experiment', 'automl-ojforecasting', '--overwrite_logs', True, '--pipeline_output_name', 'AllARIMAModels'],
                        allow_reuse=False)

## Set up the step sequence

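We use a StepSequence to make sure ParallelRunStep and PythonScriptStep execute in the correct order: the PythonScriptStep must run after the ParallelRunStep. The logging step reads the ParallelRunStep's output through the run history rather than taking it as a step input, so there is no data dependency from which the pipeline could infer this order on its own.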

In [None]:
from azureml.pipeline.core import StepSequence

all_steps = StepSequence(steps=[parallelrun_step, log_python_script_step])
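Conceptually, a StepSequence adds an ordering edge from each step to the next, turning the pipeline graph into a chain. The toy sketch below models that with Python's `graphlib` (3.9+); it is not how Azure ML schedules steps internally, and `execution_order` is a hypothetical helper. Within the SDK, `log_python_script_step.run_after(parallelrun_step)` is another way to declare the same ordering.

```python
from graphlib import TopologicalSorter  # Python 3.9+


def execution_order(steps, run_after):
    """Order steps so each runs after the steps listed for it in run_after."""
    graph = {step: run_after.get(step, set()) for step in steps}
    return list(TopologicalSorter(graph).static_order())


# A sequence of [training, logging] behaves like the single edge below.
print(execution_order(["many-models-training", "logging"],
                      {"logging": {"many-models-training"}}))
# ['many-models-training', 'logging']
```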

## Submit the pipeline to run

Next we submit our pipeline to run. The whole training pipeline takes about 1h 16m using a Standard_D13_V2 VM with our current ParallelRunConfig setting.
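To put that runtime in perspective: with 5 nodes × 8 processes, 76 minutes of wall time gives roughly 15 worker-seconds per model. Treat this as an upper bound on the average training time per model, since it assumes every worker is busy for the whole run and it also absorbs pipeline startup, environment setup, and the logging step. `avg_worker_seconds_per_model` is a hypothetical helper.

```python
def avg_worker_seconds_per_model(n_models, wall_minutes, node_count, processes_per_node):
    """Worker-seconds available per model if every worker were busy the whole run."""
    worker_seconds = wall_minutes * 60 * node_count * processes_per_node
    return worker_seconds / n_models


# 1h 16m = 76 minutes, with the 5 x 8 configuration used above
print(round(avg_worker_seconds_per_model(11973, 76, 5, 8), 1))  # 15.2
```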

In [None]:
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=all_steps)
run = experiment.submit(pipeline,tags=tags1)
RunDetails(run).show()

Run the following command if you'd like to monitor the training process in the Jupyter notebook. It streams the logs live during training.

In [None]:
run.wait_for_completion(show_output=True)

Successfully trained, registered, and logged 11,973 ARIMA models.

## Publish the pipeline

In [None]:
published_pipeline = pipeline.publish(name = 'train-many-models',
                                     description = 'train 11,973 models with log',
                                     version = '1',
                                     continue_on_step_failure = True
                                     )

## Train script

In [None]:
%%writefile ./scripts/train.py

import os
import uuid
import argparse
import datetime
import logging
from datetime import timedelta

import joblib  # sklearn.externals.joblib was removed in scikit-learn 0.23; use joblib directly
import pandas as pd
import pmdarima as pm
from azureml.core.run import Run
from entry_script_helper import EntryScriptHelper

thisrun = Run.get_context()

LOG_NAME = "user_log"

parser = argparse.ArgumentParser("split")
parser.add_argument("--target_column", type=str, help="input target column")
parser.add_argument("--n_test_periods", type=int, help="input number of test periods")
parser.add_argument("--timestamp_column", type=str, help="input timestamp column")
# Arguments arrive as strings and bool("False") is True, so convert the flag explicitly
parser.add_argument("--stepwise_training", type=lambda s: str(s).lower() == "true",
                    help="input stepwise training True or False")

args, unknown = parser.parse_known_args()

print("Argument 1(target_column): %s" % args.target_column)
print("Argument 2(n_test_periods): %s" % args.n_test_periods)
print("Argument 3(timestamp_column): %s" % args.timestamp_column)
print("Argument 4(stepwise_training): %s" % args.stepwise_training)


def init():
    EntryScriptHelper().config(LOG_NAME)
    logger = logging.getLogger(LOG_NAME)
    output_folder = os.path.join(os.environ.get("AZ_BATCHAI_INPUT_AZUREML", ""), "temp/output")
    logger.info(f"{__file__}.output_folder:{output_folder}")
    logger.info("init()")
    return


def run(input_data):
    # 0. Set up logging
    logger = logging.getLogger(LOG_NAME)
    os.makedirs('./outputs', exist_ok=True)
    logger.info('processing all files')
    resultList = []

    # 1. Read in the data file
    for idx, csv_file_path in enumerate(input_data):
        u1 = uuid.uuid4()
        mname = 'arima' + str(u1)[0:16]
        logs = []
        date1 = datetime.datetime.now()
        logger.info('starting (' + csv_file_path + ') ' + str(date1))
        thisrun.log(mname, 'starttime-' + str(date1))

        data = pd.read_csv(csv_file_path, header=0)
        logger.info(data.head())

        # 2. Split the data into train and test sets based on dates:
        #    the last n_test_periods weeks are held out for testing
        data = data.set_index(args.timestamp_column)
        data.index = pd.to_datetime(data.index)
        max_date = data.index.max()
        split_date = max_date - timedelta(days=7 * args.n_test_periods)
        train = data[data.index <= split_date]
        test = data[data.index > split_date]

        # 3.Train the model
        model = pm.auto_arima(train[args.target_column],
                  start_p=0,
                  start_q=0,
                  test='adf', # default stationarity test is kpss
                  max_p =3,
                  max_d = 2,
                  max_q=3,
                  m=3, #number of observations per seasonal cycle
                  #d=None,
                  seasonal=True,
                  #trend = None, # adjust this if the series have trend
                  #start_P=0,
                  #D=0,
                  information_criterion = 'aic',
                  trace=True, #prints status on the fits
                  #error_action='ignore',
                  stepwise = args.stepwise_training, # this increments instead of doing a grid search
                  suppress_warnings = True,
                  out_of_sample_size = 16
                 )
        model = model.fit(train[args.target_column])
        logger.info('done training')

        # 4. Save the model
        logger.info(model)
        logger.info(mname)
        model_path = os.path.join('./outputs/', mname)
        joblib.dump(value=model, filename=model_path)

        # 5. Upload the model file to the run and register it in the workspace
        try:
            thisrun.upload_file(mname, model_path)
        except Exception:
            logger.info("don't need to upload")
        logger.info('register model, skip the outputs prefix')
        # File names follow '<store>_<brand>.csv'; use this file, not the whole mini-batch
        file_stem = os.path.splitext(os.path.basename(str(csv_file_path)))[0]
        store, brand = file_stem.split('_')[:2]
        model_name = 'arima_' + file_stem
        print('Trained ' + model_name)

        thisrun.register_model(model_path=mname, model_name=model_name, model_framework='pmdarima',
                               tags={'Store': store, 'Brand': brand, 'ModelType': 'ARIMA'})
        print('Registered ' + model_name)

        # 6. Log some metrics
        date2 = datetime.datetime.now()
        logger.info('ending (' + str(csv_file_path) + ') ' + str(date2))

        logs = [store, brand, 'ARIMA', file_stem, model_name, str(date1), str(date2),
                str(date2 - date1), idx, len(input_data), thisrun.get_status()]

        thisrun.log(mname, 'endtime-' + str(date2))
        thisrun.log(mname, 'auc-1')

        # Append inside the loop: one result per input file, so each file counts as a success
        resultList.append(logs)

    return resultList

## Log Script

In [None]:
%%writefile ./scripts/log.py

import pandas as pd
from azureml.core.run import Run
from azureml.core import Workspace, Experiment, Datastore
import os
import datetime
from azureml.pipeline.core import PipelineRun
import argparse

# parse input arguments
parser = argparse.ArgumentParser("Logging arguments")

parser.add_argument("--ParallelRunStep_name", type=str, help="input ParallelRunStep name")
parser.add_argument("--datastore", type=str, help="input datastore name")
parser.add_argument("--experiment", type=str, help="input experiment name")
# Arguments arrive as strings and bool("False") is True, so convert the flag explicitly
parser.add_argument("--overwrite_logs", type=lambda s: str(s).lower() == "true", help="input overwrite logs True or False")
parser.add_argument("--pipeline_output_name", type=str, help="input ParallelRunStep output name")

args, unknown = parser.parse_known_args()
print("Argument1 (ParallelRunStep_name): %s" % args.ParallelRunStep_name)
print("Argument2 (datastore): %s" % args.datastore)
print("Argument3 (experiment): %s" % args.experiment)
print("Argument4 (overwrite_logs): %s" % args.overwrite_logs)
print("Argument5 (pipeline_output_name): %s" % args.pipeline_output_name)

# set workspace and experiment 
thisrun = Run.get_context()
ws = thisrun.experiment.workspace
experiment = Experiment(ws, args.experiment)

# retrieve the log file
pipeline_runId = thisrun.get_details()['properties']['azureml.pipelinerunid']
pipeline_run = PipelineRun(experiment, pipeline_runId)
step_run = pipeline_run.find_step_run(args.ParallelRunStep_name)[0]
prediction_output = step_run.get_output_data(args.pipeline_output_name)
prediction_output.download(local_path="logs")
print('Downloaded the log file of Pipeline Id: '+pipeline_runId)

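# find the log file path
result_file = None
for root, dirs, files in os.walk("logs"):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root, file)
            print('Log file path: ' + result_file)
if result_file is None:
    raise FileNotFoundError('parallel_run_step.txt was not found in the downloaded output')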
            
# read the file and clean up data: each line is the string form of one per-file result list,
# so strip the list brackets, the quotes, and the spaces after each comma
df_log = pd.read_csv(result_file, converters={0: lambda x: x.strip("["), 10: lambda x: x.strip("]")}, delimiter=",", header=None)
df_log.columns = ['Store', 'Brand', 'ModelType', 'FileName', 'ModelName', 'StartTime', 'EndTime', 'Duration', 'Index', 'BatchSize', 'Status']
for col in ['Store', 'Brand', 'ModelType', 'FileName', 'ModelName', 'StartTime', 'EndTime', 'Duration', 'Status']:
    df_log[col] = df_log[col].apply(str).str.replace("'", '').str.strip()
print(df_log.head())
print('Read and cleaned the log file')

# save the log file
output_path = os.path.join('./logs/', 'training_log')
df_log.to_csv(path_or_buf=output_path + '.csv', index = False)
print('Saved the training_log.csv')

# upload the log file
log_dstore = Datastore(ws, args.datastore)
log_dstore.upload_files(['./logs/training_log'+'.csv'], target_path='training_log_'+str(datetime.datetime.now().date()), overwrite=args.overwrite_logs, show_progress=True)
print('Uploaded the training_log.csv')
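log.py cleans parallel_run_step.txt by stripping brackets and quotes column by column. Assuming each line is the `str()` of one per-file list returned by run(), which is what those strip calls imply, an alternative is to parse each line with `ast.literal_eval`. This is a swapped-in technique, not what log.py does; the sample line and `parse_log_line` are hypothetical.

```python
import ast

COLUMNS = ['Store', 'Brand', 'ModelType', 'FileName', 'ModelName', 'StartTime',
           'EndTime', 'Duration', 'Index', 'BatchSize', 'Status']


def parse_log_line(line):
    """Turn one line such as "['1000', 'dominicks', ...]" back into a dict."""
    values = ast.literal_eval(line.strip())  # evaluates Python literals only, never code
    return dict(zip(COLUMNS, values))


# Hypothetical line in the format log.py expects
sample = ("['1000', 'dominicks', 'ARIMA', '1000_dominicks', 'arima_1000_dominicks', "
          "'2020-04-01 10:00:00', '2020-04-01 10:00:05', '0:00:05', 0, 1, 'Running']")
record = parse_log_line(sample)
print(record['Store'], record['Brand'], record['Index'])  # 1000 dominicks 0
```

Because every logged value is a string or an integer, `literal_eval` restores the original types (for example, `Index` stays an int) without any quote or whitespace cleanup.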