Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

content-{sqlite,files,s3}: route checkpoint-get and checkpoint-put through broker #4463

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
133 changes: 131 additions & 2 deletions src/broker/content-cache.c
Expand Up @@ -340,13 +340,12 @@ static void cache_entry_remove (struct content_cache *cache,
{
assert (e->load_requests == NULL);
assert (e->store_requests == NULL);
assert (!e->dirty);
list_del (&e->list);
if (e->valid) {
cache->acct_size -= e->len;
cache->acct_valid--;
}
if (e->dirty)
cache->acct_dirty--;
Comment on lines -348 to -349
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry, dead code there.

Note "decrement" is misspelled in the commit description.

zhashx_delete (cache->entries, e->hash);
}

Expand Down Expand Up @@ -606,6 +605,124 @@ static void content_store_request (flux_t *h, flux_msg_handler_t *mh,
flux_log_error (h, "content store: flux_respond_error");
}

static void checkpoint_get_continuation (flux_future_t *f, void *arg)
{
struct content_cache *cache = arg;
const flux_msg_t *msg = flux_future_aux_get (f, "msg");
const char *s;

assert (msg);

if (flux_rpc_get (f, &s) < 0)
goto error;

if (flux_respond (cache->h, msg, s) < 0)
flux_log_error (cache->h, "%s: flux_respond", __FUNCTION__);
flux_future_destroy (f);
return;

error:
if (flux_respond_error (cache->h, msg, errno, NULL) < 0)
flux_log_error (cache->h, "flux_respond_error");
flux_future_destroy (f);
}

void content_checkpoint_get_request (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
struct content_cache *cache = arg;
const char *topic = "content-backing.checkpoint-get";
const char *s = NULL;
const flux_msg_t *msgcpy = flux_msg_incref (msg);
flux_future_t *f = NULL;

/* Temporarily maintain ENOSYS behavior */
if (!cache->backing) {
errno = ENOSYS;
goto error;
}

if (flux_request_decode (msg, NULL, &s) < 0)
goto error;

if (!(f = flux_rpc (h, topic, s, 0, 0))
|| flux_future_aux_set (f,
"msg",
(void *)msgcpy,
(flux_free_f)flux_msg_decref) < 0
|| flux_future_then (f, -1, checkpoint_get_continuation, cache) < 0) {
flux_log_error (h, "error starting checkpoint-get RPC");
goto error;
}

return;

error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "error responding to checkpoint-get request");
flux_future_destroy (f);
flux_msg_decref (msgcpy);
}

static void checkpoint_put_continuation (flux_future_t *f, void *arg)
{
struct content_cache *cache = arg;
const flux_msg_t *msg = flux_future_aux_get (f, "msg");
const char *s;

assert (msg);

if (flux_rpc_get (f, &s) < 0)
goto error;

if (flux_respond (cache->h, msg, s) < 0)
flux_log_error (cache->h, "%s: flux_respond", __FUNCTION__);
flux_future_destroy (f);
return;

error:
if (flux_respond_error (cache->h, msg, errno, NULL) < 0)
flux_log_error (cache->h, "flux_respond_error");
flux_future_destroy (f);
}

void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
struct content_cache *cache = arg;
const char *topic = "content-backing.checkpoint-put";
const char *s = NULL;
const flux_msg_t *msgcpy = flux_msg_incref (msg);
flux_future_t *f = NULL;

/* Temporarily maintain ENOSYS behavior */
if (!cache->backing) {
errno = ENOSYS;
goto error;
}

if (flux_request_decode (msg, NULL, &s) < 0)
goto error;

if (!(f = flux_rpc (h, topic, s, 0, 0))
|| flux_future_aux_set (f,
"msg",
(void *)msgcpy,
(flux_free_f)flux_msg_decref) < 0
|| flux_future_then (f, -1, checkpoint_put_continuation, cache) < 0) {
flux_log_error (h, "error starting checkpoint-put RPC");
goto error;
}

return;

error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "error responding to checkpoint-put request");
flux_future_destroy (f);
flux_msg_decref (msgcpy);
}

