Skip to content

Commit

Permalink
Merge pull request #5927 from grondo/issue#5923
Browse files Browse the repository at this point in the history
fix possible use-after-free in chained future implementation
  • Loading branch information
mergify[bot] committed May 2, 2024
2 parents 86d905f + dcd5de6 commit 16cefe3
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 26 deletions.
130 changes: 111 additions & 19 deletions src/common/libflux/composite_future.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,12 @@ static flux_future_t *future_create_composite (int wait_any)
{
struct composite_future *cf = composite_future_create ();
flux_future_t *f = flux_future_create (composite_future_init, (void *) cf);
if (!f || !cf || flux_future_aux_set (f, "flux::composite", cf,
(flux_free_f) composite_future_destroy) < 0) {
if (!f
|| !cf
|| flux_future_aux_set (f,
"flux::composite",
cf,
(flux_free_f) composite_future_destroy) < 0) {
composite_future_destroy (cf);
flux_future_destroy (f);
return (NULL);
Expand Down Expand Up @@ -265,11 +269,11 @@ const char *flux_future_next_child (flux_future_t *f)
* If the user calls both and_then() and or_then(), the same cf->next
* future is returned, since only one of these calls will be used.
*
* The underlying then() callback for `f` is subsequently set to use
* The underlying then() callback for `f` is subsequently set to use
* chained_continuation() below, which will call `and_then()` on successful
* fulfillment of f (aka `prev`) or or_then() on failure. These continuations
* are passed `f, arg` as if a normal continuation was used with
* flux_future_then(3). These callbacks may use one of
* flux_future_then(3). These callbacks may use one of
* flux_future_continue(3) or flux_future_continue_error(3) to schedule
* fulfillment of the internal `cf->next` future based on a new
* intermediate future created during the continuation (e.g. when a
Expand Down Expand Up @@ -300,6 +304,7 @@ struct continuation_info {
};

struct chained_future {
int refcount;
bool continued;
flux_future_t *next;
flux_future_t *prev;
Expand Down Expand Up @@ -367,7 +372,8 @@ static void chained_continuation (flux_future_t *prev, void *arg)
* cf->next using prev directly.
*/
if (!cf->continued && flux_future_fulfill_with (cf->next, prev) < 0) {
flux_future_fatal_error (cf->next, errno,
flux_future_fatal_error (cf->next,
errno,
"chained_continuation: fulfill_with failed");
}

Expand All @@ -385,6 +391,15 @@ static void chained_continuation (flux_future_t *prev, void *arg)
flux_future_destroy (prev);
}

static void chained_future_decref (struct chained_future *cf)
{
if (--cf->refcount == 0) {
int saved_errno = errno;
free (cf);
errno = saved_errno;
}
}

/* Initialization for a chained future. Get current reactor for this
* context and install it in "previous" future, _then_ set the "then"
* callback for that future to `chained_continuation()` which will
Expand All @@ -393,7 +408,9 @@ static void chained_continuation (flux_future_t *prev, void *arg)
static void chained_future_init (flux_future_t *f, void *arg)
{
struct chained_future *cf = arg;
if (cf == NULL || cf->prev == NULL || cf->next == NULL
if (cf == NULL
|| cf->prev == NULL
|| cf->next == NULL
|| !(flux_future_get_reactor (f))) {
errno = EINVAL;
goto error;
Expand Down Expand Up @@ -425,6 +442,18 @@ static void chained_future_init (flux_future_t *f, void *arg)
fulfill_next (f, cf->next);
}

static void cf_next_destroy (struct chained_future *cf)
{
/* cf->next is being destroyed. If cf->prev is still active, destroy
* it now since no longer makes sense to trigger the cf->prev callback.
*/
flux_future_destroy (cf->prev);
cf->prev = NULL;

/* Release reference on cf taken by cf->next
*/
chained_future_decref (cf);
}

/* Allocate a chained future structure */
static struct chained_future *chained_future_alloc (void)
Expand All @@ -436,12 +465,40 @@ static struct chained_future *chained_future_alloc (void)
free (cf);
return (NULL);
}

/* If cf->next is destroyed then later cf->prev is fulfilled, this
* can cause use-after-free of cf->next in chained_continuation() and/or
* flux_future_continue(3). Additionally, a reasonable expectation
* is that destruction of cf->next will stop any previous future in
* the chain. Therefore, arrange to have this object notified when
* cf->next is destroyed so that cf->prev can also be destroyed
* (if it has not yet been destroyed already. That case is handled
* by nullifying cf->prev when prev is destroyed)
*/
if (flux_future_aux_set (cf->next,
NULL,
cf,
(flux_free_f) cf_next_destroy) < 0) {
flux_future_destroy (cf->next);
free (cf);
return NULL;
}

/* Take one reference on cf for cf->next (released in cf_next_destroy())
*/
cf->refcount = 1;
return (cf);
}

static void chained_future_destroy (struct chained_future *cf)
static void nullify_prev (struct chained_future *cf)
{
free (cf);
/* cf->prev has been destroyed. Ensure we don't reference it again
* by nullifying the reference in cf.
*/
cf->prev = NULL;

/* Remove reference added by this callback */
chained_future_decref (cf);
}

/* Create a chained future on `f` by embedding a chained future
Expand All @@ -453,16 +510,48 @@ static void chained_future_destroy (struct chained_future *cf)
*/
static struct chained_future *chained_future_create (flux_future_t *f)
{
struct chained_future *cf = flux_future_aux_get (f, "flux::chained");
if (cf == NULL && (cf = chained_future_alloc ())) {
if (flux_future_aux_set (f, "flux::chained",
(void *) cf,
(flux_free_f) chained_future_destroy) < 0) {
chained_future_destroy (cf);
return (NULL);
}
struct chained_future *cf;

/* If future `f` is already chained, then just return the existing
* stored chained_future object:
*/
if ((cf = flux_future_aux_get (f, "flux::chained")))
return cf;

/* Otherwise, create one and store it:
*/
if (!(cf = chained_future_alloc ())
|| flux_future_aux_set (f,
"flux::chained",
(void *) cf,
(flux_free_f) chained_future_decref) < 0) {
chained_future_decref (cf);
return NULL;
}

/* Increment refcount for cf->prev
*/
cf->refcount++;
cf->prev = f;

/* Nullify cf->prev on destruction of f to notify cf_next_destroy() that
* prev was already destroyed. (See note in chained_future_alloc()).
*/
if (flux_future_aux_set (f,
NULL,
cf,
(flux_free_f) nullify_prev) < 0) {
chained_future_decref (cf);
return NULL;
}

/* Increment refcount again for nullify_prev aux item. This is necessary
* since the order of aux_item destructors can't be guaranteed.
* This increment is done separately to avoid leaking cf in case of
* flux_future_aux_set() failure above.
*/
cf->refcount++;

/*
* Ensure the empty "next" future we have just created inherits
* the same reactor (if any) and handle (if any) from the previous
Expand Down Expand Up @@ -521,7 +610,8 @@ int flux_future_continue (flux_future_t *prev, flux_future_t *f)

/* "Continue" the chained "next" future embedded in `prev` with an error
*/
void flux_future_continue_error (flux_future_t *prev, int errnum,
void flux_future_continue_error (flux_future_t *prev,
int errnum,
const char *errstr)
{
struct chained_future *cf = chained_future_get (prev);
Expand All @@ -546,7 +636,8 @@ int flux_future_fulfill_next (flux_future_t *f,
}

flux_future_t *flux_future_and_then (flux_future_t *prev,
flux_continuation_f next_cb, void *arg)
flux_continuation_f next_cb,
void *arg)
{
struct chained_future *cf = chained_future_create (prev);
if (!cf)
Expand All @@ -557,7 +648,8 @@ flux_future_t *flux_future_and_then (flux_future_t *prev,
}

flux_future_t *flux_future_or_then (flux_future_t *prev,
flux_continuation_f or_cb, void *arg)
flux_continuation_f or_cb,
void *arg)
{
struct chained_future *cf = chained_future_create (prev);
if (!cf)
Expand Down
55 changes: 48 additions & 7 deletions src/common/libflux/test/composite_future.c
Original file line number Diff line number Diff line change
Expand Up @@ -372,15 +372,15 @@ void timeout_init (flux_future_t *f, void *arg)
flux_future_fulfill_error (f, errno, NULL);
}

static void flux_future_timeout_clear (flux_future_t *f)
static void future_timeout_clear (flux_future_t *f)
{
flux_watcher_t *w = flux_future_aux_get (f, "watcher");
ok (w != NULL, "timeout stop: got timer watcher");
if (w)
flux_watcher_stop (w);
}

static flux_future_t *flux_future_timeout (double s)
static flux_future_t *future_timeout (double s)
{
double *dptr = calloc (1, sizeof (*dptr));
if (dptr == NULL)
Expand Down Expand Up @@ -429,8 +429,8 @@ void test_composite_all_async (bool with_error)
ok (flux_future_push (fc, "f1", f) == 0,
"flux_future_push success");

if (!(f = flux_future_timeout (0.1)))
BAIL_OUT ("flux_future_timeout failed");
if (!(f = future_timeout (0.1)))
BAIL_OUT ("future_timeout failed");

ok (flux_future_push (fc, "timeout", f) == 0,
"flux_future_push timeout success");
Expand Down Expand Up @@ -470,7 +470,7 @@ void async_any_check (flux_future_t *fc, void *arg)
"async: retrieved handle to timeout future");
ok (flux_future_is_ready (f) == false,
"async: timeout future not yet fulfilled");
flux_future_timeout_clear (f);
future_timeout_clear (f);
async_any_check_rc = 0;
/* Required so we pop out of reactor since we will still have
* active watchers */
Expand All @@ -493,8 +493,8 @@ void test_composite_any_async (bool with_error)
ok (flux_future_push (fc, "f1", f) == 0,
"flux_future_push success");

if (!(f = flux_future_timeout (1.0)))
BAIL_OUT ("flux_future_timeout failed");
if (!(f = future_timeout (1.0)))
BAIL_OUT ("future_timeout failed");

ok (flux_future_push (fc, "timeout", f) == 0,
"flux_future_push timeout success");
Expand Down Expand Up @@ -761,6 +761,45 @@ void test_future_fulfill_next (flux_reactor_t *r)
flux_future_destroy (f2);
}

static bool issue5923_and_then_called = false;
static bool issue5923_then_called = false;

static void issue5923_and_then_cb (flux_future_t *f, void *arg)
{
issue5923_and_then_called = false;
flux_future_destroy (f);
}

static void issue5923_then_cb (flux_future_t *f, void *arg)
{
issue5923_then_called = true;
ok (flux_future_get (f, NULL) < 0 && errno == ETIMEDOUT,
"issue5923: then cb timed out");
flux_future_destroy (f);
}

static void test_issue_5923 (flux_reactor_t *r)
{
flux_future_t *prev, *next;

if (!(prev = future_timeout (0.1)))
BAIL_OUT ("future_timeout failed");
flux_future_set_reactor (prev, r);

if (!(next = flux_future_and_then (prev, issue5923_and_then_cb, NULL)))
BAIL_OUT ("flux_future_and_then failed");

if (flux_future_then (next, 0.001, issue5923_then_cb, NULL) < 0)
BAIL_OUT ("flux_future_then failed");

ok (flux_reactor_run (r, 0) == 0,
"flux_reactor_run returns 0");
ok (issue5923_then_called,
"issue5923: then_cb was called");
ok (!issue5923_and_then_called,
"issue5923: and_then_cb was not called");
}

int main (int argc, char *argv[])
{
flux_reactor_t *reactor;
Expand Down Expand Up @@ -790,6 +829,8 @@ int main (int argc, char *argv[])

test_future_fulfill_next (reactor);

test_issue_5923 (reactor);

flux_reactor_destroy (reactor);

done_testing();
Expand Down

0 comments on commit 16cefe3

Please sign in to comment.