Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cluster): supports inviting nodes to join the cluster in an asynchronous manner #12267

Merged
merged 9 commits into from
Jan 16, 2024
1 change: 1 addition & 0 deletions apps/emqx/priv/bpapi.versions
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
{emqx_mgmt_api_plugins,2}.
{emqx_mgmt_cluster,1}.
{emqx_mgmt_cluster,2}.
{emqx_mgmt_cluster,3}.
{emqx_mgmt_data_backup,1}.
{emqx_mgmt_trace,1}.
{emqx_mgmt_trace,2}.
Expand Down
159 changes: 153 additions & 6 deletions apps/emqx_management/src/emqx_mgmt_api_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@
cluster_info/2,
cluster_topology/2,
invite_node/2,
invite_node_async/2,
get_invitation_status/2,
force_leave/2,
join/1,
connected_replicants/0
]).

-define(DEFAULT_INVITE_TIMEOUT, 15000).

namespace() -> "cluster".

api_spec() ->
Expand All @@ -40,7 +44,9 @@ paths() ->
[
"/cluster",
"/cluster/topology",
"/cluster/invitation",
"/cluster/:node/invite",
"/cluster/:node/invite_async",
"/cluster/:node/force_leave"
Comment on lines +47 to 50
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should add async option to invite instead of introduce a new api ?

].

Expand Down Expand Up @@ -70,13 +76,41 @@ schema("/cluster/topology") ->
}
}
};
schema("/cluster/invitation") ->
#{
'operationId' => get_invitation_status,
get => #{
desc => ?DESC(get_invitation_status),
tags => [<<"Cluster">>],
responses => #{
200 => ?HOCON(
?REF(invitation_status),
#{desc => <<"Get invitation progress created by async operation">>}
)
}
}
};
schema("/cluster/:node/invite") ->
#{
'operationId' => invite_node,
put => #{
desc => ?DESC(invite_node),
tags => [<<"Cluster">>],
parameters => [hoconsc:ref(node)],
'requestBody' => hoconsc:ref(timeout),
responses => #{
200 => <<"ok">>,
400 => emqx_dashboard_swagger:error_codes(['BAD_REQUEST'])
}
}
};
schema("/cluster/:node/invite_async") ->
#{
'operationId' => invite_node_async,
put => #{
desc => ?DESC(invite_node_async),
tags => [<<"Cluster">>],
parameters => [hoconsc:ref(node)],
responses => #{
200 => <<"ok">>,
400 => emqx_dashboard_swagger:error_codes(['BAD_REQUEST'])
Expand Down Expand Up @@ -131,6 +165,71 @@ fields(core_replicants) ->
#{desc => <<"Core node name">>, example => <<"emqx-core@127.0.0.1">>}
)},
{replicant_nodes, ?HOCON(?ARRAY(?REF(replicant_info)))}
];
fields(timeout) ->
[
{timeout,
?HOCON(
non_neg_integer(),
#{desc => <<"Timeout in milliseconds">>, example => <<"15000">>}
)}
];
fields(invitation_status) ->
[
{succeed,
?HOCON(
?ARRAY(?REF(node_invitation_succeed)),
#{desc => <<"A list of information about nodes which are successfully invited">>}
)},
{in_progress,
?HOCON(
?ARRAY(?REF(node_invitation_in_progress)),
#{desc => <<"A list of information about nodes that are processing invitations">>}
)},
{failed,
?HOCON(
?ARRAY(?REF(node_invitation_failed)),
#{desc => <<"A list of information about nodes that failed to be invited">>}
)}
];
fields(node_invitation_failed) ->
fields(node_invitation_succeed) ++
[
{reason,
?HOCON(
binary(),
#{desc => <<"Failure reason">>, example => <<"Bad RPC to target node">>}
)}
];
fields(node_invitation_succeed) ->
fields(node_invitation_in_progress) ++
[
{finished_at,
?HOCON(
emqx_utils_calendar:epoch_millisecond(),
#{
desc =>
<<"The time of the async invitation result is received, millisecond precision epoch">>,
example => <<"1705044829915">>
}
)}
];
fields(node_invitation_in_progress) ->
[
{node,
?HOCON(
binary(),
#{desc => <<"Node name">>, example => <<"emqx2@127.0.0.1">>}
)},
{started_at,
?HOCON(
emqx_utils_calendar:epoch_millisecond(),
#{
desc =>
<<"The start timestamp of the invitation, millisecond precision epoch">>,
example => <<"1705044829915">>
}
)}
].

