Skip to content

Commit

Permalink
libflux/dispatch: add fastpath for tagged responses
Browse files Browse the repository at this point in the history
When a message watcher is registered for a tagged response,
use O(1) array lookup to map the tag in the response back to the
message handler rather the O(N) list traversal.

Start with 32 entries for single RPCs and 32 for group RPCs.
Expand that as needed.  Since the tagpool is LIFO, the array
size is a function of the maximum number of concurrent requests
in flight.

This should solve #680 where a large backlog of requests,
such as flush requests from content cache to backing store,
can cause the dispatcher to spend a *lot* of time walking that list.

This fastpath is not used in coproc mode.  Coproc mode is less
commonly used and it is more challenging to implement the fastpath
in that context.
  • Loading branch information
garlick committed Jun 3, 2016
1 parent 796802f commit 7c45eb5
Showing 1 changed file with 164 additions and 23 deletions.
187 changes: 164 additions & 23 deletions src/common/libflux/dispatch.c
Expand Up @@ -37,15 +37,30 @@
#include "src/common/libutil/coproc.h"
#include "src/common/libutil/iterators.h"


/* Fastpath for RPCs:
* fastpath array translates response matchtags to message handlers,
* bypassing the handlers zlist. Since the matchtag pools are LIFO,
* start with a small array and realloc if the backlog grows beyond it.
*/
#define BASE_FASTPATH_MAPLEN 32 /* use power of 2 */

struct fastpath {
struct flux_msg_handler **map;
struct flux_msg_handler *base[BASE_FASTPATH_MAPLEN];
int len;
};

struct dispatch {
flux_t h;
zlist_t *handlers;
zlist_t *handlers_new;
struct fastpath norm;
struct fastpath group;
flux_msg_handler_t *current;
flux_watcher_t *w;
int running_count;
int usecount;
int lastnotice; /* message handler count */
};

#define HANDLER_MAGIC 0x44433322
Expand All @@ -70,6 +85,9 @@ static void handle_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg);
static void free_msg_handler (flux_msg_handler_t *w);

static void fastpath_init (struct fastpath *fp);
static void fastpath_free (struct fastpath *fp);

static void dispatch_usecount_decr (struct dispatch *d)
{
flux_msg_handler_t *w;
Expand All @@ -89,6 +107,8 @@ static void dispatch_usecount_decr (struct dispatch *d)
zlist_destroy (&d->handlers_new);
}
flux_watcher_destroy (d->w);
fastpath_free (&d->norm);
fastpath_free (&d->group);
free (d);
}
}
Expand Down Expand Up @@ -122,6 +142,9 @@ static struct dispatch *dispatch_get (flux_t h)
d->w = flux_handle_watcher_create (r, h, FLUX_POLLIN, handle_cb, d);
if (!d->w)
goto nomem;
fastpath_init (&d->norm);
fastpath_init (&d->group);

flux_aux_set (h, "flux::dispatch", d, dispatch_destroy);
}
return d;
Expand All @@ -131,6 +154,101 @@ static struct dispatch *dispatch_get (flux_t h)
return NULL;
}

static void fastpath_init (struct fastpath *fp)
{
fp->map = fp->base;
fp->len = BASE_FASTPATH_MAPLEN;
}

static void fastpath_free (struct fastpath *fp)
{
if (fp->map && fp->map != fp->base)
free (fp->map);
}

static int fastpath_grow (struct fastpath *fp)
{
int new_len = fp->len<<1;
struct flux_msg_handler **new_map;
int i;

if (!(new_map = calloc (new_len, sizeof (fp->map[0])))) {
errno = ENOMEM;
return -1;
}
for (i = 0; i < fp->len; i++)
new_map[i] = fp->map[i];
if (fp->map != fp->base)
free (fp->map);
fp->map = new_map;
fp->len = new_len;
return 0;
}

static int fastpath_get (struct fastpath *fp, uint32_t tag,
struct flux_msg_handler **hpp)
{
if (tag >= fp->len || fp->map[tag] == NULL)
return -1;
*hpp = fp->map[tag];
return 0;
}

static int fastpath_set (struct fastpath *fp, uint32_t tag,
struct flux_msg_handler *hp)
{
while (tag >= fp->len) {
if (fastpath_grow (fp) < 0)
return -1;
}
if (fp->map[tag] != NULL) {
errno = EINVAL;
return -1;
}
fp->map[tag] = hp;
return 0;
}

static void fastpath_clr (struct fastpath *fp, uint32_t tag)
{
if (tag < fp->len)
fp->map[tag] = NULL;
}

