redesign/fix flux_reduce_t handle #298

Merged
merged 10 commits into from
Aug 14, 2015


garlick
Member

@garlick garlick commented Jul 28, 2015

This PR is preliminary, not for merging. It's posted as a PR mainly to keep @dongahn up to date with what I'm doing to stuff his work depends on. (He should feel free to redirect me if I am making life harder rather than easier):

  • Rename flux_red_t to flux_reduce_t and make opaque type conformant to flux style
  • Rename functions to be prefixed with flux_reduce_
  • Drop flux_redstack_t and add a standalone flux_list_t class instead.
  • Revive the flux-mon command and mon module (update to modern interfaces)
  • Update the live module to use the new flux_reduce_t names.

See comment at the top of src/modules/mon/mon.c for how to demo the mon module/program.

Coming soon: man page for reduce functions.

@garlick garlick added the review label Jul 28, 2015
@garlick
Member Author

garlick commented Jul 28, 2015

By the way, as I go through the flux_reduce_t code, I really get the feeling that it doesn't help all that much and tends to result in unclear code structures. I am hoping that by going through it carefully we will come up with some improvements or maybe even something completely new to replace it.

@garlick
Member Author

garlick commented Jul 28, 2015

Thanks, sorry I noticed that as well and was fairly surprised that this was broken!

I've been experimenting today with a slightly different interface that is based on messages as "items" and transparently installs a request handler, so that passing messages upstream can happen behind the scenes. It's still not quite working, but this is what the header file looks like:

typedef struct flux_reduce_struct flux_reduce_t;

/* Reduce function should call flux_reduce_recv() until NULL,
 * then flux_reduce_send() with reduced message(s).  The sum of the
 * 'count' fields of messages in and messages out should remain constant.
 */
typedef void   (*flux_reduce_f)(flux_reduce_t *r, int seq, void *arg);

/* Sink function should call flux_reduce_recv() until NULL,
 * disposing of messages.
 */
typedef void   (*flux_sink_f)(flux_reduce_t *r, int seq, void *arg);

/* Flags for flux_reduce_create() that specify flush mode.
 * no flags   - you must call flux_reduce_flush() manually to flush messages.
 * TIMEDFLUSH - timer is started upon receipt of the first message of
 *              a given 'seq'.  After timeout, that batch is flushed.
 * HWMFLUSH   - initially messages are immediately flushed.  The number of
 *              messages in a seq batch is established as the high water mark.
 *              Once the hwm is established, the queue is flushed each time
 *              it is reached for a particular seq; if it is not reached,
 *              they are flushed when the first message from the next seq
 *              is received.
 * TIMEDFLUSH | HWMFLUSH means flush when either hwm or timeout is reached.
 * DEBUG logs reduction handle internal actions (very verbose!)
 */
enum {
    FLUX_REDUCE_TIMEDFLUSH = 1,
    FLUX_REDUCE_HWMFLUSH = 2,
    FLUX_REDUCE_DEBUG = 0x1000,
};


/* Create a reduction handle for 'topic'.
 * 'flags' are zero or more of the flags above.
 * 'timeout' is TIMEDFLUSH timeout, in seconds.
 * Installs a reactor timer watcher and a message watcher for topic.
 */
flux_reduce_t *flux_reduce_create (flux_t h, const char *topic,
                                   flux_sink_f sink, flux_reduce_f reduce,
                                   double timeout, void *arg, int flags);

void flux_reduce_destroy (flux_reduce_t *r);

/* Send a message to be reduced.
 * Messages of a particular 'seq' (monotonic) will be reduced together.
 * The count represents the number of messages reduced into this one.
 */
int flux_reduce_send (flux_reduce_t *r, const flux_msg_t *msg);

/* Receive a message of a particular seq.  Returns NULL when there are none.
 * This should be used from reduce or sink callbacks.
 */
flux_msg_t *flux_reduce_recv (flux_reduce_t *r, int seq);

/* Flush messages of a particular seq that are queued in the reduction handle.
 * Calls reduction callback (if any), then sink callback (if any),
 * then forwards what is left upstream.
 */
int flux_reduce_flush (flux_reduce_t *r, int seq);

/* Encode a reduction message.
 * The count reflects the number of original messages represented in a
 * reduced one.  The seq number should monotonically increase for each
 * batch of messages that should be reduced together.  Data is user defined.
 */
flux_msg_t *flux_reduce_request_encode (int seq, int count,
                                        const char *json_data);

/* Decode a reduction message.
 * Caller must free 'json_data' (specify NULL to skip receiving).
 */
int flux_reduce_request_decode (const flux_msg_t *msg, int *seq, int *count,
                                char **json_data);
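
The invariant stated in the header comment (the sum of the 'count' fields stays constant across a reduction) can be illustrated with a small standalone model. This is only a sketch: `struct item`, its `sum` payload, and `reduce_batch()` are hypothetical names invented for illustration and are not part of the proposed flux API.

```c
#include <stddef.h>

/* Hypothetical stand-in for a reduction message: 'count' is the number of
 * original messages folded into this one, 'sum' is a user-defined payload. */
struct item {
    int seq;
    int count;
    long sum;
};

/* Fold every item of batch 'seq' found in 'in' into the single item 'out'.
 * Items from other batches are skipped, since only items of the same seq
 * may be reduced together.  Returns the number of input items consumed. */
size_t reduce_batch (const struct item *in, size_t n, int seq,
                     struct item *out)
{
    size_t used = 0;

    out->seq = seq;
    out->count = 0;
    out->sum = 0;
    for (size_t i = 0; i < n; i++) {
        if (in[i].seq != seq)
            continue;
        out->count += in[i].count;   /* counts in == count out */
        out->sum += in[i].sum;
        used++;
    }
    return used;
}
```

Because the output count is the sum of the input counts, an upstream node can still tell how many original messages a reduced message represents.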

@dongahn
Member

dongahn commented Jul 29, 2015

Interesting idioms! So, this should allow flux_reduce_t users not to have to worry about installing communication callbacks etc., hiding that level of detail from the users? Now that I think about this, making reduction message-passing-style can also allow you to define the reduction function even outside the calling context... Is this intended?

One concern is, of course, that requiring the reduction function to be message-passing-style (only) may incur higher overhead than necessary, in particular when it is used within the calling context. In that case, passing the IN/OUT of the reduction as function arguments would presumably be more efficient.

BTW, I am implementing something like this in my collective named reduction on top of the current flux_red_t support. I hid the communication details and simplified the reduction callback signature. As a module, the named reduction installs its own control reduction callback, which then ends up calling the user-provided (well, for now hardwired) reduction function. It takes two items (left-hand side and right-hand side) as inputs and returns the reduced item in the left-hand side. I also hid the sink function from the user: this module's control sink callback takes care of it automatically. This was probably possible because reduction of this style is more or less deterministic. But with base reduction support like this, you probably won't be able to get rid of the need for the sink function...

BTW, if you expect significant churns on your core reduction support, I am happy to hold my investigation a bit until you are satisfied with revamping the core reduction support.

@garlick
Member Author

garlick commented Jul 29, 2015

Thanks for having a look @dongahn. I could take a step back and build on this PR adding a manual page and some tests, then after we've got that and your stuff in, take a look again (together) at a new interface?

@garlick
Member Author

garlick commented Jul 29, 2015

I think my plan will be to back off the above message oriented version, and return to something closer to what was there originally, with the following changes:

  • Fix the naming/abstract type style
  • Callbacks should receive flux_reduce_t argument, not flux_t
  • Fix the timeout logic and units (use seconds in a double)
  • Register callbacks at creation
  • Eliminate the list/stack type and add push/pop methods for flux_reduce_t
  • Man page
  • Unit test

@garlick garlick force-pushed the revive_mon branch 2 times, most recently from 9f7ec3b to 68736f7 Compare July 29, 2015 18:42
@dongahn
Member

dongahn commented Jul 29, 2015

This works for me as well. I will continue to look at the named reduction support then, and yes, these experiences can be combined to generate a new interface!

@dongahn
Member

dongahn commented Jul 29, 2015

The changes look good to me. Adjusting the named reduction to those changes should be straightforward. One question, though: what is the "register callbacks at creation" support? Is this to hide the communication needed for reduction from the users?

"transparently installs a request handler so that passing messages upstream can happen behind the scenes"

In any case, I think hiding communication details for reduction is an excellent direction to pursue.

@garlick
Member Author

garlick commented Jul 29, 2015

I've backed out the changes that hide the communications details from the user for now, but in the recent set of changes, there is a new callback called "forward" which, if defined, is called during a flush to forward items upstream in the TBON. "sink" used to do double duty to consume items on rank 0 and forward them on rank > 0. I thought this way made it a bit clearer.
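
The split between "forward" and "sink" can be modeled in isolation as a dispatch on rank at flush time. The sketch below is a standalone illustration, not the reduce.c implementation: `struct flush_ops`, `flush_dispatch()`, and `count_calls()` are hypothetical names, and it assumes forward is only used on rank > 0 and sink only on rank 0.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical callback type and ops table (illustrative names only). */
typedef void (*flush_cb_f)(int batch, void *arg);

struct flush_ops {
    flush_cb_f forward;   /* send items upstream, used on rank > 0 */
    flush_cb_f sink;      /* consume fully reduced items, used on rank 0 */
};

enum flush_action { FLUSH_NONE, FLUSH_FORWARD, FLUSH_SINK };

/* Choose which callback a flush should invoke for this rank.
 * With two separate callbacks, neither needs a rank conditional inside. */
enum flush_action flush_dispatch (const struct flush_ops *ops, uint32_t rank,
                                  int batch, void *arg)
{
    if (rank > 0 && ops->forward) {
        ops->forward (batch, arg);
        return FLUSH_FORWARD;
    }
    if (rank == 0 && ops->sink) {
        ops->sink (batch, arg);
        return FLUSH_SINK;
    }
    return FLUSH_NONE;   /* nothing applicable; caller disposes of items */
}

/* Example callback: counts invocations through 'arg'. */
void count_calls (int batch, void *arg)
{
    (void)batch;
    (*(int *)arg)++;
}
```

With the old single "sink" callback, the rank test lived in every user's callback; here it lives in one place.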

What I have now is pretty easy to port to from the old interface. I've converted modules/live and modules/mon with no trouble.

I'll have a manual page shortly - once that's in it would be a good time to ask you to review briefly to make sure I'm on the right track.

@dongahn
Member

dongahn commented Jul 29, 2015

Cool! Thanks @garlick

@garlick
Member Author

garlick commented Jul 30, 2015

OK, just forced a push with the following changes:

  • rebased to current master
  • callbacks take flux_reduce_t arg instead of flux_t
  • eliminate list class and provide push/pop directly on flux_reduce_t
  • fix naming/style
  • timeout is a double (seconds)
  • individual setters replaced with more args in the create function, for stuff that has to be set at creation time anyway
  • fixed HWM logic - there was a mismatch between HWM established during training on unreduced items, and later use of HWM on reduced messages. Added itemsize callback so HWM code can access user-provided encoding of how many items are represented by a reduced item.
  • corrected some asciidoc/dictionary probs on RHEL6 from PR cleanup files on exit #276

Still todo:

  • timed flush doesn't work due to uniform timeout at every level of the tree (kind of a classic reduction pitfall I think, or at least one I was aware of and yet apparently didn't address here)
  • unit tests
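
The HWM mismatch mentioned in the list above can be shown with a minimal standalone model. It assumes each queued item carries the number of original items it represents; `struct queued_item`, `item_size()`, `weighted_count()`, and `hwm_reached()` are hypothetical names for illustration and do not reflect the actual reduce.c code.

```c
/* Hypothetical queued item: 'nitems' is how many original (unreduced)
 * items have been folded into it. */
struct queued_item {
    int nitems;
};

/* Plays the role of the user-provided itemsize callback. */
int item_size (const struct queued_item *item)
{
    return item->nitems;
}

/* Sum of the sizes of all queued items, i.e. the number of original
 * items currently represented in the queue. */
int weighted_count (const struct queued_item *q, int n)
{
    int total = 0;

    for (int i = 0; i < n; i++)
        total += item_size (&q[i]);
    return total;
}

/* The HWM was learned by counting unreduced items, so later comparisons
 * must use the weighted count rather than the raw queue length 'n'. */
int hwm_reached (const struct queued_item *q, int n, int hwm)
{
    return hwm > 0 && weighted_count (q, n) >= hwm;
}
```

For example, if the HWM learned during training is 4 and the queue later holds a single reduced item representing 4 originals, the raw queue length (1) would never reach the HWM, while the weighted count (4) does.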

FLUX_REDUCE_CREATE(3)     Flux Programming Reference     FLUX_REDUCE_CREATE(3)

NAME
       flux_reduce_create, flux_reduce_destroy, flux_reduce_append,
       flux_reduce_pop, flux_reduce_push - low level support for reduction
       pattern

SYNOPSIS
       #include <flux/core.h>

       typedef void (*flux_reduce_f)(flux_reduce_t *r, int batch, void *arg);

       typedef void (*flux_sink_f)(flux_reduce_t *r, int batch, void *arg);

       typedef void (*flux_forward_f)(flux_reduce_t *r, int batch, void *arg);

       typedef int (*flux_itemsize_f)(void *item);

       flux_reduce_t *flux_reduce_create (flux_t h, flux_free_f destroy,
               flux_itemsize_f itemsize, flux_reduce_f reduce, flux_forward_f forward,
               flux_sink_f sink, double timeout, void *arg, int flags);

       void flux_reduce_destroy (flux_reduce_t *r);

       int flux_reduce_append (flux_reduce_t *r, void *item, int batch);

       void *flux_reduce_pop (flux_reduce_t *r);

       int flux_reduce_push (flux_reduce_t *r, void *item);

DESCRIPTION
       flux_reduce_create() sets up a reduction handle, a queue with semantics
       useful for implementing hierarchical reduction patterns over the Flux
       tree based overlay network. It is a "bare bones" interface that
       requires the user to provide callbacks to do the work of reducing,
       forwarding, and consuming items of an opaque, user-defined type.
       destroy may optionally be provided as a destructor for these items.

       Items are appended to the reduction handle with flux_reduce_append()
       with batch, a monotonically increasing sequence number. Only items in
       batches of the same number can be reduced together. Each time an item
       is appended to the handle and two or more items are available there,
       the reduce callback, if non-NULL, is called.

       reduce should call flux_reduce_pop() to obtain items for the current
       batch, perform some operation on them, then call flux_reduce_push() to
       put items back. The purpose of reduce is to consolidate items so that
       the amount of data arriving at each node of the tree can be minimized
       to improve scalability.

       Depending on flags, the queue will be flushed periodically. The flush
       operation empties the queue of items in the current batch by calling
       forward and/or sink, if defined, then internally popping items off and
       destroying them with destroy, if defined.

       The purpose of forward is to forward items "upstream" towards the root
       of the overlay network. Generally this should be defined only for ranks
       greater than zero. forward should call flux_reduce_pop() to obtain
       items. The upstream rank, on receipt, would then call
       flux_reduce_append() to append items to its reduction handle.

       The purpose of sink is to consume items in their fully reduced form.
       Generally this should only be defined for rank zero. sink should call
       flux_reduce_pop() to obtain items.

       flags is the logical "or" of zero or more of the following flags:

       FLUX_REDUCE_TIMEDFLUSH
           When an item of a new batch is appended to the reduction handle, a
           reactor timer is started on handle h for timeout seconds. When the
           timer expires, any items in the queue are flushed. If this flag is
           used, timeout must be greater than zero.

       FLUX_REDUCE_HWMFLUSH
           A high water mark for the number of messages appended to the handle
           is "learned" on the first batch. Thereafter, a flush occurs
           whenever the high water mark is reached. The high water mark can be
           increased by subsequent batches, but will never decrease. If this
           flag is used, the itemsize callback must be provided (see below).

       If no flush policy is selected, or during the learning period of
       FLUX_REDUCE_HWMFLUSH, the flush is immediate and no reduction can be
       performed.

       If both FLUX_REDUCE_TIMEDFLUSH and FLUX_REDUCE_HWMFLUSH are defined,
       the flush will occur on reaching the high water mark, or timer
       expiration, whichever occurs first.

       If a straggler item from a previously flushed batch is received, that
       item is flushed immediately. During forward or sink callbacks,
       flux_reduce_pop() will return NULL after the straggler is popped,
       without co-mingling any items from the current batch.

       The high water mark training period counts unreduced messages, while
       subsequent high water mark comparisons must count reduced messages. If
       FLUX_REDUCE_HWMFLUSH is used, the itemsize callback must be provided to
       interrogate an item for the number of unreduced items it represents.

RETURN VALUE
       flux_reduce_create() returns a flux_reduce_t object on success. On
       error, NULL is returned, and errno is set appropriately.

       flux_reduce_append() and flux_reduce_push() return zero on success. On
       error, -1 is returned, and errno is set appropriately.

       flux_reduce_pop() returns an item or NULL if there are none. It does
       not report errors.

ERRORS
       EINVAL
           Some arguments were invalid.


EXAMPLES
       In this example comms module, an "item" is a string representation of a
       nodeset, e.g. "[0-3,5-12]". On each heartbeat, each rank provides its
       rank number as input to the reduction handle. The reduction consists of
       building a nodeset string which is printed on rank 0. For example, in a
       1024 rank session, each rank provides "0", "1", "2", ..., "1023", and
       after the reduction, rank 0 prints "[0-1023]".

           #include <stdio.h>
           #include <stdlib.h>
           #include <string.h>
           #include <flux/core.h>
           #include "src/common/libutil/shortjson.h"
           #include "src/common/libutil/nodeset.h"

           struct context {
               int batchnum;
               flux_reduce_t *r;
               char rankstr[16];
               flux_t h;
           };

           int itemsize (void *item)
           {
               int count = 1;
               nodeset_t nodeset;

               if ((nodeset = nodeset_new_str (item))) {
                   count = nodeset_count (nodeset);
                   nodeset_destroy (nodeset);
               }
               return count;
           }

           void sink (flux_reduce_t *r, int batchnum, void *arg)
           {
               char *item;

               while ((item = flux_reduce_pop (r))) {
                   fprintf (stderr, "%d: %s\n", batchnum, item);
                   free (item);
               }
           }

           void forward (flux_reduce_t *r, int batchnum, void *arg)
           {
               struct context *ctx = arg;
               char *item;
               flux_rpc_t *rpc;

               while ((item = flux_reduce_pop (r))) {
                   JSON out = Jnew ();
                   Jadd_int (out, "batchnum", batchnum);
                   Jadd_str (out, "nodeset", item);
                   rpc = flux_rpc (ctx->h, "treduce.forward", Jtostr (out),
                                   FLUX_NODEID_UPSTREAM, FLUX_RPC_NORESPONSE);
                   flux_rpc_destroy (rpc);
                   Jput (out);
                   free (item);
               }
           }

           void reduce (flux_reduce_t *r, int batchnum, void *arg)
           {
               nodeset_t nodeset = NULL;
               char *item;

               if ((item = flux_reduce_pop (r))) {
                   nodeset = nodeset_new_str (item);
                   free (item);
               }
               if (nodeset) {
                   while ((item = flux_reduce_pop (r))) {
                       nodeset_add_str (nodeset, item);
                       free (item);
                   }
                   if ((item = strdup (nodeset_str (nodeset)))) {
                       if (flux_reduce_push (r, item) < 0)
                           free (item);
                   }
                   nodeset_destroy (nodeset);
               }
           }

           void downstream_cb (flux_t h, flux_msg_watcher_t *w,
                               const flux_msg_t *msg, void *arg)
           {
               struct context *ctx = arg;
               const char *json_str, *nodeset_str;
               JSON in = NULL;
               int batchnum;
               char *item;

               if (flux_request_decode (msg, NULL, &json_str) < 0
                       || !(in = Jfromstr (json_str))
                       || !Jget_int (in, "batchnum", &batchnum)
                       || !Jget_str (in, "nodeset", &nodeset_str))
                   goto done;
               if ((item = strdup (nodeset_str))) {
                   if (flux_reduce_append (ctx->r, item, batchnum) < 0)
                       free (item);
               }
           done:
               Jput (in);   /* also release 'in' when decoding fails */
           }

           void heartbeat_cb (flux_t h, flux_msg_watcher_t *w,
                              const flux_msg_t *msg, void *arg)
           {
               struct context *ctx = arg;
               char *item = strdup (ctx->rankstr);
               if (item)
                   if (flux_reduce_append (ctx->r, item, ctx->batchnum++) < 0)
                       free (item);
           }

           struct flux_msghandler htab[] = {
               { FLUX_MSGTYPE_EVENT,     "hb",              heartbeat_cb },
               { FLUX_MSGTYPE_REQUEST,   "treduce.forward", downstream_cb },
               FLUX_MSGHANDLER_TABLE_END,
           };

           int mod_main (flux_t h, zhash_t *args)
           {
               struct context ctx;
               int rank = flux_rank (h);
               int rc = -1;

               ctx.batchnum = 0;
               snprintf (ctx.rankstr, sizeof (ctx.rankstr), "%d", rank);
               ctx.h = h;

               if (!(ctx.r = flux_reduce_create (h, free, itemsize, reduce,
                                                 rank > 0 ? forward : NULL,
                                                 rank == 0 ? sink : NULL,
                                                 0, &ctx, FLUX_REDUCE_HWMFLUSH)))
                   return -1;
               if (flux_event_subscribe (h, "hb") < 0)
                   goto done;
               if (flux_msg_watcher_addvec (h, htab, &ctx) < 0)
                   goto done;
               if (flux_reactor_start (h) >= 0)
                   rc = 0;
               flux_msg_watcher_delvec (h, htab);
           done:
               flux_reduce_destroy (ctx.r);   /* release the handle on all paths */
               return rc;
           }

           MOD_NAME ("treduce");

AUTHOR
       This page is maintained by the Flux community.

RESOURCES
       Github: http://github.com/flux-framework

COPYRIGHT
       Copyright (C) 2014 Lawrence Livermore National Security, LLC.

       Flux is free software; you can redistribute it and/or modify it under
       the terms of the GNU General Public License as published by the Free
       Software Foundation; either version 2 of the license, or (at your
       option) any later version.

flux-core 0.1.0                   07/29/2015             FLUX_REDUCE_CREATE(3)
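
To make the "item" in the man page example more concrete, the sketch below formats a sorted list of ranks as a bracketed range string such as "[0-3,5-12]". It is a standalone illustration written for this thread, not the libutil nodeset code: `ranks_to_str()` is a hypothetical helper, and it always emits brackets, which may differ from what the nodeset library prints for a single rank.

```c
#include <stdio.h>
#include <stddef.h>

/* Format sorted, unique ranks as a bracketed range list, e.g. "[0-3,5-12]".
 * Returns 0 on success, -1 if 'buf' (of size 'len') is too small. */
int ranks_to_str (const int *ranks, size_t n, char *buf, size_t len)
{
    size_t used;
    int rc;

    rc = snprintf (buf, len, "[");
    if (rc < 0 || (size_t)rc >= len)
        return -1;
    used = (size_t)rc;
    for (size_t i = 0; i < n; ) {
        size_t j = i;

        /* Advance j to the end of the run of consecutive ranks at i. */
        while (j + 1 < n && ranks[j + 1] == ranks[j] + 1)
            j++;
        if (j > i)
            rc = snprintf (buf + used, len - used, "%s%d-%d",
                           i ? "," : "", ranks[i], ranks[j]);
        else
            rc = snprintf (buf + used, len - used, "%s%d",
                           i ? "," : "", ranks[i]);
        if (rc < 0 || (size_t)rc >= len - used)
            return -1;   /* output was truncated */
        used += (size_t)rc;
        i = j + 1;
    }
    rc = snprintf (buf + used, len - used, "]");
    if (rc < 0 || (size_t)rc >= len - used)
        return -1;
    return 0;
}
```

Collapsing consecutive ranks into ranges is what keeps the reduced item small: the string for a fully populated 1024-rank session is just "[0-1023]".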

@dongahn
Member

dongahn commented Jul 30, 2015

@garlick: I was wondering about the uniform timeout issue as well. Because the timers won't all start at exactly the same time, the timeouts could be naturally staggered, but I still thought this could be improved. Can I ask what you have in mind as a solution? I was thinking along the lines of providing a mechanism to set the timeout value as some linear function of the TBON level (e.g., 1/level * value) and/or to start the timer when the module gets its first remote input.

@dongahn
Member

dongahn commented Jul 30, 2015

BTW, just to clarify, do you want to merge this PR once you are done with the unit tests or do you consider this still a PR for review only? IMHO this should be considered to be merged once all is said and done. In any case, if this is to be merged, I will take a look at the PR more closely.

@garlick
Member Author

garlick commented Jul 30, 2015

Yeah, I was pondering two ideas:

  1. ask the broker for the TBON level of this node and the max depth, and then calculate the timeout as (max_depth - level) * (timeout / max_depth). Problem: the broker would need to either calculate this or capture it via the hello protocol, then offer it as a service.

  2. there could be a training period using some timeout_step. Start rank 0 at timeout = timeout_step and ranks > 0 at timeout = 0. For each batch, if items arrive after timeout, set timeout += timeout_step. Problem: it would take a minimum of max_depth - 1 iterations to train the timeouts for all levels.

Maybe both of these could be implemented, say as FLUX_REDUCE_TIMEDFLUSH and FLUX_REDUCE_ADAPTFLUSH?
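
The formula in idea 1 can be written out as a small function. This is a sketch under an assumed convention that level 0 is the root and level max_depth is the deepest leaf level, so the root gets the full timeout and the deepest leaves get zero; `level_timeout()` is a hypothetical name.

```c
/* Per-level flush timeout: (max_depth - level) * (timeout / max_depth).
 * Assumes level 0 is the root and level == max_depth is the deepest level. */
double level_timeout (double timeout, int level, int max_depth)
{
    if (max_depth <= 0)
        return timeout;   /* single-level tree: only the root exists */
    return (max_depth - level) * (timeout / max_depth);
}
```

With timeout = 4.0 and max_depth = 4, levels 0 through 4 would wait 4.0, 3.0, 2.0, 1.0, and 0.0 seconds respectively, so each level flushes before its parent does.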

Ideally I'll complete this sometime today and ask for review. For now, I mainly was asking for a (cursory) review of the API and user semantics described in the man page.

@garlick
Member Author

garlick commented Jul 30, 2015

Also:

  1. Problem: tree depth can change dynamically on parent failure, then timeouts may be wrong. (Watch for an event signifying re-parenting?)

  2. Problem: a slow node could push up adaptive timeout. (Limit learning period to max_depth - 1 iterations/batches?)
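
The adaptive training idea, together with the suggested cap on the learning period, could look roughly like the following. This is a speculative sketch of a scheme that was only being discussed at this point: `struct adapt`, `adapt_init()`, and `adapt_batch_done()` are hypothetical names, and how a node detects that items "arrived late" is left to the caller.

```c
/* Hypothetical per-node state for adaptive timeout training. */
struct adapt {
    double timeout;      /* current flush timeout, in seconds */
    double step;         /* timeout_step */
    int batches_left;    /* training batches remaining */
};

/* Rank 0 starts at one step, all other ranks at zero.  Training is
 * limited to max_depth - 1 batches so a slow node cannot keep pushing
 * the timeout up indefinitely. */
void adapt_init (struct adapt *a, int rank, double step, int max_depth)
{
    a->step = step;
    a->timeout = (rank == 0) ? step : 0.0;
    a->batches_left = max_depth > 1 ? max_depth - 1 : 0;
}

/* Call once per completed batch.  'late' is nonzero if any item for the
 * batch arrived after the timeout had already fired. */
void adapt_batch_done (struct adapt *a, int late)
{
    if (a->batches_left <= 0)
        return;              /* training period is over */
    a->batches_left--;
    if (late)
        a->timeout += a->step;
}
```

The cap trades accuracy for stability: a tree whose depth grows after training (for example after re-parenting) would need the training period to be restarted.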

@garlick
Member Author

garlick commented Jul 30, 2015

Just rebased on current master with the following changes:

  • flux_reduce_create() now takes a struct of function pointers. function pointer types are dropped.
  • modified flux_info() so it can obtain the TBON branching factor (needed in timeout calculation)
  • Added flux_timer_watcher_reset() to allow a timer to be reused with a new timeout
  • FLUX_REDUCE_TIMEDFLUSH now works. Timeout at each TBON level is a linear function of the tree level, with leaves at zero, and root at 100%.
  • docs updated

Still todo - unit test.

@garlick garlick force-pushed the revive_mon branch 3 times, most recently from f87a4d2 to cafbbb5 Compare July 31, 2015 00:59
@garlick
Member Author

garlick commented Jul 31, 2015

I think this is the first iteration I'd call a merge candidate.

Added unit tests that use the loop connector, and cover the no-flush-policy and HWMFLUSH policy pretty well. The TIMEDFLUSH needs coverage but maybe I could add that after my week off?

The reduce.c stuff was really broken before, and now it is (actually!) working, with improved API, and with documentation.

The "mon" module and flux-mon utility is still a demo but it works and has been updated for the API changes of the past year.

In case it wasn't too clear from my previous comment, I changed the signature of a public API function flux_info(). It now returns rank and size as uint32_t's and the tree branching factor rather than the (useless) treeroot boolean.

A couple of memory leak fixes discussed in #161 were tossed in here as well.

I will keep an eye on this tomorrow; after that I will not be able to respond until Monday Aug 10.

@garlick
Member Author

garlick commented Jul 31, 2015

Just rebased on current master, updated dictionary, fixed a man page typo, and squashed some commits down further.

@grondo
Contributor

grondo commented Jul 31, 2015

Would you actually be willing to split the unrelated leak fixes and doc/man3 fixup into a separate PR? Those ought to go in right away, whereas we might want more comments on the mon and reduce additions (maybe to merge when you return if all is ok).

@garlick
Member Author

garlick commented Jul 31, 2015

Sure will do that.

@dongahn
Member

dongahn commented Jul 31, 2015

Both FLUX_REDUCE_TIMEDFLUSH and FLUX_REDUCE_ADAPTFLUSH sound great to me. Just as a side note, can the service used to calculate this also expose level and max depth? This can be useful for other distributed algorithms that should be adapted according to where they are in the TBON.

@dongahn
Member

dongahn commented Jul 31, 2015

Sorry -- reading top to bottom. Perhaps you already have this with flux_info!

@garlick
Member Author

garlick commented Aug 8, 2015

OK, here's a rebase on current master plus the following changes:

  • Add flux_reduce_opt_get(), flux_reduce_opt_set() to access hwm and timeout values directly, also the weighted and unweighted item count (number of items currently queued).
  • Add test coverage for get/set plus TIMEDFLUSH flush policy
  • Rename op.itemsize to op.itemweight
  • Fix bug: under TIMEDFLUSH policy, items arriving after flush would not be flushed until next batch started
  • Fix bug: timer was being reset when each item was appended, instead of ticking down from the arrival of the first item
  • Update/improve documentation
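
The timer fix in the list above (count down from the first item of a batch rather than restarting on every append) can be modeled without a reactor. This is a standalone sketch with explicit timestamps; `struct batch_timer`, `timer_on_append()`, and `timer_expired()` are hypothetical names, not functions from reduce.c.

```c
/* Hypothetical model of the per-batch flush timer. */
struct batch_timer {
    int armed;          /* nonzero while a batch is waiting to flush */
    double deadline;    /* absolute time at which the flush is due */
};

/* Called for every appended item.  Only the first item of a batch arms
 * the timer; later items must not push the deadline back. */
void timer_on_append (struct batch_timer *t, double now, double timeout)
{
    if (!t->armed) {
        t->armed = 1;
        t->deadline = now + timeout;
    }
}

/* Returns 1 (and disarms the timer) if the flush is due at time 'now'. */
int timer_expired (struct batch_timer *t, double now)
{
    if (t->armed && now >= t->deadline) {
        t->armed = 0;
        return 1;
    }
    return 0;
}
```

Under the buggy behavior, a steady trickle of items spaced closer together than the timeout would have postponed the flush indefinitely; with the deadline fixed at first arrival, the flush time is bounded.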

@garlick
Member Author

garlick commented Aug 10, 2015

@dongahn, any feedback on the recent changes?

I was just thinking maybe the flux-mon command and mon module ought to be dropped from this PR as they are not especially useful on their own, and there's now a working example in the flux_reduce_create(3) manual page, plus the unit tests. In fact I will go ahead and do a rebase minus those.

I think what's left is a clear improvement over the reduce.[ch] in the tree now and can be merged once @dongahn feels OK with it.

@garlick garlick changed the title revive the 'mon' module from ngrm repo redesign/fix flux_reduce_t handle Aug 10, 2015
@grondo
Contributor

grondo commented Aug 12, 2015

This looks good to me, a great improvement.

I think I would be inclined to merge, if @dongahn gives an ACK, since this is a clear improvement over what is in-tree already.

One question, does the loop test offer full coverage of the reduce code, or could we add a multiple broker test in the sharness area? (Haven't studied the code enough to know the answer here.)

@garlick
Member Author

garlick commented Aug 12, 2015

I think the loop coverage is probably sufficient for now, although I think what you're proposing would be complementary to the unit testy stuff in the loop test. Maybe a TBD? I could open an issue on that.

@grondo
Contributor

grondo commented Aug 13, 2015

Ok, then I'm inclined to merge if I don't hear any objections

@dongahn
Member

dongahn commented Aug 13, 2015

Sorry folks, I have been very busy with guests, etc. I will try to get to this this afternoon.

@dongahn
Member

dongahn commented Aug 14, 2015

Yes, this looks great to me!

I just have one comment -- which I should have raised before, but this just dawned on me.

For TIMEDFLUSH, the user of this API would currently pass a timeout value for the root, which is then linearly and progressively reduced towards the fringes of the tree. This is good for many cases. But for some other cases, I can see the user might want to pass a timeout value used at the fringes of the tree and have it progressively increased towards the root (i.e., the inverse formula). This case would be useful when you have a constant number of items to reduce in a scale-independent fashion.

Perhaps, we can provide an option to set the timeout as (H-h) * timeout instead of the current formula?

Come to think of it, though, as we are using linear scaling, either formula can have opposite issues as the tree height gets high. If the user uses a constant timeout regardless of the scale, the passed timeout can become too small at the last tree level with the current formula, and with the inverse formula it can become too large at the root.

A better practice may be for the users themselves to adjust the timeout value in accordance with the tree-height parameters, H and h. We can just document this (i.e., encouraging the users to use a height-aware timeout scheme) in the flux_reduce_create man page and be done with this...

@garlick
Member Author

garlick commented Aug 14, 2015

Thanks @dongahn! I did provide a getter/setter for the timeout so the user can override the calculated timeout at each level with their own. Also, in the man page for flux_info() the example provided calculates the tree height (total and at current rank) using rank, size, branching factor. So I think it's already possible to do what you are suggesting.
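
A height calculation of the kind mentioned above might look like the following. This is a sketch, not the example from the flux_info() man page: it assumes ranks are laid out breadth-first in a complete k-ary tree, so that the parent of rank r is (r - 1) / arity, and `tree_level()` and `tree_height()` are hypothetical names.

```c
#include <stdint.h>

/* Level of 'rank' below the root (root is level 0), assuming a k-ary
 * tree in which the parent of rank r is (r - 1) / arity.
 * Returns -1 if arity is less than 1. */
int tree_level (uint32_t rank, int arity)
{
    int level = 0;

    if (arity < 1)
        return -1;
    while (rank > 0) {
        rank = (rank - 1) / (uint32_t)arity;   /* step up to the parent */
        level++;
    }
    return level;
}

/* Height of the whole tree: the level of the highest-numbered rank.
 * Returns -1 if arity is less than 1. */
int tree_height (uint32_t size, int arity)
{
    if (arity < 1)
        return -1;
    return size > 0 ? tree_level (size - 1, arity) : 0;
}
```

Given these two values, a user could compute a height-aware timeout of their own and install it at each rank with the timeout setter, in place of the built-in linear scaling.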

I think the next step is to make timeouts adaptive as in issue #307 but my feeling is that can wait for another time as we have other priorities right now.

@grondo
Contributor

grondo commented Aug 14, 2015

@garlick, want to quickly rebase on top of current master and then I'll push the button? (Just to keep linear history)

Instead of returning the (now fairly useless) boolean 'treeroot',
return the TBON branching factor under "arity".
Return "arity" instead of treeroot.
Change type of "rank" and "size" to uint32_t.
It turns out that starting and stopping a timer watcher doesn't
reset the timeout back to the original.  This is needed for timeouts
that need to be started and stopped as is needed in reduce.c
Change flux_red_t to flux_reduce_t and make it refer to an incomplete
struct rather than a pointer to one (RFC 7).

Rename FLUX_RED_* flags to FLUX_REDUCE_*.

Instead of individual setters for reduction ops, create a structure
of function pointers that is passed into flux_reduce_create().

Make reduce and sink ops have the same prototype.

Add destroy op so that items can be disposed of if sink is not defined.

Add forward op.  Sink used to do double duty as sink and forward
and was always implemented as a conditional with two branches.
Ignore "forward" on rank 0, and "sink" on rank != 0.

Drop flux_reduce_flush() from public API.

Drop flux_redstack_t type and add functions that operate directly
on the flux_reduce_t type: flux_reduce_push(), flux_reduce_pop().

Added itemweight op that allows an item to be interrogated
as to how many original items it represents.  If using the HWM
flush policy, this is required (or EINVAL).

Only call reduce if >1 items are queued

Change timeout to seconds and make the type a double.
Stagger the timeouts based on the level in the tree,
scaled down at the leaves, full timeout at the root.
Since there may be multiple items to reduce at the leaves,
avoid setting TIMEDFLUSH timer there to zero.  Use the tree
height plus one to calculate it.

Ensure that the reactor timer is reset between calls.

Add flux_reduce_opt_get/set() for accessing
- learned high water mark
- calculated timeout values
- unweighted item count
- weighted item count
The live module uses flux_reduce_t to store nodeset into the
kvs at the root.  Update for the new API.
The loop test is able to exercise all the flush policies without
actually instantiating a flux session.  It does this by creating
a single flux_reduce_t queue, putting items in it, and testing which
callbacks are made.  For TIMEDFLUSH policy, it starts a reactor loop
so that timers can fire.

Edge cases such as tardy items from earlier batches are covered.
Create a somewhat detailed manual page for flux_reduce_create().
Add a man page for flux_info() that covers the new arguments.
Create an example (included in man page) that is compiled as part
of make check.  The example demonstrates how to calculate tree
height from info returned by flux_info().
Add a few words that came up after flux_reduce_create(3)
and flux_info(3) were added.
@garlick
Member Author

garlick commented Aug 14, 2015

OK, done. I did some minor editing of commit messages also.

grondo added a commit that referenced this pull request Aug 14, 2015
redesign/fix flux_reduce_t handle
@grondo grondo merged commit 5396019 into flux-framework:master Aug 14, 2015
@grondo grondo removed the review label Aug 14, 2015
@garlick garlick deleted the revive_mon branch February 25, 2020 17:15