Skip to content

Commit

Permalink
Merge 96b46c0 into c6c48fd
Browse files Browse the repository at this point in the history
  • Loading branch information
chu11 committed Mar 7, 2018
2 parents c6c48fd + 96b46c0 commit f02661f
Show file tree
Hide file tree
Showing 4 changed files with 446 additions and 81 deletions.
28 changes: 23 additions & 5 deletions src/modules/kvs/kvs.c
Expand Up @@ -881,6 +881,7 @@ static void kvstxn_apply (kvstxn_t *kt)
wait_t *wait = NULL;
int errnum = 0;
kvstxn_process_t ret;
bool fallback = false;

namespace = kvstxn_get_namespace (kt);
assert (namespace);
Expand Down Expand Up @@ -986,17 +987,28 @@ static void kvstxn_apply (kvstxn_t *kt)
setroot (ctx, root, kvstxn_get_newroot_ref (kt), root->seq + 1);
setroot_event_send (ctx, root, names);
} else {
flux_log (ctx->h, LOG_ERR, "transaction failed: %s",
flux_strerror (errnum));
error_event_send (ctx, root->namespace, kvstxn_get_names (kt),
errnum);
fallback = kvstxn_fallback_mergeable (kt);

flux_log (ctx->h, LOG_ERR, "kvstxn failed: %s%s",
flux_strerror (errnum),
fallback ? " (is fallbackable)" : "");

/* if merged transaction is fallbackable, ignore the fallback option
* if it's an extreme "death" like error.
*/
if (errnum == ENOMEM || errnum == ENOTSUP)
fallback = false;

if (!fallback)
error_event_send (ctx, root->namespace, kvstxn_get_names (kt),
errnum);
}
wait_destroy (wait);

/* Completed: remove from 'ready' list.
* N.B. treq_t remains in the treq_mgr_t hash until event is received.
*/
kvstxn_mgr_remove_transaction (root->ktm, kt);
kvstxn_mgr_remove_transaction (root->ktm, kt, fallback);
return;

stall:
Expand Down Expand Up @@ -1046,6 +1058,12 @@ static int kvstxn_check_root_cb (struct kvsroot *root, void *arg)
*/
if (kvstxn_mgr_merge_ready_transactions (root->ktm) < 0)
kvstxn_set_aux_errnum (kt, errno);
else {
/* grab new head ready commit, if above succeeds, this
* must succeed */
kt = kvstxn_mgr_get_ready_transaction (root->ktm);
assert (kt);
}
}

/* It does not matter if root has been marked for removal,
Expand Down
187 changes: 131 additions & 56 deletions src/modules/kvs/kvstxn.c
Expand Up @@ -43,6 +43,10 @@
#include "kvstxn.h"
#include "kvs_util.h"

#define KVSTXN_PROCESSING 0x01
#define KVSTXN_MERGED 0x02 /* kvstxn is a merger of transactions */
#define KVSTXN_MERGE_COMPONENT 0x04 /* kvstxn is member of a merger */

