Skip to content

Commit

Permalink
libflux: eliminate prepare watcher for futures
Browse files Browse the repository at this point in the history
The flux_future_t prepare watcher callback is currently used only
to start the idle watcher. Eliminate the middle man and start
the idle watcher directly in `then_context_start`.
  • Loading branch information
grondo committed Nov 16, 2018
1 parent 11b8785 commit 5e17ad8
Showing 1 changed file with 7 additions and 31 deletions.
38 changes: 7 additions & 31 deletions src/common/libflux/future.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ struct now_context {
struct then_context {
flux_reactor_t *r; // external reactor for then
flux_watcher_t *timer; // timer watcher (if timeout set)
flux_watcher_t *prepare;// doorbell for fulfill
flux_watcher_t *check;
flux_watcher_t *idle;
bool init_called;
Expand Down Expand Up @@ -78,8 +77,6 @@ struct flux_future {
zlist_t *queue;
};

static void prepare_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg);
static void check_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg);
static void now_timer_cb (flux_reactor_t *r, flux_watcher_t *w,
Expand Down Expand Up @@ -154,7 +151,6 @@ static void then_context_destroy (struct then_context *then)
{
if (then) {
flux_watcher_destroy (then->timer);
flux_watcher_destroy (then->prepare);
flux_watcher_destroy (then->check);
flux_watcher_destroy (then->idle);
free (then);
Expand All @@ -169,8 +165,6 @@ static struct then_context *then_context_create (flux_reactor_t *r, void *arg)
goto error;
}
then->r = r;
if (!(then->prepare = flux_prepare_watcher_create (r, prepare_cb, arg)))
goto error;
if (!(then->check = flux_check_watcher_create (r, check_cb, arg)))
goto error;
if (!(then->idle = flux_idle_watcher_create (r, NULL, NULL)))
Expand All @@ -183,13 +177,13 @@ static struct then_context *then_context_create (flux_reactor_t *r, void *arg)

static void then_context_start (struct then_context *then)
{
flux_watcher_start (then->prepare);
flux_watcher_start (then->idle); // prevent reactor from blocking
flux_watcher_start (then->check);
}

static void then_context_stop (struct then_context *then)
{
flux_watcher_stop (then->prepare);
flux_watcher_stop (then->idle);
flux_watcher_stop (then->check);
}

Expand Down Expand Up @@ -708,20 +702,6 @@ static void now_timer_cb (flux_reactor_t *r, flux_watcher_t *w,
flux_reactor_stop_error (r);
}

/* prepare - if results are ready, ensure loop doesn't block
* so check can call continuation on next loop iteration
*/
static void prepare_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
{
flux_future_t *f = arg;

assert (f->then != NULL);

if (future_is_ready (f))
flux_watcher_start (f->then->idle); // prevent reactor from blocking
}

/* check - if results are ready, call the continuation
*/
static void check_cb (flux_reactor_t *r, flux_watcher_t *w,
Expand All @@ -731,15 +711,11 @@ static void check_cb (flux_reactor_t *r, flux_watcher_t *w,

assert (f->then != NULL);

flux_watcher_stop (f->then->idle);
if (future_is_ready (f)) {
flux_watcher_stop (f->then->timer);
flux_watcher_stop (f->then->prepare);
flux_watcher_stop (f->then->check);
if (f->then->continuation)
f->then->continuation (f, f->then->continuation_arg);
// N.B. callback might destroy future
}
flux_watcher_stop (f->then->timer);
then_context_stop (f->then);
if (f->then->continuation)
f->then->continuation (f, f->then->continuation_arg);
// N.B. callback might destroy future
}


Expand Down

0 comments on commit 5e17ad8

Please sign in to comment.