Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

added explicit close function #20

Closed
wants to merge 4 commits into from

3 participants

@moonpolysoft

Needed the ability to close a database independent of NIF cleanup / beam GC.

@dizzyd

Generally, this needs some bullet-proofing:

  • If the handle can be deleted, all operations which use it should check for deletion before executing
  • I think the handle may need to be ref-counted for proper behavior while it's also in-use for an iterator (??)

It would be extremely handy to have this functionality, though. Thanks! :)

@moonpolysoft

Added the necessary bulletproofing. Any nif call on a closed database will badarg, and if there are outstanding iterators against a database then the call to close will badarg.

@dizzyd

Hi @cliffmoon, I'm pretty sure there are locking issues remaining in this code but haven't had a chance to dig into it. The API was, IIRC, safe to use across multiple processes, but with the addition of the close I think that safety is bjorked. Happy to be proven wrong. :)

@dizzyd

FWIW, I'm hoping to take a gander at this prior to our next Riak release. Haven't forgotten...just bogged down. Lots of good LevelDB performance improvements on this way. :)

@seancribbs seancribbs was assigned
@seancribbs

@cliffmoon Looks like the explicit close function made it in during the Riak 1.2 release cycle. Closing this PR.

@seancribbs seancribbs closed this
@seancribbs seancribbs removed their assignment
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
Showing with 64 additions and 8 deletions.
  1. +42 −8 c_src/eleveldb.cc
  2. +1 −0  c_src/eleveldb.h
  3. +21 −0 src/eleveldb.erl
