Skip to content

Commit

Permalink
Allow a user-defined function to wrap mnesia_schema:merge_schema()
Browse files Browse the repository at this point in the history
Mnesia currently notifies the user if it detects a partitioned
network, but the options for resolving the situation are limited.
In practice, the only safe options are:
- set master_nodes and restart one of the affected 'islands'
- restart the entire system from backup

This patch introduces a way to resolve the situation without
restarting any nodes. The key to doing this safely is to
lock affected tables and run the merge function inside the same
transaction that merges the schema. Otherwise, one transaction
will merge the schema, after which writes to the database will
be replicated across the (potentially) inconsistent copies;
the transaction triggered by the asynchronous inconsistency event
will have to race to be the first to access the tables.

The normal call to merge the schema is done from mnesia_controller.
Previously, this was mnesia_schema:merge_schema().

The new function is merge_schema(UserFun), with the
following behaviour:

merge_schema(UserFun) ->
    schema_transaction(
        fun() ->
            UserFun(fun(Arg) -> do_merge_schema(Arg) end)
        end).

Where do_merge_schema(LockTabs) will execute the schema merge
as before, but also lock all tables in the list LockTabs which
have copies on the affected nodes (that is, everywhere the schema
table is locked).

The effect of this is to allow a wrapper function that calls the
merge and, if successful, continues to resolve the inconsistency
on the tables, knowing that they have now been locked on all
affected nodes.

The function that is actually called by the deconflict function
is mnesia_controller:connect_nodes(Nodes, UserFun), as in:

Tables = tables_to_deconflict(Node),
mnesia_controller:connect_nodes(
   [Node], fun(MergeF) ->
               case MergeF(Tables) of
                  {merged,_,_} ->
                       deconflict(Tables, Node);
                  Other ->
                       Other
               end
   end).

In the case where the merge fails, it is probably wise to
restart from a backup...

I have not run the mnesia test suite, as it is not available.
I have not updated documentation, as these functions are not
documented in the first place.
  • Loading branch information
Ulf Wiger committed May 9, 2010
1 parent a21a9da commit 3f70f3d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 13 deletions.
27 changes: 17 additions & 10 deletions lib/mnesia/src/mnesia_controller.erl
Expand Up @@ -52,6 +52,7 @@
async_dump_log/1,
sync_dump_log/1,
connect_nodes/1,
connect_nodes/2,
wait_for_schema_commit_lock/0,
release_schema_commit_lock/0,
create_table/1,
Expand Down Expand Up @@ -94,7 +95,7 @@
load_and_reply/2,
send_and_reply/2,
wait_for_tables_init/2,
connect_nodes2/2
connect_nodes2/3
]).

-import(mnesia_lib, [set/2, add/2]).
Expand Down Expand Up @@ -420,12 +421,15 @@ try_schedule_late_disc_load(Tabs, Reason, MsgTag) ->
[[Tabs, Reason, MsgTag], AbortReason])
end.

connect_nodes(Ns) ->
%% Connect to the nodes Ns without any user-supplied merge wrapper.
%% Delegates to connect_nodes/2, passing default_merge/1 as the
%% function that wraps the schema merge.
connect_nodes(Ns) ->
    DefaultWrapper = fun default_merge/1,
    connect_nodes(Ns, DefaultWrapper).

connect_nodes(Ns, UserFun) ->
case mnesia:system_info(is_running) of
no ->
{error, {node_not_running, node()}};
yes ->
Pid = spawn_link(?MODULE,connect_nodes2,[self(),Ns]),
Pid = spawn_link(?MODULE,connect_nodes2,[self(),Ns, UserFun]),
receive
{?MODULE, Pid, Res, New} ->
case Res of
Expand All @@ -443,15 +447,15 @@ connect_nodes(Ns) ->
end
end.

connect_nodes2(Father, Ns) ->
connect_nodes2(Father, Ns, UserFun) ->
Current = val({current, db_nodes}),
abcast([node()|Ns], {merging_schema, node()}),
{NewC, OldC} = mnesia_recover:connect_nodes(Ns),
Connected = NewC ++OldC,
New1 = mnesia_lib:intersect(Ns, Connected),
New = New1 -- Current,
process_flag(trap_exit, true),
Res = try_merge_schema(New),
Res = try_merge_schema(New, UserFun),
Msg = {schema_is_merged, [], late_merge, []},
multicall([node()|Ns], Msg),
After = val({current, db_nodes}),
Expand All @@ -465,7 +469,7 @@ connect_nodes2(Father, Ns) ->

merge_schema() ->
AllNodes = mnesia_lib:all_nodes(),
case try_merge_schema(AllNodes) of
case try_merge_schema(AllNodes, fun default_merge/1) of
ok ->
schema_is_merged();
{aborted, {throw, Str}} when is_list(Str) ->
Expand All @@ -474,8 +478,11 @@ merge_schema() ->
fatal("Failed to merge schema: ~p~n", [Else])
end.

