Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(emqx_management): Ignore results from the nodes that are down #10369

Merged
merged 3 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions apps/emqx/src/emqx.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@
stop/0
]).

%% Cluster API
-export([
cluster_nodes/1,
running_nodes/0
]).

%% PubSub API
-export([
subscribe/1,
Expand Down Expand Up @@ -102,6 +108,18 @@ is_running() ->
_ -> true
end.

%%--------------------------------------------------------------------
%% Cluster API
%%--------------------------------------------------------------------

-spec running_nodes() -> [node()].
running_nodes() ->
mria:running_nodes().

-spec cluster_nodes(all | running | cores | stopped) -> [node()].
cluster_nodes(Type) ->
mria:cluster_nodes(Type).

%%--------------------------------------------------------------------
%% PubSub API
%%--------------------------------------------------------------------
Expand Down
8 changes: 8 additions & 0 deletions apps/emqx/test/emqx_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ t_run_hook(_) ->
?assertEqual(3, emqx:run_fold_hook(foldl_filter2_hook, [arg], 1)),
?assertEqual(2, emqx:run_fold_hook(foldl_filter2_hook, [arg1], 1)).

t_cluster_nodes(_) ->
Expected = [node()],
?assertEqual(Expected, emqx:running_nodes()),
?assertEqual(Expected, emqx:cluster_nodes(running)),
?assertEqual(Expected, emqx:cluster_nodes(all)),
?assertEqual(Expected, emqx:cluster_nodes(cores)),
?assertEqual([], emqx:cluster_nodes(stopped)).

%%--------------------------------------------------------------------
%% Hook fun
%%--------------------------------------------------------------------
Expand Down
57 changes: 32 additions & 25 deletions apps/emqx_management/src/emqx_mgmt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@
%%--------------------------------------------------------------------

list_nodes() ->
Running = mria:cluster_nodes(running),
Stopped = mria:cluster_nodes(stopped),
Running = emqx:cluster_nodes(running),
Stopped = emqx:cluster_nodes(stopped),
DownNodes = lists:map(fun stopped_node_info/1, Stopped),
[{Node, Info} || #{node := Node} = Info <- node_info(Running)] ++ DownNodes.

Expand Down Expand Up @@ -199,7 +199,7 @@ vm_stats() ->
%%--------------------------------------------------------------------

list_brokers() ->
Running = mria:running_nodes(),
Running = emqx:running_nodes(),
[{Node, Broker} || #{node := Node} = Broker <- broker_info(Running)].

lookup_broker(Node) ->
Expand All @@ -223,7 +223,7 @@ broker_info(Nodes) ->
%%--------------------------------------------------------------------

get_metrics() ->
nodes_info_count([get_metrics(Node) || Node <- mria:running_nodes()]).
nodes_info_count([get_metrics(Node) || Node <- emqx:running_nodes()]).

get_metrics(Node) ->
unwrap_rpc(emqx_proto_v1:get_metrics(Node)).
Expand All @@ -238,13 +238,20 @@ get_stats() ->
'subscriptions.shared.count',
'subscriptions.shared.max'
],
CountStats = nodes_info_count([
begin
Stats = get_stats(Node),
delete_keys(Stats, GlobalStatsKeys)
end
|| Node <- mria:running_nodes()
]),
CountStats = nodes_info_count(
lists:foldl(
fun(Node, Acc) ->
case get_stats(Node) of
{error, _} ->
Acc;
Stats ->
[delete_keys(Stats, GlobalStatsKeys) | Acc]
end
end,
[],
emqx:running_nodes()
)
),
GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(get_stats(node()))),
maps:merge(CountStats, GlobalStats).

Expand Down Expand Up @@ -275,12 +282,12 @@ nodes_info_count(PropList) ->
lookup_client({clientid, ClientId}, FormatFun) ->
lists:append([
lookup_client(Node, {clientid, ClientId}, FormatFun)
|| Node <- mria:running_nodes()
|| Node <- emqx:running_nodes()
]);
lookup_client({username, Username}, FormatFun) ->
lists:append([
lookup_client(Node, {username, Username}, FormatFun)
|| Node <- mria:running_nodes()
|| Node <- emqx:running_nodes()
]).

lookup_client(Node, Key, FormatFun) ->
Expand All @@ -307,7 +314,7 @@ kickout_client(ClientId) ->
[] ->
{error, not_found};
_ ->
Results = [kickout_client(Node, ClientId) || Node <- mria:running_nodes()],
Results = [kickout_client(Node, ClientId) || Node <- emqx:running_nodes()],
check_results(Results)
end.

Expand All @@ -322,7 +329,7 @@ list_client_subscriptions(ClientId) ->
[] ->
{error, not_found};
_ ->
Results = [client_subscriptions(Node, ClientId) || Node <- mria:running_nodes()],
Results = [client_subscriptions(Node, ClientId) || Node <- emqx:running_nodes()],
Filter =
fun
({error, _}) ->
Expand All @@ -340,18 +347,18 @@ client_subscriptions(Node, ClientId) ->
{Node, unwrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId))}.

