In [1]:
# Print the version of the installed azureml-core SDK.
import azureml.core

print(f"SDK version: {azureml.core.VERSION}")

SDK version: 1.19.0


In [2]:
import os
import azureml.core
from azureml.core import Workspace, Experiment, Datastore, Environment
from azureml.widgets import RunDetails
from azureml.core.authentication import InteractiveLoginAuthentication

# Report the SDK version in use.
print(f"SDK version: {azureml.core.VERSION}")

# Log in interactively against a fixed tenant and load the workspace through
# Workspace.from_config (calling it without `auth` is the alternative).
interactive_auth = InteractiveLoginAuthentication(
    tenant_id="dadbf9da-3f3b-44a8-8097-f3512ff34da8"
)
ws = Workspace.from_config(auth=interactive_auth)

# One line per workspace property.
print("\n".join([
    "Workspace name: " + ws.name,
    "Azure region: " + ws.location,
    "Subscription id: " + ws.subscription_id,
    "Resource group: " + ws.resource_group,
]))

# Look the blob datastore up by name (ws.get_default_datastore() is the alternative).
def_blob_store = Datastore(ws, "workspaceblobstore")
print(f"Blobstore's name: {def_blob_store.name}")

SDK version: 1.19.0
Workspace name: fin-ws-wus2
Azure region: westus2
Subscription id: 63a4bc7f-cd60-49a3-b139-49202d485eac
Resource group: fin-research
Blobstore's name: workspaceblobstore


In [3]:
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData, PipelineParameter
from azureml.pipeline.steps import PythonScriptStep
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException
from azureml.core import (
    RunConfiguration,
    ScriptRunConfig,
    Experiment,
    Environment,
    Dataset,
    Datastore,
    Workspace,
)

print("Pipeline SDK-specific imports completed")


Pipeline SDK-specific imports completed


In [4]:
gpu_cluster_name = "gpu-cluster"

# Reuse the GPU cluster if the workspace already has one under this name;
# otherwise provision it and wait until provisioning finishes.
try:
    gpu_compute_target = ComputeTarget(workspace=ws, name=gpu_cluster_name)
    print('Found existing compute target')
except ComputeTargetException:
    print('Creating a new compute target...')
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_NC6',
                                                           max_nodes=1)

    # Create the cluster under gpu_cluster_name and bind it to the variable the
    # training step reads (gpu_compute_target), so this branch no longer hits
    # an undefined name.
    gpu_compute_target = ComputeTarget.create(ws, gpu_cluster_name, compute_config)

    gpu_compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

Found existing compute target


In [5]:
# cpu_cluster_name = "cpu-cluster2"
cpu_cluster_name = "CPU-D13V2"

# Reuse the CPU cluster if the workspace already has one under this name;
# otherwise provision it and wait until provisioning finishes.
# NOTE(review): the cluster is named CPU-D13V2 but the fallback provisions
# STANDARD_D11_V2 — confirm which VM size is intended.
try:
    cpu_compute_target = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing compute target')
except ComputeTargetException:
    print('Creating a new compute target...')
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D11_V2',
                                                           max_nodes=1)

    # Create the cluster under cpu_cluster_name and bind it to the variable the
    # feature-extraction step reads (cpu_compute_target), so this branch no
    # longer hits an undefined name.
    cpu_compute_target = ComputeTarget.create(ws, cpu_cluster_name, compute_config)

    cpu_compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

Found existing compute target


In [6]:
from azureml.core.dataset import Dataset

# Default datastore of the workspace; the dataset paths below are relative to it.
def_data_store = ws.get_default_datastore()

# File dataset over one folder of the default datastore.
input_data_dataset = Dataset.File.from_files(
    path=(def_data_store, 'datasets/data_mp_1000_0.0005'),
    validate=True,
)

# Setup environments

In [7]:
from azureml.core import Environment
from azureml.core.runconfig import CondaDependencies, DEFAULT_CPU_IMAGE

# pip packages for the CPU (data processing) environment.
# scikit-learn is requested under its real PyPI name: the "sklearn" alias
# package is deprecated and makes the environment build fail.
data_process_conda_deps = CondaDependencies.create(
    pip_packages=[
        "pandas",
        "azureml-core",
        "azureml-dataset-runtime[fuse]",
        "tqdm",
        "scikit-learn",
    ]
)

# Docker-based environment on the SDK's default CPU base image.
cpu_env = Environment(name="cpu_environment")
cpu_env.python.conda_dependencies = data_process_conda_deps
cpu_env.docker.enabled = True
cpu_env.docker.base_image = DEFAULT_CPU_IMAGE

In [8]:
# GPU environment whose Python dependencies come from a conda specification
# file kept next to the notebook tree.
tf_keras_env = Environment.from_conda_specification(
    name='tensorflow-keras-2.0-gpu',
    file_path='../curated_env_packages/conda_dependencies.yml',
)

# Run inside Docker on an OpenMPI / CUDA 10.0 / cuDNN 7 base image
# (azureml.core.runconfig.DEFAULT_GPU_IMAGE is the alternative base image).
tf_keras_env.docker.base_image = 'mcr.microsoft.com/azureml/openmpi3.1.2-cuda10.0-cudnn7-ubuntu18.04'
tf_keras_env.docker.enabled = True

# Pipeline Parameters

In [9]:
# Notebook-level settings; both values are embedded in the names of the output
# datasets registered later in the notebook.
# NOTE(review): presumably the profit-taking/stop-loss level and the volume
# bar size — confirm against the feature-extraction module.
pt_level, vol_tick = 0.0005, 1000

