Skip to content

Commit

Permalink
Merge pull request #1124 from chu11/kvs_oom_xzmalloc_cleanup-try4
Browse files Browse the repository at this point in the history
KVS oom() and xzmalloc() cleanup
  • Loading branch information
garlick committed Jul 25, 2017
2 parents a132370 + e464830 commit 5f48548
Show file tree
Hide file tree
Showing 18 changed files with 661 additions and 310 deletions.
39 changes: 27 additions & 12 deletions src/common/libkvs/jansson_dirent.c
Expand Up @@ -31,26 +31,32 @@
#include <jansson.h>

#include "src/common/libutil/blobref.h"
#include "src/common/libutil/oom.h"

#include "jansson_dirent.h"

json_t *j_dirent_create (const char *type, void *arg)
{
json_t *dirent;
json_t *dirent = NULL;
bool valid_type = false;

if (!(dirent = json_object ()))
oom ();
if (!(dirent = json_object ())) {
errno = ENOMEM;
goto error;
}

if (!strcmp (type, "FILEREF") || !strcmp (type, "DIRREF")) {
char *ref = arg;
json_t *o;

if (!(o = json_string (ref)))
oom ();
if (json_object_set_new (dirent, type, o) < 0)
oom ();
if (!(o = json_string (ref))) {
errno = ENOMEM;
goto error;
}
if (json_object_set_new (dirent, type, o) < 0) {
json_decref (o);
errno = ENOMEM;
goto error;
}

valid_type = true;
} else if (!strcmp (type, "FILEVAL") || !strcmp (type, "DIRVAL")
Expand All @@ -60,16 +66,25 @@ json_t *j_dirent_create (const char *type, void *arg)
if (val)
json_incref (val);
else {
if (!(val = json_object ()))
oom ();
if (!(val = json_object ())) {
errno = ENOMEM;
goto error;
}
}
if (json_object_set_new (dirent, type, val) < 0) {
json_decref (val);
errno = ENOMEM;
goto error;
}
if (json_object_set_new (dirent, type, val) < 0)
oom ();
valid_type = true;
}
assert (valid_type == true);

return dirent;

error:
json_decref (dirent);
return NULL;
}

int j_dirent_validate (json_t *dirent)
Expand Down
79 changes: 53 additions & 26 deletions src/modules/kvs/cache.c
Expand Up @@ -41,14 +41,11 @@
#include <jansson.h>

#include "src/common/libutil/blobref.h"
#include "src/common/libutil/xzmalloc.h"
#include "src/common/libutil/tstat.h"
#include "src/common/libutil/log.h"
#include "src/common/libutil/iterators.h"
#include "src/common/libutil/oom.h"

#include "waitqueue.h"
#include "kvs_util.h"
#include "cache.h"

