
Convert max_tasks_per_node to task_slots_per_node
alfpark committed Mar 20, 2023
1 parent f851815 commit dca6081
Showing 39 changed files with 141 additions and 87 deletions.
2 changes: 1 addition & 1 deletion config_templates/pool.yaml
@@ -17,7 +17,7 @@ pool_specification:
vm_count:
dedicated: 4
low_priority: 8
max_tasks_per_node: 1
task_slots_per_node: 1
resize_timeout: 00:20:00
node_fill_type: pack
autoscale:
12 changes: 6 additions & 6 deletions convoy/autoscale.py
@@ -32,7 +32,7 @@
_UNBOUND_MAX_NODES = 16777216
AutoscaleMinMax = collections.namedtuple(
'AutoscaleMinMax', [
'max_tasks_per_node',
'task_slots_per_node',
'min_target_dedicated',
'min_target_low_priority',
'max_target_dedicated',
@@ -79,7 +79,7 @@ def _formula_tasks(pool):
task_type,
pool.autoscale.scenario.required_sample_percentage,
),
'reqVMs = {}TaskAvg / maxTasksPerNode'.format(task_type),
'reqVMs = {}TaskAvg / taskSlotsPerNode'.format(task_type),
]
if pool.autoscale.scenario.rebalance_preemption_percentage is not None:
req_vms.extend([
@@ -101,7 +101,7 @@ def _formula_tasks(pool):
'{}TaskAvg = avg(${}Tasks.GetSample(sli, {}))'.format(
task_type, task_type,
pool.autoscale.scenario.required_sample_percentage),
'reqVMs = {}TaskAvg / maxTasksPerNode'.format(task_type),
'reqVMs = {}TaskAvg / taskSlotsPerNode'.format(task_type),
'reqVMs = ({}TaskAvg > 0 && reqVMs < 1) ? 1 : reqVMs'.format(
task_type),
]
@@ -186,7 +186,7 @@ def _formula_tasks(pool):
pool.autoscale.scenario.bias_node_type))
target_vms = ';\n'.join(target_vms)
formula = [
'maxTasksPerNode = {}'.format(minmax.max_tasks_per_node),
'taskSlotsPerNode = {}'.format(minmax.task_slots_per_node),
'minTargetDedicated = {}'.format(minmax.min_target_dedicated),
'minTargetLowPriority = {}'.format(minmax.min_target_low_priority),
'maxTargetDedicated = {}'.format(minmax.max_target_dedicated),
@@ -274,7 +274,7 @@ def _formula_day_of_week(pool):
pool.autoscale.scenario.bias_node_type))
target_vms = ';\n'.join(target_vms)
formula = [
'maxTasksPerNode = {}'.format(minmax.max_tasks_per_node),
'taskSlotsPerNode = {}'.format(minmax.task_slots_per_node),
'minTargetDedicated = {}'.format(minmax.min_target_dedicated),
'minTargetLowPriority = {}'.format(minmax.min_target_low_priority),
'maxTargetDedicated = {}'.format(minmax.max_target_dedicated),
@@ -327,7 +327,7 @@ def _get_minmax(pool):
if max_inc_low_priority <= 0:
max_inc_low_priority = _UNBOUND_MAX_NODES
return AutoscaleMinMax(
max_tasks_per_node=pool.max_tasks_per_node,
task_slots_per_node=pool.task_slots_per_node,
min_target_dedicated=min_target_dedicated,
min_target_low_priority=min_target_low_priority,
max_target_dedicated=max_target_dedicated,
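To illustrate the renamed variable's effect, here is a minimal sketch of how the autoscale formula text is assembled around `taskSlotsPerNode`. The values and the sampling window are hypothetical simplifications of what `_get_minmax` and `_formula_tasks` actually compute from the pool specification:

```python
# Simplified sketch of the formula assembly in convoy/autoscale.py.
# task_slots_per_node and task_type are hypothetical stand-ins for the
# values read from the pool specification.
task_slots_per_node = 2
task_type = 'Active'

formula = [
    'taskSlotsPerNode = {}'.format(task_slots_per_node),
    # Average the sampled task counts over a fixed window (the real code
    # derives the window and required sample percentage from settings).
    '{0}TaskAvg = avg(${0}Tasks.GetSample(TimeInterval_Minute * 5))'.format(
        task_type),
    # Required node count: averaged tasks divided by slots per node
    # (this is the expression that previously used maxTasksPerNode).
    'reqVMs = {}TaskAvg / taskSlotsPerNode'.format(task_type),
]
print(';\n'.join(formula))
```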
86 changes: 69 additions & 17 deletions convoy/batch.py
@@ -1313,7 +1313,7 @@ def list_pools(batch_client, config):
' * low priority:',
' * current: {}'.format(pool.current_low_priority_nodes),
' * target: {}'.format(pool.target_low_priority_nodes),
' * max tasks per node: {}'.format(pool.max_tasks_per_node),
' * task slots per node: {}'.format(pool.task_slots_per_node),
' * enable inter node communication: {}'.format(
pool.enable_inter_node_communication),
' * autoscale enabled: {}'.format(pool.enable_auto_scale),
@@ -1517,10 +1517,10 @@ def pool_stats(batch_client, config, pool_id=None):
if node.running_tasks_count is not None:
tasks_running.append(node.running_tasks_count)
total_running_tasks = sum(tasks_running)
runnable_task_slots = runnable_nodes * pool.max_tasks_per_node
runnable_task_slots = runnable_nodes * pool.task_slots_per_node
total_task_slots = (
pool.current_dedicated_nodes + pool.current_low_priority_nodes
) * pool.max_tasks_per_node
) * pool.task_slots_per_node
busy_task_slots_fraction = (
0 if runnable_task_slots == 0 else
total_running_tasks / runnable_task_slots
@@ -1998,17 +1998,32 @@ def job_stats(batch_client, config, jobid=None):
task_wall_times = []
task_counts = batchmodels.TaskCounts(
active=0, running=0, completed=0, succeeded=0, failed=0)
task_slots = batchmodels.TaskSlotCounts(
active=0, running=0, completed=0, succeeded=0, failed=0)
total_tasks = 0
total_slots = 0
for job in jobs:
job_count += 1
# get task counts
tc = batch_client.job.get_task_counts(job_id=job.id)
task_counts.active += tc.active
task_counts.running += tc.running
task_counts.completed += tc.completed
task_counts.succeeded += tc.succeeded
task_counts.failed += tc.failed
total_tasks += tc.active + tc.running + tc.completed
task_counts.active += tc.task_counts.active
task_counts.running += tc.task_counts.running
task_counts.completed += tc.task_counts.completed
task_counts.succeeded += tc.task_counts.succeeded
task_counts.failed += tc.task_counts.failed
total_tasks += (
tc.task_counts.active + tc.task_counts.running +
tc.task_counts.completed
)
task_slots.active += tc.task_slot_counts.active
task_slots.running += tc.task_slot_counts.running
task_slots.completed += tc.task_slot_counts.completed
task_slots.succeeded += tc.task_slot_counts.succeeded
task_slots.failed += tc.task_slot_counts.failed
total_slots += (
tc.task_slot_counts.active + tc.task_slot_counts.running +
tc.task_slot_counts.completed
)
if job.execution_info.end_time is not None:
job_times.append(
(job.execution_info.end_time -
@@ -2054,6 +2069,29 @@ def job_stats(batch_client, config, jobid=None):
100 * task_counts.failed / task_counts.completed
if task_counts.completed > 0 else 0
),
'* Total slots: {}'.format(total_slots),
' * Active: {0} ({1:.2f}% of total)'.format(
task_slots.active,
100 * task_slots.active / total_slots if total_slots > 0 else 0
),
' * Running: {0} ({1:.2f}% of total)'.format(
task_slots.running,
100 * task_slots.running / total_slots if total_slots > 0 else 0
),
' * Completed: {0} ({1:.2f}% of total)'.format(
task_slots.completed,
100 * task_slots.completed / total_slots if total_slots > 0 else 0
),
' * Succeeded: {0} ({1:.2f}% of completed)'.format(
task_slots.succeeded,
100 * task_slots.succeeded / task_slots.completed
if task_slots.completed > 0 else 0
),
' * Failed: {0} ({1:.2f}% of completed)'.format(
task_slots.failed,
100 * task_slots.failed / task_slots.completed
if task_slots.completed > 0 else 0
),
]
if len(job_times) > 0:
log.extend([
@@ -3883,7 +3921,7 @@ def get_task_counts(batch_client, config, jobid=None):
raw = {}
for job in jobs:
jobid = settings.job_id(job)
log = ['task counts for job {}'.format(jobid)]
log = ['task counts and slot counts for job {}'.format(jobid)]
try:
if settings.raw(config):
raw[jobid] = util.print_raw_output(
@@ -3901,11 +3939,25 @@ def get_task_counts(batch_client, config, jobid=None):
raise
else:
if not settings.raw(config):
log.append('* active: {}'.format(tc.active))
log.append('* running: {}'.format(tc.running))
log.append('* completed: {}'.format(tc.completed))
log.append(' * succeeded: {}'.format(tc.succeeded))
log.append(' * failed: {}'.format(tc.failed))
log.append('* task counts:')
log.append(' * active: {}'.format(tc.task_counts.active))
log.append(' * running: {}'.format(tc.task_counts.running))
log.append(' * completed: {}'.format(
tc.task_counts.completed))
log.append(' * succeeded: {}'.format(
tc.task_counts.succeeded))
log.append(' * failed: {}'.format(tc.task_counts.failed))
log.append('* task slots:')
log.append(' * active: {}'.format(
tc.task_slot_counts.active))
log.append(' * running: {}'.format(
tc.task_slot_counts.running))
log.append(' * completed: {}'.format(
tc.task_slot_counts.completed))
log.append(' * succeeded: {}'.format(
tc.task_slot_counts.succeeded))
log.append(' * failed: {}'.format(
tc.task_slot_counts.failed))
logger.info(os.linesep.join(log))
if util.is_not_empty(raw):
util.print_raw_json(raw)
@@ -5429,15 +5481,15 @@ def add_jobs(
if recurrence.job_manager.allow_low_priority_node
else 0
)
total_slots = cloud_pool.max_tasks_per_node * total_vms
total_slots = cloud_pool.task_slots_per_node * total_vms
else:
total_vms = (
pool.vm_count.dedicated +
pool.vm_count.low_priority
if recurrence.job_manager.allow_low_priority_node
else 0
)
total_slots = pool.max_tasks_per_node * total_vms
total_slots = pool.task_slots_per_node * total_vms
if total_slots == 1:
logger.error(
('Only 1 scheduling slot available which is '
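For reference, the `job_stats` and `get_task_counts` changes both rely on the task counts call returning separate task and slot counts. A minimal sketch of that aggregation, assuming an azure-batch Python SDK release whose `job.get_task_counts` returns a result object with `task_counts` and `task_slot_counts` attributes (roughly SDK 10.x and later); the job ids are illustrative:

```python
# Sketch: accumulate task counts and task slot counts across jobs,
# mirroring the job_stats() change above.
import azure.batch.models as batchmodels


def aggregate_counts(batch_client, job_ids):
    task_counts = batchmodels.TaskCounts(
        active=0, running=0, completed=0, succeeded=0, failed=0)
    task_slots = batchmodels.TaskSlotCounts(
        active=0, running=0, completed=0, succeeded=0, failed=0)
    for job_id in job_ids:
        tc = batch_client.job.get_task_counts(job_id=job_id)
        # tasks
        task_counts.active += tc.task_counts.active
        task_counts.running += tc.task_counts.running
        task_counts.completed += tc.task_counts.completed
        task_counts.succeeded += tc.task_counts.succeeded
        task_counts.failed += tc.task_counts.failed
        # scheduling slots consumed by those tasks
        task_slots.active += tc.task_slot_counts.active
        task_slots.running += tc.task_slot_counts.running
        task_slots.completed += tc.task_slot_counts.completed
        task_slots.succeeded += tc.task_slot_counts.succeeded
        task_slots.failed += tc.task_slot_counts.failed
    return task_counts, task_slots
```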
8 changes: 4 additions & 4 deletions convoy/fleet.py
@@ -1454,7 +1454,7 @@ def _construct_pool_object(
pool_settings.vm_count.low_priority if not asenable else None
),
resize_timeout=pool_settings.resize_timeout if not asenable else None,
max_tasks_per_node=pool_settings.max_tasks_per_node,
task_slots_per_node=pool_settings.task_slots_per_node,
enable_inter_node_communication=pool_settings.
inter_node_communication_enabled,
start_task=batchmodels.StartTask(
@@ -1778,7 +1778,7 @@ def _construct_auto_pool_specification(
poolspec = batchmodels.PoolSpecification(
vm_size=pool.vm_size,
virtual_machine_configuration=pool.virtual_machine_configuration,
max_tasks_per_node=pool.max_tasks_per_node,
task_slots_per_node=pool.task_slots_per_node,
task_scheduling_policy=pool.task_scheduling_policy,
resize_timeout=pool.resize_timeout,
target_dedicated_nodes=pool.target_dedicated_nodes,
@@ -2783,9 +2783,9 @@ def _adjust_settings_for_pool_creation(config):
raise ValueError(
'vm_count dedicated should exceed 1 for glusterfs '
'on compute')
if pool.max_tasks_per_node > 1:
if pool.task_slots_per_node > 1:
raise ValueError(
'max_tasks_per_node cannot exceed 1 for glusterfs '
'task_slots_per_node cannot exceed 1 for glusterfs '
'on compute')
num_gluster += 1
try:
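The rename also surfaces as a keyword argument when pools are constructed. A hedged sketch of a pool add parameter using the new keyword (the pool id, VM size, and image reference are illustrative values; `task_slots_per_node` is the keyword exposed by azure-batch SDK releases that support the renamed property):

```python
# Sketch only: building a pool definition with task_slots_per_node
# (formerly max_tasks_per_node). All concrete values are illustrative.
import azure.batch.models as batchmodels

pool = batchmodels.PoolAddParameter(
    id='example-pool',
    vm_size='standard_d2_v3',
    virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
        image_reference=batchmodels.ImageReference(
            publisher='canonical',
            offer='ubuntuserver',
            sku='18.04-lts',
        ),
        node_agent_sku_id='batch.node.ubuntu 18.04',
    ),
    task_slots_per_node=2,
    target_dedicated_nodes=4,
    target_low_priority_nodes=8,
)
```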
6 changes: 3 additions & 3 deletions convoy/settings.py
@@ -205,7 +205,7 @@
)
PoolSettings = collections.namedtuple(
'PoolSettings', [
'id', 'vm_size', 'vm_count', 'resize_timeout', 'max_tasks_per_node',
'id', 'vm_size', 'vm_count', 'resize_timeout', 'task_slots_per_node',
'inter_node_communication_enabled', 'vm_configuration',
'reboot_on_start_task_failed', 'attempt_recovery_on_unusable',
'block_until_all_global_resources_loaded',
@@ -1277,7 +1277,7 @@ def pool_settings(config):
:return: pool settings from specification
"""
conf = pool_specification(config)
max_tasks_per_node = _kv_read(conf, 'max_tasks_per_node', default=1)
task_slots_per_node = _kv_read(conf, 'task_slots_per_node', default=1)
resize_timeout = _kv_read_checked(conf, 'resize_timeout')
if util.is_not_empty(resize_timeout):
resize_timeout = util.convert_string_to_timedelta(resize_timeout)
@@ -1446,7 +1446,7 @@ def pool_settings(config):
vm_size=_pool_vm_size(config),
vm_count=_pool_vm_count(config),
resize_timeout=resize_timeout,
max_tasks_per_node=max_tasks_per_node,
task_slots_per_node=task_slots_per_node,
inter_node_communication_enabled=inter_node_communication_enabled,
vm_configuration=_populate_pool_vm_configuration(config),
reboot_on_start_task_failed=reboot_on_start_task_failed,
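As the settings change shows, omitting the key still yields one slot per node. A trivial sketch of that read, using `dict.get` as a stand-in for the internal `_kv_read` helper:

```python
# Sketch: task_slots_per_node defaults to 1 when absent from the
# pool specification (dict.get stands in for settings._kv_read).
pool_spec = {
    'vm_size': 'standard_d2_v3',
    'vm_count': {'dedicated': 4, 'low_priority': 8},
    # 'task_slots_per_node' intentionally omitted
}
task_slots_per_node = pool_spec.get('task_slots_per_node', 1)
print(task_slots_per_node)  # -> 1
```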
19 changes: 10 additions & 9 deletions docs/13-batch-shipyard-configuration-pool.md
@@ -32,7 +32,7 @@ pool_specification:
vm_count:
dedicated: 4
low_priority: 8
max_tasks_per_node: 1
task_slots_per_node: 1
resize_timeout: 00:20:00
node_fill_type: pack
autoscale:
@@ -242,15 +242,15 @@ of nodes). The format for this property is a timedelta with a string
representation of "d.HH:mm:ss". "HH:mm:ss" is required, but "d" is optional,
if specified. If not specified, the default is 15 minutes. This should not
be specified (and is ignored) for `autoscale` enabled pools.
* (optional) `max_tasks_per_node` is the maximum number of concurrent tasks
that can be running at any one time on a compute node. This defaults to a
value of 1 if not specified. The maximum value for the property that Azure
Batch will accept is `4 x <# cores per compute node>`. For instance, for a
`STANDARD_F2` instance, because the virtual machine has 2 cores, the maximum
allowable value for this property would be `8`.
* (optional) `task_slots_per_node` is the maximum number of concurrent task
slots configured for a single compute node. This defaults to a value of `1`
if not specified. The maximum value for the property that Azure Batch will
accept is `4 x <# vCPUs per compute node>` or `256`, whichever is smaller.
For instance, for a
`STANDARD_D2_V3` instance, because the virtual machine has 2 vCPUs, the
maximum allowable value for this property would be `8`.
* (optional) `node_fill_type` is the task scheduling compute node fill type
policy to apply. `pack`, which is the default, attempts to pack the
maximum number of tasks on a node (controlled through `max_tasks_per_node`
maximum number of tasks on a node (controlled through `task_slots_per_node`)
before scheduling tasks to another node. `spread` will schedule tasks
evenly across compute nodes before packing.
* (optional) `autoscale` designates the autoscale settings for the pool. If
@@ -356,12 +356,13 @@ The default, if not specified, is `false`.
* (optional) `reboot_on_start_task_failed` allows Batch Shipyard to reboot the
compute node in case there is a transient failure in node preparation (e.g.,
network timeout, resolution failure or download problem). This defaults to
`false`. This option is ignored for `auto_pool` where the behavior is always
`false`.
* (optional) `attempt_recovery_on_unusable` allows Batch Shipyard to attempt
to recover nodes that enter `unusable` state automatically. Note that
enabling this option can lead to infinite wait on `pool add` or `pool resize`
with `--wait`. This defaults to `false` and is ignored for `custom_image`
where the behavior is always `false`.
and `auto_pool` where the behavior is always `false`.
* (optional) `upload_diagnostics_logs_on_unusable` allows Batch Shipyard
to attempt upload of diagnostics logs for nodes that have entered unusable
state during provisioning to the storage account designated under the
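As a worked check of the limit described in the updated pool documentation (4 slots per vCPU, capped at 256; the vCPU counts below are illustrative):

```python
# Sketch: maximum allowable task_slots_per_node per the documented rule.
def max_task_slots(vcpus: int) -> int:
    return min(4 * vcpus, 256)


print(max_task_slots(2))   # STANDARD_D2_V3 with 2 vCPUs -> 8
print(max_task_slots(96))  # hypothetical 96-vCPU SKU -> 256 (capped)
```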
8 changes: 4 additions & 4 deletions docs/14-batch-shipyard-configuration-jobs.md
@@ -422,15 +422,15 @@ a set interval.
nodes provisioned.
* (optional) `run_exclusive` forces the job manager to run on a compute
node where there are no other tasks running. The default is `false`.
This is only relevant when the pool's `max_tasks_per_node` setting is
greater than 1.
This is only relevant when the pool's `task_slots_per_node` setting
is greater than 1.
* (optional) `monitor_task_completion` allows the job manager to
monitor the tasks in the job for completion instead of relying on
`auto_complete`. The advantage for doing so is that the job can move
much more quickly into completed state thus allowing the next job
recurrence to be created for very small values of
`recurrence_interval`. In order to properly utilize this feature,
you must either set your pool's `max_tasks_per_node` to greater
you must either set your pool's `task_slots_per_node` to greater
than 1 or have more than one compute node in your pool. If neither
of these conditions are met, then the tasks that the job manager
creates will be blocked as there will be no free scheduling slots
@@ -752,7 +752,7 @@ information and terminology definitions.
* (optional) `exclusive` specifies if each task within the task group
must not be co-scheduled with other running tasks on compute nodes.
Effectively this excludes pools as scheduling targets that have
been provisioned with the setting `max_tasks_per_node` greater
been provisioned with the setting `task_slots_per_node` greater
than `1`.
* (optional) `gpu` specifies if tasks within the task group should
be scheduled on a compute node that has a GPU. Note that specifying
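The job-manager guidance above comes down to how many scheduling slots the pool exposes in total. A minimal sketch in the spirit of the `add_jobs` check shown earlier (the node counts and settings are illustrative):

```python
# Sketch: total scheduling slots available to a recurrence's job manager.
task_slots_per_node = 1
dedicated_nodes = 1
low_priority_nodes = 0
allow_low_priority_node = True

total_vms = dedicated_nodes + (
    low_priority_nodes if allow_low_priority_node else 0)
total_slots = task_slots_per_node * total_vms
if total_slots == 1:
    # The job manager occupies the only slot, so the tasks it creates
    # cannot be scheduled until more slots or nodes become available.
    print('Only 1 scheduling slot available')
```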
2 changes: 1 addition & 1 deletion docs/68-batch-shipyard-federation.md
@@ -276,7 +276,7 @@ non-native and use the same setting consistently across all pools and all job
* `arm_image_id` under `vm_configuration`:`custom_image` will allow
routing of task groups with `custom_image_arm_id` constraints.
* `vm_size` will be impacted by `compute_node` job constraints.
* `max_tasks_per_node` will impact available scheduling slots and the
* `task_slots_per_node` will impact available scheduling slots and the
`compute_node`:`exclusive` constraint.
* `autoscale` changes behavior of scheduling across various constraints.
* `inter_node_communication` enabled pools will allow tasks that contain
4 changes: 2 additions & 2 deletions docs/96-troubleshooting-guide.md
@@ -188,7 +188,7 @@ The `pool images update` command runs as a normal job if your pool is
comprised entirely of dedicated compute nodes. Thus, your compute
nodes must be able to accommodate this update job and task. If your pool only
has one node in it, it will run as a single task under a job. If the node in
this pool is busy and the `max_tasks_per_node` in your `pool.yaml` is either
this pool is busy and the `task_slots_per_node` in your `pool.yaml` is either
unspecified or set to 1, then it will be blocked behind the running task.

For pools with more than 1 node, then the update images command will run
@@ -199,7 +199,7 @@ the `pool images update` command is issued. If before the task can be
scheduled, the pool is resized down and the number of nodes decreases, then
the update container images job will not be able to execute and will stay
active until the number of compute nodes reaches the prior number.
Additionally, if `max_tasks_per_node` is set to 1 or unspecified in
Additionally, if `task_slots_per_node` is set to 1 or unspecified in
`pool.yaml` and any task is running on any node, the update container images
job will be blocked until that task completes.
