In [1]:
# Imports

import azure.storage.blob as azureblob
import azure.batch.models as batchmodels
import azure.batch.batch_service_client as batch
import azure.batch.batch_auth as batchauth
import datetime
import re
import os
import sys
import time

In [14]:
def find_files(directory, extension):
    files = []
    for file in os.listdir(directory):
        if file.endswith(extension):
            files.append(os.path.abspath(os.path.join(directory, file)))
    return sorted(files)


def get_container_sas_token(block_blob_client, container_name, blob_permissions):
    """
    Obtains a shared access signature granting the specified permissions to the container.

    :param block_blob_client: A blob service client.
    :type block_blob_client: `azure.storage.blob.BlockBlobService`
    :param str container_name: The name of the Azure Blob storage container.
    :param BlobPermissions blob_permissions:
    :rtype: str
    :return: A SAS token granting the specified permissions to the container.
    """
    # Obtain the SAS token for the container, setting the expiry time and permissions. In this case, no start time is specified, so the shared access signature becomes valid immediately. Expiration is in 2 hours.
    container_sas_token = block_blob_client.generate_container_shared_access_signature(container_name, permission=blob_permissions, expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=2))

    return container_sas_token


def get_container_sas_url(block_blob_client, container_name):
    """
    Obtains a shared access signature URL that provides access to the ouput container to which the tasks will upload their output.

    :param block_blob_client: A blob service client.
    :type block_blob_client: `azure.storage.blob.BlockBlobService`
    :param str container_name: The name of the Azure Blob storage container.
    :rtype: str
    :return: A SAS URL granting the specified permissions to the container.
    """
    # Obtain the SAS token for the container.
    sas_token = get_container_sas_token(block_blob_client, container_name, azureblob.BlobPermissions(read=True, write=True))

    # Construct SAS URL for the container
    container_sas_url = "https://{}.blob.core.windows.net/{}?{}".format(_STORAGE_ACCOUNT_NAME, container_name, sas_token)

    return container_sas_url


def upload_file_to_container(block_blob_client, container_name, file_path):
    """
    Uploads a local file to an Azure Blob storage container.

    :param block_blob_client: A blob service client.
    :type block_blob_client: `azure.storage.blob.BlockBlobService`
    :param str container_name: The name of the Azure Blob storage container.
    :param str file_path: The local path to the file.
    :rtype: `azure.batch.models.ResourceFile`
    :return: A ResourceFile initialized with a SAS URL appropriate for Batch
    tasks.
    """
    blob_name = os.path.basename(file_path)

    print('[{1:}] < {0:}'.format(os.path.relpath(file_path), container_name))

    block_blob_client.create_blob_from_path(container_name, blob_name, file_path)

    # Obtain the SAS token for the container.
    sas_token = get_container_sas_token(block_blob_client, container_name, azureblob.BlobPermissions.READ)

    sas_url = block_blob_client.make_blob_url(container_name, blob_name, sas_token=sas_token)

    return batchmodels.ResourceFile(file_path=blob_name, blob_source=sas_url)


def wrap_commands_in_shell(ostype, commands):
    """
    Wrap commands in a shell

    :param list commands: list of commands to wrap
    :param str ostype: OS type, linux or windows
    :rtype: str
    :return: a shell wrapping commands
    """
    if ostype.lower() == "linux":
        return "/bin/bash -c \"set -e; set -o pipefail; {0:}; wait\"".format(";".join(commands))
    elif ostype.lower() == "windows":
        return "cmd.exe /c {0:}".format("&".join(commands))
    else:
        raise ValueError("unknown ostype: {}".format(ostype))
        

