Skip to content

Commit

Permalink
Merge pull request #1503 from garlick/rpc_multi
Browse files Browse the repository at this point in the history
libflux: add flux_future_reset()
  • Loading branch information
grondo committed May 3, 2018
2 parents b148196 + c38b407 commit aa881c9
Show file tree
Hide file tree
Showing 12 changed files with 320 additions and 77 deletions.
10 changes: 6 additions & 4 deletions doc/man3/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ MAN3_FILES_PRIMARY = \
flux_request_encode.3 \
flux_content_load.3 \
flux_log.3 \
flux_future_then.3 \
flux_future_get.3 \
flux_future_create.3 \
flux_kvs_lookup.3 \
flux_kvs_commit.3 \
Expand Down Expand Up @@ -116,6 +116,7 @@ MAN3_FILES_SECONDARY = \
flux_log_set_appname.3 \
flux_log_set_procid.3 \
flux_future_wait_for.3 \
flux_future_reset.3 \
flux_future_destroy.3 \
flux_future_get.3 \
flux_future_fulfill.3 \
Expand Down Expand Up @@ -238,9 +239,10 @@ flux_content_store_get.3: flux_content_load.3
flux_vlog.3: flux_log.3
flux_log_set_appname.3: flux_log.3
flux_log_set_procid.3: flux_log.3
flux_future_destroy.3: flux_future_then.3
flux_future_wait_for.3: flux_future_then.3
flux_future_get.3: flux_future_then.3
flux_future_reset.3: flux_future_get.3
flux_future_destroy.3: flux_future_get.3
flux_future_wait_for.3: flux_future_get.3
flux_future_then.3: flux_future_get.3
flux_future_fulfill.3: flux_future_create.3
flux_future_fulfill_error.3: flux_future_create.3
flux_future_aux_set.3: flux_future_create.3
Expand Down
2 changes: 1 addition & 1 deletion doc/man3/flux_content_load.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,6 @@ include::COPYRIGHT.adoc[]
SEE ALSO
---------
flux_rpc(3), flux_future_then(3)
flux_rpc(3), flux_future_get(3)

https://github.com/flux-framework/rfc/blob/master/spec_10.adoc[RFC 10: Content Storage Service]
8 changes: 5 additions & 3 deletions doc/man3/flux_future_create.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ SYNOPSIS
DESCRIPTION
-----------
See `flux_future_get(3)` for general functions that operate on futures.
This page covers functions primarily used when building classes that
return futures.
A Flux future represents some activity that may be completed with reactor
watchers and/or message handlers. It is intended to be returned by other
classes as a handle for synchronization and a container for results.
This page describes the future interfaces used by such classes.
Class users and users seeking an introduction to Flux futures are referred
to `flux_future_then(3)`.
A class that returns a future usually provides a creation function
that internally calls `flux_future_create()`, and may provide functions
Expand Down Expand Up @@ -182,4 +184,4 @@ include::COPYRIGHT.adoc[]
SEE ALSO
---------
flux_future_create(3), flux_clone(3)
flux_future_get(3), flux_clone(3)
69 changes: 32 additions & 37 deletions doc/man3/flux_future_then.adoc → doc/man3/flux_future_get.adoc
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
flux_future_then(3)
===================
flux_future_get(3)
==================
:doctype: manpage


NAME
----
flux_future_then, flux_future_wait_for, flux_future_get, flux_future_destroy - synchronize an activity
flux_future_get, flux_future_then, flux_future_wait_for, flux_future_reset, flux_future_destroy - synchronize an activity


SYNOPSIS
--------
#include <flux/core.h>

typedef struct flux_future flux_future_t;

typedef void (*flux_continuation_f)(flux_future_t *f, void *arg);

int flux_future_get (flux_future_t *f, void *result);

int flux_future_then (flux_future_t *f, double timeout,
flux_continuation_f cb, void *arg);

int bool flux_future_wait_for (flux_future_t *f, double timeout);
int flux_future_wait_for (flux_future_t *f, double timeout);

int flux_future_get (flux_future_t *f, void *result);
void flux_future_reset (flux_future_t *f);

void flux_future_destroy (flux_future_t *f);


