Skip to content

Commit

Permalink
Merge pull request #1237 from chu11/issue1232
Browse files Browse the repository at this point in the history
KVS: Support valref pointing to zero length blob objects
  • Loading branch information
garlick committed Oct 16, 2017
2 parents 2bddc2e + 598fd53 commit 9f03dda
Show file tree
Hide file tree
Showing 7 changed files with 322 additions and 93 deletions.
115 changes: 81 additions & 34 deletions src/modules/kvs/cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ struct cache_entry {
waitqueue_t *waitlist_valid;
void *data; /* value object/data */
int len;
bool data_valid; /* flag indicating if data set, don't use
* data == NULL as test */
cache_data_type_t type; /* what does data point to */
int lastuse_epoch; /* time of last use for cache expiry */
uint8_t dirty:1;
Expand All @@ -62,25 +64,38 @@ struct cache {
zhash_t *zh;
};

struct cache_entry *cache_entry_create (void)
struct cache_entry *cache_entry_create (cache_data_type_t t)
{
struct cache_entry *hp = calloc (1, sizeof (*hp));
if (!hp) {
struct cache_entry *hp;

if (t != CACHE_DATA_TYPE_NONE
&& t != CACHE_DATA_TYPE_JSON
&& t != CACHE_DATA_TYPE_RAW) {
errno = EINVAL;
return NULL;
}

if (!(hp = calloc (1, sizeof (*hp)))) {
errno = ENOMEM;
return NULL;
}
hp->type = CACHE_DATA_TYPE_NONE;
hp->type = t;
return hp;
}

struct cache_entry *cache_entry_create_json (json_t *o)
{
struct cache_entry *hp = cache_entry_create ();
if (!hp)
struct cache_entry *hp;

if (!o) {
errno = EINVAL;
return NULL;
if (o)
hp->data = o;
hp->type = CACHE_DATA_TYPE_JSON;
}

if (!(hp = cache_entry_create (CACHE_DATA_TYPE_JSON)))
return NULL;
hp->data = o;
hp->data_valid = true;
return hp;
}

Expand All @@ -93,13 +108,15 @@ struct cache_entry *cache_entry_create_raw (void *data, int len)
return NULL;
}

if (!(hp = cache_entry_create ()))
if (!(hp = cache_entry_create (CACHE_DATA_TYPE_RAW)))
return NULL;

if (data) {
hp->data = data;
hp->len = len;
}
hp->type = CACHE_DATA_TYPE_RAW;
/* true even if data == NULL */
hp->data_valid = true;
return hp;
}

Expand All @@ -125,17 +142,17 @@ bool cache_entry_is_type_raw (struct cache_entry *hp)

bool cache_entry_get_valid (struct cache_entry *hp)
{
return (hp && hp->data != NULL);
return (hp && hp->data_valid);
}

bool cache_entry_get_dirty (struct cache_entry *hp)
{
return (hp && hp->data && hp->dirty);
return (hp && hp->data_valid && hp->dirty);
}