validate_node(Node) ->
Expand Down Expand Up @@ -188,19 +287,43 @@ running_cores() ->
Running = emqx:running_nodes(),
lists:filter(fun(C) -> lists:member(C, Running) end, emqx:cluster_nodes(cores)).

invite_node(put, #{bindings := #{node := Node0}}) ->
invite_node(put, #{bindings := #{node := Node0}, body := Body}) ->
Node = ekka_node:parse_name(binary_to_list(Node0)),
case maps:get(<<"timeout">>, Body, ?DEFAULT_INVITE_TIMEOUT) of
T when not is_integer(T) ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: could that be verified with schema?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory, it really should. But for the moment, I haven't found a suitable example to check the format of a field request via schema. 😭

{400, #{code => 'BAD_REQUEST', message => <<"timeout must be an integer">>}};
T when T < 5000 ->
{400, #{code => 'BAD_REQUEST', message => <<"timeout cannot be less than 5000ms">>}};
Timeout ->
case emqx_mgmt_cluster_proto_v3:invite_node(Node, node(), Timeout) of
ok ->
{200};
ignore ->
{400, #{code => 'BAD_REQUEST', message => <<"Cannot invite self">>}};
{badrpc, Error} ->
{400, #{code => 'BAD_REQUEST', message => error_message(Error)}};
{error, Error} ->
{400, #{code => 'BAD_REQUEST', message => error_message(Error)}}
end
end.

invite_node_async(put, #{bindings := #{node := Node0}}) ->
Node = ekka_node:parse_name(binary_to_list(Node0)),
case emqx_mgmt_cluster_proto_v1:invite_node(Node, node()) of
case emqx_mgmt_cluster:invite_async(Node) of
ok ->
{200};
ignore ->
{400, #{code => 'BAD_REQUEST', message => <<"Can't invite self">>}};
{badrpc, Error} ->
{400, #{code => 'BAD_REQUEST', message => error_message(Error)}};
{error, Error} ->
{400, #{code => 'BAD_REQUEST', message => error_message(Error)}}
{error, {already_started, _Pid}} ->
{400, #{
code => 'BAD_REQUEST',
message => <<"The invitation task already created for this node">>
}}
end.

get_invitation_status(get, _) ->
{200, format_invitation_status(emqx_mgmt_cluster:invitation_status())}.

force_leave(delete, #{bindings := #{node := Node0}}) ->
Node = ekka_node:parse_name(binary_to_list(Node0)),
case ekka:force_leave(Node) of
Expand All @@ -222,3 +345,27 @@ connected_replicants() ->

error_message(Msg) ->
iolist_to_binary(io_lib:format("~p", [Msg])).

format_invitation_status(#{
succeed := Succeed,
in_progress := InProgress,
failed := Failed
}) ->
#{
succeed => format_invitation_info(Succeed),
in_progress => format_invitation_info(InProgress),
failed => format_invitation_info(Failed)
}.

format_invitation_info(L) when is_list(L) ->
lists:map(
fun(Info) ->
Info1 = emqx_utils_maps:update_if_present(
started_at, fun emqx_utils_calendar:epoch_to_rfc3339/1, Info
),
emqx_utils_maps:update_if_present(
finished_at, fun emqx_utils_calendar:epoch_to_rfc3339/1, Info1
)
end,
L
).