Skip to content

Commit

Permalink
chore: forward the async invite to leader node
Browse files Browse the repository at this point in the history
  • Loading branch information
HJianBo committed Jan 15, 2024
1 parent 93ef676 commit 4c40e75
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 22 deletions.
1 change: 1 addition & 0 deletions apps/emqx/priv/bpapi.versions
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
{emqx_mgmt_api_plugins,2}.
{emqx_mgmt_cluster,1}.
{emqx_mgmt_cluster,2}.
{emqx_mgmt_cluster,3}.
{emqx_mgmt_data_backup,1}.
{emqx_mgmt_trace,1}.
{emqx_mgmt_trace,2}.
Expand Down
20 changes: 11 additions & 9 deletions apps/emqx_management/src/emqx_mgmt_api_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ schema("/cluster/topology") ->
};
schema("/cluster/invitation") ->
#{
'operationId' => get_invitation_view,
'operationId' => get_invitation_status,
get => #{
desc => ?DESC(get_invitation_view),
desc => ?DESC(get_invitation_status),
tags => [<<"Cluster">>],
responses => #{
200 => ?HOCON(
?REF(invitation_view),
?REF(invitation_status),
#{desc => <<"Get invitation progress created by async operation">>}
)
}
Expand Down Expand Up @@ -174,7 +174,7 @@ fields(timeout) ->
#{desc => <<"Timeout in milliseconds">>, example => <<"15000">>}
)}
];
fields(invitation_view) ->
fields(invitation_status) ->
[
{succeed,
?HOCON(
Expand Down Expand Up @@ -208,7 +208,8 @@ fields(node_invitation_succeed) ->
?HOCON(
emqx_utils_calendar:epoch_millisecond(),
#{
desc => <<"The time of the async invitation result is received, millisecond precision epoch">>,
desc =>
<<"The time of the async invitation result is received, millisecond precision epoch">>,
example => <<"1705044829915">>
}
)}
Expand All @@ -224,7 +225,8 @@ fields(node_invitation_in_progress) ->
?HOCON(
emqx_utils_calendar:epoch_millisecond(),
#{
desc => <<"The start timestamp of the invitation, millisecond precision epoch">>,
desc =>
<<"The start timestamp of the invitation, millisecond precision epoch">>,
example => <<"1705044829915">>
}
)}
Expand Down Expand Up @@ -319,8 +321,8 @@ invite_node_async(put, #{bindings := #{node := Node0}}) ->
}}
end.

get_invitation_view(get, _) ->
{200, format_invitation_view(emqx_mgmt_cluster:invitation_view())}.
get_invitation_status(get, _) ->
{200, format_invitation_status(emqx_mgmt_cluster:invitation_status())}.

force_leave(delete, #{bindings := #{node := Node0}}) ->
Node = ekka_node:parse_name(binary_to_list(Node0)),
Expand All @@ -344,7 +346,7 @@ connected_replicants() ->
error_message(Msg) ->
iolist_to_binary(io_lib:format("~p", [Msg])).

format_invitation_view(#{
format_invitation_status(#{
succeed := Succeed,
in_progress := InProgress,
failed := Failed
Expand Down
20 changes: 11 additions & 9 deletions apps/emqx_management/src/emqx_mgmt_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
%% APIs
-export([start_link/0]).

-export([invite_async/1, invitation_view/0]).
-export([invite_async/1, invitation_status/0]).

%% gen_server callbacks
-export([
Expand All @@ -42,17 +42,19 @@ start_link() ->

-spec invite_async(atom()) -> ok | ignore | {error, {already_started, pid()}}.
invite_async(Node) ->
JoinTo = node(),
%% Proxy the invitation task to the leader node
JoinTo = mria_membership:leader(),
case Node =/= JoinTo of
true ->
gen_server:call(?MODULE, {invite_async, Node, JoinTo}, infinity);
gen_server:call({?MODULE, JoinTo}, {invite_async, Node, JoinTo}, infinity);
false ->
ignore
end.

-spec invitation_view() -> map().
invitation_view() ->
gen_server:call(?MODULE, invitation_view, infinity).
-spec invitation_status() -> map().
invitation_status() ->
Leader = mria_membership:leader(),
gen_server:call({?MODULE, Leader}, invitation_status, infinity).

%%--------------------------------------------------------------------
%% gen_server callbacks
Expand All @@ -71,8 +73,8 @@ handle_call({invite_async, Node, JoinTo}, _From, State) ->
WorkerPid ->
{reply, {error, {already_started, WorkerPid}}, State}
end;
handle_call(invitation_view, _From, State) ->
{reply, state_to_invitation_view(State), State};
handle_call(invitation_status, _From, State) ->
{reply, state_to_invitation_status(State), State};
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
Expand Down Expand Up @@ -155,7 +157,7 @@ find_node_name_via_worker_pid(WorkerPid, {Key, Task, I}) ->
find_node_name_via_worker_pid(WorkerPid, maps:next(I))
end.

state_to_invitation_view(State) ->
state_to_invitation_status(State) ->
History = maps:get(history, State, #{}),
{Succ, Failed} = lists:foldl(
fun({Node, Task}, {SuccAcc, FailedAcc}) ->
Expand Down
8 changes: 4 additions & 4 deletions apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ t_cluster_invite_api_timeout(Config) ->
)
end,
?assertMatch(
{400, #{code := 'BAD_REQUEST', message := <<"timeout must be integer">>}},
{400, #{code := 'BAD_REQUEST', message := <<"timeout must be an integer">>}},
Invite(Core2, not_a_integer_timeout)
),
?assertMatch(
{400, #{code := 'BAD_REQUEST', message := <<"timeout can't less than 5000ms">>}},
{400, #{code := 'BAD_REQUEST', message := <<"timeout cannot be less than 5000ms">>}},
Invite(Core2, 3000)
),

Expand Down Expand Up @@ -240,7 +240,7 @@ t_cluster_invite_async(Config) ->
%% assert: core2 is in_progress status
?assertMatch(
{200, #{in_progress := [#{node := Core2}]}},
rpc:call(Core1, emqx_mgmt_api_cluster, get_invitation_view, [get, #{}])
rpc:call(Core1, emqx_mgmt_api_cluster, get_invitation_status, [get, #{}])
),

%% waiting the async invitation_succeed
Expand Down Expand Up @@ -295,7 +295,7 @@ waiting_the_async_invitation_succeed(Node, TargetNode, N) ->
in_progress := InProgress,
succeed := Succeed,
failed := Failed
}} = rpc:call(Node, emqx_mgmt_api_cluster, get_invitation_view, [get, #{}]),
}} = rpc:call(Node, emqx_mgmt_api_cluster, get_invitation_status, [get, #{}]),
case find_node_info_list(TargetNode, InProgress) of
error ->
case find_node_info_list(TargetNode, Succeed) of
Expand Down
5 changes: 5 additions & 0 deletions rel/i18n/emqx_mgmt_api_cluster.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,9 @@ force_remove_node.desc:
force_remove_node.label:
"""Force leave node from cluster"""

get_invitation_status.desc:
"""Get the execution status of all asynchronous invite node tasks"""
get_invitation_status.label:
"""Get status of all invitation tasks"""

}

0 comments on commit 4c40e75

Please sign in to comment.