In [1]:
import os
import io
import uuid
import sys
import time
import config
from datetime import datetime, timedelta
import azure.storage.blob as azureblob
import azure.batch.models as batchmodels
import azure.batch._batch_service_client as batch
import azure.batch.batch_auth as batch_auth
import azure.batch.operations as batchops
from azure.batch.models import (VirtualMachineConfiguration, CloudServiceConfiguration, ImageReference,
                                BatchErrorException, PoolInformation, JobAddParameter)

In [2]:
def print_batch_exception(batch_exception):
    """
    Prints the contents of the specified Batch exception.

    :param batch_exception:
    """
    print('-------------------------------------------')
    print('Exception encountered:')
    if batch_exception.error and \
            batch_exception.error.message and \
            batch_exception.error.message.value:
        print(batch_exception.error.message.value)
        if batch_exception.error.values:
            print()
            for mesg in batch_exception.error.values:
                print('{}:\t{}'.format(mesg.key, mesg.value))
    print('-------------------------------------------')

In [3]:
def upload_file_to_container(block_blob_client, container_name, file_path):
    """
    Uploads a local file to an Azure Blob storage container.

    :param block_blob_client: A blob service client.
    :type block_blob_client: `azure.storage.blob.BlockBlobService`
    :param str container_name: The name of the Azure Blob storage container.
    :param str file_path: The local path to the file.
    :rtype: `azure.batch.models.ResourceFile`
    :return: A ResourceFile initialized with a SAS URL appropriate for Batch
    tasks.
    """
    blob_name = os.path.basename(file_path)

    print('Uploading file {} to container [{}]...'.format(file_path,
                                                          container_name))

    block_blob_client.create_blob_from_path(container_name,
                                            blob_name,
                                            file_path)

    sas_token = block_blob_client.generate_blob_shared_access_signature(
        container_name,
        blob_name,
        permission=azureblob.BlobPermissions.READ,
        expiry=datetime.utcnow() + timedelta(hours=2))

    sas_url = block_blob_client.make_blob_url(container_name,
                                              blob_name,
                                              sas_token=sas_token)

    return batchmodels.ResourceFile(http_url=sas_url, file_path=blob_name)

In [4]:
def get_container_sas_token(block_blob_client,
                            container_name, blob_permissions):
    """
    Obtains a shared access signature granting the specified permissions to the
    container.

    :param block_blob_client: A blob service client.
    :type block_blob_client: `azure.storage.blob.BlockBlobService`
    :param str container_name: The name of the Azure Blob storage container.
    :param BlobPermissions blob_permissions:
    :rtype: str
    :return: A SAS token granting the specified permissions to the container.
    """
    # Obtain the SAS token for the container, setting the expiry time and
    # permissions. In this case, no start time is specified, so the shared
    # access signature becomes valid immediately.
    container_sas_token = \
        block_blob_client.generate_container_shared_access_signature(
            container_name,
            permission=blob_permissions,
            expiry=datetime.utcnow() + timedelta(hours=2))

    return container_sas_token

# Creating Batch Client

In [5]:
credentials = batch_auth.SharedKeyCredentials(account_name=config._BATCH_ACCOUNT_NAME,key=config._BATCH_ACCOUNT_KEY)

In [6]:
batch_client = batch.BatchServiceClient(credentials=credentials, batch_url=config._BATCH_ACCOUNT_URL)

# Evaluate an autoscale formula for the Pool

In [7]:
formula = '''startingNumberOfVMs = 0;
maxNumberofVMs = 10;
pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
$TargetLowPriorityNodes=min(maxNumberofVMs, pendingTaskSamples);
$NodeDeallocationOption = taskcompletion;'''

In [8]:
# pool_eval = batch_client.pool.evaluate_auto_scale(pool_id=config._POOL_ID,auto_scale_formula=formula) # To work upon

In [9]:
# pool_eval = pool_eval.as_dict()

In [10]:
#Converting to list
# pp = pool_eval['results'].replace('$','').replace('=',";").split(';')

In [11]:
#Converting to Dictionary
# pool_stat = dict(zip(pp[::2],pp[1::2]))

