Implemented on top of Azure Machine Learning (Azure ML).
Reference: https://docs.microsoft.com/en-us/azure/machine-learning/

Pipelines are created inside an Azure ML workspace.
If you already have one, you can load a reference to it from the local config file:

In [None]:
from azureml.core import Workspace

# Reuse an existing workspace described by the local config file
# (the file that ws.write_config() produces).
ws = Workspace.from_config()

If you do not have one, you can create it either in the Azure portal or programmatically:

In [None]:
from azureml.core import Workspace

# Settings for a new workspace; create_resource_group=False means the
# resource group must already exist.
new_workspace_settings = dict(
    name='ataymano-mlw3',
    subscription_id='9a2d7383-3c7d-492c-94fc-ba65be408672',
    resource_group='cps-dev-ataymano',
    create_resource_group=False,
    location='eastus',
)
ws = Workspace.create(**new_workspace_settings)
# Persist connection details so later cells can call Workspace.from_config().
ws.write_config()
ws.get_details()

This is a library of pipeline steps (primitives) for creating batch training pipelines from logs produced by the Personalization service. It also contains recipe pipelines that can be used either as-is or as a reference.

Creating a pipeline requires an application context:

In [None]:
import os

import application
from application import context

# The storage account key is read from the environment rather than written
# into the notebook, where it would leak with every copy of the file.
# The key that was previously inlined here should be treated as compromised
# and rotated.
ctx = context.Context(
    accountName='ataymanodev',  # Azure storage account name
    accountKey=os.environ['AZURE_STORAGE_KEY_ATAYMANODEV'],  # Azure storage key
    appId='batchpresentation',  # application name
    appFolder='20190212000000',  # application folder
)

Now you can create a pipeline for Vowpal Wabbit (VW) batch training:

In [None]:
import pipelines
from pipelines import vw_train

# Build the VW batch-training pipeline for this workspace and application context.
pipeline = vw_train.create_pipeline(ws, ctx)

The created pipeline can either be submitted for execution:

In [None]:
from azureml.core import Experiment

# Date range handed to the pipeline as parameters.
# NOTE(review): the REST example below uses '23:59:59' as the end time for the
# same day; '23:59:54' here may be a typo — confirm.
train_window = {
    "start_datetime": '02/19/2019 00:00:00',
    "end_datetime": '02/19/2019 23:59:54',
}
presentation_experiment = Experiment(ws, 'VW_presentation')
pipeline_run = presentation_experiment.submit(pipeline, pipeline_params=train_window)

print("Train Pipeline is submitted for execution")

In [None]:
from azureml.widgets import RunDetails

# Render the notebook widget that tracks the submitted pipeline run.
run_widget = RunDetails(pipeline_run)
run_widget.show()

or published so that it is available via a REST endpoint:

In [None]:
published_pipeline = pipeline.publish(
    name="VW_train_demo_1",
    description="Example vw train pipeline",
)
# The id and the REST endpoint identify the published pipeline for HTTP callers.
print(published_pipeline.id)
print(published_pipeline.endpoint)

Example of a REST call.
For more details on Azure ML authentication, see:
https://github.com/Azure/MachineLearningNotebooks/blob/f16bf27e26d6ff7a2cf76e8cbcda4ccc9c878a0c/how-to-use-azureml/manage-azureml-service/authentication-in-azureml/authentication-in-azure-ml.ipynb

In [None]:

from azureml.core.authentication import InteractiveLoginAuthentication
import requests

# Interactive login; the returned header dict is sent as the request headers.
auth = InteractiveLoginAuthentication()
aad_token = auth.get_authentication_header()

# NOTE(review): this endpoint targets workspace 'ataymano-mlw' in eastus2, while
# the workspace created above is 'ataymano-mlw3' in eastus. To call the pipeline
# published in the previous cell, use `published_pipeline.endpoint` instead.
rest_endpoint = (
    'https://eastus2.aether.ms/api/v1.0/subscriptions/9a2d7383-3c7d-492c-94fc-ba65be408672'
    '/resourceGroups/cps-dev-ataymano/providers/Microsoft.MachineLearningServices'
    '/workspaces/ataymano-mlw/PipelineRuns/PipelineSubmit/3b89b658-f571-4354-8c02-f521148af0eb'
)
response = requests.post(
    rest_endpoint,
    headers=aad_token,
    json={
        "ExperimentName": "VW_train_demo_1",
        "ParameterAssignments": {
            "start_datetime": '02/19/2019 00:00:00',
            "end_datetime": '02/19/2019 23:59:59',
        },
    },
    # Without a timeout, requests can block the notebook indefinitely.
    timeout=60,
)
# Surface HTTP failures (expired token, wrong endpoint) instead of ignoring them.
response.raise_for_status()

In [None]:
from azureml.widgets import RunDetails

