# Processing a large csv file in parallel as part of AzureML Pipelines

This example looks at manipulating CSV data using a Pipeline. However, it can be extended to any data manipulation scenario that you can achieve through a Python script.

## Example Data and Local Scenario

We will be using an example CSV that is structured like so: `Transaction_date,Product,Price,...`, and summing the Price field to calculate the total revenue. However, any statistical calculation or data transformation could be done.

This data has already had its header removed and been split into 10 files in preparation for the batch job.
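The preparation step described above is not shown in this example, so the following is only a minimal sketch of how such a split could be done. The function name `split_csv`, the source file name, and the even row-count chunking are assumptions made for illustration; only the `SalesJan2009.<n>.csv` naming pattern comes from this example.

```python
import csv
import os


def split_csv(source_path, output_dir, parts=10, prefix='SalesJan2009'):
    """Split a CSV into `parts` header-less files named '<prefix>.<i>.csv'.

    Hypothetical helper: the original preparation script is not shown.
    """
    os.makedirs(output_dir, exist_ok=True)
    with open(source_path, 'r', newline='') as source:
        reader = csv.reader(source)
        next(reader, None)  # drop the header row
        rows = list(reader)

    # ceiling division so that every row lands in exactly one part
    chunk_size = max(1, -(-len(rows) // parts))

    written = []
    for i in range(parts):
        chunk = rows[i * chunk_size:(i + 1) * chunk_size]
        part_path = os.path.join(output_dir, '{}.{}.csv'.format(prefix, i))
        with open(part_path, 'w', newline='') as part:
            csv.writer(part).writerows(chunk)
        written.append(part_path)
    return written


# Example call (assumes a local 'SalesJan2009.csv' with a header row):
# split_csv('SalesJan2009.csv', 'sample-data')
```

Note that when the row count does not divide evenly, the last parts may hold fewer rows or be empty.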

In [None]:
import os

# take a look at one of the file contents
file_path = os.path.join('sample-data', 'SalesJan2009.0.csv')
with open(file_path, 'r') as csvfile:
    for line in csvfile.read().splitlines()[:5]:
        print(line)


Locally, aggregating these totals is easy: iterate over the rows of each file, select the Price field, and sum the values. In code, it would look something like this:

In [None]:
import csv

# ['SalesJan2009.0.csv', ... 'SalesJan2009.9.csv' ]
files = ['SalesJan2009.{}.csv'.format(i) for i in range(10)]

for file in files:
    total = 0
    file_path = os.path.join('sample-data', file)
    with open(file_path, 'r') as csvfile:
        rows = csv.reader(csvfile)
    
        for fields in rows:
            total += float(fields[2])

    print('Total in {}: {:.2f}'.format(file, total))
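Since any statistical calculation could replace the simple sum, here is a small sketch of the same row loop producing a few more summary values. The helper name `summarize_column` and the choice of statistics are illustrative assumptions, not part of the original example; the default column index of 2 matches the Price field used above.

```python
import csv


def summarize_column(csv_path, column=2):
    """Return count, total, mean, min and max of one numeric column.

    Hypothetical helper; assumes a header-less CSV whose `column`
    values all parse as floats.
    """
    values = []
    with open(csv_path, 'r', newline='') as csvfile:
        for fields in csv.reader(csvfile):
            values.append(float(fields[column]))

    if not values:
        # an empty file has no meaningful mean, min or max
        return {'count': 0, 'total': 0.0, 'mean': None, 'min': None, 'max': None}

    total = sum(values)
    return {
        'count': len(values),
        'total': total,
        'mean': total / len(values),
        'min': min(values),
        'max': max(values),
    }
```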


## Translating the Scenario to Pipelines

For an AzureML Pipeline to work, we need to define three main things: the script to run, our inputs, and our outputs. The script is almost done with the above logic; we just need to establish how our data will come into the pipeline and where the results of our transformation will be stored.

### Defining Where our Data Will Live

First we need to establish where our data will live. In the local scenario it was just a file on our local machine; in the cloud it needs to be accessible by all machines that will process it. To start, we need to access our workspace, where we can arrange our data and/or AI resources.

<strong>Note:</strong> If you're unfamiliar with creating a workspace, please visit [this helpful guide](https://docs.microsoft.com/en-us/azure/machine-learning/studio/create-workspace).
We will need to access our cloud resources to upload our data.

In [None]:
from azureml.core import Workspace

ws = Workspace.from_config()
print('Workspace name: ' + ws.name,
      'Azure region: ' + ws.location,
      'Subscription id: ' + ws.subscription_id,
      'Resource group: ' + ws.resource_group, sep = '\n')

## Uploading our inputs to Datastore

Now that we have our workspace loaded, we can upload our files to a datastore. This step can be skipped if the files were already uploaded.

In [None]:
# This may be skipped if files were already uploaded
datastore = ws.get_default_datastore()

input_folder = datastore.upload(src_dir='sample-data', target_path='sample-data-on-cloud', overwrite=True, show_progress=True)

## Targeting Already Uploaded File Inputs

If your data already exists in a datastore, you can reference its location like this.

In [None]:
# This needs to be run if the above step was skipped. However, it won't hurt to run both.
from azureml.data.data_reference import DataReference

datastore = ws.get_default_datastore()

input_folder = DataReference(datastore, path_on_datastore='sample-data-on-cloud')

## Define Output Folder

This is where the results of our batch job will be written.

In [None]:
from azureml.pipeline.core import PipelineData

# designate output location
output_folder = PipelineData(name='output', datastore=datastore)

### Creating our Python Script

Let's adapt our previous CSV logic into a Pipelines Python script. Pipelines expects each script to define a function called run, which it uses as the entry point for executing your script. A second function, init, can be defined to ensure that the script's environment is set up properly.

In [None]:
# total_transactions.py
import csv
import os


output_directory = 'results/'

def init():
    os.makedirs(output_directory, exist_ok=True)


def run(files):
    for input_file in files:
        # "/path/to/SalesJan2009.1.csv" -> "1"
        file_name = os.path.basename(input_file)
        file_number = file_name.split('.')[1]

        # reset the running total for each input file
        total = 0
        with open(input_file, 'r') as csvfile:
            rows = csv.reader(csvfile)

            for fields in rows:
                total += float(fields[2])

        # open in write mode; open() defaults to read-only
        result_file_path = os.path.join(output_directory, 'totals-{}.txt'.format(file_number))
        with open(result_file_path, 'w') as result_file:
            result_file.write('Total for {}: {}\n'.format(file_name, total))

Instead of opening local files, reading their contents, and printing the results to the console, we now read from a list of files, write each result to a file, and wrap all of our logic in a function called run.
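Before submitting anything to the cloud, it can be useful to exercise the script locally. The sketch below mimics the calling convention described above: init is called once if it exists, then run is called with lists of files. The helper name `run_locally` and the idea of passing files in batches of `batch_size` are assumptions for illustration; how the service actually groups files per call is not specified here.

```python
def run_locally(script, files, batch_size=2):
    """Call script.init() once (if defined), then script.run() per batch.

    Hypothetical local harness: `script` is any object exposing a
    run(files) function and, optionally, an init() function.
    """
    init = getattr(script, 'init', None)
    if callable(init):
        init()

    results = []
    for start in range(0, len(files), batch_size):
        batch = files[start:start + batch_size]
        results.append(script.run(batch))
    return results


# Example call (assumes total_transactions.py is importable from the
# current directory and that the sample files exist locally):
# import glob
# import total_transactions
# run_locally(total_transactions, sorted(glob.glob('sample-data/*.csv')))
```

Passing the module as an argument keeps the harness independent of any particular script, so the same helper could be reused for other entry scripts.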

### Creating Compute resources

We will need something to run our Pipeline on. Here we will use an AmlCompute cluster with up to three VMs.

In [None]:
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException
from azureml.core.webservice.batch import BatchServiceDeploymentConfiguration

# AmlCompute
cpu_cluster_name = "cpucluster"
try:
    cpu_cluster = AmlCompute(ws, cpu_cluster_name)
    print("found existing cluster.")
except ComputeTargetException:
    print("creating new cluster")
    provisioning_config = AmlCompute.provisioning_configuration(vm_size="STANDARD_D2_v2", max_nodes=3)

    # create the cluster
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, provisioning_config)
    cpu_cluster.wait_for_completion(show_output=True)
    
# In a real-world scenario, you'll want to tune the node count and the processes per node to fit your problem domain.
deployment_config = BatchServiceDeploymentConfiguration(compute_target=cpu_cluster, 
                                                        node_count=3, process_count_per_node=2)

### Defining our Cloud Transform Step

We need to tell AzureML what we would like to compute. Although named BatchInferenceConfig for legacy reasons, it can accept any type of Python script; for our purposes, this will be our CSV totaling logic.

In [None]:
from azureml.core import Environment
from azureml.core.model import BatchInferenceConfig
from azureml.pipeline.steps.predictor_step import PredictorStep

inference_config = BatchInferenceConfig(
                    environment=Environment('batchinferencing'),
                    entry_script='total_transactions.py',  # the user script to run against each input
                    input_format='file',
                    error_threshold=100,
                    source_directory='./scripts',
                    output_action='summary_only',
                    description=None)

# create a Predictor step for distributing the totaling script across multiple nodes in AmlCompute
distributed_style_transfer_step = PredictorStep(
    name='weekly-transactions',
    inputs=[input_folder], # Input file share/blob container
    output=output_folder, # Output file share/blob container
    models=[],
    arguments=['--logging_level', 'DEBUG', '--test', 'test'],
    inference_config=inference_config,
    deployment_config=deployment_config,
    allow_reuse=False #[optional - default value True]
)

### Tying Everything Together

Now that our data and pipeline step are configured, all that is left is to construct the pipeline and execute the job.

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[distributed_style_transfer_step])

pipeline_run = Experiment(ws, 'file_summary_only').submit(pipeline, pipeline_params={'aml_node_count': 3})