Skip to content
Permalink
Browse files
Add transaction option to disallow writes
Add `disallow_writes | allow_writes` transaction options.

If the transaction hasn't done any writes yet, it is possible to set the
`disallow_writes` option, and then every write attempt will throw a
`writes_not_allowed` error.
  • Loading branch information
nickva committed Jan 25, 2020
1 parent c89b2d6 commit 6f59a39f4fd52a05c470545913c2a81fdd80f6d1
Showing 6 changed files with 116 additions and 2 deletions.
@@ -33,6 +33,7 @@ ATOM_MAP(erlfdb_transaction);

ATOM_MAP(invalid_future_type);

ATOM_MAP(writes_not_allowed);

// Network Options
ATOM_MAP(local_address);
@@ -98,6 +99,8 @@ ATOM_MAP(lock_aware);
ATOM_MAP(used_during_commit_protection_disable);
ATOM_MAP(read_lock_aware);
ATOM_MAP(size_limit);
ATOM_MAP(allow_writes);
ATOM_MAP(disallow_writes);


// Streaming mode
@@ -817,6 +817,7 @@ erlfdb_database_create_transaction(

t->txid = 0;
t->read_only = true;
t->writes_allowed = true;

ret = enif_make_resource(env, t);
enif_release_resource(t);
@@ -855,6 +856,18 @@ erlfdb_transaction_set_option(
return enif_make_badarg(env);
}

if(IS_ATOM(argv[1], allow_writes)) {
t->writes_allowed = true;
return ATOM_ok;
} else if (IS_ATOM(argv[1], disallow_writes)) {
if(!t->read_only) {
return enif_make_badarg(env);
}
t->writes_allowed = false;
return ATOM_ok;
}


if(IS_ATOM(argv[1], causal_write_risky)) {
option = FDB_TR_OPTION_CAUSAL_WRITE_RISKY;
} else if(IS_ATOM(argv[1], causal_read_risky)) {
@@ -1295,6 +1308,10 @@ erlfdb_transaction_set(
return enif_make_badarg(env);
}

if(!t->writes_allowed) {
return enif_raise_exception(env, ATOM_writes_not_allowed);
}

if(!enif_inspect_binary(env, argv[1], &key)) {
return enif_make_badarg(env);
}
@@ -1346,6 +1363,10 @@ erlfdb_transaction_clear(
return enif_make_badarg(env);
}

if(!t->writes_allowed) {
return enif_raise_exception(env, ATOM_writes_not_allowed);
}

if(!enif_inspect_binary(env, argv[1], &key)) {
return enif_make_badarg(env);
}
@@ -1388,6 +1409,10 @@ erlfdb_transaction_clear_range(
return enif_make_badarg(env);
}

if(!t->writes_allowed) {
return enif_raise_exception(env, ATOM_writes_not_allowed);
}

if(!enif_inspect_binary(env, argv[1], &skey)) {
return enif_make_badarg(env);
}
@@ -1441,6 +1466,10 @@ erlfdb_transaction_atomic_op(
return enif_make_badarg(env);
}

if(!t->writes_allowed) {
return enif_raise_exception(env, ATOM_writes_not_allowed);
}

if(!enif_inspect_binary(env, argv[1], &key)) {
return enif_make_badarg(env);
}
@@ -1801,6 +1830,9 @@ erlfdb_transaction_add_conflict_range(
if(IS_ATOM(argv[3], read)) {
rtype = FDB_CONFLICT_RANGE_TYPE_READ;
} else if(IS_ATOM(argv[3], write)) {
if(!t->writes_allowed) {
return enif_raise_exception(env, ATOM_writes_not_allowed);
}
rtype = FDB_CONFLICT_RANGE_TYPE_WRITE;
} else {
return enif_make_badarg(env);
@@ -1934,6 +1966,42 @@ erlfdb_transaction_is_read_only(
}


static ERL_NIF_TERM
erlfdb_transaction_get_writes_allowed(
ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[]
)
{
ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
ErlFDBTransaction* t;
void* res;

if(st->lib_state != ErlFDB_CONNECTED) {
return enif_make_badarg(env);
}

if(argc != 1) {
return enif_make_badarg(env);
}

if(!enif_get_resource(env, argv[0], ErlFDBTransactionRes, &res)) {
return enif_make_badarg(env);
}
t = (ErlFDBTransaction*) res;

if(!erlfdb_transaction_is_owner(env, t)) {
return enif_make_badarg(env);
}

if(t->writes_allowed) {
return ATOM_true;
} else {
return ATOM_false;
}
}


static ERL_NIF_TERM
erlfdb_get_error(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
@@ -2043,6 +2111,7 @@ static ErlNifFunc funcs[] =
NIF_FUNC(erlfdb_transaction_get_approximate_size, 1),
NIF_FUNC(erlfdb_transaction_get_next_tx_id, 1),
NIF_FUNC(erlfdb_transaction_is_read_only, 1),
NIF_FUNC(erlfdb_transaction_get_writes_allowed, 1),

NIF_FUNC(erlfdb_get_error, 1),
NIF_FUNC(erlfdb_error_predicate, 2)
@@ -60,6 +60,7 @@ typedef struct _ErlFDBTransaction
ERL_NIF_TERM owner;
unsigned int txid;
bool read_only;
bool writes_allowed;
} ErlFDBTransaction;


@@ -109,6 +109,7 @@
% Transaction status
get_next_tx_id/1,
is_read_only/1,
get_writes_allowed/1,

% Locality
get_addresses_for_key/2,
@@ -592,6 +593,13 @@ is_read_only(?IS_SS = SS) ->
is_read_only(?GET_TX(SS)).


get_writes_allowed(?IS_TX = Tx) ->
erlfdb_nif:transaction_get_writes_allowed(Tx);

get_writes_allowed(?IS_SS = SS) ->
get_writes_allowed(?GET_TX(SS)).


get_addresses_for_key(?IS_DB = Db, Key) ->
transactional(Db, fun(Tx) ->
wait(get_addresses_for_key(Tx, Key))
@@ -52,6 +52,7 @@
transaction_add_conflict_range/4,
transaction_get_next_tx_id/1,
transaction_is_read_only/1,
transaction_get_writes_allowed/1,
transaction_get_approximate_size/1,

get_error/1,
@@ -142,7 +143,9 @@
lock_aware |
used_during_commit_protection_disable |
read_lock_aware |
size_limit.
size_limit |
allow_writes |
disallow_writes.


-type streaming_mode() ::
@@ -427,6 +430,11 @@ transaction_is_read_only({erlfdb_transaction, Tx}) ->
erlfdb_transaction_is_read_only(Tx).


-spec transaction_get_writes_allowed(transaction()) -> true | false.
transaction_get_writes_allowed({erlfdb_transaction, Tx}) ->
erlfdb_transaction_get_writes_allowed(Tx).


-spec get_error(integer()) -> binary().
get_error(Error) ->
erlfdb_get_error(Error).
@@ -567,6 +575,7 @@ erlfdb_transaction_add_conflict_range(
) -> ?NOT_LOADED.
erlfdb_transaction_get_next_tx_id(_Transaction) -> ?NOT_LOADED.
erlfdb_transaction_is_read_only(_Transaction) -> ?NOT_LOADED.
erlfdb_transaction_get_writes_allowed(_Transaction) -> ?NOT_LOADED.
erlfdb_transaction_get_approximate_size(_Transaction) -> ?NOT_LOADED.


@@ -10,7 +10,7 @@
% License for the specific language governing permissions and limitations under
% the License.

-module(erlfdb_03_transaction_size_test).
-module(erlfdb_03_transaction_options_test).

-include_lib("eunit/include/eunit.hrl").

@@ -35,5 +35,29 @@ size_limit_test() ->
end)).


writes_allowed_test() ->
Db1 = erlfdb_util:get_test_db(),
?assertError(writes_not_allowed, erlfdb:transactional(Db1, fun(Tx) ->
?assert(erlfdb:get_writes_allowed(Tx)),

erlfdb:set_option(Tx, disallow_writes),
?assert(not erlfdb:get_writes_allowed(Tx)),

erlfdb:set_option(Tx, allow_writes),
?assert(erlfdb:get_writes_allowed(Tx)),

erlfdb:set_option(Tx, disallow_writes),
erlfdb:set(Tx, gen(10), gen(10))
end)).


once_writes_happend_cannot_disallow_them_test() ->
Db1 = erlfdb_util:get_test_db(),
?assertError(badarg, erlfdb:transactional(Db1, fun(Tx) ->
ok = erlfdb:set(Tx, gen(10), gen(10)),
erlfdb:set_option(Tx, disallow_writes)
end)).


gen(Size) ->
crypto:strong_rand_bytes(Size).

0 comments on commit 6f59a39

Please sign in to comment.