Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

# Forecasting Pipeline - Automated ML

---


In this notebook we create a pipeline for batch forecasting with 11,973 AutoML models. These models were trained and scored in the Training notebook in this repository. We set up the pipeline for forecasting over the desired forecasting horizon and use the AutoMLPipelineBuilder to parallelize the process. For more information about the data and models, refer to the Data Preparation and Training notebooks.

The pipeline setup is similar to the Training Pipeline in this repository. For more details on the steps and functions, refer to the Training folder.

### Prerequisites 
At this point, you should have already:

1. Created your AML Workspace using the [00_Setup_AML_Workspace notebook](../../00_Setup_AML_Workspace.ipynb)
2. Run [01_Data_Preparation.ipynb](../../01_Data_Preparation.ipynb) to create the dataset
3. Run [02_AutoML_Training_Pipeline.ipynb](../02_AutoML_Training_Pipeline/02_AutoML_Training_Pipeline.ipynb) to train the models

Install the azureml.contrib.automl.pipeline.steps package

In [None]:
# !pip install azureml-contrib-automl-pipeline-steps

## 1.0 Call the Workspace, Datastore, and Compute

As we did in the Training Pipeline notebook, we need to call the Workspace. We also want to create variables for the datastore and compute cluster. 

### Connect to the workspace


In [1]:
import azureml.core
from azureml.core import Workspace, Datastore
import pandas as pd

# set up workspace
ws = Workspace.from_config(path = '../../config.json')

# Take a look at Workspace
ws.get_details()

# set up datastores
dstore = ws.get_default_datastore()

output = {}
output['SDK version'] = azureml.core.VERSION
output['Subscription ID'] = ws.subscription_id
output['Workspace'] = ws.name
output['Resource Group'] = ws.resource_group
output['Location'] = ws.location
output['Default datastore name'] = dstore.name
# Use None (not -1) for unlimited column width; -1 is deprecated in pandas.
pd.set_option('display.max_colwidth', None)
outputDf = pd.DataFrame(data = output, index = [''])
outputDf.T


| | |
|---|---|
| SDK version | 1.51.0 |
| Subscription ID | 0c19fc19-85fd-4aa4-b133-61dd20fa93df |
| Workspace | auotml-example-workspace |
| Resource Group | edwin.spartan117-rg |
| Location | southeastasia |
| Default datastore name | workspaceblobstore |


### Attach existing compute resource


In [2]:
from azureml.core.compute import AmlCompute, ComputeTarget

# Choose a name for your cluster.
amlcompute_cluster_name = "automl-cluster"

found = False
# Check if this compute target already exists in the workspace.
cts = ws.compute_targets
if amlcompute_cluster_name in cts and cts[amlcompute_cluster_name].type == 'AmlCompute':
    found = True
    print('Found existing compute target.')
    compute = cts[amlcompute_cluster_name]
    
if not found:
    print('Creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D16S_V3',
                                                           min_nodes=2,
                                                           max_nodes=20)
    # Create the cluster.
    compute = ComputeTarget.create(ws, amlcompute_cluster_name, provisioning_config)
    
print('Checking cluster status...')
# Can poll for a minimum number of nodes and for a specific timeout.
# If no min_node_count is provided, it will use the scale settings for the cluster.
compute.wait_for_completion(show_output = True, min_node_count = None, timeout_in_minutes = 20)
    
# For a more detailed view of current AmlCompute status, use get_status().

Found existing compute target.
Checking cluster status...
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned


### Set up an Experiment

In [3]:
from azureml.core import Experiment

experiment = Experiment(ws, 'MMSA-Forecasting-Pipeline')

## 2.0 Call Registered FileDataset
In the Data Preparation notebook, we registered the orange juice inference data to the Workspace. You can choose to run the pipeline on the subset of 10 series or the full dataset of 11,973 series. We recommend starting with 10 series and then expanding.

In [5]:
from azureml.core.dataset import Dataset

filedst_10_models = Dataset.get_by_name(ws, name = 'Orange Juice Sales (Simulated) Subset - Inference')
filedst_10_models_input = filedst_10_models.as_named_input('forecast_10_models')
 
#filedst_all_models = Dataset.get_by_name(ws, name='oj_data_inference')
#filedst_all_models_input = filedst_all_models.as_named_input('forecast_all_models')

## 3.0 Build forecasting pipeline
Now that the data, models, and compute resources are set up, we can put together a pipeline for forecasting. 
### Set up the environment to run the script
Specify the training experiment name and the ID of the training pipeline run whose models will be used for forecasting. Replace the run ID below with the ID of your own training pipeline run.

In [6]:
training_experiment_name = 'MMSA-Training-Pipeline'
training_pipeline_run_id = "d6c5edbc-f851-4882-a936-5df59b9ddaa7"

### Create the configuration to wrap the entry script 
AutoMLPipelineBuilder is used to build the inference step for many models. You will need to determine the number of workers and nodes appropriate for your use case. The process_count_per_node is based on the number of cores of the compute VM. The node_count determines the number of compute nodes to use; increasing the node count will speed up the inference process.

* <b>experiment</b>: Current experiment.

* <b>inference_data</b>: Inference dataset.

* <b>compute_target</b>: Compute target for inference.

* <b>node_count</b>: The number of compute nodes to be used for running the user script. We recommend starting with 3 and increasing the node_count if inference is taking too long.

* <b>process_count_per_node</b>: The number of processes per node.

* <b>run_invocation_timeout</b>: The run() method invocation timeout in seconds. Set the timeout to the maximum time of one AutoML inference run (with some buffer). The default is 60 seconds.

* <b>output_datastore</b>: Output datastore to output the inference results.

* <b>train_experiment_name</b>: Training experiment name where many models were trained.

* <b>train_run_id</b>: Training run id where many models were trained.

* <b>partition_column_names</b>: Partition column names.

* <b>time_column_name (Optional)</b>: Time column name, if the data is a time series.

* <b>target_column_name (Optional)</b>: Target column name, if the inference dataset has the target column.

<span style="color:red"><b>NOTE: There are limits on how many runs can execute in parallel per workspace. We currently recommend setting the parallelism to a maximum of 320 runs per experiment per workspace. Users who exceed this limit might encounter Too Many Requests errors (HTTP 429).</b></span>
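As a rough sanity check before building the step, the sketch below estimates the degree of parallelism for a given configuration and compares it with the recommended limit of 320. It assumes that the effective parallelism is the product of node_count and process_count_per_node. The helper names `total_parallelism` and `check_parallelism` are hypothetical and are not part of the Azure ML SDK.

```python
# Recommended upper bound on parallel runs per experiment per workspace,
# taken from the note above.
MAX_RECOMMENDED_PARALLELISM = 320


def total_parallelism(node_count: int, process_count_per_node: int) -> int:
    """Return the number of worker processes that can run at the same time.

    Assumption: parallelism = node_count * process_count_per_node.
    """
    if node_count < 1 or process_count_per_node < 1:
        raise ValueError("node_count and process_count_per_node must be >= 1")
    return node_count * process_count_per_node


def check_parallelism(node_count: int, process_count_per_node: int) -> bool:
    """Return True if the configuration stays within the recommended limit."""
    workers = total_parallelism(node_count, process_count_per_node)
    return workers <= MAX_RECOMMENDED_PARALLELISM


# Configuration used in this notebook: 1 node with 4 processes.
print(total_parallelism(1, 4), check_parallelism(1, 4))

# A larger, hypothetical configuration: 20 nodes with 16 processes each.
print(total_parallelism(20, 16), check_parallelism(20, 16))
```

A configuration that fails this check is not necessarily invalid, but it is more likely to run into HTTP 429 errors.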

In [7]:
from azureml.contrib.automl.pipeline.steps import AutoMLPipelineBuilder

partition_column_names = ['Store', 'Brand']

inference_steps = AutoMLPipelineBuilder.get_many_models_batch_inference_steps(experiment = experiment,
                                                                              inference_data = filedst_10_models_input,
                                                                              compute_target = compute,
                                                                              node_count = 1,
                                                                              process_count_per_node = 4,
                                                                              run_invocation_timeout=300,
                                                                              output_datastore = dstore,
                                                                              train_experiment_name = training_experiment_name,
                                                                              train_run_id = training_pipeline_run_id,
                                                                              partition_column_names = partition_column_names,
                                                                              time_column_name = "WeekStarting",
                                                                              target_column_name = "Quantity")

Parameter target_column_names will be deprecated in the future. Please use ManyModelsParameters instead.
Parameter time_column_name will be deprecated in the future. Please use ManyModelsParameters instead.
Parameter partition_column_names will be deprecated in the future. Please use ManyModelsParameters instead.
Output in the txt file does not include column header, use 'csv' file extension in 'append_row_file_name' parameter in 'get_many_models_batch_inference_steps' method to get column header in the output file.
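To illustrate what partition_column_names means, the following sketch groups a few rows by the `Store` and `Brand` columns, so that each distinct combination forms one series handled by one model. This is only a conceptual illustration in plain Python, not how the pipeline step is implemented. The sample rows and the helper `group_by_partition` are hypothetical.

```python
from collections import defaultdict

# Hypothetical sample rows; the real inference data lives in the registered dataset.
rows = [
    {"Store": 1000, "Brand": "dominicks", "WeekStarting": "1992-05-28"},
    {"Store": 1000, "Brand": "dominicks", "WeekStarting": "1992-06-04"},
    {"Store": 1000, "Brand": "tropicana", "WeekStarting": "1992-05-28"},
    {"Store": 1001, "Brand": "dominicks", "WeekStarting": "1992-05-28"},
]

partition_column_names = ["Store", "Brand"]


def group_by_partition(records, partition_columns):
    """Group records by the tuple of their partition column values."""
    groups = defaultdict(list)
    for record in records:
        key = tuple(record[column] for column in partition_columns)
        groups[key].append(record)
    return dict(groups)


series = group_by_partition(rows, partition_column_names)

# Each key identifies one time series, and therefore one model.
for key, members in series.items():
    print(key, len(members))
```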


## 4.0 Run the forecast pipeline
We can use the Experiment we created to track the runs of the pipeline and review the output.

In [8]:
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace = ws, steps = inference_steps)
run = experiment.submit(pipeline)

Created step many-models-inference [7e006a13][569c3a3c-cb77-445e-bf6f-86a43250ac11], (This step will run and generate new outputs)
Submitted PipelineRun 43771824-6025-4783-8086-a4b096f8bd2d
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/43771824-6025-4783-8086-a4b096f8bd2d?wsid=/subscriptions/0c19fc19-85fd-4aa4-b133-61dd20fa93df/resourcegroups/edwin.spartan117-rg/workspaces/auotml-example-workspace&tid=c5f4b1c2-b533-4788-b1c5-99d0f10fb9b6


You can run the following command if you'd like to monitor the forecasting process in a Jupyter notebook. It will stream logs live while forecasting.

**Note**: This command may not work on a Notebook VM; however, it should work on your local laptop.

In [9]:
run.wait_for_completion(show_output = True)

PipelineRunId: 43771824-6025-4783-8086-a4b096f8bd2d
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/43771824-6025-4783-8086-a4b096f8bd2d?wsid=/subscriptions/0c19fc19-85fd-4aa4-b133-61dd20fa93df/resourcegroups/edwin.spartan117-rg/workspaces/auotml-example-workspace&tid=c5f4b1c2-b533-4788-b1c5-99d0f10fb9b6
PipelineRun Status: NotStarted
PipelineRun Status: Running


StepRunId: 595e6f95-c95d-475e-9363-d2673b88139d
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/595e6f95-c95d-475e-9363-d2673b88139d?wsid=/subscriptions/0c19fc19-85fd-4aa4-b133-61dd20fa93df/resourcegroups/edwin.spartan117-rg/workspaces/auotml-example-workspace&tid=c5f4b1c2-b533-4788-b1c5-99d0f10fb9b6
StepRun( many-models-inference ) Status: NotStarted
StepRun( many-models-inference ) Status: Running

StepRun(many-models-inference) Execution Summary
StepRun( many-models-inference ) Status: Finished
{'runId': '595e6f95-c95d-475e-9363-d2673b88139d', 'target': 'automl-cluster', 'status': 'Complet



PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': '43771824-6025-4783-8086-a4b096f8bd2d', 'status': 'Completed', 'startTimeUtc': '2023-11-17T18:03:04.65228Z', 'endTimeUtc': '2023-11-17T18:07:14.836746Z', 'services': {}, 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'SDK', 'runType': 'SDK', 'azureml.parameters': '{}', 'azureml.continue_on_step_failure': 'False', 'azureml.continue_on_failed_optional_input': 'True', 'azureml.pipelineComponent': 'pipelinerun', 'azureml.pipelines.stages': '{"Initialization":null,"Execution":{"StartTime":"2023-11-17T18:03:05.1118187+00:00","EndTime":"2023-11-17T18:07:14.7180816+00:00","Status":"Finished"}}'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://auotmlexamplew5880114168.blob.core.windows.net/azureml/ExperimentRun/dcid.43771824-6025-4783-8086-a4b096f8bd2d/logs/azureml/executionlogs.txt?sv=2019-07-07&sr=b&sig=Ggabce2uDfd8jC7tMkTjw9DYSLzo3eJ31Ivx%2FjxE2l

'Finished'

Successfully forecasted with the AutoML models.
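The PipelineRun execution summary above includes `startTimeUtc` and `endTimeUtc`. As a small sketch, the elapsed time of the run can be computed from those two values using only the standard library. The format string assumes that both timestamps carry fractional seconds and a trailing `Z`, as they do in the output shown; the helper name `elapsed_seconds` is hypothetical.

```python
from datetime import datetime

# Timestamps copied from the PipelineRun execution summary above.
start_time_utc = "2023-11-17T18:03:04.65228Z"
end_time_utc = "2023-11-17T18:07:14.836746Z"

# Assumption: timestamps always include fractional seconds and end with "Z".
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def elapsed_seconds(start: str, end: str) -> float:
    """Return the number of seconds between two UTC timestamp strings."""
    started = datetime.strptime(start, TIMESTAMP_FORMAT)
    finished = datetime.strptime(end, TIMESTAMP_FORMAT)
    return (finished - started).total_seconds()


duration = elapsed_seconds(start_time_utc, end_time_utc)
print(f"Pipeline run took about {duration:.0f} seconds")
```

For this run the result is a little over four minutes, which is a useful baseline when deciding whether to raise node_count for the full dataset.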

## 5.0 Pipeline Outputs
The forecasting pipeline forecasts the orange juice quantity for each Store and Brand. The pipeline returns a single file containing the predictions for every store and outputs the result to the forecasting_output Blob container. The details of the blob container are listed in 'forecasting_output.txt' under Outputs+logs.

The following code snippet:
1. Downloads the contents of the output folder that is passed in the parallel run step
2. Reads the parallel_run_step.txt file that contains the predictions into a pandas DataFrame
3. Displays the first 10 rows of the predictions

In [10]:
import pandas as pd
import shutil
import os
import sys 
from scripts.helper import get_forecasting_output

forecasting_results_name = "forecasting_results"
forecasting_output_name = "many_models_inference_output"

forecast_file = get_forecasting_output(run, forecasting_results_name, forecasting_output_name)
df = pd.read_csv(forecast_file, delimiter = " ", header = None)
df.columns = ["Week Starting", "Store", "Brand", "Quantity", "Advert", "Price", "Revenue", "Predicted"]
print("Prediction has ", df.shape[0], " rows. Here the first 10 rows are being displayed.")
df.head(10)

Prediction has  190  rows. Here the first 10 rows are being displayed.


| | Week Starting | Store | Brand | Quantity | Advert | Price | Revenue | Predicted |
|---|---|---|---|---|---|---|---|---|
| 0 | 1992-05-28 | 1000 | dominicks | 9130 | 1 | 2.4 | 21912.0 | 9084.28 |
| 1 | 1992-06-04 | 1000 | dominicks | 11225 | 1 | 2.47 | 27725.75 | 11164.66 |
| 2 | 1992-06-11 | 1000 | dominicks | 14025 | 1 | 2.53 | 35483.25 | 14212.47 |
| 3 | 1992-06-18 | 1000 | dominicks | 16433 | 1 | 2.44 | 40096.52 | 16621.67 |
| 4 | 1992-06-25 | 1000 | dominicks | 16962 | 1 | 1.93 | 32736.66 | 16616.88 |
| 5 | 1992-07-02 | 1000 | dominicks | 10222 | 1 | 2.64 | 26986.08 | 9663.17 |
| 6 | 1992-07-09 | 1000 | dominicks | 18117 | 1 | 2.06 | 37321.02 | 17738.26 |
| 7 | 1992-07-16 | 1000 | dominicks | 11754 | 1 | 2.53 | 29737.62 | 11516.38 |
| 8 | 1992-07-23 | 1000 | dominicks | 13080 | 1 | 2.02 | 26421.6 | 13146.02 |
| 9 | 1992-07-30 | 1000 | dominicks | 19628 | 1 | 2.17 | 42592.76 | 19250.35 |
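Because the output contains both the `Quantity` and the `Predicted` columns, a quick accuracy check is possible. The sketch below computes the mean absolute percentage error (MAPE) over the first three rows of the output shown above, assuming `Quantity` holds the observed value. The helper `mean_absolute_percentage_error` is a hypothetical name written for this illustration; it is not part of the notebook's helper scripts.

```python
# First three rows of the displayed output: (Quantity, Predicted).
sample = [
    (9130, 9084.28),
    (11225, 11164.66),
    (14025, 14212.47),
]


def mean_absolute_percentage_error(pairs):
    """Return the mean of |actual - predicted| / |actual| over all pairs.

    Assumes every actual value is non-zero.
    """
    if not pairs:
        raise ValueError("pairs must not be empty")
    errors = [abs(actual - predicted) / abs(actual) for actual, predicted in pairs]
    return sum(errors) / len(errors)


mape = mean_absolute_percentage_error(sample)
print(f"MAPE over {len(sample)} rows: {mape:.2%}")
```

The same function could be applied to every row of `df` by passing `zip(df["Quantity"], df["Predicted"])` converted to a list.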


## 6.0 Publish and schedule the pipeline (Optional)

### 6.1 Publish the pipeline

Once you have a pipeline you're happy with, you can publish it so that you can call it programmatically later on. See this [tutorial](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-create-your-first-pipeline#publish-a-pipeline) for additional information on publishing and calling pipelines.

In [11]:
published_pipeline = pipeline.publish(name = 'automl_forecast_many_models', description = 'forecast many models',
                                      version = '1', continue_on_step_failure = False)

### 6.2 Schedule the pipeline
You can also [schedule the pipeline](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-schedule-pipelines) to run on a time-based or change-based schedule. This could be used to automatically retrain or forecast models every month or based on another trigger such as data drift.

In [None]:
# from azureml.pipeline.core import Schedule, ScheduleRecurrence
    
# forecasting_pipeline_id = published_pipeline.id

# recurrence = ScheduleRecurrence(frequency="Month", interval=1, start_time="2020-01-01T09:00:00")
# recurring_schedule = Schedule.create(ws, name="automl_forecasting_recurring_schedule", 
#                             description="Schedule Forecasting Pipeline to run on the first day of every month",
#                             pipeline_id=forecasting_pipeline_id, 
#                             experiment_name=experiment.name, 
#                             recurrence=recurrence)