In [13]:
#Need to have a check here as well that pendingTaskSamples must be 0 only then we can go ahead
while True:
    pool_eval = batch_client.pool.evaluate_auto_scale(pool_id=config._POOL_ID,auto_scale_formula=formula) # To work upon
    pool_eval = pool_eval.as_dict()
    pp = pool_eval['results'].replace('$','').replace('=',";").split(';')
    pool_stat = dict(zip(pp[::2],pp[1::2]))
    if pool_stat['pendingTaskSamples']=='0':
        break

# Create a Container

In [15]:
# Create the blob client, for use in obtaining references to
    # blob storage containers and uploading files to containers.

blob_client = azureblob.BlockBlobService(
    account_name=config._STORAGE_ACCOUNT_NAME,
    account_key=config._STORAGE_ACCOUNT_KEY)

In [16]:
# Use the blob client to create the containers in Azure Storage if they
    # don't yet exist.

input_container_name = 'input'
blob_client.create_container(input_container_name, fail_on_exist=False)

False

In [17]:
input_file_paths = [os.path.join(sys.path[0], 'sleep.py'),
#                     os.path.join(sys.path[0], 'taskdata1.txt'),
#                     os.path.join(sys.path[0], 'taskdata2.txt'),
                    os.path.join(sys.path[0],'commands.bat')]

In [18]:
# Upload the data files.
input_files = [
    upload_file_to_container(blob_client, input_container_name, file_path)
    for file_path in input_file_paths]

Uploading file /home/fission/Desktop/Azure_batch/Practice/sleep.py to container [input]...
Uploading file /home/fission/Desktop/Azure_batch/Practice/commands.bat to container [input]...


# Add Task

In [19]:
response = batch_client.pool.enable_auto_scale(pool_id=config._POOL_ID,auto_scale_formula=formula,
                                           auto_scale_evaluation_interval=timedelta(minutes=5))

In [20]:
tasks = list()

for idx, input_file in enumerate(input_files):
    command = "/bin/bash -c \"bash {}\"".format(input_file.file_path)
    tasks.append(batch.models.TaskAddParameter(
        id='Task{}'.format(idx),
        command_line=command,
        resource_files=[input_file]
    ))
batch_client.task.add_collection(config._JOB_ID, tasks) #Adding tasks to the Job

<azure.batch.models._models_py3.TaskAddCollectionResult at 0x7f29002216d0>

In [21]:
#Retrieving informating for each task
task_id = []
boolean = []
for i in range(len(tasks)):
    id_ = list(batch_client.task.list(job_id=config._JOB_ID))[i].as_dict()
    boolean.append(False)
    task_id.append(id_['id'])

In [22]:
boolean

[False, False]

In [23]:
# We may need to need check the status of pool autoscale before going to the next line. Until the Nodes
# are allocated, we cant go check the status of Task
starttime = datetime.now().replace(microsecond=0)
while True:
    pool_stat = batch_client.pool.get(pool_id=config._POOL_ID)
    pool_stat = pool_stat.as_dict()
    if pool_stat['current_low_priority_nodes'] > 0:
        break
print(f'elapsed time : {datetime.now().replace(microsecond=0)-starttime}')

elapsed time : 0:07:28


In [24]:
iscompleted = False
while True:
    iscompleted = True
    try:
        for i in range(len(task_id)):
            if boolean[i]==False:
                y = batch_client.task.get(job_id=config._JOB_ID,task_id=task_id[i])
                y = y.as_dict()
                print('...')
                if y['state'] != 'completed':    # Checking the status of task for completion
                    print("task id : {} still {}".format(task_id[i],y['state']))
                    iscompleted = False
                else:
                    print("task id : {} is {}".format(task_id[i],y['state']))
                    boolean[i]=True
        if iscompleted != True:
            time.sleep(30)
    except batchmodels.BatchErrorException as err:
        print_batch_exception(err)
        raise
    if iscompleted:
        break

...
task id : Task0 still active
...
task id : Task1 still active
...
task id : Task0 is completed
...
task id : Task1 is completed


In [25]:
boolean

[True, True]

In [None]:
# task1 gets completed in 10 sec
# task2 is in pending
# task3 complete



In [26]:
task_count = batch_client.job.get_task_counts(job_id=config._JOB_ID)

In [27]:
task_count.as_dict()

{'active': 0, 'running': 0, 'completed': 2, 'succeeded': 0, 'failed': 2}

## Deleting the tasks

In [38]:
for i in task_id:
    batch_client.task.delete(job_id=config._JOB_ID,task_id=i)