diff --git a/CHANGES-5.0.md b/CHANGES-5.0.md index 9cc2d508cc..8d3c1e61d9 100644 --- a/CHANGES-5.0.md +++ b/CHANGES-5.0.md @@ -4,6 +4,7 @@ * Add `cert_common_name` and `cert_subject` placeholder support for authz_http and authz_mongo.[#8973](https://github.com/emqx/emqx/pull/8973) * Use milliseconds internally in emqx_delayed to store the publish time, improving precision.[#9060](https://github.com/emqx/emqx/pull/9060) +* More rigorous checking of flapping to improve stability of the system. [#9136](https://github.com/emqx/emqx/pull/9136) ## Bug fixes diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 742868694b..494c77ec7a 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -345,7 +345,8 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) -> fun check_connect/2, fun enrich_client/2, fun set_log_meta/2, - fun check_banned/2 + fun check_banned/2, + fun count_flapping_event/2 ], ConnPkt, Channel#channel{conn_state = connecting} @@ -1260,14 +1261,11 @@ handle_info( {sock_closed, Reason}, Channel = #channel{ - conn_state = ConnState, - clientinfo = ClientInfo = #{zone := Zone} + conn_state = ConnState } ) when ConnState =:= connected orelse ConnState =:= reauthenticating -> - emqx_config:get_zone_conf(Zone, [flapping_detect, enable]) andalso - emqx_flapping:detect(ClientInfo), Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), case maybe_shutdown(Reason, Channel1) of {ok, Channel2} -> {ok, {event, disconnected}, Channel2}; @@ -1636,6 +1634,14 @@ check_banned(_ConnPkt, #channel{clientinfo = ClientInfo}) -> false -> ok end. +%%-------------------------------------------------------------------- +%% Flapping + +count_flapping_event(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> + emqx_config:get_zone_conf(Zone, [flapping_detect, enable]) andalso + emqx_flapping:detect(ClientInfo), + {ok, Channel}. + %%-------------------------------------------------------------------- %% Authenticate diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index df1720772a..a3fa3e5bc1 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -207,14 +207,6 @@ init_per_suite(Config) -> ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end), ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end), - %% Access Control Meck - ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), - ok = meck:expect( - emqx_access_control, - authenticate, - fun(_) -> {ok, #{is_superuser => false}} end - ), - ok = meck:expect(emqx_access_control, authorize, fun(_, _, _) -> allow end), %% Broker Meck ok = meck:new(emqx_broker, [passthrough, no_history, no_link]), %% Hooks Meck @@ -234,7 +226,6 @@ init_per_suite(Config) -> end_per_suite(_Config) -> meck:unload([ - emqx_access_control, emqx_metrics, emqx_session, emqx_broker, @@ -244,11 +235,21 @@ end_per_suite(_Config) -> ]). init_per_testcase(_TestCase, Config) -> + %% Access Control Meck + ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), + ok = meck:expect( + emqx_access_control, + authenticate, + fun(_) -> {ok, #{is_superuser => false}} end + ), + ok = meck:expect(emqx_access_control, authorize, fun(_, _, _) -> allow end), + %% Set confs OldConf = set_test_listener_confs(), emqx_common_test_helpers:start_apps([]), [{config, OldConf} | Config]. end_per_testcase(_TestCase, Config) -> + meck:unload([emqx_access_control]), emqx_config:put(?config(config, Config)), emqx_common_test_helpers:stop_apps([]), Config. @@ -1115,6 +1116,32 @@ t_ws_cookie_init(_) -> ), ?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)). +%%-------------------------------------------------------------------- +%% Test cases for other mechnisms +%%-------------------------------------------------------------------- + +t_flapping_detect(_) -> + emqx_config:put_zone_conf(default, [flapping_detect, enable], true), + Parent = self(), + ok = meck:expect( + emqx_cm, + open_session, + fun(true, _ClientInfo, _ConnInfo) -> + {ok, #{session => session(), present => false}} + end + ), + ok = meck:expect(emqx_access_control, authenticate, fun(_) -> {error, not_authorized} end), + ok = meck:expect(emqx_flapping, detect, fun(_) -> Parent ! flapping_detect end), + IdleChannel = channel(#{conn_state => idle}), + {shutdown, not_authorized, _ConnAck, _Channel} = + emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel), + receive + flapping_detect -> ok + after 2000 -> + ?assert(false, "Flapping detect should be exected in connecting progress") + end, + meck:unload([emqx_flapping]). + %%-------------------------------------------------------------------- %% Helper functions %%--------------------------------------------------------------------