Navigation Menu

Skip to content

Commit

Permalink
cache: add persistent implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Apr 4, 2017
1 parent 0296d2f commit b6f3dca
Showing 1 changed file with 247 additions and 24 deletions.
271 changes: 247 additions & 24 deletions lib/cache.c
Expand Up @@ -24,20 +24,34 @@
#include "grn_store.h"
#include "grn_db.h"

typedef struct _grn_cache_entry grn_cache_entry;
typedef struct _grn_cache_entry_memory grn_cache_entry_memory;

struct _grn_cache_entry_memory {
grn_cache_entry_memory *next;
grn_cache_entry_memory *prev;
grn_obj *value;
grn_timeval tv;
grn_id id;
};

typedef struct _grn_cache_entry_persistent {
grn_id next;
grn_id prev;
grn_timeval modified_time;
} grn_cache_entry_persistent;

struct _grn_cache {
union {
struct {
grn_cache_entry *next;
grn_cache_entry *prev;
grn_cache_entry_memory *next;
grn_cache_entry_memory *prev;
grn_hash *hash;
grn_mutex mutex;
} memory;
struct {
grn_hash *keys;
grn_ja *data;
grn_pat *timestamps;
grn_ja *values;
int timeout;
} persistent;
} impl;
grn_bool is_memory;
Expand All @@ -47,13 +61,10 @@ struct _grn_cache {
uint32_t nhits;
};

struct _grn_cache_entry {
grn_cache_entry *next;
grn_cache_entry *prev;
grn_obj *value;
grn_timeval tv;
grn_id id;
};
#define GRN_CACHE_PERSISTENT_ROOT_ID 1
#define GRN_CACHE_PERSISTENT_ROOT_KEY "\0"
#define GRN_CACHE_PERSISTENT_ROOT_KEY_LEN \
(sizeof(GRN_CACHE_PERSISTENT_ROOT_KEY) - 1)

static grn_ctx grn_cache_ctx;
static grn_cache *grn_cache_current = NULL;
Expand All @@ -62,12 +73,12 @@ static grn_cache *grn_cache_default = NULL;
inline static void
grn_cache_open_memory(grn_ctx *ctx, grn_cache *cache)
{
cache->impl.memory.next = (grn_cache_entry *)cache;
cache->impl.memory.prev = (grn_cache_entry *)cache;
cache->impl.memory.next = (grn_cache_entry_memory *)cache;
cache->impl.memory.prev = (grn_cache_entry_memory *)cache;
cache->impl.memory.hash = grn_hash_create(cache->ctx,
NULL,
GRN_CACHE_MAX_KEY_SIZE,
sizeof(grn_cache_entry),
sizeof(grn_cache_entry_memory),
GRN_OBJ_KEY_VAR_SIZE);
if (!cache->impl.memory.hash) {
ERR(GRN_NO_MEMORY_AVAILABLE, "[cache] failed to create hash table");
Expand All @@ -79,6 +90,32 @@ grn_cache_open_memory(grn_ctx *ctx, grn_cache *cache)
inline static void
grn_cache_open_persistent(grn_ctx *ctx, grn_cache *cache)
{
cache->impl.persistent.keys =
grn_hash_create(cache->ctx,
NULL,
GRN_CACHE_MAX_KEY_SIZE,
sizeof(grn_cache_entry_persistent),
GRN_OBJ_KEY_VAR_SIZE);
{
grn_cache_entry_persistent *entry;
/* TODO: validate ID == GRN_CACHE_PERSISTENT_ROOT_ID */
grn_hash_add(ctx,
cache->impl.persistent.keys,
GRN_CACHE_PERSISTENT_ROOT_KEY,
GRN_CACHE_PERSISTENT_ROOT_KEY_LEN,
(void **)&entry,
NULL);
entry->next = GRN_CACHE_PERSISTENT_ROOT_ID;
entry->prev = GRN_CACHE_PERSISTENT_ROOT_ID;
entry->modified_time.tv_sec = 0;
entry->modified_time.tv_nsec = 0;
}
cache->impl.persistent.values =
grn_ja_create(cache->ctx,
NULL,
1 << 16,
0);
cache->impl.persistent.timeout = 1000;
}

grn_cache *
Expand Down Expand Up @@ -126,7 +163,7 @@ exit :
inline static void
grn_cache_close_memory(grn_ctx *ctx, grn_cache *cache)
{
grn_cache_entry *vp;
grn_cache_entry_memory *vp;

GRN_HASH_EACH(ctx, cache->impl.memory.hash, id, NULL, NULL, &vp, {
grn_obj_close(ctx, vp->value);
Expand All @@ -136,8 +173,10 @@ grn_cache_close_memory(grn_ctx *ctx, grn_cache *cache)
}

inline static void
grn_cache_close_persistent(grn_ctx *tx, grn_cache *cache)
grn_cache_close_persistent(grn_ctx *ctx, grn_cache *cache)
{
grn_hash_close(ctx, cache->impl.persistent.keys);
grn_ja_close(ctx, cache->impl.persistent.values);
}

grn_rc
Expand Down Expand Up @@ -221,22 +260,80 @@ grn_cache_get_statistics(grn_ctx *ctx, grn_cache *cache,
}

static void
grn_cache_expire_entry_memory(grn_cache *cache, grn_cache_entry *ce)
grn_cache_expire_entry_memory(grn_cache *cache, grn_cache_entry_memory *ce)
{
ce->prev->next = ce->next;
ce->next->prev = ce->prev;
grn_obj_close(cache->ctx, ce->value);
grn_hash_delete_by_id(cache->ctx, cache->impl.memory.hash, ce->id, NULL);
}

static void
grn_cache_entry_persistent_delete_link(grn_cache *cache,
grn_cache_entry_persistent *entry)
{
grn_ctx *ctx = cache->ctx;
grn_hash *keys = cache->impl.persistent.keys;
grn_cache_entry_persistent *prev_entry;
grn_cache_entry_persistent *next_entry;

prev_entry =
(grn_cache_entry_persistent *)grn_hash_get_value_(ctx,
keys,
entry->prev,
NULL);
next_entry =
(grn_cache_entry_persistent *)grn_hash_get_value_(ctx,
keys,
entry->next,
NULL);
prev_entry->next = entry->next;
next_entry->prev = entry->prev;
}

static void
grn_cache_entry_persistent_prepend_link(grn_cache *cache,
grn_cache_entry_persistent *entry,
grn_id entry_id,
grn_cache_entry_persistent *head_entry,
grn_id head_entry_id)
{
grn_ctx *ctx = cache->ctx;
grn_hash *keys = cache->impl.persistent.keys;
grn_cache_entry_persistent *head_next_entry;

entry->next = head_entry->next;
entry->prev = head_entry_id;
head_next_entry =
(grn_cache_entry_persistent *)grn_hash_get_value_(ctx,
keys,
head_entry->next,
NULL);
head_next_entry->prev = entry_id;
head_entry->next = entry_id;
}

static void
grn_cache_expire_entry_persistent(grn_cache *cache,
grn_cache_entry_persistent *entry,
grn_id cache_id)
{
grn_hash *keys = cache->impl.persistent.keys;
grn_ja *values = cache->impl.persistent.values;

grn_cache_entry_persistent_delete_link(cache, entry);
grn_ja_put(cache->ctx, values, cache_id, NULL, 0, GRN_OBJ_SET, NULL);
grn_hash_delete_by_id(cache->ctx, keys, cache_id, NULL);
}

inline static grn_rc
grn_cache_fetch_memory(grn_ctx *ctx, grn_cache *cache,
const char *key, uint32_t key_len,
grn_obj *output)
{
/* TODO: How about GRN_NOT_FOUND? */
grn_rc rc = GRN_INVALID_ARGUMENT;
grn_cache_entry *ce;
grn_cache_entry_memory *ce;

MUTEX_LOCK(cache->impl.memory.mutex);
cache->nfetches++;
Expand All @@ -254,7 +351,8 @@ grn_cache_fetch_memory(grn_ctx *ctx, grn_cache *cache,
ce->prev->next = ce->next;
ce->next->prev = ce->prev;
{
grn_cache_entry *ce0 = (grn_cache_entry *)(&(cache->impl.memory));
grn_cache_entry_memory *ce0 =
(grn_cache_entry_memory *)(&(cache->impl.memory));
ce->next = ce0->next;
ce->prev = ce0;
ce0->next->prev = ce;
Expand All @@ -274,6 +372,54 @@ grn_cache_fetch_persistent(grn_ctx *ctx, grn_cache *cache,
{
/* TODO: How about GRN_NOT_FOUND? */
grn_rc rc = GRN_INVALID_ARGUMENT;
grn_hash *keys = cache->impl.persistent.keys;
grn_ja *values = cache->impl.persistent.values;
grn_id cache_id;
grn_cache_entry_persistent *entry;

if (key_len == GRN_CACHE_PERSISTENT_ROOT_KEY_LEN &&
memcmp(key,
GRN_CACHE_PERSISTENT_ROOT_KEY,
GRN_CACHE_PERSISTENT_ROOT_KEY_LEN) == 0) {
return rc;
}

rc = grn_io_lock(ctx, keys->io, grn_lock_timeout);
if (rc != GRN_SUCCESS) {
return rc;
}

cache->nfetches++;
cache_id = grn_hash_get(cache->ctx, keys, key, key_len, (void **)&entry);
if (cache_id != GRN_ID_NIL) {
if (entry->modified_time.tv_sec <=
grn_db_get_last_modified(ctx, ctx->impl->db)) {
grn_cache_expire_entry_persistent(cache, entry, cache_id);
goto exit;
}

rc = GRN_SUCCESS;
grn_ja_get_value(ctx, values, cache_id, output);
grn_cache_entry_persistent_delete_link(cache, entry);
{
grn_cache_entry_persistent *head_entry;
head_entry =
(grn_cache_entry_persistent *)grn_hash_get_value_(ctx,
keys,
GRN_CACHE_PERSISTENT_ROOT_ID,
NULL);
grn_cache_entry_persistent_prepend_link(cache,
entry,
cache_id,
head_entry,
GRN_CACHE_PERSISTENT_ROOT_ID);
}
cache->nhits++;
}

exit :
grn_io_unlock(keys->io);

return rc;
}

Expand All @@ -298,12 +444,11 @@ grn_cache_update_memory(grn_ctx *ctx, grn_cache *cache,
{
grn_id id;
int added = 0;
grn_cache_entry *ce;
grn_cache_entry_memory *ce;
grn_rc rc = GRN_SUCCESS;
grn_obj *old = NULL;
grn_obj *obj = NULL;


MUTEX_LOCK(cache->impl.memory.mutex);
obj = grn_obj_open(cache->ctx, GRN_BULK, 0, GRN_DB_TEXT);
if (!obj) {
Expand All @@ -322,7 +467,8 @@ grn_cache_update_memory(grn_ctx *ctx, grn_cache *cache,
ce->value = obj;
ce->tv = ctx->impl->tv;
{
grn_cache_entry *ce0 = (grn_cache_entry *)(&(cache->impl.memory));
grn_cache_entry_memory *ce0 =
(grn_cache_entry_memory *)(&(cache->impl.memory));
ce->next = ce0->next;
ce->prev = ce0;
ce0->next->prev = ce;
Expand All @@ -345,6 +491,56 @@ grn_cache_update_persistent(grn_ctx *ctx, grn_cache *cache,
const char *key, uint32_t key_len,
grn_obj *value)
{
grn_rc rc;
grn_hash *keys = cache->impl.persistent.keys;
grn_id cache_id;
grn_cache_entry_persistent *entry;
int added;

if (key_len == GRN_CACHE_PERSISTENT_ROOT_KEY_LEN &&
memcmp(key,
GRN_CACHE_PERSISTENT_ROOT_KEY,
GRN_CACHE_PERSISTENT_ROOT_KEY_LEN) == 0) {
return;
}

rc = grn_io_lock(ctx, keys->io, grn_lock_timeout);
if (rc != GRN_SUCCESS) {
return;
}

cache_id = grn_hash_add(cache->ctx, keys, key, key_len, (void **)&entry,
&added);
if (cache_id) {
grn_cache_entry_persistent *head_entry;

if (!added) {
grn_cache_entry_persistent_delete_link(cache, entry);
}
entry->modified_time = ctx->impl->tv;

head_entry =
(grn_cache_entry_persistent *)grn_hash_get_value_(ctx,
keys,
GRN_CACHE_PERSISTENT_ROOT_ID,
NULL);
grn_cache_entry_persistent_prepend_link(cache,
entry,
cache_id,
head_entry,
GRN_CACHE_PERSISTENT_ROOT_ID);
if (GRN_HASH_SIZE(keys) > cache->max_nentries) {
grn_cache_entry_persistent *tail_entry;
tail_entry =
(grn_cache_entry_persistent *)grn_hash_get_value_(ctx,
keys,
head_entry->prev,
NULL);
grn_cache_expire_entry_persistent(cache, tail_entry, head_entry->prev);
}
}

grn_io_unlock(keys->io);
}

void
Expand All @@ -363,7 +559,8 @@ grn_cache_update(grn_ctx *ctx, grn_cache *cache,
inline static void
grn_cache_expire_memory(grn_cache *cache, int32_t size)
{
grn_cache_entry *ce0 = (grn_cache_entry *)(&(cache->impl.memory));
grn_cache_entry_memory *ce0 =
(grn_cache_entry_memory *)(&(cache->impl.memory));
MUTEX_LOCK(cache->impl.memory.mutex);
while (ce0 != ce0->prev && size--) {
grn_cache_expire_entry_memory(cache, ce0->prev);
Expand All @@ -374,6 +571,32 @@ grn_cache_expire_memory(grn_cache *cache, int32_t size)
inline static void
grn_cache_expire_persistent(grn_cache *cache, int32_t size)
{
grn_rc rc;
grn_ctx *ctx = cache->ctx;
grn_hash *keys = cache->impl.persistent.keys;
grn_cache_entry_persistent *head_entry;

rc = grn_io_lock(ctx, keys->io, grn_lock_timeout);
if (rc != GRN_SUCCESS) {
return;
}

head_entry =
(grn_cache_entry_persistent *)grn_hash_get_value_(ctx,
keys,
GRN_CACHE_PERSISTENT_ROOT_ID,
NULL);
while (head_entry->prev != GRN_CACHE_PERSISTENT_ROOT_ID && size > 0) {
grn_cache_entry_persistent *tail_entry;
tail_entry =
(grn_cache_entry_persistent *)grn_hash_get_value_(ctx,
keys,
head_entry->prev,
NULL);
grn_cache_expire_entry_persistent(cache, tail_entry, head_entry->prev);
size--;
}
grn_io_unlock(keys->io);
}

void
Expand Down

0 comments on commit b6f3dca

Please sign in to comment.