Skip to content

Commit

Permalink
Merge pull request #10591 from lafirest/fix/simplify_limiter_client_cfg
Browse files Browse the repository at this point in the history
perf(limiter): simplify the memory represent of limiter configuration
  • Loading branch information
lafirest committed May 5, 2023
2 parents e4f5014 + 7a96a97 commit 335d948
Show file tree
Hide file tree
Showing 7 changed files with 442 additions and 168 deletions.
29 changes: 14 additions & 15 deletions apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

%% API
-export([
make_token_bucket_limiter/2,
make_local_limiter/2,
make_ref_limiter/2,
check/2,
consume/2,
Expand All @@ -32,12 +32,11 @@
make_future/1,
available/1
]).
-export_type([token_bucket_limiter/0]).
-export_type([local_limiter/0]).

%% a token bucket limiter with a limiter server's bucket reference

%% the number of tokens currently available
-type token_bucket_limiter() :: #{
%% a token bucket limiter which may or not contains a reference to another limiter,
%% and can be used in a client alone
-type local_limiter() :: #{
tokens := non_neg_integer(),
rate := decimal(),
capacity := decimal(),
Expand All @@ -58,12 +57,12 @@
retry_ctx =>
undefined
%% the retry context
| retry_context(token_bucket_limiter()),
| retry_context(local_limiter()),
%% allow to add other keys
atom => any()
}.

%% a limiter server's bucket reference
%% a limiter instance which only contains a reference to another limiter(bucket)
-type ref_limiter() :: #{
max_retry_time := non_neg_integer(),
failure_strategy := failure_strategy(),
Expand All @@ -88,7 +87,7 @@
}.

-type bucket() :: emqx_limiter_bucket_ref:bucket_ref().
-type limiter() :: token_bucket_limiter() | ref_limiter() | infinity.
-type limiter() :: local_limiter() | ref_limiter() | infinity.
-type millisecond() :: non_neg_integer().

-type pause_type() :: pause | partial.
Expand Down Expand Up @@ -116,7 +115,7 @@
rate := decimal(),
initial := non_neg_integer(),
low_watermark := non_neg_integer(),
capacity := decimal(),
burst := decimal(),
divisible := boolean(),
max_retry_time := non_neg_integer(),
failure_strategy := failure_strategy()
Expand All @@ -134,8 +133,8 @@
%% API
%%--------------------------------------------------------------------
%%@doc create a limiter
-spec make_token_bucket_limiter(limiter_bucket_cfg(), bucket()) -> _.
make_token_bucket_limiter(Cfg, Bucket) ->
-spec make_local_limiter(limiter_bucket_cfg(), bucket()) -> _.
make_local_limiter(Cfg, Bucket) ->
Cfg#{
tokens => emqx_limiter_server:get_initial_val(Cfg),
lasttime => ?NOW,
Expand Down Expand Up @@ -312,8 +311,8 @@ on_failure(throw, Limiter) ->
Message = io_lib:format("limiter consume failed, limiter:~p~n", [Limiter]),
erlang:throw({rate_check_fail, Message}).

-spec do_check_with_parent_limiter(pos_integer(), token_bucket_limiter()) ->
inner_check_result(token_bucket_limiter()).
-spec do_check_with_parent_limiter(pos_integer(), local_limiter()) ->
inner_check_result(local_limiter()).
do_check_with_parent_limiter(
Need,
#{
Expand All @@ -336,7 +335,7 @@ do_check_with_parent_limiter(
)
end.

-spec do_reset(pos_integer(), token_bucket_limiter()) -> inner_check_result(token_bucket_limiter()).
-spec do_reset(pos_integer(), local_limiter()) -> inner_check_result(local_limiter()).
do_reset(
Need,
#{
Expand Down
25 changes: 24 additions & 1 deletion apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@
post_config_update/5
]).

-export([
find_root/1,
insert_root/2,
delete_root/1
]).

-export([
start_server/1,
start_server/2,
Expand Down Expand Up @@ -62,6 +68,7 @@

-define(UID(Id, Type), {Id, Type}).
-define(TAB, emqx_limiter_counters).
-define(ROOT_ID, root).

%%--------------------------------------------------------------------
%% API
Expand Down Expand Up @@ -104,9 +111,25 @@ insert_bucket(Id, Type, Bucket) ->
).

-spec delete_bucket(limiter_id(), limiter_type()) -> true.
delete_bucket(Type, Id) ->
delete_bucket(Id, Type) ->
ets:delete(?TAB, ?UID(Id, Type)).

-spec find_root(limiter_type()) ->
{ok, bucket_ref()} | undefined.
find_root(Type) ->
find_bucket(?ROOT_ID, Type).

-spec insert_root(
limiter_type(),
bucket_ref()
) -> boolean().
insert_root(Type, Bucket) ->
insert_bucket(?ROOT_ID, Type, Bucket).

-spec delete_root(limiter_type()) -> true.
delete_root(Type) ->
delete_bucket(?ROOT_ID, Type).

post_config_update([limiter], _Config, NewConf, _OldConf, _AppEnvs) ->
Types = lists:delete(client, maps:keys(NewConf)),
_ = [on_post_config_update(Type, NewConf) || Type <- Types],
Expand Down
68 changes: 54 additions & 14 deletions apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
get_bucket_cfg_path/2,
desc/1,
types/0,
calc_capacity/1
calc_capacity/1,
extract_with_type/2,
default_client_config/0
]).