static int fastpath_response_lookup (struct dispatch *d, const flux_msg_t *msg,
struct flux_msg_handler **hpp)
{
uint32_t tag, group;

if (flux_msg_get_matchtag (msg, &tag) < 0)
return -1;
group = tag>>FLUX_MATCHTAG_GROUP_SHIFT;
if (group > 0)
return fastpath_get (&d->group, group, hpp);
else
return fastpath_get (&d->norm, tag, hpp);
}

static int fastpath_response_register (struct dispatch *d,
struct flux_msg_handler *hp)
{
uint32_t tag = hp->match.matchtag;
uint32_t group = tag>>FLUX_MATCHTAG_GROUP_SHIFT;
if (group > 0)
return fastpath_set (&d->group, group, hp);
else
return fastpath_set (&d->norm, tag, hp);
}

static void fastpath_response_unregister (struct dispatch *d, uint32_t tag)
{
uint32_t group = tag>>FLUX_MATCHTAG_GROUP_SHIFT;
if (group > 0)
fastpath_clr (&d->group, group);
else
fastpath_clr (&d->norm, tag);
}

static int copy_match (struct flux_match *dst,
const struct flux_match src)
{
Expand Down Expand Up @@ -338,14 +456,24 @@ static int dispatch_message (struct dispatch *d,
bool match = false;
int rc = -1;

FOREACH_ZLIST (d->handlers, w) {
if (!w->running)
continue;
if (flux_msg_cmp (msg, w->match)) {
/* fastpath */
if (type == FLUX_MSGTYPE_RESPONSE) {
if (fastpath_response_lookup (d, msg, &w) == 0 && w->running) {
w->fn (d->h, w, msg, w->arg);
match = true;
if (type != FLUX_MSGTYPE_EVENT)
break;
}
}
/* slowpath */
if (!match) {
FOREACH_ZLIST (d->handlers, w) {
if (!w->running)
continue;
if (flux_msg_cmp (msg, w->match)) {
w->fn (d->h, w, msg, w->arg);
match = true;
if (type != FLUX_MSGTYPE_EVENT)
break;
}
}
}
rc = match ? 1 : 0;
Expand Down Expand Up @@ -525,13 +653,21 @@ static void free_msg_handler (flux_msg_handler_t *w)

void flux_msg_handler_destroy (flux_msg_handler_t *w)
{
if (!w)
return;
assert (w->magic == HANDLER_MAGIC);
if (!w->destroyed) {
flux_msg_handler_stop (w);
dispatch_usecount_decr (w->d);
w->destroyed = 1;
if (w) {
assert (w->magic == HANDLER_MAGIC);
if (!(flux_flags_get (w->d->h) & FLUX_O_COPROC)
&& w->match.typemask == FLUX_MSGTYPE_RESPONSE
&& w->match.matchtag != FLUX_MATCHTAG_NONE) {
fastpath_response_unregister (w->d, w->match.matchtag);
flux_msg_handler_stop (w);
free_msg_handler (w);
} else {
if (!w->destroyed) {
flux_msg_handler_stop (w);
dispatch_usecount_decr (w->d);
w->destroyed = 1;
}
}
}
}

Expand All @@ -541,7 +677,7 @@ flux_msg_handler_t *flux_msg_handler_create (flux_t h,
{
struct dispatch *d = dispatch_get (h);
flux_msg_handler_t *w = NULL;
int count;
int saved_errno;

if (!d)
goto nomem;
Expand All @@ -554,19 +690,24 @@ flux_msg_handler_t *flux_msg_handler_create (flux_t h,
w->fn = cb;
w->arg = arg;
w->d = d;
if (zlist_append (d->handlers_new, w) < 0)
goto nomem;
count = zlist_size (d->handlers) + zlist_size (d->handlers_new);
if (count >= d->lastnotice + 100 || count <= d->lastnotice - 100) {
flux_log (d->h, LOG_DEBUG, "message handler count has reached %d",
count);
d->lastnotice = count;
if (!(flux_flags_get (h) & FLUX_O_COPROC)
&& w->match.typemask == FLUX_MSGTYPE_RESPONSE
&& w->match.matchtag != FLUX_MATCHTAG_NONE) {
if (fastpath_response_register (d, w) < 0) {
saved_errno = errno;
goto error;
}
} else {
if (zlist_append (d->handlers_new, w) < 0)
goto nomem;
}
dispatch_usecount_incr (d);
return w;
nomem:
saved_errno = ENOMEM;
error:
free_msg_handler (w);
errno = ENOMEM;
errno = saved_errno;
return NULL;
}

Expand Down

0 comments on commit 7c45eb5

Please sign in to comment.