Skip to content

Commit

Permalink
Merge pull request #10668 from lafirest/fix/max_conn_rate
Browse files Browse the repository at this point in the history
fix(limiter): fix an error when setting `max_conn_rate` in a listener
  • Loading branch information
lafirest committed May 11, 2023
2 parents fbd516d + b712625 commit d3a7d6d
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 22 deletions.
3 changes: 2 additions & 1 deletion apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl
Expand Up @@ -286,7 +286,8 @@ default_client_config() ->
default_bucket_config() ->
#{
rate => infinity,
burst => 0
burst => 0,
initial => 0
}.

get_listener_opts(Conf) ->
Expand Down
49 changes: 28 additions & 21 deletions apps/emqx/src/emqx_listeners.erl
Expand Up @@ -347,7 +347,8 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
Type == tcp; Type == ssl
->
Id = listener_id(Type, ListenerName),
add_limiter_bucket(Id, Opts),
Limiter = limiter(Opts),
add_limiter_bucket(Id, Limiter),
esockd:open(
Id,
ListenOn,
Expand All @@ -356,7 +357,7 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
#{
listener => {Type, ListenerName},
zone => zone(Opts),
limiter => limiter(Opts),
limiter => Limiter,
enable_authn => enable_authn(Opts)
}
]}
Expand All @@ -366,9 +367,10 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
Type == ws; Type == wss
->
Id = listener_id(Type, ListenerName),
add_limiter_bucket(Id, Opts),
Limiter = limiter(Opts),
add_limiter_bucket(Id, Limiter),
RanchOpts = ranch_opts(Type, ListenOn, Opts),
WsOpts = ws_opts(Type, ListenerName, Opts),
WsOpts = ws_opts(Type, ListenerName, Opts, Limiter),
case Type of
ws -> cowboy:start_clear(Id, RanchOpts, WsOpts);
wss -> cowboy:start_tls(Id, RanchOpts, WsOpts)
Expand Down Expand Up @@ -415,20 +417,22 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
Password -> [{password, str(Password)}]
end ++
optional_quic_listener_opts(Opts),
Limiter = limiter(Opts),
ConnectionOpts = #{
conn_callback => emqx_quic_connection,
peer_unidi_stream_count => maps:get(peer_unidi_stream_count, Opts, 1),
peer_bidi_stream_count => maps:get(peer_bidi_stream_count, Opts, 10),
zone => zone(Opts),
listener => {quic, ListenerName},
limiter => limiter(Opts)
limiter => Limiter
},
StreamOpts = #{
stream_callback => emqx_quic_stream,
active => 1
},

Id = listener_id(quic, ListenerName),
add_limiter_bucket(Id, Opts),
add_limiter_bucket(Id, Limiter),
quicer:start_listener(
Id,
ListenOn,
Expand Down Expand Up @@ -532,12 +536,12 @@ esockd_opts(ListenerId, Type, Opts0) ->
end
).

ws_opts(Type, ListenerName, Opts) ->
ws_opts(Type, ListenerName, Opts, Limiter) ->
WsPaths = [
{emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"), emqx_ws_connection, #{
zone => zone(Opts),
listener => {Type, ListenerName},
limiter => limiter(Opts),
limiter => Limiter,
enable_authn => enable_authn(Opts)
}}
],
Expand Down Expand Up @@ -653,26 +657,29 @@ zone(Opts) ->
limiter(Opts) ->
emqx_limiter_schema:get_listener_opts(Opts).

add_limiter_bucket(Id, #{limiter := Limiter}) ->
add_limiter_bucket(_Id, undefined) ->
ok;
add_limiter_bucket(Id, Limiter) ->
maps:fold(
fun(Type, Cfg, _) ->
emqx_limiter_server:add_bucket(Id, Type, Cfg)
end,
ok,
maps:without([client], Limiter)
);
add_limiter_bucket(_Id, _Cfg) ->
ok.
).

del_limiter_bucket(Id, #{limiter := Limiters}) ->
lists:foreach(
fun(Type) ->
emqx_limiter_server:del_bucket(Id, Type)
end,
maps:keys(Limiters)
);
del_limiter_bucket(_Id, _Cfg) ->
ok.
del_limiter_bucket(Id, Conf) ->
case limiter(Conf) of
undefined ->
ok;
Limiter ->
lists:foreach(
fun(Type) ->
emqx_limiter_server:del_bucket(Id, Type)
end,
maps:keys(Limiter)
)
end.

enable_authn(Opts) ->
maps:get(enable_authn, Opts, true).
Expand Down

0 comments on commit d3a7d6d

Please sign in to comment.