# NOTE(review): this shows `pipeline_run`, the run submitted through the SDK
# earlier, not the run started by the REST call above — confirm that is intended.
rest_cell_widget = RunDetails(pipeline_run)
rest_cell_widget.show()

Hyperparameter sweep

In [None]:
import os

from azureml.core import Workspace
from application import context
from pipelines import iris_vw_sweep

ws = Workspace.from_config()
# Storage key comes from the environment; account keys must not be committed
# to the notebook. The previously inlined key should be rotated.
ctx_small = context.Context(
    accountName='ataymanodev',
    accountKey=os.environ['AZURE_STORAGE_KEY_ATAYMANODEV'],
    appId='iris',
    appFolder='folder',
)
pipeline_sweep = iris_vw_sweep.create_pipeline(ws, ctx_small, parallel_jobs=100)

In [None]:
import os

from azureml.core import Workspace
from application import context
from pipelines import vw_sweep

ws = Workspace.from_config()
# Storage key comes from the environment; account keys must not be committed
# to the notebook. The previously inlined key should be rotated.
ctx_small = context.Context(
    accountName='deliveryengine',
    accountKey=os.environ['AZURE_STORAGE_KEY_DELIVERYENGINE'],
    appId='iris1-prod',
    appFolder='20181101180623',
)
pipeline_sweep = vw_sweep.create_pipeline(ws, ctx_small, parallel_jobs=100)

In [None]:
import os

from azureml.core import Workspace
from application import context
from pipelines import vw_sweep

ws = Workspace.from_config()
# Storage key comes from the environment; account keys must not be committed
# to the notebook. The previously inlined key should be rotated.
ctx_small = context.Context(
    accountName='cpsdevtesting',
    accountKey=os.environ['AZURE_STORAGE_KEY_CPSDEVTESTING'],
    appId='tutorial-003-sl',
    appFolder='20181019175455',
)
pipeline_sweep = vw_sweep.create_pipeline(ws, ctx_small, parallel_jobs=100)

In [1]:
from azureml.core import Workspace
from application import context
from pipelines import sweep_clean

ws = Workspace.from_config()
# NOTE(review): the recorded output of this cell ends with
# "AttributeError: 'PipelineParameter' object has no attribute 'replace'",
# apparently raised inside sweep_clean.create_pipeline — investigate there.
pipeline_sweep = sweep_clean.create_pipeline(ws, parallel_jobs=100)

Found the config file in: C:\Users\ataymano\source\reinforcement_learning\batch\aml_config\config.json
Found existing cluster: python
Best command selection step is created successfully


AttributeError: 'PipelineParameter' object has no attribute 'replace'

In [None]:
from azureml.core import Experiment

regen_outputs = False
tutorial_sweep_params = {
    "start_datetime": '10/12/2018 00:00:00',
    "end_datetime": '10/12/2018 23:59:59',
    "base_command": '--cb_adf --dsjson',
    # Extra VW command lines that can be added to the sweep (currently disabled):
    # "extra_1": '--cb_explore_adf --epsilon 0.2 --dsjson --cb_type mtr --power_t 0.5 -l 0.001  --marginal A -q PA',
    # "extra_2": '--cb_explore_adf --epsilon 0.2 --dsjson --cb_type mtr --l1 1E-06 --power_t 0.5 -l 0.001 -q PA'
}
tutorial_experiment = Experiment(ws, 'tutorial-demo-012')
sweep_pipeline_run = tutorial_experiment.submit(
    pipeline_sweep,
    pipeline_params=tutorial_sweep_params,
    regenerate_outputs=regen_outputs,
)
print("Sweep Pipeline is submitted for execution")
from azureml.widgets import RunDetails
RunDetails(sweep_pipeline_run).show()

In [None]:
# Cancel the sweep run submitted in the previous cell.
sweep_pipeline_run.cancel()

In [None]:
published_sweep_pipeline = pipeline_sweep.publish(
    name="Tutorial_Demo_05",
    description="Hyper parameter tuning pipeline",
)
# Print the identifiers that HTTP callers need to trigger the sweep.
print(published_sweep_pipeline.id)
print(published_sweep_pipeline.endpoint)

In [None]:

from azureml.core.authentication import InteractiveLoginAuthentication
import requests

# Interactive login; the returned header dict is sent as the request headers.
auth = InteractiveLoginAuthentication()
aad_token = auth.get_authentication_header()

