Skip to content

Commit

Permalink
broker: add content.checkpoint-{get,put} targets
Browse files Browse the repository at this point in the history
Problem: In the future, we would like to cache checkpoint
data in the broker in the event a content-backing module is
not loaded.  But this requires checkpoint data to be routed
through the broker and not sent directly to the content-backing
modules.

Solution: Add new content.checkpoint-get and content-checkpoint-put
handlers in the broker which will route to content-backing.checkpoint-get
and content-backing.checkpoint-put if they are available.  Update
callers to call content.checkpoint-{get,put}.

Update t0018-content-files.t tests that had previously assumed no
registration with the broker.

Update t2807-dump-cmd.t test that will get different error message
as a result.
  • Loading branch information
chu11 committed Aug 5, 2022
1 parent 8974961 commit 049da09
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 71 deletions.
139 changes: 139 additions & 0 deletions src/broker/content-cache.c
Expand Up @@ -605,6 +605,133 @@ static void content_store_request (flux_t *h, flux_msg_handler_t *mh,
flux_log_error (h, "content store: flux_respond_error");
}

static void checkpoint_get_continuation (flux_future_t *f, void *arg)
{
struct content_cache *cache = arg;
const flux_msg_t *msg = flux_future_aux_get (f, "msg");
const char *s;

assert (msg);

if (flux_rpc_get (f, &s) < 0)
goto error;

if (flux_respond (cache->h, msg, s) < 0) {
flux_log_error (cache->h, "%s: flux_respond", __FUNCTION__);
goto error;
}

flux_future_destroy (f);
return;

error:
if (flux_respond_error (cache->h, msg, errno, NULL) < 0)
flux_log_error (cache->h, "flux_respond_error");
flux_future_destroy (f);
}

/* flux_free_f type */
static void flux_msg_decref_wrapper (void *arg)
{
const flux_msg_t *msg = arg;
flux_msg_decref (msg);
}

void content_checkpoint_get_request (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
struct content_cache *cache = arg;
const char *topic = "content-backing.checkpoint-get";
const char *s = NULL;
flux_future_t *f = NULL;

/* Temporarily maintain ENOSYS behavior */
if (!cache->backing) {
errno = ENOSYS;
goto error;
}

if (flux_request_decode (msg, NULL, &s) < 0)
goto error;

if (!(f = flux_rpc (h, topic, s, 0, 0))
|| flux_future_aux_set (f,
"msg",
(void *)flux_msg_incref (msg),
flux_msg_decref_wrapper) < 0
|| flux_future_then (f, -1, checkpoint_get_continuation, cache) < 0) {
flux_log_error (h, "%s: checkpoint backing get", __FUNCTION__);
goto error;
}

return;

error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "flux_respond_error");
flux_future_destroy (f);
}

static void checkpoint_put_continuation (flux_future_t *f, void *arg)
{
struct content_cache *cache = arg;
const flux_msg_t *msg = flux_future_aux_get (f, "msg");
const char *s;

assert (msg);

if (flux_rpc_get (f, &s) < 0)
goto error;

if (flux_respond (cache->h, msg, s) < 0) {
flux_log_error (cache->h, "%s: flux_respond", __FUNCTION__);
goto error;
}

flux_future_destroy (f);
return;

error:
if (flux_respond_error (cache->h, msg, errno, NULL) < 0)
flux_log_error (cache->h, "flux_respond_error");
flux_future_destroy (f);
}

void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
struct content_cache *cache = arg;
const char *topic = "content-backing.checkpoint-put";
const char *s = NULL;
flux_future_t *f = NULL;

/* Temporarily maintain ENOSYS behavior */
if (!cache->backing) {
errno = ENOSYS;
goto error;
}

if (flux_request_decode (msg, NULL, &s) < 0)
goto error;

