In [1]:
import config
from pprint import pprint
import datetime, io, os, sys, time
import azure.storage.blob as azureblob
import azure.batch.batch_service_client as batch
import azure.batch.batch_auth as batch_auth
import azure.batch.models as batchmodels
from azure.batch.models import BatchErrorException

## Storage Methods

In [2]:
def upload_file_to_container(block_blob_client, container_name, file_path,
                             sas_lifetime=datetime.timedelta(hours=2)):
    """Upload a local file as a blob and wrap it as a Batch ResourceFile.

    :param block_blob_client: Storage client (azure.storage.blob
        BlockBlobService) used to upload the file and sign its URL.
    :param str container_name: Container to upload into (not created here).
    :param str file_path: Local path of the file. Its basename is used both
        as the blob name and as the file name the task sees on the node.
    :param datetime.timedelta sas_lifetime: How long the generated read-only
        SAS URL stays valid, measured from now (UTC). A task that starts
        after the expiry would be unable to download the file, so this must
        cover the time spent waiting for nodes. Defaults to 2 hours.
    :return: batchmodels.ResourceFile pointing at the blob's SAS URL.
    """
    blob_name = os.path.basename(file_path)

    print('Uploading file {} to container [{}]...'.format(file_path,
                                                          container_name))

    block_blob_client.create_blob_from_path(container_name,
                                            blob_name,
                                            file_path)

    # Read-only SAS so the compute node can fetch the blob without the
    # storage account key. No start time is given, so it is valid immediately.
    sas_token = block_blob_client.generate_blob_shared_access_signature(
        container_name,
        blob_name,
        permission=azureblob.BlobPermissions.READ,
        expiry=datetime.datetime.utcnow() + sas_lifetime)

    sas_url = block_blob_client.make_blob_url(container_name,
                                              blob_name,
                                              sas_token=sas_token)

    return batchmodels.ResourceFile(http_url=sas_url, file_path=blob_name)

def get_container_sas_token(block_blob_client,
                            container_name, blob_permissions):
    """Return a container-level shared access signature (SAS) token.

    The token carries ``blob_permissions`` and expires two hours after it is
    generated. No start time is set, so it is valid immediately.

    :param block_blob_client: Storage client exposing
        ``generate_container_shared_access_signature``.
    :param str container_name: Container the token grants access to.
    :param blob_permissions: Permissions to embed in the token.
    :return: The SAS token produced by the client.
    """
    expires_at = datetime.datetime.utcnow() + datetime.timedelta(hours=2)
    return block_blob_client.generate_container_shared_access_signature(
        container_name, permission=blob_permissions, expiry=expires_at)

## Upload Files to Azure Storage

In [3]:
# Storage client authenticated with the storage account key.
blob_client = azureblob.BlockBlobService(
    account_name=config._STORAGE_ACCOUNT_NAME,
    account_key=config._STORAGE_ACCOUNT_KEY)

input_container_name = 'input-python'
# fail_on_exist=False keeps this cell safe to re-run when the container exists.
blob_client.create_container(input_container_name, fail_on_exist=False)

# The collection of data files that are to be processed by the tasks.
# They live next to the notebook. Anchor on the kernel's working directory
# (by default the notebook's folder) rather than sys.path[0], which inside a
# Jupyter kernel is not guaranteed to be the notebook's folder.
input_file_paths = [os.path.join(os.getcwd(), 'taskdata{}.txt'.format(i))
                    for i in range(3)]

# Upload the data files.
input_files = [
    upload_file_to_container(blob_client, input_container_name, file_path)
    for file_path in input_file_paths]

Uploading file /home/quick/code/azure-batch-demo/quickstart/python-batch-quickstart/taskdata0.txt to container [input-python]...
Uploading file /home/quick/code/azure-batch-demo/quickstart/python-batch-quickstart/taskdata1.txt to container [input-python]...
Uploading file /home/quick/code/azure-batch-demo/quickstart/python-batch-quickstart/taskdata2.txt to container [input-python]...


In [4]:
# Each ResourceFile pairs the file name the task sees on the node with the
# SAS URL the node downloads it from.
# NOTE: the printed URLs embed SAS signatures; clear this output before sharing.
for file in input_files:
    name_on_node, download_url = file.file_path, file.http_url
    print(f"File {name_on_node} | SAS: {download_url}\n")

File taskdata0.txt | SAS: https://nunosbatchstorage.blob.core.windows.net/input-python/taskdata0.txt?se=2019-03-29T18%3A59%3A19Z&sp=r&sv=2018-03-28&sr=b&sig=bdj6Kjjuc42bGDPEhy/0naS9ng7adJhXio7qpftAVjI%3D

