Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize flapping and banned modules #3033

Merged
merged 2 commits into from Nov 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 5 additions & 7 deletions include/emqx.hrl
Expand Up @@ -154,15 +154,13 @@
%% Banned
%%--------------------------------------------------------------------

-type(banned_who() :: {clientid, binary()}
| {username, binary()}
| {ip_address, inet:ip_address()}).

-record(banned, {
who :: banned_who(),
reason :: binary(),
who :: {clientid, binary()}
| {username, binary()}
| {ip_address, inet:ip_address()},
by :: binary(),
desc :: binary(),
reason :: binary(),
at :: integer(),
until :: integer()
}).

Expand Down
40 changes: 29 additions & 11 deletions src/emqx_banned.erl
Expand Up @@ -33,7 +33,7 @@
-export([start_link/0, stop/0]).

-export([ check/1
, add/1
, create/1
, delete/1
, info/1
]).
Expand Down Expand Up @@ -74,21 +74,39 @@ start_link() ->
stop() -> gen_server:stop(?MODULE).

-spec(check(emqx_types:clientinfo()) -> boolean()).
check(#{clientid := ClientId,
username := Username,
peerhost := IPAddr}) ->
ets:member(?BANNED_TAB, {clientid, ClientId})
orelse ets:member(?BANNED_TAB, {username, Username})
orelse ets:member(?BANNED_TAB, {ipaddr, IPAddr}).

-spec(add(emqx_types:banned()) -> ok).
add(Banned) when is_record(Banned, banned) ->
check(ClientInfo) ->
do_check({clientid, maps:get(clientid, ClientInfo, undefined)})
orelse do_check({username, maps:get(username, ClientInfo, undefined)})
orelse do_check({peerhost, maps:get(peerhost, ClientInfo, undefined)}).

do_check({_, undefined}) ->
false;
do_check(Who) when is_tuple(Who) ->
case mnesia:dirty_read(?BANNED_TAB, Who) of
[] -> false;
[#banned{until = Until}] ->
Until > erlang:system_time(millisecond)
end.

-spec(create(emqx_types:banned()) -> ok).
create(#{who := Who,
by := By,
reason := Reason,
at := At,
until := Until}) ->
mnesia:dirty_write(?BANNED_TAB, #banned{who = Who,
by = By,
reason = Reason,
at = At,
until = Until});
create(Banned) when is_record(Banned, banned) ->
mnesia:dirty_write(?BANNED_TAB, Banned).

-spec(delete({clientid, emqx_types:clientid()}
| {username, emqx_types:username()}
| {peerhost, emqx_types:peerhost()}) -> ok).
delete(Key) -> mnesia:dirty_delete(?BANNED_TAB, Key).
delete(Who) ->
mnesia:dirty_delete(?BANNED_TAB, Who).

info(InfoKey) ->
mnesia:table_info(?BANNED_TAB, InfoKey).
Expand Down
10 changes: 1 addition & 9 deletions src/emqx_channel.erl
Expand Up @@ -219,7 +219,6 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
fun enrich_client/2,
fun set_logger_meta/2,
fun check_banned/2,
fun check_flapping/2,
fun auth_connect/2], ConnPkt, Channel) of
{ok, NConnPkt, NChannel} ->
process_connect(NConnPkt, NChannel);
Expand Down Expand Up @@ -942,7 +941,7 @@ set_logger_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId}}) ->
emqx_logger:set_metadata_clientid(ClientId).

%%--------------------------------------------------------------------
%% Check banned/flapping
%% Check banned
%%--------------------------------------------------------------------

