Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow flux_future_continue(3) to work with flux_kvs_lookup(3) #2130

Merged
merged 9 commits into from Apr 30, 2019

Conversation

@grondo
Copy link
Contributor

commented Apr 17, 2019

This PR fixes #2127 by adding a new function flux_future_fulfill_with(3), which allows a fulfilled future to be used to fulfill another future. This is what fulfill_next() in composite_future.c tried to do, but its scope was limited because it didn't have access to flux_future_t internals.

flux_future_fulfill_with(3) copies any fatal error, error result, or normal result into the destination future. It also "embeds" the source future into the target future, so that calls to flux_future_aux_get() can access the source future aux list. This allows futures that are a result of calls which use aux data as part of custom get() methods to work after flux_future_fulfill_with(3). This function steals a reference to the source future, so flux_future_destroy() should not be called on the source future after flux_future_fulfill_with() is called. (If this is confusing, the function could just take a reference instead)

The destination future also "steals" the valid result by copying the result's destructor into its result structure, nullifying the destructor in the source future. This is so that multiple fulfillment can work properly, since once flux_future_reset() is called on the source future, a result pending in the destination future may point to invalid memory.

Finally, composite_future.c:fulfill_next() was updated to use flux_future_fulfill_with() to get these benefits, allowing flux_future_and_then(3) to be used with flux_kvs_lookup(3) and other functions that return futures and make use of the aux data in flux_future_t.

@garlick

This comment has been minimized.

Copy link
Member

commented Apr 17, 2019

Nicely done. I'm fine with pulling this in if you're ready.

@trws

This comment has been minimized.

Copy link
Member

commented Apr 17, 2019

This certainly seems to work. The example I was playing with, re-factored to correctly represent and_then and correctly type the intermediate futures, now runs correctly to completion it seems:

    let mut composite = h.kvs_lookup("", 0, "thing")?
        .and_then(|fi| {
            eprintln!("kvs result:{:?}", fi.lookup_get()?);
            h.kvs_lookup("", 0, "other_thing")
        })?
        .and_then(|fi| {
            eprintln!("kvs result2:{:?}", fi.lookup_get()?);
            h.kvs_lookup("", 0, "other_thing")
        })?
        .then(|f| {
            // f is strongly typed with kvs future methods
            println!("kvs result3:{:?}", f.lookup_get().unwrap());
        })?;

The only odd thing, which may have nothing to do with this, is that tacking a wait_for(-1) on the end gives me a "no such file or directory" error as soon as the composite future gets there. If I swap that for a reactor_run, everything works fine seemingly. That happens with or without this patch though, so I'm not sure it should delay merging this.

@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 18, 2019

Hold off, I'll throw in a fix for #2135 as well in here.

And I'll try to see if I can reproduce and possibly fix the issue described by @trws above as well.

@grondo grondo force-pushed the grondo:issue#2127 branch from b51ed38 to b58a2f6 Apr 19, 2019
@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 19, 2019

Forced a push here changing the ownership semantics of flux_future_continue_with(3). It was insanity to have this function steal the reference to the source future p, especially when the function could be called internally as a result of another operation.

The function now takes a reference to the source future, and thus fulfill_next() in the composite future code is safe to destroy the future (releasing the implicit reference taken by flux_future_continue(3), which makes more logical sense)

I'm still working on support for #2135

@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 19, 2019

There is actually a tricky memory management case I ran across here while trying to implement a fix for #2135.

If a user registers a flux_future_and_then(3) callback on a future f, there is already a fallback case when f is fulfilled with an error. When there is no or_then() callback registered, the fall through immediately uses f to fulfill the next future in the chain so that the error is propagated to either the final future in the chain or the next or_then() callback.

The problem is that futures are typically destroyed in their continuations immediately after the result is used, or for chained futures after flux_future_continue (f, next) is called. The proposed fall back (and the existing one) don't give the user a chance to destroy the future f after it is propagated.

