diff --git a/lib/mnesia/src/Makefile b/lib/mnesia/src/Makefile index 72aa054fb326..cac581fa0232 100644 --- a/lib/mnesia/src/Makefile +++ b/lib/mnesia/src/Makefile @@ -47,10 +47,12 @@ MODULES= \ mnesia_backend_type \ mnesia_backup \ mnesia_bup \ + mnesia_causal \ mnesia_checkpoint \ mnesia_checkpoint_sup \ mnesia_controller \ mnesia_dumper\ + mnesia_ec \ mnesia_event \ mnesia_ext_sup \ mnesia_frag \ @@ -58,11 +60,12 @@ MODULES= \ mnesia_index \ mnesia_kernel_sup \ mnesia_late_loader \ - mnesia_lib\ + mnesia_lib \ mnesia_loader \ mnesia_locker \ mnesia_log \ mnesia_monitor \ + mnesia_pawset \ mnesia_recover \ mnesia_registry \ mnesia_rpc \ diff --git a/lib/mnesia/src/mnesia.app.src b/lib/mnesia/src/mnesia.app.src index dfb5e82c9399..52dbe958c41e 100644 --- a/lib/mnesia/src/mnesia.app.src +++ b/lib/mnesia/src/mnesia.app.src @@ -7,10 +7,12 @@ mnesia_backend_type, mnesia_backup, mnesia_bup, + mnesia_causal, mnesia_checkpoint, mnesia_checkpoint_sup, mnesia_controller, mnesia_dumper, + mnesia_ec, mnesia_event, mnesia_ext_sup, mnesia_frag, @@ -23,6 +25,7 @@ mnesia_locker, mnesia_log, mnesia_monitor, + mnesia_pawset, mnesia_recover, mnesia_registry, mnesia_rpc, @@ -35,7 +38,9 @@ mnesia_tm ]}, {registered, [ + mnesia_causal, mnesia_dumper_load_regulator, + mnesia_ec, mnesia_event, mnesia_fallback, mnesia_controller, diff --git a/lib/mnesia/src/mnesia.erl b/lib/mnesia/src/mnesia.erl index bb5858b3f832..53e196366058 100644 --- a/lib/mnesia/src/mnesia.erl +++ b/lib/mnesia/src/mnesia.erl @@ -35,6 +35,7 @@ abort/1, transaction/1, transaction/2, transaction/3, sync_transaction/1, sync_transaction/2, sync_transaction/3, async_dirty/1, async_dirty/2, sync_dirty/1, sync_dirty/2, ets/1, ets/2, + async_ec/1, async_ec/2, sync_ec/1, sync_ec/2, activity/2, activity/3, activity/4, % Not for public use is_transaction/0, @@ -133,6 +134,8 @@ remote_dirty_select/2 % Not for public use ]). +-export_type([table/0]). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -include("mnesia.hrl"). @@ -326,9 +329,11 @@ ms() -> mnesia_app, mnesia_backup, mnesia_bup, + mnesia_causal, mnesia_checkpoint, mnesia_controller, mnesia_dumper, + mnesia_ec, mnesia_loader, mnesia_frag, mnesia_frag_hash, @@ -336,6 +341,7 @@ ms() -> mnesia_late_loader, mnesia_lib, mnesia_log, + mnesia_pawset, mnesia_registry, mnesia_schema, mnesia_snmp_hook, @@ -441,6 +447,16 @@ async_dirty(Fun) -> async_dirty(Fun, Args) -> non_transaction(get(mnesia_activity_state), Fun, Args, async_dirty, ?DEFAULT_ACCESS). +-spec async_ec(Fun) -> Res | no_return() when + Fun :: fun(() -> Res). +async_ec(Fun) -> + async_ec(Fun, []). + +-spec async_ec(Fun, [Arg::_]) -> Res | no_return() when + Fun :: fun((...) -> Res). +async_ec(Fun, Args) -> + non_transaction(get(mnesia_activity_state), Fun, Args, async_ec, mnesia_ec). + -spec sync_dirty(Fun) -> Res | no_return() when Fun :: fun(() -> Res). sync_dirty(Fun) -> @@ -451,6 +467,16 @@ sync_dirty(Fun) -> sync_dirty(Fun, Args) -> non_transaction(get(mnesia_activity_state), Fun, Args, sync_dirty, ?DEFAULT_ACCESS). +-spec sync_ec(Fun) -> Res | no_return() when + Fun :: fun(() -> Res). +sync_ec(Fun) -> + sync_ec(Fun, []). + +-spec sync_ec(Fun, [Arg::_]) -> Res | no_return() when + Fun :: fun((...) -> Res). +sync_ec(Fun, Args) -> + non_transaction(get(mnesia_activity_state), Fun, Args, sync_ec, mnesia_ec). + -spec ets(Fun) -> Res | no_return() when Fun :: fun(() -> Res). ets(Fun) -> @@ -491,6 +517,8 @@ activity(Kind, Fun, Args, Mod) -> ets -> non_transaction(State, Fun, Args, Kind, Mod); async_dirty -> non_transaction(State, Fun, Args, Kind, Mod); sync_dirty -> non_transaction(State, Fun, Args, Kind, Mod); + async_ec -> non_transaction(State, Fun, Args, Kind, mnesia_ec); + sync_ec -> non_transaction(State, Fun, Args, Kind, mnesia_ec); transaction -> wrap_trans(State, Fun, Args, infinity, Mod, async); {transaction, Retries} -> wrap_trans(State, Fun, Args, Retries, Mod, async); sync_transaction -> wrap_trans(State, Fun, Args, infinity, Mod, sync); diff --git a/lib/mnesia/src/mnesia.hrl b/lib/mnesia/src/mnesia.hrl index 3fcedbfd09d0..3280cee4b4a2 100644 --- a/lib/mnesia/src/mnesia.hrl +++ b/lib/mnesia/src/mnesia.hrl @@ -111,7 +111,9 @@ disc_copies = [], disc_only_copies = [], ext = [], - schema_ops = [] + schema_ops = [], + sender = node(), + ts = #{} }). -record(decision, {tid, diff --git a/lib/mnesia/src/mnesia_causal.erl b/lib/mnesia/src/mnesia_causal.erl new file mode 100644 index 000000000000..b463ba1be879 --- /dev/null +++ b/lib/mnesia/src/mnesia_causal.erl @@ -0,0 +1,223 @@ +-module(mnesia_causal). + +-export([start/0, get_ts/0, send_msg/0, rcv_msg/3, rcv_msg/4, compare_vclock/2, + vclock_leq/2, deliver_one/1]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2]). +-export([new_clock/0, bot/0, is_bot/1]). +-export([get_buffered/0]). % for testing + +-export_type([vclock/0]). + +-include("mnesia.hrl"). + +-import(mnesia_lib, [important/2, dbg_out/2, verbose/2, warning/2]). + +-behaviour(gen_server). + +-type ord() :: lt | eq | gt | cc. +-type lc() :: integer(). +-type vclock() :: #{node() => lc()}. +-type msg() :: #commit{}. + +-record(state, {send_seq :: integer(), delivered :: vclock(), buffer :: [mmsg()]}). +-record(mmsg, + {tid :: pid(), tab :: mnesia:table(), msg :: msg(), from :: pid() | undefined}). + +-type mmsg() :: #mmsg{}. +-type state() :: #state{}. + +%% Helper + +-spec bot() -> vclock(). +bot() -> + #{}. + +-spec is_bot(vclock()) -> boolean(). +is_bot(Ts) when is_map(Ts) andalso map_size(Ts) == 0 -> + true; +is_bot(Ts) when is_map(Ts) -> + false; +is_bot(_) -> + error({badarg, bot}). + +%% public API + +start() -> + gen_server:start({local, ?MODULE}, ?MODULE, [], []). + +-spec get_ts() -> vclock(). +get_ts() -> + gen_server:call(?MODULE, get_ts). + +-spec send_msg() -> {node(), vclock()}. +send_msg() -> + gen_server:call(?MODULE, send_msg). + +-spec get_buffered() -> [mmsg()]. +get_buffered() -> + gen_server:call(?MODULE, get_buf). + +%% @doc receive an event from another node +%% @return {vclock(), [event()]} the new vector clock and events ready to +%% be delivered +%% @end +-spec rcv_msg(pid(), #commit{}, mnesia:table()) -> [{pid(), #commit{}, mnesia:table()}]. +rcv_msg(Tid, Commit, Tab) -> + MMsgs = + gen_server:call(?MODULE, + {receive_msg, + #mmsg{tid = Tid, + msg = Commit, + tab = Tab}}), + [{M#mmsg.tid, M#mmsg.msg, M#mmsg.tab} || M <- MMsgs]. + +-spec rcv_msg(pid(), #commit{}, mnesia:table(), pid()) -> + [{pid(), #commit{}, mnesia:table(), pid()}]. +rcv_msg(Tid, Commit, Tab, From) -> + MMsgs = + gen_server:call(?MODULE, + {receive_msg, + #mmsg{tid = Tid, + msg = Commit, + tab = Tab, + from = From}}), + [{M#mmsg.tid, M#mmsg.msg, M#mmsg.tab, M#mmsg.from} || M <- MMsgs]. + +-spec deliver_one(#commit{}) -> ok. +deliver_one(#commit{sender = Sender}) -> + gen_server:call(?MODULE, {deliver_one, Sender}). + +%% ====================gen_server callbacks==================== +init([Nodes]) -> + {ok, + #state{send_seq = 0, + delivered = new_clock(Nodes), + buffer = []}}; +init(_Args) -> + {ok, + #state{send_seq = 0, + delivered = new_clock(), + buffer = []}}. + +handle_call(send_msg, _From, State = #state{delivered = Delivered, send_seq = SendSeq}) -> + Deps = Delivered#{node() := SendSeq + 1}, + {reply, {node(), Deps}, State#state{send_seq = SendSeq + 1}}; +handle_call({deliver_one, Sender}, _From, State = #state{delivered = Delivered}) -> + NewDelivered = increment(Sender, Delivered), + {reply, ok, State#state{delivered = NewDelivered}}; +handle_call({receive_msg, MM = #mmsg{msg = #commit{}}}, _From, State = #state{}) -> + {NewState, Deliverable} = find_deliverable(MM, State), + {reply, Deliverable, NewState}; +handle_call(get_buf, _From, #state{buffer = Buffer} = State) -> + {reply, Buffer, State}; +handle_call(get_ts, _From, #state{delivered = D} = State) -> + {reply, D, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(Msg, State) -> + warning("unhandled message ~p~n", [Msg]), + {noreply, State}. + +%%% internal functions + +%% @doc +%% update the delivered vector clock and buffer +%% @returns {list(), [event()]} the new buffer and events ready to be delivered +%% where we guarantee that if the input message is deliverable then it is at the +%% tail of the list +%% @end +-spec find_deliverable(mmsg(), state()) -> {state(), [mmsg()]}. +find_deliverable(MM = #mmsg{msg = #commit{ts = Deps, sender = Sender}}, + State = #state{delivered = Delivered, buffer = Buffer}) -> + case msg_deliverable(Deps, Delivered, Sender) of + true -> + dbg_out("input message ~p deliverable~n", [MM]), + NewDelivered = increment(Sender, Delivered), + do_find_deliverable(State#state{delivered = NewDelivered, buffer = Buffer}, [MM]); + false -> + dbg_out("input message ~p not deliverable~n", [MM]), + {State, []} + end. + +-spec do_find_deliverable(state(), [mmsg()]) -> {state(), [mmsg()]}. +do_find_deliverable(State = #state{delivered = Delivered, buffer = Buff}, Deliverable) -> + {Dev, NDev} = + lists:partition(fun(#mmsg{msg = #commit{ts = Deps, sender = Sender}}) -> + msg_deliverable(Deps, Delivered, Sender) + end, + Buff), + case Dev of + [] -> + {State, Deliverable}; + Dev when length(Dev) > 0 -> + State2 = + lists:foldl(fun(#mmsg{msg = #commit{sender = Sender}}, StateIn) -> + Dev2 = increment(Sender, StateIn#state.delivered), + StateIn#state{delivered = Dev2} + end, + State, + Dev), + do_find_deliverable(State2#state{buffer = NDev}, Dev ++ Deliverable) + end. + +%% @doc Deps is the vector clock of the event +%% Delivered is the local vector clock +%% @end +-spec msg_deliverable(vclock(), vclock(), node()) -> boolean(). +msg_deliverable(Deps, Delivered, Sender) -> + OtherDeps = maps:remove(Sender, Deps), + OtherDelivered = maps:remove(Sender, Delivered), + maps:get(Sender, Deps) == maps:get(Sender, Delivered) + 1 + andalso vclock_leq(OtherDeps, OtherDelivered). + +-spec new_clock() -> vclock(). +new_clock() -> + new_clock([node() | nodes()]). + +new_clock(Nodes) -> + dbg_out("new clock with nodes ~p~n", [[node() | nodes()]]), + maps:from_keys(Nodes, 0). + +-spec increment(node(), vclock()) -> vclock(). +increment(Node, C) -> + C#{Node := maps:get(Node, C, 0) + 1}. + +%% @doc convenience function for comparing two vector clocks, true when VC1 <= VC2 +-spec vclock_leq(vclock(), vclock()) -> boolean(). +vclock_leq(VC1, VC2) -> + R = compare_vclock(VC1, VC2), + R =:= eq orelse R =:= lt. + +-spec compare_vclock(vclock(), vclock()) -> ord(). +compare_vclock(VC1, VC2) when map_size(VC1) == 0 andalso map_size(VC2) == 0 -> + eq; +compare_vclock(VC1, _VC2) when map_size(VC1) == 0 -> + lt; +compare_vclock(_VC1, VC2) when map_size(VC2) == 0 -> + gt; +compare_vclock(VClock1, VClock2) -> + AllKeys = + maps:keys( + maps:merge(VClock1, VClock2)), + {Ls, Gs} = + lists:foldl(fun(Key, {L, G}) -> + C1 = maps:get(Key, VClock1, 0), + C2 = maps:get(Key, VClock2, 0), + if C1 < C2 -> {L + 1, G}; + C1 > C2 -> {L, G + 1}; + true -> {L, G} + end + end, + {0, 0}, + AllKeys), + if Ls == 0 andalso Gs == 0 -> + eq; + Ls == 0 -> + gt; + Gs == 0 -> + lt; + true -> + cc + end. diff --git a/lib/mnesia/src/mnesia_ec.erl b/lib/mnesia/src/mnesia_ec.erl new file mode 100644 index 000000000000..f13478105fce --- /dev/null +++ b/lib/mnesia/src/mnesia_ec.erl @@ -0,0 +1,775 @@ +-module(mnesia_ec). + +-include("mnesia.hrl"). + +-import(mnesia_lib, [important/2, warning/2, dbg_out/2, verbose/2]). + +-export([lock/4, write/5, delete/5, delete_object/5, read/5, match_object/5, all_keys/4, + first/3, last/3, prev/4, next/4, index_match_object/6, index_read/6, table_info/4, + select/5]). +-export([start/0, init/1]). + +-record(prep, + {protocol = async_ec, + %% async_ec | sync_ec + records = [], + prev_tab = [], % initiate to a non valid table name + prev_types, + prev_snmp}). +-record(state, {supervisor}). + +val(Var) -> + case ?catch_val_and_stack(Var) of + {'EXIT', Stacktrace} -> + mnesia_lib:other_val(Var, Stacktrace); + Value -> + Value + end. + +has_var(Pat) -> + mnesia:has_var(Pat). + +start() -> + mnesia_monitor:start_proc(?MODULE, ?MODULE, init, [self()]). + +init(Parent) -> + register(?MODULE, self()), + process_flag(trap_exit, true), + process_flag(message_queue_data, off_heap), + case val(debug) of + Debug when Debug /= debug, Debug /= trace -> + ignore; + _ -> + mnesia_subscr:subscribe(whereis(mnesia_event), {table, schema}) + end, + proc_lib:init_ack(Parent, {ok, self()}), + doit_loop(#state{supervisor = Parent}). + +doit_loop(#state{} = State) -> + receive + {From, {async_ec, Tid, Commit, Tab}} -> + dbg_out("received async_ec: ~p~n", [{From, {async_ec, Tid, Commit, Tab}}]), + spawn(fun() -> receive_msg(Tid, Commit, Tab, {rcv, async}) end), + doit_loop(State); + {From, {sync_ec, Tid, Commit, Tab}} -> + dbg_out("received sync_ec: ~p~n", [{From, {sync_ec, Tid, Commit, Tab}}]), + receive_msg(Tid, Commit, Tab, {rcv, {sync, From}}), + doit_loop(State); + {'EXIT', Pid, Reason} -> + handle_exit(Pid, Reason, State); + Msg -> + verbose("** ERROR ** ~p got unexpected message: ~tp~n", [?MODULE, Msg]), + doit_loop(State) + end. + +%% mnesia_access API +%% +lock({SyncMode, _Pid}, _Ts, _LockItem, _LockKind) + when SyncMode =:= sync_ec orelse SyncMode =:= async_ec -> + []; +lock({SyncMode, _}, _Ts, _LockItem, _LockKind) -> + mnesia:abort({bad_type, SyncMode}). + +write({SyncMode, _Pid}, _Ts, Tab, Val, _LockKind) + when is_atom(Tab) + andalso Tab /= schema + andalso is_tuple(Val) + andalso tuple_size(Val) > 2 + andalso (SyncMode =:= sync_ec orelse SyncMode =:= async_ec) -> + do_ec_write(SyncMode, Tab, Val); +write({SyncMode, _}, _Ts, Tab, Val, LockKind) -> + mnesia:abort({bad_type, SyncMode, Tab, Val, LockKind}). + +delete({SyncMode, _Pid}, _Ts, Tab, Key, _LockKind) + when is_atom(Tab) + andalso Tab /= schema + andalso (SyncMode =:= sync_ec orelse SyncMode =:= async_ec) -> + do_ec_delete(SyncMode, Tab, Key); +delete({SyncMode, _}, _Ts, Tab, _Key, _LockKind) -> + mnesia:abort({bad_type, SyncMode, Tab}). + +delete_object({SyncMode, _Pid}, _Ts, Tab, Val, _LockKind) + when is_atom(Tab) + andalso Tab /= schema + andalso is_tuple(Val) + andalso tuple_size(Val) > 2 + andalso (SyncMode =:= async_ec orelse SyncMode =:= sync_ec) -> + case has_var(Val) of + false -> + do_ec_delete_object(SyncMode, Tab, Val); + true -> + mnesia:abort({bad_type, Tab, Val}) + end; +delete_object({SyncMode, _}, _Ts, Tab, _Key, _LockKind) -> + mnesia:abort({bad_type, SyncMode, Tab}). + +read({SyncMode, _Pid}, _Ts, Tab, Key, _LockKind) + when is_atom(Tab) + andalso Tab /= schema + andalso (SyncMode =:= sync_ec orelse SyncMode =:= async_ec) -> + ec_read(Tab, Key); +read({SyncMode, _}, _Ts, Tab, _Key, _LockKind) -> + mnesia:abort({bad_type, SyncMode, Tab}). + +match_object({SyncMode, _Pid}, _Ts, Tab, Pat, _LockKind) + when is_atom(Tab) + andalso Tab /= schema + andalso is_tuple(Pat) + andalso tuple_size(Pat) > 2 + andalso (SyncMode =:= async_ec orelse SyncMode =:= sync_ec) -> + ec_match_object(Tab, Pat); +match_object({SyncMode, _}, _Ts, Tab, Pat, _LockKind) -> + mnesia:abort({bad_type, SyncMode, Tab, Pat}). + +all_keys({SyncMode, _Pid}, _Ts, Tab, _LockKind) + when is_atom(Tab) + andalso Tab /= schema + andalso (SyncMode =:= sync_ec orelse SyncMode =:= async_ec) -> + Mod = get_crdt_module(Tab), + Mod:db_all_keys(Tab); +all_keys({SyncMode, _}, _Ts, Tab, _LockKind) -> + mnesia:abort({bad_type, SyncMode, Tab}). + +first({SyncMode, _Pid}, _Ts, Tab) + when is_atom(Tab) + andalso Tab /= schema + andalso (SyncMode =:= sync_ec orelse SyncMode =:= async_ec) -> + ec_first(Tab); +first({SyncMode, _}, _Ts, Tab) -> + mnesia:abort({bad_type, SyncMode, Tab}). + +last({SyncMode, _Pid}, _Ts, Tab) + when is_atom(Tab) + andalso Tab /= schema + andalso (SyncMode =:= sync_ec orelse SyncMode =:= async_ec) -> + ec_last(Tab); +last({SyncMode, _}, _Ts, Tab) -> + mnesia:abort({bad_type, SyncMode, Tab}). + +prev({SyncMode, _Pid}, _Ts, Tab, Key) + when is_atom(Tab) + andalso Tab /= schema + andalso (SyncMode =:= sync_ec orelse SyncMode =:= async_ec) -> + ec_prev(Tab, Key); +prev({SyncMode, _}, _Ts, Tab, _) -> + mnesia:abort({bad_type, SyncMode, Tab}). + +next({SyncMode, _Pid}, _Ts, Tab, Key) + when is_atom(Tab) + andalso Tab /= schema + andalso (SyncMode =:= sync_ec orelse SyncMode =:= async_ec) -> + ec_next(Tab, Key); +next({SyncMode, _}, _Ts, Tab, _) -> + mnesia:abort({bad_type, SyncMode, Tab}). + +index_match_object({SyncMode, _Pid}, _Ts, Tab, Pat, Attr, _LockKind) + when is_atom(Tab) + andalso Tab /= schema + andalso is_tuple(Pat) + andalso tuple_size(Pat) > 2 + andalso (SyncMode =:= async_ec orelse SyncMode =:= sync_ec) -> + ec_index_match_object(Tab, Pat, Attr); +index_match_object({SyncMode, _}, _Ts, Tab, Pat, _Attr, _LockKind) -> + mnesia:abort({bad_type, SyncMode, Tab, Pat}). + +index_read({SyncMode, _}, _Ts, Tab, Key, Attr, _LockKind) + when is_atom(Tab) + andalso Tab /= schema + andalso (SyncMode =:= async_ec orelse SyncMode =:= sync_ec) -> + Pos = mnesia_schema:attr_tab_to_pos(Tab, Attr), + Mod = get_crdt_module(Tab), + case has_var(Key) of + false -> + Mod:index_read(Tab, Key, Pos); + true -> + mnesia:abort({bad_type, Tab, Attr, Key}) + end; +index_read({SyncMode, _}, _Ts, Tab, _Key, _Attr, _LockKind) -> + mnesia:abort({bad_type, SyncMode, Tab}). + +select({SyncMode, _Pid}, _Ts, Tab, Spec, _LockKind) + when SyncMode =:= sync_ec orelse SyncMode =:= async_ec -> + ec_select(Tab, Spec); +select({SyncMode, _}, _Ts, _Tab, _Spec, _LockKind) -> + mnesia:abort({bad_type, SyncMode}). + +table_info({SyncMode, _Pid} = Tid, Ts, Tab, Item) + when SyncMode =:= sync_ec orelse SyncMode =:= async_ec -> + mnesia:table_info(Tid, Ts, Tab, Item); +table_info({SyncMode, _}, _Ts, _Tab, _Item) -> + mnesia:abort({bad_type, SyncMode}). + +-spec get_crdt_module(mnesia:table()) -> module(). +get_crdt_module(Tab) -> + case val({Tab, setorbag}) of + AWSet when AWSet =:= pawset orelse AWSet =:= pawbag -> + mnesia_pawset; + RWSet when RWSet =:= prwset orelse RWSet =:= prwbag -> + mnesia_prwset; + _ -> + error({bad_ec_tab_type, Tab}) + end. + +%% Private functions, copied or modified from mnesia.erl and mnesia_tm.erl + +%% =============== prepare and send =============== +do_ec_write(SyncMode, Tab, Val) + when is_atom(Tab), Tab /= schema, is_tuple(Val), tuple_size(Val) > 2 -> + {_, _, _} = mnesia_lib:validate_record(Tab, Val), + Oid = {Tab, element(2, Val)}, + ec(SyncMode, {Oid, Val, write}); +do_ec_write(_SyncMode, Tab, Val) -> + mnesia:abort({bad_type, Tab, Val}). + +do_ec_delete(SyncMode, Tab, Key) when is_atom(Tab), Tab /= schema -> + Oid = {Tab, Key}, + ec(SyncMode, {Oid, Oid, delete}); +do_ec_delete(_SyncMode, Tab, _Key) -> + mnesia:abort({bad_type, Tab}). + +do_ec_delete_object(SyncMode, Tab, Val) + when is_atom(Tab), Tab /= schema, is_tuple(Val), tuple_size(Val) > 2 -> + Oid = {Tab, element(2, Val)}, + ec(SyncMode, {Oid, Val, delete_object}); +do_ec_delete_object(_SyncMode, Tab, Val) -> + mnesia:abort({bad_type, Tab, Val}). + +ec(Protocol, Item) -> + {{Tab, Key}, _Val, _Op} = Item, + Tid = {ec, self()}, + Prep = prepare_items(Tid, Tab, Key, [Item], #prep{protocol = Protocol}), + CR = Prep#prep.records, + dbg_out("ec: ~p~n", [CR]), + case Protocol of + async_ec -> + ReadNode = val({Tab, where_to_read}), + {WaitFor, FirstRes} = async_send_ec(Tid, CR, Tab, ReadNode), + rec_ec(WaitFor, FirstRes); + sync_ec -> + %% Send commit records to the other involved nodes, + %% and wait for all nodes to complete + {WaitFor, FirstRes} = sync_send_ec(Tid, CR, Tab, []), + rec_ec(WaitFor, FirstRes) + end. + +%% @doc @returns a prep record with all items in reverse order +prepare_items(Tid, Tab, Key, Items, Prep) -> + Types = val({Tab, where_to_commit}), + case Types of + [] -> + mnesia:abort({no_exists, Tab}); + {blocked, _} -> + unblocked = req({unblock_me, Tab}), + prepare_items(Tid, Tab, Key, Items, Prep); + _ -> + Snmp = val({Tab, snmp}), + Recs2 = do_prepare_items(Tid, Tab, Key, Types, Snmp, Items, Prep#prep.records), + Prep#prep{records = Recs2, + prev_tab = Tab, + prev_types = Types, + prev_snmp = Snmp} + end. + +req(R) -> + case whereis(?MODULE) of + undefined -> + {error, {node_not_running, node()}}; + Pid -> + Ref = make_ref(), + Pid ! {{self(), Ref}, R}, + rec(Pid, Ref) + end. + +rec(Pid, Ref) -> + receive + {?MODULE, Ref, Reply} -> + Reply; + {'EXIT', Pid, _} -> + {error, {node_not_running, node()}} + end. + +do_prepare_items(Tid, Tab, Key, Types, Snmp, Items, Recs) -> + Recs2 = prepare_snmp(Tid, Tab, Key, Types, Snmp, Items, Recs), % May exit + Recs3 = prepare_nodes(Tid, Types, Items, Recs2, normal), + verbose("do prepare_items Rec3: ~p ~p ~p ~p~n", [Tid, Types, Items, Recs2]), + prepare_ts(Recs3). + +prepare_snmp(_Tid, _Tab, _Key, _Types, [], _Items, Recs) -> + Recs; +prepare_snmp(Tid, Tab, Key, Types, Us, Items, Recs) -> + if Key /= '_' -> + {_Oid, _Val, Op} = hd(Items), + SnmpOid = mnesia_snmp_hook:key_to_oid(Tab, Key, Us), % May exit + prepare_nodes(Tid, Types, [{Op, Tab, Key, SnmpOid}], Recs, snmp); + Key == '_' -> + prepare_nodes(Tid, Types, [{clear_table, Tab}], Recs, snmp) + end. + +%% Returns a list of commit records +prepare_nodes(Tid, [{Node, Storage} | Rest], Items, C, Kind) -> + {Rec, C2} = pick_node(Tid, Node, C, []), + Rec2 = prepare_node(Node, Storage, Items, Rec, Kind), + [Rec2 | prepare_nodes(Tid, Rest, Items, C2, Kind)]; +prepare_nodes(_Tid, [], _Items, CommitRecords, _Kind) -> + CommitRecords. + +pick_node(Tid, Node, [Rec | Rest], Done) -> + if Rec#commit.node == Node -> + {Rec, Done ++ Rest}; + true -> + pick_node(Tid, Node, Rest, [Rec | Done]) + end; +pick_node({ec, _}, Node, [], Done) -> + {#commit{decision = presume_commit, node = Node}, Done}. + +prepare_node(Node, Storage, [Item | Items], #commit{ext = Ext0} = Rec, Kind) + when Kind == snmp -> + Rec2 = + case lists:keytake(snmp, 1, Ext0) of + false -> + Rec#commit{ext = [{snmp, [Item]} | Ext0]}; + {_, {snmp, Snmp}, Ext} -> + Rec#commit{ext = [{snmp, [Item | Snmp]} | Ext]} + end, + prepare_node(Node, Storage, Items, Rec2, Kind); +prepare_node(Node, Storage, [Item | Items], Rec, Kind) when Kind /= schema -> + Rec2 = + case Storage of + ram_copies -> + Rec#commit{ram_copies = [Item | Rec#commit.ram_copies]}; + disc_copies -> + Rec#commit{disc_copies = [Item | Rec#commit.disc_copies]}; + disc_only_copies -> + Rec#commit{disc_only_copies = [Item | Rec#commit.disc_only_copies]}; + {ext, Alias, Mod} -> + Ext0 = Rec#commit.ext, + case lists:keytake(ext_copies, 1, Ext0) of + false -> + Rec#commit{ext = [{ext_copies, [{{ext, Alias, Mod}, Item}]} | Ext0]}; + {_, {_, EC}, Ext} -> + Rec#commit{ext = [{ext_copies, [{{ext, Alias, Mod}, Item} | EC]} | Ext]} + end + end, + prepare_node(Node, Storage, Items, Rec2, Kind); +prepare_node(_Node, _Storage, [], Rec, _Kind) -> + Rec. + +-spec prepare_ts([#commit{}]) -> [#commit{}]. +prepare_ts(Recs) -> + {Node, Ts} = mnesia_causal:send_msg(), + do_prepare_ts(lists:reverse(Recs), Node, Ts). + +%% Returns a list of commit record, with node and ts set +-spec do_prepare_ts([#commit{}], node(), mnesia_causal:vclock()) -> + [#commit{sender :: atom()}]. +do_prepare_ts([Hd | Tl], Node, Ts) -> + % we only add ts once, since we consider all copies in a commit as a whole + Commit = Hd#commit{sender = Node, ts = Ts}, + [Commit | do_prepare_ts(Tl, Node, Ts)]; +do_prepare_ts([], _Node, _Ts) -> + []. + +do_update_ts(ram_copies, Copy, Ts) -> + add_time(Copy, Ts); +do_update_ts(disc_copies, Copy, Ts) -> + add_time(Copy, Ts); +do_update_ts(disc_only_copies, Copy, Ts) -> + add_time(Copy, Ts); +do_update_ts({ext, _Alias, _Mod}, ExtCopy, Ts) -> + add_time(ExtCopy, Ts). + +add_time({Oid, Val, Op}, Ts) -> + {Oid, erlang:append_element(Val, Ts), Op}; +add_time({ExtInfo, {Oid, Val, Op}}, Ts) -> + {ExtInfo, {Oid, erlang:append_element(Val, Ts), Op}}. + +receive_msg(Tid, Commit, _Tab, local) -> + mnesia_causal:deliver_one(Commit), + do_ec(Tid, Commit); +receive_msg(Tid, Commit, Tab, {rcv, async}) -> + Deliverable = mnesia_causal:rcv_msg(Tid, Commit, Tab), + dbg_out("found async_ec devliverable commits: ~p~n", [Deliverable]), + lists:foreach(fun({Tid1, Commit1, Tab1}) -> + do_async_ec(Tid1, mnesia_tm:new_cr_format(Commit1), Tab1) + end, + Deliverable); +receive_msg(Tid, Commit, Tab, {rcv, {sync, From}}) -> + Deliverable = mnesia_causal:rcv_msg(Tid, Commit, Tab, From), + dbg_out("found sync_ec devliverable commits: ~p~n", [Deliverable]), + lists:foreach(fun({Tid1, Commit1 = #commit{}, Tab1, From1}) -> + do_sync_ec(From1, Tid1, mnesia_tm:new_cr_format(Commit1), Tab1) + end, + Deliverable). + +sync_send_ec(Tid, [Head | Tail], Tab, WaitFor) -> + Node = Head#commit.node, + if Node == node() -> + % if the node we want to deliver to is local, we deliver it directly + {WF, _} = sync_send_ec(Tid, Tail, Tab, WaitFor), + Res = receive_msg(Tid, Head, Tab, local), + {WF, Res}; + true -> + % otherwise we need to send it and wait for ack + {?MODULE, Node} ! {self(), {sync_ec, Tid, Head, Tab}}, + sync_send_ec(Tid, Tail, Tab, [Node | WaitFor]) + end; +sync_send_ec(_Tid, [], _Tab, WaitFor) -> + {WaitFor, {'EXIT', {aborted, {node_not_running, WaitFor}}}}. + +%% @returns {WaitFor, Res} +async_send_ec(_Tid, _Committs, Tab, nowhere) -> + {[], {'EXIT', {aborted, {no_exists, Tab}}}}; +async_send_ec(Tid, Commits, Tab, ReadNode) -> + async_send_ec(Tid, Commits, Tab, ReadNode, [], ok). + +async_send_ec(Tid, [Head | Tail], Tab, ReadNode, WaitFor, Res) -> + dbg_out("async_send_ec Nodes: ~p~n", [[Head | Tail]]), + Node = Head#commit.node, + if ReadNode == Node, Node == node() -> + NewRes = receive_msg(Tid, Head, Tab, local), + async_send_ec(Tid, Tail, Tab, ReadNode, WaitFor, NewRes); + ReadNode == Node -> + % if the readnode is not local, we need to send it to the readnode and + % _wait_ for it, note the sync_ec + % this might happen when we are not a mnemia node + {?MODULE, Node} ! {self(), {sync_ec, Tid, Head, Tab}}, + NewRes = {'EXIT', {aborted, {node_not_running, Node}}}, + async_send_ec(Tid, Tail, Tab, ReadNode, [Node | WaitFor], NewRes); + true -> + {?MODULE, Node} ! {self(), {async_ec, Tid, Head, Tab}}, + dbg_out("sending ~p to ~p~n", [{async_ec, Tid, Head, Tab}, {?MODULE, Node}]), + async_send_ec(Tid, Tail, Tab, ReadNode, WaitFor, Res) + end; +async_send_ec(_Tid, [], _Tab, _ReadNode, WaitFor, Res) -> + {WaitFor, Res}. + +rec_ec([Node | Tail], Res) when Node /= node() -> + NewRes = get_ec_reply(Node, Res), + rec_ec(Tail, NewRes); +rec_ec([], Res) -> + Res. + +get_ec_reply(Node, Res) -> + receive + {?MODULE, Node, {'EXIT', Reason}} -> + {'EXIT', {aborted, {badarg, Reason}}}; + {?MODULE, Node, {ec_res, ok}} -> + case Res of + {'EXIT', {aborted, {node_not_running, _Node}}} -> + ok; + _ -> + %% Prioritize bad results, but node_not_running + Res + end; + {?MODULE, Node, {ec_res, Reply}} -> + Reply; + {mnesia_down, Node} -> + case get(mnesia_activity_state) of + {_, Tid, _Ts} when element(1, Tid) == tid -> + %% Hmm dirty called inside a transaction, to avoid + %% hanging transaction we need to restart the transaction + mnesia:abort({node_not_running, Node}); + _ -> + %% It's ok to ignore mnesia_down's since we will make + %% the replicas consistent again when Node is started + Res + end + after 1000 -> + case lists:member(Node, val({current, db_nodes})) of + true -> + get_ec_reply(Node, Res); + false -> + Res + end + end. + +%%% =============== Receiving and update =================== +do_async_ec(Tid, Commit, _Tab) -> + ?eval_debug_fun({?MODULE, async_ec, pre}, [{tid, Tid}]), + do_ec(Tid, Commit), + ?eval_debug_fun({?MODULE, async_ec, post}, [{tid, Tid}]). + +do_sync_ec(From, Tid, Commit, _Tab) -> + ?eval_debug_fun({?MODULE, sync_ec, pre}, [{tid, Tid}]), + Res = do_ec(Tid, Commit), + ?eval_debug_fun({?MODULE, sync_ec, post}, [{tid, Tid}]), + From ! {?MODULE, node(), {ec_res, Res}}. + +do_ec(Tid, Commit) when Commit#commit.schema_ops == [] -> + mnesia_log:log(Commit), + do_commit(Tid, Commit). + +do_commit(Tid, C) -> + do_commit(Tid, C, optional). + +do_commit(Tid, C, DumperMode) -> + mnesia_dumper:update(Tid, C#commit.schema_ops, DumperMode), + R = mnesia_tm:do_snmp(Tid, proplists:get_value(snmp, C#commit.ext, [])), + R2 = do_update(Tid, ram_copies, C#commit.ram_copies, C#commit.ts, R), + R3 = do_update(Tid, disc_copies, C#commit.disc_copies, C#commit.ts, R2), + R4 = do_update(Tid, disc_only_copies, C#commit.disc_only_copies, C#commit.ts, R3), + R5 = do_update_ext(Tid, C#commit.ext, C#commit.ts, R4), + mnesia_subscr:report_activity(Tid), + R5. + +%% This could/should be optimized +do_update_ext(_Tid, [], _Ts, OldRes) -> + OldRes; +do_update_ext(Tid, Ext, Ts, OldRes) -> + case lists:keyfind(ext_copies, 1, Ext) of + false -> + OldRes; + {_, Ops} -> + Do = fun({{ext, _, _} = Storage, Op}, R) -> do_update(Tid, Storage, [Op], Ts, R) end, + lists:foldl(Do, OldRes, Ops) + end. + +%% Update the items +do_update(Tid, Storage, [Op1 | Ops], Ts, OldRes) -> + Op = do_update_ts(Storage, Op1, Ts), + try do_update_op(Tid, Storage, Op) of + ok -> + do_update(Tid, Storage, Ops, Ts, OldRes); + NewRes -> + do_update(Tid, Storage, Ops, Ts, NewRes) + catch + _:Reason:ST -> + %% This may only happen when we recently have + %% deleted our local replica, changed storage_type + %% or transformed table + %% BUGBUG: Updates may be lost if storage_type is changed. + %% Determine actual storage type and try again. + %% BUGBUG: Updates may be lost if table is transformed. + verbose("do_update in ~w failed: ~tp -> {'EXIT', ~tp}~n", [Tid, Op, {Reason, ST}]), + do_update(Tid, Storage, Ops, Ts, OldRes) + end; +do_update(_Tid, _Storage, [], _Ts, Res) -> + Res. + +do_update_op(Tid, Storage, {{Tab, K}, Obj, write}) -> + commit_write(?catch_val({Tab, commit_work}), Tid, Storage, Tab, K, Obj, undefined), + Mod = get_crdt_module(Tab), + Mod:db_put(Storage, Tab, Obj); +do_update_op(Tid, Storage, {{Tab, K}, Obj, delete}) -> + % note here parameter is Obj rather than Val, this is mostly better since we + % can always extract the key from the object + % we send Obj instead of Key for processing + commit_delete(?catch_val({Tab, commit_work}), Tid, Storage, Tab, K, Obj, undefined), + Mod = get_crdt_module(Tab), + Mod:db_erase(Storage, Tab, Obj); +do_update_op(Tid, Storage, {{Tab, K}, {RecName, Incr}, update_counter}) -> + {NewObj, OldObjs} = + try + NewVal = mnesia_lib:db_update_counter(Storage, Tab, K, Incr), + true = is_integer(NewVal) andalso NewVal >= 0, + {{RecName, K, NewVal}, [{RecName, K, NewVal - Incr}]} + catch + error:_ when Incr > 0 -> + New = {RecName, K, Incr}, + mnesia_lib:db_put(Storage, Tab, New), + {New, []}; + error:_ -> + Zero = {RecName, K, 0}, + mnesia_lib:db_put(Storage, Tab, Zero), + {Zero, []} + end, + commit_update(?catch_val({Tab, commit_work}), Tid, Storage, Tab, K, NewObj, OldObjs), + element(3, NewObj); +do_update_op(Tid, Storage, {{Tab, Key}, Obj, delete_object}) -> + commit_del_object(?catch_val({Tab, commit_work}), Tid, Storage, Tab, Key, Obj), + Mod = get_crdt_module(Tab), + Mod:db_match_erase(Storage, Tab, Obj); +do_update_op(Tid, Storage, {{Tab, Key}, Obj, clear_table}) -> + commit_clear(?catch_val({Tab, commit_work}), Tid, Storage, Tab, Key, Obj), + mnesia_lib:db_match_erase(Storage, Tab, Obj). + +commit_write([], _, _, _, _, _, _) -> + ok; +commit_write([{checkpoints, CpList} | R], Tid, Storage, Tab, K, Obj, Old) -> + mnesia_checkpoint:tm_retain(Tid, Tab, K, write, CpList), + commit_write(R, Tid, Storage, Tab, K, Obj, Old); +commit_write([H | R], Tid, Storage, Tab, K, Obj, Old) when element(1, H) == subscribers -> + mnesia_subscr:report_table_event(H, Tab, Tid, Obj, write, Old), + commit_write(R, Tid, Storage, Tab, K, Obj, Old); +commit_write([H | R], Tid, Storage, Tab, K, Obj, Old) when element(1, H) == index -> + mnesia_index:add_index(H, Storage, Tab, K, Obj, Old), + commit_write(R, Tid, Storage, Tab, K, Obj, Old). + +commit_update([], _, _, _, _, _, _) -> + ok; +commit_update([{checkpoints, CpList} | R], Tid, Storage, Tab, K, Obj, _) -> + Old = mnesia_checkpoint:tm_retain(Tid, Tab, K, write, CpList), + commit_update(R, Tid, Storage, Tab, K, Obj, Old); +commit_update([H | R], Tid, Storage, Tab, K, Obj, Old) + when element(1, H) == subscribers -> + mnesia_subscr:report_table_event(H, Tab, Tid, Obj, write, Old), + commit_update(R, Tid, Storage, Tab, K, Obj, Old); +commit_update([H | R], Tid, Storage, Tab, K, Obj, Old) when element(1, H) == index -> + mnesia_index:add_index(H, Storage, Tab, K, Obj, Old), + commit_update(R, Tid, Storage, Tab, K, Obj, Old). + +commit_delete([], _, _, _, _, _, _) -> + ok; +commit_delete([{checkpoints, CpList} | R], Tid, Storage, Tab, K, Obj, _) -> + Old = mnesia_checkpoint:tm_retain(Tid, Tab, K, delete, CpList), + commit_delete(R, Tid, Storage, Tab, K, Obj, Old); +commit_delete([H | R], Tid, Storage, Tab, K, Obj, Old) + when element(1, H) == subscribers -> + mnesia_subscr:report_table_event(H, Tab, Tid, Obj, delete, Old), + commit_delete(R, Tid, Storage, Tab, K, Obj, Old); +commit_delete([H | R], Tid, Storage, Tab, K, Obj, Old) when element(1, H) == index -> + mnesia_index:delete_index(H, Storage, Tab, K), + commit_delete(R, Tid, Storage, Tab, K, Obj, Old). + +commit_del_object([], _, _, _, _, _) -> + ok; +commit_del_object([{checkpoints, CpList} | R], Tid, Storage, Tab, K, Obj) -> + mnesia_checkpoint:tm_retain(Tid, Tab, K, delete_object, CpList), + commit_del_object(R, Tid, Storage, Tab, K, Obj); +commit_del_object([H | R], Tid, Storage, Tab, K, Obj) when element(1, H) == subscribers -> + mnesia_subscr:report_table_event(H, Tab, Tid, Obj, delete_object), + commit_del_object(R, Tid, Storage, Tab, K, Obj); +commit_del_object([H | R], Tid, Storage, Tab, K, Obj) when element(1, H) == index -> + mnesia_index:del_object_index(H, Storage, Tab, K, Obj), + commit_del_object(R, Tid, Storage, Tab, K, Obj). + +commit_clear([], _, _, _, _, _) -> + ok; +commit_clear([{checkpoints, CpList} | R], Tid, Storage, Tab, K, Obj) -> + mnesia_checkpoint:tm_retain(Tid, Tab, K, clear_table, CpList), + commit_clear(R, Tid, Storage, Tab, K, Obj); +commit_clear([H | R], Tid, Storage, Tab, K, Obj) when element(1, H) == subscribers -> + mnesia_subscr:report_table_event(H, Tab, Tid, Obj, clear_table, undefined), + commit_clear(R, Tid, Storage, Tab, K, Obj); +commit_clear([H | R], Tid, Storage, Tab, K, Obj) when element(1, H) == index -> + mnesia_index:clear_index(H, Tab, K, Obj), + commit_clear(R, Tid, Storage, Tab, K, Obj). + +%% =============== read operations =============== + +ec_rpc(Tab, M, F, Args) -> + Node = val({Tab, where_to_read}), + do_ec_rpc(Tab, Node, M, F, Args). + +do_ec_rpc(_Tab, nowhere, _, _, Args) -> + mnesia:abort({no_exists, Args}); +do_ec_rpc(_Tab, Local, M, F, Args) when Local =:= node() -> + try + apply(M, F, Args) + catch + Res -> + Res; + _:_ -> + mnesia:abort({badarg, Args}) + end; +do_ec_rpc(Tab, Node, M, F, Args) -> + case mnesia_rpc:call(Node, M, F, Args) of + {badrpc, Reason} -> + timer:sleep(20), %% Do not be too eager, and can't use yield on SMP + %% Sync with mnesia_monitor + _ = try + sys:get_status(mnesia_monitor) + catch + _:_ -> + ok + end, + case mnesia_controller:call({check_w2r, Node, Tab}) % Sync + of + NewNode when NewNode =:= Node -> + ErrorTag = mnesia_lib:dirty_rpc_error_tag(Reason), + mnesia:abort({ErrorTag, Args}); + NewNode -> + case get(mnesia_activity_state) of + {_Mod, Tid, _Ts} when is_record(Tid, tid) -> + %% In order to perform a consistent + %% retry of a transaction we need + %% to acquire the lock on the NewNode. + %% In this context we do neither know + %% the kind or granularity of the lock. + %% --> Abort the transaction + mnesia:abort({node_not_running, Node}); + {error, {node_not_running, _}} -> + %% Mnesia is stopping + mnesia:abort({no_exists, Args}); + _ -> + %% Splendid! A dirty retry is safe + %% 'Node' probably went down now + %% Let mnesia_controller get broken link message first + do_ec_rpc(Tab, NewNode, M, F, Args) + end + end; + Other -> + Other + end. + +ec_read(Tab, Key) -> + Mod = get_crdt_module(Tab), + ec_rpc(Tab, Mod, db_get, [Tab, Key]). + +ec_select(Tab, Spec) -> + Mod = get_crdt_module(Tab), + ec_rpc(Tab, Mod, remote_ec_select, [Tab, Spec]). + +ec_first(Tab) -> + Mod = get_crdt_module(Tab), + ec_rpc(Tab, Mod, db_first, [Tab]). + +ec_last(Tab) -> + Mod = get_crdt_module(Tab), + ec_rpc(Tab, Mod, db_last, [Tab]). + +ec_prev(Tab, Key) -> + Mod = get_crdt_module(Tab), + ec_rpc(Tab, Mod, db_prev_key, [Tab, Key]). + +ec_next(Tab, Key) -> + Mod = get_crdt_module(Tab), + ec_rpc(Tab, Mod, db_next_key, [Tab, Key]). + +-spec ec_match_object(Tab, Pattern) -> [Record] + when Tab :: mnesia:table(), + Pattern :: tuple(), + Record :: tuple(). +ec_match_object(Tab, Pat) + when is_atom(Tab), Tab /= schema, is_tuple(Pat), tuple_size(Pat) > 2 -> + Mod = get_crdt_module(Tab), + ec_rpc(Tab, Mod, remote_match_object, [Tab, Pat]); +ec_match_object(Tab, Pat) -> + mnesia:abort({bad_type, Tab, Pat}). + +ec_index_match_object(Tab, Pat, Attr) + when is_atom(Tab), Tab /= schema, is_tuple(Pat), tuple_size(Pat) > 2 -> + Mod = get_crdt_module(Tab), + case mnesia_schema:attr_tab_to_pos(Tab, Attr) of + {_} -> + ec_match_object(Tab, Pat); + Pos when Pos =< tuple_size(Pat) -> + case has_var(element(2, Pat)) of + false -> + ec_match_object(Tab, Pat); + true -> + Elem = element(Pos, Pat), + case has_var(Elem) of + false -> + ec_rpc(Tab, Mod, index_match_object, [Tab, Pat, Pos]); + true -> + mnesia:abort({bad_type, Tab, Attr, Elem}) + end + end; + BadPos -> + mnesia:abort({bad_type, Tab, BadPos}) + end; +ec_index_match_object(Tab, Pat, _Attr) -> + mnesia:abort({bad_type, Tab, Pat}). + +%% =============== stop operations =============== + +handle_exit(Pid, _Reason, State) when node(Pid) /= node() -> + %% We got exit from a remote fool + doit_loop(State); +handle_exit(Pid, _Reason, State) when Pid == State#state.supervisor -> + %% Our supervisor has died, time to stop + do_stop(State). + +do_stop(#state{}) -> + exit(shutdown). diff --git a/lib/mnesia/src/mnesia_kernel_sup.erl b/lib/mnesia/src/mnesia_kernel_sup.erl index 339f7d3bb233..3c17fc9edff3 100644 --- a/lib/mnesia/src/mnesia_kernel_sup.erl +++ b/lib/mnesia/src/mnesia_kernel_sup.erl @@ -42,6 +42,8 @@ init([]) -> worker_spec(mnesia_locker, timer:seconds(3), ProcLib), worker_spec(mnesia_recover, timer:minutes(3), [gen_server]), worker_spec(mnesia_tm, timer:seconds(30), ProcLib), + worker_spec(mnesia_causal, timer:seconds(3), [gen_server]), + worker_spec(mnesia_ec, timer:seconds(3), ProcLib), worker_spec(mnesia_rpc, timer:seconds(3), [gen_server]), supervisor_spec(mnesia_checkpoint_sup), worker_spec(mnesia_controller, timer:seconds(3), [gen_server]), diff --git a/lib/mnesia/src/mnesia_monitor.erl b/lib/mnesia/src/mnesia_monitor.erl index dbd4a9c42f5d..27ad69033451 100644 --- a/lib/mnesia/src/mnesia_monitor.erl +++ b/lib/mnesia/src/mnesia_monitor.erl @@ -99,8 +99,14 @@ init() -> mnesia_down(From, Node) -> cast({mnesia_down, From, Node}). +mktab(Tab, Args = [_, _, _, Type | _]) + when Type =:= pawset orelse Type =:= pawbag -> + mnesia_pawset:mktab(Tab, Args); mktab(Tab, Args) -> unsafe_call({mktab, Tab, Args}). +unsafe_mktab(Tab, [_, _, _, Type | _] = Args) + when Type =:= pawset orelse Type =:= pawbag -> + mnesia_pawset:unsafe_mktab(Tab, Args); unsafe_mktab(Tab, Args) -> unsafe_call({unsafe_mktab, Tab, Args}). diff --git a/lib/mnesia/src/mnesia_pawset.erl b/lib/mnesia/src/mnesia_pawset.erl new file mode 100644 index 000000000000..3fa6f85b7d55 --- /dev/null +++ b/lib/mnesia/src/mnesia_pawset.erl @@ -0,0 +1,346 @@ +%% +%% This module implements the pure operation-based add-wins set (pawset) based +%% on the %% paper by Baquero et al. 2017. It is used to achieve eventual +%% consistency with the new (a)sync_ec API. + +-module(mnesia_pawset). + +-include("mnesia.hrl"). + +-export([db_put/3, db_erase/3, db_get/2, db_first/1, db_last/1, db_next_key/2, + db_select/2, db_prev_key/2, db_all_keys/1, db_match_erase/3]). +-export([mktab/2, unsafe_mktab/2]). +-export([remote_match_object/2, remote_ec_select/2]). +-export([index_match_object/3, index_read/3]). + +-type op() :: write | delete. +-type val() :: any(). +-type ts() :: mnesia_causal:vclock(). +-type element() :: {ts(), {op(), val()}}. +-type storage() :: ram_copies | disk_copies | disc_only_copies | {ext, atom(), atom()}. + +-import(mnesia_lib, [important/2, dbg_out/2, verbose/2, warning/2]). + +val(Var) -> + case ?catch_val_and_stack(Var) of + {'EXIT', Stacktrace} -> + mnesia_lib:other_val(Var, Stacktrace); + Value -> + Value + end. + +has_var(Pat) -> + mnesia:has_var(Pat). + +mktab(Tab, [{keypos, 2}, public, named_table, Type | EtsOpts]) + when Type =:= pawset orelse Type =:= pawbag -> + Args1 = [{keypos, 2}, public, named_table, bag | EtsOpts], + mnesia_monitor:mktab(Tab, Args1). + +unsafe_mktab(Tab, [{keypos, 2}, public, named_table, Type | EtsOpts]) + when Type =:= pawset orelse Type =:= pawbag -> + Args1 = [{keypos, 2}, public, named_table, bag | EtsOpts], + mnesia_monitor:unsafe_mktab(Tab, Args1). + +%%==================== writes ==================== +-spec effect(storage(), mnesia:table(), tuple()) -> ok. +effect(Storage, Tab, Tup) -> + case causal_compact(Storage, Tab, obj2ele(Tup)) of + true -> + mnesia_lib:db_put(Storage, Tab, remove_op(Tup)), + ok; + false -> + ok + end. + +db_put(Storage, Tab, Obj) when is_map(element(tuple_size(Obj), Obj)) -> + try + Tup = add_op(Obj, write), + dbg_out("inserting ~p into ~p~n", [Tup, Tab]), + effect(Storage, Tab, Tup) + catch + _:Reason -> + warning("CRASH ~p ~p~n", [Reason, Tab]) + end; +db_put(_S, _T, Obj) -> + error("bad tuple ~p, no ts at the end~n", [Obj]). + +db_erase(Storage, Tab, Obj) when is_map(element(tuple_size(Obj), Obj)) -> + Tup = add_op(Obj, delete), + effect(Storage, Tab, Tup); +db_erase(_S, _T, Obj) -> + error("bad tuple ~p, no ts at the end~n", [Obj]). + +% used for delete_object, hence pattern cannot be a pattern +db_match_erase(Storage, Tab, Pat) -> + db_erase(Storage, Tab, Pat). + +%%==================== reads ==================== +db_get(Tab, Key) -> + Tups = mnesia_lib:db_get(Tab, Key), + Res = [remove_ts(Tup) || Tup <- Tups], + uniq(Tab, Res). + +db_all_keys(Tab) when is_atom(Tab), Tab /= schema -> + Pat0 = val({Tab, wild_pattern}), + Pat = setelement(2, Pat0, '$1'), + Keys = db_select(Tab, [{Pat, [], ['$1']}]), + case val({Tab, setorbag}) of + pawbag -> + mnesia_lib:uniq(Keys); + pawset -> + Keys; + Other -> + error({incorrect_ec_table_type, Other}) + end. + +db_match_object(Tab, Pat0) -> + Pat = wild_ts(Pat0), + Res = mnesia_lib:db_match_object(Tab, Pat), + case val({Tab, setorbag}) of + pawset -> + Res1 = [remove_ts(Tup) || Tup <- Res], + uniq(Tab, Res1); + pawbag -> + [remove_ts(Tup) || Tup <- Res]; + Other -> + error({bad_val, Other}) + end. + +db_first(Tab) -> + mnesia_lib:db_first(Tab). + +db_last(Tab) -> + mnesia_lib:db_last(Tab). + +db_next_key(Tab, Key) -> + mnesia_lib:db_next_key(Tab, Key). + +db_prev_key(Tab, Key) -> + mnesia_lib:db_prev_key(Tab, Key). + +db_select(Tab, Spec) -> + Spec1 = [{wild_ts(MatchHead), Guards, Results} || {MatchHead, Guards, Results} <- Spec], + mnesia_lib:db_select(Tab, Spec1). + +index_read(Tab, Key, Attr) -> + Tups = mnesia_index:dirty_read(Tab, Key, Attr), + [remove_ts(Tup) || Tup <- Tups]. + +index_match_object(Tab, Pat, Pos) -> + Pat2 = wild_ts(Pat), + Tups = mnesia_index:dirty_match_object(Tab, Pat2, Pos), + [remove_ts(Tup) || Tup <- Tups]. + +uniq(Tab, Res) -> + case val({Tab, setorbag}) of + pawset -> + Res1 = mnesia_lib:uniq(Res), + resolve_cc_add(Res1); + pawbag -> + Res; + Other -> + error({bad_val, Other}) + end. + +-spec resolve_cc_add([element()]) -> [element()]. +resolve_cc_add(Res) when length(Res) =< 1 -> + Res; +resolve_cc_add(Res) -> + dbg_out("selecting minimum element from ~p to resolve concurrent addition~n", [Res]), + [lists:min(Res)]. + +remote_match_object(Tab, Pat) -> + Key = element(2, Pat), + case has_var(Key) of + false -> + db_match_object(Tab, Pat); + true -> + PosList = regular_indexes(Tab), + remote_match_object(Tab, Pat, PosList) + end. + +remote_match_object(Tab, Pat, [Pos | Tail]) when Pos =< tuple_size(Pat) -> + IxKey = element(Pos, Pat), + case has_var(IxKey) of + false -> + Tups = mnesia_index:dirty_match_object(Tab, wild_ts(Pat), Pos), + [remove_ts(Tup) || Tup <- Tups]; + true -> + remote_match_object(Tab, Pat, Tail) + end; +remote_match_object(Tab, Pat, []) -> + db_match_object(Tab, Pat); +remote_match_object(Tab, Pat, _PosList) -> + mnesia:abort({bad_type, Tab, Pat}). + +remote_ec_select(Tab, Spec) -> + case Spec of + [{HeadPat, _, _}] when is_tuple(HeadPat), tuple_size(HeadPat) > 2 -> + Key = element(2, HeadPat), + case has_var(Key) of + false -> + db_select(Tab, Spec); + true -> + PosList = regular_indexes(Tab), + remote_ec_select(Tab, Spec, PosList) + end; + _ -> + db_select(Tab, Spec) + end. + +remote_ec_select(Tab, [{HeadPat, _, _}] = Spec, [Pos | Tail]) + when is_tuple(HeadPat), tuple_size(HeadPat) > 2, Pos =< tuple_size(HeadPat) -> + Key = element(Pos, HeadPat), + case has_var(Key) of + false -> + Recs = mnesia_index:dirty_select(Tab, wild_ts(HeadPat), Pos), + Spec2 = [{wild_ts(MatchHead), Guards, Results} || {MatchHead, Guards, Results} <- Spec], + CMS = ets:match_spec_compile(Spec2), + ets:match_spec_run(Recs, CMS); + true -> + remote_ec_select(Tab, Spec, Tail) + end; +remote_ec_select(Tab, Spec, _) -> + db_select(Tab, Spec). + +regular_indexes(Tab) -> + PosList = val({Tab, index}), + [P || P <- PosList, is_integer(P)]. + +%%% ==========pure op-based awset implementation========== + +%% @returns whether this element should be added +-spec causal_compact(storage(), mnesia:table(), element()) -> boolean(). +causal_compact(Storage, Tab, Ele) -> + {Pid1, Mref1} = + spawn_monitor(fun() -> + remove_obsolete(Storage, Tab, Ele), + receive {Parent, Mref} -> Parent ! {Mref, {obsolete, ok}} end + end), + {Pid2, Mref2} = + spawn_monitor(fun() -> + Red = redundant(Storage, Tab, Ele), + receive {Parent, Mref} -> Parent ! {Mref, {redundancy, Red}} end + end), + Parent = self(), + Pid1 ! {Parent, Mref1}, + Pid2 ! {Parent, Mref2}, + Red1 = wait_causal_compact(Mref1), + Red2 = wait_causal_compact(Mref2), + Red1 andalso Red2. + +wait_causal_compact(Mref) -> + receive + {Mref, {obsolete, ok}} -> + erlang:demonitor(Mref, [flush]), + true; + {Mref, {redundancy, Red}} -> + erlang:demonitor(Mref, [flush]), + not Red; + {'DOWN', Mref, _, _, Reason} -> + {error, Reason} + end. + +%% @doc checks whether the input element is redundant +%% i.e. if there are any other elements in the table that obsoletes this element +%% @end +-spec redundant(storage(), mnesia:table(), element()) -> boolean(). +redundant(_Storage, _Tab, {_Ts, {delete, _Val}}) -> + true; +redundant(Storage, Tab, Ele) -> + Key = get_val_key(Tab, get_val_ele(Ele)), + Tups = [add_op(Tup, write) || Tup <- mnesia_lib:db_get(Storage, Tab, Key)], + Eles2 = lists:map(fun obj2ele/1, Tups), + lists:any(fun(Ele2) -> obsolete(Tab, Ele, Ele2) end, Eles2). + +%% @edoc removes elements that are obsoleted by Ele +-spec remove_obsolete(storage(), mnesia:table(), element()) -> ok. +remove_obsolete(Storage, Tab, Ele) -> + Key = get_val_key(Tab, get_val_ele(Ele)), + case mnesia_lib:db_get(Storage, Tab, Key) of + [] -> + ok; + Tups when length(Tups) > 0 -> + Tups2 = [add_op(Tup, write) || Tup <- Tups], + Keep = lists:filter(fun(Tup) -> not obsolete(Tab, obj2ele(Tup), Ele) end, Tups2), + Keep2 = [remove_op(Tup) || Tup <- Keep], + mnesia_lib:db_erase(Storage, Tab, Key), + mnesia_lib:db_put(Storage, Tab, Keep2), + ok + end. + +key_pos(Tab) -> + case val({Tab, storage_type}) of + ram_copies -> + ?ets_info(Tab, keypos); + disc_copies -> + ?ets_info(Tab, keypos); + disc_only_copies -> + dets:info(Tab, keypos); + {ext, Mod, Alias} -> + Mod:info(Alias, keypos) + end. + +%% @returns true if second element obsoletes the first one +-spec obsolete(mnesia:table(), {ts(), {op(), val()}}, {ts(), {op(), val()}}) -> boolean(). +obsolete(Tab, {Ts1, {write, V1}}, {Ts2, {write, V2}}) -> + equals(Tab, V1, V2) andalso mnesia_causal:compare_vclock(Ts1, Ts2) =:= lt; +obsolete(Tab, {Ts1, {write, V1}}, {Ts2, {delete, V2}}) -> + equals(Tab, V1, V2) andalso mnesia_causal:compare_vclock(Ts1, Ts2) =:= lt; +obsolete(_Tab, {_Ts1, {delete, _V1}}, _X) -> + true. + +-spec equals(mnesia:table(), tuple(), tuple()) -> boolean(). +equals(Tab, V1, V2) -> + case val({Tab, setorbag}) of + pawset -> + element(key_pos(Tab), V1) =:= element(key_pos(Tab), V2); + pawbag -> + V1 =:= V2; + Other -> + error({bad_val, Other}) + end. + +%% Some helper functions to deal convert between different formats + +-spec obj2ele(tuple()) -> element(). +obj2ele(Obj) -> + {get_ts(Obj), {get_op(Obj), get_val_tup(Obj)}}. + +get_val_ele({_Ts, {_Op, V}}) -> + V. + +%% @equiv delete_meta/1 +get_val_tup(Obj) -> + delete_meta(Obj). + +-spec get_val_key(mnesia:table(), tuple()) -> term(). +get_val_key(Tab, V) -> + element(key_pos(Tab), V). + +add_op(Obj, Op) -> + erlang:append_element(Obj, Op). + +remove_op(Obj) -> + erlang:delete_element(tuple_size(Obj), Obj). + +get_op(Obj) -> + element(tuple_size(Obj), Obj). + +get_ts(Obj) -> + get_ts(tuple_size(Obj) - 1, Obj). + +get_ts(Idx, Obj) -> + element(Idx, Obj). + +remove_ts(Obj) -> + erlang:delete_element(tuple_size(Obj), Obj). + +-spec delete_meta(tuple()) -> tuple(). +delete_meta(Obj) -> + Last = tuple_size(Obj), + erlang:delete_element(Last - 1, erlang:delete_element(Last, Obj)). + +wild_ts(Pat) -> + erlang:append_element(Pat, '_'). diff --git a/lib/mnesia/src/mnesia_schema.erl b/lib/mnesia/src/mnesia_schema.erl index ed4f7a8784ba..55986ac9c6b1 100644 --- a/lib/mnesia/src/mnesia_schema.erl +++ b/lib/mnesia/src/mnesia_schema.erl @@ -1105,7 +1105,7 @@ assert_correct_cstruct(Cs) when is_record(Cs, cstruct) -> Tab = Cs#cstruct.name, verify(atom, mnesia_lib:etype(Tab), {bad_type, Tab}), Type = Cs#cstruct.type, - verify(true, lists:member(Type, [set, bag, ordered_set]), + verify(true, lists:member(Type, [set, bag, ordered_set, pawset, pawbag]), {bad_type, Tab, {type, Type}}), %% Currently ordered_set is not supported for disk_only_copies. diff --git a/lib/mnesia/test/Makefile b/lib/mnesia/test/Makefile index c3fbad88ca1b..c78ccbde8dd9 100644 --- a/lib/mnesia/test/Makefile +++ b/lib/mnesia/test/Makefile @@ -38,6 +38,9 @@ MODULES= \ mnesia_config_event \ mnesia_examples_test \ mnesia_nice_coverage_test \ + mnesia_causal_test \ + mnesia_ec_test \ + mnesia_pawset_test \ mnesia_evil_coverage_test \ mnesia_evil_backup \ mnesia_trans_access_test \ diff --git a/lib/mnesia/test/mnesia_causal_test.erl b/lib/mnesia/test/mnesia_causal_test.erl new file mode 100644 index 000000000000..7050875787ee --- /dev/null +++ b/lib/mnesia/test/mnesia_causal_test.erl @@ -0,0 +1,58 @@ +-module(mnesia_causal_test). + +-author('sl955@cam.ac.uk'). + +-include("mnesia_test_lib.hrl"). + +-export([init_per_testcase/2, end_per_testcase/2, init_per_group/2, end_per_group/2, + all/0, groups/0]). +-export([empty_final_buffer_ram/1]). + +init_per_testcase(Func, Conf) -> + mnesia_test_lib:init_per_testcase(Func, Conf). + +end_per_testcase(Func, Conf) -> + mnesia_test_lib:end_per_testcase(Func, Conf). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +all() -> + [{group, causal_consistency}, + empty_final_buffer_ram]. + +groups() -> + [{causal_consistency, [], []}]. + +init_per_group(_GroupName, Config) -> + Config. + +end_per_group(_GroupName, Config) -> + Config. + +empty_final_buffer_ram(suite) -> + []; +empty_final_buffer_ram(Config) when is_list(Config) -> + empty_final_buffer(Config, ram_copies). + +empty_final_buffer(Config, Storage) -> + Nodes = [_NodeA, NodeA1, NodeA2] = NodeNames = ?acquire_nodes(3, Config), + Tab = empty_final_buffer, + Def = [{Storage, NodeNames}, {type, pawset}, {attributes, [k, v]}], + ?match({atomic, ok}, mnesia:create_table(Tab, Def)), + + Writer = fun(K, V) -> mnesia:write({Tab, K, V}) end, + ?match(ok, + mnesia:activity(async_ec, + fun() -> + Writer(2, a), + Writer(1, a) + end)), + spawn_monitor(NodeA1, mnesia, async_ec, [fun() -> [Writer(N, a) || N <- lists:seq(3, 10)] end]), + spawn_monitor(NodeA1, mnesia, async_ec, [fun() -> [Writer(N, b) || N <- lists:seq(12, 20)] end]), + + timer:sleep(3000), + + ?match([], mnesia_causal:get_buffered()), + ?match([], rpc:call(NodeA1, mnesia_causal, get_buffered, [])), + ?match([], rpc:call(NodeA2, mnesia_causal, get_buffered, [])), + + ?verify_mnesia(Nodes, []). diff --git a/lib/mnesia/test/mnesia_ec_test.erl b/lib/mnesia/test/mnesia_ec_test.erl new file mode 100644 index 000000000000..00b26d2b81ea --- /dev/null +++ b/lib/mnesia/test/mnesia_ec_test.erl @@ -0,0 +1,395 @@ +-module(mnesia_ec_test). + +-author('sl955@cam.ac.uk'). + +-include("mnesia_test_lib.hrl"). + +-export([init_per_testcase/2, end_per_testcase/2, init_per_group/2, end_per_group/2, + all/0, groups/0]). +-export([sync_ec_rw_ram/1, sync_ec_rw_compare_dirty_ram/1, sync_ec_rwd_ram/1]). +-export([async_ec_index_read_ram/1]). +-export([async_ec_rw_ram/1, async_ec_rwd_ram/1, async_ec_rw_compare_dirty_ram/1]). +-export([ec_rwd_block_ram/1, ec_block_40keys_ram/1, ec_write_block_ram/1, + ec_delete_block_ram/1]). + +init_per_testcase(Func, Conf) -> + mnesia_test_lib:init_per_testcase(Func, Conf). + +end_per_testcase(Func, Conf) -> + mnesia_test_lib:end_per_testcase(Func, Conf). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +all() -> + [{group, ec_index_read}, + {group, ec_cc_crud}, + {group, ec_compare_dirty}]. % {group, ec_cc_crud_block} + +groups() -> + [{ec_cc_crud, [], [async_ec_rw_ram, async_ec_rwd_ram, sync_ec_rw_ram, sync_ec_rwd_ram]}, + {ec_index_read, [], [async_ec_index_read_ram]}, + {ec_compare_dirty, [], [async_ec_rw_compare_dirty_ram, sync_ec_rw_compare_dirty_ram]}, + {ec_cc_crud_block, + [sequence], + [ec_rwd_block_ram, ec_block_40keys_ram, ec_write_block_ram, ec_delete_block_ram]}]. + +init_per_group(_GroupName, Config) -> + Config. + +end_per_group(_GroupName, Config) -> + Config. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Concurrent read and write +%% Requires a custom distrbution protocol https://github.com/rabbitmq/inet_tcp_proxy +%% for simulating network partitions. +%% These require a modification of %% mnesia_test_lib on how nodes are started, +%% as slave_start does not seem to support custom distribution protocols. +%% Disabled by default for now. + +async_ec_rw_ram(suite) -> + []; +async_ec_rw_ram(Config) when is_list(Config) -> + ec_rw([{kind, async_ec} | Config], ram_copies). + +sync_ec_rw_ram(suite) -> + []; +sync_ec_rw_ram(Config) when is_list(Config) -> + ec_rw([{kind, sync_ec} | Config], ram_copies). + +ec_rw(Config, Storage) -> + [_NodeA, NodeA1, NodeA2] = NodeNames = ?acquire_nodes(3, Config), + Tab = ec_rw, + Def = [{Storage, NodeNames}, {type, pawset}, {attributes, [k, v]}], + Kind = mnesia_test_lib:lookup_config(kind, Config), + ?match({atomic, ok}, mnesia:create_table(Tab, Def)), + + Reader = fun() -> mnesia:read(Tab, a) end, + Writer = fun() -> mnesia:write({Tab, a, 1}) end, + sleep_if_async(Kind, 1000), + ?match(ok, mnesia:activity(Kind, Writer)), + ?match([{Tab, a, 1}], mnesia:activity(Kind, Reader)), + sleep_if_async(Kind, 1000), + ?match([{Tab, a, 1}], mnesia:activity(Kind, Reader)), + ?match([{Tab, a, 1}], rpc:call(NodeA1, mnesia, activity, [Kind, Reader])), + ?match([{Tab, a, 1}], rpc:call(NodeA2, mnesia, activity, [Kind, Reader])). + +async_ec_rwd_ram(suite) -> + []; +async_ec_rwd_ram(Config) -> + ec_rwd([{kind, async_ec} | Config], ram_copies). + +sync_ec_rwd_ram(suite) -> + []; +sync_ec_rwd_ram(Config) -> + ec_rwd([{kind, sync_ec} | Config], ram_copies). + +ec_rwd(Config, Storage) -> + Nodes = [_NodeA, NodeA1, NodeA2] = ?acquire_nodes(3, Config), + Tab = ec_rwd, + Kind = mnesia_test_lib:lookup_config(kind, Config), + Def = [{Storage, Nodes}, {type, pawset}, {attributes, [k, v]}], + ?match({atomic, ok}, mnesia:create_table(Tab, Def)), + Writer = fun(K, V) -> mnesia:write({Tab, K, V}) end, + Deleter = fun(K) -> mnesia:delete({Tab, K}) end, + + spawn(fun() -> + lists:foreach(fun(N) -> mnesia:activity(Kind, fun() -> Writer(N, 4) end) end, + lists:seq(1, 20)) + end), + WriteAndDelete = + fun() -> + lists:foreach(fun(N) -> mnesia:activity(Kind, fun() -> Writer(N, 4) end) end, + lists:seq(21, 30)), + sleep_if_async(async_ec, 1000), + lists:foreach(fun(N) -> mnesia:activity(Kind, fun() -> Deleter(N) end) end, + lists:seq(25, 29)) + end, + spawn(NodeA1, WriteAndDelete), + + spawn(NodeA2, + lists, + foreach, + [fun(N) -> mnesia:activity(Kind, fun() -> Writer(N, 4) end) end, lists:seq(31, 40)]), + + sleep_if_async(async_ec, 2000), + Res = lists:seq(1, 40) -- lists:seq(25, 29), + ?match(Res, + lists:sort( + mnesia:async_ec(fun() -> mnesia:all_keys(Tab) end))). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +sync_ec_rw_compare_dirty_ram(suite) -> + []; +sync_ec_rw_compare_dirty_ram(Config) -> + ec_rw_compare_dirty([{kind, sync_ec} | Config], ram_copies). + +async_ec_rw_compare_dirty_ram(suite) -> + []; +async_ec_rw_compare_dirty_ram(Config) -> + ec_rw_compare_dirty([{kind, async_ec} | Config], ram_copies). + +ec_rw_compare_dirty(Config, Storage) -> + Nodes = [_NodeA, NodeA1, NodeA2] = ?acquire_nodes(3, Config), + Tab1 = ec_rw, + Tab2 = dirty_rw, + Def1 = [{Storage, Nodes}, {type, pawset}, {attributes, [k, v]}], + Def2 = [{Storage, Nodes}, {attributes, [k, v]}], + Kind = mnesia_test_lib:lookup_config(kind, Config), + ?match({atomic, ok}, mnesia:create_table(Tab1, Def1)), + ?match({atomic, ok}, mnesia:create_table(Tab2, Def2)), + + Writer = fun(K, V) -> mnesia:write({Tab1, K, V}) end, + spawn(fun() -> [mnesia:dirty_write({Tab2, N, 1}) || N <- lists:seq(1, 20)] end), + spawn(fun() -> + mnesia_rpc:call(NodeA1, + lists, + foreach, + [fun(N) -> mnesia:dirty_write({Tab2, N, 2}) end, lists:seq(21, 30)]) + end), + spawn(fun() -> + mnesia_rpc:call(NodeA2, + lists, + foreach, + [fun(N) -> mnesia:dirty_write({Tab2, N, 3}) end, lists:seq(31, 40)]) + end), + + spawn(fun() -> + lists:foreach(fun(N) -> mnesia:activity(Kind, fun() -> Writer(N, 4) end) end, + lists:seq(1, 20)) + end), + spawn(fun() -> + mnesia_rpc:call(NodeA1, + lists, + foreach, + [fun(N) -> mnesia:activity(Kind, fun() -> Writer(N, 5) end) end, + lists:seq(21, 30)]) + end), + spawn(fun() -> + mnesia_rpc:call(NodeA2, + lists, + foreach, + [fun(N) -> mnesia:activity(Kind, fun() -> Writer(N, 6) end) end, + lists:seq(31, 40)]) + end), + + sleep_if_async(Kind, 1000), + + DirtyKeys = + lists:sort( + mnesia:dirty_all_keys(Tab2)), + EcKeys = + lists:sort( + mnesia:activity(Kind, fun() -> mnesia:all_keys(Tab1) end)), + ?match(DirtyKeys, EcKeys). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +async_ec_index_read_ram(suite) -> + []; +async_ec_index_read_ram(Config) -> + async_ec_index_read(Config, ram_copies). + +async_ec_index_read(Config, Storage) -> + Nodes = [_NodeA, NodeA1, NodeA2] = ?acquire_nodes(3, Config), + Tab = ec_index_read, + Def = [{Storage, Nodes}, {type, pawset}, {attributes, [k, v]}], + ?match({atomic, ok}, mnesia:create_table(Tab, Def)), + ?match({atomic, ok}, mnesia:add_table_index(Tab, v)), + + Writer = fun(K, V) -> mnesia:write({Tab, K, V}) end, + spawn(fun() -> + lists:foreach(fun(N) -> mnesia:activity(async_ec, fun() -> Writer(N, 4) end) end, + lists:seq(1, 20)) + end), + spawn(fun() -> + mnesia_rpc:call(NodeA1, + lists, + foreach, + [fun(N) -> mnesia:activity(async_ec, fun() -> Writer(N, 5) end) end, + lists:seq(21, 30)]) + end), + spawn(fun() -> + mnesia_rpc:call(NodeA2, + lists, + foreach, + [fun(N) -> mnesia:activity(async_ec, fun() -> Writer(N, 6) end) end, + lists:seq(31, 40)]) + end), + + sleep_if_async(async_ec, 1000), + + Keys5 = mnesia:async_ec(fun() -> mnesia:index_read(Tab, 5, 3) end), + Expected5 = [{Tab, N, 5} || N <- lists:seq(21, 30)], + ?match(Expected5, lists:sort(Keys5)), + Keys6 = mnesia:async_ec(fun() -> mnesia:index_match_object({Tab, '_', 6}, v) end), + Expected6 = [{Tab, N, 6} || N <- lists:seq(31, 40)], + ?match(Expected6, lists:sort(Keys6)), + KeyNone = mnesia:async_ec(fun() -> mnesia:index_read(Tab, 7, 3) end), + ?match([], KeyNone), + + ?verify_mnesia(Nodes, []). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +ec_write_block_ram(suite) -> + []; +ec_write_block_ram(Config) when is_list(Config) -> + ec_write_block([{is_port, true} | Config], ram_copies). + +ec_write_block(Config, Storage) -> + [NodeA, NodeA1, NodeA2] = NodeNames = ?acquire_nodes(3, Config), + timer:sleep(500), + Tab = ec_write_block, + Def = [{Storage, NodeNames}, {type, pawset}, {attributes, [k, v]}], + ?match({atomic, ok}, mnesia:create_table(Tab, Def)), + + Reader = fun(K) -> mnesia:read(Tab, K) end, + Writer = fun(K, V) -> mnesia:write({Tab, K, V}) end, + BlockAndWrite = + fun() -> + % block connnection from NodeA1 to NodeA + % if called at A1 + inet_tcp_proxy_dist:block(NodeA), + timer:sleep(500), + mnesia:async_ec(fun() -> Writer(a, 1) end) + end, + spawn(NodeA1, BlockAndWrite), + timer:sleep(1000), + ?match([], mnesia:async_ec(fun() -> Reader(a) end)), + ?match(ok, mnesia:async_ec(fun() -> Writer(a, 3) end)), + ?match([{Tab, a, 3}], mnesia:async_ec(fun() -> Reader(a) end)), + + % allow connection + spawn(NodeA1, fun() -> inet_tcp_proxy_dist:allow(NodeA) end), + timer:sleep(1000), + + ?match([{Tab, a, 1}], mnesia:async_ec(fun() -> Reader(a) end)), + ?match([{Tab, a, 1}], rpc:call(NodeA1, mnesia, async_ec, [fun() -> Reader(a) end])), + ?match([{Tab, a, 1}], rpc:call(NodeA2, mnesia, async_ec, [fun() -> Reader(a) end])). + +ec_delete_block_ram(suite) -> + []; +ec_delete_block_ram(Config) when is_list(Config) -> + ec_delete_block([{is_port, true} | Config], ram_copies). + +ec_delete_block(Config, Storage) -> + [NodeA, NodeA1, NodeA2] = NodeNames = ?acquire_nodes(3, Config), + timer:sleep(500), + Tab = ec_delete_block, + Def = [{Storage, NodeNames}, {type, pawset}, {attributes, [k, v]}], + ?match({atomic, ok}, mnesia:create_table(Tab, Def)), + + Reader = fun(K) -> mnesia:read(Tab, K) end, + Writer = fun(K, V) -> mnesia:write({Tab, K, V}) end, + Deleter = fun(K) -> mnesia:delete({Tab, K}) end, + + BlockAndDelete = + fun() -> + % block connnection from NodeA1 to NodeA + % if called at A1 + inet_tcp_proxy_dist:block(NodeA), + timer:sleep(500), + mnesia:async_ec(fun() -> Deleter(a, 1) end) + end, + spawn(fun() -> mnesia:async_ec(fun() -> Writer(b, 2) end) end), + spawn(NodeA1, fun() -> mnesia:async_ec(fun() -> Writer(a, 1) end) end), + timer:sleep(1000), + ?match([{Tab, a, 1}], mnesia:async_ec(fun() -> Reader(a) end)), + + spawn(NodeA1, BlockAndDelete), + spawn(fun() -> mnesia:async_ec(fun() -> Deleter(a) end) end), + + timer:sleep(500), + ?match([], mnesia:async_ec(fun() -> Reader(a) end)), + ?match([{Tab, b, 2}], mnesia:async_ec(fun() -> Reader(b) end)), + + % allow connection + spawn(NodeA1, fun() -> inet_tcp_proxy_dist:allow(NodeA) end), + timer:sleep(1000), + + ?match([], mnesia:async_ec(fun() -> Reader(a) end)), + ?match([], rpc:call(NodeA1, mnesia, async_ec, [fun() -> Reader(a) end])), + ?match([], rpc:call(NodeA2, mnesia, async_ec, [fun() -> Reader(a) end])). + +ec_rwd_block_ram(suite) -> + []; +ec_rwd_block_ram(Config) when is_list(Config) -> + ec_rwd_block([{is_port, true} | Config], ram_copies). + +ec_rwd_block(Config, Storage) -> + [NodeA, NodeA1, NodeA2] = NodeNames = ?acquire_nodes(3, Config), + timer:sleep(500), + Tab = ec_rwd_block, + Def = [{Storage, NodeNames}, {type, pawset}, {attributes, [k, v]}], + ?match({atomic, ok}, mnesia:create_table(Tab, Def)), + + Reader = fun() -> mnesia:read(Tab, a) end, + Writer = fun() -> mnesia:write({Tab, a, 1}) end, + Deleter = fun() -> mnesia:delete({Tab, a}) end, + BlockAndWrite = + fun() -> + inet_tcp_proxy_dist:block(NodeA), + timer:sleep(500), + mnesia:async_ec(Writer) + end, + spawn(NodeA1, BlockAndWrite), + timer:sleep(1000), + ?match([], mnesia:async_ec(Reader)), + ?match(ok, mnesia:async_ec(Writer)), + ?match([{Tab, a, 1}], mnesia:async_ec(Reader)), + ?match(ok, mnesia:async_ec(Deleter)), + + spawn(NodeA1, fun() -> inet_tcp_proxy_dist:allow(NodeA) end), + timer:sleep(1000), + + ?match([{Tab, a, 1}], mnesia:async_ec(Reader)), + ?match([{Tab, a, 1}], rpc:call(NodeA1, mnesia, async_ec, [Reader])), + ?match([{Tab, a, 1}], rpc:call(NodeA2, mnesia, async_ec, [Reader])). + +ec_block_40keys_ram(suite) -> + []; +ec_block_40keys_ram(Config) -> + ec_block_40keys([{is_port, true} | Config], ram_copies). + +ec_block_40keys(Config, Storage) -> + Nodes = [_NodeA, NodeA1, NodeA2] = ?acquire_nodes(3, Config), + Tab = ec_block_40keys, + Def = [{Storage, Nodes}, {type, pawset}, {attributes, [k, v]}], + ?match({atomic, ok}, mnesia:create_table(Tab, Def)), + Writer = fun(K, V) -> mnesia:write({Tab, K, V}) end, + Deleter = fun(K) -> mnesia:delete({Tab, K}) end, + + spawn(fun() -> + lists:foreach(fun(N) -> mnesia:async_ec(fun() -> Writer(N, 4) end) end, + lists:seq(1, 20)) + end), + spawn(NodeA1, inet_tcp_proxy_dist, block, [NodeA2]), + WriteAndDelete = + fun() -> + lists:foreach(fun(N) -> mnesia:async_ec(fun() -> Writer(N, 4) end) end, + lists:seq(21, 30)), + sleep_if_async(async_ec, 1000), + lists:foreach(fun(N) -> mnesia:async_ec(fun() -> Deleter(N) end) end, lists:seq(25, 29)) + end, + spawn(NodeA1, WriteAndDelete), + + spawn(NodeA2, + lists, + foreach, + [fun(N) -> mnesia:async_ec(fun() -> Writer(N, 4) end) end, lists:seq(31, 40)]), + + spawn(NodeA1, inet_tcp_proxy_dist, allow, [NodeA2]), + + sleep_if_async(async_ec, 3000), + Res = lists:seq(1, 40) -- lists:seq(25, 29), + ?match(Res, + lists:sort( + mnesia:async_ec(fun() -> mnesia:all_keys(Tab) end))). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +sleep_if_async(async_ec, Time) -> + timer:sleep(Time); +sleep_if_async(sync_ec, _T) -> + ok. diff --git a/lib/mnesia/test/mnesia_pawset_test.erl b/lib/mnesia/test/mnesia_pawset_test.erl new file mode 100644 index 000000000000..78f402158220 --- /dev/null +++ b/lib/mnesia/test/mnesia_pawset_test.erl @@ -0,0 +1,177 @@ +-module(mnesia_pawset_test). + +-author('sl955@cam.ac.uk'). + +-include("mnesia_test_lib.hrl"). + +-export([init_per_testcase/2, end_per_testcase/2, init_per_group/2, end_per_group/2, + all/0, groups/0]). +-export([match_delete_ram/1, match_object_ram/1, match_object_with_index_ram/1, + select_with_index_ram/1]). + +init_per_testcase(Func, Conf) -> + mnesia_test_lib:init_per_testcase(Func, Conf). + +end_per_testcase(Func, Conf) -> + mnesia_test_lib:end_per_testcase(Func, Conf). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +all() -> + [match_delete_ram, match_object_ram, {group, index_tests}]. + +groups() -> + [{index_tests, [], [match_object_with_index_ram, select_with_index_ram]}]. + +init_per_group(_GroupName, Config) -> + Config. + +end_per_group(_GroupName, Config) -> + Config. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +match_delete_ram(suite) -> + []; +match_delete_ram(Config) when is_list(Config) -> + match_delete(Config, ram_copies). + +match_delete(Config, Storage) -> + Nodes = [_NodeA, NodeA1, NodeA2] = NodeNames = ?acquire_nodes(3, Config), + Tab = match_delete, + Def = [{Storage, NodeNames}, {type, pawbag}, {attributes, [k, v]}], + ?match({atomic, ok}, mnesia:create_table(Tab, Def)), + Reader = fun() -> mnesia:read(Tab, a) end, + Writer = fun(K, V) -> mnesia:write({Tab, K, V}) end, + ObjectDeleter = fun() -> mnesia:delete_object({Tab, a, 1}) end, + ?match(ok, + mnesia:activity(sync_ec, + fun() -> + Writer(a, 1), + Writer(a, 2) + end)), + ?match([{Tab, a, 1}, {Tab, a, 2}], mnesia:activity(sync_ec, Reader)), + ?match([{Tab, a, 1}, {Tab, a, 2}], + rpc:call(NodeA1, mnesia, activity, [sync_ec, Reader])), %% Delete the record + ?match([{Tab, a, 1}, {Tab, a, 2}], rpc:call(NodeA2, mnesia, activity, [sync_ec, Reader])), + ?match(ok, mnesia:sync_ec(ObjectDeleter)), + + ?match([{Tab, a, 2}], mnesia:async_ec(Reader)), + ?verify_mnesia(Nodes, []). + +match_object_ram(suite) -> + []; +match_object_ram(Config) when is_list(Config) -> + match_object(Config, ram_copies). + +match_object(Config, Storage) -> + Nodes = [_NodeA, NodeA1, NodeA2] = NodeNames = ?acquire_nodes(3, Config), + Tab = match_object, + Def = [{Storage, NodeNames}, {type, pawbag}, {attributes, [k, v]}], + ?match({atomic, ok}, mnesia:create_table(Tab, Def)), + Reader = fun() -> mnesia:read(Tab, a) end, + Writer = fun(K, V) -> mnesia:write({Tab, K, V}) end, + ObjectMatcher = fun(Pat) -> mnesia:match_object(Pat) end, + + ?match(ok, + mnesia:activity(sync_ec, + fun() -> + Writer(a, 1), + Writer(a, 2), + Writer(b, 2) + end)), + + ?match([{Tab, a, 1}, {Tab, a, 2}], mnesia:activity(sync_ec, Reader)), + ?match([{Tab, b, 2}], mnesia:activity(sync_ec, fun() -> mnesia:read(Tab, b) end)), + + ?match([{Tab, a, 1}, {Tab, a, 2}], + lists:sort( + mnesia:async_ec(fun() -> ObjectMatcher({Tab, a, '_'}) end))), + ?match([{Tab, a, 2}, {Tab, b, 2}], + lists:sort( + rpc:call(NodeA1, mnesia, async_ec, [fun() -> ObjectMatcher({Tab, '_', 2}) end]))), + + ?match(ok, mnesia:sync_ec(fun() -> mnesia:delete_object({Tab, a, 1}) end)), + + ?match([{Tab, a, 2}], + rpc:call(NodeA2, mnesia, async_ec, [fun() -> ObjectMatcher({Tab, a, '_'}) end])), + + ?verify_mnesia(Nodes, []). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Index related tests + +match_object_with_index_ram(suite) -> + []; +match_object_with_index_ram(Config) when is_list(Config) -> + match_object_with_index(Config, ram_copies). + +match_object_with_index(Config, Storage) -> + Nodes = [_NodeA, NodeA1, NodeA2] = NodeNames = ?acquire_nodes(3, Config), + Tab = match_object, + Def = [{Storage, NodeNames}, {type, pawset}, {attributes, [k, v]}], + ?match({atomic, ok}, mnesia:create_table(Tab, Def)), + ?match({atomic, ok}, mnesia:add_table_index(Tab, v)), + Reader = fun() -> mnesia:read(Tab, a) end, + Writer = fun(K, V) -> mnesia:write({Tab, K, V}) end, + ObjectMatcher = fun(Pat) -> mnesia:match_object(Pat) end, + + ?match(ok, + mnesia:activity(sync_ec, + fun() -> + Writer(a, 1), + Writer(b, 2), + Writer(c, 2) + end)), + + ?match([{Tab, a, 1}], mnesia:activity(sync_ec, Reader)), + ?match([{Tab, b, 2}], mnesia:activity(sync_ec, fun() -> mnesia:read(Tab, b) end)), + + ?match([{Tab, a, 1}], mnesia:async_ec(fun() -> ObjectMatcher({Tab, a, '_'}) end)), + ?match([{Tab, b, 2}, {Tab, c, 2}], + lists:sort( + rpc:call(NodeA1, mnesia, async_ec, [fun() -> ObjectMatcher({Tab, '_', 2}) end]))), + + ?match(ok, mnesia:sync_ec(fun() -> mnesia:delete_object({Tab, b, 2}) end)), + + ?match([{Tab, c, 2}], + rpc:call(NodeA2, mnesia, async_ec, [fun() -> ObjectMatcher({Tab, '_', 2}) end])), + + ?verify_mnesia(Nodes, []). + +select_with_index_ram(suite) -> + []; +select_with_index_ram(Config) when is_list(Config) -> + select_with_index(Config, ram_copies). + +select_with_index(Config, Storage) -> + Nodes = NodeNames = ?acquire_nodes(3, Config), + Tab = select_with_index, + Def = [{Storage, NodeNames}, {type, pawset}, {attributes, [k, v]}], + ?match({atomic, ok}, mnesia:create_table(Tab, Def)), + ?match({atomic, ok}, mnesia:add_table_index(Tab, v)), + Reader = fun() -> mnesia:read(Tab, a) end, + Writer = fun(K, V) -> mnesia:write({Tab, K, V}) end, + + ?match(ok, + mnesia:activity(sync_ec, + fun() -> + Writer(a, 1), + Writer(b, 2), + Writer(c, 2), + Writer(d, 10) + end)), + + ?match([{Tab, a, 1}], mnesia:activity(sync_ec, Reader)), + ?match([{Tab, b, 2}], mnesia:activity(sync_ec, fun() -> mnesia:read(Tab, b) end)), + + ?match([1], + mnesia:sync_ec(fun() -> mnesia:select(Tab, [{{Tab, a, '$2'}, [], ['$2']}]) end)), + + ?match([b, c], + mnesia:sync_ec(fun() -> mnesia:select(Tab, [{{Tab, '$1', 2}, [], ['$1']}]) end)), + + ?match([], + mnesia:async_ec(fun() -> mnesia:select(Tab, [{{Tab, d, '$1'}, ['<', '$1'], ['$_']}]) + end)), + + ?verify_mnesia(Nodes, []). diff --git a/lib/mnesia/test/mt.erl b/lib/mnesia/test/mt.erl index b5eea3069083..d69f8e2143ef 100644 --- a/lib/mnesia/test/mt.erl +++ b/lib/mnesia/test/mt.erl @@ -46,10 +46,12 @@ alias(all) -> mnesia_SUITE; alias(atomicity) -> mnesia_atomicity_test; alias(backup) -> mnesia_evil_backup; +alias(causal) -> mnesia_causal_test; alias(config) -> mnesia_config_test; alias(consistency) -> mnesia_consistency_test; alias(dirty) -> mnesia_dirty_access_test; alias(durability) -> mnesia_durability_test; +alias(ec) -> mnesia_ec_test; alias(evil) -> mnesia_evil_coverage_test; alias(qlc) -> mnesia_qlc_test; alias(examples) -> mnesia_examples_test; @@ -62,6 +64,7 @@ alias(majority) -> mnesia_majority_test; alias(measure) -> mnesia_measure_test; alias(medium) -> {mnesia_SUITE, medium}; alias(nice) -> mnesia_nice_coverage_test; +alias(pawset) -> mnesia_pawset_test; alias(recover) -> mnesia_recover_test; alias(recovery) -> mnesia_recovery_test; alias(registry) -> mnesia_registry_test;