check_banned(_ConnPkt, #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
Expand All @@ -951,13 +950,6 @@ check_banned(_ConnPkt, #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
false -> ok
end.

check_flapping(_ConnPkt, #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
case emqx_zone:enable_flapping_detect(Zone)
andalso emqx_flapping:check(ClientInfo) of
true -> {error, ?RC_CONNECTION_RATE_EXCEEDED};
false -> ok
end.

%%--------------------------------------------------------------------
%% Auth Connect
%%--------------------------------------------------------------------
Expand Down
91 changes: 19 additions & 72 deletions src/emqx_flapping.erl
Expand Up @@ -27,7 +27,7 @@
-export([start_link/0, stop/0]).

%% API
-export([check/1, detect/1]).
-export([detect/1]).

%% gen_server callbacks
-export([ init/1
Expand All @@ -54,8 +54,7 @@
clientid :: emqx_types:clientid(),
peerhost :: emqx_types:peerhost(),
started_at :: pos_integer(),
detect_cnt :: pos_integer(),
banned_at :: pos_integer()
detect_cnt :: pos_integer()
}).

-opaque(flapping() :: #flapping{}).
Expand All @@ -68,27 +67,14 @@ start_link() ->

stop() -> gen_server:stop(?MODULE).

%% @doc Check flapping when a MQTT client connected.
-spec(check(emqx_types:clientinfo()) -> boolean()).
check(#{clientid := ClientId}) ->
check(ClientId, get_policy()).

check(ClientId, #{banned_interval := Interval}) ->
case ets:lookup(?FLAPPING_TAB, {banned, ClientId}) of
[] -> false;
[#flapping{banned_at = BannedAt}] ->
now_diff(BannedAt) < Interval
end.

%% @doc Detect flapping when a MQTT client disconnected.
-spec(detect(emqx_types:clientinfo()) -> boolean()).
detect(Client) -> detect(Client, get_policy()).

detect(#{clientid := ClientId, peerhost := PeerHost},
Policy = #{threshold := Threshold}) ->
detect(#{clientid := ClientId, peerhost := PeerHost}, Policy = #{threshold := Threshold}) ->
try ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.detect_cnt, 1}) of
Cnt when Cnt < Threshold -> false;
_Cnt -> case ets:lookup(?FLAPPING_TAB, ClientId) of
_Cnt -> case ets:take(?FLAPPING_TAB, ClientId) of
[Flapping] ->
ok = gen_server:cast(?MODULE, {detected, Flapping, Policy}),
true;
Expand Down Expand Up @@ -118,52 +104,44 @@ now_diff(TS) -> erlang:system_time(millisecond) - TS.
%%--------------------------------------------------------------------

init([]) ->
#{duration := Duration, banned_interval := Interval} = get_policy(),
ok = emqx_tables:new(?FLAPPING_TAB, [public, set,
{keypos, 2},
{read_concurrency, true},
{write_concurrency, true}
]),
State = #{time => max(Duration, Interval) + 1, tref => undefined},
{ok, ensure_timer(State), hibernate}.
{ok, #{}, hibernate}.

handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.

handle_cast({detected, Flapping = #flapping{clientid = ClientId,
peerhost = PeerHost,
started_at = StartedAt,
detect_cnt = DetectCnt},
#{duration := Duration}}, State) ->
case (Interval = now_diff(StartedAt)) < Duration of
handle_cast({detected, #flapping{clientid = ClientId,
peerhost = PeerHost,
started_at = StartedAt,
detect_cnt = DetectCnt},
#{duration := Duration, banned_interval := Interval}}, State) ->
case now_diff(StartedAt) < Duration of
true -> %% Flapping happened:(
%% Log first
?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms",
[ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Duration]),
%% Banned.
BannedFlapping = Flapping#flapping{clientid = {banned, ClientId},
banned_at = erlang:system_time(millisecond)
},
alarm_handler:set_alarm({{flapping_detected, ClientId}, BannedFlapping}),
ets:insert(?FLAPPING_TAB, BannedFlapping);
Now = erlang:system_time(millisecond),
Banned = #banned{who = {clientid, ClientId},
by = <<"flapping detector">>,
reason = <<"flapping is detected">>,
at = Now,
until = Now + Interval},
alarm_handler:set_alarm({{flapping_detected, ClientId}, Banned}),
emqx_banned:create(Banned);
false ->
?LOG(warning, "~s(~s) disconnected ~w times in ~wms",
[ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Interval])
end,
ets:delete_object(?FLAPPING_TAB, Flapping),
{noreply, State};

handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.

handle_info({timeout, TRef, expire_flapping}, State = #{tref := TRef}) ->
with_flapping_tab(fun expire_flapping/2,
[erlang:system_time(millisecond),
get_policy()]),
{noreply, ensure_timer(State#{tref => undefined}), hibernate};

handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
Expand All @@ -173,34 +151,3 @@ terminate(_Reason, _State) ->

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

ensure_timer(State = #{time := Time, tref := undefined}) ->
State#{tref => emqx_misc:start_timer(Time, expire_flapping)};
ensure_timer(State) -> State.

with_flapping_tab(Fun, Args) ->
case ets:info(?FLAPPING_TAB, size) of
undefined -> ok;
0 -> ok;
_Size -> erlang:apply(Fun, Args)
end.

expire_flapping(NowTime, #{duration := Duration, banned_interval := Interval}) ->
case ets:select(?FLAPPING_TAB,
[{#flapping{started_at = '$1', banned_at = undefined, _ = '_'},
[{'<', '$1', NowTime-Duration}], ['$_']},
{#flapping{clientid = {banned, '_'}, banned_at = '$1', _ = '_'},
[{'<', '$1', NowTime-Interval}], ['$_']}]) of
[] -> ok;
Flappings ->
lists:foreach(fun(Flapping = #flapping{clientid = {banned, ClientId}}) ->
ets:delete_object(?FLAPPING_TAB, Flapping),
alarm_handler:clear_alarm({flapping_detected, ClientId});
(_) -> ok
end, Flappings)
end.

19 changes: 9 additions & 10 deletions test/emqx_banned_SUITE.erl
Expand Up @@ -38,20 +38,20 @@ end_per_suite(_Config) ->

t_add_delete(_) ->
Banned = #banned{who = {clientid, <<"TestClient">>},
reason = <<"test">>,
by = <<"banned suite">>,
desc = <<"test">>,
reason = <<"test">>,
at = erlang:system_time(second),
until = erlang:system_time(second) + 1000
},
ok = emqx_banned:add(Banned),
ok = emqx_banned:create(Banned),
?assertEqual(1, emqx_banned:info(size)),
ok = emqx_banned:delete({clientid, <<"TestClient">>}),
?assertEqual(0, emqx_banned:info(size)).

t_check(_) ->
ok = emqx_banned:add(#banned{who = {clientid, <<"BannedClient">>}}),
ok = emqx_banned:add(#banned{who = {username, <<"BannedUser">>}}),
ok = emqx_banned:add(#banned{who = {ipaddr, {192,168,0,1}}}),
ok = emqx_banned:create(#banned{who = {clientid, <<"BannedClient">>}}),
ok = emqx_banned:create(#banned{who = {username, <<"BannedUser">>}}),
ok = emqx_banned:create(#banned{who = {peerhost, {192,168,0,1}}}),
?assertEqual(3, emqx_banned:info(size)),
ClientInfo1 = #{clientid => <<"BannedClient">>,
username => <<"user">>,
Expand All @@ -75,7 +75,7 @@ t_check(_) ->
?assertNot(emqx_banned:check(ClientInfo4)),
ok = emqx_banned:delete({clientid, <<"BannedClient">>}),
ok = emqx_banned:delete({username, <<"BannedUser">>}),
ok = emqx_banned:delete({ipaddr, {192,168,0,1}}),
ok = emqx_banned:delete({peerhost, {192,168,0,1}}),
?assertNot(emqx_banned:check(ClientInfo1)),
?assertNot(emqx_banned:check(ClientInfo2)),
?assertNot(emqx_banned:check(ClientInfo3)),
Expand All @@ -84,9 +84,8 @@ t_check(_) ->

t_unused(_) ->
{ok, Banned} = emqx_banned:start_link(),
ok = emqx_banned:add(#banned{who = {clientid, <<"BannedClient">>},
until = erlang:system_time(second)
}),
ok = emqx_banned:create(#banned{who = {clientid, <<"BannedClient">>},
until = erlang:system_time(second)}),
?assertEqual(ignored, gen_server:call(Banned, unexpected_req)),
?assertEqual(ok, gen_server:cast(Banned, unexpected_msg)),
?assertEqual(ok, Banned ! ok),
Expand Down
10 changes: 5 additions & 5 deletions test/emqx_flapping_SUITE.erl
Expand Up @@ -45,13 +45,13 @@ t_detect_check(_) ->
peerhost => {127,0,0,1}
},
false = emqx_flapping:detect(ClientInfo),
false = emqx_flapping:check(ClientInfo),
false = emqx_banned:check(ClientInfo),
false = emqx_flapping:detect(ClientInfo),
false = emqx_flapping:check(ClientInfo),
false = emqx_banned:check(ClientInfo),
true = emqx_flapping:detect(ClientInfo),
timer:sleep(100),
true = emqx_flapping:check(ClientInfo),
timer:sleep(300),
false = emqx_flapping:check(ClientInfo),
true = emqx_banned:check(ClientInfo),
timer:sleep(200),
false = emqx_banned:check(ClientInfo),
ok = emqx_flapping:stop().