clean_authz_cache(ClientId) ->
Results = [clean_authz_cache(Node, ClientId) || Node <- mria:running_nodes()],
Results = [clean_authz_cache(Node, ClientId) || Node <- emqx:running_nodes()],
check_results(Results).

clean_authz_cache(Node, ClientId) ->
unwrap_rpc(emqx_proto_v1:clean_authz_cache(Node, ClientId)).

clean_authz_cache_all() ->
Results = [{Node, clean_authz_cache_all(Node)} || Node <- mria:running_nodes()],
Results = [{Node, clean_authz_cache_all(Node)} || Node <- emqx:running_nodes()],
wrap_results(Results).

clean_pem_cache_all() ->
Results = [{Node, clean_pem_cache_all(Node)} || Node <- mria:running_nodes()],
Results = [{Node, clean_pem_cache_all(Node)} || Node <- emqx:running_nodes()],
wrap_results(Results).

wrap_results(Results) ->
Expand Down Expand Up @@ -379,7 +386,7 @@ set_keepalive(_ClientId, _Interval) ->

%% @private
call_client(ClientId, Req) ->
Results = [call_client(Node, ClientId, Req) || Node <- mria:running_nodes()],
Results = [call_client(Node, ClientId, Req) || Node <- emqx:running_nodes()],
Expected = lists:filter(
fun
({error, _}) -> false;
Expand Down Expand Up @@ -428,7 +435,7 @@ list_subscriptions(Node) ->
list_subscriptions_via_topic(Topic, FormatFun) ->
lists:append([
list_subscriptions_via_topic(Node, Topic, FormatFun)
|| Node <- mria:running_nodes()
|| Node <- emqx:running_nodes()
]).

list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) ->
Expand All @@ -442,7 +449,7 @@ list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) ->
%%--------------------------------------------------------------------

subscribe(ClientId, TopicTables) ->
subscribe(mria:running_nodes(), ClientId, TopicTables).
subscribe(emqx:running_nodes(), ClientId, TopicTables).

subscribe([Node | Nodes], ClientId, TopicTables) ->
case unwrap_rpc(emqx_management_proto_v3:subscribe(Node, ClientId, TopicTables)) of
Expand All @@ -467,7 +474,7 @@ publish(Msg) ->
-spec unsubscribe(emqx_types:clientid(), emqx_types:topic()) ->
{unsubscribe, _} | {error, channel_not_found}.
unsubscribe(ClientId, Topic) ->
unsubscribe(mria:running_nodes(), ClientId, Topic).
unsubscribe(emqx:running_nodes(), ClientId, Topic).

-spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) ->
{unsubscribe, _} | {error, channel_not_found}.
Expand All @@ -490,7 +497,7 @@ do_unsubscribe(ClientId, Topic) ->
-spec unsubscribe_batch(emqx_types:clientid(), [emqx_types:topic()]) ->
{unsubscribe, _} | {error, channel_not_found}.
unsubscribe_batch(ClientId, Topics) ->
unsubscribe_batch(mria:running_nodes(), ClientId, Topics).
unsubscribe_batch(emqx:running_nodes(), ClientId, Topics).

-spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) ->
{unsubscribe_batch, _} | {error, channel_not_found}.
Expand All @@ -515,7 +522,7 @@ do_unsubscribe_batch(ClientId, Topics) ->
%%--------------------------------------------------------------------

get_alarms(Type) ->
[{Node, get_alarms(Node, Type)} || Node <- mria:running_nodes()].
[{Node, get_alarms(Node, Type)} || Node <- emqx:running_nodes()].

get_alarms(Node, Type) ->
add_duration_field(unwrap_rpc(emqx_proto_v1:get_alarms(Node, Type))).
Expand All @@ -524,7 +531,7 @@ deactivate(Node, Name) ->
unwrap_rpc(emqx_proto_v1:deactivate_alarm(Node, Name)).

delete_all_deactivated_alarms() ->
[delete_all_deactivated_alarms(Node) || Node <- mria:running_nodes()].
[delete_all_deactivated_alarms(Node) || Node <- emqx:running_nodes()].

delete_all_deactivated_alarms(Node) ->
unwrap_rpc(emqx_proto_v1:delete_all_deactivated_alarms(Node)).
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_management/src/emqx_mgmt_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ cluster_query(Tab, QString, QSchema, MsFun, FmtFun) ->
{error, page_limit_invalid};
Meta ->
{_CodCnt, NQString} = parse_qstring(QString, QSchema),
Nodes = mria:running_nodes(),
Nodes = emqx:running_nodes(),
ResultAcc = init_query_result(),
QueryState = init_query_state(Tab, NQString, MsFun, Meta),
NResultAcc = do_cluster_query(
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_management/src/emqx_mgmt_api_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ cluster_info(get, _) ->
ClusterName = application:get_env(ekka, cluster_name, emqxcl),
Info = #{
name => ClusterName,
nodes => mria:running_nodes(),
nodes => emqx:running_nodes(),
self => node()
},
{200, Info}.
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_management/src/emqx_mgmt_api_configs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ configs(get, Params, _Req) ->
QS = maps:get(query_string, Params, #{}),
Node = maps:get(<<"node">>, QS, node()),
case
lists:member(Node, mria:running_nodes()) andalso
lists:member(Node, emqx:running_nodes()) andalso
emqx_management_proto_v2:get_full_config(Node)
of
false ->
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_management/src/emqx_mgmt_api_listeners.erl
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ err_msg_str(Reason) ->
io_lib:format("~p", [Reason]).

list_listeners() ->
[list_listeners(Node) || Node <- mria:running_nodes()].
[list_listeners(Node) || Node <- emqx:running_nodes()].

list_listeners(Node) ->
wrap_rpc(emqx_management_proto_v2:list_listeners(Node)).
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_management/src/emqx_mgmt_api_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ metrics(get, #{query_string := Qs}) ->
maps:from_list(
emqx_mgmt:get_metrics(Node) ++ [{node, Node}]
)
|| Node <- mria:running_nodes()
|| Node <- emqx:running_nodes()
],
{200, Data}
end.
Expand Down
26 changes: 13 additions & 13 deletions apps/emqx_management/src/emqx_mgmt_api_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,21 +127,21 @@ list(get, #{query_string := Qs}) ->
true ->
{200, emqx_mgmt:get_stats()};
_ ->
Data = [
maps:from_list(emqx_mgmt:get_stats(Node) ++ [{node, Node}])
|| Node <- running_nodes()
],
Data = lists:foldl(
fun(Node, Acc) ->
case emqx_mgmt:get_stats(Node) of
{error, _Err} ->
Acc;
Stats when is_list(Stats) ->
Data = maps:from_list([{node, Node} | Stats]),
[Data | Acc]
end
end,
[],
emqx:running_nodes()
),
{200, Data}
end.

%%%==============================================================================================
%% Internal

running_nodes() ->
Nodes = erlang:nodes([visible, this]),
RpcResults = emqx_proto_v2:are_running(Nodes),
[
Node
|| {Node, IsRunning} <- lists:zip(Nodes, RpcResults),
IsRunning =:= {ok, true}
].
10 changes: 5 additions & 5 deletions apps/emqx_management/src/emqx_mgmt_api_trace.erl
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ trace(get, _Params) ->
fun(#{start_at := A}, #{start_at := B}) -> A > B end,
emqx_trace:format(List0)
),
Nodes = mria:running_nodes(),
Nodes = emqx:running_nodes(),
TraceSize = wrap_rpc(emqx_mgmt_trace_proto_v2:get_trace_size(Nodes)),
AllFileSize = lists:foldl(fun(F, Acc) -> maps:merge(Acc, F) end, #{}, TraceSize),
Now = erlang:system_time(second),
Expand Down Expand Up @@ -464,7 +464,7 @@ format_trace(Trace0) ->
LogSize = lists:foldl(
fun(Node, Acc) -> Acc#{Node => 0} end,
#{},
mria:running_nodes()
emqx:running_nodes()
),
Trace2 = maps:without([enable, filter], Trace1),
Trace2#{
Expand Down Expand Up @@ -560,13 +560,13 @@ group_trace_file(ZipDir, TraceLog, TraceFiles) ->
).

collect_trace_file(undefined, TraceLog) ->
Nodes = mria:running_nodes(),
Nodes = emqx:running_nodes(),
wrap_rpc(emqx_mgmt_trace_proto_v2:trace_file(Nodes, TraceLog));
collect_trace_file(Node, TraceLog) ->
wrap_rpc(emqx_mgmt_trace_proto_v2:trace_file([Node], TraceLog)).

collect_trace_file_detail(TraceLog) ->
Nodes = mria:running_nodes(),
Nodes = emqx:running_nodes(),
wrap_rpc(emqx_mgmt_trace_proto_v2:trace_file_detail(Nodes, TraceLog)).

wrap_rpc({GoodRes, BadNodes}) ->
Expand Down Expand Up @@ -696,7 +696,7 @@ parse_node(Query, Default) ->
{ok, Default};
{ok, NodeBin} ->
Node = binary_to_existing_atom(NodeBin),
true = lists:member(Node, mria:running_nodes()),
true = lists:member(Node, emqx:running_nodes()),
{ok, Node}
end
catch
Expand Down
10 changes: 5 additions & 5 deletions apps/emqx_management/test/emqx_mgmt_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ end_per_suite(_) ->
emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]).

