Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(plugins): use emqx:running_nodes for multicall operations #10913

Merged
merged 3 commits into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/emqx/priv/bpapi.versions
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
{emqx_management,3}.
{emqx_management,4}.
{emqx_mgmt_api_plugins,1}.
{emqx_mgmt_api_plugins,2}.
{emqx_mgmt_cluster,1}.
{emqx_mgmt_trace,1}.
{emqx_mgmt_trace,2}.
Expand Down
5 changes: 3 additions & 2 deletions apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ init_per_suite(Config) ->
_ = emqx_mgmt_api_test_util:init_suite([emqx_conf]),
Config.

end_per_suite(Config) ->
emqx_mgmt_api_test_util:end_suite([emqx_conf]).
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite([emqx_conf]),
ok.

t_object(_Config) ->
Spec = #{
Expand Down
19 changes: 13 additions & 6 deletions apps/emqx_management/src/emqx_mgmt_api_plugins.erl
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ validate_name(Name) ->

%% API CallBack Begin
list_plugins(get, _) ->
{Plugins, []} = emqx_mgmt_api_plugins_proto_v1:get_plugins(),
Nodes = emqx:running_nodes(),
{Plugins, []} = emqx_mgmt_api_plugins_proto_v2:get_plugins(Nodes),
{200, format_plugins(Plugins)}.

