Skip to content
This repository has been archived by the owner on Apr 12, 2021. It is now read-only.

Commit

Permalink
Fix 541 (#618)
Browse files Browse the repository at this point in the history
* Move machine adding to async queue

Uses two queues to ensure that we can wait until all juju actions (even
ones we're going to wait for, like add_machines) are enqueued on the
juju queue before enqueueing the add_relations call.

Fixes #541

Signed-off-by: Michael McCracken <mike.mccracken@canonical.com>

* Add async queue debugging

Handy switch to print out contents of queues.
Useful for understanding deadlocks.

* fix tests, autoformat
  • Loading branch information
mikemccracken authored and Adam Stokes committed Jan 19, 2017
1 parent 20a7a2e commit fd4ccd2
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 33 deletions.
27 changes: 26 additions & 1 deletion conjureup/async.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import logging
import time
from collections import defaultdict
from collections import OrderedDict, defaultdict
from concurrent.futures import ThreadPoolExecutor
from threading import Event

Expand All @@ -21,20 +21,45 @@ class ThreadCancelledException(Exception):
_queues = defaultdict(lambda: ThreadPoolExecutor(1))
DEFAULT_QUEUE = "DEFAULT"

ENABLE_LOG = False
if ENABLE_LOG:
import q
_queueLog = defaultdict(OrderedDict)


def submit(func, exc_callback, queue_name="DEFAULT"):
def cb(cb_f):
e = cb_f.exception()
if e:
exc_callback(e)
if ENABLE_LOG:
now = time.time()
t = _queueLog[queue_name][func]
_queueLog[queue_name][func] = (t[0], t[1],
"done", time.time(),
e,
"elapsed", now - t[1])
q.q(qstatsf())
if ShutdownEvent.is_set():
log.debug("ignoring async.submit due to impending shutdown.")
return
f = _queues[queue_name].submit(func)
if ENABLE_LOG:
_queueLog[queue_name][func] = ("added", time.time(), None, None, None)
q.q(qstatsf())
f.add_done_callback(cb)
return f


def qstatsf():
s = ""
for queue, od in _queueLog.items():
s += "{}:\n".format(queue)
for func, t in od.items():
s += "{} - {}\n".format(func, t)
return s


def shutdown():
ShutdownEvent.set()
for queue in _queues.values():
Expand Down
6 changes: 4 additions & 2 deletions conjureup/controllers/bootstrapwait/gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ def __init__(self):
def finish(self, *args):
if self.alarm_handle:
EventLoop.remove_alarm(self.alarm_handle)
return controllers.use('deploystatus').render()
return controllers.use('deploystatus').render(
self.relations_scheduled_future)

def __refresh(self, *args):
self.view.redraw_kitt()
self.alarm_handle = EventLoop.set_alarm_in(
1,
self.__refresh)

def render(self):
def render(self, relations_scheduled_future):
self.relations_scheduled_future = relations_scheduled_future
track_screen("Bootstrap wait")
app.log.debug("Rendering bootstrap wait")

Expand Down
53 changes: 36 additions & 17 deletions conjureup/controllers/deploy/gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from conjureup.ui.views.applicationlist import ApplicationListView
from ubuntui.ev import EventLoop

DEPLOY_ASYNC_QUEUE = "DEPLOY_ASYNC_QUEUE"


class DeployController:

Expand Down Expand Up @@ -244,15 +246,20 @@ def apply_assignments(self, application):
new_assignments.append(plabel)
application.placement_spec = new_assignments

def ensure_machines(self, application, done_cb):
def ensure_machines_async(self, application, done_cb):
"""If 'application' is assigned to any machine that haven't been added yet,
first add the machines then call done_cb
"""
def _do_ensure_machines():
if app.current_cloud == 'maas':
self.ensure_machines_maas(application, done_cb)
else:
self.ensure_machines_nonmaas(application, done_cb)

if app.current_cloud == 'maas':
self.ensure_machines_maas(application, done_cb)
else:
self.ensure_machines_nonmaas(application, done_cb)
async.submit(_do_ensure_machines,
partial(self._handle_exception,
"Error while adding machines"),
queue_name=DEPLOY_ASYNC_QUEUE)

def ensure_machines_maas(self, application, done_cb):
app_placements = self.get_all_assignments(application)
Expand All @@ -269,7 +276,8 @@ def ensure_machines_maas(self, application, done_cb):

f = juju.add_machines([machine_attrs],
msg_cb=app.ui.set_footer,
exc_cb=partial(self._handle_exception, "ED"))
exc_cb=partial(self._handle_exception,
"Error Adding Machine"))
add_machines_result = f.result()
self._handle_add_machines_return(juju_machine_id,
add_machines_result)
Expand All @@ -281,7 +289,10 @@ def ensure_machines_nonmaas(self, application, done_cb):
for juju_machine_id in [j_id for j_id, _ in app_placements
if j_id not in self.deployed_juju_machines]:
juju_machine = juju_machines[juju_machine_id]
f = juju.add_machines([juju_machine])
f = juju.add_machines([juju_machine],
msg_cb=app.ui.set_footer,
exc_cb=partial(self._handle_exception,
"Error Adding Machine"))
result = f.result()
self._handle_add_machines_return(juju_machine_id, result)
done_cb()
Expand Down Expand Up @@ -310,26 +321,34 @@ def msg_both(*args):
msg_cb(*args)
app.ui.set_footer(*args)

self.ensure_machines(application, partial(self._do_deploy_one,
application, msg_both))
self.ensure_machines_async(application, partial(self._do_deploy_one,
application, msg_both))

def do_deploy_remaining(self):
"deploys all un-deployed applications"

for application in self.undeployed_applications:
self.ensure_machines(application, partial(self._do_deploy_one,
application,
app.ui.set_footer))
self.ensure_machines_async(application,
partial(self._do_deploy_one,
application,
app.ui.set_footer))

def finish(self):
juju.set_relations(self.applications,
app.ui.set_footer,
partial(self._handle_exception, "ED"))
def enqueue_set_relations():
rel_future = juju.set_relations(self.applications,
app.ui.set_footer,
partial(self._handle_exception,
"Error setting relations"))
return rel_future
f = async.submit(enqueue_set_relations,
partial(self._handle_exception,
"Error setting relations"),
queue_name=DEPLOY_ASYNC_QUEUE)

if app.bootstrap.running and not app.bootstrap.running.done():
return controllers.use('bootstrapwait').render()
return controllers.use('bootstrapwait').render(f)
else:
return controllers.use('deploystatus').render()
return controllers.use('deploystatus').render(f)

def render(self):
track_screen("Deploy")
Expand Down
11 changes: 8 additions & 3 deletions conjureup/controllers/deploystatus/gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ def __handle_exception(self, msg, exc):
track_exception(msg)
return app.ui.show_exception_message(exc)

def __wait_for_applications(self, *args):
def __wait_for_applications(self, relations_scheduled_future):
# do not schedule app wait until all relations are set:
relations_done_future = relations_scheduled_future.result()
relations_done_future.result()

deploy_done_sh = os.path.join(self.bundle_scripts,
'00_deploy-done')

Expand All @@ -43,7 +47,7 @@ def __refresh(self, *args):
self.view.refresh_nodes()
EventLoop.set_alarm_in(1, self.__refresh)

def render(self):
def render(self, last_deploy_action_future):
""" Render deploy status view
"""
track_screen("Deploy Status")
Expand All @@ -59,7 +63,8 @@ def render(self):
)
app.ui.set_body(self.view)
self.__refresh()
self.__wait_for_applications()
last_deploy_action_future.add_done_callback(
self.__wait_for_applications)


