Changing forks keyword to throttle and adding some more docs
jimi-c committed Aug 28, 2019
1 parent c966df5 commit 01cd17b
Showing 14 changed files with 164 additions and 267 deletions.
2 changes: 2 additions & 0 deletions changelogs/fragments/throttle_feature.yml
@@ -0,0 +1,2 @@
minor_changes:
- Added a new `throttle` keyword, which can be used at the task, block, or play level to limit the number of workers allowed (up to the specified forks or serial setting).
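
Since the fragment above says the keyword works at the task, block, or play level, a play-level sketch may be useful; the host group and command below are placeholders, not part of the commit:

    - hosts: all
      throttle: 2          # at most two hosts operated on in parallel for this play
      tasks:
        - name: generate a report without saturating a shared backend
          command: /usr/bin/expensive_report
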
2 changes: 1 addition & 1 deletion docs/docsite/keyword_desc.yml
@@ -31,7 +31,6 @@ environment: A dictionary that gets converted into environment vars to be provid
fact_path: Set the fact path option for the fact gathering plugin controlled by :term:`gather_facts`.
failed_when: "Conditional expression that overrides the task's normal 'failed' status."
force_handlers: Will force notified handler execution for hosts even if they failed during the play. Will not trigger if the play itself fails.
forks: Limit number of concurrent task runs on task, block and playbook level.
gather_facts: "A boolean that controls if the play will automatically run the 'setup' task to gather facts for the hosts."
gather_subset: Allows you to pass subset options to the fact gathering plugin controlled by :term:`gather_facts`.
gather_timeout: Allows you to set the timeout for the fact gathering plugin controlled by :term:`gather_facts`.
@@ -69,6 +68,7 @@ serial: |
strategy: Allows you to choose the strategy plugin to use for the play.
tags: Tags applied to the task or included tasks, this allows selecting subsets of tasks from the command line.
tasks: Main list of tasks to execute in the play, they run after :term:`roles` and before :term:`post_tasks`.
throttle: Limit the number of concurrent task runs at the task, block, and play level. This is independent of the forks and serial settings, but cannot be set higher than those limits. For example, if forks is set to 10 and throttle is set to 15, at most 10 hosts will be operated on in parallel.
until: "This keyword implies a ':term:`retries` loop' that will go on until the condition supplied here is met or we hit the :term:`retries` limit."
vars: Dictionary/map of variables
vars_files: List of files that contain vars to include in the play.
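
The ``throttle`` description above notes that it cannot exceed the forks or serial limits. A minimal sketch of that clamp, assuming serial caps parallelism the same way forks does (host group and command are invented for illustration):

    - hosts: webservers
      serial: 10                      # batches of at most 10 hosts
      tasks:
        - name: throttle asks for 15, but serial caps parallelism at 10
          command: /usr/bin/refresh_cache
          throttle: 15
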
6 changes: 6 additions & 0 deletions docs/docsite/rst/user_guide/playbooks_strategies.rst
@@ -38,6 +38,12 @@ Using keywords to control execution
-----------------------------------
Several play-level :ref:`keywords<playbook_keywords>` also affect play execution. The most common one is ``serial``, which sets a number, a percentage, or a list of numbers of hosts you want to manage at a time. Setting ``serial`` with any strategy directs Ansible to 'batch' the hosts, completing the play on the specified number or percentage of hosts before starting the next 'batch'. This is especially useful for :ref:`rolling updates<rolling_update_batch_size>`.

The second keyword that affects execution is ``throttle``, which can also be used at the block and task level. This keyword limits the number of workers, up to the maximum set via the ``forks`` setting or ``serial``. It can be useful for restricting tasks that are CPU-intensive or that interact with a rate-limiting API; for example, at the task level::

tasks:
- command: /path/to/cpu_intensive_command
throttle: 1
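
The keyword applies to a whole block as well; a minimal sketch (the command paths here are placeholders, not from the commit)::

    tasks:
      - block:
          - command: /path/to/rate_limited_api_call
          - command: /path/to/another_api_call
        throttle: 3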

Other keywords that affect play execution include ``ignore_errors``, ``ignore_unreachable``, and ``any_errors_fatal``. Please note that these keywords are not strategies. They are play-level directives or options.

.. seealso::
2 changes: 1 addition & 1 deletion lib/ansible/playbook/base.py
@@ -612,7 +612,7 @@ class Base(FieldAttributeBase):
_check_mode = FieldAttribute(isa='bool', default=context.cliargs_deferred_get('check'))
_diff = FieldAttribute(isa='bool', default=context.cliargs_deferred_get('diff'))
_any_errors_fatal = FieldAttribute(isa='bool', default=C.ANY_ERRORS_FATAL)
_forks = FieldAttribute(isa='list', extend=True, prepend=True, static=True)
_throttle = FieldAttribute(isa='int', default=0)

# explicitly invoke a debugger on tasks
_debugger = FieldAttribute(isa='string')
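
Because ``_throttle`` is declared as an integer field and, as the strategy changes below show, templated before being cast to an int, setting it from a variable should work; the variable name and command here are hypothetical:

    - hosts: all
      vars:
        api_throttle: 4
      tasks:
        - command: /usr/bin/call_partner_api    # placeholder command
          throttle: "{{ api_throttle }}"        # templated, then converted to int at queue time
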
39 changes: 30 additions & 9 deletions lib/ansible/plugins/strategy/__init__.py
@@ -155,6 +155,11 @@ class StrategyBase:
code useful to all strategies like running handlers, cleanup actions, etc.
'''

# by default, strategies should support throttling, but we allow individual
# strategies to disable this and either forgo supporting it or manage
# the throttling internally (as `free` does)
ALLOW_BASE_THROTTLING = True

def __init__(self, tqm):
self._tqm = tqm
self._inventory = tqm.get_inventory()
@@ -310,19 +315,19 @@ def _queue_task(self, host, task, task_vars, play_context):
display.debug('Creating lock for %s' % task.action)
action_write_locks.action_write_locks[task.action] = Lock()

# create a templar and template things we need later for the queuing process
templar = Templar(loader=self._loader, variables=task_vars)

try:
throttle = int(templar.template(task.throttle))
except Exception as e:
raise AnsibleError("Failed to convert the throttle value to an integer.", obj=task._ds, orig_exc=e)

# and then queue the new task
try:
queued = False
starting_worker = self._cur_worker
while True:
if len(task.forks) > 0 and task._parent._play.strategy != "free":
if task.run_once:
display.debug("Ignoring 'forks' as 'run_once' is also set for '%s'" % task.get_name())
else:
forks = min(task.forks)
display.debug("task: %s, forks: %d" % (task.get_name(), forks))
if forks > 0 and self._cur_worker >= forks:
self._cur_worker = 0
worker_prc = self._workers[self._cur_worker]
if worker_prc is None or not worker_prc.is_alive():
self._queued_task_cache[(host.name, task._uuid)] = {
@@ -338,9 +343,25 @@ def _queue_task(self, host, task, task_vars, play_context):
worker_prc.start()
display.debug("worker is %d (out of %d available)" % (self._cur_worker + 1, len(self._workers)))
queued = True

self._cur_worker += 1
if self._cur_worker >= len(self._workers):

# Determine the "rewind point" of the worker list, i.e. the index at
# which we wrap back to the start while looking for an open worker.
# Normally that is simply the length of the workers list (as determined
# by the forks or serial setting); however, a task/block/play may
# "throttle" that limit down.
rewind_point = len(self._workers)
if throttle > 0 and self.ALLOW_BASE_THROTTLING:
if task.run_once:
display.debug("Ignoring 'throttle' as 'run_once' is also set for '%s'" % task.get_name())
else:
if throttle <= rewind_point:
display.debug("task: %s, throttle: %d" % (task.get_name(), throttle))
rewind_point = throttle
if self._cur_worker >= rewind_point:
self._cur_worker = 0

if queued:
break
elif self._cur_worker == starting_worker:
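
Per the debug message above, ``throttle`` is ignored whenever ``run_once`` is set, so a combination like the following (paths invented for illustration) throttles nothing:

    tasks:
      - name: run_once wins; throttle has no effect here
        command: /usr/bin/one_time_setup
        run_once: true
        throttle: 1
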
43 changes: 24 additions & 19 deletions lib/ansible/plugins/strategy/free.py
@@ -47,6 +47,9 @@

class StrategyModule(StrategyBase):

# This strategy manages throttling on its own, so we don't want it done in queue_task
ALLOW_BASE_THROTTLING = False

def _filter_notified_hosts(self, notified_hosts):
'''
Filter notified hosts according to the strategy
@@ -117,21 +120,31 @@ def run(self, iterator, play_context):

display.debug("this host has work to do", host=host_name)

max_tasks_reached = False
if len(task.forks) > 0:
forks = min(task.forks)
if forks > 0:
# check to see if this host is blocked (still executing a previous task)
if (host_name not in self._blocked_hosts or not self._blocked_hosts[host_name]):

display.debug("getting variables", host=host_name)
task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=task,
_hosts=self._hosts_cache,
_hosts_all=self._hosts_cache_all)
self.add_tqm_variables(task_vars, play=iterator._play)
templar = Templar(loader=self._loader, variables=task_vars)
display.debug("done getting variables", host=host_name)

try:
throttle = int(templar.template(task.throttle))
except Exception as e:
raise AnsibleError("Failed to convert the throttle value to an integer.", obj=task._ds, orig_exc=e)

if throttle > 0:
same_tasks = 0
for worker in self._workers:
if worker and worker.is_alive() and worker._task._uuid == task._uuid:
same_tasks += 1

display.debug("task: %s, same_tasks: %d" % (task.get_name(), same_tasks))
if same_tasks >= forks:
max_tasks_reached = True

# check to see if this host is blocked (still executing a previous task)
if (host_name not in self._blocked_hosts or not self._blocked_hosts[host_name]) and not max_tasks_reached:
if same_tasks >= throttle:
break

# pop the task, mark the host blocked, and queue it
self._blocked_hosts[host_name] = True
@@ -144,14 +157,6 @@ def run(self, iterator, play_context):
# corresponding action plugin
action = None

display.debug("getting variables", host=host_name)
task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=task,
_hosts=self._hosts_cache,
_hosts_all=self._hosts_cache_all)
self.add_tqm_variables(task_vars, play=iterator._play)
templar = Templar(loader=self._loader, variables=task_vars)
display.debug("done getting variables", host=host_name)

try:
task.name = to_text(templar.template(task.name, fail_on_undefined=False), nonstring='empty')
display.debug("done templating", host=host_name)
Expand All @@ -166,8 +171,8 @@ def run(self, iterator, play_context):
raise AnsibleError("The '%s' module bypasses the host loop, which is currently not supported in the free strategy "
"and would instead execute for every host in the inventory list." % task.action, obj=task._ds)
else:
display.debug("Using run_once with the free strategy is not currently supported. This task will still be "
"executed for every host in the inventory list.")
display.warning("Using run_once with the free strategy is not currently supported. This task will still be "
"executed for every host in the inventory list.")

# check to see if this task should be skipped, due to it being a member of a
# role which has already run (and whether that role allows duplicate execution)
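
As the diff above shows, the ``free`` strategy handles throttling itself by counting live workers running the same task, so playbook usage is unchanged from other strategies; a sketch with invented names:

    - hosts: all
      strategy: free
      tasks:
        - name: at most two hosts run this task concurrently, even under free
          command: /usr/bin/sync_artifacts
          throttle: 2
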
7 changes: 0 additions & 7 deletions test/integration/targets/forks/runme.sh

This file was deleted.
