Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

Already on GitHub? Sign in to your account

Dss nonblocking nif #25

Closed
wants to merge 3 commits into
from
View
@@ -36,6 +36,7 @@ case "$1" in
test)
export CFLAGS="$CFLAGS -I $BASEDIR/system/include"
+ export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include"
export LDFLAGS="$LDFLAGS -L $BASEDIR/system/lib"
export LD_LIBRARY_PATH="$BASEDIR/system/lib:$LD_LIBRARY_PATH"
@@ -51,6 +52,7 @@ case "$1" in
(cd snappy-$SNAPPY_VSN && $MAKE && $MAKE install)
export CFLAGS="$CFLAGS -I $BASEDIR/system/include"
+ export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include"
export LDFLAGS="$LDFLAGS -L $BASEDIR/system/lib"
export LD_LIBRARY_PATH="$BASEDIR/system/lib:$LD_LIBRARY_PATH"
View
@@ -33,6 +33,15 @@ typedef struct
{
leveldb::DB* db;
leveldb::Options options;
+ leveldb::WriteBatch* write_batch;
+ leveldb::WriteOptions* write_options;
+ ErlNifEnv* write_env;
+ ErlNifMutex* write_lock;
+ ErlNifCond* write_cv;
+ ErlNifTid write_thread;
+ ERL_NIF_TERM write_caller;
+ ERL_NIF_TERM write_ref;
+ bool write_thread_shutdown;
} eleveldb_db_handle;
typedef struct
@@ -80,12 +89,16 @@ static ERL_NIF_TERM ATOM_ERROR_DB_DESTROY;
static ERL_NIF_TERM ATOM_KEYS_ONLY;
static ERL_NIF_TERM ATOM_COMPRESSION;
static ERL_NIF_TERM ATOM_ERROR_DB_REPAIR;
+static ERL_NIF_TERM ATOM_ERROR_CREATE_WRITER_THREAD;
+static ERL_NIF_TERM ATOM_ERROR_WRITE_PENDING;
+
+static void* eleveldb_write_thread(void* args);
static ErlNifFunc nif_funcs[] =
{
{"open", 2, eleveldb_open},
{"get", 3, eleveldb_get},
- {"write", 3, eleveldb_write},
+ {"write_async", 4, eleveldb_write_async},
{"iterator", 2, eleveldb_iterator},
{"iterator", 3, eleveldb_iterator},
{"iterator_move", 2, eleveldb_iterator_move},
@@ -272,6 +285,29 @@ ERL_NIF_TERM eleveldb_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
memset(handle, '\0', sizeof(eleveldb_db_handle));
handle->db = db;
handle->options = opts;
+
+ // Spin up the necessary subsystems for handling writes off-scheduler
+ handle->write_batch = new leveldb::WriteBatch();
+ handle->write_options = new leveldb::WriteOptions();
+ handle->write_env = enif_alloc_env();
+ handle->write_lock = enif_mutex_create((char*)"eleveldb_write_lock");
+ handle->write_cv = enif_cond_create((char*)"eleveldb_write_cv");
+
+ int error = enif_thread_create((char*)"eleveldb_write_thread", &(handle->write_thread),
+ &eleveldb_write_thread, handle, NULL);
+ if (error != 0)
+ {
+ enif_free_env(handle->write_env);
+ enif_cond_destroy(handle->write_cv);
+ enif_mutex_destroy(handle->write_lock);
+ delete handle->write_options;
+ delete handle->write_batch;
+ delete db;
+
+ enif_release_resource(handle);
+ return enif_make_tuple2(env, ATOM_ERROR_CREATE_WRITER_THREAD, enif_make_uint(env, error));
+ }
+
ERL_NIF_TERM result = enif_make_resource(env, handle);
enif_release_resource(handle);
return enif_make_tuple2(env, ATOM_OK, result);
@@ -318,38 +354,58 @@ ERL_NIF_TERM eleveldb_get(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
}
}
-ERL_NIF_TERM eleveldb_write(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
+
+ERL_NIF_TERM eleveldb_write_async(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
eleveldb_db_handle* handle;
if (enif_get_resource(env, argv[0], eleveldb_db_RESOURCE, (void**)&handle) &&
enif_is_list(env, argv[1]) && // Actions
- enif_is_list(env, argv[2])) // Opts
+ enif_is_list(env, argv[2]) && // Opts
+ enif_is_ref(env, argv[3])) // Write-Ref
{
+ ERL_NIF_TERM writeref = argv[3];
+
+ // Grab the lock
+ enif_mutex_lock(handle->write_lock);
+
+ // If another write is already in process, bail
+ if (handle->write_caller)
+ {
+ enif_mutex_unlock(handle->write_lock);
+ return enif_make_tuple2(env, ATOM_ERROR, ATOM_ERROR_WRITE_PENDING);
+ }
+
// Traverse actions and build a write batch
- leveldb::WriteBatch batch;
- ERL_NIF_TERM result = fold(env, argv[1], write_batch_item, batch);
+ ERL_NIF_TERM result = fold(env, argv[1], write_batch_item, *(handle->write_batch));
if (result == ATOM_OK)
{
// Was able to fold across all items cleanly -- apply the batch
// Parse out the write options
- leveldb::WriteOptions opts;
- fold(env, argv[2], parse_write_option, opts);
+ fold(env, argv[2], parse_write_option, *(handle->write_options));
- // TODO: Why does the API want a WriteBatch* versus a ref?
- leveldb::Status status = handle->db->Write(opts, &batch);
- if (status.ok())
- {
- return ATOM_OK;
- }
- else
- {
- return error_tuple(env, ATOM_ERROR_DB_WRITE, status);
- }
+ // Identify the calling PID and make that available to the worker
@jonmeredith

jonmeredith Apr 20, 2012

Contributor

Would benefit from a comment to explain why the Eterm gymnastics is being used (erring on the side of caution)

+ ErlNifPid caller;
+ enif_self(env, &caller);
+ handle->write_caller = enif_make_pid(handle->write_env, &caller);
+
+ // Save the write-reference for the response back to the caller;
+ // enables us to use receive optimizations in the VM
+ handle->write_ref = enif_make_copy(handle->write_env, writeref);
+
+ // Kick the writer thread and wait for a response
+ enif_cond_signal(handle->write_cv);
+ enif_mutex_unlock(handle->write_lock);
+
+ // Return the reference
@jonmeredith

jonmeredith Apr 20, 2012

Contributor

Reference is passed in now

+ return ATOM_OK;
}
else
{
- // Failed to parse out batch commands; bad item was returned from fold.
+ // Failed to parse out batch commands; bad item was returned from fold. Clear out
+ // the batch so that we aren't holding on to any copies of unused actions
+ handle->write_batch->Clear();
+ enif_mutex_unlock(handle->write_lock);
return enif_make_tuple2(env, ATOM_ERROR,
enif_make_tuple2(env, ATOM_BAD_WRITE_ACTION,
result));
@@ -361,6 +417,61 @@ ERL_NIF_TERM eleveldb_write(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
}
}
+static void* eleveldb_write_thread(void* arg)
+{
+ eleveldb_db_handle* handle = (eleveldb_db_handle*)arg;
+ enif_mutex_lock(handle->write_lock);
+ while(1)
+ {
+ if (handle->write_thread_shutdown)
+ {
+ enif_cond_signal(handle->write_cv);
+ enif_mutex_unlock(handle->write_lock);
+ break;
+ }
+
+ if (handle->write_caller)
+ {
+ // Release the mutex while we perform the lock to ensure any calling
+ // erlang processes don't get held up.
+ enif_mutex_unlock(handle->write_lock);
+
+ leveldb::Status status = handle->db->Write(*(handle->write_options), handle->write_batch);
+
+ // Reacquire our lock
+ enif_mutex_lock(handle->write_lock);
+
+ ERL_NIF_TERM result;
+ if (status.ok())
+ {
+ result = enif_make_tuple2(handle->write_env, handle->write_ref, ATOM_OK);
+ }
+ else
+ {
+ ERL_NIF_TERM error = error_tuple(handle->write_env, ATOM_ERROR_DB_WRITE, status);
+ result = enif_make_tuple2(handle->write_env, handle->write_ref, error);
+ }
+
+ // Send back the result of the write
+ ErlNifPid caller;
+ enif_get_local_pid(handle->write_env, handle->write_caller, &caller);
+ enif_send(NULL, &caller, handle->write_env, result);
@jonmeredith

jonmeredith Apr 20, 2012

Contributor

Docs say you can only call this from a non-NIF called thread on the SMP emulator. We need to add similar protection in the NIF init code that we have in innostore https://github.com/basho/innostore/blob/master/c_src/innostore_drv.c#L188

+
+ // Clean up request and result data (we've already copied the data when
+ // sending it back to caller)
+ enif_clear_env(handle->write_env);
+ handle->write_batch->Clear();
+ handle->write_caller = 0;
+ }
+ else
+ {
+ enif_cond_wait(handle->write_cv, handle->write_lock);
+ }
+ }
+
+ return 0;
+}
+
ERL_NIF_TERM eleveldb_iterator(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
eleveldb_db_handle* db_handle;
@@ -609,6 +720,22 @@ static void eleveldb_db_resource_cleanup(ErlNifEnv* env, void* arg)
{
// Delete any dynamically allocated memory stored in eleveldb_db_handle
eleveldb_db_handle* handle = (eleveldb_db_handle*)arg;
+
+ // Shutdown the worker
+ enif_mutex_lock(handle->write_lock);
+ handle->write_thread_shutdown = true;
+ enif_cond_signal(handle->write_cv);
+ enif_mutex_unlock(handle->write_lock);
+ enif_thread_join(handle->write_thread, 0);
+
+ // Cleanup mutexes, cv, etc.
+ enif_cond_destroy(handle->write_cv);
+ enif_mutex_destroy(handle->write_lock);
+ enif_free_env(handle->write_env);
+
+ // Cleanup eleveldb stuff
+ delete handle->write_batch;
+ delete handle->write_options;
delete handle->db;
}
@@ -675,6 +802,9 @@ static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
ATOM(ATOM_ERROR_DB_REPAIR, "error_db_repair");
ATOM(ATOM_KEYS_ONLY, "keys_only");
ATOM(ATOM_COMPRESSION, "compression");
+ ATOM(ATOM_ERROR_CREATE_WRITER_THREAD, "create_writer_thread");
+ ATOM(ATOM_ERROR_WRITE_PENDING, "write_pending");
+
return 0;
}
View
@@ -29,7 +29,7 @@ extern "C" {
// Prototypes
ERL_NIF_TERM eleveldb_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM eleveldb_get(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
-ERL_NIF_TERM eleveldb_write(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
+ERL_NIF_TERM eleveldb_write_async(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM eleveldb_iterator(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM eleveldb_iterator_move(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM eleveldb_iterator_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
View
@@ -33,7 +33,8 @@
repair/2,
is_empty/1]).
--export([iterator/2,
+-export([write_async/4,
+ iterator/2,
iterator_move/2,
iterator_close/1]).
@@ -105,9 +106,23 @@ delete(Ref, Key, Opts) ->
write(Ref, [{delete, Key}], Opts).
-spec write(db_ref(), write_actions(), write_options()) -> ok | {error, any()}.
-write(_Ref, _Updates, _Opts) ->
+write(Ref, Updates, Opts) ->
+ Wref = make_ref(),
+ case write_async(Ref, Updates, Opts, Wref) of
+ ok ->
+ receive
+ {Wref, Result} ->
+ Result
+ end;
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+-spec write_async(db_ref(), write_actions(), write_options(), reference()) -> ok | {error, any()}.
+write_async(_Ref, _Updates, _Opts, _Wref) ->
erlang:nif_error({error, not_loaded}).
+
-spec iterator(db_ref(), read_options()) -> {ok, itr_ref()}.
iterator(_Ref, _Opts) ->
erlang:nif_error({error, not_loaded}).