mgr/dashboard: asynchronous task support #20870

Merged
merged 9 commits into ceph:master from rjfd:wip-dashboard-tasks on Mar 28, 2018

Conversation

Contributor

rjfd commented Mar 13, 2018

This PR introduces support for executing long-running tasks from dashboard backend controllers.
The tasks run asynchronously and can be queried for their current execution status.

For detailed information please read the changes made to HACKING.rst file.

Signed-off-by: Ricardo Dias rdias@suse.com

@rjfd rjfd force-pushed the rjfd:wip-dashboard-tasks branch from 7d017f5 to 7a21c3a Mar 13, 2018

@rjfd rjfd requested a review from LenzGr Mar 13, 2018


To help in the development of the above scenario we added the support for
asynchronous tasks. To trigger the execution of an asynchronous task we must
use the follwoing class method of the ``TaskManager`` class::


ricardoasmarques Mar 13, 2018

Member

s/follwoing/following/


To help in the development of the above scenario we added the support for
asynchronous tasks. To trigger the execution of an asynchronous task we must
use the follwoing class method of the ``TaskManager`` class::


votdev Mar 13, 2018

Contributor

s/follwoing/following/

* ``func`` is the python function that implements the operation code, which
will be executed asynchronously.

* ``args`` and ``kwargs`` are the positional and named argguments that will be


votdev Mar 13, 2018

Contributor

s/argguments/arguments/

is not created and you get the task object of the current running task.


How to get the list of executing and finished asynchronous tasks?


votdev Mar 13, 2018

Contributor

Maybe 'running' is better than 'executing' here.

asynchronous tasks. To trigger the execution of an asynchronous task we must
use the follwoing class method of the ``TaskManager`` class::

import ..tools import TaskManager


ricardoasmarques Mar 13, 2018

Member

s/import ..tools import TaskManager/from .tools import TaskManager/


@ApiController('task')
@AuthRequired()
class TaskController(RESTController):


sebastian-philipp Mar 13, 2018

Member

's/TaskController/Task/g' or add Controller to all controllers.

@ApiController('task')
@AuthRequired()
class TaskController(RESTController):
def list(self, namespace=None):


sebastian-philipp Mar 13, 2018

Member

Regarding the namespace:

Do you have a requirement from the UI to add namespaces? If not, I'd add namespaces later, only if they are needed.
Looks like I can filter by namespace here. Why not instead by task name? In my experience, we won't have that many different tasks.


rjfd Mar 14, 2018

Author Contributor

Namespaces are useful to group related tasks, for instance to group all tasks related to a single component like RBD under rbd/*.
Since the frontend is the main consumer of the task list, working with a "task name" is difficult: the frontend's memory is volatile and can easily lose the task name on a refresh, so namespaces are easier to use.

Also, you can define a unique namespace to mimic the same behavior of a "task name" or "task id".

The namespace parameter in this function accepts a glob expression, which makes it pretty good for filtering groups of tasks.
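The glob filtering described above can be sketched with Python's `fnmatch` module. This is a hypothetical illustration of the behavior, not code from the PR; the actual TaskManager may match namespaces with a different mechanism.

```python
# Hypothetical sketch of glob-based namespace filtering; fnmatch is an
# assumption, the real TaskManager may match namespaces differently.
from fnmatch import fnmatch

tasks = [
    {'namespace': 'rbd/create', 'metadata': {'image_name': 'img1'}},
    {'namespace': 'rbd/delete', 'metadata': {'image_name': 'img2'}},
    {'namespace': 'pool/create', 'metadata': {'pool_name': 'p1'}},
]

def filter_tasks(task_list, glob='*'):
    """Return only the tasks whose namespace matches the glob expression."""
    return [t for t in task_list if fnmatch(t['namespace'], glob)]

print([t['namespace'] for t in filter_tasks(tasks, 'rbd/*')])
# ['rbd/create', 'rbd/delete']
```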


sebastian-philipp Mar 14, 2018

Member

Just renaming namespace to name or task_name would make things clearer. Especially when moving the name to the task definition. We're talking about the same thing here.


jecluis Mar 14, 2018

Member

I don't have strong feelings either way, but namespace does sound a bit better than name or task_name IF namespace here is supposed to allow more than just a specific task unique identifier; name and task_name, on the other hand, seem to convey something a bit too specific.

However, I don't particularly like namespace for this particular case, but I don't have a better name for it, so...

@cherrypy.expose
@cherrypy.tools.json_out()
def default(self):
task = TaskManager.run("dummy/task", {}, self._dummy)


sebastian-philipp Mar 13, 2018

Member

The name "dummy/task" should be part of the definition of the task, as it should be constant across all usages.


rjfd Mar 14, 2018

Author Contributor

Can you clarify what you mean by "should be part of the definition of the task"?

The namespace parameter is a dynamic property on purpose; if controller developers want all their tasks to share the same namespace string, they can declare a constant with that string.


sebastian-philipp Mar 14, 2018

Member

The task name is typically very similar to the method name plus the class or package name. There is no need to make this a dynamic property, unless you have a clear use case in mind.


jecluis Mar 14, 2018

Member

@sebastian-philipp If you only have one single task_name as the sole identifier of a single task in the queue, how do you expect to handle different tasks performing the same base action if you are not also differentiating them based on their arguments?

You actually gave a pretty good example of how what you are proposing would not work (in another comment), even though you presented it as an argument against the current implementation:

TaskManager.run('set_pg_count_per_pool', 42, lambda: set_pg_count_per_pool(mypool, 42))
TaskManager.run('set_pg_count_per_pool', 12, lambda: set_pg_count_per_pool(mypool, 12))
TaskManager.run('set_pg_count_per_pool', 42, lambda: set_pg_count_per_pool(mypool, 42))

Whereas the above would be expected to run two different tasks, with the third call returning the status of the first call (given it's an idempotent operation), how would you handle such a sequence of operations if you only had a single, static task_name identifier?
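The deduplication behavior described here can be sketched with a toy task manager. The class and method names below are hypothetical, not the PR's actual implementation; the point is that keying the running-task table on the (namespace, metadata) pair makes the third call return the first call's task object.

```python
# Toy sketch of (namespace, metadata) deduplication; names are
# hypothetical and do not reflect the PR's actual TaskManager code.
import threading

class Task:
    def __init__(self, namespace, metadata, fn):
        self.namespace = namespace
        self.metadata = metadata
        self.fn = fn

class MiniTaskManager:
    _tasks = {}
    _lock = threading.Lock()

    @classmethod
    def run(cls, namespace, metadata, fn):
        # metadata dicts are unhashable, so key on a sorted item tuple
        key = (namespace, tuple(sorted(metadata.items())))
        with cls._lock:
            if key in cls._tasks:          # same pair -> same task object
                return cls._tasks[key]
            task = Task(namespace, metadata, fn)
            cls._tasks[key] = task
            return task

t1 = MiniTaskManager.run('pool/set_pg_num', {'pool': 'mypool', 'pg_num': 42}, lambda: None)
t2 = MiniTaskManager.run('pool/set_pg_num', {'pool': 'mypool', 'pg_num': 12}, lambda: None)
t3 = MiniTaskManager.run('pool/set_pg_num', {'pool': 'mypool', 'pg_num': 42}, lambda: None)
print(t1 is t3, t1 is t2)  # True False
```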

``value == None``, and if ``state == VALUE_EXCEPTION`` then ``value`` stores
the exception object raised by the execution of function ``func``.

The pair ``(namespace, metadata)`` should univocally identify the task being


sebastian-philipp Mar 13, 2018

Member

why not use a task name instead? Why invent something like namespace?


rjfd Mar 14, 2018

Author Contributor

(namespace, metadata) provides the ability to identify a task unequivocally without having to know a "task name" or "task id". This gives much more flexibility to the consumers of the list of executing/finished tasks, as they are not required to store the list of task names they want to keep track of.


* ``VALUE_DONE = 0``
* ``VALUE_EXECUTING = 1``
* ``VALUE_EXCEPTION = 2``


sebastian-philipp Mar 13, 2018

Member

Please remove VALUE_EXCEPTION as a valid return code. Instead raise the original exception in the caller's thread. If someone wants to catch a specific exception, he should use `try: ... except:' instead.


rjfd Mar 14, 2018

Author Contributor

Right, maybe we don't need to pass the exception this way. Will try your suggestion.

{
'namespace': "namespace", # str
'metadata': { }, # dict
'begin_time': 0.0, # float


sebastian-philipp Mar 13, 2018

Member

again, ISO 8601
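The ISO 8601 suggestion could be sketched as follows. The helper below is hypothetical, not code from this PR; it only illustrates converting a `time.time()` float into the kind of string the reviewer is asking for.

```python
# Hypothetical helper: serialize begin_time/end_time as ISO 8601
# strings instead of raw time.time() floats.
from datetime import datetime, timezone

def to_iso8601(ts):
    """Convert a POSIX timestamp (float seconds) to an ISO 8601 UTC string."""
    dt = datetime.fromtimestamp(ts, tz=timezone.utc)
    return dt.isoformat().replace('+00:00', 'Z')

print(to_iso8601(0.0))  # 1970-01-01T00:00:00Z
```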

'namespace': "namespace", # str
'metadata': { }, # dict
'begin_time': 0.0, # float
'end_time': 0.0, # float

'latency': 0.0, # float
'progress': 0 # int (percentage)
'success': True, # bool


sebastian-philipp Mar 13, 2018

Member

I'd replace this with a state property and unify executing and finished tasks. This would also improve the TaskManager itself.

'latency': 0.0, # float
'progress': 0 # int (percentage)
'success': True, # bool
'ret_value': None, # object, populated only if 'success' == True


sebastian-philipp Mar 13, 2018

Member

I'd allow generic JSON data structures here. Sometimes, a simple Boolean is enough.


rjfd Mar 14, 2018

Author Contributor

When I say object, I mean anything, it could be a boolean. Of course the restriction is that it must be something that can be serialized in JSON.
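The constraint rjfd states can be demonstrated directly: any value accepted by `json.dumps` qualifies as a ret_value, a plain boolean included, while arbitrary objects are rejected.

```python
# Demonstrates the JSON-serializability restriction on ret_value.
import json

print(json.dumps({'ret_value': True}))  # {"ret_value": true}

try:
    json.dumps({'ret_value': object()})  # arbitrary objects cannot be serialized
except TypeError:
    print('ret_value must be JSON-serializable')
```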

return {
'executing_tasks': executing_t,
'finished_tasks': finished_t
}


sebastian-philipp Mar 13, 2018

Member

I'd unify both lists into one and add a state property.


jecluis Mar 14, 2018

Member

I can imagine this being useful from the UI point of view. Maybe a frontend dev could weigh in on what they would like to be returned here?

with self.lock:
if self.executor_thread is None:
self.executor_thread = AsyncTask.ExecutorThread(self)
self.executor_thread.start()


sebastian-philipp Mar 14, 2018

Member

You're unconditionally creating a new posix thread per scheduled task here and dashboard_v2 already starts about ten posix threads.


LenzGr Mar 14, 2018

Contributor

But wouldn't each task have to be handled in a separate thread? How else would you execute them in parallel?


rjfd Mar 14, 2018

Author Contributor

@LenzGr I think what @sebastian-philipp means is that with the current implementation the number of threads might grow unbounded. I agree that we can improve this part of the code by using a thread queue that limits the number of worker threads. I think this improvement can be done in a separate PR; it is not critical at this point.
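The bounded-worker improvement mentioned here could be sketched with the standard library's `concurrent.futures` (a hypothetical replacement, not this PR's code): a pool with a fixed worker count queues tasks instead of spawning one thread per task.

```python
# Hypothetical sketch: cap the number of task threads with a
# ThreadPoolExecutor instead of creating one thread per scheduled task.
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)  # bound chosen for illustration

def run_task(fn, *args, **kwargs):
    # if all 4 workers are busy the task waits in the pool's queue,
    # rather than starting a new posix thread
    return executor.submit(fn, *args, **kwargs)

future = run_task(lambda a, b: a + b, 2, 3)
print(future.result())  # 5
```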

val = self._task.fn(*self._task.fn_args, **self._task.fn_kwargs)
self._task.end_time = time.time()
except Exception as ex:
logger.exception("Error while calling %s: ex=%s", self._task, str(ex))


sebastian-philipp Mar 14, 2018

Member

replace with

logger.exception("Error while calling %s", self._task)

as the exception is already printed by logger.exception
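A quick demonstration of why the `str(ex)` argument is redundant: `logger.exception()` logs at ERROR level and automatically appends the active exception's traceback to the message.

```python
# logger.exception() appends the current exception's traceback, so
# formatting str(ex) into the message duplicates information.
import logging

logging.basicConfig()
logger = logging.getLogger('dashboard.tasks')

try:
    1 / 0
except Exception:
    # output includes the message plus the full traceback ending in
    # "ZeroDivisionError: division by zero"
    logger.exception("Error while calling %s", "my_task")
```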

self._task.exception = ex
else:
with self._task.lock:
self._task.latency = self._task.end_time - self._task.begin_time


sebastian-philipp Mar 14, 2018

Member

s/latency/duration?

Member

jecluis left a comment

I only focused on evaluating the general architecture and approach, which seemed sound and generic enough to make me happy. I will leave it to the backend developers to figure out code correctness, and to the frontend developers to ascertain whether the interfaces make sense to them.

Some nits on grammar and English, but it does look good to me.


* ``namespace`` is a string that can be used to group tasks. For instance
for RBD image creation tasks we could specify ``"rbd/create"`` as the
namespace, or conversly ``"rbd/remove"`` for RBD image removal tasks.


jecluis Mar 14, 2018

Member

s/conversly/conversely/. Also, I don't think conversely applies here. I'd go with similarly instead, as you are not pointing out a reversion of the previous statement.

The pair ``(namespace, metadata)`` should univocally identify the task being
run, which means that if you try to trigger a new task that matches the same
``(namespace, metadata)`` pair of the currently running task, then the new task
is not created and you get the task object of the current running task.


jecluis Mar 14, 2018

Member

I would also mention the idempotent nature of running two calls to run() with the same namespace and metadata, with an example.

will return all executing and finished tasks which namespace starts with
``rbd/``.

To prevent the finished tasks list from growing unboundly, the finished tasks


jecluis Mar 14, 2018

Member

s/unboundly/unbounded/

}


How to updated the execution progress of an asynchronous task?


jecluis Mar 14, 2018

Member

Either s/updated/update/, or you're missing the verb.

The ``inc_progress`` method receives as argument an integer value representing
the delta we want to increment to the current execution progress percentage.
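A minimal sketch of what `inc_progress` might look like internally. This is hypothetical; the actual Task class lives in the PR's tools module and may differ.

```python
# Hypothetical sketch of inc_progress: add a delta to the task's
# progress percentage under a lock, clamped to 100.
import threading

class Task:
    def __init__(self):
        self.progress = 0
        self.lock = threading.Lock()

    def inc_progress(self, delta):
        with self.lock:
            self.progress = min(self.progress + delta, 100)

t = Task()
t.inc_progress(30)
t.inc_progress(30)
t.inc_progress(50)  # 110 is clamped to 100
print(t.progress)   # 100
```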

Now we show a full example of a controller that triggers a new task and


jecluis Mar 14, 2018

Member

Instead of

Now we show a full example of a controller [...]

I'd go with

Take the following example of a controller [...]

dismissed review

@rjfd rjfd force-pushed the rjfd:wip-dashboard-tasks branch from 7a21c3a to 84e3a5e Mar 14, 2018

Contributor Author

rjfd commented Mar 14, 2018

@votdev @ricardoasmarques @jecluis I addressed your documentation comments.

@sebastian-philipp I addressed all the comments that are not under discussion.

@rjfd rjfd force-pushed the rjfd:wip-dashboard-tasks branch 3 times, most recently from ddc38e0 to a9e8f20 Mar 14, 2018

Contributor Author

rjfd commented Mar 15, 2018

jenkins retest this please

Contributor Author

rjfd commented Mar 16, 2018

@ricardoasmarques I implemented the solution you proposed for the clean up of old finished tasks.

@rjfd rjfd force-pushed the rjfd:wip-dashboard-tasks branch 2 times, most recently from b29edb1 to 2801f25 Mar 16, 2018

@rjfd rjfd changed the title mgr/dashboard_v2: asynchronous task support mgr/dashboard: asynchronous task support Mar 16, 2018

Contributor Author

rjfd commented Mar 16, 2018

jenkins retest this please

Member

ricardoasmarques left a comment

I've tested this PR, and it meets all requirements that will be needed for the front-end. Lgtm.

Contributor Author

rjfd commented Mar 21, 2018

@sebastian-philipp here's an example of how to mimic the oA code you referred to using dashboard asynchronous tasks. Since getting the pg_status of a pool is a blocking operation, we execute the monitor_pg_state function as a task using the default ThreadedExecutor, which internally spawns a thread.

def monitor_pg_state(pool_name, pg_num):
    pool_info = CephService.get_pool_info(pool_name)
    if not pool_info or 'pg_status' not in pool_info:
        return {'success': False, 'msg': "Could not get pool pg_status"}
    pg_status = pool_info['pg_status']
    active = 0
    if 'active+clean' in pg_status:
        if pg_status['active+clean'] == pg_num:
            return {'success': True, 'msg': "All PGs are active+clean"}
        else:
            active = pg_status['active+clean']

    progress = int(round(active * 100.0 / pg_num))
    TaskManager.current_task().set_progress(progress)

    time.sleep(1.0)  # sleep for a bit, no need to check every millisecond
    return monitor_pg_state(pool_name, pg_num)


@ApiController('test')
class Test(BaseController):
    @cherrypy.expose
    @cherrypy.tools.json_out()
    def pg_monitor(self, pool_name):
        pool_info = CephService.get_pool_info(pool_name)
        task = TaskManager.run("osd/pool/pg/monitor", {'pool_name': pool_name},
                               monitor_pg_state, [pool_name, pool_info['pg_num']])
        return task.wait(2.0)
Contributor Author

rjfd commented Mar 21, 2018

@sebastian-philipp (and others @jecluis @ricardoasmarques @jcsp ) here's an example of how to implement the "create pool" operation using the asynchronous nature of mgr.send_command and a custom task executor. The implementation is based on an asynchronous state machine that executes all steps asynchronously, and the task only finishes when all of the pool's PGs are in the active+clean state.

https://github.com/rjfd/ceph/blob/wip-dashboard-pr-20865/src/pybind/mgr/dashboard/controllers/pool.py

@rjfd rjfd force-pushed the rjfd:wip-dashboard-tasks branch 2 times, most recently from 4e0d0cd to f387e7b Mar 21, 2018

Contributor Author

rjfd commented Mar 21, 2018

Added cc5ef6a to this PR to avoid memory leaks caused by dangling NotificationQueue listeners registered by short-lived objects, like the custom executor example in HACKING.rst. Updated the example accordingly.

Member

ricardoasmarques commented Mar 22, 2018

@rjfd I've retested this PR and it still lgtm.

Member

sebastian-philipp commented Mar 23, 2018

I just looked at /task and I still find it confusing to have two success bools contradicting each other:

{
  "executing_tasks": [],
  "finished_tasks": [
    {
      "exception": null,
      "end_time": "2018-03-23T12:30:37.318413Z",
      "success": true,
      "begin_time": "2018-03-23T12:30:37.317111Z",
      "duration": 0.0013020038604736328,
      "progress": 100,
      "ret_value": {
        "msg": "Could not get pool pg_status",
        "success": false
      },
      "namespace": "osd/pool/pg/monitor",
      "metadata": {
        "pool_name": ".rgw.root"
      }
    }
  ]
}

But I'm OK with it, if @ricardoasmarques is handling this in the UI.

Member

jecluis commented Mar 23, 2018

But... Are they not on different objects though?

Member

sebastian-philipp commented Mar 23, 2018

The first one is on the task itself and the second one is in the task's return value.

rjfd added some commits Mar 8, 2018

mgr/dashboard: privatize NotificationQueue methods
Signed-off-by: Ricardo Dias <rdias@suse.com>
mgr/dashboard: Support for handler priorities in NotificationQueue
Signed-off-by: Ricardo Dias <rdias@suse.com>
mgr/dashboard: fix NotificationQueue waiting loop
Signed-off-by: Ricardo Dias <rdias@suse.com>
mgr/dashboard: implemented NotificationQueue listener removal
Signed-off-by: Ricardo Dias <rdias@suse.com>
mgr/dashboard: Asynchronous tasks implementation
Signed-off-by: Ricardo Dias <rdias@suse.com>
mgr/dashboard: async tasks controller
Signed-off-by: Ricardo Dias <rdias@suse.com>
mgr/dashboard: added tasks info to summary controller
Signed-off-by: Ricardo Dias <rdias@suse.com>
mgr/dashboard: add task manager usage instructions to HACKING.rst
Signed-off-by: Ricardo Dias <rdias@suse.com>
mgr/dashboard: task manager unit tests implementation
Signed-off-by: Ricardo Dias <rdias@suse.com>

@rjfd rjfd force-pushed the rjfd:wip-dashboard-tasks branch from f387e7b to 9f11a68 Mar 27, 2018

Contributor Author

rjfd commented Mar 28, 2018

comments have been addressed

@rjfd rjfd merged commit 5a861f5 into ceph:master Mar 28, 2018

4 of 5 checks passed

make check (arm64): make check failed
Docs: build check: OK - docs built
Signed-off-by: all commits in this PR are signed
Unmodified Submodules: submodules for project are unmodified
make check: make check succeeded
Contributor

tchaikov commented on src/pybind/mgr/dashboard/tests/test_notification.py in 6b0afa3 Mar 30, 2018

@rjfd is it possible that

        self.assertLess(self.listener.type1_3_ts[0], self.listener.all_ts[0])
        self.assertLess(self.listener.all_ts[0], self.listener.type1_ts[0])
        self.assertLess(self.listener.type2_ts[0], self.listener.all_ts[1])
>       self.assertLess(self.listener.type1_3_ts[1], self.listener.all_ts[2])
E       AssertionError: 1522423799.316104 not less than 1522423799.316104

the timestamp of type1_3_ts[1] is identical to that of self.listener.all_ts[2]? If yes, we should probably s/assertLess/assertLessEqual/ here? See https://jenkins.ceph.com/job/ceph-pull-requests/43376/console
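The proposed change in isolation: assertLessEqual tolerates the equal timestamps that a coarse clock can produce within one tick, where assertLess fails.

```python
# assertLess rejects equal values; assertLessEqual accepts them, which
# is what two events timestamped within the same clock tick need.
import unittest

class ClockTickCase(unittest.TestCase):
    def test_equal_timestamps(self):
        ts = 1522423799.316104  # the colliding timestamp from the CI failure
        with self.assertRaises(AssertionError):
            self.assertLess(ts, ts)      # assertLess fails on equality
        self.assertLessEqual(ts, ts)     # assertLessEqual passes

unittest.main(argv=['-'], exit=False, verbosity=0)
```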

Contributor Author

rjfd replied Mar 30, 2018

Yes, I'll open a PR with the fix

Contributor Author

rjfd replied Mar 30, 2018

PR: #21147