struct kvstxn_mgr {
struct cache *cache;
const char *namespace;
Expand All @@ -64,6 +68,7 @@ struct kvstxn {
blobref_t newroot;
zlist_t *missing_refs_list;
zlist_t *dirty_cache_entries_list;
int internal_flags;
kvstxn_mgr_t *ktm;
enum {
KVSTXN_STATE_INIT = 1,
Expand Down Expand Up @@ -155,6 +160,13 @@ int kvstxn_set_aux_errnum (kvstxn_t *kt, int errnum)
return kt->aux_errnum;
}

bool kvstxn_fallback_mergeable (kvstxn_t *kt)
{
if (kt->internal_flags & KVSTXN_MERGED)
return true;
return false;
}

json_t *kvstxn_get_ops (kvstxn_t *kt)
{
return kt->ops;
Expand Down Expand Up @@ -717,6 +729,11 @@ kvstxn_process_t kvstxn_process (kvstxn_t *kt,
if (kt->errnum)
return KVSTXN_PROCESS_ERROR;

if (!(kt->internal_flags & KVSTXN_PROCESSING)) {
kt->errnum = EINVAL;
return KVSTXN_PROCESS_ERROR;
}

switch (kt->state) {
case KVSTXN_STATE_INIT:
case KVSTXN_STATE_LOAD_ROOT:
Expand Down Expand Up @@ -1022,14 +1039,39 @@ bool kvstxn_mgr_transaction_ready (kvstxn_mgr_t *ktm)

kvstxn_t *kvstxn_mgr_get_ready_transaction (kvstxn_mgr_t *ktm)
{
if (kvstxn_mgr_transaction_ready (ktm))
return zlist_first (ktm->ready);
if (kvstxn_mgr_transaction_ready (ktm)) {
kvstxn_t *kt = zlist_first (ktm->ready);
kt->internal_flags |= KVSTXN_PROCESSING;
return kt;
}
return NULL;
}

void kvstxn_mgr_remove_transaction (kvstxn_mgr_t *ktm, kvstxn_t *kt)
void kvstxn_mgr_remove_transaction (kvstxn_mgr_t *ktm, kvstxn_t *kt,
bool fallback)
{
zlist_remove (ktm->ready, kt);
if (kt->internal_flags & KVSTXN_PROCESSING) {
bool kvstxn_is_merged = false;

if (kt->internal_flags & KVSTXN_MERGED)
kvstxn_is_merged = true;

zlist_remove (ktm->ready, kt);

if (kvstxn_is_merged) {
kvstxn_t *kt_tmp = zlist_first (ktm->ready);
while (kt_tmp && (kt_tmp->internal_flags & KVSTXN_MERGE_COMPONENT)) {
if (fallback) {
kt_tmp->internal_flags &= ~KVSTXN_MERGE_COMPONENT;
kt_tmp->flags |= FLUX_KVS_NO_MERGE;
}
else
zlist_remove (ktm->ready, kt_tmp);

kt_tmp = zlist_next (ktm->ready);
}
}
}
}

int kvstxn_mgr_get_noop_stores (kvstxn_mgr_t *ktm)
Expand All @@ -1049,66 +1091,69 @@ int kvstxn_mgr_ready_transaction_count (kvstxn_mgr_t *ktm)

static int kvstxn_merge (kvstxn_t *dest, kvstxn_t *src)
{
json_t *names = NULL;
json_t *ops = NULL;
int i, len, saved_errno;
int i, len;

if ((dest->flags & FLUX_KVS_NO_MERGE) || (src->flags & FLUX_KVS_NO_MERGE))
if (src->flags & FLUX_KVS_NO_MERGE
|| dest->flags != src->flags)
return 0;

if ((len = json_array_size (src->names))) {
if (!(names = json_copy (dest->names))) {
saved_errno = ENOMEM;
goto error;
}
for (i = 0; i < len; i++) {
json_t *name;
if ((name = json_array_get (src->names, i))) {
if (json_array_append (names, name) < 0) {
saved_errno = ENOMEM;
goto error;
if (json_array_append (dest->names, name) < 0) {
errno = ENOMEM;
return -1;
}
}
}
}
if ((len = json_array_size (src->ops))) {
if (!(ops = json_copy (dest->ops))) {
saved_errno = ENOMEM;
goto error;
}
for (i = 0; i < len; i++) {
json_t *op;
if ((op = json_array_get (src->ops, i))) {
if (json_array_append (ops, op) < 0) {
saved_errno = ENOMEM;
goto error;
if (json_array_append (dest->ops, op) < 0) {
errno = ENOMEM;
return -1;
}
}
}
}

if (names) {
json_decref (dest->names);
dest->names = names;
}
if (ops) {
json_decref (dest->ops);
dest->ops = ops;
}
return 1;
}

error:
json_decref (names);
json_decref (ops);
errno = saved_errno;
return -1;
static kvstxn_t *kvstxn_create_empty (kvstxn_mgr_t *ktm, int flags)
{
kvstxn_t *ktnew;

if (!(ktnew = calloc (1, sizeof (*ktnew))))
goto error_enomem;
if (!(ktnew->ops = json_array ()))
goto error_enomem;
if (!(ktnew->names = json_array ()))
goto error_enomem;
if (!(ktnew->missing_refs_list = zlist_new ()))
goto error_enomem;
if (!(ktnew->dirty_cache_entries_list = zlist_new ()))
goto error_enomem;
ktnew->flags = flags;
ktnew->ktm = ktm;
ktnew->state = KVSTXN_STATE_INIT;
return ktnew;

error_enomem:
kvstxn_destroy (ktnew);
errno = ENOMEM;
return NULL;
}

/* Merge ready transactions that are mergeable, where merging consists
* of popping the "donor" transaction off the ready list, and
* appending its ops to the top transaction. The top transaction can
* be appended to if it hasn't started, or is still building the
* rootcpy, e.g. stalled walking the namespace.
* creating a new kvstxn_t, and merging the other transactions in the
* ready queue and appending their ops/names to the new transaction.
* After merging, push the new kvstxn_t onto the head of the ready
* queue. Merging can occur if the top transaction hasn't started, or
* is still building the rootcpy, e.g. stalled walking the namespace.
*
* Break when an unmergeable transaction is discovered. We do not
* wish to merge non-adjacent transactions, as it can create
Expand All @@ -1124,30 +1169,60 @@ static int kvstxn_merge (kvstxn_t *dest, kvstxn_t *src)

int kvstxn_mgr_merge_ready_transactions (kvstxn_mgr_t *ktm)
{
kvstxn_t *kt = zlist_first (ktm->ready);
kvstxn_t *first, *second, *new;
kvstxn_t *nextkt;
int count = 0;

/* transaction must still be in state where merged in ops can be
* applied */
if (kt
&& kt->errnum == 0
&& kt->state <= KVSTXN_STATE_APPLY_OPS
&& !(kt->flags & FLUX_KVS_NO_MERGE)) {
kvstxn_t *nkt;
while ((nkt = zlist_next (ktm->ready))) {
int ret;
first = zlist_first (ktm->ready);
if (!first
|| first->errnum != 0
|| first->aux_errnum != 0
|| first->state > KVSTXN_STATE_APPLY_OPS
|| (first->flags & FLUX_KVS_NO_MERGE)
|| first->internal_flags & KVSTXN_MERGED)
return 0;

if ((ret = kvstxn_merge (kt, nkt)) < 0)
return -1;
second = zlist_next (ktm->ready);
if (!second
|| (second->flags & FLUX_KVS_NO_MERGE)
|| (first->flags != second->flags))
return 0;

/* if return == 0, we've merged as many as we currently
* can */
if (!ret)
break;
if (!(new = kvstxn_create_empty (ktm, first->flags)))
return -1;
new->internal_flags |= KVSTXN_MERGED;

nextkt = zlist_first (ktm->ready);
do {
int ret;

/* Merged kvstxn, remove off ready list */
zlist_remove (ktm->ready, nkt);
if ((ret = kvstxn_merge (new, nextkt)) < 0) {
kvstxn_destroy (new);
return -1;
}
}

if (!ret)
break;

count++;
} while ((nextkt = zlist_next (ktm->ready)));

/* if count is zero, checks at beginning of function are invalid */
assert (count);

nextkt = zlist_first (ktm->ready);
do {
/* Wipe out KVSTXN_PROCESSING flag if user previously got
* the kvstxn_t
*/
nextkt->internal_flags &= ~KVSTXN_PROCESSING;
nextkt->internal_flags |= KVSTXN_MERGE_COMPONENT;
} while (--count && (nextkt = zlist_next (ktm->ready)));

zlist_push (ktm->ready, new);
zlist_freefn (ktm->ready, new, (zlist_free_fn *)kvstxn_destroy, false);
return 0;
}

Expand Down
45 changes: 39 additions & 6 deletions src/modules/kvs/kvstxn.h
Expand Up @@ -35,6 +35,18 @@ int kvstxn_get_errnum (kvstxn_t *kt);
int kvstxn_get_aux_errnum (kvstxn_t *kt);
int kvstxn_set_aux_errnum (kvstxn_t *kt, int errnum);

/* Returns true if a kvstxn was merged and the user can fallback to
* the original transactions that it was made up of. This function
* should be used when a merged kvstxn has failed. Instead of failing
* all transactions in this merged kvstxn, the kvstxn manager can be
* told to fallback to the original transactions via a flag in
* kvstxn_mgr_remove_transaction(). By falling back to the original
* transactions, each can be played one by one and only the specific
* failing transaction can be sent an error. See
* kvstxn_mgr_remove_kvstxn() below for more details.
*/
bool kvstxn_fallback_mergeable (kvstxn_t *kt);

json_t *kvstxn_get_ops (kvstxn_t *kt);
json_t *kvstxn_get_names (kvstxn_t *kt);
int kvstxn_get_flags (kvstxn_t *kt);
Expand Down Expand Up @@ -133,8 +145,22 @@ kvstxn_t *kvstxn_mgr_get_ready_transaction (kvstxn_mgr_t *ktm);

/* remove a transaction from the kvstxn manager after it is done
* processing
*
* If the kvstxn was merged, and the caller would like to fallback to
* the original individual transactions (so they can be retried
* individually), set `fallback` to true. This will put the original
* transactions back on the ready queue, but will make it so they
* cannot be merged in the future (e.g. setting FLUX_KVS_NO_MERGE on
* them).
*
* Be careful with the 'fallback' option. If a transaction was
* successful, you can still fallback the merged kvstxn into its
* individual components. 'fallback' should only be set when you get
* an error (i.e. you don't use kvstxn_get_newroot_ref to get a new
* root).
*/
void kvstxn_mgr_remove_transaction (kvstxn_mgr_t *ktm, kvstxn_t *kt);
void kvstxn_mgr_remove_transaction (kvstxn_mgr_t *ktm, kvstxn_t *kt,
bool fallback);

int kvstxn_mgr_get_noop_stores (kvstxn_mgr_t *ktm);
void kvstxn_mgr_clear_noop_stores (kvstxn_mgr_t *ktm);
Expand All @@ -143,11 +169,18 @@ void kvstxn_mgr_clear_noop_stores (kvstxn_mgr_t *ktm);
int kvstxn_mgr_ready_transaction_count (kvstxn_mgr_t *ktm);

/* In internally stored ready transactions (moved to ready status via
* kvstxn_mgr_process_transaction_request()), merge them if they are
* capable of being merged. Returns -1 on error, 0 on success. On
* error, it is possible that the ready transaction has been modified
* with different transaction names and operations. The caller is
* responsible for sending errors to all appropriately.
* kvstxn_mgr_add_transaction()), merge them into a new ready transaction
* if they are capable of being merged.
*
* Callers should be cautioned to re-call
* kvstxn_mgr_get_ready_transaction() for the new head commit as the
* prior one has been removed.
*
* A merged kvstxn can be backed out if an error occurs. See
* kvstxn_fallback_mergeable() and kvstxn_mgr_remove_transaction()
* above.
*
* Returns -1 on error, 0 on success.
*/
int kvstxn_mgr_merge_ready_transactions (kvstxn_mgr_t *ktm);

Expand Down

0 comments on commit f02661f

Please sign in to comment.