In [1]:
import azure.storage.blob as azureblob
import azure.batch.batch_service_client as batch
import azure.batch.batch_auth as batchauth
import azure.batch.models as batchmodels
import datetime

## Connect Client to the blob storage account

In [2]:
_STORAGE_ACCOUNT_NAME = 'endeavourobsdata'
_STORAGE_ACCOUNT_KEY = 'nfokNynUJ1CZFsUl1+6TLVJvNqz01F4++Jhvr4+gphSBG/B2Hz1MiVkPHoQAPJ8NclpYLLubhQ4d+AStVI0krw=='


In [3]:
blob_client = azureblob.BlockBlobService(
        account_name=_STORAGE_ACCOUNT_NAME,
        account_key=_STORAGE_ACCOUNT_KEY)

## Get SAS URL token to access the container within the blob storage account

In [4]:
def get_container_sas_token(block_blob_client,
                            container_name, blob_permissions):
    """
    Obtains a shared access signature granting the specified permissions to the
    container.
    :param block_blob_client: A blob service client.
    :type block_blob_client: `azure.storage.blob.BlockBlobService`
    :param str container_name: The name of the Azure Blob storage container.
    :param BlobPermissions blob_permissions:
    :rtype: str
    :return: A SAS token granting the specified permissions to the container.
    """
    # Obtain the SAS token for the container, setting the expiry time and
    # permissions. In this case, no start time is specified, so the shared
    # access signature becomes valid immediately. Expiration is in 2 hours.
    container_sas_token = \
        block_blob_client.generate_container_shared_access_signature(
            container_name,
            permission=blob_permissions,
            expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=2))

    return container_sas_token

def get_container_sas_url(block_blob_client,
                          container_name, blob_permissions):
    """
    Obtains a shared access signature URL that provides write access to the 
    ouput container to which the tasks will upload their output.
    :param block_blob_client: A blob service client.
    :type block_blob_client: `azure.storage.blob.BlockBlobService`
    :param str container_name: The name of the Azure Blob storage container.
    :param BlobPermissions blob_permissions:
    :rtype: str
    :return: A SAS URL granting the specified permissions to the container.
    """
    # Obtain the SAS token for the container.
    sas_token = get_container_sas_token(block_blob_client,
                                        container_name, azureblob.BlobPermissions.WRITE)

    # Construct SAS URL for the container
    container_sas_url = "https://{}.blob.core.windows.net/{}?{}".format(
        _STORAGE_ACCOUNT_NAME, container_name, sas_token)

    return container_sas_url

In [5]:
output_container_name = 'endeavourobsdata'

In [6]:
output_container_sas_url = get_container_sas_url(
        blob_client,
        output_container_name,
        azureblob.BlobPermissions.WRITE)

In [7]:
output_container_sas_url

'https://endeavourobsdata.blob.core.windows.net/endeavourobsdata?se=2022-11-04T18%3A01%3A57Z&sp=w&sv=2018-03-28&sr=c&sig=6ymyK0IjTU04E7wsKHbevqulpJVJ/45YHOEO03OoFLA%3D'

## Connect Client to batch account

In [8]:
_BATCH_ACCOUNT_NAME = 'tmatch'
_BATCH_ACCOUNT_KEY = 'KnCI2SAbalzTJPUe7qq5H0+19RjxH1mukN9fAhjSGawTpDg4M1FPUVvORTTR3fq+mizdhaCs2eSo+ABalZMZpQ=='
_BATCH_ACCOUNT_URL = 'https://tmatch.westus2.batch.azure.com'

In [9]:
# Create a Batch service client. We'll now be interacting with the Batch
# service in addition to Storage
credentials = batchauth.SharedKeyCredentials(_BATCH_ACCOUNT_NAME,
                                             _BATCH_ACCOUNT_KEY)

In [10]:
batch_client = batch.BatchServiceClient(
        credentials,
        batch_url=_BATCH_ACCOUNT_URL)

## Create a pool

Followed documentation here to add the container registry to the pool set-up:
https://learn.microsoft.com/en-us/python/api/azure-batch/azure.batch.models.containerconfiguration?view=azure-python

In [22]:
_POOL_ID = 'test_pool'
_POOL_START_TASK = '/bin/bash -c "sudo apt -y install nfs-common && mkdir -p /tmp/data && sudo mount -o sec=sys,vers=3,nolock,proto=tcp endeavourobsdata.blob.core.windows.net:/endeavourobsdata/endeavourobsdata /tmp/data && sudo chmod -R 0755 /tmp/data"'
_POOL_VM_SIZE = 'Standard_D16s_v3'
_DEDICATED_POOL_NODE_COUNT = 1
_LOW_PRIORITY_POOL_NODE_COUNT = 0 
_CONTAINER_REGISTRY_USERNAME = 'mldetect'
_CONTAINER_REGISTRY_PASSWORD = 'TD3sQW7tyxq=MTh1OsH6WIVSKNY8uJrm'
_CONTAINER_REGISTRY_SERVER = 'mldetect.azurecr.io'

In [12]:
# Tie to container registry
container_registry = batchmodels.ContainerRegistry(
    user_name = _CONTAINER_REGISTRY_USERNAME,
    password = _CONTAINER_REGISTRY_PASSWORD,
    registry_server = _CONTAINER_REGISTRY_SERVER)

