
Commit

Split regular and handler results into their own queues (#69498) (#69730)

When mixed with the free strategy (or any custom strategy that does not behave in
a lock-step manner), the linear methodology of _wait_on_handler_results may cause
race conditions with regular task result processing if the strategy uses
_process_pending_results directly. This patch addresses that by splitting the queues
used for results and adding a flag to _process_pending_results to determine which
queue to check.
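
As a toy illustration of the failure mode (hypothetical stand-ins, not Ansible's real classes or APIs): when handler waiting and regular result processing drain one shared queue, whichever side polls first takes results that belong to the other.

from collections import deque

# Hypothetical, simplified model of the pre-patch behavior: one shared
# queue holds every finished result, no matter who should consume it.
shared_results = deque()

def process_pending(queue):
    # Drain everything currently queued, as a lock-step wait loop would.
    drained = []
    while queue:
        drained.append(queue.popleft())
    return drained

# Under the free strategy, a regular task result and a handler result
# can be queued at nearly the same moment...
shared_results.append(('host001', 'regular task result'))
shared_results.append(('host002', 'handler result'))

# ...so a handler-wait loop draining the shared queue also swallows
# host001's regular result, which regular processing then never sees.
print(process_pending(shared_results))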

Fixes #69457

(cherry picked from commit a4072ad)

Co-authored-by: James Cammarata <jimi@sngx.net>
mwhahaha and jimi-c committed Jun 17, 2020
1 parent bc550fd commit 4a5f9e8
Showing 10 changed files with 110 additions and 10 deletions.
4 changes: 4 additions & 0 deletions changelogs/fragments/69457-free-strategy-handler-race.yml
@@ -0,0 +1,4 @@
bugfixes:
- Prevent a race condition when running handlers using a combination of the free strategy and include_role.
minor_changes:
- The results queue and counter for results are now split for standard / handler results. This allows the governing strategy to be truly independent from the handler strategy, which basically follows the linear methodology.
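
A minimal sketch of that split, using an invented ToyStrategy stand-in (the real change is in the lib/ansible/plugins/strategy/__init__.py diff below): results are routed to separate queues and counters when they are enqueued, and each consumer drains only its own queue.

from collections import deque
import threading

class ToyStrategy:
    """Invented stand-in mirroring the patched bookkeeping."""

    def __init__(self):
        self._results = deque()
        self._handler_results = deque()
        self._pending_results = 0
        self._pending_handler_results = 0
        self._results_lock = threading.Lock()

    def enqueue(self, result, is_handler):
        # Route each finished result to its own queue and counter,
        # as results_thread_main and _queue_task do after this patch.
        with self._results_lock:
            if is_handler:
                self._pending_handler_results += 1
                self._handler_results.append(result)
            else:
                self._pending_results += 1
                self._results.append(result)

    def process_pending_results(self, do_handlers=False):
        # Each caller drains only the queue it asked for, mirroring
        # the new do_handlers flag on _process_pending_results.
        queue = self._handler_results if do_handlers else self._results
        processed = []
        with self._results_lock:
            while queue:
                processed.append(queue.popleft())
                if do_handlers:
                    self._pending_handler_results -= 1
                else:
                    self._pending_results -= 1
        return processed

strategy = ToyStrategy()
strategy.enqueue('regular result', is_handler=False)
strategy.enqueue('handler result', is_handler=True)
assert strategy.process_pending_results(do_handlers=True) == ['handler result']
assert strategy._pending_results == 1  # regular work is left untouched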
38 changes: 28 additions & 10 deletions lib/ansible/plugins/strategy/__init__.py
@@ -42,6 +42,7 @@
 from ansible.module_utils.six import iteritems, itervalues, string_types
 from ansible.module_utils._text import to_text
 from ansible.module_utils.connection import Connection, ConnectionError
+from ansible.playbook.handler import Handler
 from ansible.playbook.helpers import load_list_of_blocks
 from ansible.playbook.included_file import IncludedFile
 from ansible.playbook.task_include import TaskInclude
@@ -85,7 +86,13 @@ def results_thread_main(strategy):
                 break
             else:
                 strategy._results_lock.acquire()
-                strategy._results.append(result)
+                # only handlers have the listen attr, so this must be a handler
+                # we split up the results into two queues here to make sure
+                # handler and regular result processing don't cross wires
+                if 'listen' in result._task_fields:
+                    strategy._handler_results.append(result)
+                else:
+                    strategy._results.append(result)
                 strategy._results_lock.release()
         except (IOError, EOFError):
             break
@@ -96,7 +103,7 @@
 def debug_closure(func):
     """Closure to wrap ``StrategyBase._process_pending_results`` and invoke the task debugger"""
     @functools.wraps(func)
-    def inner(self, iterator, one_pass=False, max_passes=None):
+    def inner(self, iterator, one_pass=False, max_passes=None, do_handlers=False):
         status_to_stats_map = (
             ('is_failed', 'failures'),
             ('is_unreachable', 'dark'),
@@ -107,7 +114,7 @@ def inner(self, iterator, one_pass=False, max_passes=None):
         # We don't know the host yet, copy the previous states, for lookup after we process new results
         prev_host_states = iterator._host_states.copy()

-        results = func(self, iterator, one_pass=one_pass, max_passes=max_passes)
+        results = func(self, iterator, one_pass=one_pass, max_passes=max_passes, do_handlers=do_handlers)
         _processed_results = []

         for result in results:
@@ -187,6 +194,7 @@ def __init__(self, tqm):

         # internal counters
         self._pending_results = 0
+        self._pending_handler_results = 0
         self._cur_worker = 0

         # this dictionary is used to keep track of hosts that have
@@ -198,6 +206,7 @@ def __init__(self, tqm):
         self._flushed_hosts = dict()

         self._results = deque()
+        self._handler_results = deque()
         self._results_lock = threading.Condition(threading.Lock())

         # create the result processing thread for reading results in the background
@@ -377,7 +386,10 @@ def _queue_task(self, host, task, task_vars, play_context):
                 elif self._cur_worker == starting_worker:
                     time.sleep(0.0001)

-            self._pending_results += 1
+            if isinstance(task, Handler):
+                self._pending_handler_results += 1
+            else:
+                self._pending_results += 1
         except (EOFError, IOError, AssertionError) as e:
             # most likely an abort
             display.debug("got an error while queuing: %s" % e)
@@ -424,7 +436,7 @@ def _set_always_delegated_facts(self, result, task):
             _set_host_facts(target_host, always_facts)

     @debug_closure
-    def _process_pending_results(self, iterator, one_pass=False, max_passes=None):
+    def _process_pending_results(self, iterator, one_pass=False, max_passes=None, do_handlers=False):
         '''
         Reads results off the final queue and takes appropriate action
         based on the result (executing callbacks, updating state, etc.).
@@ -477,7 +489,10 @@ def search_handler_blocks_by_name(handler_name, handler_blocks):
         while True:
             try:
                 self._results_lock.acquire()
-                task_result = self._results.popleft()
+                if do_handlers:
+                    task_result = self._handler_results.popleft()
+                else:
+                    task_result = self._results.popleft()
             except IndexError:
                 break
             finally:
@@ -696,7 +711,10 @@ def search_handler_blocks_by_name(handler_name, handler_blocks):
             # finally, send the ok for this task
             self._tqm.send_callback('v2_runner_on_ok', task_result)

-            self._pending_results -= 1
+            if do_handlers:
+                self._pending_handler_results -= 1
+            else:
+                self._pending_results -= 1
             if original_host.name in self._blocked_hosts:
                 del self._blocked_hosts[original_host.name]

@@ -728,19 +746,19 @@ def _wait_on_handler_results(self, iterator, handler, notified_hosts):
         handler_results = 0

         display.debug("waiting for handler results...")
-        while (self._pending_results > 0 and
+        while (self._pending_handler_results > 0 and
                handler_results < len(notified_hosts) and
                not self._tqm._terminated):

             if self._tqm.has_dead_workers():
                 raise AnsibleError("A worker was found in a dead state")

-            results = self._process_pending_results(iterator)
+            results = self._process_pending_results(iterator, do_handlers=True)
             ret_results.extend(results)
             handler_results += len([
                 r._host for r in results if r._host in notified_hosts and
                 r.task_name == handler.name])
-            if self._pending_results > 0:
+            if self._pending_handler_results > 0:
                 time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL)

         display.debug("no more pending handlers, returning what we have")
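One detail from the hunks above worth calling out: the results thread holds only a TaskResult, not the task object itself, so it identifies handler results by the presence of the listen field, which only handler tasks carry. A toy check of that rule, with plain dicts standing in for result._task_fields:

# Plain dicts stand in for result._task_fields (hypothetical values);
# per the comment in results_thread_main, only handlers have 'listen'.
def is_handler_result(task_fields):
    return 'listen' in task_fields

print(is_handler_result({'name': 'My Handler', 'listen': []}))                  # True
print(is_handler_result({'name': 'Invoke handler', 'notify': ['My Handler']}))  # False
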
3 changes: 3 additions & 0 deletions test/integration/targets/handler_race/aliases
@@ -0,0 +1,3 @@
shippable/posix/group4
handler_race
skip/aix
30 changes: 30 additions & 0 deletions test/integration/targets/handler_race/inventory
@@ -0,0 +1,30 @@
host001 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host002 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host003 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host004 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host005 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host006 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host007 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host008 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host009 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host010 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host011 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host012 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host013 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host014 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host015 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host016 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host017 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host018 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host019 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host020 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host021 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host022 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host023 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host024 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host025 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host026 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host027 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host028 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host029 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
host030 ansible_connection=local ansible_python_interpreter="{{ ansible_playbook_python }}"
4 changes: 4 additions & 0 deletions test/integration/targets/handler_race/roles/do_handlers/handlers/main.yml
@@ -0,0 +1,4 @@
---
# handlers file for do_handlers
- name: My Handler
shell: sleep 5
9 changes: 9 additions & 0 deletions test/integration/targets/handler_race/roles/do_handlers/tasks/main.yml
@@ -0,0 +1,9 @@
---
# tasks file for do_handlers
- name: Invoke handler
shell: sleep 1
notify:
- My Handler

- name: Flush handlers
meta: flush_handlers
8 changes: 8 additions & 0 deletions test/integration/targets/handler_race/roles/more_sleep/tasks/main.yml
@@ -0,0 +1,8 @@
---
# tasks file for more_sleep
- name: Random more sleep
set_fact:
more_sleep_time: "{{ 5 | random }}"

- name: Moar sleep
shell: sleep "{{ more_sleep_time }}"
8 changes: 8 additions & 0 deletions test/integration/targets/handler_race/roles/random_sleep/tasks/main.yml
@@ -0,0 +1,8 @@
---
# tasks file for random_sleep
- name: Generate sleep time
set_fact:
sleep_time: "{{ 60 | random }}"

- name: Do random sleep
shell: sleep "{{ sleep_time }}"
6 changes: 6 additions & 0 deletions test/integration/targets/handler_race/runme.sh
@@ -0,0 +1,6 @@
#!/usr/bin/env bash

set -eux

ansible-playbook test_handler_race.yml -i inventory -v "$@"

10 changes: 10 additions & 0 deletions test/integration/targets/handler_race/test_handler_race.yml
@@ -0,0 +1,10 @@
- hosts: all
gather_facts: no
strategy: free
tasks:
- include_role:
name: random_sleep
- include_role:
name: do_handlers
- include_role:
name: more_sleep
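
The play above pairs randomized sleeps with a handler flush so that, under the free strategy, regular results keep arriving while handlers are being waited on. A hypothetical sketch of the property the split queues guarantee in that situation (toy threads, not the real worker model):

import threading
import time
from collections import deque

regular_results, handler_results = deque(), deque()
lock = threading.Lock()

def produce_regular_results():
    # Other hosts keep finishing regular tasks during the handler flush.
    for i in range(100):
        with lock:
            regular_results.append('regular-%d' % i)
        time.sleep(0.001)

producer = threading.Thread(target=produce_regular_results)
producer.start()

with lock:
    handler_results.append('handler result')

# The handler-wait loop drains only the handler queue...
drained = []
with lock:
    while handler_results:
        drained.append(handler_results.popleft())

producer.join()
assert drained == ['handler result']
assert len(regular_results) == 100  # ...so no regular result is swallowed
print('no results crossed wires')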
