Skip to content
Permalink
Browse files

Defer task_vars creation to WorkerProcess when possible.

Ansible calls VariableManager.get_vars() prior to forking each task in
order to template the task's title for the "v2_playbook_on_task_start"
plugin callback, and to template the "run_once" playbook setting if it
exists.

This wraps get_vars() in a caching factory, taking care to avoid calling
it when neither the task title nor run_once contain a template, avoiding
a major bottleneck in the top-level process when possible.

The factory produces the same dict instance on each call, so the dict's
lifetime and any changes made to it propagate identically to how things
worked previously.

For DebOps common.yml, 90% of tasks can avoid calling get_vars() in the
top-level process, delaying it until TaskExecutor starts in the worker,
effectively parallelizing get_vars() for most tasks.

The net effect is that in a 10 target run against Ansible HEAD on an 8
vCPU controller, common.yml runtime drops from 4m22s to 2m40s (-38.9%).

This will be even more pronounced with larger runs -- the top-level
process spends about one third of its time asleep, which leaves plenty
of room to be kept busy with more targets.

process/worker.py:
    Receive a factory function rather than a concrete dict. Invoke the
    factory during run(), i.e. post-fork.

strategy/__init__.py:
    * Add get_vars_factory() to produce the factory.
    * add_tqm_variables() is needlessly inflexible, requiring a dict as
      input; change it to get_tqm_variables().
    * _queue_task() receives the factory rather than a dict.

strategy/free.py:
strategy/linear.py:
    * Use factory rather than task_vars dict.
    * Restructure "run_once" and callback logic to only invoke the
      factory if necessary.

template/__init__.py:
    * Rename _clean_regex to better reflect what it matches.
    * Add is_template_uncached() to avoid invoking Jinja2 compiler in
      the majority of cases. This can't become default since some code
      relies on is_template() to ensure a compiled template is cached
      pre-fork in the top-level process.
  • Loading branch information...
