Skip to content

Commit

Permalink
Merge pull request #10625 from lafirest/refactor/limiter_cfg
Browse files Browse the repository at this point in the history
refactor(limiter): simplify limiter configuration
  • Loading branch information
lafirest committed May 9, 2023
2 parents 2df9410 + 197ebcc commit b94290d
Show file tree
Hide file tree
Showing 10 changed files with 271 additions and 35 deletions.
101 changes: 91 additions & 10 deletions apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@
get_bucket_cfg_path/2,
desc/1,
types/0,
short_paths/0,
calc_capacity/1,
extract_with_type/2,
default_client_config/0
default_client_config/0,
short_paths_fields/1,
get_listener_opts/1,
get_node_opts/1
]).

-define(KILOBYTE, 1024).
Expand Down Expand Up @@ -104,15 +108,17 @@ roots() ->
].

fields(limiter) ->
[
{Type,
?HOCON(?R_REF(node_opts), #{
desc => ?DESC(Type),
importance => ?IMPORTANCE_HIDDEN,
aliases => alias_of_type(Type)
})}
|| Type <- types()
] ++
short_paths_fields(?MODULE) ++
[
{Type,
?HOCON(?R_REF(node_opts), #{
desc => ?DESC(Type),
importance => ?IMPORTANCE_HIDDEN,
required => {false, recursively},
aliases => alias_of_type(Type)
})}
|| Type <- types()
] ++
[
%% This is an undocumented feature, and it won't be support anymore
{client,
Expand Down Expand Up @@ -203,6 +209,14 @@ fields(listener_client_fields) ->
fields(Type) ->
simple_bucket_field(Type).

short_paths_fields(DesModule) ->
[
{Name,
?HOCON(rate(), #{desc => ?DESC(DesModule, Name), required => false, example => Example})}
|| {Name, Example} <-
lists:zip(short_paths(), [<<"1000/s">>, <<"1000/s">>, <<"100MB/s">>])
].

desc(limiter) ->
"Settings for the rate limiter.";
desc(node_opts) ->
Expand Down Expand Up @@ -236,6 +250,9 @@ get_bucket_cfg_path(Type, BucketName) ->
types() ->
[bytes, messages, connection, message_routing, internal].

short_paths() ->
[max_conn_rate, messages_rate, bytes_rate].

calc_capacity(#{rate := infinity}) ->
infinity;
calc_capacity(#{rate := Rate, burst := Burst}) ->
Expand Down Expand Up @@ -266,6 +283,31 @@ default_client_config() ->
failure_strategy => force
}.

default_bucket_config() ->
#{
rate => infinity,
burst => 0
}.

get_listener_opts(Conf) ->
Limiter = maps:get(limiter, Conf, undefined),
ShortPaths = maps:with(short_paths(), Conf),
get_listener_opts(Limiter, ShortPaths).

get_node_opts(Type) ->
Opts = emqx:get_config([limiter, Type], default_bucket_config()),
case type_to_short_path_name(Type) of
undefined ->
Opts;
Name ->
case emqx:get_config([limiter, Name], undefined) of
undefined ->
Opts;
Rate ->
Opts#{rate := Rate}
end
end.

%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -476,3 +518,42 @@ merge_client_bucket(Type, _, {ok, BucketVal}) ->
#{Type => BucketVal};
merge_client_bucket(_, _, _) ->
undefined.

short_path_name_to_type(max_conn_rate) ->
connection;
short_path_name_to_type(messages_rate) ->
messages;
short_path_name_to_type(bytes_rate) ->
bytes.

type_to_short_path_name(connection) ->
max_conn_rate;
type_to_short_path_name(messages) ->
messages_rate;
type_to_short_path_name(bytes) ->
bytes_rate;
type_to_short_path_name(_) ->
undefined.

get_listener_opts(Limiter, ShortPaths) when map_size(ShortPaths) =:= 0 ->
Limiter;
get_listener_opts(undefined, ShortPaths) ->
convert_listener_short_paths(ShortPaths);
get_listener_opts(Limiter, ShortPaths) ->
Shorts = convert_listener_short_paths(ShortPaths),
emqx_utils_maps:deep_merge(Limiter, Shorts).

convert_listener_short_paths(ShortPaths) ->
DefBucket = default_bucket_config(),
DefClient = default_client_config(),
Fun = fun(Name, Rate, Acc) ->
Type = short_path_name_to_type(Name),
case Name of
max_conn_rate ->
Acc#{Type => DefBucket#{rate => Rate}};
_ ->
Client = maps:get(client, Acc, #{}),
Acc#{client => Client#{Type => DefClient#{rate => Rate}}}
end
end,
maps:fold(Fun, #{}, ShortPaths).
9 changes: 3 additions & 6 deletions apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ dispatch_burst_to_buckets([], _, Alloced, Buckets) ->

-spec init_tree(emqx_limiter_schema:limiter_type()) -> state().
init_tree(Type) when is_atom(Type) ->
Cfg = emqx:get_config([limiter, Type]),
Cfg = emqx_limiter_schema:get_node_opts(Type),
init_tree(Type, Cfg).

init_tree(Type, #{rate := Rate} = Cfg) ->
Expand Down Expand Up @@ -625,13 +625,10 @@ find_referenced_bucket(Id, Type, #{rate := Rate} = Cfg) when Rate =/= infinity -
{error, invalid_bucket}
end;
%% this is a node-level reference
find_referenced_bucket(Id, Type, _) ->
case emqx:get_config([limiter, Type], undefined) of
find_referenced_bucket(_Id, Type, _) ->
case emqx_limiter_schema:get_node_opts(Type) of
#{rate := infinity} ->
false;
undefined ->
?SLOG(error, #{msg => "invalid limiter type", type => Type, id => Id}),
{error, invalid_bucket};
NodeCfg ->
{ok, Bucket} = emqx_limiter_manager:find_root(Type),
{ok, Bucket, NodeCfg}
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ init([]) ->
%% Internal functions
%%--==================================================================
make_child(Type) ->
Cfg = emqx:get_config([limiter, Type]),
Cfg = emqx_limiter_schema:get_node_opts(Type),
make_child(Type, Cfg).

make_child(Type, Cfg) ->
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx/src/emqx_listeners.erl
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ zone(Opts) ->
maps:get(zone, Opts, undefined).

limiter(Opts) ->
maps:get(limiter, Opts, undefined).
emqx_limiter_schema:get_listener_opts(Opts).

add_limiter_bucket(Id, #{limiter := Limiter}) ->
maps:fold(
Expand Down
5 changes: 3 additions & 2 deletions apps/emqx/src/emqx_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2001,7 +2001,8 @@ base_listener(Bind) ->
listener_fields
),
#{
desc => ?DESC(base_listener_limiter)
desc => ?DESC(base_listener_limiter),
importance => ?IMPORTANCE_HIDDEN
}
)},
{"enable_authn",
Expand All @@ -2012,7 +2013,7 @@ base_listener(Bind) ->
default => true
}
)}
].
] ++ emqx_limiter_schema:short_paths_fields(?MODULE).

desc("persistent_session_store") ->
"Settings for message persistence.";
Expand Down
96 changes: 84 additions & 12 deletions apps/emqx/test/emqx_ratelimiter_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,23 @@ all() ->
emqx_common_test_helpers:all(?MODULE).

init_per_suite(Config) ->
ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF),
load_conf(),
emqx_common_test_helpers:start_apps([?APP]),
Config.

end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([?APP]).

init_per_testcase(_TestCase, Config) ->
emqx_config:erase(limiter),
load_conf(),
Config.

end_per_testcase(_TestCase, Config) ->
Config.

load_conf() ->
emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF).
ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF).