File taskdata1.txt | SAS: https://nunosbatchstorage.blob.core.windows.net/input-python/taskdata1.txt?se=2019-03-29T18%3A59%3A19Z&sp=r&sv=2018-03-28&sr=b&sig=U/ckAZ2GYyFrpPgUeL1EfYtofmd78oC6rxrz4qZ2iaY%3D

File taskdata2.txt | SAS: https://nunosbatchstorage.blob.core.windows.net/input-python/taskdata2.txt?se=2019-03-29T18%3A59%3A19Z&sp=r&sv=2018-03-28&sr=b&sig=8HA5mNMeKT2IdEzc59LkQQDb7EKAIdXBp/IwyNeNCpQ%3D



## Batch Service client

In [5]:
# Authenticate to the Batch account with its shared key. From here on the
# notebook talks to the Batch service as well as to Storage.
credentials = batch_auth.SharedKeyCredentials(
    config._BATCH_ACCOUNT_NAME, config._BATCH_ACCOUNT_KEY)
batch_client = batch.BatchServiceClient(credentials,
                                        batch_url=config._BATCH_ACCOUNT_URL)


### Create Pool

In [6]:
def create_pool(batch_service_client, pool_id):
    """Create a pool of Ubuntu 18.04 compute nodes, or reuse an existing one.

    VM size and node count come from ``config._POOL_VM_SIZE`` and
    ``config._POOL_NODE_COUNT``.

    :param batch_service_client: azure.batch BatchServiceClient for the account.
    :param str pool_id: ID of the pool to create.
    :raises BatchErrorException: for any service error other than the pool
        already existing (e.g. an invalid VM size or insufficient quota).
    """
    print('Creating pool [{}]...'.format(pool_id))

    # Create a new pool of Linux compute nodes using an Azure Virtual Machines
    # Marketplace image. For more information about creating pools of Linux
    # nodes, see:
    # https://azure.microsoft.com/documentation/articles/batch-linux-nodes/
    new_pool = batchmodels.PoolAddParameter(
        id=pool_id,
        virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
            image_reference=batchmodels.ImageReference(
                publisher="Canonical",
                offer="UbuntuServer",
                sku="18.04-LTS",
                version="latest"),
            node_agent_sku_id="batch.node.ubuntu 18.04"),
        vm_size=config._POOL_VM_SIZE,
        target_dedicated_nodes=config._POOL_NODE_COUNT)

    try:
        batch_service_client.pool.add(new_pool)
    except BatchErrorException as err:
        # Only "pool already exists" is expected (e.g. when the notebook is
        # re-run). Any other failure must surface instead of being reported
        # as a reuse. NOTE: a reused pool keeps its original VM size, image
        # and node count; the values above are not applied to it.
        if getattr(err.error, 'code', None) != 'PoolExists':
            raise
        print("Pool already exists. Reusing current pool")

# Provision (or reuse) the pool whose compute nodes will execute the tasks.
create_pool(batch_client, pool_id=config._POOL_ID)

Creating pool [PythonQuickstartPool]...


In [7]:
# Read the pool back from the service to inspect its allocation state.
pool = batch_client.pool.get(config._POOL_ID)
pool_snapshot = pool.as_dict()
pprint(pool_snapshot)