if (!(f = flux_rpc (h, topic, s, 0, 0))
|| flux_future_aux_set (f,
"msg",
(void *)flux_msg_incref (msg),
flux_msg_decref_wrapper) < 0
|| flux_future_then (f, -1, checkpoint_put_continuation, cache) < 0) {
flux_log_error (h, "%s: checkpoint backing put", __FUNCTION__);
goto error;
}

return;

error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "flux_respond_error");
flux_future_destroy (f);
}

/* Backing store is enabled/disabled by modules that provide the
* 'content.backing' service. At module load time, the backing module
* informs the content service of its availability, and entries are
Expand Down Expand Up @@ -874,6 +1001,18 @@ static const struct flux_msg_handler_spec htab[] = {
content_store_request,
0
},
{
FLUX_MSGTYPE_REQUEST,
"content.checkpoint-get",
content_checkpoint_get_request,
0
},
{
FLUX_MSGTYPE_REQUEST,
"content.checkpoint-put",
content_checkpoint_put_request,
0
},
{
FLUX_MSGTYPE_REQUEST,
"content.unregister-backing",
Expand Down
4 changes: 2 additions & 2 deletions src/common/libkvs/kvs_checkpoint.c
Expand Up @@ -37,7 +37,7 @@ flux_future_t *kvs_checkpoint_commit (flux_t *h,
timestamp = flux_reactor_now (flux_get_reactor (h));

if (!(f = flux_rpc_pack (h,
"content-backing.checkpoint-put",
"content.checkpoint-put",
0,
0,
"{s:s s:{s:i s:s s:f}}",
Expand All @@ -62,7 +62,7 @@ flux_future_t *kvs_checkpoint_lookup (flux_t *h, const char *key)
key = KVS_DEFAULT_CHECKPOINT;

return flux_rpc_pack (h,
"content-backing.checkpoint-get",
"content.checkpoint-get",
0,
0,
"{s:s}",
Expand Down
4 changes: 2 additions & 2 deletions t/t0012-content-sqlite.t
Expand Up @@ -123,10 +123,10 @@ test_expect_success 'fill the cache with more data for later purging' '

checkpoint_put() {
o="{key:\"$1\",value:{version:1,rootref:\"$2\",timestamp:2.2}}"
jq -j -c -n ${o} | $RPC content-backing.checkpoint-put
jq -j -c -n ${o} | $RPC content.checkpoint-put
}
checkpoint_get() {
jq -j -c -n "{key:\"$1\"}" | $RPC content-backing.checkpoint-get
jq -j -c -n "{key:\"$1\"}" | $RPC content.checkpoint-get
}

test_expect_success HAVE_JQ 'checkpoint-put foo w/ rootref bar' '
Expand Down
129 changes: 67 additions & 62 deletions t/t0018-content-files.t
Expand Up @@ -58,11 +58,11 @@ recheck_cache_blob() {
# Usage: checkpoint_put key rootref
checkpoint_put() {
o="{key:\"$1\",value:{version:1,rootref:\"$2\",timestamp:2.2}}"
jq -j -c -n ${o} | $RPC content-backing.checkpoint-put
jq -j -c -n ${o} | $RPC content.checkpoint-put
}
# Usage: checkpoint_get key >value
checkpoint_get() {
jq -j -c -n "{key:\"$1\"}" | $RPC content-backing.checkpoint-get
jq -j -c -n "{key:\"$1\"}" | $RPC content.checkpoint-get
}

##
Expand Down Expand Up @@ -100,32 +100,6 @@ test_expect_success LONGTEST 'store/load/verify various size large blobs' '
test $err -eq 0
'

test_expect_success HAVE_JQ 'checkpoint-put foo w/ rootref bar' '
checkpoint_put foo bar
'

test_expect_success HAVE_JQ 'checkpoint-get foo returned rootref bar' '
echo bar >rootref.exp &&
checkpoint_get foo | jq -r .value | jq -r .rootref >rootref.out &&
test_cmp rootref.exp rootref.out
'

# use grep instead of compare, incase of floating point rounding
test_expect_success HAVE_JQ 'checkpoint-get foo returned correct timestamp' '
checkpoint_get foo | jq -r .value | jq -r .timestamp >timestamp.out &&
grep 2.2 timestamp.out
'

test_expect_success HAVE_JQ 'checkpoint-put updates foo rootref to baz' '
checkpoint_put foo baz
'

test_expect_success HAVE_JQ 'checkpoint-get foo returned rootref baz' '
echo baz >rootref2.exp &&
checkpoint_get foo | jq -r .value | jq -r .rootref >rootref2.out &&
test_cmp rootref2.exp rootref2.out
'

test_expect_success 'reload content-files module' '
flux module reload content-files testing
'
Expand All @@ -146,44 +120,10 @@ test_expect_success LONGTEST 'reload/verify various size large blobs' '
test $err -eq 0
'

test_expect_success HAVE_JQ 'checkpoint-get foo still returns rootref baz' '
echo baz >rootref3.exp &&
checkpoint_get foo | jq -r .value | jq -r .rootref >rootref3.out &&
test_cmp rootref3.exp rootref3.out
'

test_expect_success HAVE_JQ 'checkpoint-put updates foo rootref with longer rootref' '
checkpoint_put foo abcdefghijklmnopqrstuvwxyz
'

test_expect_success HAVE_JQ 'checkpoint-get foo returned rootref with longer rootref' '
echo abcdefghijklmnopqrstuvwxyz >rootref4.exp &&
checkpoint_get foo | jq -r .value | jq -r .rootref >rootref4.out &&
test_cmp rootref4.exp rootref4.out
'

test_expect_success HAVE_JQ 'checkpoint-put updates foo rootref to shorter rootref' '
checkpoint_put foo foobar
'

test_expect_success HAVE_JQ 'checkpoint-get foo returned rootref with shorter rootref' '
echo foobar >rootref5.exp &&
checkpoint_get foo | jq -r .value | jq -r .rootref >rootref5.out &&
test_cmp rootref5.exp rootref5.out
'

test_expect_success 'load with invalid hash size fails with EPROTO' '
test_must_fail backing_load </dev/null 2>badhash.err &&
grep "Protocol error" badhash.err
'
test_expect_success 'checkpoint-get bad request fails with EPROTO' '
test_must_fail $RPC content-backing.checkpoint-get </dev/null 2>badget.err &&
grep "Protocol error" badget.err
'
test_expect_success 'checkpoint-put bad request fails with EPROTO' '
test_must_fail $RPC content-backing.checkpoint-put </dev/null 2>badput.err &&
grep "Protocol error" badput.err
'

##
# Tests of the module acting as backing store for content cache
Expand Down Expand Up @@ -225,6 +165,71 @@ test_expect_success 'flux module stats reports zero object count' '
--type int --parse object_count content-files) -eq 0
'

test_expect_success HAVE_JQ 'checkpoint-put foo w/ rootref bar' '
checkpoint_put foo bar
'

test_expect_success HAVE_JQ 'checkpoint-get foo returned rootref bar' '
echo bar >rootref.exp &&
checkpoint_get foo | jq -r .value | jq -r .rootref >rootref.out &&
test_cmp rootref.exp rootref.out
'

# use grep instead of compare, incase of floating point rounding
test_expect_success HAVE_JQ 'checkpoint-get foo returned correct timestamp' '
checkpoint_get foo | jq -r .value | jq -r .timestamp >timestamp.out &&
grep 2.2 timestamp.out
'

test_expect_success HAVE_JQ 'checkpoint-put updates foo rootref to baz' '
checkpoint_put foo baz
'

test_expect_success HAVE_JQ 'checkpoint-get foo returned rootref baz' '
echo baz >rootref2.exp &&
checkpoint_get foo | jq -r .value | jq -r .rootref >rootref2.out &&
test_cmp rootref2.exp rootref2.out
'

test_expect_success 'reload content-files module' '
flux module reload content-files
'

test_expect_success HAVE_JQ 'checkpoint-get foo still returns rootref baz' '
echo baz >rootref3.exp &&
checkpoint_get foo | jq -r .value | jq -r .rootref >rootref3.out &&
test_cmp rootref3.exp rootref3.out
'

test_expect_success HAVE_JQ 'checkpoint-put updates foo rootref with longer rootref' '
checkpoint_put foo abcdefghijklmnopqrstuvwxyz
'

test_expect_success HAVE_JQ 'checkpoint-get foo returned rootref with longer rootref' '
echo abcdefghijklmnopqrstuvwxyz >rootref4.exp &&
checkpoint_get foo | jq -r .value | jq -r .rootref >rootref4.out &&
test_cmp rootref4.exp rootref4.out
'

test_expect_success HAVE_JQ 'checkpoint-put updates foo rootref to shorter rootref' '
checkpoint_put foo foobar
'

test_expect_success HAVE_JQ 'checkpoint-get foo returned rootref with shorter rootref' '
echo foobar >rootref5.exp &&
checkpoint_get foo | jq -r .value | jq -r .rootref >rootref5.out &&
test_cmp rootref5.exp rootref5.out
'

test_expect_success 'checkpoint-get bad request fails with EPROTO' '
test_must_fail $RPC content.checkpoint-get </dev/null 2>badget.err &&
grep "Protocol error" badget.err
'
test_expect_success 'checkpoint-put bad request fails with EPROTO' '
test_must_fail $RPC content.checkpoint-put </dev/null 2>badput.err &&
grep "Protocol error" badput.err
'

test_expect_success 'remove content-files module' '
flux module remove content-files
'
Expand Down
4 changes: 2 additions & 2 deletions t/t0024-content-s3.t
Expand Up @@ -63,11 +63,11 @@ recheck_cache_blob() {
# Usage: checkpoint_put key rootref
checkpoint_put() {
o="{key:\"$1\",value:{version:1,rootref:\"$2\",timestamp:2.2}}"
jq -j -c -n ${o} | $RPC content-backing.checkpoint-put
jq -j -c -n ${o} | $RPC content.checkpoint-put
}
# Usage: checkpoint_get key >value
checkpoint_get() {
jq -j -c -n "{key:\"$1\"}" | $RPC content-backing.checkpoint-get
jq -j -c -n "{key:\"$1\"}" | $RPC content.checkpoint-get
}

##
Expand Down
2 changes: 1 addition & 1 deletion t/t1010-kvs-commit-sync.t
Expand Up @@ -17,7 +17,7 @@ test_under_flux ${SIZE} minimal
TESTNAMESPACE="testnamespace"

checkpoint_get() {
jq -j -c -n "{key:\"$1\"}" | $RPC content-backing.checkpoint-get
jq -j -c -n "{key:\"$1\"}" | $RPC content.checkpoint-get
}

test_expect_success 'load content-sqlite and kvs and add some data' '
Expand Down
2 changes: 1 addition & 1 deletion t/t1011-kvs-checkpoint-period.t
Expand Up @@ -17,7 +17,7 @@ test_under_flux ${SIZE} minimal

checkpoint_get() {
jq -j -c -n "{key:\"$1\"}" \
| $RPC content-backing.checkpoint-get \
| $RPC content.checkpoint-get \
| jq -r .value.rootref
}

Expand Down
2 changes: 1 addition & 1 deletion t/t2807-dump-cmd.t
Expand Up @@ -23,7 +23,7 @@ test_expect_success 'flux-restore with no args prints Usage message' '
'
test_expect_success 'flux-dump with no backing store fails' '
test_must_fail flux dump --checkpoint foo.tar 2>nostore.err &&
grep "No service matching content-backing.checkpoint-get" nostore.err
grep "Function not implemented" nostore.err
'
test_expect_success 'flux-dump with bad archive file fails' '
test_must_fail flux dump /badfile.tar 2>badfile.err &&
Expand Down

0 comments on commit 049da09

Please sign in to comment.