In [10]:
# Pipeline parameters take their defaults from the notebook-level settings
# (pt_level, vol_tick) instead of repeating the literals, so the two cannot
# drift apart when a value is changed.
# NOTE(review): neither parameter is passed to a step in the cells shown here.
pt_sl_level_param = PipelineParameter(name="pt_sl_level", default_value=pt_level)
vol_tick_param = PipelineParameter(name="vol_tick", default_value=vol_tick)


### Intermediate/Output Data
Intermediate data (the output of a step) can be represented by a [PipelineData](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinedata?view=azure-ml-py) object. PipelineData is produced by one step and consumed by another by passing the same object as an output of the first step and as an input of one or more later steps. This notebook uses `OutputFileDatasetConfig` objects in the same role, and each one is also registered as a named dataset when its step completes.


In [21]:
from azureml.data import OutputFileDatasetConfig

# Not referenced by the active code in this cell; kept so the name stays defined.
output_path = "datasets/trained_models"


def _registered_output(prefix):
    """Build an OutputFileDatasetConfig that is registered when its step completes.

    The output name and the registered dataset name are the same string:
    ``<prefix>_<pt_level>_<vol_tick>`` with every '.' replaced by '_'.
    """
    dataset_name = f"{prefix}_{pt_level}_{vol_tick}".replace('.', '_')
    return OutputFileDatasetConfig(name=dataset_name).register_on_complete(dataset_name)


# Input of the feature-extraction step: a file dataset on the default datastore.
bars_dataset = Dataset.File.from_files(
    (def_data_store, "datasets/labels_mp_0.0005_1000/"), validate=True
)

# Step outputs: train/test splits from feature extraction and the trained model.
training_input_data = _registered_output("training_input_data")
testing_input_data = _registered_output("testing_input_data")
output_model = _registered_output("lstm_model")

Class OutputFileDatasetConfig: This is an experimental class, and may change at any time.<br/>For more information, see https://aka.ms/azuremlexperimental.
Class OutputDatasetConfig: This is an experimental class, and may change at any time.<br/>For more information, see https://aka.ms/azuremlexperimental.
Class RegistrationConfiguration: This is an experimental class, and may change at any time.<br/>For more information, see https://aka.ms/azuremlexperimental.
Class OutputFileDatasetConfig: This is an experimental class, and may change at any time.<br/>For more information, see https://aka.ms/azuremlexperimental.
Class OutputDatasetConfig: This is an experimental class, and may change at any time.<br/>For more information, see https://aka.ms/azuremlexperimental.
Class RegistrationConfiguration: This is an experimental class, and may change at any time.<br/>For more information, see https://aka.ms/azuremlexperimental.
Class OutputFileDatasetConfig: This is an experimental class, and ma

# Process/Train Pipeline

## Feature Extraction

In [39]:
# Feature extraction runs run.py from this folder on the CPU cluster.
script_folder = '../modules/feature_extraction/'

run_config = RunConfiguration()
run_config.environment = cpu_env

# Command-line arguments handed to run.py.
# NOTE(review): as_mount() is called once here and once more for `inputs`
# below — confirm whether a single shared object was intended.
args = [
    '--sample_days', -1,
    '--input_data', bars_dataset.as_mount(),
    '--output_train_data', training_input_data,
    '--output_test_data', testing_input_data,
]

feature_extraction_step = PythonScriptStep(
    name='Feature Extraction',
    script_name="run.py",
    source_directory=script_folder,
    arguments=args,
    inputs=[bars_dataset.as_mount()],
    outputs=[training_input_data, testing_input_data],
    compute_target=cpu_compute_target,
    runconfig=run_config,
)

## Training

In [40]:
from azureml.core import ScriptRunConfig

# Training runs run.py from this folder on the GPU cluster, inside the
# TensorFlow/Keras environment.
script_folder = '../modules/train'

gpu_run_config = RunConfiguration()
gpu_run_config.environment = tf_keras_env

# Command-line arguments handed to run.py; the training data is the output of
# the feature-extraction step.
# NOTE(review): Azure ML samples usually pass an upstream
# OutputFileDatasetConfig to the next step via `.as_input()` — confirm that
# passing the object directly is intended.
train_args = [
    '--sample_data_size', -1,
    '--input_data', training_input_data,
    '--output_model', output_model,
    "--epochs", 20,
]

train_step = PythonScriptStep(
    name='Train',
    script_name="run.py",
    source_directory=script_folder,
    arguments=train_args,
    inputs=[training_input_data],
    outputs=[output_model],
    compute_target=gpu_compute_target,
    runconfig=gpu_run_config,
)



## Pipeline Definition

In [41]:
# Assemble the two steps into one pipeline; train_step takes
# training_input_data, which feature_extraction_step lists as an output.
pipeline_1 = Pipeline(
    workspace=ws,
    steps=[feature_extraction_step, train_step],
)
print("Pipeline is built")

Pipeline is built


In [42]:
# Submit the pipeline under the 'fin_pipeline' experiment of the workspace.
fin_experiment = Experiment(ws, 'fin_pipeline')
pipeline_run1 = fin_experiment.submit(pipeline_1)
print("Pipeline is submitted for execution")

Created step Feature Extraction [b3f56521][701b6ed5-5280-4962-9b87-b1089f09a71a], (This step will run and generate new outputs)
Created step Train [93704eae][d6748c9f-55f8-4271-848e-74c4e443c712], (This step will run and generate new outputs)
Submitted PipelineRun 21e27a7f-bfe5-4cbb-8ca4-1beb7a6705a5
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/fin_pipeline/runs/21e27a7f-bfe5-4cbb-8ca4-1beb7a6705a5?wsid=/subscriptions/63a4bc7f-cd60-49a3-b139-49202d485eac/resourcegroups/fin-research/workspaces/fin-ws-wus2
Pipeline is submitted for execution