def select_latest_verified_vm_image_with_node_agent_sku(batch_client, publisher, offer, sku_starts_with):
    """
    Select the latest verified image that Azure Batch supports given a publisher, offer and sku (starts with filter).
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param str publisher: vm image publisher
    :param str offer: vm image offer
    :param str sku_starts_with: vm sku starts with filter
    :rtype: tuple
    :return: (node agent sku id to use, vm image ref to use)
    """
    # get verified vm image list and node agent sku ids from service
    node_agent_skus = batch_client.account.list_node_agent_skus()
    # pick the latest supported sku
    skus_to_use = [
        (sku, image_ref) for sku in node_agent_skus for image_ref in sorted(
            sku.verified_image_references, key=lambda item: item.sku)
        if image_ref.publisher.lower() == publisher.lower() and
        image_ref.offer.lower() == offer.lower() and
        image_ref.sku.startswith(sku_starts_with)
    ]
    # skus are listed in reverse order, pick first for latest
    sku_to_use, image_ref_to_use = skus_to_use[0]
    return (sku_to_use.id, image_ref_to_use)

        
def print_batch_exception(batch_exception):
    """
    Prints the contents of the specified Batch exception.
    :param batch_exception:
    """
    print('-------------------------------------------')
    print('Exception encountered:')
    if (batch_exception.error and batch_exception.error.message and
            batch_exception.error.message.value):
        print(batch_exception.error.message.value)
        if batch_exception.error.values:
            print()
            for mesg in batch_exception.error.values:
                print('{}:\t{}'.format(mesg.key, mesg.value))
    print('-------------------------------------------')

In [3]:
# Global keys

_BATCH_ACCOUNT_NAME = "climatebasedbatch"
_BATCH_ACCOUNT_KEY = "W94ukoxG2neFkk6teOVZ3IQ8IQjmPJqPcFq48I9lLzCrPEQSRFS/+euaUEkkSyPoulUgnx5IEZxztA9574Hluw=="
_BATCH_ACCOUNT_URL = "https://climatebasedbatch.westeurope.batch.azure.com"

_STORAGE_ACCOUNT_NAME = "radfiles"
_STORAGE_ACCOUNT_KEY = "aRRVzOkO/kwS35CIwNVIa18aGoMfZD5D3yAy3GlorkkU2G+9q5rAscXoC21IIylJZerBefwMgxYYF3qzquALrw=="

_POOL_ID = "batchpool"
_MIN_POOL_NODE = 1
_MAX_POOL_NODE = 100

_POOL_VM_SIZE = 'BASIC_A1'
_NODE_OS_PUBLISHER = 'Canonical'
_NODE_OS_OFFER = 'UbuntuServer'
_NODE_OS_SKU = '16'

_JOB_ID = "0000000-testjob-3513"
_JOB_DIR = "./case"

_LB_HB = "./lb_hb.tar.gz"
_RADIANCE = "./radiance-5.1.0-Linux.tar.gz"
_COPYBLOB = "./copy_to_blob.py"
_SCRIPT = "./RunHoneybeeRadiance.py"

In [6]:
# Create the blob client, for use in obtaining references to blob storage containers and uploading files to containers
blob_client = azureblob.BlockBlobService(account_name=_STORAGE_ACCOUNT_NAME, account_key=_STORAGE_ACCOUNT_KEY)

# Create a job container
blob_client.create_container(_JOB_ID, fail_on_exist=False)
print('[{}] blob container created.'.format(_JOB_ID))

# Create a common processing files container
blob_client.create_container("0000000-common", fail_on_exist=False)
print('[{}] blob container created.'.format("0000000-common"))

Blob client created
[0000000-testjob-3513] blob container created.
[0000000-common] blob container created.


In [17]:
# Upload case files to the blob

_SURFACES_FILEPATH = os.path.abspath(os.path.join(_JOB_DIR, "surfaces.json"))
surfaces_file = upload_file_to_container(blob_client, _JOB_ID, _SURFACES_FILEPATH)

_SKY_MTX_FILEPATH = os.path.abspath(os.path.join(_JOB_DIR, "sky_mtx.json"))
sky_mtx_file = upload_file_to_container(blob_client, _JOB_ID, _SKY_MTX_FILEPATH)

_ANALYSIS_GRIDS_FILEPATHS = find_files(os.path.join(_JOB_DIR, "AnalysisGrids"), "json")
analysis_grid_files = [upload_file_to_container(blob_client, _JOB_ID, file_path) for file_path in _ANALYSIS_GRIDS_FILEPATHS]

