# Azure Batch Python Notebook

This is a sample Jupyter Notebook containing code to manage Azure Batch container-based pools with the Python SDK. Some of the code is taken from the samples in the GitHub repo linked below.

It demonstrates these concepts, amongst others:

* How to create a container-enabled pool, including startup tasks
* How to access the state of compute nodes in a pool
* How to launch container-based tasks, with or without a custom CMD, in the default working directory or in a different one
* How to delete tasks, jobs and pools

See these links for further information:

* https://docs.microsoft.com/en-us/azure/batch/batch-account-create-portal: Azure Batch how-to documentation on docs.microsoft.com
* https://docs.microsoft.com/en-us/python/api/azure-batch/azure.batch?view=azure-python: Azure Batch Python SDK reference
* https://github.com/Azure/azure-batch-samples/tree/master/Python/Batch: Azure Batch Python samples, on which this notebook is based

In [None]:
import datetime
import os
import time
import io
import azure.storage.blob as azureblob
import azure.batch.batch_service_client as batch
import azure.batch.batch_auth as batchauth
import azure.batch.models as batchmodels

In [None]:
# Set up the configuration
batch_account_key = 'one of your two batch account keys'
batch_account_name = 'yourbatchaccount'
batch_service_url = 'https://yourbatchaccount.westeurope.batch.azure.com'

storage_account_key = 'one of your two storage account keys'
storage_account_name = 'yourstorageaccount'       # Not the FQDN, just the account name
storage_account_suffix = 'core.windows.net'
blobfuse_container_mount = 'mountdrive'
blobfuse_connection_filename = 'connection.cfg'  # Needs to match the one in compute_node_init.sh
should_delete_container = False
should_delete_job = False
should_delete_pool = False
pool_vm_size = "STANDARD_D1_V2"
pool_vm_count = 1
vm_publisher = 'microsoft-azure-batch'
vm_offer = 'ubuntu-server-container'
vm_sku = '16-04-lts'
os_username = "batchuser"
os_password = "yoursecretpassword123!"

batch_pool_name = "mycomputepool"
batch_job_name = "myJob"
batch_task_name = "myDockerTask"
batch_bash_task_name = "myBashTask"

startup_blob_container_name = "startup"
compute_node_init = "compute_node_init.sh"
compute_node_init_path = "./" + compute_node_init
docker_run_command = "docker run"
stdout_filename = "stdout.txt"
stderr_filename = "stderr.txt"

task_blob_container_name = "task"
task_file_to_upload = "hellotask.txt"
task_file_to_upload_path = "./hellotask.txt"

acr_name = "youracrname"
acr_url = acr_name + ".azurecr.io"
acr_username = "youracrusername"
acr_password = "youracrpassword"
acr_image = "yourcontainerimage:1.0"
docker_image_name = acr_url + "/" + acr_image

# Pool creation

These cells contain functions for creating a container-enabled pool:

* `wrap_commands_in_shell` serializes multiple commands into a single command line. It is used for the startup task and for standard tasks as well
* `create_container_pool` defines a pool object with a container configuration and passes it to `create_pool_if_not_exist`
* `create_pool_if_not_exist` calls the method `batch_client.pool.add`
* For the startup task, a startup file is uploaded to a blob container and a new SAS is created for it. `create_container_pool` relies on two helpers for the Azure Storage API: `upload_blob_and_create_sas` and `create_sas_token`
* `select_latest_verified_vm_image_with_node_agent_sku` helps by providing the latest image supported by Azure Batch corresponding to a certain Publisher, Offer and SKU prefix combination
* `wait_for_pool_to_complete` periodically checks the newly provisioned pool until the nodes are ready
* `print_pool_info` prints some details about the pool, once it is ready
* `wait_for_pool_to_be_deleted` periodically checks a certain pool until it no longer exists
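
The two `wait_for_*` functions above share the same poll-until-timeout pattern. The following is a minimal sketch of that pattern in isolation, not the notebook's own code: `poll_until`, `check`, `interval_seconds` and `sleep` are illustrative names, and the stand-in condition replaces the real call to Azure Batch.

```python
import datetime
import time


def poll_until(check, timeout, interval_seconds=30, sleep=time.sleep):
    """Call check() repeatedly until it returns True or the timeout elapses.

    All names here are illustrative; the notebook inlines this loop instead.
    """
    deadline = datetime.datetime.now() + timeout
    while datetime.datetime.now() < deadline:
        if check():
            return True
        sleep(interval_seconds)
    raise TimeoutError("Timed out waiting for condition")


# Stand-in condition that becomes true on the third check
attempts = []


def fake_pool_ready():
    attempts.append(1)
    return len(attempts) >= 3


# The sleep function is replaced so the example finishes immediately
result = poll_until(fake_pool_ready, datetime.timedelta(seconds=5),
                    interval_seconds=0, sleep=lambda seconds: None)
print(result, len(attempts))
```

In the notebook, the condition would be a check on the node states returned by `batch_client.compute_node.list`, and the interval is 30 seconds.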

In [None]:
# This function converts a list of bash commands into a single string that can be used as command for
#   a container/bash task
def wrap_commands_in_shell(ostype, commands):
    """Wrap commands in a shell
    :param list commands: list of commands to wrap
    :param str ostype: OS type, linux or windows
    :rtype: str
    :return: a shell wrapping commands
    """
    if ostype.lower() == 'linux':
        return '/bin/bash -c \'set -e; set -o pipefail; {}; wait\''.format(
            ';'.join(commands))
    elif ostype.lower() == 'windows':
        return 'cmd.exe /c "{}"'.format('&'.join(commands))
    else:
        raise ValueError('unknown ostype: {}'.format(ostype))

In [None]:
# Create blob container and batch pool with a container configuration, including a startup task
def create_container_pool(batch_client, block_blob_client, pool_id, vm_size, vm_count):
    """Creates an Azure Batch pool with the specified id.
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param block_blob_client: The storage block blob client to use.
    :type block_blob_client: `azure.storage.blob.BlockBlobService`
    :param str pool_id: The id of the pool to create.
    :param str vm_size: vm size (sku)
    :param int vm_count: number of vms to allocate
    """
    # Pick the latest verified image for the configured publisher, offer and SKU prefix
    sku_to_use, image_ref_to_use = \
        select_latest_verified_vm_image_with_node_agent_sku(
            batch_client, vm_publisher, vm_offer, vm_sku)
    print("Selected reference image:", image_ref_to_use)
    
    # Create new blob container in Azure Storage with a SAS
    print('Creating blob container', startup_blob_container_name, 'and uploading file', compute_node_init)
    block_blob_client.create_container(startup_blob_container_name, fail_on_exist=False)
    if os.path.exists(compute_node_init_path):
        print('Uploading file', compute_node_init_path)
        sas_url = upload_blob_and_create_sas(
            block_blob_client,
            startup_blob_container_name,
            compute_node_init,
            compute_node_init_path,
            datetime.datetime.utcnow() + datetime.timedelta(hours=1))
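    else:
        # Without the init script there is no SAS URL for the start task, so stop here
        raise FileNotFoundError(compute_node_init_path + ' could not be found')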

    # Create batch pool with a container configuration and a startup task
    print('Defining image', docker_image_name, 'to be loaded from registry', acr_url)
    registry_conf = batch.models.ContainerRegistry(registry_server=acr_url,
                                                   user_name=acr_username,
                                                   password=acr_password)
    # This configuration defines the private registry and configures docker images to be pre-fetched
    linux_container_conf = batch.models.ContainerConfiguration(container_image_names=[docker_image_name], 
                                                               container_registries=[registry_conf])
    vmuser = batch.models.UserAccount(name=os_username, password=os_password, elevation_level='admin')
    # Env variables to be loaded to the init task
    env_storage_account = batch.models.EnvironmentSetting("AZURE_STORAGE_ACCOUNT", value=storage_account_name)
    env_storage_key = batch.models.EnvironmentSetting("AZURE_STORAGE_ACCESS_KEY", value=storage_account_key)
    env_test = batch.models.EnvironmentSetting("HELLO_WORLD", value='hello world!')
    # Command list to pass to the init task
    blobfuse_connection_path ='./' + blobfuse_connection_filename
    command_list=['cd $AZ_BATCH_TASK_WORKING_DIR',
                  #'export AZURE_STORAGE_ACCOUNT='+storage_account_name,
                  #'export AZURE_STORAGE_ACCESS_KEY='+storage_account_key,
                  'echo "accountName {0}" >{1}'.format(storage_account_name, blobfuse_connection_path),
                  'echo "accountKey {0}" >>{1}'.format(storage_account_key, blobfuse_connection_path),
                  'echo "containerName {0}" >>{1}'.format(blobfuse_container_mount, blobfuse_connection_path),
                  'chmod 600 ' + blobfuse_connection_path,
                  'whoami >./whoami.txt',
                  'printenv >./printenv.txt',
                  'chmod 755 ./' + compute_node_init,
                  'sudo ./' + compute_node_init]
    # User with elevated access (to be used in the startup task, required for sudo)
    user = batchmodels.UserIdentity(
        auto_user=batchmodels.AutoUserSpecification(
        elevation_level=batchmodels.ElevationLevel.admin,
        scope=batchmodels.AutoUserScope.task))
    # pool definition
    pool = batchmodels.PoolAddParameter(
        id=pool_id,
        virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
            image_reference=image_ref_to_use,
            container_configuration=linux_container_conf,
            node_agent_sku_id=sku_to_use),
        vm_size=vm_size,
        user_accounts=[vmuser],
        target_dedicated_nodes=vm_count,
        # Defines an init task
        start_task=batchmodels.StartTask(
            command_line=wrap_commands_in_shell('linux', command_list),
            environment_settings=[env_storage_account, env_storage_key],
            user_identity=user,
            resource_files=[batchmodels.ResourceFile(file_path=compute_node_init, blob_source=sas_url)]))
    create_pool_if_not_exist(batch_client, pool)

In [None]:
# Upload file to blob storage and create SAS 
def upload_blob_and_create_sas(
        block_blob_client, container_name, blob_name, file_name, expiry,
        timeout=None):
    """Uploads a file from local disk to Azure Storage and creates
    a SAS for it.
    :param block_blob_client: The storage block blob client to use.
    :type block_blob_client: `azure.storage.blob.BlockBlobService`
    :param str container_name: The name of the container to upload the blob to.
    :param str blob_name: The name of the blob to upload the local file to.
    :param str file_name: The name of the local file to upload.
    :param expiry: The SAS expiry time.
    :type expiry: `datetime.datetime`
    :param int timeout: timeout in minutes from now for expiry,
        will only be used if expiry is not specified
    :return: A SAS URL to the blob with the specified expiry time.
    :rtype: str
    """
    block_blob_client.create_container(
        container_name,
        fail_on_exist=False)

    block_blob_client.create_blob_from_path(
        container_name,
        blob_name,
        file_name)

    sas_token = create_sas_token(
        block_blob_client,
        container_name,
        blob_name,
        permission=azureblob.BlobPermissions.READ,
        expiry=expiry,
        timeout=timeout)

    sas_url = block_blob_client.make_blob_url(
        container_name,
        blob_name,
        sas_token=sas_token)

    return sas_url

In [None]:
# Create SAS for a storage blob
def create_sas_token(
        block_blob_client, container_name, blob_name, permission, expiry=None,
        timeout=None):
    """Create a blob sas token
    :param block_blob_client: The storage block blob client to use.
    :type block_blob_client: `azure.storage.blob.BlockBlobService`
    :param str container_name: The name of the container that holds the blob.
    :param str blob_name: The name of the blob to create the SAS token for.
    :param permission: The permissions granted by the SAS token.
    :param expiry: The SAS expiry time.
    :type expiry: `datetime.datetime`
    :param int timeout: timeout in minutes from now for expiry,
        will only be used if expiry is not specified
    :return: A SAS token
    :rtype: str
    """
    if expiry is None:
        if timeout is None:
            timeout = 30
        expiry = datetime.datetime.utcnow() + datetime.timedelta(
            minutes=timeout)
    return block_blob_client.generate_blob_shared_access_signature(
        container_name, blob_name, permission=permission, expiry=expiry)


In [None]:
# Select a valid VM SKU provided by the Azure Batch API
def select_latest_verified_vm_image_with_node_agent_sku(
        batch_client, publisher, offer, sku_starts_with):
    """Select the latest verified image that Azure Batch supports given
    a publisher, offer and sku (starts with filter).
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param str publisher: vm image publisher
    :param str offer: vm image offer
    :param str sku_starts_with: vm sku starts with filter
    :rtype: tuple
    :return: (node agent sku id to use, vm image ref to use)
    """
    # get verified vm image list and node agent sku ids from service
    node_agent_skus = batch_client.account.list_node_agent_skus()
    # pick the latest supported sku
    skus_to_use = [
        (sku, image_ref) for sku in node_agent_skus for image_ref in sorted(
            sku.verified_image_references, key=lambda item: item.sku)
        if image_ref.publisher.lower() == publisher.lower() and
        image_ref.offer.lower() == offer.lower() and
        image_ref.sku.startswith(sku_starts_with)
    ]
    # skus are listed in reverse order, pick first for latest
    if len(skus_to_use)>0:
        sku_to_use, image_ref_to_use = skus_to_use[0]
        return (sku_to_use.id, image_ref_to_use)
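    else:
        # Fail loudly: callers unpack the returned tuple, so returning None would only fail later
        raise ValueError("No SKUs matching the OS image: {} {} {}".format(
            publisher, offer, sku_starts_with))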

In [None]:
# Create batch pool using an existing batch client
def create_pool_if_not_exist(batch_client, pool):
    """Creates the specified pool if it doesn't already exist
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param pool: The pool to create.
    :type pool: `batchserviceclient.models.PoolAddParameter`
    """
    try:
        print("Attempting to create pool:", pool.id)
        batch_client.pool.add(pool)
        print("Created pool:", pool.id)
    except batchmodels.BatchErrorException as e:
        if e.error.code != "PoolExists":
            raise
        else:
            print("Pool {!r} already exists".format(pool.id))

In [None]:
# Print pool info
def print_pool_info(pool_id):
    mypool = batch_client.pool.get(pool_id=pool_id)
    print('Some info about pool {0}:'.format(pool_id))
    print('- Images to download:', mypool.virtual_machine_configuration.container_configuration.container_image_names)
    print('- Container registry:', mypool.virtual_machine_configuration.container_configuration.container_registries[0].registry_server)
    print('- Container username:', mypool.virtual_machine_configuration.container_configuration.container_registries[0].user_name)
    print('- Container password:', mypool.virtual_machine_configuration.container_configuration.container_registries[0].password)
    print('- Pool nodes:', mypool.current_dedicated_nodes)
    if mypool.current_dedicated_nodes>0:
        nodes = batch_client.compute_node.list(pool_id, raw=False)
        for node in nodes:
            #print(node)    # Debug, uncomment to see the full object
            print('  *', node.id, 'is', node.state)
            if node.state == batchmodels.ComputeNodeState.idle:
                print('    Start task "{0}" -> Status: {1}, {2}'.format(node.start_task.command_line,node.start_task_info.state, node.start_task_info.result))
                if node.start_task_info.result == batchmodels.TaskExecutionResult.failure:
                    print('      Start task failure info: {0}, {1}, {2}, {3}'.format(node.start_task_info.failure_info.category,
                                                                                      node.start_task_info.failure_info.code,
                                                                                      node.start_task_info.failure_info.message,
                                                                                      node.start_task_info.failure_info.details))
                print('Startup task output:')
                print_startup_task_output(batch_client, node.id)
                node_connection = batch_client.compute_node.get_remote_login_settings(pool_id=pool_id, node_id=node.id)
                print('    Connection: "ssh {0}@{1} -p {2}"'.format(os_username, node_connection.remote_login_ip_address, node_connection.remote_login_port))
            if node.errors:
                for error in node.errors:
                    print('    Error:', error.code, '-', error.message)
            else:
                print('    No errors :)')


In [None]:
# Waits until the compute nodes in the pool are idle/unusable/failed
def wait_for_pool_to_complete(batch_client, pool_id, timeout):
    """Waits for nodes in a pool to be operational.
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param str pool_id: The id of the pool to monitor.
    :param timeout: The maximum amount of time to wait.
    :type timeout: `datetime.timedelta`
    """
    time_to_timeout_at = datetime.datetime.now() + timeout

    while datetime.datetime.now() < time_to_timeout_at:
        mypool = batch_client.pool.get(pool_id=pool_id)
        print("Pool", pool_id, "has", mypool.current_dedicated_nodes, "current dedicated nodes")
        if mypool.current_dedicated_nodes>0:
            nodes = batch_client.compute_node.list(pool_id, raw=False)
            starting_nodes=0
            running_nodes=0
            waiting_nodes=0
            failed_nodes=0
            idle_nodes=0
            unusable_nodes=0
            for node in nodes:
                if node.state==batchmodels.ComputeNodeState.starting:
                    starting_nodes+=1
                elif node.state==batchmodels.ComputeNodeState.running:
                    running_nodes+=1
                elif node.state==batchmodels.ComputeNodeState.waiting_for_start_task:
                    waiting_nodes+=1
                elif node.state==batchmodels.ComputeNodeState.start_task_failed:
                    failed_nodes+=1
                elif node.state==batchmodels.ComputeNodeState.idle:
                    idle_nodes+=1
                elif node.state==batchmodels.ComputeNodeState.unusable:
                    unusable_nodes+=1
            print(' *', starting_nodes, "starting nodes -",
                  waiting_nodes, "waiting-for-start-task nodes -",
                  running_nodes, "running nodes -",
                  failed_nodes, "failed nodes -",
                  unusable_nodes, "unusable nodes -",
                  idle_nodes, "idle nodes")
            if (running_nodes+failed_nodes+idle_nodes+unusable_nodes)>=mypool.current_dedicated_nodes:
                print('Pool', pool_id, 'is ready')
                return
        time.sleep(30)

    raise TimeoutError("Timed out waiting for pool to be deployed")

In [None]:
# Check pool status and wait until it does not exist
def wait_for_pool_to_be_deleted(pool_id):
    timeout=datetime.timedelta(minutes=10)
    time_to_timeout_at = datetime.datetime.now() + timeout
    print("Checking status for pool", pool_id, "...")
    while datetime.datetime.now() < time_to_timeout_at:
        pools = batch_client.pool.list()
        poolExists=False
        for pool in pools:
            if pool.id == pool_id:
                poolExists=True
                mypool = batch_client.pool.get(pool_id=pool_id)
                print("Status for pool", pool_id, "->", mypool.state)
        if not poolExists:
            print('Pool', pool_id, "does not exist")
            break
        time.sleep(30)

# Job and Task Creation

These functions help with the creation of a job and task:

* First, `container_enabled_pool` helps detect whether a pool is container-enabled, because container-enabled pools only support container-based tasks.
* `submit_job_and_add_bash_task` creates a job with a unique name and a bash-based task inside of that job. Only works in non-container-enabled pools
* `submit_job_and_add_container_task` creates a job with a unique name and a container-based task inside of that job. Only works in container-enabled pools
* `generate_unique_resource_name` is used to generate a unique job ID, which uses the current date and time as part of the name
* `print_task_output` prints stderr and stdout for a provided list of task IDs, using `read_task_file_as_string`
* `print_startup_task_output` prints stderr and stdout for the startup task of a given compute node ID, using `read_node_file_as_string`
* Both `read_*` functions rely on `_read_stream_as_string` to stream the contents of a file on a compute node to the output of the Jupyter notebook
* `wait_for_tasks_to_complete` waits until all tasks have the state `completed`
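
The file-reading helpers above receive the file as an iterable of byte chunks, collect them in a buffer, and decode only at the end. The following is a minimal sketch of that idea with a stand-in stream, so it runs without an Azure Batch account. `read_chunks_as_string` and `fake_stream` are illustrative names, not part of the notebook.

```python
import io


def read_chunks_as_string(chunks, encoding="utf-8"):
    """Concatenate an iterable of byte chunks and decode the result."""
    with io.BytesIO() as buffer:
        for chunk in chunks:
            buffer.write(chunk)
        # Decode once, after all chunks have been collected
        return buffer.getvalue().decode(encoding)


# Stand-in for the stream returned by the SDK.
# The two bytes of the UTF-8 character "ö" are deliberately split across chunks.
fake_stream = iter([b"hello ", b"w\xc3", b"\xb6rld\n"])
text = read_chunks_as_string(fake_stream)
print(text)
```

Decoding the whole buffer at once, rather than chunk by chunk, avoids errors when a multi-byte character happens to be split between two chunks.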

In [None]:
# Returns True if the pool has a container configuration defined
# Bash tasks cannot run on container-enabled pools; they fail with the error
#   "Container-enabled compute node requires task container settings"
def container_enabled_pool(pool_id):
    mypool = batch_client.pool.get(pool_id=pool_id)
    try:
        containerConfig = mypool.virtual_machine_configuration.container_configuration
        return containerConfig.type == 'docker'
    except AttributeError:
        # The pool has no VM configuration or no container configuration
        return False

In [None]:
# Create job and task with a bash command
def submit_job_and_add_bash_task(batch_client, block_blob_client, job_id, pool_id, command_list):
    """Submits a job to the Azure Batch service and adds
    a task that runs the given bash commands.
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param block_blob_client: The storage block blob client to use.
    :type block_blob_client: `azure.storage.blob.BlockBlobService`
    :param str job_id: The id of the job to create.
    :param str pool_id: The id of the pool to use.
    :param list command_list: The bash commands to run in the task.
    """
    # Add job to pool
    print('Adding job...')
    job = batchmodels.JobAddParameter(
        id=job_id,
        pool_info=batchmodels.PoolInformation(pool_id=pool_id))
    batch_client.job.add(job)
    # Create a blob container, upload a file, and add a SAS
    print('Creating storage container {0}...'.format(task_blob_container_name))
    block_blob_client.create_container(
        task_blob_container_name,
        fail_on_exist=False)
    sas_url = upload_blob_and_create_sas(
        block_blob_client,
        task_blob_container_name,
        task_file_to_upload,
        task_file_to_upload_path,
        datetime.datetime.utcnow() + datetime.timedelta(hours=1))
    print('Creating bash task')
    task = batchmodels.TaskAddParameter(
        id=batch_bash_task_name,
        command_line=wrap_commands_in_shell('linux', command_list))
        #resource_files=[batchmodels.ResourceFile(file_path=task_file_to_upload_path, blob_source=sas_url)])
    batch_client.task.add(job_id=job.id, task=task)


# Job/Task creation

Creates a container task with different container settings

### Features

* Can run multiple commands at once
* Can override default Batch settings back to the default container settings, such as CMD or WORKDIR
* Can upload to the task working dir specific files stored in blob storage
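
To illustrate the first two features, the sketch below shows roughly what command line results from a list of commands, and how an empty list leaves the container's default CMD in place. It mirrors the Linux branch of `wrap_commands_in_shell` but is a standalone illustration: `build_container_command` is a hypothetical name, and it joins commands with `'; '` rather than `';'` for readability.

```python
def build_container_command(commands):
    """Return the task command line for a list of shell commands.

    An empty list yields an empty string, so the image's default CMD runs.
    build_container_command is an illustrative name, not part of the notebook.
    """
    if not commands:
        return ""
    script = "; ".join(commands)
    return "/bin/bash -c 'set -e; set -o pipefail; {}; wait'".format(script)


print(build_container_command(["pwd", "whoami"]))
print(repr(build_container_command([])))
```

Because the script is wrapped in single quotes, a command that itself contains a single quote would end the quoted script early, so such commands need extra care.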

### Problems

* The `-v` flag does not seem to work with blobfuse mounts, possibly because of the 770 permissions that blobfuse configures. Writing output through `OutputFileBlobContainerDestination` might be a workaround (untested): https://docs.microsoft.com/en-us/python/api/azure-batch/azure.batch.models.outputfileblobcontainerdestination?view=azure-python

### To Do

* Test `OutputFileBlobContainerDestination`: https://docs.microsoft.com/en-us/python/api/azure-batch/azure.batch.models.outputfileblobcontainerdestination?view=azure-python

In [None]:
# Create task with a Docker container (requires a container-enabled pool)
def submit_job_and_add_container_task(batch_client, block_blob_client, job_id, pool_id, command_list):
    """Submits a job to the Azure Batch service and adds
    a task that runs the given commands inside a Docker container.
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param block_blob_client: The storage block blob client to use.
    :type block_blob_client: `azure.storage.blob.BlockBlobService`
    :param str job_id: The id of the job to create.
    :param str pool_id: The id of the pool to use.
    :param list command_list: The commands to run in the container; an empty list runs the default container CMD.
    """
    # Add job to pool
    print('Adding job...')
    job = batchmodels.JobAddParameter(
        id=job_id,
        pool_info=batchmodels.PoolInformation(pool_id=pool_id))
    batch_client.job.add(job)
    # Create a blob container, upload a file, and add a SAS
    print('Creating storage container {0}...'.format(task_blob_container_name))
    block_blob_client.create_container(
        task_blob_container_name,
        fail_on_exist=False)
    sas_url = upload_blob_and_create_sas(
        block_blob_client,
        task_blob_container_name,
        task_file_to_upload,
        task_file_to_upload_path,
        datetime.datetime.utcnow() + datetime.timedelta(hours=1))
    # Explicitly allow for an empty command list, in which case the default container CMD would be run
    if len(command_list)>0:
        command = wrap_commands_in_shell('linux', command_list)
    else:
        command = ''
    # Define container settings for the task (note that the registry name is all lower case!!!!)
    task_container_settings = batch.models.TaskContainerSettings(
        image_name=docker_image_name.lower(), 
        #container_run_options='--rm --workdir=""')
        container_run_options='-v /mnt/blobdrive:/mnt/blobdrive:ro')
        #container_run_options='--rm')
    # Add a container-type task with the previous settings
    print('Creating task with Docker image {0}...'.format(docker_image_name.lower()))
    task = batch.models.TaskAddParameter(
        id=batch_task_name,
        command_line=command,
        container_settings=task_container_settings)
    batch_client.task.add(job_id=job.id, task=task)


In [None]:
# Print stdout and stderr files from a given task list
def print_task_output(batch_client, job_id, task_ids, encoding=None):
    """Prints the stdout and stderr for each task specified.
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param str job_id: The id of the job to monitor.
    :param task_ids: The collection of tasks to print the output for.
    :type task_ids: `list`
    :param str encoding: The encoding to use when downloading the file.
    """
    for task_id in task_ids:
        #stdout
        try:
            file_text = read_task_file_as_string(
                batch_client,
                job_id,
                task_id,
                stdout_filename,
                encoding)

            print("{} content for task {}: ".format(stdout_filename, task_id))
            print(file_text)
        except:
            print('I could not find stdout file', stdout_filename, 'for task', task_id)
        #stderr
        try:
            file_text = read_task_file_as_string(
                batch_client,
                job_id,
                task_id,
                stderr_filename,
                encoding)
            print("{} content for task {}: ".format(
                stderr_filename,
                task_id))
            print(file_text)
        except:
            print('I could not find stderr file', stderr_filename, 'for task', task_id)


In [None]:
# Print stdout and stderr for the startup task of a compute node
# It seems that sometimes it cannot find one or both of the files, for some reason
def print_startup_task_output(batch_client, node_id, encoding=None):
    """Prints the stdout and stderr for the startup task of the specified node.
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param str node_id: The id of the compute node to inspect.
    :param str encoding: The encoding to use when downloading the file.
    """
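    # The pool nodes run Linux, so build the paths with forward slashes
    #   instead of os.path.join, which depends on the local OS
    task_stdout_filename = 'startup/' + stdout_filename
    task_stderr_filename = 'startup/' + stderr_filename
    #stdout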
    try:
        file_text = read_node_file_as_string(
            batch_client,
            node_id,
            task_stdout_filename,
            encoding)

        print("{} content for startup task in node {}: ".format(task_stdout_filename, node_id))
        print(file_text)
    except:
        print('I could not find stdout file', task_stdout_filename, 'for startup task in node', node_id)
    #stderr
    try:
        file_text = read_node_file_as_string(
            batch_client,
            node_id,
            task_stderr_filename,
            encoding)
        print("{} content for startup task in node {}: ".format(task_stderr_filename, node_id))
        print(file_text)
    except:
        print('I could not find stderr file', task_stderr_filename, 'for startup task in node', node_id)


In [None]:
# Get a file from a task and return its contents as a string
def read_task_file_as_string(
    batch_client, job_id, task_id, file_name, encoding=None):
    """Reads the specified file as a string.
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param str job_id: The id of the job.
    :param str task_id: The id of the task.
    :param str file_name: The name of the file to read.
    :param str encoding: The encoding of the file. The default is utf-8.
    :return: The file content.
    :rtype: str
    """
    stream = batch_client.file.get_from_task(job_id, task_id, file_name)
    return _read_stream_as_string(stream, encoding)

In [None]:
# Get a file from a compute node and return its contents as a string
def read_node_file_as_string(
    batch_client, node_id, file_name, encoding=None):
    """Reads the specified file as a string.
    Note: the pool id is taken from the notebook-level variable `pool_id`.
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param str node_id: The id of the compute node.
    :param str file_name: The name of the file to read.
    :param str encoding: The encoding of the file. The default is utf-8.
    :return: The file content.
    :rtype: str
    """
    stream = batch_client.file.get_from_compute_node(pool_id, node_id, file_name)
    return _read_stream_as_string(stream, encoding)

In [None]:
# Create a file stream, this function is used by read_task_file_as_string and read_node_file_as_string
def _read_stream_as_string(stream, encoding):
    """Read stream as string
    :param stream: input stream generator
    :param str encoding: The encoding of the file. The default is utf-8.
    :return: The file content.
    :rtype: str
    """
    output = io.BytesIO()
    try:
        for data in stream:
            output.write(data)
        if encoding is None:
            encoding = 'utf-8'
        return output.getvalue().decode(encoding)
    finally:
        output.close()
    raise RuntimeError('could not write data to stream or decode bytes')

In [None]:
# Wait until all tasks have the status TaskState.completed
def wait_for_tasks_to_complete(batch_client, job_id, timeout):
    """Waits for all the tasks in a particular job to complete.
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param str job_id: The id of the job to monitor.
    :param timeout: The maximum amount of time to wait.
    :type timeout: `datetime.timedelta`
    """
    time_to_timeout_at = datetime.datetime.now() + timeout

    while datetime.datetime.now() < time_to_timeout_at:
        print("Checking if all tasks are complete...")
        tasks = batch_client.task.list(job_id)
        incomplete_tasks = [task for task in tasks if
                            task.state != batchmodels.TaskState.completed]
        if not incomplete_tasks:
            return
        time.sleep(10)
    raise TimeoutError("Timed out waiting for tasks to complete")

In [None]:
# Used to create unique job IDs
def generate_unique_resource_name(resource_prefix):
    """Generates a unique resource name by appending a time
    string after the specified prefix.
    :param str resource_prefix: The resource prefix to use.
    :return: A string with the format "resource_prefix-<time>".
    :rtype: str
    """
    return resource_prefix + "-" + \
        datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%S")
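
To illustrate the name format, here is a minimal sketch of the same function with a hypothetical `now` parameter, so that the timestamp can be fixed. The ID rule in `ID_PATTERN` (letters, digits, hyphens and underscores, at most 64 characters) is an assumption based on the Batch documentation; verify it against the current service limits.

```python
import datetime
import re

# Assumed Batch ID rule: letters, digits, hyphens, underscores, max 64 characters
ID_PATTERN = re.compile(r'^[A-Za-z0-9_-]{1,64}$')


def generate_unique_resource_name(resource_prefix, now=None):
    """Same format as above; 'now' is a hypothetical parameter for this sketch."""
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    return resource_prefix + "-" + now.strftime("%Y%m%d-%H%M%S")


fixed_time = datetime.datetime(2018, 7, 25, 7, 45, 43)
name = generate_unique_resource_name('myJob', fixed_time)
print(name)                                   # myJob-20180725-074543
print(bool(ID_PATTERN.match(name)))           # True
```

Because the timestamp has one-second resolution, two names generated with the same prefix within the same second are identical.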

# Main

This is the main routine, which comprises three steps:
1. Pool creation
2. Job/task creation (only container-based tasks will work on a container-enabled pool)
3. Cleanup, either using some conditional variables or unconditionally (to be sure that no resources persist)

In [None]:
# Create the Batch and Storage clients that are used throughout the rest of the notebook
print("Creating Azure Batch client...")
credentials = batchauth.SharedKeyCredentials(
    batch_account_name,
    batch_account_key)
batch_client = batch.BatchServiceClient(
    credentials,
    base_url=batch_service_url)
# Retry 5 times -- default is 3
batch_client.config.retry_policy.retries = 5

print("Creating Azure Storage client...")
block_blob_client = azureblob.BlockBlobService(
    account_name=storage_account_name,
    account_key=storage_account_key,
    endpoint_suffix=storage_account_suffix)

print("Clients created successfully")

In [None]:
# Create container-based pool
pool_id = batch_pool_name
print("Creating Azure Batch pool", pool_id, "for Docker containers...")
create_container_pool(
    batch_client,
    block_blob_client,
    pool_id,
    pool_vm_size,
    pool_vm_count)
wait_for_pool_to_complete(batch_client, pool_id, datetime.timedelta(minutes=10))
print_pool_info(pool_id)

In [None]:
# Container-based task (only works for container-enabled pools)
# Learnings:
# - By default, tasks run as the user '_azbatch' and have access to each other's files
# - Tasks run in the working dir '/mnt/batch/tasks/workitems/{job_id}/job-1/{task_id}/wd'
# - The directory /mnt/batch is mounted in the container automatically by Azure Batch
if container_enabled_pool(pool_id):
    command_list = ['pwd', 'whoami', 'ls -al /mnt/', 'ls -al /mnt/blobdrive', 'echo hello >/mnt/blobdrive/helloworld.txt']
    job_id = generate_unique_resource_name(batch_job_name)
    print('Submitting Azure Batch job', job_id, '- Command list is ->', command_list)
    submit_job_and_add_container_task(
        batch_client,
        block_blob_client,
        job_id, pool_id, command_list)
    print("Waiting for tasks to complete...")
    wait_for_tasks_to_complete(
        batch_client,
        job_id,
        datetime.timedelta(minutes=25))
    tasks = batch_client.task.list(job_id)
    task_ids = [task.id for task in tasks]
    print_task_output(batch_client, job_id, task_ids)
else:
    print('Container tasks can only run on container-enabled pools, but {0} does not seem to have a container configuration'.format(pool_id))

# Docker settings in a container-enabled task

Here you can see some of the output (truncated for readability) of `docker inspect <container_name>` for a container-enabled task.

Notice the following:

* Host binds: all of /mnt/batch/tasks (RW, including other tasks' files), /etc/passwd (RO), /etc/group (RO), /etc/sudoers (RO)
* You can add additional volumes with the Docker -v option when launching the task. For more information, see https://docs.docker.com/storage/bind-mounts/
* Env variables: the AZ_BATCH_* variables are injected by Azure Batch, next to the ones defined by the image
* Working dir: Azure Batch sets it to the task working directory. Override it with specific container settings in the task if you want to use the default container WD
* Command: Azure Batch sets it to the task command line. Override it with specific container settings in the task if you want to use the default container CMD
* Networking is the default Docker networking (the bridge network)

```
[
    {
        "Id": "906199ee65d7fa7ae36c2213a5f2d4817cd3abb2132fb9a6882b5ceec2095d7e",
        "Created": "2018-07-25T07:45:45.613969371Z",
        "Path": "/bin/bash",
        "Args": [
            "-c",
            "set -e; set -o pipefail; pwd;whoami >./whoami.txt;ls /mnt/;sleep 300; wait"
        ],
        "Image": "sha256:f4b156f12e76a7e364de18c8869131a36cd562dba984ccee47d84c121d16c7a9",
        "ResolvConfPath": "/mnt/docker/containers/906199ee65d7fa7ae36c2213a5f2d4817cd3abb2132fb9a6882b5ceec2095d7e/resolv.conf",
        "HostnamePath": "/mnt/docker/containers/906199ee65d7fa7ae36c2213a5f2d4817cd3abb2132fb9a6882b5ceec2095d7e/hostname",
        "HostsPath": "/mnt/docker/containers/906199ee65d7fa7ae36c2213a5f2d4817cd3abb2132fb9a6882b5ceec2095d7e/hosts",
        "Driver": "overlay2",
        "Platform": "linux",
        "MountLabel": "",
        "ProcessLabel": "",
        "AppArmorProfile": "docker-default",
        "HostConfig": {
            "Binds": [
                "/mnt/batch/tasks:/mnt/batch/tasks:rw",
                "/etc/passwd:/etc/passwd:ro",
                "/etc/group:/etc/group:ro",
                "/etc/sudoers:/etc/sudoers:ro"
            ],
        },
        "Mounts": [
            {
                "Type": "bind",
                "Source": "/mnt/batch/tasks",
                "Destination": "/mnt/batch/tasks",
                "Mode": "rw",
                "RW": true,
                "Propagation": "rprivate"
            },
            {
                "Type": "bind",
                "Source": "/etc/passwd",
                "Destination": "/etc/passwd",
                "Mode": "ro",
                "RW": false,
                "Propagation": "rprivate"
            },
            {
                "Type": "bind",
                "Source": "/etc/group",
                "Destination": "/etc/group",
                "Mode": "ro",
                "RW": false,
                "Propagation": "rprivate"
            },
            {
                "Type": "bind",
                "Source": "/etc/sudoers",
                "Destination": "/etc/sudoers",
                "Mode": "ro",
                "RW": false,
                "Propagation": "rprivate"
            }
        ],
        "Config": {
            "Hostname": "906199ee65d7",
            "Domainname": "",
            "User": "1000:1000",
            "AttachStdin": false,
            "AttachStdout": true,
            "AttachStderr": true,
            "Tty": false,
            "OpenStdin": false,
            "StdinOnce": false,
            "Env": [
                "AZ_BATCH_NODE_SHARED_DIR=/mnt/batch/tasks/shared",
                "AZ_BATCH_ACCOUNT_NAME=batcherjosito",
                "AZ_BATCH_TASK_USER_IDENTITY=PoolNonAdmin",
                "AZ_BATCH_JOB_ID=myJob-20180725-074543",
                "AZ_BATCH_TASK_USER=_azbatch",
                "AZ_BATCH_TASK_WORKING_DIR=/mnt/batch/tasks/workitems/myJob-20180725-074543/job-1/myDockerTask/wd",
                "AZ_BATCH_POOL_ID=mycomputepool",
                "AZ_BATCH_NODE_ID=tvm-2803204300_1-20180725t062808z",
                "AZ_BATCH_TASK_ID=myDockerTask",
                "AZ_BATCH_ACCOUNT_URL=https://batcherjosito.westeurope.batch.azure.com/",
                "AZ_BATCH_NODE_STARTUP_DIR=/mnt/batch/tasks/startup",
                "AZ_BATCH_CERTIFICATES_DIR=/mnt/batch/tasks/workitems/myJob-20180725-074543/job-1/myDockerTask/certs",
                "AZ_BATCH_TASK_DIR=/mnt/batch/tasks/workitems/myJob-20180725-074543/job-1/myDockerTask",
                "AZ_BATCH_NODE_IS_DEDICATED=true",
                "AZ_BATCH_NODE_ROOT_DIR=/mnt/batch/tasks",            
                "PATH=/usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
                "LANG=C.UTF-8",
                "GPG_KEY=0D96DF4D4110E5C43FBFB17F2D347EA6AA65421D",
                "PYTHON_VERSION=3.7.0",
                "PYTHON_PIP_VERSION=10.0.1"
            ],
            "Cmd": [
                "/bin/bash",
                "-c",
                "set -e; set -o pipefail; pwd;whoami >./whoami.txt;ls /mnt/;sleep 300; wait"
            ],
            "Image": "erjositoacr.azurecr.io/helloworld:2.1",
            "Volumes": {
                "/etc/group": {},
                "/etc/passwd": {},
                "/etc/sudoers": {},
                "/mnt/batch/tasks": {}
            },
            "WorkingDir": "/mnt/batch/tasks/workitems/myJob-20180725-074543/job-1/myDockerTask/wd",
            "Entrypoint": null,
            "OnBuild": null,
            "Labels": {
                "batchtaskid": "batcherjosito 22F442DAB2831FFB$myjob-20180725-074543 22F43673538A9979$job-1$mydockertask$0"
            }
        },
        "NetworkSettings": {
            "Bridge": "",
            "SandboxID": "839d9da52100d83269df1f241d05d1e90d50f3bf0cb2980a007a315d44a932dd",
            "HairpinMode": false,
            "LinkLocalIPv6Address": "",
            "LinkLocalIPv6PrefixLen": 0,
            "Ports": {},
            "SandboxKey": "/var/run/docker/netns/839d9da52100",
            "SecondaryIPAddresses": null,
            "SecondaryIPv6Addresses": null,
            "EndpointID": "9a9d9ee42dfd804334c2fe6622d7d70b75010c84e3c57dad5cee31b707baabc0",
            "Gateway": "172.17.0.1",
            "GlobalIPv6Address": "",
            "GlobalIPv6PrefixLen": 0,
            "IPAddress": "172.17.0.2",
            "IPPrefixLen": 16,
            "IPv6Gateway": "",
            "MacAddress": "02:42:ac:11:00:02",
            "Networks": {
                "bridge": {
                    "IPAMConfig": null,
                    "Links": null,
                    "Aliases": null,
                    "NetworkID": "f8a3cc56b2f72abad3ab24fc2a5a5c70db03efe47ddb39ba1fc473ac457a3faf",
                    "EndpointID": "9a9d9ee42dfd804334c2fe6622d7d70b75010c84e3c57dad5cee31b707baabc0",
                    "Gateway": "172.17.0.1",
                    "IPAddress": "172.17.0.2",
                    "IPPrefixLen": 16,
                    "IPv6Gateway": "",
                    "GlobalIPv6Address": "",
                    "GlobalIPv6PrefixLen": 0,
                    "MacAddress": "02:42:ac:11:00:02",
                    "DriverOpts": null
                }
            }
        }
    }
]    
```
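
The bind mounts and the working directory shown above can be changed through Docker options attached to the task. The following is a minimal sketch of a helper that assembles such an option string. The function `build_container_run_options` and its parameters are hypothetical, and the paths `/data` and `/app` are only examples. Paths containing spaces are not handled.

```python
def build_container_run_options(volumes=(), workdir=None, remove=True):
    """Build a `docker run` option string for a container task.

    Hypothetical helper for this sketch.
    volumes: iterable of (host_path, container_path, mode), mode is 'rw' or 'ro'.
    workdir: working directory inside the container, or None for the default.
    remove:  add --rm so that the container is removed when the task ends.
    """
    options = []
    if remove:
        options.append('--rm')
    for host_path, container_path, mode in volumes:
        if mode not in ('rw', 'ro'):
            raise ValueError('mode must be "rw" or "ro", got {0!r}'.format(mode))
        options += ['-v', '{0}:{1}:{2}'.format(host_path, container_path, mode)]
    if workdir:
        options += ['--workdir', workdir]
    return ' '.join(options)


run_options = build_container_run_options(
    volumes=[('/mnt/blobdrive', '/data', 'ro')],  # example paths only
    workdir='/app')
print(run_options)  # --rm -v /mnt/blobdrive:/data:ro --workdir /app
```

Under the assumption that the task is created with `batchmodels.TaskContainerSettings`, the resulting string would be passed as its `container_run_options` argument, next to `image_name`.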

In [None]:
# Bash-based task (only works for non-container-enabled pools)
if not container_enabled_pool(pool_id):
    command_line_list = ['printenv >./printenv.txt', 'whoami >./whoami.txt', 'ls -a $AZ_BATCH_TASK_WORKING_DIR/*']
    print('Submitting Azure Batch job with bash commands')
    job_id = generate_unique_resource_name(batch_job_name)
    submit_job_and_add_bash_task(batch_client, block_blob_client, job_id, pool_id, command_line_list)
    print("Waiting for tasks to complete...")
    wait_for_tasks_to_complete(
        batch_client,
        job_id,
        datetime.timedelta(minutes=25))
    tasks = batch_client.task.list(job_id)
    task_ids = [task.id for task in tasks]
    print_task_output(batch_client, job_id, task_ids)
else:
    print('Bash tasks cannot run on container-enabled pools, {0} seems to have a container configuration'.format(pool_id))


In [None]:
# Conditional clean up
if should_delete_container:
    block_blob_client.delete_container(blob_container_name, fail_not_exist=False)
if should_delete_job:
    print("Deleting job: ", job_id)
    batch_client.job.delete(job_id)
if should_delete_pool:
    print("Deleting pool: ", pool_id)
    batch_client.pool.delete(pool_id)

## Unconditional cleanup

Run this cell to delete the blob containers, the job and the pool, so that no resources keep generating costs.

In [None]:
# Unconditional clean up
# Blob storage
print('Deleting blob containers', startup_blob_container_name, 'and', task_blob_container_name)
block_blob_client.delete_container(startup_blob_container_name, fail_not_exist=False)
block_blob_client.delete_container(task_blob_container_name, fail_not_exist=False)
# Delete job, if it exists
print("Deleting jobs")
jobs = batch_client.job.list()
try:
    for job in jobs:
        if job.id == job_id:
            print("Deleting batch job", job_id)
            batch_client.job.delete(job_id)
# If the try block fails, it is probably because the variable job_id does not exist
# Catch Exception instead of using a bare except, so that Ctrl+C still interrupts the cell
except Exception:
    print('  * Looks like there were no jobs to delete')
# Delete pool
print("Deleting batch pool", pool_id)
try:
    batch_client.pool.delete(pool_id)
    wait_for_pool_to_be_deleted(pool_id)
except Exception:
    print('  * Looks like the pool', pool_id, 'does not exist any more')