Skip to content

Commit

Permalink
Merge pull request #2099 from grondo/composite-future-errors
Browse files Browse the repository at this point in the history
libflux/composite_future: propagate child errors to parent
  • Loading branch information
garlick committed Mar 25, 2019
2 parents f08d6fe + d558bf0 commit 6833978
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 33 deletions.
37 changes: 29 additions & 8 deletions src/common/libflux/composite_future.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,35 +53,56 @@ static struct composite_future * composite_get (flux_future_t *f)
/*
* Return true if all futures in this composite are ready
*/
static bool wait_all_is_ready (struct composite_future *cf)
static bool wait_all_is_ready (struct composite_future *cf, int *errnum)
{
int err = 0;
flux_future_t *f = zhash_first (cf->children);
while (f) {
if (!flux_future_is_ready (f))
return (false);
if (flux_future_get (f, NULL) < 0)
err = errno;
f = zhash_next (cf->children);
}
*errnum = err;
return (true);
}

/*
* Return true if cf->any or if all futures are fulfilled.
* Sets *errp to returned errno if any future failed for wait_all
* case, or if 'f', the current future, has error set for
* wait_any case.
*/
static bool composite_is_ready (struct composite_future *cf,
flux_future_t *f,
int *errp)
{
if (cf->any) {
*errp = flux_future_get (f, NULL) < 0 ? errno : 0;
return true;
}
return wait_all_is_ready (cf, errp);
}

/* Continuation for children of a composition future -- simply check
* to see if the parent composite is "ready" and fulfill it if so.
*/
static void child_cb (flux_future_t *f, void *arg)
{
int errnum;
flux_future_t *parent = arg;
struct composite_future *cf = composite_get (parent);
if (!arg || !cf)
return;
/*
* Fulfill the composite future if "wait any" is set, or all child
* futures are fulfilled:
*/
if (cf->any || wait_all_is_ready (cf))
flux_future_fulfill (parent, NULL, NULL);
if (composite_is_ready (cf, f, &errnum)) {
if (errnum)
flux_future_fulfill_error (parent, errnum, NULL);
else
flux_future_fulfill (parent, NULL, NULL);
}
}


/* Propagate the current reactor *and* flux_t handle context from
* future `f` to another future `next`.
*/
Expand Down
87 changes: 62 additions & 25 deletions src/common/libflux/test/composite_future.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,24 @@ static void reset_static_sentinels (void)

static void init_and_fulfill (flux_future_t *f, void *arg)
{
bool with_error = *(bool *)arg;
init_and_fulfill_called = true;
flux_future_fulfill (f, NULL, NULL);
if (with_error)
flux_future_fulfill_error (f, EPERM, NULL);
else
flux_future_fulfill (f, NULL, NULL);
}

static void init_no_fulfill (flux_future_t *f, void *arg)
{
init_no_fulfill_called = true;
}

static void test_composite_basic_any (flux_reactor_t *r)
static void test_composite_basic_any (flux_reactor_t *r, bool with_error)
{
flux_future_t *any = flux_future_wait_any_create ();
flux_future_t *f1 = flux_future_create (init_no_fulfill, NULL);
flux_future_t *f2 = flux_future_create (init_and_fulfill, NULL);
flux_future_t *f2 = flux_future_create (init_and_fulfill, &with_error);
const char *s = NULL;
const char *p = NULL;
int rc;
Expand Down Expand Up @@ -93,8 +97,15 @@ static void test_composite_basic_any (flux_reactor_t *r)
"flux_future_wait_for() returns success");
ok (init_and_fulfill_called && init_no_fulfill_called,
"initializers for both futures called synchronously");
ok (flux_future_get (any, NULL) == 0,
"flux_future_get on composite returns success");

if (with_error) {
ok (flux_future_get (any, NULL) < 0 && errno == EPERM,
"flux_future_get error if first future fulfilled with error");
}
else {
ok (flux_future_get (any, NULL) == 0,
"flux_future_get on composite returns success");
}
ok (!flux_future_is_ready (f1),
"future f1 is not ready");
ok (flux_future_is_ready (f2),
Expand All @@ -103,11 +114,11 @@ static void test_composite_basic_any (flux_reactor_t *r)
flux_future_destroy (any);
}

static void test_composite_basic_all (flux_reactor_t *r)
static void test_composite_basic_all (flux_reactor_t *r, bool with_error)
{
flux_future_t *all = flux_future_wait_all_create ();
flux_future_t *f1 = flux_future_create (init_no_fulfill, NULL);
flux_future_t *f2 = flux_future_create (init_and_fulfill, NULL);
flux_future_t *f2 = flux_future_create (init_and_fulfill, &with_error);
const char *s = NULL;
int rc;

Expand Down Expand Up @@ -143,14 +154,22 @@ static void test_composite_basic_all (flux_reactor_t *r)
ok (!flux_future_is_ready (all),
"wait_all future still not ready");

flux_future_fulfill (f1, NULL, NULL);
if (with_error)
flux_future_fulfill_error (f1, EPERM, NULL);
else
flux_future_fulfill (f1, NULL, NULL);

ok (flux_future_wait_for (all, 0.1) == 0,
"flux_future_wait_for() now returns success");

ok (flux_future_get (all, NULL) == 0,
"flux_future_get on wait_all composite returns success");

if (with_error) {
ok (flux_future_get (all, NULL) < 0 && errno == EPERM,
"flux_future_get composite with error in child returns error");
}
else {
ok (flux_future_get (all, NULL) == 0,
"flux_future_get on wait_all composite returns success");
}
flux_future_destroy (all);
}

