From 9684033286a0da1342df4d471ba65dd7d1715b5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gon=C3=A7alo=20Tom=C3=A1s?= Date: Mon, 1 Oct 2018 14:43:32 +0100 Subject: [PATCH 1/8] add retry mechanism upon timeout failure --- src/fmke_driver_opt_antidote.erl | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/src/fmke_driver_opt_antidote.erl b/src/fmke_driver_opt_antidote.erl index 3e44625..e2b68e2 100644 --- a/src/fmke_driver_opt_antidote.erl +++ b/src/fmke_driver_opt_antidote.erl @@ -43,6 +43,7 @@ -define(SERVER, ?MODULE). +-define(ANTIDOTE_TRANSACTION_RETRIES, 3). -define(MAP, antidote_crdt_map_go). -define(LWWREG, antidote_crdt_register_lww). -define(ORSET, antidote_crdt_set_aw). @@ -528,15 +529,33 @@ txn_update_objects(ObjectUpdates, {Pid, TxnDetails}) -> ok = antidotec_pb:update_objects(Pid, ObjectUpdates, TxnDetails). %% A wrapper for Antidote's commit_transaction function --spec txn_commit(TxnDetails :: txid()) -> ok | {error, term()}. +-spec txn_commit(TxnDetails :: txid()) -> ok. txn_commit({Pid, TxnDetails}) -> - Result = case antidotec_pb:commit_transaction(Pid, TxnDetails) of - {ok, _CommitTimestamp} -> ok; - Error -> Error - end, + Result = txn_commit_w_retry(Pid, TxnDetails, 0, ?ANTIDOTE_TRANSACTION_RETRIES), fmke_db_conn_manager:checkin(Pid), Result. 
+txn_commit_w_retry(Pid, Txn, MaxTry, MaxTry) -> + lager:error("Transaction ~p failed, aborting...~n", [Txn]), + case antidotec_pb:abort_transaction(Pid, Txn) of + ok -> + {error, transaction_aborted_max_commit_attempts}; + {ok, _} -> + {error, transaction_aborted_max_commit_attempts}; + Error -> + lager:error("Transaction ~p could not be aborted, error returned: ~p~n", [Txn, Error]), + throw({error, transaction_failed_abort_failed}) + end; + +txn_commit_w_retry(Pid, Txn, CurrTry, MaxTry) -> + case antidotec_pb:commit_transaction(Pid, Txn) of + {ok, _} -> + ok; + {error, Error} -> + lager:warning("Transaction commit failed for ~p: ~p, retrying...~n", [Txn, Error]), + txn_commit_w_retry(Pid, Txn, CurrTry + 1, MaxTry) + end. + %% ------------------------------------------------------------------------------------------------ %% Simple API - Recommended way to interact with Antidote From 6a7de27af3902b2b7f1740b94f48f39a2f980d74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gon=C3=A7alo=20Tom=C3=A1s?= Date: Mon, 1 Oct 2018 15:21:59 +0100 Subject: [PATCH 2/8] detect timeout in read results --- src/fmke_driver_opt_antidote.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/fmke_driver_opt_antidote.erl b/src/fmke_driver_opt_antidote.erl index e2b68e2..c6d59e5 100644 --- a/src/fmke_driver_opt_antidote.erl +++ b/src/fmke_driver_opt_antidote.erl @@ -491,6 +491,7 @@ process_get_request(Key, Type, Txn) -> parse_read_result(get(Key, Type, Txn)). parse_read_result({_Crdt, []}) -> {error, not_found}; +parse_read_result({timeout, _}) -> {error, timeout}; parse_read_result({_Crdt, Object}) -> Object; parse_read_result(_) -> erlang:error(unknown_object_type). 
From 4dac5befb6ea8440ddf73d7ae8902f71fbaac001 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gon=C3=A7alo=20Tom=C3=A1s?= Date: Mon, 1 Oct 2018 15:24:21 +0100 Subject: [PATCH 3/8] add random string lib to test deps --- rebar.config | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rebar.config b/rebar.config index a2c76f2..369665e 100644 --- a/rebar.config +++ b/rebar.config @@ -37,6 +37,11 @@ {plugins, [ {rebar3_lint, {git, "https://github.com/project-fifo/rebar3_lint.git", {tag, "v0.1.9"}}} ]} + ]}, + {test, [ + {deps, [ + rand_str + ]} ]} ]}. From d30bad13ae9f6e22af217bed0901177c03d30f11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gon=C3=A7alo=20Tom=C3=A1s?= Date: Mon, 1 Oct 2018 15:24:41 +0100 Subject: [PATCH 4/8] ensure write write txn conflicts result in at least 1 abort --- test/ct/fmke_antidote_transactions_SUITE.erl | 170 +++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 test/ct/fmke_antidote_transactions_SUITE.erl diff --git a/test/ct/fmke_antidote_transactions_SUITE.erl b/test/ct/fmke_antidote_transactions_SUITE.erl new file mode 100644 index 0000000..be69c59 --- /dev/null +++ b/test/ct/fmke_antidote_transactions_SUITE.erl @@ -0,0 +1,170 @@ +%%%------------------------------------------------------------------- +%%% File : fmke_db_conn_manager_SUITE.erl +%%% Author : Gonçalo Tomás +%%% Description : Tests the behaviour of the DB connection manager +%%% under several scenarios. +%%% Created : Fri 9 Feb 2018 17:42 +%%%------------------------------------------------------------------- +-module(fmke_antidote_transactions_SUITE). +-include("fmke.hrl"). + +-compile([export_all, nowarn_export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define (NODENAME, 'fmke@127.0.0.1'). +-define (COOKIE, fmke). + +suite() -> + [{timetrap, {minutes, 5}}]. 
+ +%%-------------------------------------------------------------------- +%% Function: init_per_suite(Config0) -> +%% Config1 | {skip,Reason} | {skip_and_save,Reason,Config1} +%% Config0 = Config1 = [tuple()] +%% Reason = term() +%%-------------------------------------------------------------------- +init_per_suite(Config) -> + {ok, _} = net_kernel:start(['fmke_antidote_ct@127.0.0.1']), + true = erlang:set_cookie('fmke_antidote_ct@127.0.0.1', ?COOKIE), + fmke_test_setup:start_node_with_antidote_backend(?NODENAME, true, non_nested), + true = erlang:set_cookie(?NODENAME, ?COOKIE), + Config. + +%%-------------------------------------------------------------------- +%% Function: end_per_suite(Config0) -> term() | {save_config,Config1} +%% Config0 = Config1 = [tuple()] +%%-------------------------------------------------------------------- +end_per_suite(_Config) -> + fmke_test_setup:stop_node(?NODENAME), + fmke_test_setup:stop_all(), + net_kernel:stop(), + ok. + +%%-------------------------------------------------------------------- +%% Function: init_per_testcase(TestCase, Config0) -> +%% Config1 | {skip,Reason} | {skip_and_save,Reason,Config1} +%% TestCase = atom() +%% Config0 = Config1 = [tuple()] +%% Reason = term() +%%-------------------------------------------------------------------- +init_per_testcase(_TestCase, Config) -> + Config. + +%%-------------------------------------------------------------------- +%% Function: end_per_testcase(TestCase, Config0) -> +%% term() | {save_config,Config1} | {fail,Reason} +%% TestCase = atom() +%% Config0 = Config1 = [tuple()] +%% Reason = term() +%%-------------------------------------------------------------------- +end_per_testcase(_TestCase, _Config) -> + ok. 
+ +%%-------------------------------------------------------------------- +%% Function: all() -> GroupsAndTestCases | {skip,Reason} +%% GroupsAndTestCases = [{group,GroupName} | TestCase] +%% GroupName = atom() +%% TestCase = atom() +%% Reason = term() +%%-------------------------------------------------------------------- +all() -> + [ + read_read_succeds + ,read_write_succeeds + ,write_write_aborts + ]. + +read_read_succeds(_Config) -> + Key = list_to_binary(rand_str:get(64)), + %% add key to antidote + Pid = checkout_remote_pid(), + {ok, Txn} = antidotec_pb:start_transaction(Pid, ignore, {}), + BoundObject = {Key, antidote_crdt_counter_pn, <<"bucket">>}, + Obj = antidotec_counter:increment(1, antidotec_counter:new()), + ok = antidotec_pb:update_objects(Pid, antidotec_counter:to_ops(BoundObject, Obj), Txn), + {ok, _} = antidotec_pb:commit_transaction(Pid, Txn), + Pid1 = checkout_remote_pid(), + Pid2 = checkout_remote_pid(), + {ok, Txn1} = antidotec_pb:start_transaction(Pid1, ignore, {}), + {ok, Txn2} = antidotec_pb:start_transaction(Pid2, ignore, {}), + {ok, [Val1]} = antidotec_pb:read_objects(Pid1, [BoundObject], Txn1), + {ok, [Val2]} = antidotec_pb:read_objects(Pid2, [BoundObject], Txn2), + {ok, _} = antidotec_pb:commit_transaction(Pid1, Txn1), + {ok, _} = antidotec_pb:commit_transaction(Pid2, Txn2), + Value1 = antidotec_counter:value(Val1), + Value2 = antidotec_counter:value(Val2), + checkin_remote_pid(Pid), + checkin_remote_pid(Pid1), + checkin_remote_pid(Pid2), + ?assertEqual(Value1, Value2). 
+ +read_write_succeeds(_Config) -> + Key = list_to_binary(rand_str:get(64)), + %% add key to antidote + Pid = checkout_remote_pid(), + {ok, Txn} = antidotec_pb:start_transaction(Pid, ignore, {}), + BoundObject = {Key, antidote_crdt_counter_pn, <<"bucket">>}, + Obj = antidotec_counter:increment(1, antidotec_counter:new()), + ok = antidotec_pb:update_objects(Pid, antidotec_counter:to_ops(BoundObject, Obj), Txn), + {ok, _} = antidotec_pb:commit_transaction(Pid, Txn), + Pid1 = checkout_remote_pid(), + Pid2 = checkout_remote_pid(), + {ok, Txn1} = antidotec_pb:start_transaction(Pid1, ignore, {}), + {ok, Txn2} = antidotec_pb:start_transaction(Pid2, ignore, {}), + {ok, [_Val1]} = antidotec_pb:read_objects(Pid1, [BoundObject], Txn1), + ObjUpdate = antidotec_counter:increment(1, Obj), + ok = antidotec_pb:update_objects(Pid2, antidotec_counter:to_ops(BoundObject, ObjUpdate), Txn2), + {ok, _} = antidotec_pb:commit_transaction(Pid1, Txn1), + {ok, _} = antidotec_pb:commit_transaction(Pid2, Txn2), + checkin_remote_pid(Pid), + checkin_remote_pid(Pid1), + checkin_remote_pid(Pid2), + ok. 
+ +write_write_aborts(_Config) -> + Key = list_to_binary(rand_str:get(64)), + %% add key to antidote + Pid = checkout_remote_pid(), + {ok, Txn} = antidotec_pb:start_transaction(Pid, ignore, {}), + BoundObject = {Key, antidote_crdt_counter_pn, <<"bucket">>}, + Obj = antidotec_counter:increment(1, antidotec_counter:new()), + ok = antidotec_pb:update_objects(Pid, antidotec_counter:to_ops(BoundObject, Obj), Txn), + {ok, _} = antidotec_pb:commit_transaction(Pid, Txn), + Pid1 = checkout_remote_pid(), + Pid2 = checkout_remote_pid(), + {ok, Txn1} = antidotec_pb:start_transaction(Pid1, ignore, {}), + {ok, Txn2} = antidotec_pb:start_transaction(Pid2, ignore, {}), + ObjUpdate1 = antidotec_counter:increment(1, Obj), + ObjUpdate2 = antidotec_counter:increment(2, Obj), + ok = antidotec_pb:update_objects(Pid2, antidotec_counter:to_ops(BoundObject, ObjUpdate2), Txn2), + ok = antidotec_pb:update_objects(Pid1, antidotec_counter:to_ops(BoundObject, ObjUpdate1), Txn1), + %% check if both transactions committed successfully + case {antidotec_pb:commit_transaction(Pid1, Txn1), antidotec_pb:commit_transaction(Pid2, Txn2)} of + {{error, _}, {error, _}} -> + %% both failed + ok; + {{ok, _}, {error, _}} -> + %% one of them failed, the other succeeded + ok; + {{error, _}, {ok, _}} -> + %% one of them failed, the other succeeded + ok; + {{ok, _}, {ok, _}} -> + %% both transactions succeeded in an update to the same key at the same time + throw("write_write_transaction_succeded") + end, + checkin_remote_pid(Pid), + checkin_remote_pid(Pid1), + checkin_remote_pid(Pid2), + ok. + +checkin_remote_pid(Pid) -> + rpc(fmke_db_conn_manager,checkin, [Pid]). + +checkout_remote_pid() -> + rpc(fmke_db_conn_manager,checkout, []). + +rpc(Mod, Fun, Args) -> + rpc:call(?NODENAME, Mod, Fun, Args). 
From 2b36e0c072777d58819189b38bbef0a2b9a396ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gon=C3=A7alo=20Tom=C3=A1s?= Date: Mon, 1 Oct 2018 15:30:33 +0100 Subject: [PATCH 5/8] readd error tuple to function spec --- src/fmke_driver_opt_antidote.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fmke_driver_opt_antidote.erl b/src/fmke_driver_opt_antidote.erl index c6d59e5..0e0c86c 100644 --- a/src/fmke_driver_opt_antidote.erl +++ b/src/fmke_driver_opt_antidote.erl @@ -530,7 +530,7 @@ txn_update_objects(ObjectUpdates, {Pid, TxnDetails}) -> ok = antidotec_pb:update_objects(Pid, ObjectUpdates, TxnDetails). %% A wrapper for Antidote's commit_transaction function --spec txn_commit(TxnDetails :: txid()) -> ok. +-spec txn_commit(TxnDetails :: txid()) -> ok | {error, term()}. txn_commit({Pid, TxnDetails}) -> Result = txn_commit_w_retry(Pid, TxnDetails, 0, ?ANTIDOTE_TRANSACTION_RETRIES), fmke_db_conn_manager:checkin(Pid), From 1fa57916534958f384d55b4b54164cf36479dd47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gon=C3=A7alo=20Tom=C3=A1s?= Date: Mon, 1 Oct 2018 15:33:45 +0100 Subject: [PATCH 6/8] update file header --- test/ct/fmke_antidote_transactions_SUITE.erl | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/test/ct/fmke_antidote_transactions_SUITE.erl b/test/ct/fmke_antidote_transactions_SUITE.erl index be69c59..26ad8fc 100644 --- a/test/ct/fmke_antidote_transactions_SUITE.erl +++ b/test/ct/fmke_antidote_transactions_SUITE.erl @@ -1,9 +1,8 @@ %%%------------------------------------------------------------------- -%%% File : fmke_db_conn_manager_SUITE.erl +%%% File : fmke_antidote_transactions_SUITE.erl %%% Author : Gonçalo Tomás -%%% Description : Tests the behaviour of the DB connection manager -%%% under several scenarios. -%%% Created : Fri 9 Feb 2018 17:42 +%%% Description : Tests the behaviour of AntidoteDB transactions. 
+%%% Created : Mon 1 Oct 2018 17:42 %%%------------------------------------------------------------------- -module(fmke_antidote_transactions_SUITE). -include("fmke.hrl"). From 711d9b244d801c84f56f0a3d6118eddbf1ad5e6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gon=C3=A7alo=20Tom=C3=A1s?= Date: Mon, 1 Oct 2018 18:27:35 +0100 Subject: [PATCH 7/8] remove impossible case from txn_abort result --- src/fmke_driver_opt_antidote.erl | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/fmke_driver_opt_antidote.erl b/src/fmke_driver_opt_antidote.erl index 0e0c86c..be109ae 100644 --- a/src/fmke_driver_opt_antidote.erl +++ b/src/fmke_driver_opt_antidote.erl @@ -541,9 +541,7 @@ txn_commit_w_retry(Pid, Txn, MaxTry, MaxTry) -> case antidotec_pb:abort_transaction(Pid, Txn) of ok -> {error, transaction_aborted_max_commit_attempts}; - {ok, _} -> - {error, transaction_aborted_max_commit_attempts}; - Error -> + {error, Error} -> lager:error("Transaction ~p could not be aborted, error returned: ~p~n", [Txn, Error]), throw({error, transaction_failed_abort_failed}) end; From 7490548f3d932caf965a7c01cac5e43d68c360ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gon=C3=A7alo=20Tom=C3=A1s?= Date: Mon, 8 Oct 2018 13:50:39 +0100 Subject: [PATCH 8/8] suppress typing warning from commit_txn --- src/fmke_driver_opt_antidote.erl | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/fmke_driver_opt_antidote.erl b/src/fmke_driver_opt_antidote.erl index be109ae..c8cdc70 100644 --- a/src/fmke_driver_opt_antidote.erl +++ b/src/fmke_driver_opt_antidote.erl @@ -8,6 +8,10 @@ -include("fmke.hrl"). -include("fmke_kv.hrl"). +%% antidotec_pb:commit_transaction has some typing issues. +%% Leave this here until sure that function will not return this error. +-dialyzer({no_match, txn_commit_w_retry/4}). + %% FMKE driver API -export([ start/1,