Skip to content

Commit

Permalink
WIP: Don't offer transactions API, just use it when necessary.
Browse files Browse the repository at this point in the history
  • Loading branch information
Gregory Burd committed Aug 4, 2012
1 parent adb75c5 commit d8afe1c
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 121 deletions.
159 changes: 94 additions & 65 deletions c_src/lsm_tree_nifs.c
Expand Up @@ -39,12 +39,10 @@ static lsm_env erl_nif_env;
typedef struct {
lsm_db *pDb; /* LSM database handle */
lsm_cursor* pSharedCsr; /* for use in get() and delete() */
int txn_depth; /* inner most transaction */
} LsmTreeHandle;

typedef struct {
lsm_cursor *pCsr; /* LSM cursor handle */
int txn_id; /* always > 2 */
LsmTreeHandle *tree_handle;
} LsmCursorHandle;

Expand All @@ -69,10 +67,12 @@ static ERL_NIF_TERM ATOM_NORMAL;
static ERL_NIF_TERM ATOM_FULL;
static ERL_NIF_TERM ATOM_TRUE;
static ERL_NIF_TERM ATOM_FALSE;
static ERL_NIF_TERM ATOM_CREATE; // Shorthand for CREATE_IF_MISSING below
static ERL_NIF_TERM ATOM_CREATE_IF_MISSING; //TODO
static ERL_NIF_TERM ATOM_ERROR_IF_EXISTS; //TODO
static ERL_NIF_TERM ATOM_CHECKSUM_VALUES; //TODO
static ERL_NIF_TERM ATOM_PUT;
static ERL_NIF_TERM ATOM_DELETE;
static ERL_NIF_TERM ATOM_CREATE; // Shorthand for CREATE_IF_MISSING below
static ERL_NIF_TERM ATOM_CREATE_IF_MISSING; //TODO, not yet implemented.
static ERL_NIF_TERM ATOM_ERROR_IF_EXISTS; //TODO, not yet implemented.
static ERL_NIF_TERM ATOM_CHECKSUM_VALUES; //TODO, not yet implemented.