DESCRIPTION
-----------
OVERVIEW
--------
A Flux future represents some activity that may be completed with reactor
watchers and/or message handlers. It is both a handle for synchronization
Expand All @@ -42,38 +42,37 @@ access function for results. The functions described in this page can be
used to access, synchronize, and destroy futures returned from any such class.
Authors of classes that return futures are referred to `flux_future_create(3)`.
DESCRIPTION
-----------

`flux_future_get()` accesses the result of a fulfilled future. If the
future is not yet fulfilled, it calls `flux_future_wait_for()` internally
with a negative _timeout_, causing it to block until the future is fulfilled.
A pointer to the result is assigned to _result_ (caller must NOT free),
or -1 is returned if the future was fulfilled with an error.

`flux_future_then()` sets up a continuation callback _cb_ that is called
with opaque argument _arg_ once the future is fulfilled. The continuation
is registered on the reactor passed to a class-specific create function
(some create functions accept a flux_t handle, and the reactor is
derived from that).
The continuation will normally use `flux_future_get()` or a class-specific
access function to obtain the result from the future container without
blocking. `flux_future_then()` may only be called once on a given future.
It is not an error to set up a continuation on a future that has already
been fulfilled. If _timeout_ is non-negative, the future must be fulfilled
within the specified amount of time or the timeout fulfills it with an error
(errno set to ETIMEDOUT). The reactor must be run or re-entered in order
for the timer and the future activity to make progress. It is safe to
destroy the future from within the continuation callback.
will normally use `flux_future_get()` or a class-specific access function
to obtain the result from the future container without blocking. The
continuation may call `flux_future_destroy()` or `flux_future_reset()`.
If _timeout_ is non-negative, the future must be fulfilled within the
specified amount of time or the timeout fulfills it with an error (errno
set to ETIMEDOUT).

`flux_future_wait_for()` blocks until the future is fulfilled, or _timeout_
(if non-negative) expires. This function may be called multiple times,
with different values for _timeout_. If the timeout expires before
the future is fulfilled, an error is returned (errno set to ETIMEDOUT)
but the future remains unfulfilled. The timer and the future activity can
make progress while `flux_future_wait_for()` is executing, unless _timeout_
is zero, in which case the function times out immediately if the future
has not already been fulfilled. While `flux_future_wait_for()` is executing,
unrelated reactor watchers and message handlers are not active.
but the future remains unfulfilled. If _timeout_ is zero, function times
out immediately if the future has not already been fulfilled.

`flux_future_get()` accesses the result of a fulfilled future. If the
future is not yet fulfilled, it calls `flux_future_wait_for()` internally
with a negative _timeout_, causing it to block until the future is fulfilled.
A pointer to the result is assigned to _result_ (caller must NOT free),
or -1 is returned if the future was fulfilled with an error. Often this
function is wrapped with a class-specific result access function.
`flux_future_reset()` unfulfills a future, invalidating any result stored
in the container, and preparing it to be fulfilled once again. If a
continuation was registered, it remains in effect for the next fulfillment,
however any timeout will have been cleared by the current fulfillment
and must be re-established by following the `flux_future_reset()` with
another `flux_future_then()`, if desired.

`flux_future_destroy()` destroys a future, including any result contained
within.
Expand All @@ -86,10 +85,6 @@ RETURN VALUE
return zero on success. On error, -1 is returned, and errno is set
appropriately.
`flux_future_check()` returns a boolean result. If an error occurs,
it returns true. A subsequent call to `flux_future_get()` returns
-1 with errno set to the error that occurred during the check.

ERRORS
------
Expand Down
2 changes: 1 addition & 1 deletion doc/man3/flux_kvs_commit.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,4 @@ include::COPYRIGHT.adoc[]
SEE ALSO
---------
flux_future_then(3), flux_kvs_txn_create(3), flux_kvs_set_namespace(3)
flux_future_get(3), flux_kvs_txn_create(3), flux_kvs_set_namespace(3)
2 changes: 1 addition & 1 deletion doc/man3/flux_rpc.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ include::COPYRIGHT.adoc[]