try_merge_schema(Nodes) ->
case mnesia_schema:merge_schema() of
%% Default merge wrapper, used when the caller supplies no UserFun.
%% F is the one-argument merge fun handed out by
%% mnesia_schema:merge_schema/1; it is called with an empty list,
%% i.e. no tables are named for locking, and its result is returned
%% unchanged.
default_merge(F) ->
    NoLockTabs = [],
    F(NoLockTabs).

try_merge_schema(Nodes, UserFun) ->
case mnesia_schema:merge_schema(UserFun) of
{atomic, not_merged} ->
%% No more nodes that we need to merge the schema with
ok;
Expand All @@ -488,11 +495,11 @@ try_merge_schema(Nodes) ->
im_running(OldFriends, NewFriends),
im_running(NewFriends, OldFriends),

try_merge_schema(Nodes);
try_merge_schema(Nodes, UserFun);
{atomic, {"Cannot get cstructs", Node, Reason}} ->
dbg_out("Cannot get cstructs, Node ~p ~p~n", [Node, Reason]),
timer:sleep(1000), % Avoid a endless loop look alike
try_merge_schema(Nodes);
try_merge_schema(Nodes, UserFun);
Other ->
Other
end.
Expand Down
23 changes: 20 additions & 3 deletions lib/mnesia/src/mnesia_schema.erl
Expand Up @@ -62,6 +62,7 @@
list2cs/1,
lock_schema/0,
merge_schema/0,
merge_schema/1,
move_table/3,
opt_create_dir/2,
prepare_commit/3,
Expand Down Expand Up @@ -2650,10 +2651,17 @@ make_dump_tables([]) ->

%% Merge the local schema with the schema on other nodes
merge_schema() ->
schema_transaction(fun() -> do_merge_schema() end).
schema_transaction(fun() -> do_merge_schema([]) end).

do_merge_schema() ->
%% Merge the local schema with the schema on other nodes, letting a
%% user-supplied wrapper drive the merge.
%%
%% UserFun is called inside the schema transaction with a single
%% argument: a fun of arity 1 that runs do_merge_schema/1 on the
%% argument it is given. Whatever UserFun returns becomes the result
%% of the transaction fun passed to schema_transaction/1.
merge_schema(UserFun) ->
    MergeF = fun(LockTabs) -> do_merge_schema(LockTabs) end,
    schema_transaction(fun() -> UserFun(MergeF) end).


do_merge_schema(LockTabs0) ->
{_Mod, Tid, Ts} = get_tid_ts_and_lock(schema, write),
LockTabs = [{T, tab_to_nodes(T)} || T <- LockTabs0],
io:fwrite("LockTabs = ~p~n", [LockTabs]),
[get_tid_ts_and_lock(T,write) || {T,_} <- LockTabs],
Connected = val(recover_nodes),
Running = val({current, db_nodes}),
Store = Ts#tidstore.store,
Expand All @@ -2665,9 +2673,11 @@ do_merge_schema() ->
mnesia:abort({bad_commit, {missing_lock, Miss}})
end,
case Connected -- Running of
[Node | _] ->
[Node | _] = NewNodes ->
%% Time for a schema merging party!
mnesia_locker:wlock_no_exist(Tid, Store, schema, [Node]),
[mnesia_locker:wlock_no_exist(Tid, Store, T, mnesia_lib:intersect(Ns, NewNodes))
|| {T,Ns} <- LockTabs],
case rpc:call(Node, mnesia_controller, get_cstructs, []) of
{cstructs, Cstructs, RemoteRunning1} ->
LockedAlready = Running ++ [Node],
Expand All @@ -2681,6 +2691,9 @@ do_merge_schema() ->
end,
NeedsLock = RemoteRunning -- LockedAlready,
mnesia_locker:wlock_no_exist(Tid, Store, schema, NeedsLock),
[mnesia_locker:wlock_no_exist(Tid, Store, T,
mnesia_lib:intersect(Ns,NeedsLock))
|| {T,Ns} <- LockTabs],
{value, SchemaCs} =
lists:keysearch(schema, #cstruct.name, Cstructs),

Expand Down Expand Up @@ -2714,6 +2727,10 @@ do_merge_schema() ->
not_merged
end.

%% Map a table name to a list of nodes.
%% Fetches the table's cstruct via val({Tab, cstruct}) and passes it
%% to mnesia_lib:cs_to_nodes/1 -- presumably the nodes that hold a
%% copy of Tab (confirm against mnesia_lib). Only atom table names
%% are accepted.
tab_to_nodes(Tab) when is_atom(Tab) ->
    mnesia_lib:cs_to_nodes(val({Tab, cstruct})).

make_merge_schema(Node, [Cs | Cstructs]) ->
Ops = do_make_merge_schema(Node, Cs),
Ops ++ make_merge_schema(Node, Cstructs);
Expand Down

0 comments on commit 3f70f3d

Please sign in to comment.