/* Backing store is enabled/disabled by modules that provide the
* 'content.backing' service. At module load time, the backing module
* informs the content service of its availability, and entries are
Expand Down Expand Up @@ -875,6 +992,18 @@ static const struct flux_msg_handler_spec htab[] = {
content_store_request,
0
},
{
FLUX_MSGTYPE_REQUEST,
"content.checkpoint-get",
content_checkpoint_get_request,
0
},
{
FLUX_MSGTYPE_REQUEST,
"content.checkpoint-put",
content_checkpoint_put_request,
0
},
{
FLUX_MSGTYPE_REQUEST,
"content.unregister-backing",
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/builtin/restore.c
Expand Up @@ -326,7 +326,7 @@ static void flush_content (flux_t *h, uint32_t rank)

if (!(f = flux_rpc (h, "content.flush", NULL, rank, 0))
|| flux_rpc_get (f, NULL) < 0)
log_msg ("error flushing content cache: %s", future_strerror (f, errno));
log_msg ("error flushing content cache: %s", future_strerror (f, errno));
flux_future_destroy (f);
}

Expand Down
4 changes: 2 additions & 2 deletions src/common/libkvs/kvs_checkpoint.c
Expand Up @@ -37,7 +37,7 @@ flux_future_t *kvs_checkpoint_commit (flux_t *h,
timestamp = flux_reactor_now (flux_get_reactor (h));

if (!(f = flux_rpc_pack (h,
"kvs-checkpoint.put",
"content.checkpoint-put",
0,
0,
"{s:s s:{s:i s:s s:f}}",
Expand All @@ -62,7 +62,7 @@ flux_future_t *kvs_checkpoint_lookup (flux_t *h, const char *key)
key = KVS_DEFAULT_CHECKPOINT;

return flux_rpc_pack (h,
"kvs-checkpoint.get",
"content.checkpoint-get",
0,
0,
"{s:s}",
Expand Down
33 changes: 16 additions & 17 deletions src/modules/content-files/content-files.c
Expand Up @@ -23,21 +23,22 @@
* content-backing.store:
* Given a blob, store it and return its hash
*
* kvs-checkpoint.get:
* content-backing.checkpoint-get:
* Given a string key, lookup string value and return it or a "not found" error.
*
* kvs-checkpoint.put:
* content-backing.checkpoint-put:
* Given a string key and string value, store it and return.
* If the key exists, overwrite.
*
* The content operations are per RFC 10 and are the main storage behind
* the Flux KVS.
*
* The kvs-checkpoint operations allow the current KVS root reference to
* be saved/restored along with the content so it can persist across a Flux
* instance restart. Multiple KVS namespaces (each with an independent root)
* are technically supported, although currently only the main KVS namespace
* is saved/restored by the KVS module.
* The content-backing.checkpoint operations allow the current KVS
* root reference to be saved/restored along with the content so it
* can persist across a Flux instance restart. Multiple KVS
* namespaces (each with an independent root) are technically
* supported, although currently only the main KVS namespace is
* saved/restored by the KVS module.
*
* The main client of this module is the rank 0 content-cache. The content
* cache is hierarchical: each broker resolves missing content-cache entries
Expand Down Expand Up @@ -192,7 +193,7 @@ void store_cb (flux_t *h,
flux_log_error (h, "error responding to store request");
}

/* Handle a kvs-checkpoint.get request from the rank 0 kvs module.
/* Handle a content-backing.checkpoint-get request from the rank 0 kvs module.
* The KVS stores its last root reference here for restart purposes.
*
* N.B. filedb_get() calls read_all() which ensures that the returned buffer
Expand Down Expand Up @@ -227,18 +228,18 @@ void checkpoint_get_cb (flux_t *h,
"{s:O}",
"value",
o) < 0)
flux_log_error (h, "error responding to kvs-checkpoint.get request");
flux_log_error (h, "error responding to checkpoint-get request");
free (data);
json_decref (o);
return;
error:
if (flux_respond_error (h, msg, errno, errstr) < 0)
flux_log_error (h, "error responding to kvs-checkpoint.get request");
flux_log_error (h, "error responding to checkpoint-get request");
free (data);
json_decref (o);
}

/* Handle a kvs-checkpoint.put request from the rank 0 kvs module.
/* Handle a content-backing.checkpoint-put request from the rank 0 kvs module.
* The KVS stores its last root reference here for restart purposes.
*/
void checkpoint_put_cb (flux_t *h,
Expand Down Expand Up @@ -268,12 +269,12 @@ void checkpoint_put_cb (flux_t *h,
if (filedb_put (ctx->dbpath, key, value, strlen (value), &errstr) < 0)
goto error;
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "error responding to kvs-checkpoint.put request");
flux_log_error (h, "error responding to checkpoint-put request");
free (value);
return;
error:
if (flux_respond_error (h, msg, errno, errstr) < 0)
flux_log_error (h, "error responding to kvs-checkpoint.put request");
flux_log_error (h, "error responding to checkpoint-put request");
free (value);
}