get_plugins() ->
Expand Down Expand Up @@ -373,7 +374,8 @@ upload_install(post, #{}) ->

do_install_package(FileName, Bin) ->
%% TODO: handle bad nodes
{[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v1:install_package(FileName, Bin),
Nodes = emqx:running_nodes(),
{[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v2:install_package(Nodes, FileName, Bin),
case lists:filter(fun(R) -> R =/= ok end, Res) of
[] ->
{200};
Expand All @@ -386,25 +388,30 @@ do_install_package(FileName, Bin) ->
end,
Filtered
),
{error, #{error := Reason}} = hd(Filtered),
Reason =
case hd(Filtered) of
{error, #{error := Reason0}} -> Reason0;
{error, #{reason := Reason0}} -> Reason0
end,
{400, #{
code => 'BAD_PLUGIN_INFO',
message => iolist_to_binary([Reason, ":", FileName])
}}
end.

plugin(get, #{bindings := #{name := Name}}) ->
{Plugins, _} = emqx_mgmt_api_plugins_proto_v1:describe_package(Name),
Nodes = emqx:running_nodes(),
{Plugins, _} = emqx_mgmt_api_plugins_proto_v2:describe_package(Nodes, Name),
case format_plugins(Plugins) of
[Plugin] -> {200, Plugin};
[] -> {404, #{code => 'NOT_FOUND', message => Name}}
end;
plugin(delete, #{bindings := #{name := Name}}) ->
Res = emqx_mgmt_api_plugins_proto_v1:delete_package(Name),
Res = emqx_mgmt_api_plugins_proto_v2:delete_package(Name),
return(204, Res).

update_plugin(put, #{bindings := #{name := Name, action := Action}}) ->
Res = emqx_mgmt_api_plugins_proto_v1:ensure_action(Name, Action),
Res = emqx_mgmt_api_plugins_proto_v2:ensure_action(Name, Action),
return(204, Res).

update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

-export([
introduced_in/0,
deprecated_since/0,

get_plugins/0,
install_package/2,
describe_package/1,
Expand All @@ -31,6 +33,9 @@
introduced_in() ->
"5.0.0".

deprecated_since() ->
"5.1.0".

-spec get_plugins() -> emqx_rpc:multicall_result().
get_plugins() ->
rpc:multicall(emqx_mgmt_api_plugins, get_plugins, [], 15000).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_plugins_proto_v2).

-behaviour(emqx_bpapi).

-export([
introduced_in/0,
get_plugins/1,
install_package/3,
describe_package/2,
delete_package/1,
ensure_action/2
]).

-include_lib("emqx/include/bpapi.hrl").

introduced_in() ->
"5.1.0".

-spec get_plugins([node()]) -> emqx_rpc:multicall_result().
get_plugins(Nodes) ->
rpc:multicall(Nodes, emqx_mgmt_api_plugins, get_plugins, [], 15000).

-spec install_package([node()], binary() | string(), binary()) -> emqx_rpc:multicall_result().
install_package(Nodes, Filename, Bin) ->
rpc:multicall(Nodes, emqx_mgmt_api_plugins, install_package, [Filename, Bin], 25000).

-spec describe_package([node()], binary() | string()) -> emqx_rpc:multicall_result().
describe_package(Nodes, Name) ->
rpc:multicall(Nodes, emqx_mgmt_api_plugins, describe_package, [Name], 10000).

-spec delete_package(binary() | string()) -> ok | {error, any()}.
delete_package(Name) ->
emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, delete_package, [Name], all, 10000).

-spec ensure_action(binary() | string(), 'restart' | 'start' | 'stop') -> ok | {error, any()}.
ensure_action(Name, Action) ->
emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, ensure_action, [Name, Action], all, 10000).
83 changes: 82 additions & 1 deletion apps/emqx_plugins/test/emqx_plugins_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ groups() ->
[
{copy_plugin, [sequence], [
group_t_copy_plugin_to_a_new_node,
group_t_copy_plugin_to_a_new_node_single_node
group_t_copy_plugin_to_a_new_node_single_node,
group_t_cluster_leave
]},
{create_tar_copy_plugin, [sequence], [group_t_copy_plugin_to_a_new_node]}
].
Expand Down Expand Up @@ -676,6 +677,86 @@ group_t_copy_plugin_to_a_new_node_single_node(Config) ->
),
ok.

group_t_cluster_leave({init, Config}) ->
PrivDataDir = ?config(priv_dir, Config),
ToInstallDir = filename:join(PrivDataDir, "plugins_copy_to"),
file:del_dir_r(ToInstallDir),
ok = filelib:ensure_path(ToInstallDir),
#{package := Package, release_name := PluginName} = get_demo_plugin_package(ToInstallDir),
NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
Cluster =
emqx_common_test_helpers:emqx_cluster(
[core, core],
#{
apps => [emqx_conf, emqx_plugins],
env => [
{emqx, init_config_load_done, false},
{emqx, boot_modules, []}
],
env_handler => fun
(emqx_plugins) ->
ok = emqx_plugins:put_config(install_dir, ToInstallDir),
%% this is to simulate an user setting the state
%% via environment variables before starting the node
ok = emqx_plugins:put_config(
states,
[#{name_vsn => NameVsn, enable => true}]
),
ok;
(_) ->
ok
end,
priv_data_dir => PrivDataDir,
schema_mod => emqx_conf_schema,
peer_mod => slave,
load_schema => true
}
),
Nodes = [emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Cluster],
[
{to_install_dir, ToInstallDir},
{cluster, Cluster},
{nodes, Nodes},
{name_vsn, NameVsn},
{plugin_name, PluginName}
| Config
];
group_t_cluster_leave({'end', Config}) ->
Nodes = proplists:get_value(nodes, Config),
[ok = emqx_common_test_helpers:stop_slave(N) || N <- Nodes],
ok = file:del_dir_r(proplists:get_value(to_install_dir, Config)),
ok;
group_t_cluster_leave(Config) ->
[N1, N2] = ?config(nodes, Config),
NameVsn = proplists:get_value(name_vsn, Config),
ok = erpc:call(N1, emqx_plugins, ensure_installed, [NameVsn]),
ok = erpc:call(N1, emqx_plugins, ensure_started, [NameVsn]),
ok = erpc:call(N1, emqx_plugins, ensure_enabled, [NameVsn]),
Params = unused,
%% 2 nodes running
?assertMatch(
{200, [#{running_status := [#{status := running}, #{status := running}]}]},
erpc:call(N1, emqx_mgmt_api_plugins, list_plugins, [get, Params])
),
?assertMatch(
{200, [#{running_status := [#{status := running}, #{status := running}]}]},
erpc:call(N2, emqx_mgmt_api_plugins, list_plugins, [get, Params])
),

%% Now, one node leaves the cluster.
ok = erpc:call(N2, ekka, leave, []),

%% Each node will no longer ask the plugin status to the other.
?assertMatch(
{200, [#{running_status := [#{node := N1, status := running}]}]},
erpc:call(N1, emqx_mgmt_api_plugins, list_plugins, [get, Params])
),
?assertMatch(
{200, [#{running_status := [#{node := N2, status := running}]}]},
erpc:call(N2, emqx_mgmt_api_plugins, list_plugins, [get, Params])
),
ok.

make_tar(Cwd, NameWithVsn) ->
make_tar(Cwd, NameWithVsn, NameWithVsn).

Expand Down
1 change: 1 addition & 0 deletions changes/ee/fix-10913.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed an issue where a node that left the cluster would still report plugin status from other nodes.