Skip to content

Commit

Permalink
fix: protect 'join' operation with a global lock
Browse files Browse the repository at this point in the history
The lock must resolve conflicts between two nodes trying to join
each other simultaneously

Fixes: EMQX-9588, #10380
  • Loading branch information
SergeTupchiy committed Apr 24, 2023
1 parent f2a9f4f commit e558f2b
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 25 deletions.
2 changes: 2 additions & 0 deletions include/mria.hrl
Expand Up @@ -16,3 +16,5 @@
}).

-type(member() :: #member{}).

-define(JOIN_LOCK_ID, {mria_sync_join, node()}).
57 changes: 32 additions & 25 deletions src/mria.erl
Expand Up @@ -228,31 +228,11 @@ join(Node) ->
%% Join the cluster of `Node'.  Joining the local node itself is a
%% no-op.  The actual membership checks live in `join1/2'; this entry
%% point only serializes concurrent joins.
join(Node, _) when Node =:= node() ->
    ignore;
join(Node, Reason) when is_atom(Node) ->
    %% If two nodes are trying to join each other simultaneously,
    %% one of them must be blocked waiting for a lock.
    %% Once the lock is released, it is expected to be already in the
    %% cluster (if the other node joined it successfully).
    %% The lock is taken on both the local node and the target node so
    %% that the two racing joins contend on the same resource.
    global:trans(?JOIN_LOCK_ID, fun() -> join1(Node, Reason) end, [node(), Node]).

%% @doc Leave the cluster
-spec leave() -> ok | {error, term()}.
Expand Down Expand Up @@ -528,6 +508,33 @@ find_upstream_node(Shard) ->
Node
end).

%% Perform the actual join eligibility checks.  Expected to run under
%% the ?JOIN_LOCK_ID global lock taken by `join/2'.
join1(Node, Reason) when is_atom(Node) ->
    %% A healing node must rejoin regardless of what mnesia thinks,
    %% so `Reason =:= heal' overrides apparent cluster membership:
    InCluster = is_node_in_cluster(Node) andalso Reason =/= heal,
    Running = mria_node:is_running(Node),
    %% `catch' shields us from an unreachable target; the resulting
    %% `{'EXIT', _}' term falls through to the final case clause.
    TargetRole = catch mria_rlog:role(Node),
    case {InCluster, Running, TargetRole} of
        {false, true, core} ->
            %% FIXME: reading role via `mria_config' may be unsafe
            %% when the app is not running, since it defaults to core.
            %% Replicant may try to join the cluster as a core and wreak
            %% havoc
            MyRole = application:get_env(mria, node_role, core),
            do_join(MyRole, Node, Reason);
        {_, false, _} ->
            {error, {node_down, Node}};
        {true, _, _} ->
            {error, {already_in_cluster, Node}};
        {_, _, replicant} ->
            {error, {cannot_join_to_replicant, Node}};
        {_, IsRunning, Role} ->
            {error, #{ reason => illegal_target
                     , target_node => Node
                     , in_cluster => InCluster
                     , is_running => IsRunning
                     , target_role => Role
                     }}
    end.

-spec do_join(mria_rlog:role(), node(), join_reason()) -> ok.
do_join(Role, Node, Reason) ->
?tp(notice, "Mria is restarting to join the cluster", #{seed => Node}),
Expand Down
34 changes: 34 additions & 0 deletions test/mria_SUITE.erl
Expand Up @@ -1056,6 +1056,40 @@ t_cluster_nodes(_) ->
end,
[]).

%% Two core nodes issue `mria:join' at each other concurrently; the
%% global join lock must serialize them so that exactly one join
%% succeeds and the other one observes `already_in_cluster'.
t_join_each_other_simultaneously(_) ->
    Specs = mria_ct:cluster([core, core], mria_mnesia_test_util:common_env()),
    %% Drop `join_to' so the nodes boot standalone and we control joining:
    Cluster = lists:map(fun(NodeSpec) -> maps:remove(join_to, NodeSpec) end, Specs),
    ?check_trace(
       try
           [NodeA, NodeB] = mria_ct:start_cluster(mria, Cluster),
           %% Fire both joins without waiting, then collect the results:
           ReqA = rpc:async_call(NodeA, mria, join, [NodeB]),
           ReqB = rpc:async_call(NodeB, mria, join, [NodeA]),
           Results = lists:sort([rpc:yield(ReqA), rpc:yield(ReqB)]),
           ?assertMatch([ok, {error, {already_in_cluster, _}}], Results)
       after
           ok = mria_ct:teardown_cluster(Cluster)
       end,
       []).

%% Two fresh nodes race to join the same core node; the global join
%% lock must let both succeed, ending in a healthy 4-node cluster.
t_join_another_node_simultaneously(_) ->
    Specs = mria_ct:cluster([core, core, core, core], mria_mnesia_test_util:common_env()),
    %% Drop `join_to' so the nodes boot standalone and we control joining:
    Cluster = lists:map(fun(NodeSpec) -> maps:remove(join_to, NodeSpec) end, Specs),
    ?check_trace(
       try
           [N1, N2, N3, N4] = Nodes = mria_ct:start_cluster(mria, Cluster),
           %% Seed a two-node cluster first:
           ok = rpc:call(N2, mria, join, [N1]),
           %% N3 and N4 now join N1 concurrently:
           ReqA = rpc:async_call(N3, mria, join, [N1]),
           ReqB = rpc:async_call(N4, mria, join, [N1]),
           ?assertMatch(ok, rpc:yield(ReqA)),
           ?assertMatch(ok, rpc:yield(ReqB)),
           %% Give the restarted nodes time to come back up before checking:
           timer:sleep(3000),
           ?assertEqual({[true, true, true, true], []},
                        rpc:multicall(Nodes, mria_sup, is_running, []))
       after
           ok = mria_ct:teardown_cluster(Cluster)
       end,
       []).


cluster_benchmark(_) ->
NReplicas = 6,
Config = #{ trans_size => 10
Expand Down

0 comments on commit e558f2b

Please sign in to comment.