Skip to content

Commit

Permalink
fix(listen): ensure limiter server state consistent with updates
Browse files Browse the repository at this point in the history
  • Loading branch information
keynslug committed Dec 19, 2023
1 parent 4796f85 commit 7e40496
Showing 1 changed file with 36 additions and 32 deletions.
68 changes: 36 additions & 32 deletions apps/emqx/src/emqx_listeners.erl
Original file line number Diff line number Diff line change
Expand Up @@ -228,22 +228,25 @@ start_listener(ListenerId) ->
apply_on_listener(ListenerId, fun start_listener/3).

-spec start_listener(listener_type(), atom(), map()) -> ok | {error, term()}.
start_listener(Type, ListenerName, #{bind := Bind, enable := true} = Conf) ->
case do_start_listener(Type, ListenerName, Conf) of
start_listener(Type, Name, #{bind := Bind, enable := true} = Conf) ->
ListenerId = listener_id(Type, Name),
Limiter = limiter(Conf),
ok = add_limiter_bucket(ListenerId, Limiter),
case do_start_listener(Type, Name, ListenerId, Conf) of
{ok, {skipped, Reason}} when
Reason =:= quic_app_missing
->
?tp(listener_not_started, #{type => Type, bind => Bind, status => {skipped, Reason}}),
console_print(
"Listener ~ts is NOT started due to: ~p.~n",
[listener_id(Type, ListenerName), Reason]
[ListenerId, Reason]
),
ok;
{ok, _} ->
?tp(listener_started, #{type => Type, bind => Bind}),
console_print(
"Listener ~ts on ~ts started.~n",
[listener_id(Type, ListenerName), format_bind(Bind)]
[ListenerId, format_bind(Bind)]
),
ok;
{error, {already_started, Pid}} ->
Expand All @@ -252,8 +255,8 @@ start_listener(Type, ListenerName, #{bind := Bind, enable := true} = Conf) ->
}),
{error, {already_started, Pid}};
{error, Reason} ->
ok = del_limiter_bucket(ListenerId, Limiter),
?tp(listener_not_started, #{type => Type, bind => Bind, status => {error, Reason}}),
ListenerId = listener_id(Type, ListenerName),
BindStr = format_bind(Bind),
?ELOG(
"Failed to start listener ~ts on ~ts: ~0p.~n",
Expand All @@ -267,10 +270,10 @@ start_listener(Type, ListenerName, #{bind := Bind, enable := true} = Conf) ->
),
{error, {failed_to_start, Msg}}
end;
start_listener(Type, ListenerName, #{enable := false}) ->
start_listener(Type, Name, #{enable := false}) ->
console_print(
"Listener ~ts is NOT started due to: disabled.~n",
[listener_id(Type, ListenerName)]
[listener_id(Type, Name)]
),
ok.

Expand All @@ -294,6 +297,8 @@ update_listener(Type, Name, Conf = #{enable := true}, #{enable := false}) ->
update_listener(Type, Name, #{enable := false}, Conf = #{enable := true}) ->
start_listener(Type, Name, Conf);
update_listener(Type, Name, OldConf, NewConf) ->
Id = listener_id(Type, Name),
ok = update_limiter_bucket(Id, limiter(OldConf), limiter(NewConf)),
case do_update_listener(Type, Name, OldConf, NewConf) of
ok ->
ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
Expand Down Expand Up @@ -325,7 +330,7 @@ stop_listener(ListenerId) ->

stop_listener(Type, Name, #{bind := Bind} = Conf) ->
Id = listener_id(Type, Name),
ok = del_limiter_bucket(Id, Conf),
ok = del_limiter_bucket(Id, limiter(Conf)),
ok = unregister_ocsp_stapling_refresh(Type, Name),
case do_stop_listener(Type, Id, Conf) of
ok ->
Expand Down Expand Up @@ -387,29 +392,25 @@ console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
console_print(_Fmt, _Args) -> ok.
-endif.

-spec do_start_listener(listener_type(), atom(), map()) ->
-spec do_start_listener(listener_type(), atom(), listener_id(), map()) ->
{ok, pid() | {skipped, atom()}} | {error, term()}.
%% Start MQTT/TCP listener
do_start_listener(Type, Name, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTENER(Type) ->
Id = listener_id(Type, Name),
ok = add_limiter_bucket(Id, limiter(Opts)),
do_start_listener(Type, Name, Id, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTENER(Type) ->
esockd:open(
Id,
ListenOn,
merge_default(esockd_opts(Id, Type, Name, Opts))
);
%% Start MQTT/WS listener
do_start_listener(Type, Name, Opts) when ?COWBOY_LISTENER(Type) ->
Id = listener_id(Type, Name),
ok = add_limiter_bucket(Id, limiter(Opts)),
do_start_listener(Type, Name, Id, Opts) when ?COWBOY_LISTENER(Type) ->
RanchOpts = ranch_opts(Type, Opts),
WsOpts = ws_opts(Type, Name, Opts),
case Type of
ws -> cowboy:start_clear(Id, RanchOpts, WsOpts);
wss -> cowboy:start_tls(Id, RanchOpts, WsOpts)
end;
%% Start MQTT/QUIC listener
do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
do_start_listener(quic, Name, Id, #{bind := Bind} = Opts) ->
ListenOn =
case Bind of
{Addr, Port} when tuple_size(Addr) == 4 ->
Expand Down Expand Up @@ -459,16 +460,13 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
peer_unidi_stream_count => maps:get(peer_unidi_stream_count, Opts, 1),
peer_bidi_stream_count => maps:get(peer_bidi_stream_count, Opts, 10),
zone => zone(Opts),
listener => {quic, ListenerName},
listener => {quic, Name},
limiter => Limiter
},
StreamOpts = #{
stream_callback => emqx_quic_stream,
active => 1
},

Id = listener_id(quic, ListenerName),
add_limiter_bucket(Id, Limiter),
quicer:spawn_listener(
Id,
ListenOn,
Expand Down Expand Up @@ -745,18 +743,24 @@ add_limiter_bucket(Id, Limiter) ->
maps:without([client], Limiter)
).

del_limiter_bucket(Id, Conf) ->
case limiter(Conf) of
undefined ->
ok;
Limiter ->
lists:foreach(
fun(Type) ->
emqx_limiter_server:del_bucket(Id, Type)
end,
maps:keys(Limiter)
)
end.
del_limiter_bucket(_Id, undefined) ->
ok;
del_limiter_bucket(Id, Limiter) ->
maps:foreach(
fun(Type, _) ->
emqx_limiter_server:del_bucket(Id, Type)
end,
Limiter
).

update_limiter_bucket(Id, Limiter, undefined) ->
del_limiter_bucket(Id, Limiter);
update_limiter_bucket(Id, undefined, Limiter) ->
add_limiter_bucket(Id, Limiter);
update_limiter_bucket(Id, OldLimiter, NewLimiter) ->
ok = add_limiter_bucket(Id, NewLimiter),
Outdated = maps:without(maps:keys(NewLimiter), OldLimiter),
del_limiter_bucket(Id, Outdated).

diff_confs(NewConfs, OldConfs) ->
emqx_utils:diff_lists(
Expand Down

0 comments on commit 7e40496

Please sign in to comment.