From 01cd17b787a7e0ad721d6724baa248744cf851b4 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Tue, 13 Aug 2019 10:51:42 -0500 Subject: [PATCH] Changing forks keyword to throttle and adding some more docs --- changelogs/fragments/throttle_feature.yml | 2 + docs/docsite/keyword_desc.yml | 2 +- .../rst/user_guide/playbooks_strategies.rst | 6 + lib/ansible/playbook/base.py | 2 +- lib/ansible/plugins/strategy/__init__.py | 39 ++- lib/ansible/plugins/strategy/free.py | 43 ++-- test/integration/targets/forks/runme.sh | 7 - test/integration/targets/forks/test_forks.yml | 229 ------------------ .../targets/{forks => throttle}/aliases | 0 .../targets/{forks => throttle}/inventory | 0 test/integration/targets/throttle/runme.sh | 7 + .../targets/throttle/test_throttle.py | 33 +++ .../targets/throttle/test_throttle.yml | 59 +++++ .../plugins/strategy/test_strategy_base.py | 2 +- 14 files changed, 164 insertions(+), 267 deletions(-) create mode 100644 changelogs/fragments/throttle_feature.yml delete mode 100755 test/integration/targets/forks/runme.sh delete mode 100644 test/integration/targets/forks/test_forks.yml rename test/integration/targets/{forks => throttle}/aliases (100%) rename test/integration/targets/{forks => throttle}/inventory (100%) create mode 100755 test/integration/targets/throttle/runme.sh create mode 100755 test/integration/targets/throttle/test_throttle.py create mode 100644 test/integration/targets/throttle/test_throttle.yml diff --git a/changelogs/fragments/throttle_feature.yml b/changelogs/fragments/throttle_feature.yml new file mode 100644 index 00000000000000..1e7595ae4fe0a3 --- /dev/null +++ b/changelogs/fragments/throttle_feature.yml @@ -0,0 +1,2 @@ +minor_changes: + - Added new `throttle` keyword, which can be used at the task, block, or play level to limit the number of workers (up to the specified forks or serial setting) allowed. diff --git a/docs/docsite/keyword_desc.yml b/docs/docsite/keyword_desc.yml index b6f5d60e0b388c..af78d97fc3fb41 100644 --- a/docs/docsite/keyword_desc.yml +++ b/docs/docsite/keyword_desc.yml @@ -31,7 +31,6 @@ environment: A dictionary that gets converted into environment vars to be provid fact_path: Set the fact path option for the fact gathering plugin controlled by :term:`gather_facts`. failed_when: "Conditional expression that overrides the task's normal 'failed' status." force_handlers: Will force notified handler execution for hosts even if they failed during the play. Will not trigger if the play itself fails. -forks: Limit number of concurrent task runs on task, block and playbook level. gather_facts: "A boolean that controls if the play will automatically run the 'setup' task to gather facts for the hosts." gather_subset: Allows you to pass subset options to the fact gathering plugin controlled by :term:`gather_facts`. gather_timeout: Allows you to set the timeout for the fact gathering plugin controlled by :term:`gather_facts`. @@ -69,6 +68,7 @@ serial: | strategy: Allows you to choose the connection plugin to use for the play. tags: Tags applied to the task or included tasks, this allows selecting subsets of tasks from the command line. tasks: Main list of tasks to execute in the play, they run after :term:`roles` and before :term:`post_tasks`. +throttle: Limit number of concurrent task runs on task, block and playbook level. This is independent of the forks and serial settings, but cannot be set higher than those limits. For example, if forks is set to 10 and the throttle is set to 15, at most 10 hosts will be operated on in parallel. until: "This keyword implies a ':term:`retries` loop' that will go on until the condition supplied here is met or we hit the :term:`retries` limit." vars: Dictionary/map of variables vars_files: List of files that contain vars to include in the play. diff --git a/docs/docsite/rst/user_guide/playbooks_strategies.rst b/docs/docsite/rst/user_guide/playbooks_strategies.rst index c053dbefb01254..be40d4f0f0155f 100644 --- a/docs/docsite/rst/user_guide/playbooks_strategies.rst +++ b/docs/docsite/rst/user_guide/playbooks_strategies.rst @@ -38,6 +38,12 @@ Using keywords to control execution ----------------------------------- Several play-level :ref:`keyword` also affect play execution. The most common one is ``serial``, which sets a number, a percentage, or a list of numbers of hosts you want to manage at a time. Setting ``serial`` with any strategy directs Ansible to 'batch' the hosts, completing the play on the specified number or percentage of hosts before starting the next 'batch'. This is especially useful for :ref:`rolling updates`. +The second keyword to affect execution is ``throttle``, which can also be used at the block and task level. This keyword limits the number of workers up to the maximum set via the forks setting or ``serial``. This can be useful in restricting tasks that may be CPU-intensive or interact with a rate-limiting API:: + + tasks: + - command: /path/to/cpu_intensive_command + throttle: 1 + Other keywords that affect play execution include ``ignore_errors``, ``ignore_unreachable``, and ``any_errors_fatal``. Please note that these keywords are not strategies. They are play-level directives or options. .. seealso:: diff --git a/lib/ansible/playbook/base.py b/lib/ansible/playbook/base.py index e57611450875ce..55c240f3ff46a2 100644 --- a/lib/ansible/playbook/base.py +++ b/lib/ansible/playbook/base.py @@ -612,7 +612,7 @@ class Base(FieldAttributeBase): _check_mode = FieldAttribute(isa='bool', default=context.cliargs_deferred_get('check')) _diff = FieldAttribute(isa='bool', default=context.cliargs_deferred_get('diff')) _any_errors_fatal = FieldAttribute(isa='bool', default=C.ANY_ERRORS_FATAL) - _forks = FieldAttribute(isa='list', extend=True, prepend=True, static=True) + _throttle = FieldAttribute(isa='int', default=0) # explicitly invoke a debugger on tasks _debugger = FieldAttribute(isa='string') diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index f155e2467717ed..09a1e0355fe015 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -155,6 +155,11 @@ class StrategyBase: code useful to all strategies like running handlers, cleanup actions, etc. ''' + # by default, strategies should support throttling but we allow individual + # strategies to disable this and either forego supporting it or managing + # the throttling internally (as `free` does) + ALLOW_BASE_THROTTLING = True + def __init__(self, tqm): self._tqm = tqm self._inventory = tqm.get_inventory() @@ -310,19 +315,19 @@ def _queue_task(self, host, task, task_vars, play_context): display.debug('Creating lock for %s' % task.action) action_write_locks.action_write_locks[task.action] = Lock() + # create a templar and template things we need later for the queuing process + templar = Templar(loader=self._loader, variables=task_vars) + + try: + throttle = int(templar.template(task.throttle)) + except Exception as e: + raise AnsibleError("Failed to convert the throttle value to an integer.", obj=task._ds, orig_exc=e) + # and then queue the new task try: queued = False starting_worker = self._cur_worker while True: - if len(task.forks) > 0 and task._parent._play.strategy != "free": - if task.run_once: - display.debug("Ignoring 'forks' as 'run_once' is also set for '%s'" % task.get_name()) - else: - forks = min(task.forks) - display.debug("task: %s, forks: %d" % (task.get_name(), forks)) - if forks > 0 and self._cur_worker >= forks: - self._cur_worker = 0 worker_prc = self._workers[self._cur_worker] if worker_prc is None or not worker_prc.is_alive(): self._queued_task_cache[(host.name, task._uuid)] = { @@ -338,9 +343,25 @@ def _queue_task(self, host, task, task_vars, play_context): worker_prc.start() display.debug("worker is %d (out of %d available)" % (self._cur_worker + 1, len(self._workers))) queued = True + self._cur_worker += 1 - if self._cur_worker >= len(self._workers): + + # Determine the "rewind point" of the worker list. This means we start + # iterating over the list of workers until the end of the list is found. + # Normally, that is simply the length of the workers list (as determined + # by the forks or serial setting), however a task/block/play may "throttle" + # that limit down. + rewind_point = len(self._workers) + if throttle > 0 and self.ALLOW_BASE_THROTTLING: + if task.run_once: + display.debug("Ignoring 'throttle' as 'run_once' is also set for '%s'" % task.get_name()) + else: + if throttle <= rewind_point: + display.debug("task: %s, throttle: %d" % (task.get_name(), throttle)) + rewind_point = throttle + if self._cur_worker >= rewind_point: self._cur_worker = 0 + if queued: break elif self._cur_worker == starting_worker: diff --git a/lib/ansible/plugins/strategy/free.py b/lib/ansible/plugins/strategy/free.py index af6631d7cdf02c..264c0e109569cb 100644 --- a/lib/ansible/plugins/strategy/free.py +++ b/lib/ansible/plugins/strategy/free.py @@ -47,6 +47,9 @@ class StrategyModule(StrategyBase): + # This strategy manages throttling on its own, so we don't want it done in queue_task + ALLOW_BASE_THROTTLING = False + def _filter_notified_hosts(self, notified_hosts): ''' Filter notified hosts accordingly to strategy @@ -117,21 +120,31 @@ def run(self, iterator, play_context): display.debug("this host has work to do", host=host_name) - max_tasks_reached = False - if len(task.forks) > 0: - forks = min(task.forks) - if forks > 0: + # check to see if this host is blocked (still executing a previous task) + if (host_name not in self._blocked_hosts or not self._blocked_hosts[host_name]): + + display.debug("getting variables", host=host_name) + task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=task, + _hosts=self._hosts_cache, + _hosts_all=self._hosts_cache_all) + self.add_tqm_variables(task_vars, play=iterator._play) + templar = Templar(loader=self._loader, variables=task_vars) + display.debug("done getting variables", host=host_name) + + try: + throttle = int(templar.template(task.throttle)) + except Exception as e: + raise AnsibleError("Failed to convert the throttle value to an integer.", obj=task._ds, orig_exc=e) + + if throttle > 0: same_tasks = 0 for worker in self._workers: if worker and worker.is_alive() and worker._task._uuid == task._uuid: same_tasks += 1 display.debug("task: %s, same_tasks: %d" % (task.get_name(), same_tasks)) - if same_tasks >= forks: - max_tasks_reached = True - - # check to see if this host is blocked (still executing a previous task) - if (host_name not in self._blocked_hosts or not self._blocked_hosts[host_name]) and not max_tasks_reached: + if same_tasks >= throttle: + break # pop the task, mark the host blocked, and queue it self._blocked_hosts[host_name] = True @@ -144,14 +157,6 @@ def run(self, iterator, play_context): # corresponding action plugin action = None - display.debug("getting variables", host=host_name) - task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=task, - _hosts=self._hosts_cache, - _hosts_all=self._hosts_cache_all) - self.add_tqm_variables(task_vars, play=iterator._play) - templar = Templar(loader=self._loader, variables=task_vars) - display.debug("done getting variables", host=host_name) - try: task.name = to_text(templar.template(task.name, fail_on_undefined=False), nonstring='empty') display.debug("done templating", host=host_name) @@ -166,8 +171,8 @@ def run(self, iterator, play_context): raise AnsibleError("The '%s' module bypasses the host loop, which is currently not supported in the free strategy " "and would instead execute for every host in the inventory list." % task.action, obj=task._ds) else: - display.debug("Using run_once with the free strategy is not currently supported. This task will still be " - "executed for every host in the inventory list.") + display.warning("Using run_once with the free strategy is not currently supported. This task will still be " + "executed for every host in the inventory list.") # check to see if this task should be skipped, due to it being a member of a # role which has already run (and whether that role allows duplicate execution) diff --git a/test/integration/targets/forks/runme.sh b/test/integration/targets/forks/runme.sh deleted file mode 100755 index daf18b150a2920..00000000000000 --- a/test/integration/targets/forks/runme.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env bash - -set -eux - -# https://github.com/ansible/ansible/pull/42528 -ANSIBLE_STRATEGY='linear' ansible-playbook test_forks.yml -vv -i inventory --forks 12 "$@" -ANSIBLE_STRATEGY='free' ansible-playbook test_forks.yml -vv -i inventory --forks 12 "$@" diff --git a/test/integration/targets/forks/test_forks.yml b/test/integration/targets/forks/test_forks.yml deleted file mode 100644 index 935acc6fadd277..00000000000000 --- a/test/integration/targets/forks/test_forks.yml +++ /dev/null @@ -1,229 +0,0 @@ ---- -- hosts: localhosts - gather_facts: false - vars: - forksdir: ~/ansible_testing/forks.dir/ - tasks: - - name: Clean forksdir '{{ forksdir }}' - file: - state: absent - path: '{{ forksdir }}' - ignore_errors: yes - run_once: yes - - name: Create forksdir '{{ forksdir }}' - file: - state: directory - path: '{{ forksdir }}' - run_once: yes - - block: - - name: "Test 1 (max forks: 3)" - command: "{{ ansible_python_interpreter }}" - args: - stdin: | - import os, time - forksdir ='{{ forksdir | expanduser }}1/' - forksfile = forksdir + '{{ inventory_hostname }}' - max_forks = 3 - try: - os.mkdir(forksdir) - except OSError: - pass - try: - with(open(forksfile, 'a')): - os.utime(forksfile, None) - time.sleep(0.5) - forkslist = os.listdir(forksdir) - print("tasks: %d/%d" % (len(forkslist), max_forks)) - if len(forkslist) > max_forks: - raise ValueError("Too many concurrent tasks: %d/%d" % (len(forkslist), max_forks)) - time.sleep(0.5) - finally: - os.unlink(forksfile) - #run_once: true - forks: 3 - - block: - - name: "Test 2 (max forks: 5)" - command: "{{ ansible_python_interpreter }}" - args: - stdin: | - import os, time - forksdir ='{{ forksdir | expanduser }}2/' - forksfile = forksdir + '{{ inventory_hostname }}' - max_forks = 5 - try: - os.mkdir(forksdir) - except OSError: - pass - try: - with(open(forksfile, 'a')): - os.utime(forksfile, None) - time.sleep(0.5) - forkslist = os.listdir(forksdir) - print("tasks: %d/%d" % (len(forkslist), max_forks)) - if len(forkslist) > max_forks: - raise ValueError("Too many concurrent tasks: %d/%d" % (len(forkslist), max_forks)) - time.sleep(0.5) - finally: - os.unlink(forksfile) - forks: 5 - - block: - - name: "Test 3 (max forks: 6)" - command: "{{ ansible_python_interpreter }}" - args: - stdin: | - import os, time - forksdir ='{{ forksdir | expanduser }}3/' - forksfile = forksdir + '{{ inventory_hostname }}' - max_forks = 6 - try: - os.mkdir(forksdir) - except OSError: - pass - try: - with(open(forksfile, 'a')): - os.utime(forksfile, None) - time.sleep(0.5) - forkslist = os.listdir(forksdir) - print("tasks: %d/%d" % (len(forkslist), max_forks)) - if len(forkslist) > max_forks: - raise ValueError("Too many concurrent tasks: %d/%d" % (len(forkslist), max_forks)) - time.sleep(0.5) - finally: - os.unlink(forksfile) - forks: 8 - forks: 6 - - block: - - block: - - name: "Test 4 (max forks: 8)" - command: "{{ ansible_python_interpreter }}" - args: - stdin: | - import os, time - forksdir ='{{ forksdir | expanduser }}4/' - forksfile = forksdir + '{{ inventory_hostname }}' - max_forks = 8 - try: - os.mkdir(forksdir) - except OSError: - pass - try: - with(open(forksfile, 'a')): - os.utime(forksfile, None) - time.sleep(0.5) - forkslist = os.listdir(forksdir) - print("tasks: %d/%d" % (len(forkslist), max_forks)) - if len(forkslist) > max_forks: - raise ValueError("Too many concurrent tasks: %d/%d" % (len(forkslist), max_forks)) - time.sleep(0.5) - finally: - os.unlink(forksfile) - forks: 10 - forks: 8 - forks: 12 - forks: 15 - - block: - - name: "Test 1 (max forks: 3)" - command: "{{ ansible_python_interpreter }}" - args: - stdin: | - import os, time - forksdir ='{{ forksdir | expanduser }}5/' - forksfile = forksdir + '{{ inventory_hostname }}' - max_forks = 3 - try: - os.mkdir(forksdir) - except OSError: - pass - try: - with(open(forksfile, 'a')): - os.utime(forksfile, None) - time.sleep(0.5) - forkslist = os.listdir(forksdir) - print("tasks: %d/%d" % (len(forkslist), max_forks)) - if len(forkslist) > max_forks: - raise ValueError("Too many concurrent tasks: %d/%d" % (len(forkslist), max_forks)) - time.sleep(0.5) - finally: - os.unlink(forksfile) - #run_once: true - forks: 3 - - block: - - name: "Test 2 (max forks: 5)" - command: "{{ ansible_python_interpreter }}" - args: - stdin: | - import os, time - forksdir ='{{ forksdir | expanduser }}6/' - forksfile = forksdir + '{{ inventory_hostname }}' - max_forks = 5 - try: - os.mkdir(forksdir) - except OSError: - pass - try: - with(open(forksfile, 'a')): - os.utime(forksfile, None) - time.sleep(0.5) - forkslist = os.listdir(forksdir) - print("tasks: %d/%d" % (len(forkslist), max_forks)) - if len(forkslist) > max_forks: - raise ValueError("Too many concurrent tasks: %d/%d" % (len(forkslist), max_forks)) - time.sleep(0.5) - finally: - os.unlink(forksfile) - forks: 5 - - block: - - name: "Test 3 (max forks: 6)" - command: "{{ ansible_python_interpreter }}" - args: - stdin: | - import os, time - forksdir ='{{ forksdir | expanduser }}7/' - forksfile = forksdir + '{{ inventory_hostname }}' - max_forks = 6 - try: - os.mkdir(forksdir) - except OSError: - pass - try: - with(open(forksfile, 'a')): - os.utime(forksfile, None) - time.sleep(0.5) - forkslist = os.listdir(forksdir) - print("tasks: %d/%d" % (len(forkslist), max_forks)) - if len(forkslist) > max_forks: - raise ValueError("Too many concurrent tasks: %d/%d" % (len(forkslist), max_forks)) - time.sleep(0.5) - finally: - os.unlink(forksfile) - forks: 8 - forks: 6 - - block: - - block: - - name: "Test 4 (max forks: 8)" - command: "{{ ansible_python_interpreter }}" - args: - stdin: | - import os, time - forksdir ='{{ forksdir | expanduser }}8/' - forksfile = forksdir + '{{ inventory_hostname }}' - max_forks = 8 - try: - os.mkdir(forksdir) - except OSError: - pass - try: - with(open(forksfile, 'a')): - os.utime(forksfile, None) - time.sleep(0.5) - forkslist = os.listdir(forksdir) - print("tasks: %d/%d" % (len(forkslist), max_forks)) - if len(forkslist) > max_forks: - raise ValueError("Too many concurrent tasks: %d/%d" % (len(forkslist), max_forks)) - time.sleep(0.5) - finally: - os.unlink(forksfile) - forks: 10 - forks: 8 - forks: 12 - forks: 15 diff --git a/test/integration/targets/forks/aliases b/test/integration/targets/throttle/aliases similarity index 100% rename from test/integration/targets/forks/aliases rename to test/integration/targets/throttle/aliases diff --git a/test/integration/targets/forks/inventory b/test/integration/targets/throttle/inventory similarity index 100% rename from test/integration/targets/forks/inventory rename to test/integration/targets/throttle/inventory diff --git a/test/integration/targets/throttle/runme.sh b/test/integration/targets/throttle/runme.sh new file mode 100755 index 00000000000000..3a2a3da5331b54 --- /dev/null +++ b/test/integration/targets/throttle/runme.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +set -eux + +# https://github.com/ansible/ansible/pull/42528 +ANSIBLE_STRATEGY='linear' ansible-playbook test_throttle.yml -vv -i inventory --forks 12 "$@" +ANSIBLE_STRATEGY='free' ansible-playbook test_throttle.yml -vv -i inventory --forks 12 "$@" diff --git a/test/integration/targets/throttle/test_throttle.py b/test/integration/targets/throttle/test_throttle.py new file mode 100755 index 00000000000000..86f253cc9ff56f --- /dev/null +++ b/test/integration/targets/throttle/test_throttle.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python + +import os +import sys +import time + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +# read the args from sys.argv +throttledir, inventory_hostname, max_throttle = sys.argv[1:] +# format/create additional vars +max_throttle = int(max_throttle) +throttledir = os.path.expanduser(throttledir) +throttlefile = os.path.join(throttledir, inventory_hostname) +try: + # create the file + with(open(throttlefile, 'a')): + os.utime(throttlefile, None) + # count the number of files in the dir + throttlelist = os.listdir(throttledir) + print("tasks: %d/%d" % (len(throttlelist), max_throttle)) + # if we have too many files, fail + if len(throttlelist) > max_throttle: + print(throttlelist) + raise ValueError("Too many concurrent tasks: %d/%d" % (len(throttlelist), max_throttle)) +finally: + # remove the file, then wait to make sure it's gone + os.unlink(throttlefile) + while True: + if not os.path.exists(throttlefile): + break + time.sleep(0.1) diff --git a/test/integration/targets/throttle/test_throttle.yml b/test/integration/targets/throttle/test_throttle.yml new file mode 100644 index 00000000000000..260ebcdc03f8c8 --- /dev/null +++ b/test/integration/targets/throttle/test_throttle.yml @@ -0,0 +1,59 @@ +--- +- hosts: localhosts + gather_facts: false + vars: + throttledir: ~/ansible_testing/throttle.dir/ + tasks: + - name: Clean throttledir '{{ throttledir }}' + file: + state: absent + path: '{{ throttledir }}' + ignore_errors: yes + run_once: yes + - name: Create throttledir '{{ throttledir }}' + file: + state: directory + path: '{{ throttledir }}' + run_once: yes + - block: + - name: "Test 1 (max throttle: 3)" + script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 3" + throttle: 3 + - block: + - name: "Test 2 (max throttle: 5)" + script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 5" + throttle: 5 + - block: + - name: "Test 3 (max throttle: 8)" + script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 8" + throttle: 8 + throttle: 6 + - block: + - block: + - name: "Test 4 (max throttle: 8)" + script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 8" + throttle: 8 + throttle: 6 + throttle: 12 + throttle: 15 + - block: + - name: "Test 1 (max throttle: 3)" + script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 3" + throttle: 3 + - block: + - name: "Test 2 (max throttle: 5)" + script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 5" + throttle: 5 + - block: + - name: "Test 3 (max throttle: 6)" + script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 6" + throttle: 6 + throttle: 3 + - block: + - block: + - name: "Test 4 (max throttle: 8)" + script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 8" + throttle: 8 + throttle: 6 + throttle: 4 + throttle: 2 diff --git a/test/units/plugins/strategy/test_strategy_base.py b/test/units/plugins/strategy/test_strategy_base.py index 210cf76ee1672e..33776ba54f1f21 100644 --- a/test/units/plugins/strategy/test_strategy_base.py +++ b/test/units/plugins/strategy/test_strategy_base.py @@ -194,7 +194,7 @@ def fake_run(self): variable_manager=mock_var_manager, loader=fake_loader, passwords=None, - forks=5, + forks=3, ) tqm._initialize_processes(3) tqm.hostvars = dict()