Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EMQX-9152 optimize mria running nodes #135

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 19 additions & 21 deletions src/mria.erl
Original file line number Diff line number Diff line change
Expand Up @@ -192,29 +192,27 @@ is_node_in_cluster(Node) ->
lists:member(Node, cluster_nodes(all)).

%% @doc Running nodes.
%% This function should be used with care, as it may not return the most up-to-date
%% view of replicant nodes, as changes in mria_membership are reflected asynchronously.
%% For example:
%% - a core node leaves the cluster and joins it back quickly,
%% - a replicant node receives monitor DOWN message (see mria_membership)
%% and marks the core node as leaving/stopped,
%% - mria_lb on the replicant re-discovers the core node (rlog_lb_update_interval),
%% - the replicant pings the core, the core pongs the replicant,
%% now each nodes shows the other one as running.
-spec running_nodes() -> list(node()).
running_nodes() ->
%% TODO: cache the results (this could be a hot call) and don't
%% fail on the first unsuccessful call, since other nodes may be
%% alive. Use info from `mria_membership'?
case mria_rlog:role() of
core ->
CoreNodes = mria_mnesia:running_nodes(),
{Replicants0, _} = rpc:multicall(CoreNodes, mria_status, replicants, [], 15000),
Replicants = [Node || Nodes <- Replicants0, is_list(Nodes), Node <- Nodes],
lists:usort(CoreNodes ++ Replicants);
replicant ->
case mria_lb:core_nodes() of
[CoreNode|_] ->
case mria_lib:rpc_call_nothrow(CoreNode, ?MODULE, running_nodes, []) of
{badrpc, _} -> [];
{badtcp, _} -> [];
Result -> Result
end;
[] ->
[]
end
end.
CoreNodes = case mria_rlog:role() of
core -> mria_mnesia:running_nodes();
replicant ->
%% Can be used on core node as well, eliminating this
%% case statement, but mria_mnesia:running_nodes/0
%% must be more accurate than mria_membership:nodelist/0...
mria_membership:running_core_nodelist()
end,
Replicants = mria_membership:running_replicant_nodelist(),
lists:usort(CoreNodes ++ Replicants).

%%--------------------------------------------------------------------
%% Cluster API
Expand Down
44 changes: 32 additions & 12 deletions src/mria_lb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ do_update(State = #s{core_nodes = OldCoreNodes}) ->
, ignored_nodes => DiscoveredNodes -- NewCoreNodes
, node => node()
}),
IsChanged andalso
ping_core_nodes(NewCoreNodes),
DiscoveredReplicants = discover_replicants(NewCoreNodes),
ping_new_nodes(NewCoreNodes, DiscoveredReplicants),
State#s{core_nodes = NewCoreNodes}.

%% Find fully connected clusters (i.e. cliques of nodes)
Expand Down Expand Up @@ -290,16 +290,6 @@ maybe_report_netsplit(OldNodes, Clusters) ->
end,
ok.

-spec ping_core_nodes([node()]) -> ok.
ping_core_nodes(NewCoreNodes) ->
%% Replicants do not have themselves as local members.
%% We make an entry on the fly.
LocalMember = mria_membership:make_new_local_member(),
lists:foreach(
fun(Core) ->
mria_membership:ping(Core, LocalMember)
end, NewCoreNodes).

%%================================================================================
%% Internal exports
%%================================================================================
Expand Down Expand Up @@ -364,6 +354,13 @@ discover_nodes() ->
discover_manually(Seed)
end.

-spec discover_replicants([node()]) -> [node()].
discover_replicants(CoreNodes) ->
{Replicants0, _BadNodes} = rpc:multicall( CoreNodes
, mria_membership, replicant_nodelist, []
),
lists:usort([Node || Nodes <- Replicants0, is_list(Nodes), Node <- Nodes]).

%% Return the last node that has been explicitly specified via
%% "mria:join" command. It overrides other discovery mechanisms.
-spec manual_seed() -> [node()].
Expand Down Expand Up @@ -406,6 +403,29 @@ cluster_score(OldNodes, Cluster) ->
, length(Cluster)
}.

%% Ping all new nodes to update mria_membership state
-spec ping_new_nodes([node()], [node()]) -> ok.
ping_new_nodes(CoreNodes, Replicants) ->
%% mria_membership:running_core_nodelist/0 is a more reliable source
%% of previous nodes comparing to the list of core nodes stored in
%% mria_lb state. mria_lb makes updates periodically, so it can overlook
%% changes when another (core) node leaves and joins quickly
%% (within mria_lb update period).
%% At the same time, mria_membership on a replicant node monitors its
%% corresponding registered processes on all other nodes, so it will
%% eventually detect a left node.
NewCoreNodes = CoreNodes -- mria_membership:running_core_nodelist(),
NewReplicants = Replicants -- mria_membership:running_replicant_nodelist(),
ping_nodes(NewCoreNodes ++ NewReplicants).

-spec ping_nodes([node()]) -> ok.
ping_nodes(Nodes) ->
LocalMember = mria_membership:local_any_member(),
lists:foreach(
fun(Node) ->
mria_membership:ping(Node, LocalMember)
end, Nodes).

%%================================================================================
%% Unit tests
%%================================================================================
Expand Down