init_per_testcase(TestCase, Config) ->
meck:expect(mria, running_nodes, 0, [node()]),
meck:expect(emqx, running_nodes, 0, [node()]),
emqx_common_test_helpers:init_per_testcase(?MODULE, TestCase, Config).

end_per_testcase(TestCase, Config) ->
meck:unload(mria),
meck:unload(emqx),
emqx_common_test_helpers:end_per_testcase(?MODULE, TestCase, Config).

t_list_nodes(init, Config) ->
meck:expect(
mria,
emqx,
cluster_nodes,
fun
(running) -> [node()];
Expand Down Expand Up @@ -125,7 +125,7 @@ t_lookup_client(_Config) ->
emqx_mgmt:lookup_client({username, <<"user1">>}, ?FORMATFUN)
),
?assertEqual([], emqx_mgmt:lookup_client({clientid, <<"notfound">>}, ?FORMATFUN)),
meck:expect(mria, running_nodes, 0, [node(), 'fake@nonode']),
meck:expect(emqx, running_nodes, 0, [node(), 'fake@nonode']),
?assertMatch(
[_ | {error, nodedown}], emqx_mgmt:lookup_client({clientid, <<"client1">>}, ?FORMATFUN)
).
Expand Down Expand Up @@ -188,7 +188,7 @@ t_clean_cache(_Config) ->
{error, _},
emqx_mgmt:clean_pem_cache_all()
),
meck:expect(mria, running_nodes, 0, [node(), 'fake@nonode']),
meck:expect(emqx, running_nodes, 0, [node(), 'fake@nonode']),
?assertMatch(
{error, [{'fake@nonode', {error, _}}]},
emqx_mgmt:clean_authz_cache_all()
Expand Down