SEE ALSO
---------
flux_future_then(3)
flux_future_get(3)
https://github.com/flux-framework/rfc/blob/master/spec_6.adoc[RFC 6: Flux
Remote Procedure Call Protocol]
1 change: 1 addition & 0 deletions doc/test/spell.en.pws
Original file line number Diff line number Diff line change
Expand Up @@ -422,3 +422,4 @@ resizing
HOSTLIST
hostlist
hostnames
unfulfills
71 changes: 53 additions & 18 deletions src/common/libflux/future.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct then_context {
flux_watcher_t *prepare;// doorbell for fulfill
flux_watcher_t *check;
flux_watcher_t *idle;
bool init_called;
flux_continuation_f continuation;
void *continuation_arg;
};
Expand Down Expand Up @@ -164,24 +165,37 @@ static struct then_context *then_context_create (flux_reactor_t *r, void *arg)
goto error;
if (!(then->idle = flux_idle_watcher_create (r, NULL, NULL)))
goto error;
flux_watcher_start (then->prepare);
flux_watcher_start (then->check);
return then;
error:
then_context_destroy (then);
return NULL;
}

static void then_context_start (struct then_context *then)
{
flux_watcher_start (then->prepare);
flux_watcher_start (then->check);
}

static int then_context_set_timeout (struct then_context *then,
double timeout, void *arg)
{
assert (then != NULL);
assert (then->timer == NULL);
assert (timeout >= 0.);
if (!(then->timer = flux_timer_watcher_create (then->r, timeout, 0.,
then_timer_cb, arg)))
return -1;
flux_watcher_start (then->timer);
if (then) {
if (timeout < 0.) // disable
flux_watcher_stop (then->timer);
else {
if (!then->timer) { // set
then->timer = flux_timer_watcher_create (then->r, timeout, 0.,
then_timer_cb, arg);
if (!then->timer)
return -1;
}
else { // reset
flux_timer_watcher_reset (then->timer, timeout, 0.);
}
flux_watcher_start (then->timer);
}
}
return 0;
}

Expand Down Expand Up @@ -226,6 +240,24 @@ flux_future_t *flux_future_create (flux_future_init_f cb, void *arg)
return NULL;
}

/* Reset (unfulfill) a future.
*/
void flux_future_reset (flux_future_t *f)
{
if (f) {
if (f->result) {
if (f->result_free)
f->result_free (f->result);
f->result = NULL;
}
f->result_valid = false;
f->result_errnum_valid = false;
if (f->then)
then_context_start (f->then);
}
}


/* Set the flux reactor to be used for 'then' context.
* In 'now' context, reactor will be a temporary one.
*/
Expand Down Expand Up @@ -327,10 +359,8 @@ int flux_future_wait_for (flux_future_t *f, double timeout)
if (!(f->now = now_context_create ()))
return -1;
}
if (timeout >= 0.) {
if (now_context_set_timeout (f->now, timeout, f) < 0)
return -1;
}
if (now_context_set_timeout (f->now, timeout, f) < 0)
return -1;
f->now->running = true;
if (f->init && !f->now->init_called) {
f->init (f, f->init_arg); // might set error
Expand Down Expand Up @@ -373,18 +403,23 @@ int flux_future_get (flux_future_t *f, void *result)
int flux_future_then (flux_future_t *f, double timeout,
flux_continuation_f cb, void *arg)
{
if (!f || !f->r || !cb || f->then != NULL) {
if (!f || !f->r || !cb) {
errno = EINVAL;
return -1;
}
if (!(f->then = then_context_create (f->r, f)))
return -1;
if (timeout >= 0. && then_context_set_timeout (f->then, timeout, f) < 0)
if (!f->then) {
if (!(f->then = then_context_create (f->r, f)))
return -1;
}
then_context_start (f->then);
if (then_context_set_timeout (f->then, timeout, f) < 0)
return -1;
f->then->continuation = cb;
f->then->continuation_arg = arg;
if (f->init)
if (f->init && !f->then->init_called) {
f->init (f, f->init_arg); // might set error
f->then->init_called = true;
}
if (f->result_errnum_valid) {
errno = f->result_errnum;
return -1;
Expand Down
2 changes: 2 additions & 0 deletions src/common/libflux/future.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ int flux_future_then (flux_future_t *f, double timeout,

int flux_future_wait_for (flux_future_t *f, double timeout);

void flux_future_reset (flux_future_t *f);

void flux_future_destroy (flux_future_t *f);

void *flux_future_aux_get (flux_future_t *f, const char *name);
Expand Down
Loading

0 comments on commit aa881c9

Please sign in to comment.