dw committed May 17, 2018
1 parent 133695e commit 7e1f88c1cb3d823cc36cf2a80aa839cffbff73b6
@@ -58,12 +58,12 @@ class WorkerProcess(multiprocessing.Process):
for reading later.
'''

def __init__(self, rslt_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj):
def __init__(self, rslt_q, vars_factory, host, task, play_context, loader, variable_manager, shared_loader_obj):

super(WorkerProcess, self).__init__()
# takes a task queue manager as the sole param:
self._rslt_q = rslt_q
self._task_vars = task_vars
self._vars_factory = vars_factory
self._host = host
self._task = task
self._play_context = play_context
@@ -101,7 +101,7 @@ def _run_main(self):
executor_result = TaskExecutor(
self._host,
self._task,
self._task_vars,
self._vars_factory(),
self._play_context,
self._new_stdin,
self._loader,
@@ -121,7 +121,7 @@ def inner(self, iterator, one_pass=False, max_passes=None):
task = result._task
host = result._host
_queued_task_args = self._queued_task_cache.pop((host.name, task._uuid), None)
task_vars = _queued_task_args['task_vars']
vars_factory = _queued_task_args['vars_factory']
play_context = _queued_task_args['play_context']
# Try to grab the previous host state, if it doesn't exist use get_host_state to generate an empty state
try:
@@ -131,7 +131,7 @@ def inner(self, iterator, one_pass=False, max_passes=None):

while result.needs_debugger(globally_enabled=self.debugger_active):
next_action = NextAction()
dbg = Debugger(task, host, task_vars, play_context, result, next_action)
dbg = Debugger(task, host, vars_factory(), play_context, result, next_action)
dbg.cmdloop()

if next_action.result == NextAction.REDO:
@@ -144,7 +144,7 @@ def inner(self, iterator, one_pass=False, max_passes=None):
self._tqm._stats.decrement('ok', host.name)

# redo
self._queue_task(host, task, task_vars, play_context)
self._queue_task(host, task, vars_factory, play_context)

_processed_results.extend(debug_closure(func)(self, iterator, one_pass))
break
@@ -264,15 +264,40 @@ def get_hosts_remaining(self, play):
def get_failed_hosts(self, play):
return [host for host in self._inventory.get_hosts(play.hosts) if host.name in self._tqm._failed_hosts]

def add_tqm_variables(self, vars, play):
def get_vars_factory(self, play, **kwargs):
'''
Base class method to add extra variables/information to the list of task
vars sent through the executor engine regarding the task queue manager state.
Return a factory that generates task variables on first invocation, and
caches the result for subsequent invocations. This allows delaying
an expensive get_vars() call hopefully until its first use occurs within
a Worker, rather than in the top-level process.
:param ansible.playbook.play.Play play:
Associated play.
:returns:
Factory callable as `factory()`, returning a task variables dict.
'''
vars['ansible_current_hosts'] = [h.name for h in self.get_hosts_remaining(play)]
vars['ansible_failed_hosts'] = [h.name for h in self.get_failed_hosts(play)]
box = [None]

def _queue_task(self, host, task, task_vars, play_context):
def closure():
if not box[0]:
task_vars = self._variable_manager.get_vars(play=play, **kwargs)
task_vars.update(self.get_tqm_variables(play))
box[0] = task_vars
return box[0]

return closure

def get_tqm_variables(self, play):
    '''
    Return extra variables/information to the list of task vars sent
    through the executor engine regarding the task queue manager state.
    '''
    # Snapshot the current TQM host state so workers see a consistent view.
    current = [h.name for h in self.get_hosts_remaining(play)]
    failed = [h.name for h in self.get_failed_hosts(play)]
    return dict(ansible_current_hosts=current, ansible_failed_hosts=failed)

def _queue_task(self, host, task, vars_factory, play_context):
''' handles queueing the task up to be sent to a worker '''

display.debug("entering _queue_task() for %s/%s" % (host.name, task.action))
@@ -307,11 +332,11 @@ def _queue_task(self, host, task, task_vars, play_context):
self._queued_task_cache[(host.name, task._uuid)] = {
'host': host,
'task': task,
'task_vars': task_vars,
'vars_factory': vars_factory,
'play_context': play_context
}

worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj)
worker_prc = WorkerProcess(self._final_q, vars_factory, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj)
self._workers[self._cur_worker][0] = worker_prc
worker_prc.start()
display.debug("worker is %d (out of %d available)" % (self._cur_worker + 1, len(self._workers)))
@@ -864,9 +889,8 @@ def _do_handler_run(self, handler, handler_name, iterator, play_context, notifie
host_results = []
for host in notified_hosts:
if not handler.has_triggered(host) and (not iterator.is_failed(host) or play_context.force_handlers):
task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=handler)
self.add_tqm_variables(task_vars, play=iterator._play)
self._queue_task(host, handler, task_vars, play_context)
vars_factory = self.get_vars_factory(iterator._play, host=host, task=handler)
self._queue_task(host, handler, vars_factory, play_context)
if run_once:
break

@@ -115,25 +115,31 @@ def run(self, iterator, play_context):
action = None

display.debug("getting variables", host=host_name)
task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=task)
self.add_tqm_variables(task_vars, play=iterator._play)
templar = Templar(loader=self._loader, variables=task_vars)
vars_factory = self.get_vars_factory(iterator._play, host=host, task=task)
templar = Templar(loader=self._loader)
display.debug("done getting variables", host=host_name)

try:
task.name = to_text(templar.template(task.name, fail_on_undefined=False), nonstring='empty')
display.debug("done templating", host=host_name)
except:
# just ignore any errors during task name templating,
# we don't care if it just shows the raw name
display.debug("templating failed for some reason", host=host_name)

run_once = templar.template(task.run_once) or action and getattr(action, 'BYPASS_HOST_LOOP', False)
if run_once:
if action and getattr(action, 'BYPASS_HOST_LOOP', False):
raise AnsibleError("The '%s' module bypasses the host loop, which is currently not supported in the free strategy "
"and would instead execute for every host in the inventory list." % task.action, obj=task._ds)
if templar.is_template_uncached(task.name):
try:
task.name = to_text(templar.template(task.name, fail_on_undefined=False), nonstring='empty')
display.debug("done templating", host=host_name)
except:
# just ignore any errors during task name templating,
# we don't care if it just shows the raw name
display.debug("templating failed for some reason", host=host_name)

if getattr(action, 'BYPASS_HOST_LOOP', False):
raise AnsibleError("The '%s' module bypasses the host loop, which is currently not supported in the free strategy "
"and would instead execute for every host in the inventory list." % task.action, obj=task._ds)

if task.run_once:
if templar.is_template_uncached(task.run_once):
templar.set_available_variables(vars_factory())
run_once = templar.template(task.run_once)
else:
run_once = task.run_once

if run_once:
display.warning("Using run_once with the free strategy is not currently supported. This task will still be "
"executed for every host in the inventory list.")

@@ -157,8 +163,8 @@ def run(self, iterator, play_context):
display.warning("Using any_errors_fatal with the free strategy is not supported, "
"as tasks are executed independently on each host")
self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False)
self._queue_task(host, task, task_vars, play_context)
del task_vars
self._queue_task(host, task, vars_factory, play_context)
del vars_factory
else:
display.debug("%s is blocked, skipping for now" % host_name)

@@ -275,36 +275,43 @@ def run(self, iterator, play_context):
break

display.debug("getting variables")
task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=task)
self.add_tqm_variables(task_vars, play=iterator._play)
templar = Templar(loader=self._loader, variables=task_vars)
vars_factory = self.get_vars_factory(iterator._play, host=host, task=task)
templar = Templar(loader=self._loader)
display.debug("done getting variables")

run_once = templar.template(task.run_once) or action and getattr(action, 'BYPASS_HOST_LOOP', False)
run_once = getattr(action, 'BYPASS_HOST_LOOP', False)
if task.run_once and not run_once:
if templar.is_template_uncached(task.run_once):
templar.set_available_variables(vars_factory())
run_once = templar.template(task.run_once)
else:
run_once = task.run_once

if (task.any_errors_fatal or run_once) and not task.ignore_errors:
any_errors_fatal = True

if not callback_sent:
display.debug("sending task start callback, copying the task so we can template it temporarily")
saved_name = task.name
display.debug("done copying, going to template now")
try:
task.name = to_text(templar.template(task.name, fail_on_undefined=False), nonstring='empty')
display.debug("done templating")
except:
# just ignore any errors during task name templating,
# we don't care if it just shows the raw name
display.debug("templating failed for some reason")
if templar.is_template_uncached(task.name):
display.debug("done copying, going to template now")
templar.set_available_variables(vars_factory())
try:
task.name = to_text(templar.template(task.name, fail_on_undefined=False), nonstring='empty')
display.debug("done templating")
except:
# just ignore any errors during task name templating,
# we don't care if it just shows the raw name
display.debug("templating failed for some reason")
display.debug("here goes the callback...")
self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False)
task.name = saved_name
callback_sent = True
display.debug("sending task start callback")

self._blocked_hosts[host.get_name()] = True
self._queue_task(host, task, task_vars, play_context)
del task_vars
self._queue_task(host, task, vars_factory, play_context)
del vars_factory

# if we're bypassing the host loop, break out now
if run_once:
@@ -45,7 +45,7 @@

from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleFilterError, AnsibleUndefinedVariable, AnsibleAssertionError
from ansible.module_utils.six import string_types, text_type
from ansible.module_utils.six import string_types, binary_type, text_type, viewvalues
from ansible.module_utils._text import to_native, to_text, to_bytes
from ansible.plugins.loader import filter_loader, lookup_loader, test_loader
from ansible.template.safe_eval import safe_eval
@@ -69,6 +69,7 @@
NON_TEMPLATED_TYPES = (bool, Number)

JINJA2_OVERRIDE = '#jinja2:'
B_JINJA2_OVERRIDE = b'#jinja2:'


def generate_ansible_template_vars(path):
@@ -283,12 +284,14 @@ def __init__(self, loader, shared_loader_obj=None, variables=None):

self.SINGLE_VAR = re.compile(r"^%s\s*(\w*)\s*%s$" % (self.environment.variable_start_string, self.environment.variable_end_string))

self._clean_regex = re.compile(r'(?:%s|%s|%s|%s)' % (
template_token_regex = r'(?:%s|%s|%s|%s)' % (
self.environment.variable_start_string,
self.environment.block_start_string,
self.environment.block_end_string,
self.environment.variable_end_string
))
)
self._template_token_regex = re.compile(template_token_regex)
self._b_template_token_regex = re.compile(to_bytes(template_token_regex))
self._no_type_regex = re.compile(r'.*?\|\s*(?:%s)(?:\([^\|]*\))?\s*\)?\s*(?:%s)' %
('|'.join(C.STRING_TYPE_FILTERS), self.environment.variable_end_string))

@@ -370,7 +373,7 @@ def _clean_data(self, orig_data):
# want to replace matched pairs of print/block tags
print_openings = []
block_openings = []
for mo in self._clean_regex.finditer(orig_data):
for mo in self._template_token_regex.finditer(orig_data):
token = mo.group(0)
token_start = mo.start(0)

@@ -554,6 +557,32 @@ def is_template(self, data):
return True
return False

def is_template_uncached(self, obj):
    '''
    Return :data:`True` if any string or bytes in the graph `obj` contains
    template directives. Unlike :meth:`is_template`, this does not cause
    caching of the compiled template as a side-effect.
    '''
    # Iterative depth-first walk over the (possibly nested) structure;
    # avoids recursion and any Jinja2 compilation in the common case.
    pending = [obj]
    while pending:
        item = pending.pop()
        if isinstance(item, text_type):
            # A '#jinja2:' override header may redefine the delimiters, so
            # defer to the full (caching) check for correctness.
            if item.startswith(JINJA2_OVERRIDE):
                return self.is_template(obj)
            if self._template_token_regex.search(item) is not None:
                return True
        elif isinstance(item, binary_type):
            if item.startswith(B_JINJA2_OVERRIDE):
                return self.is_template(obj)
            if self._b_template_token_regex.search(item) is not None:
                return True
        elif isinstance(item, dict):
            # Both keys and values may contain template directives.
            pending.extend(item)
            pending.extend(viewvalues(item))
        elif isinstance(item, (list, tuple)):
            pending.extend(item)
    return False

def templatable(self, data):
'''
returns True if the data can be templated w/o errors
@@ -213,13 +213,13 @@ def fake_run(self):

