Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(limiter): simplify the memory represent of limiter configuration #10591

Merged
merged 3 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 14 additions & 15 deletions apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

%% API
-export([
make_token_bucket_limiter/2,
make_local_limiter/2,
make_ref_limiter/2,
check/2,
consume/2,
Expand All @@ -32,12 +32,11 @@
make_future/1,
available/1
]).
-export_type([token_bucket_limiter/0]).
-export_type([local_limiter/0]).

%% a token bucket limiter with a limiter server's bucket reference

%% the number of tokens currently available
-type token_bucket_limiter() :: #{
%% a token bucket limiter which may or not contains a reference to another limiter,
%% and can be used in a client alone
-type local_limiter() :: #{
tokens := non_neg_integer(),
rate := decimal(),
capacity := decimal(),
Expand All @@ -58,12 +57,12 @@
retry_ctx =>
undefined
%% the retry context
| retry_context(token_bucket_limiter()),
| retry_context(local_limiter()),
%% allow to add other keys
atom => any()
}.

%% a limiter server's bucket reference
%% a limiter instance which only contains a reference to another limiter(bucket)
-type ref_limiter() :: #{
max_retry_time := non_neg_integer(),
failure_strategy := failure_strategy(),
Expand All @@ -88,7 +87,7 @@
}.

-type bucket() :: emqx_limiter_bucket_ref:bucket_ref().
-type limiter() :: token_bucket_limiter() | ref_limiter() | infinity.
-type limiter() :: local_limiter() | ref_limiter() | infinity.
-type millisecond() :: non_neg_integer().

-type pause_type() :: pause | partial.
Expand Down Expand Up @@ -116,7 +115,7 @@
rate := decimal(),
initial := non_neg_integer(),
low_watermark := non_neg_integer(),
capacity := decimal(),
burst := decimal(),
divisible := boolean(),
max_retry_time := non_neg_integer(),
failure_strategy := failure_strategy()
Expand All @@ -134,8 +133,8 @@
%% API
%%--------------------------------------------------------------------
%%@doc create a limiter
-spec make_token_bucket_limiter(limiter_bucket_cfg(), bucket()) -> _.
make_token_bucket_limiter(Cfg, Bucket) ->
-spec make_local_limiter(limiter_bucket_cfg(), bucket()) -> _.
make_local_limiter(Cfg, Bucket) ->
Cfg#{
tokens => emqx_limiter_server:get_initial_val(Cfg),
lasttime => ?NOW,
Expand Down Expand Up @@ -312,8 +311,8 @@ on_failure(throw, Limiter) ->
Message = io_lib:format("limiter consume failed, limiter:~p~n", [Limiter]),
erlang:throw({rate_check_fail, Message}).

-spec do_check_with_parent_limiter(pos_integer(), token_bucket_limiter()) ->
inner_check_result(token_bucket_limiter()).
-spec do_check_with_parent_limiter(pos_integer(), local_limiter()) ->
inner_check_result(local_limiter()).
do_check_with_parent_limiter(
Need,
#{
Expand All @@ -336,7 +335,7 @@ do_check_with_parent_limiter(
)
end.

-spec do_reset(pos_integer(), token_bucket_limiter()) -> inner_check_result(token_bucket_limiter()).
-spec do_reset(pos_integer(), local_limiter()) -> inner_check_result(local_limiter()).
do_reset(
Need,
#{
Expand Down
25 changes: 24 additions & 1 deletion apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@
post_config_update/5
]).

-export([
find_root/1,
insert_root/2,
delete_root/1
]).

-export([
start_server/1,
start_server/2,
Expand Down Expand Up @@ -62,6 +68,7 @@

-define(UID(Id, Type), {Id, Type}).
-define(TAB, emqx_limiter_counters).
-define(ROOT_ID, root).

%%--------------------------------------------------------------------
%% API
Expand Down Expand Up @@ -104,9 +111,25 @@ insert_bucket(Id, Type, Bucket) ->
).

-spec delete_bucket(limiter_id(), limiter_type()) -> true.
delete_bucket(Type, Id) ->
delete_bucket(Id, Type) ->
ets:delete(?TAB, ?UID(Id, Type)).

-spec find_root(limiter_type()) ->
{ok, bucket_ref()} | undefined.
find_root(Type) ->
find_bucket(?ROOT_ID, Type).

-spec insert_root(
limiter_type(),
bucket_ref()
) -> boolean().
insert_root(Type, Bucket) ->
insert_bucket(?ROOT_ID, Type, Bucket).

-spec delete_root(limiter_type()) -> true.
delete_root(Type) ->
delete_bucket(?ROOT_ID, Type).

post_config_update([limiter], _Config, NewConf, _OldConf, _AppEnvs) ->
Types = lists:delete(client, maps:keys(NewConf)),
_ = [on_post_config_update(Type, NewConf) || Type <- Types],
Expand Down
68 changes: 54 additions & 14 deletions apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
get_bucket_cfg_path/2,
desc/1,
types/0,
calc_capacity/1
calc_capacity/1,
extract_with_type/2,
default_client_config/0
]).

