Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix rebalance issues #10967

Merged
merged 3 commits into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions apps/emqx_node_rebalance/src/emqx_node_rebalance.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@
-type start_opts() :: #{
conn_evict_rate => pos_integer(),
sess_evict_rate => pos_integer(),
wait_health_check => pos_integer(),
wait_takeover => pos_integer(),
wait_health_check => number(),
wait_takeover => number(),
abs_conn_threshold => pos_integer(),
rel_conn_threshold => number(),
abs_sess_threshold => pos_integer(),
Expand Down Expand Up @@ -438,7 +438,7 @@ is_node_available() ->
node().

all_nodes() ->
mria_mnesia:running_nodes().
emqx:running_nodes().

seconds(Sec) ->
round(timer:seconds(Sec)).
23 changes: 17 additions & 6 deletions apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,10 @@ schema("/load_rebalance/:node/evacuation/stop") ->
}}.

'/load_rebalance/availability_check'(get, #{}) ->
case emqx_eviction_agent:status() of
case emqx_node_rebalance_status:local_status() of
disabled ->
{200, #{}};
{enabled, _Stats} ->
_ ->
error_response(503, ?NODE_EVACUATING, <<"Node Evacuating">>)
end.

Expand Down Expand Up @@ -258,11 +258,11 @@ wrap_rpc(Node, RPCResult) ->
{200, #{}};
{error, Reason} ->
error_response(
400, ?BAD_REQUEST, io_lib:format("error on node ~p: ~p", [Node, Reason])
400, ?BAD_REQUEST, binfmt("error on node ~p: ~p", [Node, Reason])
);
{badrpc, Reason} ->
error_response(
503, ?RPC_ERROR, io_lib:format("RPC error on node ~p: ~p", [Node, Reason])
503, ?RPC_ERROR, binfmt("RPC error on node ~p: ~p", [Node, Reason])
)
end.

Expand Down Expand Up @@ -299,9 +299,9 @@ with_nodes_at_key(Key, Params, Fun) ->
{ok, Params1} ->
Fun(Params1);
{error, {unavailable, Nodes}} ->
error_response(400, ?NOT_FOUND, io_lib:format("Nodes unavailable: ~p", [Nodes]));
error_response(400, ?NOT_FOUND, binfmt("Nodes unavailable: ~p", [Nodes]));
{error, {invalid, Nodes}} ->
error_response(400, ?BAD_REQUEST, io_lib:format("Invalid nodes: ~p", [Nodes]))
error_response(400, ?BAD_REQUEST, binfmt("Invalid nodes: ~p", [Nodes]))
end.

parse_node(Bin) when is_binary(Bin) ->
Expand Down Expand Up @@ -331,6 +331,8 @@ without(Keys, Props) ->
Props
).

binfmt(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)).

%%------------------------------------------------------------------------------
%% Schema
%%------------------------------------------------------------------------------
Expand Down Expand Up @@ -432,6 +434,14 @@ fields(rebalance_start) ->
];
fields(rebalance_evacuation_start) ->
[
{"wait_health_check",
mk(
emqx_schema:timeout_duration_s(),
#{
desc => ?DESC(wait_health_check),
required => false
}
)},
{"conn_evict_rate",
mk(
pos_integer(),
Expand Down Expand Up @@ -712,6 +722,7 @@ rebalance_example() ->

rebalance_evacuation_example() ->
#{
wait_health_check => 10,
conn_evict_rate => 100,
sess_evict_rate => 100,
redirect_to => <<"othernode:1883">>,
Expand Down
7 changes: 5 additions & 2 deletions apps/emqx_node_rebalance/src/emqx_node_rebalance_cli.erl
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ cli(_) ->
[
{
"rebalance start --evacuation \\\n"
" [--wait-health-check Secs] \\\n"
" [--redirect-to \"Host1:Port1 Host2:Port2 ...\"] \\\n"
" [--conn-evict-rate CountPerSec] \\\n"
" [--migrate-to \"node1@host1 node2@host2 ...\"] \\\n"
Expand Down Expand Up @@ -182,8 +183,6 @@ collect_args(["--migrate-to", MigrateTo | Args], Map) ->
%% rebalance
collect_args(["--nodes", Nodes | Args], Map) ->
collect_args(Args, Map#{"--nodes" => Nodes});
collect_args(["--wait-health-check", WaitHealthCheck | Args], Map) ->
collect_args(Args, Map#{"--wait-health-check" => WaitHealthCheck});
collect_args(["--abs-conn-threshold", AbsConnThres | Args], Map) ->
collect_args(Args, Map#{"--abs-conn-threshold" => AbsConnThres});
collect_args(["--rel-conn-threshold", RelConnThres | Args], Map) ->
Expand All @@ -193,6 +192,8 @@ collect_args(["--abs-sess-threshold", AbsSessThres | Args], Map) ->
collect_args(["--rel-sess-threshold", RelSessThres | Args], Map) ->
collect_args(Args, Map#{"--rel-sess-threshold" => RelSessThres});
%% common
collect_args(["--wait-health-check", WaitHealthCheck | Args], Map) ->
collect_args(Args, Map#{"--wait-health-check" => WaitHealthCheck});
collect_args(["--conn-evict-rate", ConnEvictRate | Args], Map) ->
collect_args(Args, Map#{"--conn-evict-rate" => ConnEvictRate});
collect_args(["--wait-takeover", WaitTakeover | Args], Map) ->
Expand All @@ -207,6 +208,8 @@ validate_evacuation([], Map) ->
{ok, Map};
validate_evacuation([{"--evacuation", _} | Rest], Map) ->
validate_evacuation(Rest, Map);
validate_evacuation([{"--wait-health-check", _} | _] = Opts, Map) ->
validate_pos_int(wait_health_check, Opts, Map, fun validate_evacuation/2);
validate_evacuation([{"--redirect-to", ServerReference} | Rest], Map) ->
validate_evacuation(Rest, Map#{server_reference => list_to_binary(ServerReference)});
validate_evacuation([{"--conn-evict-rate", _} | _] = Opts, Map) ->
Expand Down
66 changes: 44 additions & 22 deletions apps/emqx_node_rebalance/src/emqx_node_rebalance_evacuation.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@
server_reference => emqx_eviction_agent:server_reference(),
conn_evict_rate => pos_integer(),
sess_evict_rate => pos_integer(),
wait_takeover => pos_integer(),
migrate_to => migrate_to()
wait_takeover => number(),
migrate_to => migrate_to(),
wait_health_check => number()
}.
-type start_error() :: already_started | eviction_agent_busy.
-type start_error() :: already_started.
-type stats() :: #{
initial_conns := non_neg_integer(),
initial_sessions := non_neg_integer(),
Expand Down Expand Up @@ -97,7 +98,7 @@ available_nodes(Nodes) when is_list(Nodes) ->

callback_mode() -> handle_event_function.

%% states: disabled, evicting_conns, waiting_takeover, evicting_sessions, prohibiting
%% states: disabled, waiting_health_check, evicting_conns, waiting_takeover, evicting_sessions, prohibiting

init([]) ->
case emqx_node_rebalance_evacuation_persist:read(default_opts()) of
Expand All @@ -119,25 +120,20 @@ init([]) ->
%% start
handle_event(
{call, From},
{start, #{server_reference := ServerReference} = Opts},
{start, #{wait_health_check := WaitHealthCheck} = Opts},
disabled,
#{} = Data
) ->
case emqx_eviction_agent:enable(?MODULE, ServerReference) of
ok ->
NewData = init_data(Data, Opts),
ok = emqx_node_rebalance_evacuation_persist:save(Opts),
?SLOG(warning, #{
msg => "node_evacuation_started",
opts => Opts
}),
{next_state, evicting_conns, NewData, [
{state_timeout, 0, evict_conns},
{reply, From, ok}
]};
{error, eviction_agent_busy} ->
{keep_state_and_data, [{reply, From, {error, eviction_agent_busy}}]}
end;
?SLOG(warning, #{
msg => "node_evacuation_started",
opts => Opts
}),
NewData = init_data(Data, Opts),
ok = emqx_node_rebalance_evacuation_persist:save(Opts),
{next_state, waiting_health_check, NewData, [
{state_timeout, seconds(WaitHealthCheck), start_eviction},
{reply, From, ok}
]};
handle_event({call, From}, {start, _Opts}, _State, #{}) ->
{keep_state_and_data, [{reply, From, {error, already_started}}]};
%% stop
Expand Down Expand Up @@ -167,6 +163,27 @@ handle_event({call, From}, status, State, #{migrate_to := MigrateTo} = Data) ->
{keep_state_and_data, [
{reply, From, {enabled, Stats#{state => State, migrate_to => migrate_to(MigrateTo)}}}
]};
%% start eviction
handle_event(
state_timeout,
start_eviction,
waiting_health_check,
#{server_reference := ServerReference} = Data
) ->
case emqx_eviction_agent:enable(?MODULE, ServerReference) of
ok ->
?tp(debug, eviction_agent_started, #{
data => Data
}),
{next_state, evicting_conns, Data, [
{state_timeout, 0, evict_conns}
]};
{error, eviction_agent_busy} ->
?tp(warning, eviction_agent_busy, #{
data => Data
}),
{next_state, disabled, deinit(Data)}
end;
%% conn eviction
handle_event(
state_timeout,
Expand Down Expand Up @@ -270,12 +287,14 @@ default_opts() ->
conn_evict_rate => ?DEFAULT_CONN_EVICT_RATE,
sess_evict_rate => ?DEFAULT_SESS_EVICT_RATE,
wait_takeover => ?DEFAULT_WAIT_TAKEOVER,
wait_health_check => ?DEFAULT_WAIT_HEALTH_CHECK,
migrate_to => undefined
}.

init_data(Data0, Opts) ->
Data1 = maps:merge(Data0, Opts),
{enabled, #{connections := ConnCount, sessions := SessCount}} = emqx_eviction_agent:status(),
ConnCount = emqx_eviction_agent:connection_count(),
SessCount = emqx_eviction_agent:session_count(),
Data1#{
initial_conns => ConnCount,
current_conns => ConnCount,
Expand Down Expand Up @@ -305,4 +324,7 @@ is_node_available() ->
node().

all_nodes() ->
mria_mnesia:running_nodes() -- [node()].
emqx:running_nodes() -- [node()].

seconds(Sec) ->
round(timer:seconds(Sec)).
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,16 @@
%% APIs
%%--------------------------------------------------------------------

%% do not persist `migrate_to`:
%% * after restart there is nothing to migrate
%% * this value may be invalid after node was offline
-type persisted_start_opts() :: #{
server_reference => emqx_eviction_agent:server_reference(),
conn_evict_rate => pos_integer(),
sess_evict_rate => pos_integer(),
wait_takeover => pos_integer()
}.
-type start_opts() :: #{
server_reference => emqx_eviction_agent:server_reference(),
conn_evict_rate => pos_integer(),
sess_evict_rate => pos_integer(),
wait_takeover => pos_integer(),
migrate_to => emqx_node_rebalance_evacuation:migrate_to()
wait_takeover => number(),
migrate_to => emqx_node_rebalance_evacuation:migrate_to(),
wait_health_check => number()
}.

-spec save(persisted_start_opts()) -> ok_or_error(term()).
-spec save(start_opts()) -> ok_or_error(term()).
save(
#{
server_reference := ServerReference,
Expand All @@ -50,7 +42,7 @@ save(
(is_binary(ServerReference) orelse ServerReference =:= undefined) andalso
is_integer(ConnEvictRate) andalso ConnEvictRate > 0 andalso
is_integer(SessEvictRate) andalso SessEvictRate > 0 andalso
is_integer(WaitTakeover) andalso WaitTakeover >= 0
is_number(WaitTakeover) andalso WaitTakeover >= 0
->
Filepath = evacuation_filepath(),
case filelib:ensure_dir(Filepath) of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ t_start_evacuation_validation(Config) ->
#{sess_evict_rate => <<"sess">>},
#{redirect_to => 123},
#{wait_takeover => <<"wait">>},
#{wait_health_check => <<"wait">>},
#{migrate_to => []},
#{migrate_to => <<"migrate_to">>},
#{migrate_to => [<<"bad_node">>]},
Expand Down Expand Up @@ -103,6 +104,7 @@ t_start_evacuation_validation(Config) ->
conn_evict_rate => 10,
sess_evict_rate => 10,
wait_takeover => 10,
wait_health_check => 10,
redirect_to => <<"srv">>,
migrate_to => [atom_to_binary(RecipientNode)]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,13 @@ end_per_testcase(_Case, Config) ->

t_agent_busy(Config) ->
[{DonorNode, _DonorPort}] = ?config(cluster_nodes, Config),

ok = rpc:call(DonorNode, emqx_eviction_agent, enable, [other_rebalance, undefined]),

?assertEqual(
{error, eviction_agent_busy},
rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)])
?assertWaitEvent(
rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
#{?snk_kind := eviction_agent_busy},
5000
).

t_already_started(Config) ->
Expand All @@ -115,7 +117,12 @@ t_start(Config) ->

[{DonorNode, DonorPort}] = ?config(cluster_nodes, Config),

ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
?assertWaitEvent(
rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
#{?snk_kind := eviction_agent_started},
5000
),

?assertMatch(
{error, {use_another_server, #{}}},
emqtt_try_connect([{port, DonorPort}])
Expand All @@ -126,7 +133,11 @@ t_persistence(Config) ->

[{DonorNode, DonorPort}] = ?config(cluster_nodes, Config),

ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
?assertWaitEvent(
rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
#{?snk_kind := eviction_agent_started},
5000
),

?assertMatch(
{error, {use_another_server, #{}}},
Expand Down Expand Up @@ -179,7 +190,7 @@ t_conn_evicted(Config) ->
?assertWaitEvent(
ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
#{?snk_kind := node_evacuation_evict_conn},
1000
5000
),

?assertMatch(
Expand Down Expand Up @@ -251,6 +262,7 @@ opts(Config) ->
conn_evict_rate => 10,
sess_evict_rate => 10,
wait_takeover => 1,
wait_health_check => 1,
migrate_to => migrate_to(Config)
}.

Expand Down
3 changes: 3 additions & 0 deletions changes/ee/fix-10967.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed error message formatting in rebalance API: previously they could be displayed as unclear dumps of internal Erlang structures.

Added `wait_health_check` option to node evacuation CLI and API. This is a time interval when the node reports "unhealthy status" without beginning actual evacuation. We need this to allow a Load Balancer (if any) to remove the evacuated node from balancing and not forward (re)connecting clients to the evacuated node.
2 changes: 1 addition & 1 deletion rel/i18n/emqx_node_rebalance_api.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ param_node.label:
"""Node name"""

wait_health_check.desc:
"""Time to wait before starting the rebalance process, in seconds"""
"""Time to wait before starting the rebalance/evacuation process, in seconds"""

wait_health_check.label:
"""Wait health check"""
Expand Down