Skip to content

Commit

Permalink
Merge pull request #4299 from garlick/content_optimize
Browse files Browse the repository at this point in the history
content: track RFC 10 protocol changes
  • Loading branch information
mergify[bot] committed Apr 26, 2022
2 parents 0dec989 + 36cc523 commit d53b662
Show file tree
Hide file tree
Showing 18 changed files with 369 additions and 200 deletions.
113 changes: 74 additions & 39 deletions src/broker/content-cache.c
Expand Up @@ -48,6 +48,11 @@ static const uint32_t default_blob_size_limit = 1048576*1024;

static const uint32_t default_flush_batch_limit = 256;

/* Hash digests are used as zhashx keys.  The zhashx comparator receives
 * only the two keys, so the digest size (in bytes) must be available to it
 * some other way: keep it in this file-scope variable.
 * It is assigned from the return value of blobref_validate_hashtype().
 */
static int content_hash_size;

struct msgstack {
const flux_msg_t *msg;
struct msgstack *next;
Expand All @@ -56,7 +61,7 @@ struct msgstack {
struct cache_entry {
void *data;
int len;
char *blobref;
void *hash; // key storage is contiguous with struct
uint8_t valid:1; // entry contains valid data
uint8_t dirty:1; // entry needs to be stored upstream
// or to backing store (rank 0)
Expand Down Expand Up @@ -188,22 +193,31 @@ static void cache_entry_destructor (void **item)
*item = NULL;
}
}
/* zhashx_hash_fn footprint
 * The key is already a hash digest, so its leading sizeof (size_t) bytes
 * are used directly as the hash value.
 * Keys handed to lookups may point into a message payload or a byte array
 * on the stack, neither of which is guaranteed to be aligned for size_t.
 * Copy the bytes out with memcpy() instead of dereferencing the key through
 * a size_t pointer, which would be a misaligned access / aliasing violation.
 * The key must be at least sizeof (size_t) bytes long.
 */
static size_t cache_entry_hasher (const void *key)
{
    size_t hash;

    memcpy (&hash, key, sizeof (hash));
    return hash;
}
/* Key comparator in the zhashx_comparator_fn footprint.
 * Both keys are raw digests and are compared over content_hash_size bytes.
 * The result is whatever memcmp() yields: zero when the digests match.
 */
static int cache_entry_comparator (const void *item1, const void *item2)
{
    const void *lhs = item1;
    const void *rhs = item2;
    int diff = memcmp (lhs, rhs, content_hash_size);

    return diff;
}

/* Create a cache entry keyed by 'hash'.
 * A single calloc() holds the entry struct followed by the key storage,
 * and the key bytes are copied into that trailing space, so the entry and
 * its key are co-located in memory.
 * Entries are created with no data (e.g. "invalid").
 * Returns entry on success, NULL with errno set on failure.
 * NOTE(review): this view appears to interleave the pre-change blobref
 * lines with their hash-based replacements - confirm against the real file.
 */
static struct cache_entry *cache_entry_create (const char *blobref)
static struct cache_entry *cache_entry_create (const void *hash)
{
struct cache_entry *e;
int bloblen = strlen (blobref) + 1;

if (!(e = calloc (1, sizeof (*e) + bloblen)))
if (!(e = calloc (1, sizeof (*e) + content_hash_size)))
return NULL;
e->blobref = (char *)(e + 1);
memcpy (e->blobref, blobref, bloblen);
e->hash = (char *)(e + 1); // key storage begins right after the struct
memcpy (e->hash, hash, content_hash_size);
list_node_init (&e->list);
return e;
}
Expand Down Expand Up @@ -264,40 +278,50 @@ static void cache_entry_dirty_clear (struct content_cache *cache,

request_list_respond_raw (&e->store_requests,
cache->h,
e->blobref,
strlen (e->blobref) + 1,
e->hash,
content_hash_size,
"store");
}
}