_controller_class = DeployStatusController
10 changes: 9 additions & 1 deletion conjureup/juju.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ def add_machines(machines, msg_cb=None, exc_cb=None):
supported key
"""
if len(machines) > 0:
pl = "s"
else:
pl = ""

@requires_login
def _add_machines_async():
Expand All @@ -367,6 +371,9 @@ def _add_machines_async():
"jobs": ["JobHostUnits"]}
for m in machines]
app.log.debug("AddMachines: {}".format(machine_params))
if msg_cb:
msg_cb("Adding machine{}: {}".format(
pl, [(m['series'], m['constraints']) for m in machine_params]))
try:
machine_response = this.CLIENT.Client(
request="AddMachines", params={"params": machine_params})
Expand All @@ -377,7 +384,8 @@ def _add_machines_async():
return

if msg_cb:
msg_cb("Added machines: {}".format(machine_response))
ids = [d['machine'] for d in machine_response['machines']]
msg_cb("Added machine{}: {}".format(pl, ids))
return machine_response

return async.submit(_add_machines_async,
Expand Down
6 changes: 3 additions & 3 deletions test/test_controllers_bootstrapwait_gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@


import unittest
# from unittest.mock import ANY, call, MagicMock, patch, sentinel
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, patch, sentinel

from conjureup.controllers.bootstrapwait.gui import BootstrapWaitController

Expand Down Expand Up @@ -47,7 +46,7 @@ def tearDown(self):

def test_render(self):
"call render"
self.controller.render()
self.controller.render(MagicMock("future"))


class BootstrapwaitGUIFinishTestCase(unittest.TestCase):
Expand Down Expand Up @@ -75,4 +74,5 @@ def tearDown(self):

def test_finish(self):
"call finish"
self.controller.relations_scheduled_future = sentinel.rsf
self.controller.finish()
12 changes: 10 additions & 2 deletions test/test_controllers_deploy_gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ def setUp(self):
'conjureup.controllers.deploy.gui.utils')
self.mock_utils = self.utils_patcher.start()

self.submit_patcher = patch(
'conjureup.controllers.deploy.gui.async.submit')
self.mock_submit = self.submit_patcher.start()
self.mock_submit.return_value = sentinel.a_future

self.juju_patcher = patch(
'conjureup.controllers.deploy.gui.juju')
self.mock_juju = self.juju_patcher.start()
Expand All @@ -100,6 +105,7 @@ def setUp(self):
def tearDown(self):
self.controllers_patcher.stop()
self.utils_patcher.stop()
self.submit_patcher.stop()
self.juju_patcher.stop()
self.render_patcher.stop()
self.app_patcher.stop()
Expand All @@ -111,13 +117,15 @@ def test_show_bootstrap_wait(self):
self.mock_app.bootstrap.running.done = MagicMock(name='done')
self.mock_app.bootstrap.running.done.return_value = False
self.controller.finish()
self.assertEqual(1, len(self.mock_submit.mock_calls))
self.assertEqual(self.mock_controllers.mock_calls,
[call.use('bootstrapwait'),
call.use().render()])
call.use().render(sentinel.a_future)])

def test_skip_bootstrap_wait(self):
"Go directly to deploystatus if bootstrap is done"
self.controller.finish()
self.assertEqual(1, len(self.mock_submit.mock_calls))
self.assertEqual(self.mock_controllers.mock_calls,
[call.use('deploystatus'),
call.use().render()])
call.use().render(ANY)])
9 changes: 5 additions & 4 deletions test/test_controllers_deploystatus_gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
#
# tests controllers/deploystatus/gui.py
#
# Copyright 2016 Canonical, Ltd.
# Copyright 2016, 2017 Canonical, Ltd.


import unittest
# from unittest.mock import ANY, call, MagicMock, patch, sentinel
from unittest.mock import MagicMock, patch
from unittest.mock import ANY, MagicMock, patch

from conjureup.controllers.deploystatus.gui import DeployStatusController

Expand Down Expand Up @@ -46,7 +45,9 @@ def tearDown(self):

def test_render(self):
"call render"
self.controller.render()
mock_future = MagicMock(name="last_deploy_action_future")
self.controller.render(mock_future)
mock_future.add_done_callback.assert_called_once_with(ANY)


class DeployStatusGUIFinishTestCase(unittest.TestCase):
Expand Down

0 comments on commit fd4ccd2

Please sign in to comment.