Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bz877 listkeys fix #1

Merged
3 commits merged into from Nov 22, 2010
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
129 changes: 105 additions & 24 deletions c_src/bitcask_nifs.c
Expand Up @@ -66,7 +66,6 @@ KHASH_MAP_INIT_INT(fstats, bitcask_fstats_entry*);
typedef struct
{
khash_t(entries)* entries;
khiter_t iterator;
khash_t(fstats)* fstats;
size_t key_count;
size_t key_bytes;
Expand All @@ -79,6 +78,11 @@ typedef struct
typedef struct
{
bitcask_keydir* keydir;
khiter_t iterator;
ErlNifTid il_thread; /* Iterator lock thread */
ErlNifMutex* il_signal_mutex;
ErlNifCond* il_signal;
int il_signal_flag;
} bitcask_keydir_handle;

typedef struct
Expand Down Expand Up @@ -121,7 +125,10 @@ static ERL_NIF_TERM ATOM_FALSE;
static ERL_NIF_TERM ATOM_FSTAT_ERROR;
static ERL_NIF_TERM ATOM_FTRUNCATE_ERROR;
static ERL_NIF_TERM ATOM_GETFL_ERROR;
static ERL_NIF_TERM ATOM_ILT_CREATE_ERROR; /* Iteration lock thread creation error */
static ERL_NIF_TERM ATOM_ITERATION_IN_PROCESS;
static ERL_NIF_TERM ATOM_ITERATION_NOT_PERMITTED;
static ERL_NIF_TERM ATOM_ITERATION_NOT_STARTED;
static ERL_NIF_TERM ATOM_LOCK_NOT_WRITABLE;
static ERL_NIF_TERM ATOM_NOT_FOUND;
static ERL_NIF_TERM ATOM_NOT_READY;
Expand Down Expand Up @@ -195,13 +202,13 @@ ERL_NIF_TERM bitcask_nifs_keydir_new0(ErlNifEnv* env, int argc, const ERL_NIF_TE
bitcask_keydir_handle* handle = enif_alloc_resource_compat(env,
bitcask_keydir_RESOURCE,
sizeof(bitcask_keydir_handle));
memset(handle, '\0', sizeof(bitcask_keydir_handle));

// Now allocate the actual keydir instance. Because it's unnamed/shared, we'll
// leave the name and lock portions null'd out
bitcask_keydir* keydir = enif_alloc_compat(env, sizeof(bitcask_keydir));
memset(keydir, '\0', sizeof(bitcask_keydir));
keydir->entries = kh_init(entries);
keydir->iterator = kh_begin(keydir->entries);
keydir->fstats = kh_init(fstats);

// Assign the keydir to our handle and hand it back
Expand Down Expand Up @@ -252,7 +259,6 @@ ERL_NIF_TERM bitcask_nifs_keydir_new1(ErlNifEnv* env, int argc, const ERL_NIF_TE

// Initialize hash tables
keydir->entries = kh_init(entries);
keydir->iterator = kh_begin(keydir->entries);
keydir->fstats = kh_init(fstats);

// Be sure to initialize the rwlock and set our refcount
Expand All @@ -269,6 +275,7 @@ ERL_NIF_TERM bitcask_nifs_keydir_new1(ErlNifEnv* env, int argc, const ERL_NIF_TE
bitcask_keydir_handle* handle = enif_alloc_resource_compat(env,
bitcask_keydir_RESOURCE,
sizeof(bitcask_keydir_handle));
memset(handle, '\0', sizeof(bitcask_keydir_handle));
handle->keydir = keydir;
ERL_NIF_TERM result = enif_make_resource(env, handle);
enif_release_resource_compat(env, handle);
Expand Down Expand Up @@ -410,10 +417,6 @@ ERL_NIF_TERM bitcask_nifs_keydir_put_int(ErlNifEnv* env, int argc, const ERL_NIF
update_fstats(env, keydir, entry.file_id, 1, 1,
entry.total_sz, entry.total_sz);

// Reset the iterator to ensure that someone doesn't cause a crash
// by trying to interleave change operations with iterations
keydir->iterator = kh_begin(keydir->entries);

RW_UNLOCK(keydir);
return ATOM_OK;
}
Expand Down Expand Up @@ -573,10 +576,6 @@ ERL_NIF_TERM bitcask_nifs_keydir_remove(ErlNifEnv* env, int argc, const ERL_NIF_

kh_del(entries, keydir->entries, itr);

// Reset the iterator to ensure that someone doesn't cause a crash
// by trying to interleave change operations with iterations
keydir->iterator = kh_begin(keydir->entries);

enif_free_compat(env, entry);
}

Expand All @@ -601,14 +600,14 @@ ERL_NIF_TERM bitcask_nifs_keydir_copy(ErlNifEnv* env, int argc, const ERL_NIF_TE
bitcask_keydir_handle* new_handle = enif_alloc_resource_compat(env,
bitcask_keydir_RESOURCE,
sizeof(bitcask_keydir_handle));
memset(handle, '\0', sizeof(bitcask_keydir_handle));

// Now allocate the actual keydir instance. Because it's unnamed/shared, we'll
// leave the name and lock portions null'd out
bitcask_keydir* new_keydir = enif_alloc_compat(env, sizeof(bitcask_keydir));
new_handle->keydir = new_keydir;
memset(new_keydir, '\0', sizeof(bitcask_keydir));
new_keydir->entries = kh_init(entries);
new_keydir->iterator = kh_begin(new_keydir->entries);
new_keydir->fstats = kh_init(fstats);

// Deep copy each item from the existing handle
Expand Down Expand Up @@ -652,17 +651,75 @@ ERL_NIF_TERM bitcask_nifs_keydir_copy(ErlNifEnv* env, int argc, const ERL_NIF_TE
}
}

static void* iterator_lock_thread(void* arg)
{
bitcask_keydir_handle* handle = (bitcask_keydir_handle*)arg;

// First, grab the read lock for iteration
R_LOCK(handle->keydir);

// Set the flag that we are ready to begin iteration
handle->il_signal_flag = 1;

// Signal the invoking thread to indicate that the read lock is acquired
// and then wait for a signal back to release the read lock.
enif_mutex_lock(handle->il_signal_mutex);
enif_cond_signal(handle->il_signal);
while (handle->il_signal_flag)
{
enif_cond_wait(handle->il_signal, handle->il_signal_mutex);
}

// Iterating thread is all done with the read lock; release it and exit
R_UNLOCK(handle->keydir);
enif_mutex_unlock(handle->il_signal_mutex);
return 0;
}

ERL_NIF_TERM bitcask_nifs_keydir_itr(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
bitcask_keydir_handle* handle;

if (enif_get_resource(env, argv[0], bitcask_keydir_RESOURCE, (void**)&handle))
{
bitcask_keydir* keydir = handle->keydir;
// If a iterator thread is already active for this keydir, bail
if (handle->il_thread)
{
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ITERATION_IN_PROCESS);
}

// Create the mutex and signal
handle->il_signal_flag = 0;
handle->il_signal_mutex = enif_mutex_create("bitcask_itr_signal_mutex");
handle->il_signal = enif_cond_create("bitcask_itr_signal");

// Grab the mutex BEFORE spawning the thread; otherwise we might miss the signal
enif_mutex_lock(handle->il_signal_mutex);

// Spawn the lock thread
int rc = enif_thread_create("bitcask_itr_lock_thread", &(handle->il_thread),
iterator_lock_thread, handle, 0);
if (rc != 0)
{
// Failed to create the lock holder -- release the lock and cleanup
enif_mutex_unlock(handle->il_signal_mutex);
enif_cond_destroy(handle->il_signal);
enif_mutex_destroy(handle->il_signal_mutex);
handle->il_thread = 0;
return enif_make_tuple2(env, ATOM_ERROR,
enif_make_tuple2(env, ATOM_ILT_CREATE_ERROR, rc));
}

// Wait for the signal from the lock thread that iteration may commence
while (!handle->il_signal_flag)
{
enif_cond_wait(handle->il_signal, handle->il_signal_mutex);
}

// Ready to go; initialize the iterator and unlock the signal mutex
handle->iterator = kh_begin(handle->keydir->entries);
enif_mutex_unlock(handle->il_signal_mutex);

// Grab the lock and initialize the iterator
R_LOCK(keydir);
keydir->iterator = kh_begin(keydir->entries);
return ATOM_OK;
}
else
Expand All @@ -679,11 +736,17 @@ ERL_NIF_TERM bitcask_nifs_keydir_itr_next(ErlNifEnv* env, int argc, const ERL_NI
{
bitcask_keydir* keydir = handle->keydir;

while (keydir->iterator != kh_end(keydir->entries))
if (handle->il_signal_flag != 1)
{
if (kh_exist(keydir->entries, keydir->iterator))
// Iteration not started!
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ITERATION_NOT_STARTED);
}

while (handle->iterator != kh_end(keydir->entries))
{
if (kh_exist(keydir->entries, handle->iterator))
{
bitcask_keydir_entry* entry = kh_key(keydir->entries, keydir->iterator);
bitcask_keydir_entry* entry = kh_key(keydir->entries, handle->iterator);
ErlNifBinary key;

// Alloc the binary and make sure it succeeded
Expand All @@ -705,13 +768,13 @@ ERL_NIF_TERM bitcask_nifs_keydir_itr_next(ErlNifEnv* env, int argc, const ERL_NI
enif_make_uint(env, entry->tstamp));

// Update the iterator to the next entry
(keydir->iterator)++;
(handle->iterator)++;
return curr;
}
else
{
// No item in this slot; increment the iterator and keep looping
(keydir->iterator)++;
(handle->iterator)++;
}
}

Expand All @@ -730,10 +793,25 @@ ERL_NIF_TERM bitcask_nifs_keydir_itr_release(ErlNifEnv* env, int argc, const ERL

if (enif_get_resource(env, argv[0], bitcask_keydir_RESOURCE, (void**)&handle))
{
bitcask_keydir* keydir = handle->keydir;
if (handle->il_signal_flag != 1)
{
// Iteration not started!
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ITERATION_NOT_STARTED);
}

// Lock the signal mutex, clear the il_signal_flag and tell the lock thread
// to drop the read lock
enif_mutex_lock(handle->il_signal_mutex);
handle->il_signal_flag = 0;
enif_cond_signal(handle->il_signal);
enif_mutex_unlock(handle->il_signal_mutex);
enif_thread_join(handle->il_thread, 0);

// Cleanup
enif_cond_destroy(handle->il_signal);
enif_mutex_destroy(handle->il_signal_mutex);
handle->il_thread = 0;

// Unlock the keydir
R_UNLOCK(keydir);
return ATOM_OK;
}
else
Expand Down Expand Up @@ -1144,7 +1222,10 @@ static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
ATOM_FSTAT_ERROR = enif_make_atom(env, "fstat_error");
ATOM_FTRUNCATE_ERROR = enif_make_atom(env, "ftruncate_error");
ATOM_GETFL_ERROR = enif_make_atom(env, "getfl_error");
ATOM_ILT_CREATE_ERROR = enif_make_atom(env, "ilt_create_error");
ATOM_ITERATION_IN_PROCESS = enif_make_atom(env, "iteration_in_process");
ATOM_ITERATION_NOT_PERMITTED = enif_make_atom(env, "iteration_not_permitted");
ATOM_ITERATION_NOT_STARTED = enif_make_atom(env, "iteration_not_started");
ATOM_LOCK_NOT_WRITABLE = enif_make_atom(env, "lock_not_writable");
ATOM_NOT_FOUND = enif_make_atom(env, "not_found");
ATOM_NOT_READY = enif_make_atom(env, "not_ready");
Expand Down
6 changes: 6 additions & 0 deletions c_src/erl_nif_compat.h
Expand Up @@ -15,6 +15,12 @@ extern "C" {
#define enif_alloc_binary_compat enif_alloc_binary
#define enif_alloc_compat enif_alloc
#define enif_free_compat enif_free
#define enif_cond_create erl_drv_cond_create
#define enif_cond_destroy erl_drv_cond_destroy
#define enif_cond_signal erl_drv_cond_signal
#define enif_cond_broadcast erl_drv_cond_broadcast
#define enif_cond_wait erl_drv_cond_wait
#define ErlNifCond ErlDrvCond
#endif /* R13B04 */

#if ERL_NIF_MAJOR_VERSION == 2 && ERL_NIF_MINOR_VERSION == 0
Expand Down