# Hard-coded endpoint of a published sweep pipeline in workspace 'ataymano-mlw3'.
# To call the pipeline published in the previous cell, use instead:
# endpoint = published_sweep_pipeline.endpoint
endpoint = "https://eastus.aether.ms/api/v1.0/subscriptions/9a2d7383-3c7d-492c-94fc-ba65be408672/resourceGroups/cps-dev-ataymano/providers/Microsoft.MachineLearningServices/workspaces/ataymano-mlw3/PipelineRuns/PipelineSubmit/eed1411e-968f-40d3-9688-0735f81ebac4"
response = requests.post(
    endpoint,
    headers=aad_token,
    json={
        "ExperimentName": "Iris_demo_tags_1",
        "ParameterAssignments": {
            "start_datetime": '11/8/2018 00:00:00',
            "end_datetime": '11/20/2018 23:59:59',
            "base_command": '--cb_adf --dsjson',
            "extra_1": '--cb_explore_adf --epsilon 0.2 --dsjson --cb_type mtr --power_t 0.5 -l 0.001  --marginal A -q PA',
            "extra_2": '--cb_explore_adf --epsilon 0.2 --dsjson --cb_type mtr --l1 1E-06 --power_t 0.5 -l 0.001 -q PA',
        },
    },
    # Without a timeout, requests can block the notebook indefinitely.
    timeout=60,
)
# Surface HTTP failures (expired token, wrong endpoint) instead of ignoring them.
response.raise_for_status()
response

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication
import requests

auth = InteractiveLoginAuthentication()
# Do not echo the header itself: it carries the access token, and the
# notebook would save it in this cell's output. Show only the header names.
aad_token = auth.get_authentication_header()
print(list(aad_token.keys()))

In [None]:
from azureml.core import Experiment

regen_outputs = False
# Sweep over logs from 10/28/2018 through 10/29/2018.
middle_sweep_params = {
    "start_datetime": '10/28/2018 00:00:00',
    "end_datetime": '10/29/2018 23:59:55',
    "base_command": '--cb_explore_adf --dsjson',
}
middle_experiment = Experiment(ws, 'vw_cache_sweep_e2e_middle_1')
sweep_pipeline_run = middle_experiment.submit(
    pipeline_sweep,
    pipeline_params=middle_sweep_params,
    regenerate_outputs=regen_outputs,
)
print("Train Pipeline is submitted for execution")

In [None]:
from azureml.widgets import RunDetails

# Render the progress widget for the most recently submitted sweep run.
sweep_widget = RunDetails(sweep_pipeline_run)
sweep_widget.show()

In [None]:
# Cancel the most recently submitted sweep run (`sweep_pipeline_run`).
sweep_pipeline_run.cancel()


In [None]:
import os

from azureml.core import Experiment, Workspace
from application import context
from pipelines import vw_sweep

ws = Workspace.from_config()
# Storage key comes from the environment; account keys must not be committed
# to the notebook. The previously inlined key should be rotated.
ctx_large = context.Context(
    accountName='ataymanodev',
    accountKey=os.environ['AZURE_STORAGE_KEY_ATAYMANODEV'],
    appId='large',
    appFolder='folder',
)
pipeline_sweep_large = vw_sweep.create_pipeline(
    ws, ctx_large, power_t_range='1e-9:2e-9:2e-9', parallel_jobs=100)
sweep_pipeline_run = Experiment(ws, 'vw_cache_sweep_large').submit(
    pipeline_sweep_large,
    pipeline_params={
        "start_datetime": '10/29/2018 00:00:00',
        "end_datetime": '10/29/2018 23:59:55',
    },
)
print("Train Pipeline is submitted for execution")

In [None]:
import os

from azureml.core import Experiment, Workspace
from application import context
from pipelines import vw_sweep

ws = Workspace.from_config()
# Storage key comes from the environment; account keys must not be committed
# to the notebook. The previously inlined key should be rotated.
ctx_large = context.Context(
    accountName='ataymanodev',
    accountKey=os.environ['AZURE_STORAGE_KEY_ATAYMANODEV'],
    appId='large',
    appFolder='folder',
)
pipeline_sweep_large = vw_sweep.create_pipeline(
    ws, ctx_large, power_t_range='1e-9:2e-9:5e-10', parallel_jobs=100)
sweep_pipeline_run = Experiment(ws, 'vw_cache_sweep_large_whole_file').submit(
    pipeline_sweep_large,
    pipeline_params={
        "start_datetime": '10/28/2018 00:00:00',
        "end_datetime": '10/30/2018 23:59:55',
    },
)
print("Train Pipeline is submitted for execution")

In [None]:
from azureml.widgets import RunDetails

# Render the progress widget for the large sweep run just submitted.
large_sweep_widget = RunDetails(sweep_pipeline_run)
large_sweep_widget.show()

In [None]:
import requests
from azureml.core.authentication import InteractiveLoginAuthentication

# Interactive-login credential object, inspected by the following cells.
auth = InteractiveLoginAuthentication()

In [None]:
# Display the authentication object (the notebook echoes the last expression).
auth


In [None]:
# The cell held an unfinished `auth.` (a SyntaxError), presumably left over
# from tab-completion; list the object's public members instead.
[member for member in dir(auth) if not member.startswith('_')]