/* Create a cache entry keyed by 'hash' and insert it into cache->entries.
 * Returns the new entry on success, NULL on failure with errno set:
 * EINVAL if hash_size differs from content_hash_size, EEXIST if
 * zhashx_insert() fails (the freshly created entry is destroyed first).
 * NOTE(review): this view appears to interleave the pre-change blobref
 * lines with their hash-based replacements - confirm against the real file.
 */
static struct cache_entry *cache_entry_insert (struct content_cache *cache,
const char *blobref)
const void *hash,
int hash_size)
{
struct cache_entry *e;
if (!(e = cache_entry_create (blobref)))

// cache_entry_create() copies exactly content_hash_size bytes of key
if (hash_size != content_hash_size) {
errno = EINVAL;
return NULL;
}
if (!(e = cache_entry_create (hash)))
return NULL;
if (zhashx_insert (cache->entries, e->blobref, e) < 0) {
if (zhashx_insert (cache->entries, e->hash, e) < 0) {
errno = EEXIST;
cache_entry_destroy (e);
return NULL;
}
return e;
}

/* Look up a cache entry, by blobref.
/* Look up a cache entry.
* Move to front of LRU because it was looked up.
* Returns entry on success, NULL on failure.
* N.B. errno is not set
*/
static struct cache_entry *cache_entry_lookup (struct content_cache *cache,
const char *blobref)
const void *hash,
int hash_size)
{
struct cache_entry *e;
if (!(e = zhashx_lookup (cache->entries, blobref)))

if (hash_size != content_hash_size)
return NULL;
if (!(e = zhashx_lookup (cache->entries, hash)))
return NULL;

if (e->valid && !e->dirty) {
Expand All @@ -323,7 +347,7 @@ static void cache_entry_remove (struct content_cache *cache,
}
if (e->dirty)
cache->acct_dirty--;
zhashx_delete (cache->entries, e->blobref);
zhashx_delete (cache->entries, e->hash);
}

/* Load operation
Expand Down Expand Up @@ -377,7 +401,7 @@ static int cache_load (struct content_cache *cache, struct cache_entry *e)
return 0;
if (cache->rank == 0)
flags = CONTENT_FLAG_CACHE_BYPASS;
if (!(f = content_load (cache->h, e->blobref, flags))
if (!(f = content_load_byhash (cache->h, e->hash, content_hash_size, flags))
|| flux_future_aux_set (f, "entry", e, NULL) < 0
|| flux_future_then (f, -1., cache_load_continuation, cache) < 0) {
flux_log_error (cache->h, "content load");
Expand All @@ -392,25 +416,24 @@ void content_load_request (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
struct content_cache *cache = arg;
const char *blobref;
int blobref_size;
const void *hash;
int hash_size;
void *data = NULL;
int len = 0;
struct cache_entry *e;

if (flux_request_decode_raw (msg, NULL, (const void **)&blobref,
&blobref_size) < 0)
if (flux_request_decode_raw (msg, NULL, &hash, &hash_size) < 0)
goto error;
if (!blobref || blobref[blobref_size - 1] != '\0') {
if (hash_size != content_hash_size) {
errno = EPROTO;
goto error;
}
if (!(e = cache_entry_lookup (cache, blobref))) {
if (!(e = cache_entry_lookup (cache, hash, hash_size))) {
if (cache->rank == 0 && !cache->backing) {
errno = ENOENT;
goto error;
}
if (!(e = cache_entry_insert (cache, blobref))) {
if (!(e = cache_entry_insert (cache, hash, hash_size))) {
flux_log_error (h, "content load");
goto error;
}
Expand Down Expand Up @@ -469,21 +492,22 @@ static void cache_store_continuation (flux_future_t *f, void *arg)
{
struct content_cache *cache = arg;
struct cache_entry *e = flux_future_aux_get (f, "entry");
const char *blobref;
const void *hash;
int hash_size;

e->store_pending = 0;
assert (cache->flush_batch_count > 0);
cache->flush_batch_count--;
if (content_store_get (f, &blobref) < 0) {
if (content_store_get_hash (f, &hash, &hash_size) < 0) {
if (cache->rank == 0 && errno == ENOSYS)
flux_log (cache->h, LOG_DEBUG, "content store: %s",
"backing store service unavailable");
else
flux_log_error (cache->h, "content store");
goto error;
}
if (strcmp (blobref, e->blobref)) {
flux_log (cache->h, LOG_ERR, "content store: wrong blobref");
if (hash_size != content_hash_size
|| memcmp (hash, e->hash, content_hash_size) != 0) {
errno = EIO;
goto error;
}
Expand Down Expand Up @@ -536,20 +560,23 @@ static void content_store_request (flux_t *h, flux_msg_handler_t *mh,
const void *data;
int len;
struct cache_entry *e = NULL;
char blobref[BLOBREF_MAX_STRING_SIZE];
uint8_t hash[BLOBREF_MAX_DIGEST_SIZE];
int hash_size;

if (flux_request_decode_raw (msg, NULL, &data, &len) < 0)
goto error;
if (len > cache->blob_size_limit) {
errno = EFBIG;
goto error;
}
if (blobref_hash (cache->hash_name, (uint8_t *)data, len, blobref,
sizeof (blobref)) < 0)
if ((hash_size = blobref_hash_raw (cache->hash_name,
data,
len,
hash,
sizeof (hash))) < 0)
goto error;

if (!(e = cache_entry_lookup (cache, blobref))) {
if (!(e = cache_entry_insert (cache, blobref)))
if (!(e = cache_entry_lookup (cache, hash, hash_size))) {
if (!(e = cache_entry_insert (cache, hash, hash_size)))
goto error;
}
if (cache_entry_fill (cache, e, data, len, true) < 0)
Expand All @@ -565,7 +592,7 @@ static void content_store_request (flux_t *h, flux_msg_handler_t *mh,
}
}
}
if (flux_respond_raw (h, msg, blobref, strlen (blobref) + 1) < 0)
if (flux_respond_raw (h, msg, hash, hash_size) < 0)
flux_log_error (h, "content store: flux_respond_raw");
return;
error:
Expand Down Expand Up @@ -890,13 +917,15 @@ static int register_attrs (struct content_cache *cache, attr_t *attr)
return -1;
}
else {
if (blobref_validate_hashtype (s) < 0) {
int hash_size;
if ((hash_size = blobref_validate_hashtype (s)) < 0) {
log_msg ("%s: unknown hash type", s);
return -1;
}
if (attr_set_flags (attr, "content.hash", FLUX_ATTRFLAG_IMMUTABLE) < 0)
return -1;
cache->hash_name = s;
content_hash_size = hash_size;
}

/* Purge tunables
Expand Down Expand Up @@ -944,7 +973,10 @@ struct content_cache *content_cache_create (flux_t *h, attr_t *attrs)
return NULL;
if (!(cache->entries = zhashx_new ()))
goto nomem;

zhashx_set_destructor (cache->entries, cache_entry_destructor);
zhashx_set_key_hasher (cache->entries, cache_entry_hasher);
zhashx_set_key_comparator (cache->entries, cache_entry_comparator);
zhashx_set_key_destructor (cache->entries, NULL); // key is part of entry
zhashx_set_key_duplicator (cache->entries, NULL); // key is part of entry

Expand All @@ -954,13 +986,16 @@ struct content_cache *content_cache_create (flux_t *h, attr_t *attrs)
cache->purge_target_size = default_cache_purge_target_size;
cache->purge_old_entry = default_cache_purge_old_entry;
cache->hash_name = default_hash;
if ((content_hash_size = blobref_validate_hashtype (default_hash)) < 0)
goto error;
cache->h = h;
cache->reactor = flux_get_reactor (h);
list_head_init (&cache->lru);
list_head_init (&cache->flush);

if (register_attrs (cache, attrs) < 0)
goto error;
assert (content_hash_size >= sizeof (size_t)); // hasher assumes this

if (flux_msg_handler_addvec (h, htab, cache, &cache->handlers) < 0)
goto error;
Expand Down
8 changes: 4 additions & 4 deletions src/cmd/builtin/content.c
Expand Up @@ -39,8 +39,8 @@ static int internal_content_load (optparse_t *p, int ac, char *av[])
log_err_exit ("flux_open");
if (optparse_hasopt (p, "bypass-cache"))
flags |= CONTENT_FLAG_CACHE_BYPASS;
if (!(f = content_load (h, ref, flags)))
log_err_exit ("content_load");
if (!(f = content_load_byblobref (h, ref, flags)))
log_err_exit ("content_load_byblobref");
if (content_load_get (f, (const void **)&data, &size) < 0)
log_err_exit ("content_load_get");
if (write_all (STDOUT_FILENO, data, size) < 0)
Expand Down Expand Up @@ -71,8 +71,8 @@ static int internal_content_store (optparse_t *p, int ac, char *av[])
log_err_exit ("read");
if (!(f = content_store (h, data, size, flags)))
log_err_exit ("content_store");
if (content_store_get (f, &blobref) < 0)
log_err_exit ("content_store_get");
if (content_store_get_blobref (f, &blobref) < 0)
log_err_exit ("content_store_get_blobref");
printf ("%s\n", blobref);
flux_future_destroy (f);
flux_close (h);
Expand Down
14 changes: 7 additions & 7 deletions src/cmd/builtin/dump.c
Expand Up @@ -134,9 +134,9 @@ static void dump_valref (struct archive *ar,
log_err_exit ("could not create message list");
for (int i = 0; i < count; i++) {
flux_future_t *f;
if (!(f = content_load (h,
treeobj_get_blobref (treeobj, i),
content_flags))
if (!(f = content_load_byblobref (h,
treeobj_get_blobref (treeobj, i),
content_flags))
|| flux_future_get (f, (const void **)&msg) < 0
|| flux_msg_get_payload (msg, &data, &len) < 0) {
log_msg_exit ("%s: missing blobref %d: %s",
Expand Down Expand Up @@ -260,9 +260,9 @@ static void dump_dirref (struct archive *ar,

if (treeobj_get_count (treeobj) != 1)
log_msg_exit ("%s: blobref count is not 1", path);
if (!(f = content_load (h,
treeobj_get_blobref (treeobj, 0),
content_flags))
if (!(f = content_load_byblobref (h,
treeobj_get_blobref (treeobj, 0),
content_flags))
|| content_load_get (f, &buf, &buflen) < 0) {
log_msg_exit ("%s: missing blobref: %s",
path,
Expand Down Expand Up @@ -319,7 +319,7 @@ static void dump_blobref (struct archive *ar,
const char *key;
json_t *entry;

if (!(f = content_load (h, blobref, content_flags))
if (!(f = content_load_byblobref (h, blobref, content_flags))
|| content_load_get (f, &buf, &buflen) < 0)
log_msg_exit ("cannot load root tree object: %s",
future_strerror (f, errno));
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/builtin/restore.c
Expand Up @@ -116,7 +116,7 @@ static json_t *restore_dir (flux_t *h, json_t *dir)
if (!(s = treeobj_encode (ndir)))
log_msg_exit ("out of memory");
if (!(f = content_store (h, s, strlen (s), content_flags))
|| content_store_get (f, &blobref) < 0)
|| content_store_get_blobref (f, &blobref) < 0)
log_msg_exit ("error storing dirref blob: %s",
future_strerror (f, errno));
progress (1, 0);
Expand Down Expand Up @@ -214,7 +214,7 @@ static void restore_value (flux_t *h,
const char *blobref;

if (!(f = content_store (h, buf, size, content_flags))
|| content_store_get (f, &blobref) < 0)
|| content_store_get_blobref (f, &blobref) < 0)
log_msg_exit ("error storing blob for %s: %s",
path,
future_strerror (f, errno));
Expand Down

0 comments on commit d53b662

Please sign in to comment.