-define(KILOBYTE, 1024).
Expand Down Expand Up @@ -94,30 +96,33 @@
namespace() -> limiter.

roots() ->
[{limiter, hoconsc:mk(hoconsc:ref(?MODULE, limiter), #{importance => ?IMPORTANCE_HIDDEN})}].
[
{limiter,
hoconsc:mk(hoconsc:ref(?MODULE, limiter), #{
importance => ?IMPORTANCE_HIDDEN
})}
].

fields(limiter) ->
[
{Type,
?HOCON(?R_REF(node_opts), #{
desc => ?DESC(Type),
default => #{},
importance => ?IMPORTANCE_HIDDEN,
aliases => alias_of_type(Type)
})}
|| Type <- types()
] ++
[
%% This is an undocumented feature, and it won't be support anymore
{client,
?HOCON(
?R_REF(client_fields),
#{
desc => ?DESC(client),
importance => ?IMPORTANCE_HIDDEN,
default => maps:from_list([
{erlang:atom_to_binary(Type), #{}}
|| Type <- types()
])
required => {false, recursively},
deprecated => {since, "5.0.25"}
}
)}
];
Expand All @@ -131,7 +136,7 @@ fields(node_opts) ->
})}
];
fields(client_fields) ->
client_fields(types(), #{default => #{}});
client_fields(types());
fields(bucket_opts) ->
fields_of_bucket(<<"infinity">>);
fields(client_opts) ->
Expand Down Expand Up @@ -194,7 +199,7 @@ fields(client_opts) ->
fields(listener_fields) ->
composite_bucket_fields(?LISTENER_BUCKET_KEYS, listener_client_fields);
fields(listener_client_fields) ->
client_fields(?LISTENER_BUCKET_KEYS, #{required => false});
client_fields(?LISTENER_BUCKET_KEYS);
fields(Type) ->
simple_bucket_field(Type).

Expand Down Expand Up @@ -236,6 +241,31 @@ calc_capacity(#{rate := infinity}) ->
calc_capacity(#{rate := Rate, burst := Burst}) ->
erlang:floor(1000 * Rate / default_period()) + Burst.

extract_with_type(_Type, undefined) ->
undefined;
extract_with_type(Type, #{client := ClientCfg} = BucketCfg) ->
BucketVal = maps:find(Type, BucketCfg),
ClientVal = maps:find(Type, ClientCfg),
merge_client_bucket(Type, ClientVal, BucketVal);
extract_with_type(Type, BucketCfg) ->
BucketVal = maps:find(Type, BucketCfg),
merge_client_bucket(Type, undefined, BucketVal).

%% Since the client configuration can be absent and be a undefined value,
%% but we must need some basic settings to control the behaviour of the limiter,
%% so here add this helper function to generate a default setting.
%% This is a temporary workaround until we found a better way to simplify.
default_client_config() ->
#{
rate => infinity,
initial => 0,
low_watermark => 0,
burst => 0,
divisible => false,
max_retry_time => timer:seconds(10),
failure_strategy => force
}.

%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -362,7 +392,7 @@ simple_bucket_field(Type) when is_atom(Type) ->
?R_REF(?MODULE, client_opts),
#{
desc => ?DESC(client),
required => false,
required => {false, recursively},
importance => importance_of_type(Type),
aliases => alias_of_type(Type)
}
Expand All @@ -375,7 +405,7 @@ composite_bucket_fields(Types, ClientRef) ->
{Type,
?HOCON(?R_REF(?MODULE, bucket_opts), #{
desc => ?DESC(?MODULE, Type),
required => false,
required => {false, recursively},
importance => importance_of_type(Type),
aliases => alias_of_type(Type)
})}
Expand All @@ -387,7 +417,7 @@ composite_bucket_fields(Types, ClientRef) ->
?R_REF(?MODULE, ClientRef),
#{
desc => ?DESC(client),
required => false
required => {false, recursively}
}
)}
].
Expand All @@ -410,11 +440,12 @@ fields_of_bucket(Default) ->
})}
].

client_fields(Types, Meta) ->
client_fields(Types) ->
[
{Type,
?HOCON(?R_REF(client_opts), Meta#{
?HOCON(?R_REF(client_opts), #{
desc => ?DESC(Type),
required => false,
importance => importance_of_type(Type),
aliases => alias_of_type(Type)
})}
Expand All @@ -436,3 +467,12 @@ alias_of_type(bytes) ->
[bytes_in];
alias_of_type(_) ->
[].

merge_client_bucket(Type, {ok, ClientVal}, {ok, BucketVal}) ->
#{Type => BucketVal, client => #{Type => ClientVal}};
merge_client_bucket(Type, {ok, ClientVal}, _) ->
#{client => #{Type => ClientVal}};
merge_client_bucket(Type, _, {ok, BucketVal}) ->
#{Type => BucketVal};
merge_client_bucket(_, _, _) ->
undefined.

0 comments on commit 335d948

Please sign in to comment.