int cache_entry_set_dirty (struct cache_entry *hp, bool val)
{
if (hp && hp->data) {
if (hp && hp->data_valid) {
if ((val && hp->dirty) || (!val && !hp->dirty))
; /* no-op */
else if (val && !hp->dirty)
Expand All @@ -157,7 +174,7 @@ int cache_entry_set_dirty (struct cache_entry *hp, bool val)

int cache_entry_clear_dirty (struct cache_entry *hp)
{
if (hp && hp->data) {
if (hp && hp->data_valid) {
if (hp->dirty
&& (!hp->waitlist_notdirty
|| !wait_queue_length (hp->waitlist_notdirty)))
Expand All @@ -169,7 +186,7 @@ int cache_entry_clear_dirty (struct cache_entry *hp)

int cache_entry_force_clear_dirty (struct cache_entry *hp)
{
if (hp && hp->data) {
if (hp && hp->data_valid) {
if (hp->dirty) {
if (hp->waitlist_notdirty) {
wait_queue_destroy (hp->waitlist_notdirty);
Expand All @@ -184,44 +201,50 @@ int cache_entry_force_clear_dirty (struct cache_entry *hp)

json_t *cache_entry_get_json (struct cache_entry *hp)
{
if (!hp || !hp->data || hp->type != CACHE_DATA_TYPE_JSON)
if (!hp || !hp->data_valid || hp->type != CACHE_DATA_TYPE_JSON)
return NULL;
/* should be non-NULL for json */
assert (hp->data);
return hp->data;
}

int cache_entry_set_json (struct cache_entry *hp, json_t *o)
{
if (hp
&& o
&& (hp->type == CACHE_DATA_TYPE_NONE
|| hp->type == CACHE_DATA_TYPE_JSON)) {
if ((o && hp->data) || (!o && !hp->data)) {
if (hp->data_valid) {
assert (hp->data);
json_decref (o); /* no-op, 'o' is assumed identical to hp->data */
} else if (o && !hp->data) {
} else {
assert (!hp->data);
hp->data = o;
hp->data_valid = true;
if (hp->waitlist_valid) {
if (wait_runqueue (hp->waitlist_valid) < 0) {
/* set back to orig */
hp->data = NULL;
hp->data_valid = false;
return -1;
}
}
} else if (!o && hp->data) {
json_decref (hp->data);
hp->data = NULL;
}
hp->type = CACHE_DATA_TYPE_JSON;
return 0;
}
return -1;
}

void *cache_entry_get_raw (struct cache_entry *hp, int *len)
int cache_entry_get_raw (struct cache_entry *hp, void **data, int *len)
{
if (!hp || !hp->data || hp->type != CACHE_DATA_TYPE_RAW)
return NULL;
if (!hp || !hp->data_valid || hp->type != CACHE_DATA_TYPE_RAW)
return -1;
if (data)
(*data) = hp->data;
if (len)
(*len) = hp->len;
return hp->data;
return 0;
}

int cache_entry_set_raw (struct cache_entry *hp, void *data, int len)
Expand All @@ -234,35 +257,59 @@ int cache_entry_set_raw (struct cache_entry *hp, void *data, int len)
if (hp
&& (hp->type == CACHE_DATA_TYPE_NONE
|| hp->type == CACHE_DATA_TYPE_RAW)) {
if ((data && hp->data) || (!data && !hp->data)) {
free (data); /* no-op, 'data' is assumed identical to hp->data */
} else if (data && !hp->data) {
if (hp->data_valid) {
if ((data && hp->data) || (!data && !hp->data))
free (data); /* no-op, 'data' is assumed identical to hp->data */
else {
/* attempt to change already valid cache entry,
* cannot, must call cache_entry_clear_data() */
errno = EBADE;
return -1;
}
}
else {
hp->data = data;
hp->len = len;
hp->data_valid = true;

if (hp->waitlist_valid) {
if (wait_runqueue (hp->waitlist_valid) < 0) {
/* set back to orig */
hp->data = NULL;
hp->len = 0;
hp->data_valid = false;
return -1;
}
}
} else if (!data && hp->data) {
free (hp->data);
hp->data = NULL;
hp->len = 0;
}
hp->type = CACHE_DATA_TYPE_RAW;
return 0;
}
return -1;
}

int cache_entry_clear_data (struct cache_entry *hp)
{
if (hp) {
if (hp->data) {
if (hp->type == CACHE_DATA_TYPE_JSON)
json_decref (hp->data);
else if (hp->type == CACHE_DATA_TYPE_RAW)
free (hp->data);
}
hp->data = NULL;
hp->len = 0;
hp->data_valid = false;
return 0;
}
return -1;
}

void cache_entry_destroy (void *arg)
{
struct cache_entry *hp = arg;
if (hp) {
if (hp->data) {
if (hp->data_valid) {
if (hp->type == CACHE_DATA_TYPE_JSON)
json_decref (hp->data);
else if (hp->type == CACHE_DATA_TYPE_RAW)
Expand Down
42 changes: 29 additions & 13 deletions src/modules/kvs/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,28 @@ struct cache;
/* Create/destroy cache entry.
*
* cache_entry_create() creates an entry, setting the cache entry type
* to CACHE_DATA_TYPE_NONE.
* to specified type. CACHE_DATA_TYPE_NONE indicates user is not yet
* sure of the type of data to be stored, and it will be determined
* later when cache_entry_set_X() function is called.
* cache_entry_get_valid() will return false after
* cache_entry_create() is initially called, regardless of the type
* passed in.
*
* cache_entry_create_json() creates an entry, setting the cache entry
* type to CACHE_DATA_TYPE_JSON. The create transfers ownership of
* 'o' to the cache entry. On destroy, json_decref() will be called
* on 'o'. If 'o' is NULL, no data is set, but the type is still set
* to CACHE_DATA_TYPE_JSON and only json can be used for the entry.
* on 'o'. 'o' cannot be NULL.
*
* cache_entry_create_raw() creates an entry, setting the cache entry
* type to CACHE_DATA_TYPE_RAW. The create transfers ownership of
* 'data' to the cache entry. On destroy, free() will be called on
* 'data'. If 'data' is NULL, no data is set, but the type is still
* set to CACHE_DATA_TYPE_RAW and only raw can be used for the entry.
* 'data'. If 'data' is NULL, 'len' must be zero. If 'data' is
* non-NULL, 'len' must be > 0.
*
* cache_entry_get_valid() will return true on entries when
* cache_entry_create_json() and cache_entry_get_raw() return success.
*/
struct cache_entry *cache_entry_create (void);
struct cache_entry *cache_entry_create (cache_data_type_t t);
struct cache_entry *cache_entry_create_json (json_t *o);
struct cache_entry *cache_entry_create_raw (void *data, int len);
void cache_entry_destroy (void *arg);
Expand Down Expand Up @@ -89,26 +96,35 @@ int cache_entry_force_clear_dirty (struct cache_entry *hp);
*
* json set accessor must have type of CACHE_DATA_TYPE_NONE or
* CACHE_DATA_TYPE_JSON to set json object. After setting, the type
* is converted to CACHE_DATA_TYPE_JSON. If non-NULL, set transfers
* ownership of 'o' to the cache entry.
* is converted to CACHE_DATA_TYPE_JSON. 'o' must be non-NULL. Set
* transfers ownership of 'o' to the cache entry.
*
* raw set accessor must have type of CACHE_DATA_TYPE_NONE or
* CACHE_DATA_TYPE_RAW to set raw data. After setting, the type is
* converted to CACHE_DATA_TYPE_RAW. If non-NULL, set transfers
* ownership of 'data' to the cache entry.
* converted to CACHE_DATA_TYPE_RAW. If 'data' is NULL, 'len' must be
* zero. If 'data' is non-NULL, 'len' must be > 0. If non-NULL, set
* transfers ownership of 'data' to the cache entry.
*
* cache_entry_clear_data () will clear any data in the entry.
*
* An invalid->valid transition runs the entry's wait queue, if any in
* both set accessors.
*
* cache_entry_set_json() & cache_entry_set_raw() returns -1 on error,
* 0 on success
* Generally speaking, a cache entry can only bet set once. If you
* wish to set it again, you must run cache_entry_clear_data() before
* doing so.
*
* cache_entry_set_json() & cache_entry_set_raw() &
* cache_entry_clear_data() returns -1 on error, 0 on success
*/
json_t *cache_entry_get_json (struct cache_entry *hp);
int cache_entry_set_json (struct cache_entry *hp, json_t *o);

void *cache_entry_get_raw (struct cache_entry *hp, int *len);
int cache_entry_get_raw (struct cache_entry *hp, void **data, int *len);
int cache_entry_set_raw (struct cache_entry *hp, void *data, int len);

int cache_entry_clear_data (struct cache_entry *hp);

/* Arrange for message handler represented by 'wait' to be restarted
* once cache entry becomes valid or not dirty at completion of a
* load or store RPC.
Expand Down
2 changes: 1 addition & 1 deletion src/modules/kvs/commit.c
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ static int store_cache (commit_t *c, int current_epoch, json_t *o,
}
}
if (!(hp = cache_lookup (c->cm->cache, ref, current_epoch))) {
if (!(hp = cache_entry_create ())) {
if (!(hp = cache_entry_create (CACHE_DATA_TYPE_NONE))) {
saved_errno = ENOMEM;
goto done;
}
Expand Down
28 changes: 16 additions & 12 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,15 @@ static void content_load_completion (flux_future_t *f, void *arg)
* this error scenario appropriately.
*/
if (cache_entry_is_type_raw (hp)) {
char *datacpy;
char *datacpy = NULL;

if (!(datacpy = malloc (size))) {
flux_log_error (ctx->h, "%s: malloc", __FUNCTION__);
goto done;
if (size) {
if (!(datacpy = malloc (size))) {
flux_log_error (ctx->h, "%s: malloc", __FUNCTION__);
goto done;
}
memcpy (datacpy, data, size);
}
memcpy (datacpy, data, size);

if (cache_entry_set_raw (hp, datacpy, size) < 0) {
flux_log_error (ctx->h, "%s: cache_entry_set_raw", __FUNCTION__);
Expand Down Expand Up @@ -276,15 +278,15 @@ static int load (kvs_ctx_t *ctx, const href_t ref, bool is_raw, wait_t *wait,
*/
if (!hp) {
if (is_raw) {
if (!(hp = cache_entry_create_raw (NULL, 0))) {
flux_log_error (ctx->h, "%s: cache_entry_create_raw",
if (!(hp = cache_entry_create (CACHE_DATA_TYPE_RAW))) {
flux_log_error (ctx->h, "%s: cache_entry_create",
__FUNCTION__);
return -1;
}
}
else {
if (!(hp = cache_entry_create_json (NULL))) {
flux_log_error (ctx->h, "%s: cache_entry_create_json",
if (!(hp = cache_entry_create (CACHE_DATA_TYPE_JSON))) {
flux_log_error (ctx->h, "%s: cache_entry_create",
__FUNCTION__);
return -1;
}
Expand Down Expand Up @@ -481,7 +483,7 @@ static int commit_cache_cb (commit_t *c, struct cache_entry *hp, void *data)

is_raw = cache_entry_is_type_raw (hp);
if (is_raw)
storedata = cache_entry_get_raw (hp, &storedatalen);
cache_entry_get_raw (hp, &storedata, &storedatalen);
else
storedata = cache_entry_get_json (hp);

Expand Down Expand Up @@ -1628,14 +1630,16 @@ static int store_initial_rootdir (kvs_ctx_t *ctx, json_t *o, href_t ref)
goto decref_done;
}
if (!(hp = cache_lookup (ctx->cache, ref, ctx->epoch))) {
if (!(hp = cache_entry_create ())) {
if (!(hp = cache_entry_create (CACHE_DATA_TYPE_JSON))) {
saved_errno = errno;
flux_log_error (ctx->h, "%s: cache_entry_create", __FUNCTION__);
flux_log_error (ctx->h, "%s: cache_entry_create_json_empty",
__FUNCTION__);
goto decref_done;
}
cache_insert (ctx->cache, ref, hp);
}
if (!cache_entry_get_valid (hp)) {
assert (o);
if (cache_entry_set_json (hp, o) < 0) {
saved_errno = errno;
flux_log_error (ctx->h, "%s: cache_entry_set_json",
Expand Down
Loading

0 comments on commit 9f03dda

Please sign in to comment.