# Get a number of files to be processed
_POOL_NODE_COUNT = len(analysis_grid_files)

[0000000-testjob-3513] < case\surfaces.json
[0000000-testjob-3513] < case\sky_mtx.json
[0000000-testjob-3513] < case\AnalysisGrids\zone1.json
[0000000-testjob-3513] < case\AnalysisGrids\zone2.json


In [8]:
# Upload the processing files to the blob
radiance_file = upload_file_to_container(blob_client, "0000000-common", _RADIANCE)
lb_hb_file = upload_file_to_container(blob_client, "0000000-common", _LB_HB)
run_process_file = upload_file_to_container(blob_client, "0000000-common", _SCRIPT)

[0000000-common] < radiance-5.1.0-Linux.tar.gz
[0000000-common] < lb_hb.tar.gz
[0000000-common] < RunHoneybeeRadiance.py


In [13]:
# Get blob read/write credentials
output_container_sas_url = get_container_sas_url(blob_client, _JOB_ID)
print("Output container SAS url created:\n{0:}".format(output_container_sas_url))

Output container SAS url created:
https://radfiles.blob.core.windows.net/0000000-testjob-3513?se=2018-10-25T16%3A27%3A30Z&sp=rw&sv=2017-04-17&sr=c&sig=SFjd/rcBGCi0IOy%2BE2OaYllLPjnX6is9BqKyuj%2BnddE%3D


In [20]:
# Create a Batch service client. We'll now be interacting with the Batch service in addition to Storage
credentials = batchauth.SharedKeyCredentials(_BATCH_ACCOUNT_NAME, _BATCH_ACCOUNT_KEY)
batch_client = batch.BatchServiceClient(credentials, base_url=_BATCH_ACCOUNT_URL)

# Create a pool ready to spin up some nodes
start_commands = [
    # Create a node with radiance, honeybee, the processing script anfd the copy to blob script available
    "touch ./do_i_exist.txt",
    "cp -p do_i_exist.txt $AZ_BATCH_NODE_SHARED_DIR",
#     "sudo apt-get update",
#     "apt-get install rsync"
#     "cp -p radiance-5.1.0-Linux.tar.gz",
#     "tar xzf radiance-5.1.0-Linux.tar.gz",
#     "rsync -av /radiance-5.1.0-Linux/usr/local/radiance/bin/ /usr/local/bin/",
#     "rsync -av /radiance-5.1.0-Linux/usr/local/radiance/lib/ /usr/local/lib/ray/",
#     "cp -p lb_hb.tar.gz $AZ_BATCH_NODE_SHARED_DIR"
#     "tar xzf lb_hb.tar.gz",
]

resources = [
    radiance_file,
    lb_hb_file,
    run_process_file,
    surfaces_file,
    sky_mtx_file,
]

# Get the node agent SKU and image reference for the virtual machine configuration.
sku_to_use, image_ref_to_use = select_latest_verified_vm_image_with_node_agent_sku(batch_client, _NODE_OS_PUBLISHER, _NODE_OS_OFFER, _NODE_OS_SKU)

# Specify the user permissions and level
user = batchmodels.AutoUserSpecification(scope=batchmodels.AutoUserScope.pool, elevation_level=batchmodels.ElevationLevel.admin)

# Define the start task for the pool
start_task = batch.models.StartTask(
    command_line=wrap_commands_in_shell("linux", start_commands),
    user_identity=batchmodels.UserIdentity(auto_user=user),
    wait_for_success=True,
    resource_files=resources
)

# Define the pool
new_pool = batch.models.PoolAddParameter(
    id=_POOL_ID,
    virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
        image_reference=image_ref_to_use,
        node_agent_sku_id=sku_to_use
    ),
    vm_size=_POOL_VM_SIZE,
    enable_auto_scale=True,
    auto_scale_formula='pendingTaskSamplePercent =$PendingTasks.GetSamplePercent(180 * TimeInterval_Second);pendingTaskSamples = pendingTaskSamplePercent < 70 ? 1 : avg($PendingTasks.GetSample(180 * TimeInterval_Second)); $TargetDedicatedNodes = min(100, pendingTaskSamples);', 
    auto_scale_evaluation_interval=datetime.timedelta(minutes=5),
    start_task=start_task,
)