I think the current code only works because the future gets accidentally destroyed after the error is propagated. We could keep this behavior, but then there is a problem if a user decides to destroy the future outside of the and_then() callback -- in normal conditions this will work, but for the error case, there would be a double free.

Probably the only sane solution here is to make it so that a call toflux_future_and_then(3) or flux_future_or_then(3) steals the reference to the target future. This would mean that for and_then or or_then callbacks, flux_future_destroy (f) would not be used. If the future is fulfilled, then the implementation would destroy the future in its internal continuation. Unfulfilled futures would be destroyed by the destructor of the successor future next, which was returned by flux_future_and_then or or_then.

@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 19, 2019

Probably the only sane solution here is to make it so that a call toflux_future_and_then(3) or flux_future_or_then(3) steals the reference to the target future.

An alternative might be to document that the default continuation for a chained future which has not had flux_future_continue(3) called is:

flux_future_fulfill_with (next, f);
flux_future_destroy (f);

This avoids changing a lot of code, and also avoids holding onto the memory for chained futures longer than necessary (if we take the approach above of having the next future "own" the memory for the future argument to and_then/or_then, then memory for futures in a chain isn't released until the final future is destroyed.)

This also handles #2135 since the above propagation is now the default, overridden only by a call to flux_future_continue(3).

@trws

This comment has been minimized.

Copy link
Member

commented Apr 21, 2019

@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 21, 2019

I’m not sure if this will be useful information or not, but the usual pattern for a “then” type call is that the future returned by then owns the future that’s passed in, as you suggested.

Thanks @trws! This is actually really helpful. Following existing convention seems like the right way to go! I was still struggling a bit with which was the right way to go.

@garlick

This comment has been minimized.

Copy link
Member

commented Apr 23, 2019

Should we consider (separate issue) changing flux_future_then() so that the future is implicitly destroyed when the continuation returns? Since there's an incref, it's still user's choice to keep the future around if that's useful, as it sometimes is when the contained result is to be used elsewhere.

@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 23, 2019

Should we consider (separate issue) changing flux_future_then() so that the future is implicitly destroyed when the continuation returns? Since there's an incref, it's still user's choice to keep the future around if that's useful, as it sometimes is when the contained result is to be used elsewhere.

I was thinking the same thing. Once you call flux_future_then() the callback owns the memory for the future. My only concern is a user has to know if the callback was actually called to know if the future was destroyed or not. However, now that I write that down, that's no different than the current situation.

@trws

This comment has been minimized.

Copy link
Member

commented Apr 23, 2019

That's exactly it, the runtime effectively "owns" the future at that point, or at least takes a reference to it and should incref it so the user can't pull it out from under you. We effectively have only "shared" futures, in that they're all potentially reference counted, the behavior for a non-refcounted future is usually that it is moved into the next one and the caller can no longer do anything with it, it's effectively destroyed as far as their reference is concerned. Part of this though is that "then" normally itself returns a future which can be waited on to wait for the completion of the callback or an error, which takes care of the issue you mentioned @grondo. The normal chained workflow looks, pseudocode-wise, something like this for ownership:

fut f1 = something_async();
fut f2 = fut_then(f1, [](fut f){ // f2 now owns f1
  int res = something_with_result(fut_get(f));
  // fut_incref(f); // if we want to keep it live for some reason
  fut_fulfill(f, res);
   });
if (fut_wait(f2) < 0) // f1 dies here or before, unless incref called
  handle_error()
int res = fut_get(f2); // often future is actually destroyed by this, optional though
fut_destroy(f2);

The thing where our model differs right now is that it doesn't represent a value coming out of the callback unless you use and/or_then, and rather than returning a future if you want to pass one on you call continue or continue_error to do it. It's trickier to deal with that in C unfortunately since you can't do strong typing on it, but we could probably figure something out.

@grondo grondo force-pushed the grondo:issue#2127 branch from b58a2f6 to 87d7ccc Apr 25, 2019
@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 25, 2019

If we've decided to change the ownership model of base flux_future_t type as @trws suggests above, I'll open a separate issue on that and punt on solving #2135 here, since doing it the easy way could result in impossible-to-solve allocation issues in some cases, and I tried doing the "right" way and it quickly turned into a huge mess.

Once the base flux_future_t type ownership model changes, I think it will make it a lot easier to make the change suggested in #2135 without adding confusing ownership differences between then(), and_then() and or_then().

@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 25, 2019

I've force-pushed a branch rebased on current master and constrained the fixes to #2127

@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 25, 2019

One more note -- instead of fixing #2135, it would be trivial to temporarily add an assertion in the internal chained future callback that would trigger if a callback did not call either flux_future_continue() or flux_future_continue_error().

@trws

This comment has been minimized.

Copy link
Member

commented Apr 25, 2019

@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 25, 2019

Ok, maybe I'll attempt that here once I'm back at a computer

@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 26, 2019

That seems like it would be a good stopgap.

Actually, I've followed the original plan here and have pushed a commit that auto-fulfills the next future in the chain with the current future when either flux_future_continue() wasn't called from the callback, or a callback wasn't called at all (i.e. if a future is fulfilled with success and no and_then callback exists, or fulfilled with error and no or_then callback exists.)

If any callback is used, then it is expected that the user will destroy the future within the callback or elsewhere. If no callback was called the future is destroyed. This not only preserves current behavior, but also will be easier to update and reason about if we change the ownership model of the base flux_future_t type in the future.

@garlick

This comment has been minimized.

Copy link
Member

commented Apr 29, 2019

Just trying to get up to speed on this one again. It sounds like you covered all the cases I was concerned about (multiple fulfillment, aux collision with multiple futures of the same type in a chain). I'll try a couple of random tests when I get to work, but looks like a great job so far.

And thanks for the inline documentation. I really struggle with this future code and having that will help.

@garlick

This comment has been minimized.

Copy link
Member

commented Apr 29, 2019

Following up on our face to face discussion: I was hoping to implement the single-job state synchronization (#1665) on top of flux_job_event_watch(), which returns a future that is fulfilled for each job event. Ideally flux_job_state_watch() would internally handle events, and return a future that is fulfilled only when a watched state is reached. It would operate as a sort of filter on the event watch fulfillments.

Could I use flux_future_and_then() to implement this with a composite future? As discussed, the new continued flag added in this PR presents a difficulty since when an event fulfilling the event future doesn't advance the current state to one of the watched states, there should not be a fulfillment of the composite future. With the flag, the composite future would be auto-fulfilled in that case. Could we add a function that could be called from the event continuation to defeat that behavior for one iteration?

@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 29, 2019

Could we add a function that could be called from the event continuation to defeat that behavior for one iteration?

Yes. Perhaps this should be added regardless of whether it satisfies your current use case or not.

The easiest way to support this would be to allow a NULL second argument to flux_future_continue(3). The current prototype for that function is:

/* Set the next future for the chained future `prev` to `f`.
 *  This function steals a reference to `f` and thus flux_future_destroy()
 *  should not be called on `f`. (However, flux_future_destroy() should
 *  still be called on `prev`)
 */
int flux_future_continue (flux_future_t *prev, flux_future_t *f);

Already we document (or should document if I haven't done that yet here) that if flux_future_continue(3) is not called on prev then next is fulfilled using the result from prev by default. We could document that if prev is "continued" with a NULL future, then this automatic fulfill is disabled, and the next future remains unfulfilled.

@garlick

This comment has been minimized.

Copy link
Member

commented Apr 29, 2019

That will actually work great for this case. flux_future_continue (NULL) will fulfill the "state" future with the event that transitioned to that watched-for state, and the event continuation can accumulate events in its aux hash. So upon fulfillment, not only the state but the event that led to it, and metadata encoded in the the eventlog thus far can be made evailable via a get function. In particular, upon entering CLEANUP or INACTIVE, one could retrieve either exception or finish metadata.

@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 29, 2019

That will actually work great for this case. flux_future_continue (NULL) will fulfill the "state" future with the event that transitioned to that watched-for state, and the event continuation can accumulate events in its aux hash.

Just to be clear, I was thinking

/* Do *not* fulfill the "next" future in the chain on this iteration, and disable
 *  default auto-continue for chained futures that don't have flux_future_continue(3)
 *  called on them from this callback
  */
flux_future_continue (f, NULL);

/* Do fulfill the next future in the chain with the current result stored in `f`:
 */
flux_future_continue (f, f);

I know, that second one is weird. You can also get the same result if you just don't call flux_future_contine() obviously. Maybe we need specialized functions to more clear, like:

#define flux_future_nocontinue (x) flux_future_continue (x, NULL)
#define flux_future_continue_next (x) flux_future_continue (x, x)

It seemed like perhaps you had a better idea above, though. I'm really open to anything that lessens confusion.

@garlick

This comment has been minimized.

Copy link
Member

commented Apr 29, 2019

Apologies I had it backwards. I call nothing to fulfill the state future with with the event future (or flux_future_continue (f, f), and I call flux_future_continue (f, NULL) to leave the state future unfulfilled.

Sorry about that.

@grondo grondo force-pushed the grondo:issue#2127 branch from e2adaca to fc4f74b Apr 29, 2019
@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 29, 2019

Ok, I've implemented support for the above (still have to write tests and docs), but thought you could try it out before I get any further?

(Sorry, I also rebased on current master without thinking)

@garlick

This comment has been minimized.

Copy link
Member

commented Apr 29, 2019

I was able to use the above strategy to implement a prototype state monitoring API call! Nice, thank you!

@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 29, 2019

Ok, I also pushed some sanity tests into test_composite_future.t for flux_future_continue (f, NULL) and flux_future_continue (f, f) as well as multiple-fulfillment for chained futures.

If this approach is good I will also update documentation.

@garlick

This comment has been minimized.

Copy link
Member

commented Apr 29, 2019

Thanks for adding the tests! LGTM!

grondo added 6 commits Apr 17, 2019
Add an implementation of flux_future_fulfill_with(3), which allows
one fulfilled future to be used to fulfill another.

This function not only copies the fatal error, error result, or nornal
result from the source future into the destination future, but "embeds"
the source into the destination future such that the source future's
aux items are available read-only in the destination.

This makes it possible to use one future to fulfill another even if
some get() methods used on that future require auxillary data stored
within the object. (e.g. flux_kvs_lookup_get(3)).

Since flux_future_fulfill_with() propagates a copy of the result from
the source future to the destination future, the function may actually
be called multiple times, as long as the source future is reset
between calls.

On the first call, however, the source future is embedded in the
destination future, so it is an error to call flux_future_fulfill_with()
multiple times on the same target future with a different source.

This function steals a reference to the source future, since its
destruction has to be tied to the destination.
Add tests for flux_future_fulfill_with() in sync and asycn modes.
Add test to ensure aux items in a prev future used to fulfill
the next future in a chain are available from that next future.
For chained composite futures, switch to flux_future_fulfill_with()
to propagate the result of a fulfilled "prev" future to the "next"
future in the chain.

Fixes #2127
Add documentation for flux_future_fulfill_with(3) to the
flux_future_create(3) manpage.
Problem: The explanation of the chained future implementation was spread
throughout the code and overall was confusing.

Add a long comment before declaration of struct chained_future to
hopefully add some clarity for future code spelunkers.
@grondo grondo force-pushed the grondo:issue#2127 branch from 51a79f8 to defaa6b Apr 30, 2019
@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 30, 2019

Just rebased on master and reworded flux_future_and_then(3) manpage to note that fulfilling the next future with the current future after a chained continuation is now the default, and document the use of flux_future_continue (f, NULL).

The manpage could probably use some examples, though they might be a bit involved. Have to think about it a bit.

O/w, if this looks good I can squash down some incremental development.

@codecov-io

This comment has been minimized.

Copy link

commented Apr 30, 2019

Codecov Report

Merging #2130 into master will increase coverage by 0.02%.
The diff coverage is 95.74%.

@@            Coverage Diff             @@
##           master    #2130      +/-   ##
==========================================
+ Coverage   80.43%   80.45%   +0.02%     
==========================================
  Files         200      200              
  Lines       31745    31781      +36     
==========================================
+ Hits        25534    25570      +36     
  Misses       6211     6211
Impacted Files Coverage Δ
src/common/libflux/future.c 88.78% <100%> (+0.84%) ⬆️
src/common/libflux/composite_future.c 81.97% <91.3%> (+0.21%) ⬆️
src/common/libflux/response.c 79.24% <0%> (-1.26%) ⬇️
src/broker/modservice.c 78.84% <0%> (-0.97%) ⬇️
src/cmd/flux-event.c 77.97% <0%> (-0.6%) ⬇️
src/broker/module.c 82.74% <0%> (+0.26%) ⬆️
src/common/libflux/handle.c 84.38% <0%> (+0.67%) ⬆️
src/common/libutil/aux.c 94.44% <0%> (+3.7%) ⬆️
@garlick

This comment has been minimized.

Copy link
Member

commented Apr 30, 2019

Looks good to me. Please squash if you're ready.

grondo added 3 commits Apr 26, 2019
Add a new 'continued' flag for chained futures to allow detection
of whether a future was continued from one of the and_then or
or_then callbacks. If a future was not continued; either because
the user failed to call flux_future_continue(3) in the callback,
or because the appropriate and_then() or or_then() callback didn't
exist; then auto-fulfill the next future in the chain with the
currently fulfilled future.

If a user's callback was *not* called, be sure to still destroy the
future 'prev' from the internal continuation to avoid leaking
a future in a branch where the user has not registered a callback
from which to destroy it.

Fixes #2135
Update test/composite_future.c to include tests for fallthrough
flux_future_continue(3) when continue is not used in callback, as
well as tests for chained futures using multiple fulfillment,
including tests using

 flux_future_continue (f, NULL);

and

 flux_future_continue (f, f);
Note that the default behavior for and_then and or_then callbacks
is to fulfill the next future in the chain with the current future,
unless flux_future_continue(3) is called to fulfill with an intermediate
future.

Also document use of flux_future_continue (f, NULL)
@grondo grondo force-pushed the grondo:issue#2127 branch from defaa6b to 1bd37b2 Apr 30, 2019
@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 30, 2019

Ok, squashed.

@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 30, 2019

One builder failed with

ERROR: t1008-kvs-eventlog.t - exited with status 137 (terminated by signal 9?)

Looks like it just overran the grace timeout:

PASS: t1008-kvs-eventlog.t 6 - flux kvs eventlog append fails with invalid context
# passed all 6 test(s)
flux-start: 0 (pid 11685) Killed
1..6

Restarted builder.

@garlick

This comment has been minimized.

Copy link
Member

commented Apr 30, 2019

Thanks!

@garlick garlick merged commit 29f7ee2 into flux-framework:master Apr 30, 2019
2 checks passed
2 checks passed
Mergify — Summary 1 potential rule
Details
continuous-integration/travis-ci/pr The Travis CI build passed
Details
@grondo

This comment has been minimized.

Copy link
Contributor Author

commented Apr 30, 2019

Thanks!

@grondo grondo deleted the grondo:issue#2127 branch Apr 30, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants
You can’t perform that action at this time.