Copyright (c) Rahul Kumar. All rights reserved.

Licensed under the MIT License.

.

# Test-Train Data Split using parallel_run_function
This notebook demonstrates how to perform a train-test split on a large dataset using ``parallel_run_function``.

## Workspace

In [None]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

# Create the workspace client via MLClient.from_config, authenticating with
# DefaultAzureCredential.
ml_client = MLClient.from_config(credential=DefaultAzureCredential())

# Look up the workspace the client is bound to.
ws = ml_client.workspaces.get(ml_client.workspace_name)

## Compute

In [None]:
from azure.core.exceptions import ResourceNotFoundError
from azure.ai.ml.entities import AmlCompute

cluster_name = "gen-purpose"

try:
    # Reuse the cluster when it is already attached to the workspace.
    compute = ml_client.compute.get(cluster_name)
except ResourceNotFoundError:
    # The cluster does not exist yet: describe one that can scale down to
    # zero nodes and up to ten.
    compute = AmlCompute(
        name=cluster_name,
        size="STANDARD_D16S_V3",
        type="amlcompute",
        min_instances=0,
        max_instances=10,
        idle_time_before_scale_down=120,
    )
    # Block until provisioning completes and keep the resource returned by
    # the service, so `compute` refers to the created cluster rather than
    # the local specification.
    compute = ml_client.begin_create_or_update(compute).result()

## Environment

In [None]:
from azure.ai.ml.entities import Environment
import os

# Switch between a curated Azure ML environment (True) and a custom
# environment built from the conda file under ../dependencies (False).
USE_CURATED_ENV = True

if not USE_CURATED_ENV:
    dependencies_dir = "../dependencies"
    custom_env_name = "aml-scikit-learn"

    # Define the custom environment on top of the base image and register it
    # in the workspace in one step.
    environment = ml_client.environments.create_or_update(
        Environment(
            name=custom_env_name,
            description="Custom environment for demo",
            tags={"scikit-learn": "0.24.2"},
            conda_file=os.path.join(dependencies_dir, "conda.yml"),
            image="mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04:latest",
        )
    )

    print(
        f"Environment with name {environment.name} is registered to workspace, the environment version is {environment.version}"
    )
else:
    # Fetch the latest version of the curated scikit-learn environment.
    environment = ml_client.environments.get(
        name="AzureML-sklearn-0.24-ubuntu18.04-py37-cpu",
        label="latest",
    )

## Data
Prepare the data for input and output.
- The original dataset is present in the default datastore of the workspace under ``my_files/original/``.
- The data after splitting will be stored in the default datastore of the workspace under ``my_files/split/``.

In [None]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes, InputOutputModes
from azure.ai.ml import Input, Output

# Supported paths include:
# local: './<path>'
# blob:  'https://<account_name>.blob.core.windows.net/<container_name>/<path>'
# ADLS gen2: 'abfss://<file_system>@<account_name>.dfs.core.windows.net/<path>/'
# Datastore: 'azureml://datastores/<data_store_name>/paths/<path>'

# All paths live in the workspace's default datastore.
dstore = ml_client.datastores.get_default()
input_path = f"azureml://datastores/{dstore.name}/paths/my_files/original"
output_path_train = f"azureml://datastores/{dstore.name}/paths/my_files/split/train"
output_path_test = f"azureml://datastores/{dstore.name}/paths/my_files/split/test"

# Source folder, mounted read-only.
input_data = Input(
    path=input_path,
    type=AssetTypes.URI_FOLDER,
    description="The data to be split",
    mode=InputOutputModes.RO_MOUNT,
)

# Destination folder for the training portion, mounted read-write.
output_train = Output(
    path=output_path_train,
    type=AssetTypes.URI_FOLDER,
    # Fixed: this output previously carried the test output's description.
    description="The train data after split",
    mode=InputOutputModes.RW_MOUNT,
)

# Destination folder for the test portion, mounted read-write.
output_test = Output(
    path=output_path_test,
    type=AssetTypes.URI_FOLDER,
    description="The test data after split",
    mode=InputOutputModes.RW_MOUNT,
)

## Configure parallel_run_function

In [None]:
from azure.ai.ml.parallel import parallel_run_function, RunFunction
from azure.ai.ml.constants import AssetTypes

# Parallel step that runs test_train_split_prs.py over the input folder and
# writes to the train and test output folders.
parallel_task = parallel_run_function(
    # Identification
    name="test train split prs",
    display_name="test train split prs",
    description="test train split prs",
    # Execution settings
    compute=compute.name,
    instance_count=1,
    max_concurrency_per_instance=5,
    mini_batch_size="1",
    error_threshold=10,
    mini_batch_error_threshold=1,
    is_deterministic=False,
    # Inputs: the folder bound to my_input_path is the data fed to the task.
    inputs={
        "my_input_path": Input(
            type=AssetTypes.URI_FOLDER,
            description="The data to be split",
        ),
    },
    input_data="${{inputs.my_input_path}}",
    # Outputs: the two split folders plus the file given to append_row_to.
    outputs={
        "my_output_path_train": output_train,
        "my_output_path_test": output_test,
        "job_output_path": Output(
            type=AssetTypes.URI_FILE,
            description="The path for default output",
        ),
    },
    task=RunFunction(
        code="../scripts",
        entry_script="test_train_split_prs.py",
        program_arguments="--output_dir_train ${{outputs.my_output_path_train}} --output_dir_test ${{outputs.my_output_path_test}}",
        # The environment is referenced by its registry string here; the
        # `environment` object resolved in the Environment section is not
        # used by this task.
        environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
        append_row_to="${{outputs.job_output_path}}",
    ),
)

## Build pipeline

In [None]:
from azure.ai.ml.dsl import pipeline

# Pipeline definition with a single step: the parallel split task.
# `data_in` is forwarded to the task's `my_input_path` input; nothing is
# returned from the function.
@pipeline()
def parallel_in_pipeline(data_in):
    # NOTE(review): the local name is unused in this body but is presumably
    # picked up by the DSL as the step name — confirm before renaming or
    # removing it.
    parallel_split = parallel_task(my_input_path=data_in)

# Instantiate the pipeline, binding the source folder to its only input.
pipeline_job = parallel_in_pipeline(data_in=input_data)

# Set the pipeline-level default compute to the cluster.
pipeline_job.settings.default_compute = compute.name

## Submit the pipeline job

In [None]:
# Submit the pipeline under the "demo-pipeline" experiment and keep the
# returned job object.
pipeline_job = ml_client.jobs.create_or_update(
    job=pipeline_job,
    experiment_name="demo-pipeline",
)

# Trailing expression so the notebook renders the submitted job.
pipeline_job