-define(KILOBYTE, 1024).
Expand Down Expand Up @@ -94,30 +96,33 @@
namespace() -> limiter.

roots() ->
[{limiter, hoconsc:mk(hoconsc:ref(?MODULE, limiter), #{importance => ?IMPORTANCE_HIDDEN})}].
[
{limiter,
hoconsc:mk(hoconsc:ref(?MODULE, limiter), #{
importance => ?IMPORTANCE_HIDDEN
})}
].

fields(limiter) ->
[
{Type,
?HOCON(?R_REF(node_opts), #{
desc => ?DESC(Type),
default => #{},
importance => ?IMPORTANCE_HIDDEN,
aliases => alias_of_type(Type)
})}
|| Type <- types()
] ++
[
%% This is an undocumented feature, and it won't be support anymore
{client,
?HOCON(
?R_REF(client_fields),
#{
desc => ?DESC(client),
importance => ?IMPORTANCE_HIDDEN,
default => maps:from_list([
{erlang:atom_to_binary(Type), #{}}
|| Type <- types()
])
required => {false, recursively},
deprecated => {since, "5.0.25"}
}
)}
];
Expand All @@ -131,7 +136,7 @@ fields(node_opts) ->
})}
];
fields(client_fields) ->
client_fields(types(), #{default => #{}});
client_fields(types());
fields(bucket_opts) ->
fields_of_bucket(<<"infinity">>);
fields(client_opts) ->
Expand Down Expand Up @@ -194,7 +199,7 @@ fields(client_opts) ->
fields(listener_fields) ->
composite_bucket_fields(?LISTENER_BUCKET_KEYS, listener_client_fields);
fields(listener_client_fields) ->
client_fields(?LISTENER_BUCKET_KEYS, #{required => false});
client_fields(?LISTENER_BUCKET_KEYS);
fields(Type) ->
simple_bucket_field(Type).

Expand Down Expand Up @@ -236,6 +241,31 @@ calc_capacity(#{rate := infinity}) ->
calc_capacity(#{rate := Rate, burst := Burst}) ->
erlang:floor(1000 * Rate / default_period()) + Burst.

extract_with_type(_Type, undefined) ->
undefined;
extract_with_type(Type, #{client := ClientCfg} = BucketCfg) ->
BucketVal = maps:find(Type, BucketCfg),
ClientVal = maps:find(Type, ClientCfg),
merge_client_bucket(Type, ClientVal, BucketVal);
extract_with_type(Type, BucketCfg) ->
BucketVal = maps:find(Type, BucketCfg),
merge_client_bucket(Type, undefined, BucketVal).

%% Since the client configuration can be absent and be a undefined value,
%% but we must need some basic settings to control the behaviour of the limiter,
%% so here add this helper function to generate a default setting.
%% This is a temporary workaround until we found a better way to simplify.
default_client_config() ->
#{
rate => infinity,
initial => 0,
low_watermark => 0,
burst => 0,
divisible => false,
max_retry_time => timer:seconds(10),
failure_strategy => force
}.

%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -362,7 +392,7 @@ simple_bucket_field(Type) when is_atom(Type) ->
?R_REF(?MODULE, client_opts),
#{
desc => ?DESC(client),
required => false,
required => {false, recursively},
importance => importance_of_type(Type),
aliases => alias_of_type(Type)
}
Expand All @@ -375,7 +405,7 @@ composite_bucket_fields(Types, ClientRef) ->
{Type,
?HOCON(?R_REF(?MODULE, bucket_opts), #{
desc => ?DESC(?MODULE, Type),
required => false,
required => {false, recursively},
importance => importance_of_type(Type),
aliases => alias_of_type(Type)
})}
Expand All @@ -387,7 +417,7 @@ composite_bucket_fields(Types, ClientRef) ->
?R_REF(?MODULE, ClientRef),
#{
desc => ?DESC(client),
required => false
required => {false, recursively}
}
)}
].
Expand All @@ -410,11 +440,12 @@ fields_of_bucket(Default) ->
})}
].

client_fields(Types, Meta) ->
client_fields(Types) ->
[
{Type,
?HOCON(?R_REF(client_opts), Meta#{
?HOCON(?R_REF(client_opts), #{
desc => ?DESC(Type),
required => false,
importance => importance_of_type(Type),
aliases => alias_of_type(Type)
})}
Expand All @@ -436,3 +467,12 @@ alias_of_type(bytes) ->
[bytes_in];
alias_of_type(_) ->
[].

merge_client_bucket(Type, {ok, ClientVal}, {ok, BucketVal}) ->
#{Type => BucketVal, client => #{Type => ClientVal}};
merge_client_bucket(Type, {ok, ClientVal}, _) ->
#{client => #{Type => ClientVal}};
merge_client_bucket(Type, _, {ok, BucketVal}) ->
#{Type => BucketVal};
merge_client_bucket(_, _, _) ->
undefined.