# Try to create the pool, and tell us why not
try:
    batch_client.pool.add(new_pool)
except batchmodels.batch_error.BatchErrorException as err:
    print_batch_exception(err)
    raise
    
print('[{0:}] pool created'.format(_POOL_ID))

[batchpool] pool created


In [None]:
# Create the job to which tasks will be assigned

batch_client.job.add(batch.models.JobAddParameter(_JOB_ID, batch.models.PoolInformation(pool_id=_POOL_ID)))
print('[{}] job created...'.format(_JOB_ID))

In [None]:
# Add tasks to the job




tasks = []
for idx, analysis_grid_file in enumerate(analysis_grid_files):
    grid_file_path = analysis_grid_file.file_path
    sky_mtx_file_path = sky_mtx_file.file_path
    surfaces_file_path = surfaces_file.file_path
    results_file_path = grid_file_path.replace(".json", "_result.json")

    commands = [
        "apt-get update", "apt-get install wget rsync"
        "wget https://github.com/NREL/Radiance/releases/download/5.1.0/radiance-5.1.0-Linux.tar.gz",
        "tar xzf radiance-5.1.0-Linux.tar.gz",
        "rsync -av /radiance-5.1.0-Linux/usr/local/radiance/bin/ /usr/local/bin/",
        "rsync -av /radiance-5.1.0-Linux/usr/local/radiance/lib/ /usr/local/lib/ray/",
        "tar xzf lb_hb.tar.gz",
        "python3 RunHoneybeeRadiance.py -sm {0:} -s {1:} -p {2:}".format(sky_mtx_file_path, surfaces_file_path, grid_file_path),
    ]

    command = wrap_commands_in_shell("linux", commands)
    
    task_id = '{0:}_simulation'.format(re.sub("[^0-9a-zA-Z]", "", grid_file_path.replace(".json", "")))

    tasks.append(
        batch.models.TaskAddParameter(
            id=task_id,
            command_line=command,
            resource_files=[analysis_grid_file, sky_mtx_file, surfaces_file, lb_hb_file, run_process_file],
            output_files=[
                batchmodels.OutputFile(
                    results_file_path,
                    destination=batchmodels.OutputFileDestination(
                        container=batchmodels.OutputFileBlobContainerDestination(
                            output_container_sas_url
                        )
                    ),
                    upload_options=batchmodels.OutputFileUploadOptions(
                        batchmodels.OutputFileUploadCondition.task_success
                    )
                )
            ]
        )
    )
    
    print("Task [{0:}] created".format(task_id))

batch_tasks = batch_client.task.add_collection(_JOB_ID, tasks)

print("Tasks added to job [{0:}]".format(_JOB_ID))

In [None]:
commands = [
    "apt-get update", "apt-get install wget rsync"
    "wget https://github.com/NREL/Radiance/releases/download/5.1.0/radiance-5.1.0-Linux.tar.gz",
    "tar xzf radiance-5.1.0-Linux.tar.gz",
    "rsync -av /radiance-5.1.0-Linux/usr/local/radiance/bin/ /usr/local/bin/",
    "rsync -av /radiance-5.1.0-Linux/usr/local/radiance/lib/ /usr/local/lib/ray/",
    "tar xzf lb_hb.tar.gz",
]

command = wrap_commands_in_shell("linux", commands)

command

In [None]:
# Spin up a pool of nodes capable of running the case