init_config() ->
emqx_config:init_load(emqx_limiter_schema, ?BASE_CONF).
Expand Down Expand Up @@ -313,8 +315,8 @@ t_capacity(_) ->
%% Test Cases Global Level
%%--------------------------------------------------------------------
t_collaborative_alloc(_) ->
GlobalMod = fun(#{message_routing := MR} = Cfg) ->
Cfg#{message_routing := MR#{rate := ?RATE("600/1s")}}
GlobalMod = fun(Cfg) ->
Cfg#{message_routing => #{rate => ?RATE("600/1s"), burst => 0}}
end,

Bucket1 = fun(#{client := Cli} = Bucket) ->
Expand Down Expand Up @@ -353,11 +355,11 @@ t_collaborative_alloc(_) ->
).

t_burst(_) ->
GlobalMod = fun(#{message_routing := MR} = Cfg) ->
GlobalMod = fun(Cfg) ->
Cfg#{
message_routing := MR#{
rate := ?RATE("200/1s"),
burst := ?RATE("400/1s")
message_routing => #{
rate => ?RATE("200/1s"),
burst => ?RATE("400/1s")
}
}
end,
Expand Down Expand Up @@ -653,16 +655,16 @@ t_not_exists_instance(_) ->
),

?assertEqual(
{error, invalid_bucket},
{ok, infinity},
emqx_limiter_server:connect(?FUNCTION_NAME, not_exists, Cfg)
),
ok.