In [23]:
def create_pool(batch_service_client, pool_id):
    """
    Creates a pool of compute nodes with the specified OS settings.
    :param batch_service_client: A Batch service client.
    :type batch_service_client: `azure.batch.BatchServiceClient`
    :param str pool_id: An ID for the new pool.
    :param str publisher: Marketplace image publisher
    :param str offer: Marketplace image offer
    :param str sku: Marketplace image sky
    """
    print('Creating pool [{}]...'.format(pool_id))

    # Create a new pool of Linux compute nodes using an Azure Virtual Machines
    # Marketplace image. For more information about creating pools of Linux
    # nodes, see:
    # https://azure.microsoft.com/documentation/articles/batch-linux-nodes/

    # The start task mounts the blob storage container on each node, using
    # an administrator user identity.

    new_pool = batch.models.PoolAddParameter(
        id=pool_id,
        virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
                    image_reference=batchmodels.ImageReference(
                        publisher="microsoft-azure-batch",
                        offer="ubuntu-server-container",
                        sku="20-04-lts",
                        version="latest"
                    ),
                    node_agent_sku_id="batch.node.ubuntu 20.04",
                    container_configuration=batchmodels.ContainerConfiguration(
                        container_image_names=['mldetect.azurecr.io/denolle-lab/seismicloud:latest'],
                        container_registries=[container_registry]
                        ),
                    ),
        vm_size=_POOL_VM_SIZE,
        target_dedicated_nodes=_DEDICATED_POOL_NODE_COUNT,
        target_low_priority_nodes=_LOW_PRIORITY_POOL_NODE_COUNT,
        start_task=batchmodels.StartTask(
            command_line=_POOL_START_TASK,
            wait_for_success=True,
            user_identity=batchmodels.UserIdentity(
                auto_user=batchmodels.AutoUserSpecification(
                    scope=batchmodels.AutoUserScope.pool,
                    elevation_level=batchmodels.ElevationLevel.admin)),
        )
    )

    batch_service_client.pool.add(new_pool)

In [24]:
create_pool(batch_client,_POOL_ID)

Creating pool [test_pool]...


## Create a job within a specified pool

In [25]:
_JOB_ID = 'auto_test'
_POOL_ID = _POOL_ID

In [26]:
def create_job(batch_service_client, job_id, pool_id):
    """
    Creates a job with the specified ID, associated with the specified pool.
    :param batch_service_client: A Batch service client.
    :type batch_service_client: `azure.batch.BatchServiceClient`
    :param str job_id: The ID for the job.
    :param str pool_id: The ID for the pool.
    """
    print('Creating job [{}]...'.format(job_id))

    job = batch.models.JobAddParameter(
        id=job_id,
        pool_info=batch.models.PoolInformation(pool_id=pool_id))

    batch_service_client.job.add(job)

In [28]:
# Create the job that will run the tasks.
create_job(batch_client, _JOB_ID, _POOL_ID)

Creating job [auto_test]...


## Create tasks within job

In [None]:
def add_tasks(batch_service_client, job_id, input_files, output_container_sas_url):
    """
    Adds a task for each input file in the collection to the specified job.
    :param batch_service_client: A Batch service client.
    :type batch_service_client: `azure.batch.BatchServiceClient`
    :param str job_id: The ID of the job to which to add the tasks.
    :param list input_files: A collection of input files. One task will be
     created for each input file.
    :param output_container_sas_token: A SAS token granting write access to
    the specified Azure Blob storage container.
    """

    print('Adding {} tasks to job [{}]...'.format(len(input_files), job_id))

    tasks = list()

    for idx, in range(len(n_nodes)):
        command = '/bin/bash -c "python /tmp/scripts/template_matching/create_joblist.py --config /tmp/data/data/outputs/config_zoe_azure_batch.json && mpirun -np {} python /tmp/scripts/template_matching/distributed_detection.py -c /tmp/data/data/outputs/config_zoe_azure_batch.json -n NV -y 2017 -r {}"
        tasks.append(batch.models.TaskAddParameter(
            id='Task{}'.format(idx),
            command_line=command,
            resource_files=[input_file],
            output_files=[batchmodels.OutputFile(
                file_pattern=output_file_path,
                destination=batchmodels.OutputFileDestination(
                          container=batchmodels.OutputFileBlobContainerDestination(
                              container_url=output_container_sas_url)),
                upload_options=batchmodels.OutputFileUploadOptions(
                    upload_condition=batchmodels.OutputFileUploadCondition.task_success))]
        )
        )
    batch_service_client.task.add_collection(job_id, tasks)

In [30]:

# Define number of nodes
n_nodes = _DEDICATED_POOL_NODE_COUNT
# Define number of CPUs per node
n_cpus = 16

In [31]:
idx = 0
command = '/bin/bash -c "python /tmp/batch_scripts/template_matching/create_joblist.py --config /tmp/configs/config_zoe_azure_batch.json && mpirun -np {} python /tmp/batch_scripts/template_matching/detection.py -c /tmp/configs/config_zoe_azure_batch.json -n NV -y 2017 -r {}"'.format(n_cpus,idx)




In [None]:
os.system(
        f"{config['workflow']['interpreter']} /tmp/scripts/template_matching/detection.py"
        + f" -n {network} -d {day} -y {year} -r {rank} -v {verbose} -c {fconfig} --pid {pid} "
    

In [32]:
command

'/bin/bash -c "python /tmp/scripts/template_matching/create_joblist.py --config /tmp/data/data/outputs/config_zoe_azure_batch.json &&mpirun -np 16 python /tmp/scripts/template_matching/distributed_detection.py -c /tmp/data/data/outputs/config_zoe_azure_batch.json -n NV -y 2017 -r 0"'

In [None]:
# Add the tasks to the job. Pass the input files and a SAS URL
# to the storage container for output files.
add_tasks(batch_client, config._JOB_ID,
          input_files, output_container_sas_url)

## Clean up

In [None]:
# Delete pool
batch_client.pool.delete(_POOL_ID)

In [None]:
# Delete job
batch_client.job.delete(_JOB_ID)