From b7126257a5744353ec1bb27a40d7ae0cbe1c7898 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 11 May 2023 13:52:13 +0800 Subject: [PATCH] fix(limiter): fix an error when setting `max_conn_rate` in a listener --- .../emqx_limiter/src/emqx_limiter_schema.erl | 3 +- apps/emqx/src/emqx_listeners.erl | 49 +++++++++++-------- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index a4f7d5b897..81358fdfe8 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -286,7 +286,8 @@ default_client_config() -> default_bucket_config() -> #{ rate => infinity, - burst => 0 + burst => 0, + initial => 0 }. get_listener_opts(Conf) -> diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index d56e03a1bc..2b80000dca 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -347,7 +347,8 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when Type == tcp; Type == ssl -> Id = listener_id(Type, ListenerName), - add_limiter_bucket(Id, Opts), + Limiter = limiter(Opts), + add_limiter_bucket(Id, Limiter), esockd:open( Id, ListenOn, @@ -356,7 +357,7 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when #{ listener => {Type, ListenerName}, zone => zone(Opts), - limiter => limiter(Opts), + limiter => Limiter, enable_authn => enable_authn(Opts) } ]} @@ -366,9 +367,10 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when Type == ws; Type == wss -> Id = listener_id(Type, ListenerName), - add_limiter_bucket(Id, Opts), + Limiter = limiter(Opts), + add_limiter_bucket(Id, Limiter), RanchOpts = ranch_opts(Type, ListenOn, Opts), - WsOpts = ws_opts(Type, ListenerName, Opts), + WsOpts = ws_opts(Type, ListenerName, Opts, Limiter), case Type of ws -> cowboy:start_clear(Id, RanchOpts, WsOpts); wss -> cowboy:start_tls(Id, RanchOpts, WsOpts) @@ -415,20 +417,22 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> Password -> [{password, str(Password)}] end ++ optional_quic_listener_opts(Opts), + Limiter = limiter(Opts), ConnectionOpts = #{ conn_callback => emqx_quic_connection, peer_unidi_stream_count => maps:get(peer_unidi_stream_count, Opts, 1), peer_bidi_stream_count => maps:get(peer_bidi_stream_count, Opts, 10), zone => zone(Opts), listener => {quic, ListenerName}, - limiter => limiter(Opts) + limiter => Limiter }, StreamOpts = #{ stream_callback => emqx_quic_stream, active => 1 }, + Id = listener_id(quic, ListenerName), - add_limiter_bucket(Id, Opts), + add_limiter_bucket(Id, Limiter), quicer:start_listener( Id, ListenOn, @@ -532,12 +536,12 @@ esockd_opts(ListenerId, Type, Opts0) -> end ). -ws_opts(Type, ListenerName, Opts) -> +ws_opts(Type, ListenerName, Opts, Limiter) -> WsPaths = [ {emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"), emqx_ws_connection, #{ zone => zone(Opts), listener => {Type, ListenerName}, - limiter => limiter(Opts), + limiter => Limiter, enable_authn => enable_authn(Opts) }} ], @@ -653,26 +657,29 @@ zone(Opts) -> limiter(Opts) -> emqx_limiter_schema:get_listener_opts(Opts). -add_limiter_bucket(Id, #{limiter := Limiter}) -> +add_limiter_bucket(_Id, undefined) -> + ok; +add_limiter_bucket(Id, Limiter) -> maps:fold( fun(Type, Cfg, _) -> emqx_limiter_server:add_bucket(Id, Type, Cfg) end, ok, maps:without([client], Limiter) - ); -add_limiter_bucket(_Id, _Cfg) -> - ok. + ). -del_limiter_bucket(Id, #{limiter := Limiters}) -> - lists:foreach( - fun(Type) -> - emqx_limiter_server:del_bucket(Id, Type) - end, - maps:keys(Limiters) - ); -del_limiter_bucket(_Id, _Cfg) -> - ok. +del_limiter_bucket(Id, Conf) -> + case limiter(Conf) of + undefined -> + ok; + Limiter -> + lists:foreach( + fun(Type) -> + emqx_limiter_server:del_bucket(Id, Type) + end, + maps:keys(Limiter) + ) + end. enable_authn(Opts) -> maps:get(enable_authn, Opts, true).