try:
strategy_base = StrategyBase(tqm=tqm)
strategy_base._queue_task(host=mock_host, task=mock_task, task_vars=dict(), play_context=MagicMock())
strategy_base._queue_task(host=mock_host, task=mock_task, vars_factory=(lambda: {}), play_context=MagicMock())
self.assertEqual(strategy_base._cur_worker, 1)
self.assertEqual(strategy_base._pending_results, 1)
strategy_base._queue_task(host=mock_host, task=mock_task, task_vars=dict(), play_context=MagicMock())
strategy_base._queue_task(host=mock_host, task=mock_task, vars_factory=(lambda: {}), play_context=MagicMock())
self.assertEqual(strategy_base._cur_worker, 2)
self.assertEqual(strategy_base._pending_results, 2)
strategy_base._queue_task(host=mock_host, task=mock_task, task_vars=dict(), play_context=MagicMock())
strategy_base._queue_task(host=mock_host, task=mock_task, vars_factory=(lambda: {}), play_context=MagicMock())
self.assertEqual(strategy_base._cur_worker, 0)
self.assertEqual(strategy_base._pending_results, 3)
finally:
@@ -344,7 +344,7 @@ def _has_dead_workers():
(mock_host.name, mock_task._uuid): {
'task': mock_task,
'host': mock_host,
'task_vars': {},
'vars_factory': (lambda: {}),
'play_context': {},
}
}
@@ -564,7 +564,7 @@ def fake_run(*args):
strategy_base._queued_task_cache[(mock_host.name, mock_handler_task._uuid)] = {
'task': mock_handler_task,
'host': mock_host,
'task_vars': {},
'vars_factory': (lambda: {}),
'play_context': mock_play_context
}
tqm._final_q.put(task_result)

0 comments on commit 7e1f88c

Please sign in to comment.
You can’t perform that action at this time.