# Use Parallel Run step for distributed data preprocessing

In this example, we use a [ParallelRunStep](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-use-parallel-run-step) in a pipeline to process academic papers from the [COVID-19 Open Research Dataset](https://azure.microsoft.com/en-us/services/open-datasets/catalog/covid-19-open-research/) and write the output back to an ADLS Gen2 datastore.

## Install private build with output dataset feature

In [None]:
!pip install --extra-index-url https://azuremlsdktestpypi.azureedge.net/Create-Dev-Index/15335858/ --pre "azureml-sdk[automl]<0.1.1"

## Set up your development environment

All the setup for your development work can be done in a Python notebook. Setup includes:

* Importing Python packages
* Connecting to a workspace to enable communication between your local computer and remote resources
* Creating an experiment to track all your runs
* Getting a remote compute target to run the data preprocessing

### Import packages

Import the Python packages you need in this session, and display the Azure Machine Learning SDK version.

In [None]:
import os
import azureml.core
from azureml.core import Workspace, Dataset, Datastore, ComputeTarget, RunConfiguration, Experiment
from azureml.core.runconfig import CondaDependencies
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
from azureml.pipeline.core import Pipeline
# check core SDK version number
print("Azure ML SDK Version: ", azureml.core.VERSION)

### Connect to workspace

Create a workspace object from the existing workspace. `Workspace.from_config()` reads the file **config.json** and loads the details into an object named `workspace`.

In [None]:
# load workspace
workspace = Workspace.from_config()
print('Workspace name: ' + workspace.name, 
      'Azure region: ' + workspace.location, 
      'Subscription id: ' + workspace.subscription_id, 
      'Resource group: ' + workspace.resource_group, sep='\n')

## Access Data from COVID-19 Open Research Dataset

We will now register the open dataset blob container as a datastore and create two file datasets from it. Learn more about the file structure of the open dataset storage [here](https://azure.microsoft.com/en-us/services/open-datasets/catalog/covid-19-open-research/).

In [None]:
covid_dstore = Datastore.register_azure_blob_container(workspace=workspace,
                                                       datastore_name='covid_data',
                                                       container_name='covid19temp',
                                                       account_name='azureopendatastorage',
                                                       sas_token='sv=2019-02-02&ss=bfqt&srt=sco&sp=rlcup&se=2025-04-14T00:21:16Z&st=2020-04-13T16:21:16Z&spr=https&sig=JgwLYbdGruHxRYTpr5dxfJqobKbhGap8WUtKFadcivQ%3D')

In [None]:
# The CORD-19 dataset comes with metadata.csv - a single file that records basic information on all the papers available in the CORD-19 dataset.
covid_meta = Dataset.File.from_files(covid_dstore.path('metadata.csv'))

In [None]:
# Each .json file corresponds to an individual article in the dataset. This is where the title, authors, abstract and (where available) the full text data is stored.
covid_ds = Dataset.File.from_files(covid_dstore.path('**/*.json'))

In [None]:
covid_meta = covid_meta.register(workspace, 'covid-19 metadata', create_new_version=True)
covid_ds = covid_ds.register(workspace, 'covid-19 dataset', create_new_version=True)

## Data Exploration

Next, we explore the academic papers referenced by `covid_ds` by mounting the dataset to the local compute. [Learn more](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-train-with-datasets#mount-vs-download)<br>
**Note:** Dataset mounting only works on Linux-based compute. If you are using an Azure Machine Learning compute instance, the following code runs as-is.

In [None]:
# list the files referenced by covid_ds
covid_ds.to_path()

In [None]:
# returns the total number of research papers referenced by the dataset
len(covid_ds.to_path())
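`to_path()` returns the relative path of every file the dataset references, so plain Python is enough to summarize the papers before mounting anything. A minimal sketch that counts papers per top-level source folder; `count_by_source` is a hypothetical helper, and the sample paths are made up for illustration (in the notebook you would pass `covid_ds.to_path()` instead):

```python
from collections import Counter
from typing import Iterable


def count_by_source(paths: Iterable[str]) -> Counter:
    """Count files per top-level folder, e.g. '/biorxiv_medrxiv/pdf_json/a.json' -> 'biorxiv_medrxiv'."""
    counts = Counter()
    for path in paths:
        # drop the leading '/' and keep the first path component
        parts = path.lstrip("/").split("/")
        counts[parts[0] if len(parts) > 1 else "<root>"] += 1
    return counts


# hypothetical sample paths; replace with covid_ds.to_path()
sample = [
    "/biorxiv_medrxiv/pdf_json/a.json",
    "/biorxiv_medrxiv/pdf_json/b.json",
    "/comm_use_subset/pmc_json/c.json",
]
print(count_by_source(sample))
```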

In [None]:
# mount covid_ds to the local compute so you can work with the files as if they were on a local path
import tempfile
mounted_path = tempfile.mkdtemp()
mount_context = covid_ds.mount(mounted_path)
mount_context.start()
# call mount_context.stop() when you are done exploring

In [None]:
# load one paper from the dataset to view its content
# utils is a local helper module (not shown here) that must be importable from this notebook
from utils import FileReader, get_breaks
first_row = FileReader(os.path.join(mounted_path, 'biorxiv_medrxiv/pdf_json/0015023cc06b5362d332b3baf348d11567ca2fbb.json'))
print(first_row)
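`utils.FileReader` is not shown in this notebook. As a rough idea of what reading one paper involves, here is a minimal hypothetical stand-in, `read_paper`, assuming the CORD-19 JSON layout in which `abstract` and `body_text` are lists of paragraph objects with a `text` field and the title sits under `metadata`. A tiny made-up file stands in for a real paper:

```python
import json
import os
import tempfile
from dataclasses import dataclass


@dataclass
class Paper:
    paper_id: str
    title: str
    abstract: str
    body_text: str


def read_paper(path: str) -> Paper:
    """Parse one CORD-19 style JSON file (assumed schema, see the lead-in)."""
    with open(path, encoding="utf-8") as f:
        content = json.load(f)
    # abstract and body_text are assumed to be lists of {"text": ...} paragraphs;
    # missing sections (some files have no abstract) become empty strings
    abstract = "\n".join(p.get("text", "") for p in content.get("abstract", []))
    body = "\n".join(p.get("text", "") for p in content.get("body_text", []))
    title = content.get("metadata", {}).get("title", "")
    return Paper(content.get("paper_id", ""), title, abstract, body)


# made-up sample paper written to a temporary file
sample = {
    "paper_id": "abc123",
    "metadata": {"title": "A sample paper"},
    "abstract": [{"text": "Short abstract."}],
    "body_text": [{"text": "First paragraph."}, {"text": "Second paragraph."}],
}
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False, encoding="utf-8") as tmp:
    json.dump(sample, tmp)
paper = read_paper(tmp.name)
os.remove(tmp.name)
print(paper.paper_id, paper.title)
```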

## Run data preprocessing in parallel

In [None]:
# get the compute target named 'mlc' that is already registered with your workspace
compute_target = workspace.compute_targets['mlc']

# define the script folder and the data preparation script that the ParallelRunStep runs
scripts_folder = "script"
script_file = "dataprep.py"

The output of the ParallelRunStep is represented by an `OutputFileDatasetConfig` object. You can register an `OutputFileDatasetConfig` as a dataset so that the output data is versioned automatically.

In [None]:
from azureml.data import OutputFileDatasetConfig

# learn more about the output config
help(OutputFileDatasetConfig)

In [None]:
# get the ADLS Gen2 datastore already registered with the workspace
datastore = workspace.datastores['may_adlsgen2']

# write output to the ADLS Gen2 datastore under the folder `outputdataset/parallelrun` and register it as a dataset after the experiment completes
# make sure the service principal used by the ADLS Gen2 datastore has the Storage Blob Data Contributor role so it can write data back
prepared_covid_ds = OutputFileDatasetConfig(destination=(datastore, 'outputdataset/parallelrun')).register_on_complete(name='prepared_covid_ds')

In [None]:
from azureml.core import Environment
from azureml.core.runconfig import CondaDependencies, DEFAULT_CPU_IMAGE

batch_conda_deps = CondaDependencies.create(pip_packages=['pandas','azureml-sdk<0.1.1'],
                                            pip_indexurl='https://azuremlsdktestpypi.azureedge.net/Create-Dev-Index/15335858/')

batch_env = Environment(name="batch_environment")
batch_env.python.conda_dependencies = batch_conda_deps
batch_env.docker.enabled = True
batch_env.docker.base_image = DEFAULT_CPU_IMAGE

In [None]:
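parallel_run_config = ParallelRunConfig(
    source_directory=scripts_folder,   # folder that contains the entry script
    entry_script=script_file,          # script defining init() and run(mini_batch)
    mini_batch_size="100",             # for a FileDataset: number of files per run() call
    error_threshold=3000,              # abort the job if more than 3000 files fail in total
    output_action="append_row",        # aggregate run() outputs into a single output file
    environment=batch_env,
    compute_target=compute_target,
    node_count=2)                      # number of compute nodes to use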
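With a FileDataset input, `mini_batch_size` is the number of files passed to each `run()` call, so the 10,000 files this notebook feeds the step are split into 100 mini-batches of 100 files each. `error_threshold` counts failed files across the whole input, not per mini-batch, and the job is aborted once that count goes above it; `-1` ignores all failures. The sketch below reproduces only that arithmetic with hypothetical helper names; it does not model how the service schedules mini-batches across worker processes on the two nodes:

```python
from typing import List, TypeVar

T = TypeVar("T")


def make_mini_batches(items: List[T], mini_batch_size: int) -> List[List[T]]:
    """Split items into consecutive chunks of at most mini_batch_size."""
    if mini_batch_size <= 0:
        raise ValueError("mini_batch_size must be positive")
    return [items[i:i + mini_batch_size] for i in range(0, len(items), mini_batch_size)]


def should_abort(failed_items: int, error_threshold: int) -> bool:
    """Mirror the documented error_threshold rule: -1 ignores all failures."""
    return error_threshold != -1 and failed_items > error_threshold


# numbers from this notebook: 10,000 input files with mini_batch_size="100"
files = [f"paper_{i}.json" for i in range(10000)]  # stand-in file names
batches = make_mini_batches(files, 100)
print(len(batches), len(batches[0]))
print(should_abort(failed_items=2500, error_threshold=3000))
```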

In [None]:
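parallelrun_step = ParallelRunStep(
    name="data-pre-process",
    parallel_run_config=parallel_run_config,
    # process only the first 10,000 files referenced by covid_ds
    inputs=[covid_ds.take(10000).as_named_input('covid_ds')],
    output=prepared_covid_ds,
    # reuse results from a previous run if the step's inputs and settings have not changed
    allow_reuse=True
)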

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=workspace, steps=[parallelrun_step])

pipeline_run = Experiment(workspace, 'covid-19').submit(pipeline)

In [None]:
pipeline_run.wait_for_completion()

## Explore output

In [None]:
import pandas as pd

# the output dataset was registered with the workspace when the run completed, so you can get it by name
output_ds = workspace.datasets['prepared_covid_ds']
download_path = output_ds.download('output')

In [None]:
# parse the space-delimited output file (the first downloaded file) and assign column names
df = pd.read_csv(download_path[0], delimiter=" ", header=None)
df.columns = ['paper_id', 'doi', 'abstract', 'body_text', 'authors', 'title', 'journal', 'abstract_summary']
df
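The cell above relies on `pd.read_csv` splitting each row on single spaces. Because abstracts and body text themselves contain spaces, that only yields the eight expected columns when text fields are quoted in the output file (`pd.read_csv` treats `"` as its default quote character). A small standard-library illustration with made-up three-column rows, not the notebook's eight, showing that a quoted field survives a space delimiter while an unquoted one is split apart; `parse_space_delimited` is a hypothetical helper:

```python
import csv
import io


def parse_space_delimited(text: str) -> list:
    """Parse space-delimited rows, honoring double-quoted fields."""
    return list(csv.reader(io.StringIO(text), delimiter=" ", quotechar='"'))


# made-up rows: paper id, DOI, abstract
quoted = 'p1 10.1000/xyz "An abstract with spaces"\n'
unquoted = 'p1 10.1000/xyz An abstract with spaces\n'
print(parse_space_delimited(quoted))    # one row, 3 fields
print(parse_space_delimited(unquoted))  # one row, 6 fields
```

If the parsed DataFrame has more columns than expected, checking how the entry script quotes its output is a good first step.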