t_create_instance_with_node(_) ->
GlobalMod = fun(#{message_routing := MR} = Cfg) ->
GlobalMod = fun(Cfg) ->
Cfg#{
message_routing := MR#{rate := ?RATE("200/1s")},
messages := MR#{rate := ?RATE("200/1s")}
message_routing => #{rate => ?RATE("200/1s"), burst => 0},
messages => #{rate => ?RATE("200/1s"), burst => 0}
}
end,

Expand Down Expand Up @@ -739,6 +741,68 @@ t_esockd_htb_consume(_) ->
?assertMatch({ok, _}, C2R),
ok.

%%--------------------------------------------------------------------
%% Test Cases short paths
%%--------------------------------------------------------------------
t_node_short_paths(_) ->
CfgStr = <<"limiter {max_conn_rate = \"1000\", messages_rate = \"100\", bytes_rate = \"10\"}">>,
ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, CfgStr),
Accessor = fun emqx_limiter_schema:get_node_opts/1,
?assertMatch(#{rate := 100.0}, Accessor(connection)),
?assertMatch(#{rate := 10.0}, Accessor(messages)),
?assertMatch(#{rate := 1.0}, Accessor(bytes)),
?assertMatch(#{rate := infinity}, Accessor(message_routing)),
?assertEqual(undefined, emqx:get_config([limiter, connection], undefined)).

t_compatibility_for_node_short_paths(_) ->
CfgStr =
<<"limiter {max_conn_rate = \"1000\", connection.rate = \"500\", bytes.rate = \"200\"}">>,
ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, CfgStr),
Accessor = fun emqx_limiter_schema:get_node_opts/1,
?assertMatch(#{rate := 100.0}, Accessor(connection)),
?assertMatch(#{rate := 20.0}, Accessor(bytes)).

t_listener_short_paths(_) ->
CfgStr = <<
""
"listeners.tcp.default {max_conn_rate = \"1000\", messages_rate = \"100\", bytes_rate = \"10\"}"
""
>>,
ok = emqx_common_test_helpers:load_config(emqx_schema, CfgStr),
ListenerOpt = emqx:get_config([listeners, tcp, default]),
?assertMatch(
#{
client := #{
messages := #{rate := 10.0},
bytes := #{rate := 1.0}
},
connection := #{rate := 100.0}
},
emqx_limiter_schema:get_listener_opts(ListenerOpt)
).

t_compatibility_for_listener_short_paths(_) ->
CfgStr = <<
"" "listeners.tcp.default {max_conn_rate = \"1000\", limiter.connection.rate = \"500\"}" ""
>>,
ok = emqx_common_test_helpers:load_config(emqx_schema, CfgStr),
ListenerOpt = emqx:get_config([listeners, tcp, default]),
?assertMatch(
#{
connection := #{rate := 100.0}
},
emqx_limiter_schema:get_listener_opts(ListenerOpt)
).

t_no_limiter_for_listener(_) ->
CfgStr = <<>>,
ok = emqx_common_test_helpers:load_config(emqx_schema, CfgStr),
ListenerOpt = emqx:get_config([listeners, tcp, default]),
?assertEqual(
undefined,
emqx_limiter_schema:get_listener_opts(ListenerOpt)
).

%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -1043,3 +1107,11 @@ make_create_test_data_with_infinity_node(FakeInstnace) ->
%% client = C bucket = B C > B
{MkA(1000, 100), IsRefLimiter(FakeInstnace)}
].

parse_schema(ConfigString) ->
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
hocon_tconf:check_plain(
emqx_limiter_schema,
RawConf,
#{required => false, atom_key => false}
).

0 comments on commit b94290d

Please sign in to comment.