In [1]:
from azureml.core import Workspace
from azureml.pipeline.core import Pipeline, PipelineData, PipelineParameter
from azureml.pipeline.steps import EstimatorStep, PythonScriptStep, MpiStep, DatabricksStep
from azureml.core.databricks import PyPiLibrary

# Handle to the Azure ML workspace; later cells use it to look up compute
# targets, datasets, environments and the default datastore.
ws = Workspace.from_config()

In [2]:
# Degree of parallelism used across the notebook: the value of
# '--number-of-files' for both Databricks splitting steps, their worker count,
# and the node count / '--num_machines' of the LightGBM MPI step.
NUM_NODES = 2
# Column name passed to the LightGBM step as '--label_column' ("name:<column>").
LABEL_COLUMN_NAME = 'i_year1_renewal_flag'

In [3]:
# Environment handed to the distributed LightGBM step
lgbm_env = ws.environments['lightgbm-cli']

# Datasets consumed by the two Databricks splitting steps
validation_data = ws.datasets['renewal_test_csv']
training_data = ws.datasets['renewal_train_csv']

# Compute targets: Databricks runs the splitting steps, the AML cluster runs
# the renaming steps and the LightGBM training step
databricks_cluster = ws.compute_targets['databricks']
training_cluster = ws.compute_targets['amlcluster']

In [4]:
# Display the Docker image name reported in the LightGBM environment's image details.
lgbm_env.get_image_details(ws)['dockerImage']['name']

'azureml/azureml_530807073dcffa9aa4f21724cf8481da'

### Training Data Processing

In [5]:
# Intermediate outputs for the training branch, both on the default datastore.

# Coalesce step -> Renaming step
db_saved_training_data = PipelineData(
    name='spark_csv_training',
    output_name='output_path',
    datastore=ws.get_default_datastore(),
)

# Renaming step -> LightGBM step (a directory rather than a single file)
renamed_csv_train_data = PipelineData(
    name='renamed_csv_training',
    output_name='train_csv',
    is_directory=True,
    datastore=ws.get_default_datastore(),
)

In [6]:
# ---------------------------------------------------------------
# COALESCE FILES (training data)
# Spark loads any number of CSV files and writes them back out as
# the number of files given by the '--number-of-files' parameter.
# ---------------------------------------------------------------

db_train_script_params = ['--number-of-files', str(NUM_NODES)]

train_databricks_step = DatabricksStep(
    name='split_training_data',
    # Script to run and its dependencies
    source_directory='../code/data_splitter',
    python_script_name='splitter.py',
    python_script_params=db_train_script_params,
    pypi_libraries=[PyPiLibrary('click')],
    # Data flowing in and out of the step
    inputs=[training_data.as_named_input('input_path').as_mount()],
    outputs=[db_saved_training_data],
    # Cluster settings
    compute_target=databricks_cluster,
    spark_version='5.5.x-scala2.11',
    node_type='Standard_DS4_v2',
    num_workers=NUM_NODES,
    allow_reuse=True,
)

In [7]:
#####################################
#          RENAMING FILES           #
#####################################
# Files are renamed from generic    #
# Spark name to 'train_[i].csv'     #
# Where i will be used to choose    #
# which MPI Rank will load the file #
#####################################

# Arguments for rename.py: read the coalesced Spark output, write the renamed
# copies to the directory the LightGBM step consumes, using the 'train' prefix.
train_copy_files_params = [
    '--input_path', db_saved_training_data,
    '--output_path', renamed_csv_train_data,
    '--file_prefix', "train",
]

copy_train_files = PythonScriptStep(
    name='rename_training_data',
    source_directory='../code/data_renamer',
    script_name='rename.py',
    arguments=train_copy_files_params,
    inputs=[db_saved_training_data],
    outputs=[renamed_csv_train_data],
    compute_target=training_cluster,
    allow_reuse=True,
)

### Validation Data Processing

In [8]:
# Intermediate outputs for the validation branch, both on the default datastore.

# Coalesce step -> Renaming step
db_saved_validation_data = PipelineData(
    name='spark_csv_valid',
    output_name='output_path',
    datastore=ws.get_default_datastore(),
)

# Renaming step -> LightGBM step (a directory rather than a single file)
renamed_csv_valid_data = PipelineData(
    name='renamed_csv_validation',
    output_name='valid_csv',
    is_directory=True,
    datastore=ws.get_default_datastore(),
)

In [9]:
# ---------------------------------------------------------------
# COALESCE FILES (validation data)
# Spark loads any number of CSV files and writes them back out as
# the number of files given by the '--number-of-files' parameter.
# ---------------------------------------------------------------

db_valid_script_params = ['--number-of-files', str(NUM_NODES)]

