diff --git a/rebar.config b/rebar.config index 922eb68..82c5df0 100644 --- a/rebar.config +++ b/rebar.config @@ -40,6 +40,7 @@ ]}, {test, [ {deps, [ + {rand_str, "~>1.0"}, {cmd, "~>1.0"} ]} ]} diff --git a/src/fmke_driver_opt_antidote.erl b/src/fmke_driver_opt_antidote.erl index 3e44625..c8cdc70 100644 --- a/src/fmke_driver_opt_antidote.erl +++ b/src/fmke_driver_opt_antidote.erl @@ -8,6 +8,10 @@ -include("fmke.hrl"). -include("fmke_kv.hrl"). +%% antidotec_pb:commit_transaction has some typing issues. +%% Leave this here until sure that function will not return this error. +-dialyzer({no_match, txn_commit_w_retry/4}). + %% FMKE driver API -export([ start/1, @@ -43,6 +47,7 @@ -define(SERVER, ?MODULE). +-define(ANTIDOTE_TRANSACTION_RETRIES, 3). -define(MAP, antidote_crdt_map_go). -define(LWWREG, antidote_crdt_register_lww). -define(ORSET, antidote_crdt_set_aw). @@ -490,6 +495,7 @@ process_get_request(Key, Type, Txn) -> parse_read_result(get(Key, Type, Txn)). parse_read_result({_Crdt, []}) -> {error, not_found}; +parse_read_result({timeout, _}) -> {error, timeout}; parse_read_result({_Crdt, Object}) -> Object; parse_read_result(_) -> erlang:error(unknown_object_type). @@ -530,13 +536,29 @@ txn_update_objects(ObjectUpdates, {Pid, TxnDetails}) -> %% A wrapper for Antidote's commit_transaction function -spec txn_commit(TxnDetails :: txid()) -> ok | {error, term()}. txn_commit({Pid, TxnDetails}) -> - Result = case antidotec_pb:commit_transaction(Pid, TxnDetails) of - {ok, _CommitTimestamp} -> ok; - Error -> Error - end, + Result = txn_commit_w_retry(Pid, TxnDetails, 0, ?ANTIDOTE_TRANSACTION_RETRIES), fmke_db_conn_manager:checkin(Pid), Result. +txn_commit_w_retry(Pid, Txn, MaxTry, MaxTry) -> + lager:error("Transaction ~p failed, aborting...~n"), + case antidotec_pb:abort_transaction(Pid, Txn) of + ok -> + {error, transaction_aborted_max_commit_attempts}; + {error, Error} -> + lager:error("Transaction ~p could not be aborted, error returned: ~p~n", [Txn, Error]), + throw({error, transaction_failed_abort_failed}) + end; + +txn_commit_w_retry(Pid, Txn, CurrTry, MaxTry) -> + case antidotec_pb:commit_transaction(Pid, Txn) of + {ok, _} -> + ok; + {error, Error} -> + lager:warning("Transaction commit failed for ~p: ~p, retrying...~n", [Txn, Error]), + txn_commit_w_retry(Pid, Txn, CurrTry + 1, MaxTry) + end. + %% ------------------------------------------------------------------------------------------------ %% Simple API - Recommended way to interact with Antidote diff --git a/test/ct/fmke_antidote_transactions_SUITE.erl b/test/ct/fmke_antidote_transactions_SUITE.erl new file mode 100644 index 0000000..26ad8fc --- /dev/null +++ b/test/ct/fmke_antidote_transactions_SUITE.erl @@ -0,0 +1,169 @@ +%%%------------------------------------------------------------------- +%%% File : fmke_antidote_transactions_SUITE.erl +%%% Author : Gonçalo Tomás +%%% Description : Tests the behaviour of AntidoteDB transactions. +%%% Created : Mon 1 Oct 2018 17:42 +%%%------------------------------------------------------------------- +-module(fmke_antidote_transactions_SUITE). +-include("fmke.hrl"). + +-compile([export_all, nowarn_export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define (NODENAME, 'fmke@127.0.0.1'). +-define (COOKIE, fmke). + +suite() -> + [{timetrap, {minutes, 5}}]. + +%%-------------------------------------------------------------------- +%% Function: init_per_suite(Config0) -> +%% Config1 | {skip,Reason} | {skip_and_save,Reason,Config1} +%% Config0 = Config1 = [tuple()] +%% Reason = term() +%%-------------------------------------------------------------------- +init_per_suite(Config) -> + {ok, _} = net_kernel:start(['fmke_antidote_ct@127.0.0.1']), + true = erlang:set_cookie('fmke_antidote_ct@127.0.0.1', ?COOKIE), + fmke_test_setup:start_node_with_antidote_backend(?NODENAME, true, non_nested), + true = erlang:set_cookie(?NODENAME, ?COOKIE), + Config. + +%%-------------------------------------------------------------------- +%% Function: end_per_suite(Config0) -> term() | {save_config,Config1} +%% Config0 = Config1 = [tuple()] +%%-------------------------------------------------------------------- +end_per_suite(_Config) -> + fmke_test_setup:stop_node(?NODENAME), + fmke_test_setup:stop_all(), + net_kernel:stop(), + ok. + +%%-------------------------------------------------------------------- +%% Function: init_per_testcase(TestCase, Config0) -> +%% Config1 | {skip,Reason} | {skip_and_save,Reason,Config1} +%% TestCase = atom() +%% Config0 = Config1 = [tuple()] +%% Reason = term() +%%-------------------------------------------------------------------- +init_per_testcase(_TestCase, Config) -> + Config. + +%%-------------------------------------------------------------------- +%% Function: end_per_testcase(TestCase, Config0) -> +%% term() | {save_config,Config1} | {fail,Reason} +%% TestCase = atom() +%% Config0 = Config1 = [tuple()] +%% Reason = term() +%%-------------------------------------------------------------------- +end_per_testcase(_TestCase, _Config) -> + ok. + +%%-------------------------------------------------------------------- +%% Function: all() -> GroupsAndTestCases | {skip,Reason} +%% GroupsAndTestCases = [{group,GroupName} | TestCase] +%% GroupName = atom() +%% TestCase = atom() +%% Reason = term() +%%-------------------------------------------------------------------- +all() -> + [ + read_read_succeds + ,read_write_succeeds + ,write_write_aborts + ]. + +read_read_succeds(_Config) -> + Key = list_to_binary(rand_str:get(64)), + %% add key to antidote + Pid = checkout_remote_pid(), + {ok, Txn} = antidotec_pb:start_transaction(Pid, ignore, {}), + BoundObject = {Key, antidote_crdt_counter_pn, <<"bucket">>}, + Obj = antidotec_counter:increment(1, antidotec_counter:new()), + ok = antidotec_pb:update_objects(Pid, antidotec_counter:to_ops(BoundObject, Obj), Txn), + {ok, _} = antidotec_pb:commit_transaction(Pid, Txn), + Pid1 = checkout_remote_pid(), + Pid2 = checkout_remote_pid(), + {ok, Txn1} = antidotec_pb:start_transaction(Pid1, ignore, {}), + {ok, Txn2} = antidotec_pb:start_transaction(Pid2, ignore, {}), + {ok, [Val1]} = antidotec_pb:read_objects(Pid1, [BoundObject], Txn1), + {ok, [Val2]} = antidotec_pb:read_objects(Pid2, [BoundObject], Txn2), + {ok, _} = antidotec_pb:commit_transaction(Pid1, Txn1), + {ok, _} = antidotec_pb:commit_transaction(Pid2, Txn2), + Value1 = antidotec_counter:value(Val1), + Value2 = antidotec_counter:value(Val2), + checkin_remote_pid(Pid), + checkin_remote_pid(Pid1), + checkin_remote_pid(Pid2), + ?assertEqual(Value1, Value2). + +read_write_succeeds(_Config) -> + Key = list_to_binary(rand_str:get(64)), + %% add key to antidote + Pid = checkout_remote_pid(), + {ok, Txn} = antidotec_pb:start_transaction(Pid, ignore, {}), + BoundObject = {Key, antidote_crdt_counter_pn, <<"bucket">>}, + Obj = antidotec_counter:increment(1, antidotec_counter:new()), + ok = antidotec_pb:update_objects(Pid, antidotec_counter:to_ops(BoundObject, Obj), Txn), + {ok, _} = antidotec_pb:commit_transaction(Pid, Txn), + Pid1 = checkout_remote_pid(), + Pid2 = checkout_remote_pid(), + {ok, Txn1} = antidotec_pb:start_transaction(Pid1, ignore, {}), + {ok, Txn2} = antidotec_pb:start_transaction(Pid2, ignore, {}), + {ok, [_Val1]} = antidotec_pb:read_objects(Pid1, [BoundObject], Txn1), + ObjUpdate = antidotec_counter:increment(1, Obj), + ok = antidotec_pb:update_objects(Pid2, antidotec_counter:to_ops(BoundObject, ObjUpdate), Txn2), + {ok, _} = antidotec_pb:commit_transaction(Pid1, Txn1), + {ok, _} = antidotec_pb:commit_transaction(Pid2, Txn2), + checkin_remote_pid(Pid), + checkin_remote_pid(Pid1), + checkin_remote_pid(Pid2), + ok. + +write_write_aborts(_Config) -> + Key = list_to_binary(rand_str:get(64)), + %% add key to antidote + Pid = checkout_remote_pid(), + {ok, Txn} = antidotec_pb:start_transaction(Pid, ignore, {}), + BoundObject = {Key, antidote_crdt_counter_pn, <<"bucket">>}, + Obj = antidotec_counter:increment(1, antidotec_counter:new()), + ok = antidotec_pb:update_objects(Pid, antidotec_counter:to_ops(BoundObject, Obj), Txn), + {ok, _} = antidotec_pb:commit_transaction(Pid, Txn), + Pid1 = checkout_remote_pid(), + Pid2 = checkout_remote_pid(), + {ok, Txn1} = antidotec_pb:start_transaction(Pid1, ignore, {}), + {ok, Txn2} = antidotec_pb:start_transaction(Pid2, ignore, {}), + ObjUpdate1 = antidotec_counter:increment(1, Obj), + ObjUpdate2 = antidotec_counter:increment(2, Obj), + ok = antidotec_pb:update_objects(Pid2, antidotec_counter:to_ops(BoundObject, ObjUpdate2), Txn2), + ok = antidotec_pb:update_objects(Pid1, antidotec_counter:to_ops(BoundObject, ObjUpdate1), Txn1), + %% check if both transactions committed successfully + case {antidotec_pb:commit_transaction(Pid1, Txn1), antidotec_pb:commit_transaction(Pid2, Txn2)} of + {{error, _}, {error, _}} -> + %% both failed + ok; + {{ok, _}, {error, _}} -> + %% one of them failed, the other succeeded + ok; + {{error, _}, {ok, _}} -> + %% one of them failed, the other succeeded + ok; + {{ok, _}, {ok, _}} -> + %% both transactions succeeded in an update to the same key at the same time + throw("write_write_transaction_succeded") + end, + checkin_remote_pid(Pid), + checkin_remote_pid(Pid1), + checkin_remote_pid(Pid2), + ok. + +checkin_remote_pid(Pid) -> + rpc(fmke_db_conn_manager,checkout, [Pid]). + +checkout_remote_pid() -> + rpc(fmke_db_conn_manager,checkout, []). + +rpc(Mod, Fun, Args) -> + rpc:call(?NODENAME, Mod, Fun, Args).