struct cache_entry {
Expand All @@ -65,7 +62,11 @@ struct cache {

struct cache_entry *cache_entry_create (json_t *o)
{
struct cache_entry *hp = xzmalloc (sizeof (*hp));
struct cache_entry *hp = calloc (1, sizeof (*hp));
if (!hp) {
errno = ENOMEM;
return NULL;
}
if (o)
hp->o = o;
return hp;
Expand Down Expand Up @@ -145,22 +146,30 @@ void cache_entry_destroy (void *arg)
}
}

void cache_entry_wait_notdirty (struct cache_entry *hp, wait_t *wait)
int cache_entry_wait_notdirty (struct cache_entry *hp, wait_t *wait)
{
if (wait) {
if (!hp->waitlist_notdirty)
hp->waitlist_notdirty = wait_queue_create ();
wait_addqueue (hp->waitlist_notdirty, wait);
if (!hp->waitlist_notdirty) {
if (!(hp->waitlist_notdirty = wait_queue_create ()))
return -1;
}
if (wait_addqueue (hp->waitlist_notdirty, wait) < 0)
return -1;
}
return 0;
}

void cache_entry_wait_valid (struct cache_entry *hp, wait_t *wait)
int cache_entry_wait_valid (struct cache_entry *hp, wait_t *wait)
{
if (wait) {
if (!hp->waitlist_valid)
hp->waitlist_valid = wait_queue_create ();
wait_addqueue (hp->waitlist_valid, wait);
if (!hp->waitlist_valid) {
if (!(hp->waitlist_valid = wait_queue_create ()))
return -1;
}
if (wait_addqueue (hp->waitlist_valid, wait) < 0)
return -1;
}
return 0;
}

struct cache_entry *cache_lookup (struct cache *cache, const char *ref,
Expand Down Expand Up @@ -224,8 +233,10 @@ int cache_expire_entries (struct cache *cache, int current_epoch, int thresh)
struct cache_entry *hp;
int count = 0;

if (!(keys = zhash_keys (cache->zh)))
oom ();
if (!(keys = zhash_keys (cache->zh))) {
errno = ENOMEM;
return -1;
}
while ((ref = zlist_pop (keys))) {
if ((hp = zhash_lookup (cache->zh, ref))
&& !cache_entry_get_dirty (hp)
Expand All @@ -240,26 +251,32 @@ int cache_expire_entries (struct cache *cache, int current_epoch, int thresh)
return count;
}

void cache_get_stats (struct cache *cache, tstat_t *ts, int *sizep,
int *incompletep, int *dirtyp)
int cache_get_stats (struct cache *cache, tstat_t *ts, int *sizep,
int *incompletep, int *dirtyp)
{
zlist_t *keys;
zlist_t *keys = NULL;
struct cache_entry *hp;
char *ref;
int size = 0;
int incomplete = 0;
int dirty = 0;
int rc = -1;

if (!(keys = zhash_keys (cache->zh)))
oom ();
if (!(keys = zhash_keys (cache->zh))) {
errno = ENOMEM;
goto cleanup;
}
while ((ref = zlist_pop (keys))) {
hp = zhash_lookup (cache->zh, ref);
if (cache_entry_get_valid (hp)) {
/* must pass JSON_ENCODE_ANY, object could be anything */
char *s = json_dumps (hp->o, JSON_ENCODE_ANY);
if (!s)
oom ();
int obj_size = strlen (s);
int obj_size;
if (!s) {
errno = ENOMEM;
goto cleanup;
}
obj_size = strlen (s);
free (s);
size += obj_size;
tstat_push (ts, obj_size);
Expand All @@ -269,13 +286,16 @@ void cache_get_stats (struct cache *cache, tstat_t *ts, int *sizep,
dirty++;
free (ref);
}
zlist_destroy (&keys);
if (sizep)
*sizep = size;
if (incompletep)
*incompletep = incomplete;
if (dirtyp)
*dirtyp = dirty;
rc = 0;
cleanup:
zlist_destroy (&keys);
return rc;
}

int cache_wait_destroy_msg (struct cache *cache, wait_test_msg_f cb, void *arg)
Expand Down Expand Up @@ -304,9 +324,16 @@ int cache_wait_destroy_msg (struct cache *cache, wait_test_msg_f cb, void *arg)

struct cache *cache_create (void)
{
struct cache *cache = xzmalloc (sizeof (*cache));
if (!(cache->zh = zhash_new ()))
oom ();
struct cache *cache = calloc (1, sizeof (*cache));
if (!cache) {
errno = ENOMEM;
return NULL;
}
if (!(cache->zh = zhash_new ())) {
free (cache);
errno = ENOMEM;
return NULL;
}
return cache;
}

Expand Down
12 changes: 7 additions & 5 deletions src/modules/kvs/cache.h
Expand Up @@ -45,14 +45,14 @@ int cache_entry_clear_dirty (struct cache_entry *hp);
*/
json_t *cache_entry_get_json (struct cache_entry *hp);
void cache_entry_set_json (struct cache_entry *hp, json_t *o);
int cache_entry_clear_json (struct cache_entry *hp);

/* Arrange for message handler represented by 'wait' to be restarted
* once cache entry becomes valid or not dirty at completion of a
* load or store RPC.
* Returns -1 on error, 0 on success
*/
void cache_entry_wait_notdirty (struct cache_entry *hp, wait_t *wait);
void cache_entry_wait_valid (struct cache_entry *hp, wait_t *wait);
int cache_entry_wait_notdirty (struct cache_entry *hp, wait_t *wait);
int cache_entry_wait_valid (struct cache_entry *hp, wait_t *wait);

/* Create/destroy the cache container and its contents.
*/
Expand Down Expand Up @@ -93,13 +93,15 @@ int cache_count_entries (struct cache *cache);

/* Expire cache entries that are not dirty, not incomplete, and last
* used more than 'thresh' epoch's ago.
* Returns -1 on error, expired count on success.
*/
int cache_expire_entries (struct cache *cache, int current_epoch, int thresh);

/* Obtain statistics on the cache.
* Returns -1 on error, 0 on success
*/
void cache_get_stats (struct cache *cache, tstat_t *ts, int *size,
int *incomplete, int *dirty);
int cache_get_stats (struct cache *cache, tstat_t *ts, int *size,
int *incomplete, int *dirty);

/* Destroy wait_t's on the waitqueue_t of any cache entry
* if they meet match criteria.
Expand Down

0 comments on commit 5f48548

Please sign in to comment.