View
50 c_src/eleveldb.cc
@@ -32,6 +32,7 @@ static ErlNifResourceType* eleveldb_itr_RESOURCE;
typedef struct
{
leveldb::DB* db;
+ int iterators;
leveldb::Options options;
} eleveldb_db_handle;
@@ -86,6 +87,7 @@ static ErlNifFunc nif_funcs[] =
{"open", 2, eleveldb_open},
{"get", 3, eleveldb_get},
{"write", 3, eleveldb_write},
+ {"close", 1, eleveldb_close},
{"iterator", 2, eleveldb_iterator},
{"iterator", 3, eleveldb_iterator},
{"iterator_move", 2, eleveldb_iterator_move},
@@ -270,6 +272,7 @@ ERL_NIF_TERM eleveldb_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
(eleveldb_db_handle*) enif_alloc_resource(eleveldb_db_RESOURCE,
sizeof(eleveldb_db_handle));
memset(handle, '\0', sizeof(eleveldb_db_handle));
+ handle->iterators = 0;
handle->db = db;
handle->options = opts;
ERL_NIF_TERM result = enif_make_resource(env, handle);
@@ -287,6 +290,7 @@ ERL_NIF_TERM eleveldb_get(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
eleveldb_db_handle* handle;
ErlNifBinary key;
if (enif_get_resource(env, argv[0], eleveldb_db_RESOURCE, (void**)&handle) &&
+ handle->db &&
enif_inspect_binary(env, argv[1], &key) &&
enif_is_list(env, argv[2]))
{
@@ -322,6 +326,7 @@ ERL_NIF_TERM eleveldb_write(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
eleveldb_db_handle* handle;
if (enif_get_resource(env, argv[0], eleveldb_db_RESOURCE, (void**)&handle) &&
+ handle->db &&
enif_is_list(env, argv[1]) && // Actions
enif_is_list(env, argv[2])) // Opts
{
@@ -361,10 +366,28 @@ ERL_NIF_TERM eleveldb_write(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
}
}
+ERL_NIF_TERM eleveldb_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
+{
+ eleveldb_db_handle* db_handle;
+ if (enif_get_resource(env, argv[0], eleveldb_db_RESOURCE, (void**)&db_handle) &&
+ db_handle->db &&
+ db_handle->iterators == 0)
+ {
+ delete db_handle->db;
+ db_handle->db = NULL;
+ return ATOM_OK;
+ }
+ else
+ {
+ return enif_make_badarg(env);
+ }
+}
+
ERL_NIF_TERM eleveldb_iterator(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
eleveldb_db_handle* db_handle;
if (enif_get_resource(env, argv[0], eleveldb_db_RESOURCE, (void**)&db_handle) &&
+ db_handle->db &&
enif_is_list(env, argv[1])) // Options
{
// Increment references to db_handle for duration of the iterator
@@ -387,6 +410,9 @@ ERL_NIF_TERM eleveldb_iterator(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv
itr_handle->snapshot = db_handle->db->GetSnapshot();
opts.snapshot = itr_handle->snapshot;
itr_handle->itr = db_handle->db->NewIterator(opts);
+
+ // Increment a counter so we don't have any dangling iterators on close
+ db_handle->iterators++;
// Check for keys_only iterator flag
itr_handle->keys_only = ((argc == 3) && (argv[2] == ATOM_KEYS_ONLY));
@@ -412,10 +438,10 @@ static ERL_NIF_TERM slice_to_binary(ErlNifEnv* env, leveldb::Slice s)
ERL_NIF_TERM eleveldb_iterator_move(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
eleveldb_itr_handle* itr_handle;
- if (enif_get_resource(env, argv[0], eleveldb_itr_RESOURCE, (void**)&itr_handle))
+ if (enif_get_resource(env, argv[0], eleveldb_itr_RESOURCE, (void**)&itr_handle) &&
+ itr_handle->db_handle->db)
{
enif_mutex_lock(itr_handle->itr_lock);
-
leveldb::Iterator* itr = itr_handle->itr;
if (itr == NULL)
@@ -484,12 +510,14 @@ ERL_NIF_TERM eleveldb_iterator_close(ErlNifEnv* env, int argc, const ERL_NIF_TER
if (enif_get_resource(env, argv[0], eleveldb_itr_RESOURCE, (void**)&itr_handle))
{
enif_mutex_lock(itr_handle->itr_lock);
-
+ itr_handle->db_handle->iterators--;
if (itr_handle->itr != 0)
{
delete itr_handle->itr;
itr_handle->itr = 0;
- itr_handle->db_handle->db->ReleaseSnapshot(itr_handle->snapshot);
+ if (itr_handle->db_handle->db) {
+ itr_handle->db_handle->db->ReleaseSnapshot(itr_handle->snapshot);
+ }
enif_release_resource(itr_handle->db_handle);
}
@@ -507,6 +535,7 @@ ERL_NIF_TERM eleveldb_status(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]
eleveldb_db_handle* db_handle;
ErlNifBinary name_bin;
if (enif_get_resource(env, argv[0], eleveldb_db_RESOURCE, (void**)&db_handle) &&
+ db_handle->db &&
enif_inspect_binary(env, argv[1], &name_bin))
{
leveldb::Slice name((const char*)name_bin.data, name_bin.size);
@@ -582,7 +611,8 @@ ERL_NIF_TERM eleveldb_destroy(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[
ERL_NIF_TERM eleveldb_is_empty(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
eleveldb_db_handle* db_handle;
- if (enif_get_resource(env, argv[0], eleveldb_db_RESOURCE, (void**)&db_handle))
+ if (enif_get_resource(env, argv[0], eleveldb_db_RESOURCE, (void**)&db_handle) &&
+ db_handle->db)
{
leveldb::ReadOptions opts;
leveldb::Iterator* itr = db_handle->db->NewIterator(opts);
@@ -609,7 +639,10 @@ static void eleveldb_db_resource_cleanup(ErlNifEnv* env, void* arg)
{
// Delete any dynamically allocated memory stored in eleveldb_db_handle
eleveldb_db_handle* handle = (eleveldb_db_handle*)arg;
- delete handle->db;
+ if (handle->db) {
+ delete handle->db;
+ handle->db = NULL;
+ }
}
static void eleveldb_itr_resource_cleanup(ErlNifEnv* env, void* arg)
@@ -620,10 +653,11 @@ static void eleveldb_itr_resource_cleanup(ErlNifEnv* env, void* arg)
{
delete itr_handle->itr;
itr_handle->itr = 0;
- itr_handle->db_handle->db->ReleaseSnapshot(itr_handle->snapshot);
+ if (itr_handle->db_handle->db) {
+ itr_handle->db_handle->db->ReleaseSnapshot(itr_handle->snapshot);
+ }
enif_release_resource(itr_handle->db_handle);
}
-
enif_mutex_destroy(itr_handle->itr_lock);
}
View
1  c_src/eleveldb.h
@@ -30,6 +30,7 @@ extern "C" {
ERL_NIF_TERM eleveldb_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM eleveldb_get(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM eleveldb_write(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
+ERL_NIF_TERM eleveldb_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM eleveldb_iterator(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM eleveldb_iterator_move(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM eleveldb_iterator_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
View
21 src/eleveldb.erl
@@ -26,6 +26,7 @@
put/4,
delete/3,
write/3,
+ close/1,
fold/4,
fold_keys/4,
status/2,
@@ -108,6 +109,10 @@ delete(Ref, Key, Opts) ->
write(_Ref, _Updates, _Opts) ->
erlang:nif_error({error, not_loaded}).
+-spec close(db_ref()) -> ok.
+close(_Ref) ->
+ erlang:nif_error({error, not_loaded}).
+
-spec iterator(db_ref(), read_options()) -> {ok, itr_ref()}.
iterator(_Ref, _Opts) ->
erlang:nif_error({error, not_loaded}).
@@ -193,6 +198,8 @@ open_test() ->
{ok, <<"123">>} = ?MODULE:get(Ref, <<"abc">>, []),
not_found = ?MODULE:get(Ref, <<"def">>, []).
+
+
fold_test() ->
os:cmd("rm -rf /tmp/eleveldb.fold.test"),
{ok, Ref} = open("/tmp/eleveldb.fold.test", [{create_if_missing, true}]),
@@ -255,6 +262,20 @@ compression_test() ->
CompressedSize = Size("/tmp/eleveldb.compress.1"),
?assert(UncompressedSize > CompressedSize).
+close_test() ->
+ os:cmd("rm -rf /tmp/eleveldb.close.test"),
+ {ok, Ref} = open("/tmp/eleveldb.close.test", [{create_if_missing, true}]),
+ ok = ?MODULE:close(Ref),
+ ?assertError(badarg, ?MODULE:put(Ref, <<"a">>, <<"b">>, [])),
+ ?assertError(badarg, ?MODULE:get(Ref, <<"a">>, [])),
+ ?assertError(badarg, ?MODULE:status(Ref, <<"a">>)),
+ ?assertError(badarg, ?MODULE:iterator(Ref, [])).
+
+close_with_iterator_test() ->
+ os:cmd("rm -rf /tmp/eleveldb.close_itr.test"),
+ {ok, Ref} = open("/tmp/eleveldb.close_itr.test", [{create_if_missing, true}]),
+ {ok, _Itr} = ?MODULE:iterator(Ref, []),
+ ?assertError(badarg, ?MODULE:close(Ref)).
-ifdef(EQC).
Something went wrong with that request. Please try again.