Expand Down Expand Up @@ -366,9 +385,15 @@ static flux_future_t *flux_future_timeout (double s)
static int async_check_rc = -1;
void async_check (flux_future_t *fc, void *arg)
{
bool with_error = *(bool *)arg;
flux_future_t *f;
ok (flux_future_is_ready (fc) == true,
"async: composite future is ready");
if (with_error) {
ok (flux_future_get (fc, NULL) < 0 && errno == EPERM,
"async: composite future fulfilled with error as expected");
goto out;
}
ok ((f = flux_future_get_child (fc, "f1")) != NULL,
"async: retrieved handle child future");
ok (flux_future_get (f, NULL) == 0,
Expand All @@ -377,10 +402,11 @@ void async_check (flux_future_t *fc, void *arg)
"async: retrieved handle to timeout future");
ok (flux_future_get (f, NULL) == 0,
"async: timeout future fulfilled");
out:
async_check_rc = 0;
}

void test_composite_all_async (void)
void test_composite_all_async (bool with_error)
{
flux_reactor_t *r;
flux_future_t *f, *fc;
Expand All @@ -390,7 +416,7 @@ void test_composite_all_async (void)
BAIL_OUT ("flux_reactor_create failed");
if (!(fc = flux_future_wait_all_create ()))
BAIL_OUT ("flux_future_wait_all_create failed");
if (!(f = flux_future_create (init_and_fulfill, NULL)))
if (!(f = flux_future_create (init_and_fulfill, &with_error)))
BAIL_OUT ("flux_future_create failed");

ok (flux_future_push (fc, "f1", f) == 0,
Expand All @@ -403,7 +429,7 @@ void test_composite_all_async (void)
"flux_future_push timeout success");

flux_future_set_reactor (fc, r);
ok (flux_future_then (fc, 1., async_check, NULL) == 0,
ok (flux_future_then (fc, 1., async_check, &with_error) == 0,
"flux_future_then worked");
ok (flux_future_is_ready (fc) == 0,
"flux_future_is_ready returns false");
Expand All @@ -419,13 +445,20 @@ void test_composite_all_async (void)
static int async_any_check_rc = -1;
void async_any_check (flux_future_t *fc, void *arg)
{
bool with_error = *(bool *)arg;
flux_future_t *f;
ok (flux_future_is_ready (fc) == true,
"async: composite future is ready");
ok ((f = flux_future_get_child (fc, "f1")) != NULL,
"async: retrieved handle child future");
ok (flux_future_get (f, NULL) == 0,
"async: flux_future_get on child successful");
if (with_error) {
ok (flux_future_get (fc, NULL) < 0 && errno == EPERM,
"flux_future_get on composite returns error from child");
}
else {
ok ((f = flux_future_get_child (fc, "f1")) != NULL,
"async: retrieved handle child future");
ok (flux_future_get (f, NULL) == 0,
"async: flux_future_get on child successful");
}
ok ((f = flux_future_get_child (fc, "timeout")) != NULL,
"async: retrieved handle to timeout future");
ok (flux_future_is_ready (f) == false,
Expand All @@ -437,7 +470,7 @@ void async_any_check (flux_future_t *fc, void *arg)
flux_reactor_stop (flux_future_get_reactor (f));
}

void test_composite_any_async (void)
void test_composite_any_async (bool with_error)
{
flux_reactor_t *r;
flux_future_t *f, *fc;
Expand All @@ -447,7 +480,7 @@ void test_composite_any_async (void)
BAIL_OUT ("flux_reactor_create failed");
if (!(fc = flux_future_wait_any_create ()))
BAIL_OUT ("flux_future_wait_any_create failed");
if (!(f = flux_future_create (init_and_fulfill, NULL)))
if (!(f = flux_future_create (init_and_fulfill, &with_error)))
BAIL_OUT ("flux_future_create failed");

ok (flux_future_push (fc, "f1", f) == 0,
Expand All @@ -460,7 +493,7 @@ void test_composite_any_async (void)
"flux_future_push timeout success");

flux_future_set_reactor (fc, r);
ok (flux_future_then (fc, -1., async_any_check, NULL) == 0,
ok (flux_future_then (fc, -1., async_any_check, &with_error) == 0,
"flux_future_then worked");
ok (flux_future_is_ready (fc) == 0,
"flux_future_is_ready returns false");
Expand Down Expand Up @@ -546,11 +579,15 @@ int main (int argc, char *argv[])
if (!reactor)
BAIL_OUT ("can't continue without reactor");

test_composite_basic_any (reactor);
test_composite_basic_all (reactor);
test_composite_basic_any (reactor, false);
test_composite_basic_any (reactor, true);
test_composite_basic_all (reactor, false);
test_composite_basic_all (reactor, true);
test_basic_chained (reactor);
test_composite_all_async ();
test_composite_any_async ();
test_composite_all_async (false);
test_composite_all_async (true);
test_composite_any_async (false);
test_composite_any_async (true);
test_chained_async ();

flux_reactor_destroy (reactor);
Expand Down

0 comments on commit 6833978

Please sign in to comment.