Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.


# Data Preparation - Distributed TCN
_**Partition the data and split the partitioned data into train, validation, and test sets**_

---

## 1.0 Setup

In [None]:
import time
import pandas as pd

import azureml.core
from azureml.core import Workspace, Experiment, Dataset, Environment
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.pipeline.steps import PythonScriptStep, ParallelRunConfig, ParallelRunStep
from azureml.pipeline.core import Pipeline
from azureml.data.output_dataset_config import OutputFileDatasetConfig
from azureml.data.datapath import DataPath

## 2.0 Set up the workspace, datastore, and experiment

In [None]:
ws = Workspace.from_config()
datastore = ws.get_default_datastore()

experiment = Experiment(ws, "pre-distributed-tcn")

output = {}
output["Subscription ID"] = ws.subscription_id
output["Workspace"] = ws.name
output["SKU"] = ws.sku
output["Resource Group"] = ws.resource_group
output["Location"] = ws.location
output["Experiment Name"] = experiment.name
output["SDK Version"] = azureml.core.VERSION
pd.set_option("display.max_colwidth", None)
outputDf = pd.DataFrame(data=output, index=[""])
outputDf.T

## 3.0 Data

The data is expected to reside in one of the workspace's datastores. If the data is in a blob container that is not registered as a datastore, refer to [this link](https://learn.microsoft.com/en-us/python/api/azureml-core/azureml.core.datastore(class)?view=azure-ml-py#azureml-core-datastore-register-azure-blob-container) to register the blob container as a datastore in the workspace. Update the following variables accordingly:

| Variable | Description |
| -- | -- |
| **datastore_name** | Name of the datastore having data. |
| **input_data_path** | Path to the data in the datastore. |
| **time_series_id_column_names** | The column names used to uniquely identify time series in data that has multiple rows with the same timestamp. |
| **time_column_name** | The name of your time column. |
| **input_dataset_name** | Name for your dataset. Used in later steps to build the paths and names of the final processed data. |
| **test_split** | Ratio for test data: the fraction of rows per time series extracted from the input data to create the test data. |
| **valid_split** | Ratio for validation data: the fraction of rows per time series extracted from the data remaining after the test split to create the validation data. |

<br/>

> **Note**: If test_split is 0 (for example, when test data is already available), the test split is skipped and the validation split is applied directly to the data.
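To make the two ratios concrete, here is a minimal sketch of splitting a single time series. It assumes the rows are already sorted by time, that the most recent rows go to the test set and the next most recent to the validation set, and that fractional counts are rounded down. The actual logic lives in `scripts/train_valid_test_split.py` and may differ; `split_counts` and `chronological_split` are hypothetical names.

```python
def split_counts(n_rows, test_split, valid_split):
    """Return (n_train, n_valid, n_test) for one time series.

    Assumption: fractional row counts are rounded down.
    """
    if not (0 <= test_split < 1 and 0 <= valid_split < 1):
        raise ValueError("test_split and valid_split must be in [0, 1)")
    # test_split == 0 means no test rows: the validation split uses all rows.
    n_test = int(n_rows * test_split)
    remaining = n_rows - n_test
    # valid_split is applied to what is left after the test split.
    n_valid = int(remaining * valid_split)
    n_train = remaining - n_valid
    return n_train, n_valid, n_test


def chronological_split(rows, test_split, valid_split):
    """Split time-sorted rows so the most recent rows form valid, then test."""
    n_train, n_valid, _ = split_counts(len(rows), test_split, valid_split)
    train = rows[:n_train]
    valid = rows[n_train:n_train + n_valid]
    test = rows[n_train + n_valid:]
    return train, valid, test
```

With 100 rows and both ratios at 0.2, this gives 20 test rows, 16 validation rows (20% of the remaining 80), and 64 training rows.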

In [None]:
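datastore_name = datastore.name
input_data_path = "my-data/*.csv"  # path to the input CSV files in the datastore

test_split = 0.2
valid_split = 0.2

# Fill in the values below for your data before building the pipeline
time_series_id_column_names = []  # list of column-name strings
time_column_name = ""
input_dataset_name = ""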
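The settings cell above leaves `time_column_name`, `time_series_id_column_names`, and `input_dataset_name` empty, and a forgotten value only surfaces once the remote pipeline fails. A small local pre-flight check can catch this earlier. The sketch below is a hypothetical helper, not part of `helper.py`; requiring at least one ID column assumes (as the later calls to the helpers suggest) that partitioning is keyed on those columns.

```python
def check_settings(time_column_name, time_series_id_column_names,
                   input_dataset_name, test_split, valid_split):
    """Hypothetical pre-flight check; returns a list of problems (empty if OK)."""
    problems = []
    if not time_column_name:
        problems.append("time_column_name is empty")
    # Assumption: the data is partitioned by the time series ID columns.
    if not time_series_id_column_names:
        problems.append("time_series_id_column_names is empty")
    if not input_dataset_name:
        problems.append("input_dataset_name is empty")
    if time_column_name and time_column_name in time_series_id_column_names:
        problems.append("time_column_name is also listed as a time series ID column")
    for name, value in (("test_split", test_split), ("valid_split", valid_split)):
        if not 0 <= value < 1:
            problems.append(f"{name} must be in [0, 1), got {value}")
    return problems
```

In the notebook it could be called with the variables above, e.g. `for p in check_settings(time_column_name, time_series_id_column_names, input_dataset_name, test_split, valid_split): print(p)`.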

## 4.0 Build the pipeline

### 4.1 Compute

You will need a [compute target](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-set-up-training-targets#amlcompute) to run the data preparation pipeline. In this tutorial, you create an AmlCompute cluster as the compute resource.

**Creation of AmlCompute takes approximately 5 minutes.**

If an AmlCompute cluster with that name already exists in your workspace, this code skips the creation process. As with other Azure services, there are limits on certain resources (e.g. AmlCompute) associated with the Azure Machine Learning service. Please read this [article](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-manage-quotas) on the default limits and how to request more quota.

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# Choose a name for your CPU cluster
cpu_cluster_name = "data-partition-cluster"

# Reuse the cluster if it already exists; otherwise create it
try:
    compute_target = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print("Found existing cluster, use it.")
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(
        vm_size="STANDARD_D16S_V3", max_nodes=5
    )
    compute_target = ComputeTarget.create(ws, cpu_cluster_name, compute_config)

compute_target.wait_for_completion(show_output=True)

### 4.2 Run Config

The next step is making sure that the remote pipeline run has all the dependencies needed by the pipeline steps. Dependencies and the runtime context are set by creating and configuring a ``RunConfiguration`` object.

The code below shows two options for handling dependencies. As presented, with ``USE_CURATED_ENV = True``, the configuration is based on a [curated environment](https://docs.microsoft.com/en-us/azure/machine-learning/resource-curated-environments). Curated environments have prebuilt Docker images in the [Microsoft Container Registry](https://hub.docker.com/publishers/microsoftowner). For more information, see [Azure Machine Learning curated environments](https://docs.microsoft.com/en-us/azure/machine-learning/resource-curated-environments).

The path taken if you change ``USE_CURATED_ENV`` to False shows the pattern for explicitly setting your dependencies. In that scenario, a new custom Docker image will be created and registered in an Azure Container Registry within your resource group (see [Introduction to private Docker container registries in Azure](https://docs.microsoft.com/en-us/azure/container-registry/container-registry-intro)). Building and registering this image can take several minutes.

In [None]:
aml_run_config = RunConfiguration()
aml_run_config.target = compute_target

USE_CURATED_ENV = True
if USE_CURATED_ENV:
    curated_environment = Environment.get(
        workspace=ws, name="AzureML-sklearn-0.24-ubuntu18.04-py37-cpu"
    )
    aml_run_config.environment = curated_environment
else:
    aml_run_config.environment.python.user_managed_dependencies = False

    # Add some packages relied on by data prep step
    aml_run_config.environment.python.conda_dependencies = CondaDependencies.create(
        conda_packages=["pandas", "scikit-learn"],
        pip_packages=["azureml-sdk", "azureml-dataset-runtime[fuse,pandas]"],
        pin_sdk_version=False,
    )

### 4.3 Data Partitioning

#### 4.3.1 Define the output for data partitioning

In [None]:
partitioned_dataset_name = f"{input_dataset_name}_partitioned_{int(time.time())}"

output_path = OutputFileDatasetConfig(
    name="partitioned_data", destination=(datastore, partitioned_dataset_name)
).as_mount()

#### 4.3.2 Configure ``PythonScriptStep``

This step partitions the data using the ``partition_by`` method.
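Conceptually, partitioning groups all rows that belong to the same time series together, so the splitting step can later work on each series independently. The pure-Python sketch below shows only that grouping idea, presumably keyed on `time_series_id_column_names` as passed to the step; it is not how ``partition_by`` is implemented, and `partition_rows` plus the folder-style keys are hypothetical.

```python
from collections import defaultdict


def partition_rows(rows, id_columns):
    """Group rows (dicts) by the values of the time series ID columns.

    Each key is a folder-like string such as 's1/a' (one per time series);
    this naming is an illustrative assumption.
    """
    partitions = defaultdict(list)
    for row in rows:
        key = "/".join(str(row[col]) for col in id_columns)
        partitions[key].append(row)
    return dict(partitions)


rows = [
    {"store": "s1", "brand": "a", "date": "2021-01-01", "sales": 3},
    {"store": "s1", "brand": "a", "date": "2021-01-02", "sales": 5},
    {"store": "s2", "brand": "b", "date": "2021-01-01", "sales": 1},
]
partitions = partition_rows(rows, ["store", "brand"])
```

Here `partitions` holds two groups, `"s1/a"` with two rows and `"s2/b"` with one.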

``PythonScriptStep`` arguments
| Property | Description |
| -- | -- |
| **source_directory** | A folder that contains the Python script. |
| **script_name** | The name of a Python script relative to **source_directory**. |
| **arguments** | Command-line arguments for the Python script file. |
| **compute_target** | The compute target to use. |
| **runconfig** | The ``RunConfiguration`` to use. A ``RunConfiguration`` can be used to specify additional requirements for the run, such as conda dependencies and a Docker image. |
| **allow_reuse** | Indicates whether the step should reuse previous results when re-run with the same settings. |

In [None]:
from helper import get_step_args


step_args = get_step_args(
    time_series_id_column_names,
    output_path,
    partitioned_dataset_name,
    input_data_path,
    datastore_name,
)

data_partition_step = PythonScriptStep(
    source_directory="./scripts",
    script_name="partition.py",
    arguments=step_args,
    compute_target=compute_target,
    runconfig=aml_run_config,
    allow_reuse=False,
)

### 4.4 Data Splitting

#### 4.4.1 Define the output for data splitting

In [None]:
output_location = f"data-partitioned/{input_dataset_name}/split_partitioned_{int(time.time())}"

output_path_split = OutputFileDatasetConfig(
    name="prepared_data", destination=(datastore, output_location)
).as_mount()

#### 4.4.2 Configure ``ParallelRunStep``

This step splits the data into train, validation, and test sets. The output of the previous (partitioning) step is the input to this step.

``ParallelRunConfig`` arguments
| Property | Description |
| -- | -- |
| **source_directory** | Path to folder that contains the ``entry_script`` and supporting files used to execute on compute target. |
| **entry_script** | User script which will be run in parallel on multiple nodes. This is specified as a local file path relative to **source_directory**. |
| **mini_batch_size** | For FileDataset input, this field is the number of files a user script can process in one ``run()`` call. |
| **error_threshold** | The number of file failures that should be ignored during processing. If the error count goes above this value, then the job will be aborted. |
| **output_action** | How the output should be organized. Currently supported values are 'append_row' and 'summary_only'. |
| **environment** | The environment definition is responsible for defining the required application dependencies, such as conda or pip packages. |
| **compute_target** | Compute target to use for ParallelRunStep execution. This parameter may be specified as a compute target object or the name of a compute target in the workspace. |
| **node_count** | Number of nodes in the compute target used for running the ParallelRunStep. |
| **process_count_per_node** | The number of worker processes per node to run the entry script in parallel. |
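Because ``mini_batch_size`` counts files for a FileDataset input, the amount of parallel work depends on how many files the partitioning step writes. The sketch below does the rough sizing arithmetic; `prs_capacity` is a hypothetical helper, and it ignores scheduling overhead and retries.

```python
import math


def prs_capacity(num_files, mini_batch_size, node_count, process_count_per_node):
    """Rough ParallelRunStep sizing for a FileDataset input.

    Returns (number of mini-batches, total worker processes,
    mini-batches per worker if work were spread perfectly evenly).
    """
    num_batches = math.ceil(num_files / mini_batch_size)
    workers = node_count * process_count_per_node
    return num_batches, workers, math.ceil(num_batches / workers)
```

With this notebook's settings (`mini_batch_size=50`, `node_count=5`, `process_count_per_node=64`) there are 320 worker processes. For a hypothetical 10,000 partition files, `prs_capacity(10000, 50, 5, 64)` yields 200 mini-batches, fewer than the number of workers, so some processes would sit idle; a smaller ``mini_batch_size`` would spread the work more widely.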

``ParallelRunStep`` arguments
| Property | Description |
| -- | -- |
| **name** | Name of the step. Must be unique to the workspace. |
| **parallel_run_config** | A ``ParallelRunConfig`` object used to determine required run properties. |
| **inputs** | List of input datasets. All datasets in the list should be of the same type. |
| **arguments** | List of command-line arguments to pass to the Python entry_script. |
| **output** | Output port binding, may be used by later pipeline steps. |
| **allow_reuse** | Whether the step should reuse previous results when run with the same settings/inputs. |

<br/>

> **Note**: If ``ParallelRunStep`` fails with a FirstTaskCreationTimeout error, increase the value of the ``first_task_creation_timeout`` argument in the ``get_prs_args`` helper.

In [None]:
from helper import get_prs_args


partitioned_dataset = output_path

parallel_run_config = ParallelRunConfig(
    source_directory="./scripts",
    entry_script="train_valid_test_split.py",
    mini_batch_size=50,
    error_threshold=5,
    output_action="append_row",
    environment=aml_run_config.environment,
    compute_target=compute_target,
    node_count=5,
    process_count_per_node=64,
)

prs_args = get_prs_args(
    time_column_name,
    time_series_id_column_names,
    test_split,
    valid_split
)

parallel_run_step = ParallelRunStep(
    name="test valid train split prs",
    parallel_run_config=parallel_run_config,
    inputs=[partitioned_dataset.as_input("partitioned_data")],
    arguments=prs_args,
    output=output_path_split,
    allow_reuse=False,
)

## 5.0 Configure and Submit the pipeline

Next, we define a ``Pipeline`` object that includes data partitioning and data splitting as steps, and then submit the pipeline. The entire run can take a long time; the duration depends mainly on the size of the data and the compute used. With the configuration in this notebook, a 200 GB dataset takes roughly 1.5 to 2 hours.

In [None]:
pipeline = Pipeline(workspace=ws, steps=[data_partition_step, parallel_run_step])
run = experiment.submit(pipeline)

In [None]:
run.wait_for_completion(show_output=False)

## 6.0 Register the dataset

Finally, we register the datasets in the workspace so that they can be used directly in the training experiment.
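The registration cell registers different dataset names depending on the split ratios. The hypothetical helper below (not part of `helper.py`) reproduces only that naming logic, which can be handy for checking which names the later training experiment should look up.

```python
def datasets_to_register(input_dataset_name, test_split, valid_split):
    """Return the dataset names the registration cell would register."""
    names = [f"{input_dataset_name}_partitioned_train"]
    if valid_split > 0:
        names.append(f"{input_dataset_name}_partitioned_valid")
    if test_split > 0:
        names.append(f"{input_dataset_name}_partitioned_test")
    else:
        # test_split == 0: pre-existing test data is registered instead.
        names.append(f"{input_dataset_name}_test")
    return names
```

For example, with `test_split = 0` the last name is `"<input_dataset_name>_test"` rather than the partitioned test set.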

In [None]:
from helper import get_partition_str


partition_format, partition_path = get_partition_str(time_series_id_column_names)

train_ds = Dataset.Tabular.from_parquet_files(
    path=(datastore, f"{output_location}/train/{partition_path}"),
    partition_format=partition_format, validate=False
)
train_ds.register(ws, f"{input_dataset_name}_partitioned_train", create_new_version=True)

if valid_split > 0:
    valid_ds = Dataset.Tabular.from_parquet_files(
        path=(datastore, f"{output_location}/valid/{partition_path}"),
        partition_format=partition_format, validate=False
    )
    valid_ds.register(ws, f"{input_dataset_name}_partitioned_valid", create_new_version=True)

if test_split > 0:
    test_ds = Dataset.Tabular.from_parquet_files(
        path=(datastore, f"{output_location}/test/{partition_path}"),
        partition_format=partition_format, validate=False
    )
    test_ds.register(ws, f"{input_dataset_name}_partitioned_test", create_new_version=True)
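else:
    # test_split == 0: register the existing test data instead.
    # Replace <test_data_path> with the path to that data in the datastore.
    test_ds = Dataset.Tabular.from_delimited_files(
        path=(datastore, "<test_data_path>"), validate=False
    )
    test_ds.register(ws, f"{input_dataset_name}_test", create_new_version=True)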
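The ``partition_format`` argument tells ``from_parquet_files`` how to turn folder names back into columns: each `{column_name}` placeholder becomes a string column whose value is read from the matching path segment. The sketch below mimics that for plain placeholders only, to make the idea concrete. It is not the azureml implementation, it does not support datetime placeholders or wildcards, and the example format and path are hypothetical (the real values come from ``get_partition_str`` in `helper.py`).

```python
import re


def parse_partition_path(partition_format, path):
    """Extract column values from the end of `path` using '{column}' placeholders."""
    # re.split with one capture group alternates: literal, column, literal, ...
    parts = re.split(r"\{(\w+)\}", partition_format)
    regex = ""
    for i, part in enumerate(parts):
        if i % 2 == 0:
            regex += re.escape(part)  # literal text must match exactly
        else:
            regex += f"(?P<{part}>[^/]+)"  # one path segment per column
    match = re.search(regex + "$", path)
    if match is None:
        raise ValueError(f"{path!r} does not match {partition_format!r}")
    return match.groupdict()
```

For instance, `parse_partition_path("/{store}/{brand}/data.parquet", "train/s1/a/data.parquet")` yields the column values `store="s1"` and `brand="a"`.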