In [None]:
import azureml.core
from azureml.telemetry import set_diagnostics_collection
# Opt in to diagnostics collection for the Azure ML SDK.
set_diagnostics_collection(send_diagnostics=True)

from azureml.core.workspace import Workspace

# Load the workspace via Workspace.from_config() and echo its identifying
# details, one per line.
ws = Workspace.from_config()
workspace_details = (
    'Workspace name: ' + ws.name,
    'Azure region: ' + ws.location,
    'Subscription id: ' + ws.subscription_id,
    'Resource group: ' + ws.resource_group,
)
print(*workspace_details, sep='\n')

In [None]:
from azureml.core.compute import ComputeTarget
from azureml.core.compute_target import ComputeTargetException

# Look up the CPU cluster by name in the workspace; the resulting handle is
# what the pipeline steps use as their compute target.
cpu_cluster_name = "cpu-8x-f8s"
cpu_compute_target = ComputeTarget(name=cpu_cluster_name, workspace=ws)
print("CPU Compute target:", cpu_compute_target.name)

In [None]:
from azureml.core import Environment,Datastore
from azureml.core.dataset import Dataset
from azureml.pipeline.core import Pipeline,PipelineData
from azureml.pipeline.core.pipeline_output_dataset import PipelineOutputFileDataset
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig

# --- Execution environment ---------------------------------------------------
# Conda specification from ./conda-cpu.yml, run in Docker on top of the
# Azure ML OpenMPI / Ubuntu 18.04 base image.
cpu_env = Environment.from_conda_specification(
    name="ubuntu",
    file_path="./conda-cpu.yml",
)
cpu_env.docker.base_image = "mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04"
cpu_env.docker.enabled = True

# --- Parallelism knobs shared by every step ----------------------------------
cpu_node_count = 1
cpu_process_count_per_node = 8

# --- Storage -----------------------------------------------------------------
# Datastore that backs the intermediate pipeline outputs below.
datastore_name = 'data'
datastore = Datastore.get(ws, datastore_name)

# Raw PDF input: the registered dataset '1_raw', exposed to the first step
# under the input name "pdf".
pdf_ni = Dataset.get_by_name(ws, name='1_raw').as_named_input("pdf")

# Intermediate outputs handed from one step to the next, all stored on
# `datastore`.
png_pd, documents_pd, metadata_pd = (
    PipelineOutputFileDataset(PipelineData(name=output_name, datastore=datastore))
    for output_name in ("png", "classification", "metadata")
)

def _make_parallel_run_config(source_directory, entry_script, append_row_file_name):
    """Build the ParallelRunConfig shared by every step of this pipeline.

    All steps run on the same environment and compute target with the same
    batching, error threshold and parallelism settings; only the script
    location and the name of the aggregated output file differ.

    Args:
        source_directory: Folder containing the step's entry script.
        entry_script: Script file (inside ``source_directory``) to run.
        append_row_file_name: Name of the file the "append_row" output
            action writes the step's results to.

    Returns:
        A ParallelRunConfig using ``cpu_env``, ``cpu_compute_target``,
        ``cpu_node_count`` and ``cpu_process_count_per_node``.
    """
    return ParallelRunConfig(
        source_directory=source_directory,
        entry_script=entry_script,
        mini_batch_size="5",
        error_threshold=1,
        output_action="append_row",
        append_row_file_name=append_row_file_name,
        environment=cpu_env,
        compute_target=cpu_compute_target,
        node_count=cpu_node_count,
        process_count_per_node=cpu_process_count_per_node,
    )


# Step 1: convert to png (raw PDFs in, `png_pd` out)
pdf_to_png_config = _make_parallel_run_config(
    source_directory='1_pdf_to_png',
    entry_script="pdf_to_png.py",
    append_row_file_name="1_pdf_to_png.csv",
)
pdf_to_png_step = ParallelRunStep(
    name="pdf-to-png",
    parallel_run_config=pdf_to_png_config,
    arguments=["--png-folder", png_pd],
    inputs=[pdf_ni],
    output=png_pd,
    allow_reuse=True,
)

# Step 2: classify document (`png_pd` in, `documents_pd` out)
document_classification_config = _make_parallel_run_config(
    source_directory='2_document_classification',
    entry_script="document_classification.py",
    append_row_file_name="2_document_classification.csv",
)
document_classification_step = ParallelRunStep(
    name="document-classification",
    parallel_run_config=document_classification_config,
    arguments=["--documents-folder", documents_pd],
    inputs=[png_pd],
    output=documents_pd,
    allow_reuse=True,
)

# Step 3: extract metadata (`documents_pd` in, `metadata_pd` out)
form_metadata_extraction_config = _make_parallel_run_config(
    source_directory='3_form_metadata_extraction',
    entry_script="form_metadata_extraction.py",
    append_row_file_name="3_form_metadata_extraction.csv",
)
form_metadata_extraction_step = ParallelRunStep(
    name="form-metadata-extraction",
    parallel_run_config=form_metadata_extraction_config,
    arguments=["--png-folder", png_pd,
               "--metadata-folder", metadata_pd],
    inputs=[documents_pd],
    # `png_pd` is passed to the script through `arguments` but is not the
    # partitioned input of this step, so it has to be declared as a side
    # input; otherwise it is neither an input nor an output of the step.
    side_inputs=[png_pd],
    output=metadata_pd,
    allow_reuse=True,
)

# Only the final step is listed explicitly; the earlier steps are reachable
# from it through the intermediate outputs it consumes.
# NOTE(review): relies on the SDK resolving upstream steps from data
# dependencies — confirm against the Pipeline documentation.
pipeline_steps = [form_metadata_extraction_step]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print(pipeline)

In [None]:
from azureml.core import Experiment

# Submit the pipeline under a named experiment, tagging the run with the
# number of CPU nodes it was configured for.
experiment_name = 'pdf-to-cosmosDB'
experiment = Experiment(workspace=ws, name=experiment_name)
run_tags = {'cpu_nodes': str(cpu_node_count)}
run = experiment.submit(pipeline, tags=run_tags)

In [None]:
# Block until the submitted pipeline run completes, showing its output in the
# notebook while waiting.
run.wait_for_completion(show_output=True)