Expand All @@ -296,8 +297,8 @@ static void content_files_destroy (struct content_files *ctx)
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "content-backing.load", load_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "content-backing.store", store_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "kvs-checkpoint.get", checkpoint_get_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "kvs-checkpoint.put", checkpoint_put_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "content-backing.checkpoint-get", checkpoint_get_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "content-backing.checkpoint-put", checkpoint_put_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "content-files.stats.get", stats_get_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};
Expand Down Expand Up @@ -396,8 +397,6 @@ int mod_main (flux_t *h, int argc, char **argv)
}
if (content_register_service (h, "content-backing") < 0)
goto done;
if (content_register_service (h, "kvs-checkpoint") < 0)
goto done;
if (!testing) {
if (content_register_backing_store (h, "content-files") < 0)
goto done;
Expand Down
38 changes: 24 additions & 14 deletions src/modules/content-s3/content-s3.c
Expand Up @@ -228,7 +228,8 @@ static void config_reload_cb (flux_t *h,
goto error;
}
free (cfg);
flux_log (h, LOG_WARNING, "config-reload: changes will not take effect until next flux restart");
flux_log (h, LOG_WARNING, "config-reload: changes will not take effect "
"until next flux restart");

if (flux_set_conf (h, flux_conf_incref (conf)) < 0) {
errstr = "error updating cached configuration";
Expand Down Expand Up @@ -290,7 +291,10 @@ static void load_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, v
* The raw response payload is a hash digest.
* These payloads are specified in RFC 10.
*/
void store_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg)
void store_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
struct content_s3 *ctx = arg;
const void *data;
Expand Down Expand Up @@ -325,14 +329,17 @@ void store_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *a
flux_log_error (h, "error responding to store request");
}

/* Handle a kvs-checkpoint.get request from the rank 0 kvs module.
/* Handle a content-backing.checkpoint-get request from the rank 0 kvs module.
* The KVS stores its last root reference here for restart purposes.
*
* N.B. filedb_get() calls read_all() which ensures that the returned buffer
* is padded with an extra NULL not included in the returned length,
* so it is safe to use the result as a string argument in flux_respond_pack().
*/
void checkpoint_get_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg)
void checkpoint_get_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
const char *errstr = NULL;
struct content_s3 *ctx = arg;
Expand Down Expand Up @@ -361,23 +368,26 @@ void checkpoint_get_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg
"value",
o) < 0) {
errno = EIO;
flux_log_error (h, "error responding to kvs-checkpoint.get request (pack)");
flux_log_error (h, "error responding to checkpoint-get request (pack)");
}
free (data);
json_decref (o);
return;

error:
if (flux_respond_error (h, msg, errno, errstr) < 0)
flux_log_error (h, "error responding to kvs-checkpoint.get request");
flux_log_error (h, "error responding to checkpoint-get request");
free (data);
json_decref (o);
}

/* Handle a kvs-checkpoint.put request from the rank 0 kvs module.
/* Handle a content-backing.checkpoint-put request from the rank 0 kvs module.
* The KVS stores its last root reference here for restart purposes.
*/
void checkpoint_put_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg)
void checkpoint_put_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
struct content_s3 *ctx = arg;
const char *key;
Expand All @@ -401,13 +411,13 @@ void checkpoint_put_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg
if (s3_put (ctx->cfg, key, value, strlen (value), &errstr) < 0)
goto error;
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "error responding to kvs-checkpoint.put request (pack)");
flux_log_error (h, "error responding to checkpoint-put request (pack)");
free (value);
return;

error:
if (flux_respond_error (h, msg, errno, errstr) < 0)
flux_log_error (h, "error responding to kvs-checkpoint.put request");
flux_log_error (h, "error responding to checkpoint-put request");
free (value);
}

Expand All @@ -417,8 +427,10 @@ void checkpoint_put_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "content-backing.load", load_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "content-backing.store", store_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "kvs-checkpoint.get", checkpoint_get_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "kvs-checkpoint.put", checkpoint_put_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "content-backing.checkpoint-get",
checkpoint_get_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "content-backing.checkpoint-put",
checkpoint_put_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "content-s3.config-reload", config_reload_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};
Expand Down Expand Up @@ -499,8 +511,6 @@ int mod_main (flux_t *h, int argc, char **argv)
}
if (content_register_service (h, "content-backing") < 0)
goto done;
if (content_register_service (h, "kvs-checkpoint") < 0)
goto done;
if (content_register_backing_store (h, "content-s3") < 0)
goto done;
if (flux_reactor_run (flux_get_reactor (h), 0) < 0) {
Expand Down