Skip to content

Commit

Permalink
Merge pull request #9136 from emqx/enhance-flapping-detect-5
Browse files Browse the repository at this point in the history
refactor: enhance the flapping detect accuracy
  • Loading branch information
HJianBo committed Oct 13, 2022
2 parents 5a66c8b + 59c1b0d commit 6f077c4
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGES-5.0.md
Expand Up @@ -4,6 +4,7 @@

* Add `cert_common_name` and `cert_subject` placeholder support for authz_http and authz_mongo.[#8973](https://github.com/emqx/emqx/pull/8973)
* Use milliseconds internally in emqx_delayed to store the publish time, improving precision.[#9060](https://github.com/emqx/emqx/pull/9060)
* More rigorous checking of flapping to improve stability of the system. [#9136](https://github.com/emqx/emqx/pull/9136)

## Bug fixes

Expand Down
16 changes: 11 additions & 5 deletions apps/emqx/src/emqx_channel.erl
Expand Up @@ -345,7 +345,8 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) ->
fun check_connect/2,
fun enrich_client/2,
fun set_log_meta/2,
fun check_banned/2
fun check_banned/2,
fun count_flapping_event/2
],
ConnPkt,
Channel#channel{conn_state = connecting}
Expand Down Expand Up @@ -1260,14 +1261,11 @@ handle_info(
{sock_closed, Reason},
Channel =
#channel{
conn_state = ConnState,
clientinfo = ClientInfo = #{zone := Zone}
conn_state = ConnState
}
) when
ConnState =:= connected orelse ConnState =:= reauthenticating
->
emqx_config:get_zone_conf(Zone, [flapping_detect, enable]) andalso
emqx_flapping:detect(ClientInfo),
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
case maybe_shutdown(Reason, Channel1) of
{ok, Channel2} -> {ok, {event, disconnected}, Channel2};
Expand Down Expand Up @@ -1636,6 +1634,14 @@ check_banned(_ConnPkt, #channel{clientinfo = ClientInfo}) ->
false -> ok
end.

%%--------------------------------------------------------------------
%% Flapping

count_flapping_event(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
emqx_config:get_zone_conf(Zone, [flapping_detect, enable]) andalso
emqx_flapping:detect(ClientInfo),
{ok, Channel}.

%%--------------------------------------------------------------------
%% Authenticate

Expand Down
45 changes: 36 additions & 9 deletions apps/emqx/test/emqx_channel_SUITE.erl
Expand Up @@ -207,14 +207,6 @@ init_per_suite(Config) ->
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
%% Access Control Meck
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(
emqx_access_control,
authenticate,
fun(_) -> {ok, #{is_superuser => false}} end
),
ok = meck:expect(emqx_access_control, authorize, fun(_, _, _) -> allow end),
%% Broker Meck
ok = meck:new(emqx_broker, [passthrough, no_history, no_link]),
%% Hooks Meck
Expand All @@ -234,7 +226,6 @@ init_per_suite(Config) ->

end_per_suite(_Config) ->
meck:unload([
emqx_access_control,
emqx_metrics,
emqx_session,
emqx_broker,
Expand All @@ -244,11 +235,21 @@ end_per_suite(_Config) ->
]).

init_per_testcase(_TestCase, Config) ->
%% Access Control Meck
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(
emqx_access_control,
authenticate,
fun(_) -> {ok, #{is_superuser => false}} end
),
ok = meck:expect(emqx_access_control, authorize, fun(_, _, _) -> allow end),
%% Set confs
OldConf = set_test_listener_confs(),
emqx_common_test_helpers:start_apps([]),
[{config, OldConf} | Config].

end_per_testcase(_TestCase, Config) ->
meck:unload([emqx_access_control]),
emqx_config:put(?config(config, Config)),
emqx_common_test_helpers:stop_apps([]),
Config.
Expand Down Expand Up @@ -1115,6 +1116,32 @@ t_ws_cookie_init(_) ->
),
?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)).

%%--------------------------------------------------------------------
%% Test cases for other mechnisms
%%--------------------------------------------------------------------

t_flapping_detect(_) ->
emqx_config:put_zone_conf(default, [flapping_detect, enable], true),
Parent = self(),
ok = meck:expect(
emqx_cm,
open_session,
fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}}
end
),
ok = meck:expect(emqx_access_control, authenticate, fun(_) -> {error, not_authorized} end),
ok = meck:expect(emqx_flapping, detect, fun(_) -> Parent ! flapping_detect end),
IdleChannel = channel(#{conn_state => idle}),
{shutdown, not_authorized, _ConnAck, _Channel} =
emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel),
receive
flapping_detect -> ok
after 2000 ->
?assert(false, "Flapping detect should be exected in connecting progress")
end,
meck:unload([emqx_flapping]).

%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
Expand Down

0 comments on commit 6f077c4

Please sign in to comment.