fix: protect 'join' operation with a global lock #137

Merged
2 changes: 2 additions & 0 deletions include/mria.hrl
@@ -16,3 +16,5 @@
 }).
 
 -type(member() :: #member{}).
+
+-define(JOIN_LOCK_ID, {mria_sync_join, node()}).
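The new `?JOIN_LOCK_ID` macro is a lock identifier for the standard `global` module, of the form `{ResourceId, LockRequesterId}`. Because the requester is `node()`, two different nodes contending for the same `mria_sync_join` resource block each other, which is what serializes concurrent joins. A minimal sketch of the mechanism using stock `global:trans/4` (the module and helper below are hypothetical illustrations, not part of the PR):

```erlang
-module(lock_sketch).
-export([with_join_lock/2]).

%% Hypothetical helper: run Fun under a global lock shared with one
%% peer node. LockId has the same shape as ?JOIN_LOCK_ID in the PR.
with_join_lock(OtherNode, Fun) ->
    LockId = {mria_sync_join, node()},
    Nodes  = [node(), OtherNode],    %% the lock is set on both peers
    %% global:trans/4 sets the lock on Nodes, runs Fun, releases the
    %% lock, and returns Fun's result; with a finite retry count it
    %% returns 'aborted' when the lock cannot be acquired.
    case global:trans(LockId, Fun, Nodes, 10) of
        aborted -> {error, lock_not_acquired};
        Result  -> Result
    end.
```

Note that `global:trans/3`, the arity the PR actually uses, defaults to infinite retries, so the join call simply blocks until the lock is free rather than observing `aborted`.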
57 changes: 32 additions & 25 deletions src/mria.erl
@@ -228,31 +228,11 @@ join(Node) ->
 join(Node, _) when Node =:= node() ->
     ignore;
 join(Node, Reason) when is_atom(Node) ->
-    %% When `Reason =:= heal' the node should rejoin regardless of
-    %% what mnesia thinks:
-    IsInCluster = is_node_in_cluster(Node) andalso Reason =/= heal,
-    case {IsInCluster, mria_node:is_running(Node), catch mria_rlog:role(Node)} of
-        {false, true, core} ->
-            %% FIXME: reading role via `mria_config' may be unsafe
-            %% when the app is not running, since it defaults to core.
-            %% A replicant may try to join the cluster as a core and
-            %% wreak havoc:
-            Role = application:get_env(mria, node_role, core),
-            do_join(Role, Node, Reason);
-        {_, false, _} ->
-            {error, {node_down, Node}};
-        {true, _, _} ->
-            {error, {already_in_cluster, Node}};
-        {_, _, replicant} ->
-            {error, {cannot_join_to_replicant, Node}};
-        {_, IsRunning, Role} ->
-            {error, #{ reason => illegal_target
-                     , target_node => Node
-                     , in_cluster => IsInCluster
-                     , is_running => IsRunning
-                     , target_role => Role
-                     }}
-    end.
+    %% If two nodes try to join each other simultaneously, one of
+    %% them is blocked waiting for the lock. Once the lock is
+    %% released, that node is expected to already be in the cluster
+    %% (if the other node joined it successfully).
+    global:trans(?JOIN_LOCK_ID, fun() -> join1(Node, Reason) end, [node(), Node]).
 
 %% @doc Leave the cluster
 -spec leave() -> ok | {error, term()}.
@@ -528,6 +508,33 @@ find_upstream_node(Shard) ->
                  Node
              end).
 
+join1(Node, Reason) when is_atom(Node) ->
+    %% When `Reason =:= heal' the node should rejoin regardless of
+    %% what mnesia thinks:
+    IsInCluster = is_node_in_cluster(Node) andalso Reason =/= heal,
+    case {IsInCluster, mria_node:is_running(Node), catch mria_rlog:role(Node)} of
+        {false, true, core} ->
+            %% FIXME: reading role via `mria_config' may be unsafe
+            %% when the app is not running, since it defaults to core.
+            %% A replicant may try to join the cluster as a core and
+            %% wreak havoc:
+            Role = application:get_env(mria, node_role, core),
+            do_join(Role, Node, Reason);
+        {_, false, _} ->
+            {error, {node_down, Node}};
+        {true, _, _} ->
+            {error, {already_in_cluster, Node}};
+        {_, _, replicant} ->
+            {error, {cannot_join_to_replicant, Node}};
+        {_, IsRunning, Role} ->
+            {error, #{ reason => illegal_target
+                     , target_node => Node
+                     , in_cluster => IsInCluster
+                     , is_running => IsRunning
+                     , target_role => Role
+                     }}
+    end.
+
 -spec do_join(mria_rlog:role(), node(), join_reason()) -> ok.
 do_join(Role, Node, Reason) ->
     ?tp(notice, "Mria is restarting to join the cluster", #{seed => Node}),
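Under the new scheme, the loser of a mutual simultaneous join wakes up already clustered and gets `{error, {already_in_cluster, _}}`, which is exactly what the first test below asserts. A hedged caller-side sketch, assuming it is acceptable to treat that outcome as success; `ensure_joined/1` is a hypothetical helper, not part of the PR:

```erlang
%% Hypothetical wrapper around mria:join/1: a node that lost the race
%% is already in the cluster, so report success in that case too.
ensure_joined(Node) ->
    case mria:join(Node) of
        ok                               -> ok;
        {error, {already_in_cluster, _}} -> ok;
        {error, _} = Error               -> Error
    end.
```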
33 changes: 33 additions & 0 deletions test/mria_SUITE.erl
@@ -1056,6 +1056,39 @@ t_cluster_nodes(_) ->
        end,
        []).
 
+t_join_each_other_simultaneously(_) ->
+    Cluster = [maps:remove(join_to, Spec)
+               || Spec <- mria_ct:cluster([core, core], mria_mnesia_test_util:common_env())],
+    ?check_trace(
+       try
+           [N1, N2] = mria_ct:start_cluster(mria, Cluster),
+           Key1 = rpc:async_call(N1, mria, join, [N2]),
+           Key2 = rpc:async_call(N2, mria, join, [N1]),
+           ?assertMatch([ok, {error, {already_in_cluster, _}}],
+                        lists:sort([rpc:yield(Key1), rpc:yield(Key2)]))
+       after
+           ok = mria_ct:teardown_cluster(Cluster)
+       end,
+       []).
+
+t_join_another_node_simultaneously(_) ->
+    Cluster = [maps:remove(join_to, Spec)
+               || Spec <- mria_ct:cluster([core, core, core, core], mria_mnesia_test_util:common_env())],
+    ?check_trace(
+       try
+           [N1, N2, N3, N4] = Nodes = mria_ct:start_cluster(mria, Cluster),
+           ok = rpc:call(N2, mria, join, [N1]),
+           Key1 = rpc:async_call(N3, mria, join, [N1]),
+           Key2 = rpc:async_call(N4, mria, join, [N1]),
+           ?assertMatch(ok, rpc:yield(Key1)),
+           ?assertMatch(ok, rpc:yield(Key2)),
+           timer:sleep(3000),
+           ?assertEqual({[true, true, true, true], []}, rpc:multicall(Nodes, mria_sup, is_running, []))
+       after
+           ok = mria_ct:teardown_cluster(Cluster)
+       end,
+       []).
+
 cluster_benchmark(_) ->
     NReplicas = 6,
     Config = #{ trans_size => 10
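One note on the final assertion in `t_join_another_node_simultaneously`: `rpc:multicall/4` returns `{Results, BadNodes}`, so `{[true, true, true, true], []}` reads as "all four nodes report `mria_sup` as running, and none failed to answer". The same check written as a reusable helper (a hypothetical sketch, not part of the diff):

```erlang
%% Hypothetical test helper: assert that mria_sup:is_running/0
%% returns true on every node and that no node is unreachable.
-spec assert_all_running([node()]) -> ok.
assert_all_running(Nodes) ->
    {Results, BadNodes} = rpc:multicall(Nodes, mria_sup, is_running, []),
    true = lists:all(fun(R) -> R =:= true end, Results),
    []   = BadNodes,
    ok.
```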