// Prototypes
static ERL_NIF_TERM lsm_tree_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
Expand All @@ -92,9 +92,7 @@ static ERL_NIF_TERM lsm_cursor_next_value(ErlNifEnv* env, int argc, const ERL_NI
static ERL_NIF_TERM lsm_cursor_prev_key(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM lsm_cursor_prev_value(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM lsm_cursor_delete(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM lsm_txn_begin(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM lsm_txn_commit(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM lsm_txn_abort(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM lsm_transact(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM lsm_tree_salvage(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM lsm_tree_flush(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM lsm_tree_checkpoint(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
Expand Down Expand Up @@ -133,9 +131,7 @@ static ErlNifFunc nif_funcs[] =
{"cursor_first", 1, lsm_cursor_first},
{"cursor_last", 1, lsm_cursor_last},
{"cursor_delete", 1, lsm_cursor_delete},
{"txn_begin", 2, lsm_txn_begin},
{"txn_commit", 2, lsm_txn_commit},
{"txn_abort", 2, lsm_txn_abort},
{"transact", 2, lsm_transact},
};


Expand Down Expand Up @@ -410,8 +406,7 @@ static ERL_NIF_TERM lsm_tree_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM
lsm_db* db = tree_handle->pDb;

/* Rollback any uncommitted write transactions */
if (tree_handle->txn_depth >= 2)
lsm_rollback(db, tree_handle->txn_depth);
lsm_rollback(db, 2);

/* Close the shared cursor */
lsm_cursor* cursor = tree_handle->pSharedCsr;
Expand All @@ -438,7 +433,6 @@ static int __shared_cursor(ErlNifEnv* env, LsmTreeHandle* tree_handle)
tree_handle->pSharedCsr = 0;
return make_error(env, rc);
}
tree_handle->txn_depth = 1 > tree_handle->txn_depth ? 1 : tree_handle->txn_depth;
return LSM_OK;
}

Expand Down Expand Up @@ -496,8 +490,18 @@ static ERL_NIF_TERM lsm_tree_put(ErlNifEnv* env, int argc, const ERL_NIF_TERM ar
int rc = LSM_OK;
lsm_db* db = tree_handle->pDb;

// rc = lsm_begin(db, 2); DPRINTF("lsm_tree_put/lsm_begin() = %d\n", rc);
// if (rc != LSM_OK) return make_error(env, rc);

rc = lsm_write(db, key.data, key.size, value.data, value.size);
return rc == LSM_OK ? ATOM_OK : make_error(env, rc);
// if (rc == LSM_OK) {
// int trc = lsm_commit(db, 2); DPRINTF("lsm_tree_put/lsm_commit() = %d/%d\n", rc, trc);
// return LSM_OK;
// } else {
// int trc = lsm_rollback(db, 2); DPRINTF("lsm_tree_put/lsm_rollback() = %d/%d\n", rc, trc);
// return make_error(env, rc);
// }
}
return ATOM_BADARG;
}
Expand Down Expand Up @@ -530,12 +534,15 @@ static ERL_NIF_TERM lsm_tree_delete(ErlNifEnv* env, int argc, const ERL_NIF_TERM
return make_error(env, rc);
}

// TODO: do we need a new txn around the delete?
// rc = lsm_begin(db, tree_handle->txn_depth+1); // race?
// if (rc != LSM_OK) return make_error(env, rc);
rc = lsm_begin(db, 2);
if (rc != LSM_OK) return make_error(env, rc);

rc = lsm_delete(db, raw_key, raw_key_size);
if (rc != LSM_OK) {
if (rc == LSM_OK) {
lsm_commit(db, 2);
return LSM_OK;
} else {
lsm_rollback(db, 2);
lsm_csr_close(cursor);
tree_handle->pSharedCsr = 0;
return make_error(env, rc);
Expand Down Expand Up @@ -800,7 +807,7 @@ static ERL_NIF_TERM __cursor_np_worker(ErlNifEnv* env, int argc, const ERL_NIF_T
if (lsm_csr_invalid(cursor))
return ATOM_BADARG;
}
return __make_error(env, rc);
return make_error(env, rc);
}
return ATOM_BADARG;
}
Expand Down Expand Up @@ -873,66 +880,86 @@ static ERL_NIF_TERM lsm_cursor_delete(ErlNifEnv* env, int argc, const ERL_NIF_TE
return ATOM_BADARG;
}

/* Transactions:
* level==1 -- begin an outermost read transaction
* level==2 -- begin an outermost write transaction
* level>2 -- begin a nested write transaction
*
* 'level' may not be less than 1. After this routine returns successfully
* the transaction level will be equal to level. The transaction level
* must be at least 1 to read and at least 2 to write.
*/
static ERL_NIF_TERM lsm_txn_begin(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
// -spec transact(tree(), [transact_spec()]) -> ok | {error, term()}.
static ERL_NIF_TERM lsm_transact(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
LsmTreeHandle* tree_handle;
int level = 0;

if (!(enif_get_resource(env, argv[0], lsm_tree_RESOURCE, (void**)&tree_handle) &&
enif_get_int(env, argv[1], &level)) || level < 1)
return ATOM_BADARG;

LsmTreeHandle *tree_handle;
ERL_NIF_TERM head, tail, list;
const ERL_NIF_TERM* array;
int rc = LSM_OK;
lsm_db* db = tree_handle->pDb;
rc = lsm_begin(db, level);
return rc == LSM_OK ? ATOM_OK : make_error(env, rc);
}

static ERL_NIF_TERM lsm_txn_commit(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
LsmTreeHandle* tree_handle;
int level = 0;

if (!(enif_get_resource(env, argv[0], lsm_tree_RESOURCE, (void**)&tree_handle) &&
enif_get_int(env, argv[1], &level)) || level < 1)
if (!(enif_get_resource(env, argv[0], lsm_tree_RESOURCE, (void**)&tree_handle) && argc == 2))
return ATOM_BADARG;

int rc = LSM_OK;
lsm_db* db = tree_handle->pDb;
rc = lsm_commit(db, level);
return rc == LSM_OK ? ATOM_OK : make_error(env, rc);
}

static ERL_NIF_TERM lsm_txn_abort(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
LsmTreeHandle* tree_handle;
int level = 0;
// We'll use the shared cursor to position, then delete keys
rc = __shared_cursor(env, tree_handle);
if (rc != LSM_OK) return make_error(env, rc);
lsm_cursor* cursor = tree_handle->pSharedCsr;

if (!(enif_get_resource(env, argv[0], lsm_tree_RESOURCE, (void**)&tree_handle) &&
enif_get_int(env, argv[1], &level)) || level < 1)
return ATOM_BADARG;
// put/delete/trasact calls are serialized so they all operate within the "2"
// transaction isolation level.
rc = lsm_begin(db, 2);
if (rc != LSM_OK) return make_error(env, rc);

int rc = LSM_OK;
lsm_db* db = tree_handle->pDb;
rc = lsm_rollback(db, level);
return rc == LSM_OK ? ATOM_OK : make_error(env, rc);
// Now, simply iterate over the list of operations, perform them atomically.
//-type transact_spec() :: {put, key(), value()} | {delete, value()}.
list = argv[1];
while (enif_get_list_cell(env, list, &head, &tail)) {
static char msg[1024], o[1024];
ErlNifBinary key, value;
int arity;

if (!enif_get_tuple(env, head, &arity, &array)) {
if (array[0] == ATOM_PUT && arity == 3) {
rc = enif_inspect_binary(env, array[1], &key);
if (!rc) goto err;
rc = enif_inspect_binary(env, array[2], &value);
if (!rc) goto err;
rc = lsm_write(db, key.data, key.size, value.data, value.size);
if (rc != LSM_OK) goto err;
} else if (array[0] == ATOM_DELETE && arity == 2) {
rc = enif_inspect_binary(env, array[1], &key);
if (!rc) goto err;
rc = lsm_csr_seek(cursor, key.data, key.size, LSM_SEEK_EQ);
if (rc == LSM_OK) {
// Silently ignore deletes for non-existent keys.
if (lsm_csr_invalid(cursor))
continue;
void *raw_key;
int raw_key_size;
rc = lsm_csr_key(cursor, &raw_key, &raw_key_size);
if (rc != LSM_OK) goto err;
rc = lsm_delete(db, raw_key, raw_key_size);
if (rc != LSM_OK) goto err;
} // else... Again, silently ignore deletes for non-existent keys.
} else {
enif_get_atom(env, head, o, sizeof o, ERL_NIF_LATIN1);
snprintf(msg, 1024, "lsm_tree:transact \"%s\" is not a valid tuple", o);
lsm_csr_close(cursor);
tree_handle->pSharedCsr = 0;
lsm_rollback(db, 2);
return make_error_msg(env, msg); // TODO should goto err...
goto err;
}
}
}
lsm_commit(db, 2);
return LSM_OK;
err:
lsm_csr_close(cursor);
tree_handle->pSharedCsr = 0;
lsm_rollback(db, 2);
return make_error(env, rc);
}

#if 0 //TODO
static ERL_NIF_TERM lsm_snapshot??(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
static ERL_NIF_TERM lsm_snapshot(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
}

static ERL_NIF_TERM lsm_stats??(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
static ERL_NIF_TERM lsm_stats(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
lsm_info();
}
Expand Down Expand Up @@ -1033,6 +1060,8 @@ static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
ATOM_FULL = make_atom(env, "full");
ATOM_TRUE = make_atom(env, "true");
ATOM_FALSE = make_atom(env, "false");
ATOM_PUT = make_atom(env, "put");
ATOM_DELETE = make_atom(env, "delete");
ATOM_CREATE = make_atom(env, "create");
ATOM_CREATE_IF_MISSING = make_atom(env, "create_if_missing");
ATOM_ERROR_IF_EXISTS = make_atom(env, "error_if_exists");
Expand Down
60 changes: 4 additions & 56 deletions src/lsm_tree.erl
Expand Up @@ -65,7 +65,6 @@
, cursor_first/1
, cursor_last/1
, cursor_delete/1
, txn_begin/2, txn_commit/2, txn_abort/2, txn_rollback/2
, transact/2
% TODO snapshot/1, stat/2
]).
Expand All @@ -84,7 +83,6 @@

-opaque tree() :: reference().
-opaque cursor() :: reference().
-opaque txn() :: pos_integer().
-type config_list() :: [{atom(), any()}].
-type key() :: binary().
-type value() :: binary().
Expand Down Expand Up @@ -232,60 +230,11 @@ cursor_last(_Cursor) ->
cursor_delete(_Ref) ->
?nif_stub.

-spec txn_begin(tree(), txn()) -> ok | {error, term()}.
txn_begin(_Ref, _Txn) ->
?nif_stub.

-spec txn_commit(tree(), txn()) -> ok | {error, term()}.
txn_commit(_Ref, _Txn) ->
?nif_stub.

-spec txn_rollback(tree(), txn()) -> ok | {error, term()}.
txn_rollback(Ref, Txn) ->
txn_abort(Ref, Txn).

-spec txn_abort(tree(), txn()) -> ok | {error, term()}.
txn_abort(_Ref, _Txn) ->
?nif_stub.

-type transact_spec() :: {put, key(), value()} | {delete, value()}.

-spec do_transact(tree(), txn(), [transact_spec()]) -> ok | {error, term()}.
do_transact(Tree, Txn, [{put, Key, Value} | Rest]) ->
case lsm_tree:put(Tree, Key, Value) of
ok ->
do_transact(Tree, Txn, Rest);
{error, Reason} ->
{error, Reason}
end;
do_transact(Tree, Txn, [{delete, Key} | Rest]) ->
case lsm_tree:delete(Tree, Key) of
ok ->
do_transact(Tree, Txn, Rest);
{error, Reason} ->
{error, Reason}
end;
do_transact(_Tree, _Txn, []) ->
ok.

-spec transact(tree(), [transact_spec()]) -> ok | {error, term()}.
transact(Tree, TransactionSpec) ->
Txn = 2,
try
txn_begin(Tree, Txn),
case do_transact(Tree, Txn, TransactionSpec) of
ok ->
txn_commit(Tree, Txn);
{error, Reason} ->
txn_abort(Tree, Txn),
{error, Reason}
end
catch
Class:Exception ->
?log("Exception in lsm_tree transact: ~p ~p", [Exception, erlang:get_stacktrace()]),
txn_abort(Tree, Txn),
{error, {Class, Exception, erlang:get_stacktrace()}}
end.
transact(_Tree, _TransactionSpec) ->
?nif_stub.

-type fold_keys_fun() :: fun((key(), any()) -> any()).

Expand Down Expand Up @@ -461,14 +410,13 @@ reset_db(DataDir) ->
?assertMatch(ok, filelib:ensure_dir(filename:join(DataDir, "x"))).

open_db(DataDir) ->
%% 104857600 bytes == 10MB
reset_db(DataDir),
{ok, Ref} = ?MODULE:open(DataDir, [{create,true},{cache_size,104857600}]),
{ok, Ref} = ?MODULE:open(DataDir),
Ref.

open_test_config(DataDir) ->
reset_db(DataDir),
{ok, Ref} = ?MODULE:open(DataDir, [{create,true},{cache_size,104857600}]),
{ok, Ref} = ?MODULE:open(DataDir),
%% TODO
Ref.

Expand Down

0 comments on commit d8afe1c

Please sign in to comment.