Skip to content
This repository has been archived by the owner on May 2, 2023. It is now read-only.

Commit

Permalink
Merge 57968a1 into 6f2f5d6
Browse files Browse the repository at this point in the history
  • Loading branch information
goncalotomas committed Oct 8, 2018
2 parents 6f2f5d6 + 57968a1 commit 60996f1
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 4 deletions.
1 change: 1 addition & 0 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
]},
{test, [
{deps, [
{rand_str, "~>1.0"},
{cmd, "~>1.0"}
]}
]}
Expand Down
30 changes: 26 additions & 4 deletions src/fmke_driver_opt_antidote.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
-include("fmke.hrl").
-include("fmke_kv.hrl").

%% antidotec_pb:commit_transaction has some typing issues.
%% Leave this here until sure that function will not return this error.
-dialyzer({no_match, txn_commit_w_retry/4}).

%% FMKE driver API
-export([
start/1,
Expand Down Expand Up @@ -43,6 +47,7 @@

-define(SERVER, ?MODULE).

-define(ANTIDOTE_TRANSACTION_RETRIES, 3).
-define(MAP, antidote_crdt_map_go).
-define(LWWREG, antidote_crdt_register_lww).
-define(ORSET, antidote_crdt_set_aw).
Expand Down Expand Up @@ -490,6 +495,7 @@ process_get_request(Key, Type, Txn) ->
parse_read_result(get(Key, Type, Txn)).

parse_read_result({_Crdt, []}) -> {error, not_found};
parse_read_result({timeout, _}) -> {error, timeout};
parse_read_result({_Crdt, Object}) -> Object;
parse_read_result(_) -> erlang:error(unknown_object_type).

Expand Down Expand Up @@ -530,13 +536,29 @@ txn_update_objects(ObjectUpdates, {Pid, TxnDetails}) ->
%% A wrapper for Antidote's commit_transaction function
-spec txn_commit(TxnDetails :: txid()) -> ok | {error, term()}.
txn_commit({Pid, TxnDetails}) ->
Result = case antidotec_pb:commit_transaction(Pid, TxnDetails) of
{ok, _CommitTimestamp} -> ok;
Error -> Error
end,
Result = txn_commit_w_retry(Pid, TxnDetails, 0, ?ANTIDOTE_TRANSACTION_RETRIES),
fmke_db_conn_manager:checkin(Pid),
Result.

txn_commit_w_retry(Pid, Txn, MaxTry, MaxTry) ->
lager:error("Transaction ~p failed, aborting...~n"),
case antidotec_pb:abort_transaction(Pid, Txn) of
ok ->
{error, transaction_aborted_max_commit_attempts};
{error, Error} ->
lager:error("Transaction ~p could not be aborted, error returned: ~p~n", [Txn, Error]),
throw({error, transaction_failed_abort_failed})
end;

txn_commit_w_retry(Pid, Txn, CurrTry, MaxTry) ->
case antidotec_pb:commit_transaction(Pid, Txn) of
{ok, _} ->
ok;
{error, Error} ->
lager:warning("Transaction commit failed for ~p: ~p, retrying...~n", [Txn, Error]),
txn_commit_w_retry(Pid, Txn, CurrTry + 1, MaxTry)
end.


%% ------------------------------------------------------------------------------------------------
%% Simple API - Recommended way to interact with Antidote
Expand Down
169 changes: 169 additions & 0 deletions test/ct/fmke_antidote_transactions_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
%%%-------------------------------------------------------------------
%%% File : fmke_antidote_transactions_SUITE.erl
%%% Author : Gonçalo Tomás
%%% Description : Tests the behaviour of AntidoteDB transactions.
%%% Created : Mon 1 Oct 2018 17:42
%%%-------------------------------------------------------------------
-module(fmke_antidote_transactions_SUITE).
-include("fmke.hrl").

-compile([export_all, nowarn_export_all]).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").

-define (NODENAME, 'fmke@127.0.0.1').
-define (COOKIE, fmke).

suite() ->
[{timetrap, {minutes, 5}}].

%%--------------------------------------------------------------------
%% Function: init_per_suite(Config0) ->
%% Config1 | {skip,Reason} | {skip_and_save,Reason,Config1}
%% Config0 = Config1 = [tuple()]
%% Reason = term()
%%--------------------------------------------------------------------
init_per_suite(Config) ->
{ok, _} = net_kernel:start(['fmke_antidote_ct@127.0.0.1']),
true = erlang:set_cookie('fmke_antidote_ct@127.0.0.1', ?COOKIE),
fmke_test_setup:start_node_with_antidote_backend(?NODENAME, true, non_nested),
true = erlang:set_cookie(?NODENAME, ?COOKIE),
Config.

%%--------------------------------------------------------------------
%% Function: end_per_suite(Config0) -> term() | {save_config,Config1}
%% Config0 = Config1 = [tuple()]
%%--------------------------------------------------------------------
end_per_suite(_Config) ->
fmke_test_setup:stop_node(?NODENAME),
fmke_test_setup:stop_all(),
net_kernel:stop(),
ok.

%%--------------------------------------------------------------------
%% Function: init_per_testcase(TestCase, Config0) ->
%% Config1 | {skip,Reason} | {skip_and_save,Reason,Config1}
%% TestCase = atom()
%% Config0 = Config1 = [tuple()]
%% Reason = term()
%%--------------------------------------------------------------------
init_per_testcase(_TestCase, Config) ->
Config.

%%--------------------------------------------------------------------
%% Function: end_per_testcase(TestCase, Config0) ->
%% term() | {save_config,Config1} | {fail,Reason}
%% TestCase = atom()
%% Config0 = Config1 = [tuple()]
%% Reason = term()
%%--------------------------------------------------------------------
end_per_testcase(_TestCase, _Config) ->
ok.

%%--------------------------------------------------------------------
%% Function: all() -> GroupsAndTestCases | {skip,Reason}
%% GroupsAndTestCases = [{group,GroupName} | TestCase]
%% GroupName = atom()
%% TestCase = atom()
%% Reason = term()
%%--------------------------------------------------------------------
all() ->
[
read_read_succeds
,read_write_succeeds
,write_write_aborts
].

read_read_succeds(_Config) ->
Key = list_to_binary(rand_str:get(64)),
%% add key to antidote
Pid = checkout_remote_pid(),
{ok, Txn} = antidotec_pb:start_transaction(Pid, ignore, {}),
BoundObject = {Key, antidote_crdt_counter_pn, <<"bucket">>},
Obj = antidotec_counter:increment(1, antidotec_counter:new()),
ok = antidotec_pb:update_objects(Pid, antidotec_counter:to_ops(BoundObject, Obj), Txn),
{ok, _} = antidotec_pb:commit_transaction(Pid, Txn),
Pid1 = checkout_remote_pid(),
Pid2 = checkout_remote_pid(),
{ok, Txn1} = antidotec_pb:start_transaction(Pid1, ignore, {}),
{ok, Txn2} = antidotec_pb:start_transaction(Pid2, ignore, {}),
{ok, [Val1]} = antidotec_pb:read_objects(Pid1, [BoundObject], Txn1),
{ok, [Val2]} = antidotec_pb:read_objects(Pid2, [BoundObject], Txn2),
{ok, _} = antidotec_pb:commit_transaction(Pid1, Txn1),
{ok, _} = antidotec_pb:commit_transaction(Pid2, Txn2),
Value1 = antidotec_counter:value(Val1),
Value2 = antidotec_counter:value(Val2),
checkin_remote_pid(Pid),
checkin_remote_pid(Pid1),
checkin_remote_pid(Pid2),
?assertEqual(Value1, Value2).

read_write_succeeds(_Config) ->
Key = list_to_binary(rand_str:get(64)),
%% add key to antidote
Pid = checkout_remote_pid(),
{ok, Txn} = antidotec_pb:start_transaction(Pid, ignore, {}),
BoundObject = {Key, antidote_crdt_counter_pn, <<"bucket">>},
Obj = antidotec_counter:increment(1, antidotec_counter:new()),
ok = antidotec_pb:update_objects(Pid, antidotec_counter:to_ops(BoundObject, Obj), Txn),
{ok, _} = antidotec_pb:commit_transaction(Pid, Txn),
Pid1 = checkout_remote_pid(),
Pid2 = checkout_remote_pid(),
{ok, Txn1} = antidotec_pb:start_transaction(Pid1, ignore, {}),
{ok, Txn2} = antidotec_pb:start_transaction(Pid2, ignore, {}),
{ok, [_Val1]} = antidotec_pb:read_objects(Pid1, [BoundObject], Txn1),
ObjUpdate = antidotec_counter:increment(1, Obj),
ok = antidotec_pb:update_objects(Pid2, antidotec_counter:to_ops(BoundObject, ObjUpdate), Txn2),
{ok, _} = antidotec_pb:commit_transaction(Pid1, Txn1),
{ok, _} = antidotec_pb:commit_transaction(Pid2, Txn2),
checkin_remote_pid(Pid),
checkin_remote_pid(Pid1),
checkin_remote_pid(Pid2),
ok.

write_write_aborts(_Config) ->
Key = list_to_binary(rand_str:get(64)),
%% add key to antidote
Pid = checkout_remote_pid(),
{ok, Txn} = antidotec_pb:start_transaction(Pid, ignore, {}),
BoundObject = {Key, antidote_crdt_counter_pn, <<"bucket">>},
Obj = antidotec_counter:increment(1, antidotec_counter:new()),
ok = antidotec_pb:update_objects(Pid, antidotec_counter:to_ops(BoundObject, Obj), Txn),
{ok, _} = antidotec_pb:commit_transaction(Pid, Txn),
Pid1 = checkout_remote_pid(),
Pid2 = checkout_remote_pid(),
{ok, Txn1} = antidotec_pb:start_transaction(Pid1, ignore, {}),
{ok, Txn2} = antidotec_pb:start_transaction(Pid2, ignore, {}),
ObjUpdate1 = antidotec_counter:increment(1, Obj),
ObjUpdate2 = antidotec_counter:increment(2, Obj),
ok = antidotec_pb:update_objects(Pid2, antidotec_counter:to_ops(BoundObject, ObjUpdate2), Txn2),
ok = antidotec_pb:update_objects(Pid1, antidotec_counter:to_ops(BoundObject, ObjUpdate1), Txn1),
%% check if both transactions committed successfully
case {antidotec_pb:commit_transaction(Pid1, Txn1), antidotec_pb:commit_transaction(Pid2, Txn2)} of
{{error, _}, {error, _}} ->
%% both failed
ok;
{{ok, _}, {error, _}} ->
%% one of them failed, the other succeeded
ok;
{{error, _}, {ok, _}} ->
%% one of them failed, the other succeeded
ok;
{{ok, _}, {ok, _}} ->
%% both transactions succeeded in an update to the same key at the same time
throw("write_write_transaction_succeded")
end,
checkin_remote_pid(Pid),
checkin_remote_pid(Pid1),
checkin_remote_pid(Pid2),
ok.

checkin_remote_pid(Pid) ->
rpc(fmke_db_conn_manager,checkout, [Pid]).

checkout_remote_pid() ->
rpc(fmke_db_conn_manager,checkout, []).

rpc(Mod, Fun, Args) ->
rpc:call(?NODENAME, Mod, Fun, Args).

0 comments on commit 60996f1

Please sign in to comment.