This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Internal: Spark align cluster job batch resource usage #688

Open: wants to merge 32 commits into master
Changes from all commits (32 commits)
3231b32
first pass at ssh submit log streaming
jafreck Nov 2, 2018
68bfaa8
fix some bugs
jafreck Nov 2, 2018
f43fe74
typo in function name
jafreck Nov 2, 2018
c0f2639
refactor some functions, monkeypatch max_block_size
jafreck Nov 2, 2018
0b4bdf3
docstring, fix bad traceback call
jafreck Nov 2, 2018
46f02f9
Merge branch 'master' into feature/spark-scheduling-target-master-log…
jafreck Nov 2, 2018
2d22237
start shift from print to logger
jafreck Nov 2, 2018
6d56fec
add todo
jafreck Nov 2, 2018
d400147
Merge branch 'feature/spark-scheduling-target-master-log-streaming' o…
jafreck Nov 2, 2018
2118f7a
yapf
jafreck Nov 2, 2018
53a6f90
add comments about planned implementation
jafreck Nov 2, 2018
ff9ddee
create batch error context manager, start stream log download impl
jafreck Nov 5, 2018
9ad56c7
fix type error
jafreck Nov 5, 2018
1a06d64
remove unused import
jafreck Nov 5, 2018
0aa8733
update cli and fix size 0 log get
jafreck Nov 6, 2018
aabebd1
change default max_block_size
jafreck Nov 6, 2018
4fc6eb0
refactor get_application_log, task abstraction
jafreck Nov 6, 2018
4e029c7
start alignment of batch resource usage for clusters and jobs
jafreck Nov 7, 2018
9a96c06
start refactor of batch functions
jafreck Nov 7, 2018
53065f8
align cluster and job functions
jafreck Nov 8, 2018
cc2e072
remove some debug code, add Active application state
jafreck Nov 8, 2018
87c4dea
fix calls to use cluster_id instead of pool_id
jafreck Nov 9, 2018
b9c1cb2
remove unused code, pass correct id, add base job get stub
jafreck Nov 9, 2018
57a1547
fix some issues with job submission
jafreck Nov 9, 2018
123f5b4
add file service, rename blob_client
jafreck Nov 22, 2018
024ed8c
start file_share implementation
jafreck Nov 22, 2018
9faa691
factor log streaming code out
jafreck Nov 29, 2018
0a871c9
Merge branch 'master' into internal/spark-align-cluster-job-batch-res…
jafreck Nov 29, 2018
b2880cd
remove azure file dependency
jafreck Nov 29, 2018
0fc87e3
Merge branch 'internal/spark-align-cluster-job-batch-resource-usage' …
jafreck Nov 29, 2018
17f808e
remove ghost task, fix yapf
jafreck Nov 29, 2018
e96a411
yapf
jafreck Nov 29, 2018
124 changes: 73 additions & 51 deletions aztk/client/base/base_operations.py
@@ -1,9 +1,10 @@
from aztk import models
from aztk.internal import cluster_data

from .helpers import (create_user_on_cluster, create_user_on_node, delete_user_on_cluster, delete_user_on_node,
generate_user_on_cluster, generate_user_on_node, get_application_log, get_recent_job,
get_remote_login_settings, get_task_state, list_tasks, node_run, run, ssh_into_node, task_table)
from .helpers import (create_batch_resources, create_user_on_cluster, create_user_on_node, delete_batch_resources,
delete_user_on_cluster, delete_user_on_node, generate_user_on_cluster, generate_user_on_node,
get_application_log, get_node, get_remote_login_settings, get_task, get_task_state, list_tasks,
node_run, run, ssh_into_node, task_table)


class BaseOperations:
@@ -12,15 +13,19 @@ class BaseOperations:
Attributes:
batch_client (:obj:`azure.batch.batch_service_client.BatchServiceClient`): Client used to interact with the
Azure Batch service.
blob_client (:obj:`azure.storage.blob.BlockBlobService`): Client used to interact with the Azure Storage
Blob service.
cloud_storage_account (:obj:`azure.storage.common.CloudStorageAccount`): Azure Storage account used to
create the storage service clients.
block_blob_service (:obj:`azure.storage.blob.BlockBlobService`): Client used to interact with the
Azure Storage Blob service.
table_service (:obj:`azure.cosmosdb.table.TableService`): Client used to interact with the Azure Storage
Table service.
secrets_configuration (:obj:`aztk.models.SecretsConfiguration`):
Model that holds AZTK secrets used to authenticate with Azure and the clusters.
"""

def __init__(self, context):
self.batch_client = context["batch_client"]
self.blob_client = context["blob_client"]
self.cloud_storage_account = context["cloud_storage_account"]
self.block_blob_service = context["block_blob_service"]
self.table_service = context["table_service"]
self.secrets_configuration = context["secrets_configuration"]

@@ -53,7 +58,7 @@ def get_cluster_data(self, id: str) -> cluster_data.ClusterData:
Returns:
:obj:`aztk.models.ClusterData`: Object used to manage the data and storage functions for a cluster
"""
return cluster_data.ClusterData(self.blob_client, id)
return cluster_data.ClusterData(self.block_blob_service, id)

def ssh_into_node(self, id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False):
"""Open an ssh tunnel to a node
@@ -75,6 +80,56 @@ def ssh_into_node(self, id, node_id, username, ssh_key=None, password=None, port
"""
ssh_into_node.ssh_into_node(self, id, node_id, username, ssh_key, password, port_forward_list, internal)

def create_batch_resources(
self,
id,
start_task,
job_manager_task,
vm_size,
vm_image_model,
on_all_tasks_complete,
mixed_mode,
software_metadata_key,
mode_metadata_key,
size_dedicated,
size_low_priority,
subnet_id,
job_metadata,
):
"""Create the underlying batch resources for a cluster or a job
Args:
...
Returns:
...
"""

return create_batch_resources.create_batch_resources(
self.batch_client,
id,
start_task,
job_manager_task,
vm_size,
vm_image_model,
on_all_tasks_complete,
mixed_mode,
software_metadata_key,
mode_metadata_key,
size_dedicated,
size_low_priority,
subnet_id,
job_metadata,
)

def delete_batch_resources(self, id, keep_logs):
"""Delete the underlying batch resources for a cluster or a job
Args:
...
Returns:
...
"""

return delete_batch_resources.delete_batch_resources(self, id, keep_logs)

def create_user_on_node(self, id, node_id, username, ssh_key=None, password=None):
"""Create a user on a node

@@ -234,28 +289,6 @@ def create_task_table(self, id: str):
"""
return task_table.create_task_table(self.table_service, id)

def list_task_table_entries(self, id):
"""list tasks in a storage table

Args:
id (:obj:`str`): the id of the cluster

Returns:
:obj:`[aztk.models.Task]`: a list of models representing all entries in the Task table
"""
return task_table.list_task_table_entries(self.table_service, id)

def get_task_from_table(self, id, task_id):
"""Create a storage table to track tasks

Args:
id (:obj:`str`): the id of the cluster

Returns:
:obj:`[aztk.models.Task]`: the task with id task_id from the cluster's storage table
"""
return task_table.get_task_from_table(self.table_service, id, task_id)

def insert_task_into_task_table(self, id, task):
"""Insert a task into the table

@@ -300,16 +333,6 @@ def list_tasks(self, id):
"""
return list_tasks.list_tasks(self, id)

def get_recent_job(self, id):
"""Get the most recently run job in an Azure Batch job schedule

Args:
id (:obj:`str`): the id of the job schedule
Returns:
:obj:`[azure.batch.models.Job]`: the most recently run job on the job schedule
"""
return get_recent_job.get_recent_job(self, id)

def get_task_state(self, id: str, task_name: str):
"""Get the status of a submitted task

@@ -322,25 +345,24 @@ def get_task_state(self, id: str, task_name: str):
"""
return get_task_state.get_task_state(self, id, task_name)

def list_batch_tasks(self, id: str):
"""Get the status of a submitted task
def get_task(self, id: str, task_id: str):
"""Get a task submitted to a cluster

Args:
id (:obj:`str`): the name of the cluster the task was submitted to
id (:obj:`str`): the id of the cluster

Returns:
:obj:`[aztk.models.Task]`: list of aztk tasks
:obj:`[aztk.models.Task]`: the submitted task with id task_id
"""
return task_table.list_batch_tasks(self.batch_client, id)

def get_batch_task(self, id: str, task_id: str):
"""Get the status of a submitted task
return get_task.get_task(self, id, task_id)

def get_node(self, id: str, node_id: str):
"""Get a node in a cluster
Args:
id (:obj:`str`): the name of the cluster the task was submitted to
task_id (:obj:`str`): the name of the task to get
id (:obj:`str`): the id of the cluster
node_id (:obj:`str`): the id of the node

Returns:
:obj:`aztk.models.Task`: aztk Task representing the Batch Task
:obj:`[azure.batch.models.ComputeNode]`: the requested node
"""
return task_table.get_batch_task(self.batch_client, id, task_id)
return get_node.get_node(self, id, node_id)
69 changes: 69 additions & 0 deletions aztk/client/base/helpers/create_batch_resources.py
@@ -0,0 +1,69 @@
from datetime import timedelta

import azure.batch.models as batch_models

from aztk.utils import constants, helpers


def create_batch_resources(
batch_client,
id,
start_task,
job_manager_task,
vm_size,
vm_image_model,
on_all_tasks_complete,
mixed_mode,
software_metadata_key,
mode_metadata_key,
size_dedicated,
size_low_priority,
subnet_id,
job_metadata,
):
autoscale_formula = "$TargetDedicatedNodes = {0}; " "$TargetLowPriorityNodes = {1}".format(
size_dedicated, size_low_priority)

sku_to_use, image_ref_to_use = helpers.select_latest_verified_vm_image_with_node_agent_sku(
vm_image_model.publisher, vm_image_model.offer, vm_image_model.sku, batch_client)

network_conf = None
if subnet_id is not None:
network_conf = batch_models.NetworkConfiguration(subnet_id=subnet_id)

auto_pool_specification = batch_models.AutoPoolSpecification(
pool_lifetime_option=batch_models.PoolLifetimeOption.job,
auto_pool_id_prefix=id,
keep_alive=False,
pool=batch_models.PoolSpecification(
display_name=id,
virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
image_reference=image_ref_to_use, node_agent_sku_id=sku_to_use),
vm_size=vm_size,
enable_auto_scale=True,
auto_scale_formula=autoscale_formula,
auto_scale_evaluation_interval=timedelta(minutes=5),
start_task=start_task,
enable_inter_node_communication=not mixed_mode,
network_configuration=network_conf,
max_tasks_per_node=4,
metadata=[
batch_models.MetadataItem(name=constants.AZTK_SOFTWARE_METADATA_KEY, value=software_metadata_key),
batch_models.MetadataItem(
name=constants.AZTK_MODE_METADATA_KEY,
value=constants.AZTK_JOB_MODE_METADATA)  # TODO: dynamically change to cluster/job metadata
]),
)

job = batch_models.JobAddParameter(
id=id,
pool_info=batch_models.PoolInformation(auto_pool_specification=auto_pool_specification),
job_manager_task=job_manager_task,
on_all_tasks_complete=on_all_tasks_complete,
metadata=[
batch_models.MetadataItem(name=constants.AZTK_SOFTWARE_METADATA_KEY, value=software_metadata_key),
batch_models.MetadataItem(name=constants.AZTK_MODE_METADATA_KEY, value=mode_metadata_key)
] + job_metadata,
)

return batch_client.job.add(job)
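
The helper pins node counts through a Batch autoscale formula instead of explicit resize calls. As a worked illustration, a request for 3 dedicated and 2 low-priority nodes renders the formula string built above as:

size_dedicated, size_low_priority = 3, 2
autoscale_formula = "$TargetDedicatedNodes = {0}; " "$TargetLowPriorityNodes = {1}".format(
    size_dedicated, size_low_priority)
print(autoscale_formula)  # $TargetDedicatedNodes = 3; $TargetLowPriorityNodes = 2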
29 changes: 29 additions & 0 deletions aztk/client/base/helpers/delete_batch_resources.py
@@ -0,0 +1,29 @@
from azure.batch.models import BatchErrorException
from msrest.exceptions import ClientRequestError

from aztk.utils import BackOffPolicy, helpers, retry


@retry(retry_count=4, retry_interval=1, backoff_policy=BackOffPolicy.exponential, exceptions=(ClientRequestError, ))
def delete_batch_resources(core_base_operations, job_id, keep_logs: bool = False):
success = False

# delete batch job, autopool
try:
core_base_operations.batch_client.job.delete(job_id)
success = True
except BatchErrorException:
pass

# delete storage container
if not keep_logs:
cluster_data = core_base_operations.get_cluster_data(job_id)
cluster_data.delete_container(job_id)
success = True

table_exists = core_base_operations.table_service.exists(job_id)
if table_exists:
core_base_operations.delete_task_table(job_id)
success = True

return success
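
The retry decorator and BackOffPolicy come from aztk.utils, which this diff does not touch. For review purposes, here is a rough sketch of the semantics the call above appears to rely on; this is an assumption about the library, not its actual code: the wrapped call is retried up to retry_count times, sleeping retry_interval seconds between attempts and doubling that interval each attempt under the exponential policy.

# Assumed semantics of aztk.utils.retry, shown only to clarify the decorator usage above.
import time
from enum import Enum

class BackOffPolicy(Enum):
    linear = "linear"
    exponential = "exponential"

def retry(retry_count=3, retry_interval=1, backoff_policy=BackOffPolicy.linear, exceptions=(Exception, )):
    def decorator(function):
        def wrapper(*args, **kwargs):
            for attempt in range(retry_count):
                try:
                    return function(*args, **kwargs)
                except exceptions:
                    if attempt == retry_count - 1:
                        raise
                    # Exponential policy doubles the wait on every failed attempt.
                    delay = (retry_interval if backoff_policy == BackOffPolicy.linear
                             else retry_interval * 2**attempt)
                    time.sleep(delay)
        return wrapper
    return decorator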
46 changes: 20 additions & 26 deletions aztk/client/base/helpers/get_application_log.py
@@ -6,7 +6,7 @@

from aztk import error, models
from aztk.models import Task, TaskState
from aztk.utils import constants, helpers
from aztk.utils import batch_error_manager, constants, helpers

output_file = constants.TASK_WORKING_DIR + "/" + constants.SPARK_SUBMIT_LOGS_FILE

@@ -19,19 +19,15 @@ def __check_task_node_exist(batch_client, cluster_id: str, task: Task) -> bool:
return False


def __wait_for_app_to_be_running(base_operations, cluster_id: str, application_name: str) -> Task:
"""
Wait for the batch task to leave the waiting state into running(or completed if it was fast enough)
"""

while True:
task_state = base_operations.get_task_state(cluster_id, application_name)

if task_state in [batch_models.TaskState.active, batch_models.TaskState.preparing]:
# TODO: log
time.sleep(5)
else:
return base_operations.get_batch_task(id=cluster_id, task_id=application_name)
def wait_for_task(base_operations, cluster_id, application_name):
# TODO: ensure get_task_state not None or throw
task = base_operations.get_task(cluster_id, application_name)
while task.state not in [TaskState.Completed, TaskState.Failed]:
time.sleep(3)
# TODO: enable logger
# log.debug("{} {}: application not yet complete".format(cluster_id, application_name))
task = base_operations.get_task(cluster_id, application_name)
return task


def __get_output_file_properties(batch_client, cluster_id: str, application_name: str):
@@ -48,18 +44,18 @@ def __get_output_file_properties(batch_client, cluster_id: str, application_name
raise e


def get_log_from_storage(blob_client, container_name, application_name, task):
def get_log_from_storage(block_blob_service, container_name, application_name, task):
"""
Args:
blob_client (:obj:`azure.storage.blob.BlockBlobService`): Client used to interact with the Azure Storage
block_blob_service (:obj:`azure.storage.blob.BlockBlobService`): Client used to interact with the Azure Storage
Blob service.
container_name (:obj:`str`): the name of the Azure Blob storage container to get data from
application_name (:obj:`str`): the name of the application to get logs for
task (:obj:`aztk.models.Task`): the aztk task for this application

"""
try:
blob = blob_client.get_blob_to_text(container_name, application_name + "/" + constants.SPARK_SUBMIT_LOGS_FILE)
blob = block_blob_service.get_blob_to_text(container_name,
application_name + "/" + constants.SPARK_SUBMIT_LOGS_FILE)
except azure.common.AzureMissingResourceHttpError:
raise error.AztkError("Logs not found in your storage account. They were either deleted or never existed.")

@@ -77,24 +73,24 @@ def wait_for_scheduling_target_task(base_operations, cluster_id, application_nam
application_state = base_operations.get_task_state(cluster_id, application_name)
while TaskState(application_state) not in [TaskState.Completed, TaskState.Failed]:
time.sleep(3)
print("Application {}: State {}".format(TaskState(application_state), application_name))
# TODO: enable logger
# log.debug("{} {}: application not yet complete".format(cluster_id, application_name))
application_state = base_operations.get_task_state(cluster_id, application_name)
return base_operations.get_task_from_table(cluster_id, application_name)
return base_operations.get_task(cluster_id, application_name)


def get_log(base_operations, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0):
job_id = cluster_id
task_id = application_name
cluster_configuration = base_operations.get_cluster_configuration(cluster_id)

task = wait_for_task(base_operations, cluster_id, application_name)
if cluster_configuration.scheduling_target is not models.SchedulingTarget.Any:
task = wait_for_scheduling_target_task(base_operations, cluster_id, application_name)
return get_log_from_storage(base_operations.blob_client, cluster_id, application_name, task)
return get_log_from_storage(base_operations.block_blob_service, cluster_id, application_name, task)
else:
task = __wait_for_app_to_be_running(base_operations, cluster_id, application_name)
if not __check_task_node_exist(base_operations.batch_client, cluster_id, task):
return get_log_from_storage(base_operations.blob_client, cluster_id, application_name, task)
return get_log_from_storage(base_operations.block_blob_service, cluster_id, application_name, task)

file = __get_output_file_properties(base_operations.batch_client, cluster_id, application_name)
target_bytes = file.content_length
@@ -129,7 +125,5 @@ def get_log(base_operations, cluster_id: str, application_name: str, tail=False,


def get_application_log(base_operations, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0):
try:
with batch_error_manager():
return get_log(base_operations, cluster_id, application_name, tail, current_bytes)
except BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
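
get_application_log now delegates Batch error translation to batch_error_manager, which is imported from aztk.utils but not shown in this diff. Judging from the try/except block it replaces, a plausible sketch of that context manager (an assumption, not the committed implementation) is:

# Assumed shape of aztk.utils.batch_error_manager, inferred from the removed try/except above.
from contextlib import contextmanager

from azure.batch.models import BatchErrorException

from aztk import error
from aztk.utils import helpers

@contextmanager
def batch_error_manager():
    # Re-raise Batch service errors as AztkError, mirroring the block this PR removes.
    try:
        yield
    except BatchErrorException as e:
        raise error.AztkError(helpers.format_batch_exception(e))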
7 changes: 7 additions & 0 deletions aztk/client/base/helpers/get_node.py
@@ -0,0 +1,7 @@
from aztk.utils import batch_error_manager


def get_node(core_base_operations, cluster_id, node_id):
with batch_error_manager():
cluster = core_base_operations.get(cluster_id)
return core_base_operations.batch_client.compute_node.get(cluster.pool.id, node_id)
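
A short usage sketch of the new node lookup, with placeholder cluster and node ids; `ops` is assumed to be an already constructed operations object exposing get_node:

# Hypothetical call site, not part of this diff.
node = ops.get_node("my-spark-cluster", "tvmps_0123456789abcdef")  # placeholder ids
print(node.id, node.state)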