# Databricks step: arbitrary number of CSV files in, a fixed number of files out
valid_databricks_step = DatabricksStep(
    name='split_validation_data',
    # Script to run and its dependencies
    source_directory='../code/data_splitter',
    python_script_name='splitter.py',
    python_script_params=db_valid_script_params,
    pypi_libraries=[PyPiLibrary('click')],
    # Data flowing in and out of the step
    inputs=[validation_data.as_named_input('input_path').as_mount()],
    outputs=[db_saved_validation_data],
    # Cluster settings
    compute_target=databricks_cluster,
    spark_version='5.5.x-scala2.11',
    node_type='Standard_DS4_v2',
    num_workers=NUM_NODES,
    allow_reuse=True,
)

In [10]:
# ---------------------------------------------------------------
# RENAMING FILES (validation data)
# Files are renamed from the generic Spark name to 'valid_[i].csv',
# where i is used to choose which MPI rank will load the file.
# ---------------------------------------------------------------

valid_copy_files_params = [
    '--input_path', db_saved_validation_data,
    '--output_path', renamed_csv_valid_data,
    '--file_prefix', "valid",
]

copy_valid_files = PythonScriptStep(
    name='rename_validation_data',
    source_directory='../code/data_renamer',
    script_name='rename.py',
    arguments=valid_copy_files_params,
    inputs=[db_saved_validation_data],
    outputs=[renamed_csv_valid_data],
    compute_target=training_cluster,
    allow_reuse=True,
)

### LightGBM Step

In [27]:
# Command-line arguments for train.py. Numeric values are converted to strings,
# matching the str(NUM_NODES) convention used for the Databricks script
# parameters; the two PipelineData objects are passed as objects, not strings.
lgbm_params = [
    '--train_data', renamed_csv_train_data,
    '--valid_data', renamed_csv_valid_data,
    '--task', 'train',
    '--conf_file', 'train.conf',
    '--metric', 'auc,binary_logloss,binary_error,mean_absolute_error',
    '--num_machines', str(NUM_NODES),
    '--label_column', f"name:{LABEL_COLUMN_NAME}",
    '--num_iterations', str(100),
    '--tree_learner', 'data'
]

# NOTE(review): RunConfiguration is not referenced in the visible cells; the
# import is kept in case later cells rely on it. Consider moving it to the
# import cell at the top of the notebook.
from azureml.core import RunConfiguration

# Distributed LightGBM training: one process on each of NUM_NODES nodes of the
# training cluster, consuming the outputs of both renaming steps.
mpi_run = MpiStep(
    name='distributed_lgbm',
    source_directory='../code/lightgbm/',
    script_name='train.py',
    arguments=lgbm_params,
    inputs=[renamed_csv_train_data, renamed_csv_valid_data],
    node_count=NUM_NODES,
    process_count_per_node=1,
    environment_definition=lgbm_env,
    compute_target=training_cluster,
)



In [28]:
# Only the final step is listed explicitly; the split and rename steps it
# depends on through its PipelineData inputs are created along with it
# (see the step list printed on submission).
pipeline = Pipeline(workspace=ws, steps=[mpi_run])

In [29]:
# Validate the pipeline graph before submitting it; the returned list is shown as the cell output.
pipeline.validate()

Step distributed_lgbm is ready to be created [27673a03]


[]

In [30]:
# Submit the pipeline under the 'test_datamunging' experiment; the returned
# run object is left as the last expression so the notebook displays it.
pipeline.submit(experiment_name='test_datamunging')

Created step distributed_lgbm [27673a03][cefc0c20-ebde-4615-934e-69a4cc10d3c5], (This step will run and generate new outputs)
Created step rename_training_data [9fbe37df][d1834bb7-7f9f-40fb-944a-8d918ed30ae8], (This step is eligible to reuse a previous run's output)
Created step split_training_data [cdf7765e][a7421e61-184e-4d82-bb74-5ce474b4898e], (This step is eligible to reuse a previous run's output)
Created step rename_validation_data [0c99ff97][cc2ce07d-1512-42d9-9457-48992af6a921], (This step is eligible to reuse a previous run's output)
Created step split_validation_data [031f0a27][b1deb162-b869-41b7-a6c4-feaad4d0d074], (This step is eligible to reuse a previous run's output)
Submitted PipelineRun 6ba3ab3e-05a4-477b-b595-2cabbbb581d7
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/test_datamunging/runs/6ba3ab3e-05a4-477b-b595-2cabbbb581d7?wsid=/subscriptions/dcdc374c-3ce4-4e43-92ad-10070b3b2941/resourcegroups/smart-assistant-ds/workspaces/smart-assistant-

Experiment,Id,Type,Status,Details Page,Docs Page
test_datamunging,6ba3ab3e-05a4-477b-b595-2cabbbb581d7,azureml.PipelineRun,NotStarted,Link to Azure Machine Learning studio,Link to Documentation