{'allocation_state': 'resizing',
 'allocation_state_transition_time': '2019-03-29T16:59:28.755735Z',
 'creation_time': '2019-03-29T16:59:28.755735Z',
 'current_dedicated_nodes': 0,
 'current_low_priority_nodes': 0,
 'e_tag': '0x8D6B467E810E8EE',
 'enable_auto_scale': False,
 'enable_inter_node_communication': False,
 'id': 'PythonQuickstartPool',
 'last_modified': '2019-03-29T16:59:28.755735Z',
 'max_tasks_per_node': 1,
 'resize_timeout': 'PT15M',
 'state': 'active',
 'state_transition_time': '2019-03-29T16:59:28.755735Z',
 'target_dedicated_nodes': 2,
 'target_low_priority_nodes': 0,
 'task_scheduling_policy': {'node_fill_type': 'spread'},
 'url': 'https://nunos.westeurope.batch.azure.com/pools/PythonQuickstartPool',
 'virtual_machine_configuration': {'image_reference': {'offer': 'UbuntuServer',
                                                       'publisher': 'Canonical',
                                                       'sku': '18.04-LTS',
                                    

### Create Job

In [8]:
def create_job(batch_service_client, job_id, pool_id):
    """Create a job whose tasks run on ``pool_id``, or reuse an existing job.

    :param batch_service_client: azure.batch BatchServiceClient for the account.
    :param str job_id: ID of the job to create.
    :param str pool_id: ID of the pool the job's tasks are scheduled on.
    :raises BatchErrorException: for any service error other than the job
        already existing.
    """
    print('Creating job [{}]...'.format(job_id))

    job = batchmodels.JobAddParameter(
        id=job_id,
        pool_info=batchmodels.PoolInformation(pool_id=pool_id))

    try:
        batch_service_client.job.add(job)
    except BatchErrorException as err:
        # Re-running the notebook hits "JobExists". Treat only that as benign
        # and surface every other failure instead of hiding it.
        # NOTE(review): a reused job presumably still holds tasks from earlier
        # runs, so re-adding the same task IDs may fail. Confirm before relying on reuse.
        if getattr(err.error, 'code', None) != 'JobExists':
            raise
        print("Job already exists")

# Create (or reuse) the job that groups the tasks and binds them to the pool.
create_job(batch_client, job_id=config._JOB_ID, pool_id=config._POOL_ID)

Creating job [PythonQuickstartJob]...


In [9]:
# Read the job back from the service to inspect its settings and state.
job = batch_client.job.get(config._JOB_ID)
job_snapshot = job.as_dict()
pprint(job_snapshot)

{'constraints': {'max_task_retry_count': 0,
                 'max_wall_clock_time': 'P10675199DT2H48M5.477581S'},
 'creation_time': '2019-03-29T16:59:47.944139Z',
 'e_tag': '0x8D6B467F383BB4F',
 'execution_info': {'pool_id': 'PythonQuickstartPool',
                    'start_time': '2019-03-29T16:59:47.963169Z'},
 'id': 'PythonQuickstartJob',
 'last_modified': '2019-03-29T16:59:47.963169Z',
 'on_all_tasks_complete': 'noaction',
 'on_task_failure': 'noaction',
 'pool_info': {'pool_id': 'PythonQuickstartPool'},
 'priority': 0,
 'state': 'active',
 'state_transition_time': '2019-03-29T16:59:47.963169Z',
 'url': 'https://nunos.westeurope.batch.azure.com/jobs/PythonQuickstartJob',
 'uses_task_dependencies': False}


### Create Tasks

In [10]:
def add_tasks(batch_service_client, job_id, input_files):
    """Submit one task per input file; each task prints its file with ``cat``.

    Task IDs are ``Task0``, ``Task1``, ... in the order of ``input_files``.
    Each task receives exactly one ResourceFile, which it reads by the
    ``file_path`` the file is given on the node.

    :param batch_service_client: azure.batch BatchServiceClient for the account.
    :param str job_id: ID of the job the tasks are added to.
    :param input_files: Sequence of batchmodels.ResourceFile objects.
    :return: list of the TaskAddParameter objects that were submitted.
    """
    print('Adding {} tasks to job [{}]...'.format(len(input_files), job_id))

    tasks = [
        batch.models.TaskAddParameter(
            id='Task{}'.format(position),
            command_line='/bin/bash -c "cat {}"'.format(resource.file_path),
            resource_files=[resource])
        for position, resource in enumerate(input_files)
    ]

    batch_service_client.task.add_collection(job_id, tasks)
    return tasks

# Submit one task per uploaded input file to the job.
tasks = add_tasks(batch_client, job_id=config._JOB_ID, input_files=input_files)

Adding 3 tasks to job [PythonQuickstartJob]...


In [11]:
# Show each submitted task definition, separated by a blank line.
for task in tasks:
    task_snapshot = task.as_dict()
    pprint(task_snapshot)
    print()

{'command_line': '/bin/bash -c "cat taskdata0.txt"',
 'id': 'Task0',
 'resource_files': [{'file_path': 'taskdata0.txt',
                     'http_url': 'https://nunosbatchstorage.blob.core.windows.net/input-python/taskdata0.txt?se=2019-03-29T18%3A59%3A19Z&sp=r&sv=2018-03-28&sr=b&sig=bdj6Kjjuc42bGDPEhy/0naS9ng7adJhXio7qpftAVjI%3D'}]}

{'command_line': '/bin/bash -c "cat taskdata1.txt"',
 'id': 'Task1',
 'resource_files': [{'file_path': 'taskdata1.txt',
                     'http_url': 'https://nunosbatchstorage.blob.core.windows.net/input-python/taskdata1.txt?se=2019-03-29T18%3A59%3A19Z&sp=r&sv=2018-03-28&sr=b&sig=U/ckAZ2GYyFrpPgUeL1EfYtofmd78oC6rxrz4qZ2iaY%3D'}]}

{'command_line': '/bin/bash -c "cat taskdata2.txt"',
 'id': 'Task2',
 'resource_files': [{'file_path': 'taskdata2.txt',
                     'http_url': 'https://nunosbatchstorage.blob.core.windows.net/input-python/taskdata2.txt?se=2019-03-29T18%3A59%3A19Z&sp=r&sv=2018-03-28&sr=b&sig=8HA5mNMeKT2IdEzc59LkQQDb7EKAIdXBp/IwyNeNC

### Wait for tasks to complete

In [12]:
def wait_for_tasks_to_complete(batch_service_client, job_id, timeout,
                               poll_interval=1):
    """Poll a job until every one of its tasks is in the completed state.

    A dot is printed per poll as a progress indicator.

    :param batch_service_client: azure.batch BatchServiceClient for the account.
    :param str job_id: ID of the job whose tasks are monitored.
    :param datetime.timedelta timeout: Maximum time to keep polling.
    :param float poll_interval: Seconds to sleep between polls (default 1).
    :return: True once all tasks are completed (also when the job has no tasks).
    :raises RuntimeError: if the timeout elapses first.
    """
    # Measure the deadline on a monotonic clock. datetime.now() is local wall
    # time, so a DST switch or clock adjustment would shorten or stretch the wait.
    deadline = time.monotonic() + timeout.total_seconds()

    print("Monitoring all tasks for 'Completed' state, timeout in {}..."
          .format(timeout), end='')

    while time.monotonic() < deadline:
        print('.', end='')
        sys.stdout.flush()
        tasks = batch_service_client.task.list(job_id)

        if all(task.state == batchmodels.TaskState.completed
               for task in tasks):
            print()
            return True
        time.sleep(poll_interval)

    print()
    raise RuntimeError("ERROR: Tasks did not reach 'Completed' state within "
                       "timeout period of " + str(timeout))

# Block until every task has completed; a RuntimeError is raised after 30 minutes.
task_timeout = datetime.timedelta(minutes=30)
wait_for_tasks_to_complete(batch_client, config._JOB_ID, task_timeout)

print("  Success! All tasks reached the 'Completed' state within the "
      "specified timeout period.")

Monitoring all tasks for 'Completed' state, timeout in 0:30:00..........................................................................................................................
  Success! All tasks reached the 'Completed' state within the specified timeout period.


### Check output

In [13]:
def print_task_output(batch_service_client, job_id, encoding=None):
    """Print the node ID and standard output of every task in a job.

    The output file fetched per task is ``config._STANDARD_OUT_FILE_NAME``.

    :param batch_service_client: azure.batch BatchServiceClient for the account.
    :param str job_id: ID of the job whose tasks are reported.
    :param str encoding: Codec used to decode the output; None means UTF-8.
    """
    print('Printing task output...')

    # task.list already returns full task objects including node_info, so no
    # extra task.get round-trip per task is needed.
    for task in batch_service_client.task.list(job_id):
        print("Task: {}".format(task.id))

        if task.node_info is None:
            # node_info may be absent for a task that never ran on a node;
            # there is then no output file to download.
            print("Node: (none - task has not run on a node)")
            continue
        print("Node: {}".format(task.node_info.node_id))

        stream = batch_service_client.file.get_from_task(
            job_id, task.id, config._STANDARD_OUT_FILE_NAME)

        file_text = _read_stream_as_string(stream, encoding)
        print("Standard output:")
        print(file_text)

def _read_stream_as_string(stream, encoding):
    output = io.BytesIO()
    try:
        for data in stream:
            output.write(data)
        if encoding is None:
            encoding = 'utf-8'
        return output.getvalue().decode(encoding)
    finally:
        output.close()
    raise RuntimeError('could not write data to stream or decode bytes')

    
# Print each task's node and standard output (config._STANDARD_OUT_FILE_NAME)
# to the console. stderr is not fetched by print_task_output.
print_task_output(batch_client, config._JOB_ID)

Printing task output...
Task: Task0
Node: tvm-905457941_2-20190329t170027z
Standard output:
Batch processing began with mainframe computers and punch cards. Today it still plays a central role in business, engineering, science, and other pursuits that require running lots of automated tasks—processing bills and payroll, calculating portfolio risk, designing new products, rendering animated films, testing software, searching for energy, predicting the weather, and finding new cures for disease. Previously only a few had access to the computing power for these scenarios. With Azure, that power is available to you when you need it, without a massive capital investment.

Task: Task1
Node: tvm-905457941_2-20190329t170027z
Standard output:
Azure Virtual Machines lets you deploy a wide range of computing solutions in an agile way. Deploy a virtual machine nearly instantly, and pay by the minute. With support for Microsoft Windows, Linux, Microsoft SQL Server, Oracle, IBM, SAP, and Azure BizTa

### Cleanup

In [14]:
# Storage cleanup: drop the input container holding the uploaded task data.
# The delete call stays the cell's last expression, so Jupyter displays its result.
cleanup_message = 'Deleting container [{}]...'.format(input_container_name)
print(cleanup_message)
blob_client.delete_container(container_name=input_container_name)

Deleting container [input-python]...


True

In [17]:
# Remove the job from the Batch account.
batch_client.job.delete(job_id=config._JOB_ID)

In [18]:
# Remove the pool and its compute nodes from the Batch account.
batch_client.pool.delete(pool_id=config._POOL_ID)