def create_pool(batch_service_client, pool_id, start_commands, resource_files, publisher, offer, sku, node_count):
    """
    Creates a pool of compute nodes with the specified OS settings.
    :param batch_service_client: A Batch service client.
    :type batch_service_client: `azure.batch.BatchServiceClient`
    :param str pool_id: An ID for the new pool.
    :param list resource_files: A collection of resource files for the pool's start task.
    :param str publisher: Marketplace image publisher
    :param str offer: Marketplace image offer
    :param str sku: Marketplace image sku
    """
    
    start_task = batch.models.StartTask(command_line=wrap_commands_in_shell(
        "linux",
        start_commands),
            user_identity=batchmodels.UserIdentity(auto_user=user),
            wait_for_success=True,
            resource_files=resource_files)
    user = batchmodels.AutoUserSpecification(
        scope=batchmodels.AutoUserScope.pool, 
        elevation_level=batchmodels.ElevationLevel.admin
    )
    new_pool = batch.models.PoolAddParameter(
        id=pool_id,
        virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
            image_reference=image_ref_to_use,
            node_agent_sku_id=sku_to_use),
        vm_size=_POOL_VM_SIZE,
        enable_auto_scale=True,
        auto_scale_formula='pendingTaskSamplePercent =$PendingTasks.GetSamplePercent(180 * TimeInterval_Second);pendingTaskSamples = pendingTaskSamplePercent < 70 ? 1 : avg($PendingTasks.GetSample(180 * TimeInterval_Second)); $TargetDedicatedNodes = min(100, pendingTaskSamples);', 
        auto_scale_evaluation_interval=datetime.timedelta(minutes=5),
        start_task=start_task,
    )

    try:
        batch_service_client.pool.add(new_pool)
    except batchmodels.batch_error.BatchErrorException as err:
        print_batch_exception(err)
        raise
    
    print('[{0:}] pool created...'.format(pool_id))

_POOL_NODE_COUNT = len(analysis_grid_files)

pool = create_pool(batch_client, _POOL_ID, [], _NODE_OS_PUBLISHER, _NODE_OS_OFFER, _NODE_OS_SKU, _POOL_NODE_COUNT)

In [None]:
# Check pool status

if batch_client.pool.exists(_POOL_ID):
    my_pool = batch_client.pool.get(_POOL_ID)
    print("Current state: {}".format(my_pool.allocation_state))

In [None]:
# Add tasks to the job

print('Adding {} tasks to job [{}]...'.format(len(analysis_grid_files), _JOB_ID))

tasks = []

for idx, analysis_grid_file in enumerate(analysis_grid_files):
    grid_file_path = analysis_grid_file.file_path
    sky_mtx_file_path = sky_mtx_file.file_path
    surfaces_file_path = surfaces_file.file_path
    results_file_path = grid_file_path.replace(".json", "_result.json")

    # Commands to be issued in each job
    commands = [
        "apt-get update",
        "apt-get install wget",
        "apt-get install rsync",
        "wget https://github.com/NREL/Radiance/releases/download/5.1.0/radiance-5.1.0-Linux.tar.gz",
        "tar xzf radiance-5.1.0-Linux.tar.gz",
        "rsync -av /radiance-5.1.0-Linux/usr/local/radiance/bin/ /usr/local/bin/",
        "rsync -av /radiance-5.1.0-Linux/usr/local/radiance/lib/ /usr/local/lib/ray/",
        "tar xzf lb_hb.tar.gz",
        "python3 RunHoneybeeRadiance.py -sm {0:} -s {1:} -p {2:}".format(sky_mtx_file_path, surfaces_file_path, grid_file_path),
    ]

    command = wrap_commands_in_shell("linux", commands)

    # print(command)

    print()

    tasks.append(
        batch.models.TaskAddParameter(
            id='task_{0:}'.format(re.sub("[^0-9a-zA-Z]", "", grid_file_path.replace(".json", ""))),
            command_line=command,
            resource_files=[
                analysis_grid_file,
                sky_mtx_file,
                surfaces_file,
                lb_hb_file,
                run_process_file,
            ],
            output_files=[
                batchmodels.OutputFile(
                    results_file_path,
                    destination=batchmodels.OutputFileDestination(
                        container=batchmodels.OutputFileBlobContainerDestination(
                            output_container_sas_url
                        )
                    ),
                    upload_options=batchmodels.OutputFileUploadOptions(
                        batchmodels.OutputFileUploadCondition.task_success
                    )
                )
            ]
        )
    )

batch_tasks = batch_client.task.add_collection(_JOB_ID, tasks)

print(batch_tasks)