Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(config): enforcing atom key path in hotcode path #10698

Merged
merged 4 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 9 additions & 2 deletions apps/emqx/src/emqx.erl
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,18 @@ run_fold_hook(HookPoint, Args, Acc) ->

-spec get_config(emqx_utils_maps:config_key_path()) -> term().
get_config(KeyPath) ->
emqx_config:get(KeyPath).
KeyPath1 = emqx_config:ensure_atom_conf_path(KeyPath, {raise_error, config_not_found}),
emqx_config:get(KeyPath1).

-spec get_config(emqx_utils_maps:config_key_path(), term()) -> term().
get_config(KeyPath, Default) ->
emqx_config:get(KeyPath, Default).
try
KeyPath1 = emqx_config:ensure_atom_conf_path(KeyPath, {raise_error, config_not_found}),
emqx_config:get(KeyPath1, Default)
catch
error:config_not_found ->
Default
end.

-spec get_raw_config(emqx_utils_maps:config_key_path()) -> term().
get_raw_config(KeyPath) ->
Expand Down
57 changes: 30 additions & 27 deletions apps/emqx/src/emqx_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@
remove_handlers/0
]).

-export([ensure_atom_conf_path/2]).

-ifdef(TEST).
-export([erase_all/0]).
-endif.
Expand All @@ -113,7 +115,8 @@
update_cmd/0,
update_args/0,
update_error/0,
update_result/0
update_result/0,
runtime_config_key_path/0
]).

-type update_request() :: term().
Expand Down Expand Up @@ -144,6 +147,8 @@
-type config() :: #{atom() => term()} | list() | undefined.
-type app_envs() :: [proplists:property()].

-type runtime_config_key_path() :: [atom()].

%% @doc For the given path, get root value enclosed in a single-key map.
-spec get_root(emqx_utils_maps:config_key_path()) -> map().
get_root([RootName | _]) ->
Expand All @@ -156,25 +161,21 @@ get_root_raw([RootName | _]) ->

%% @doc Get a config value for the given path.
%% The path should at least include root config name.
-spec get(emqx_utils_maps:config_key_path()) -> term().
-spec get(runtime_config_key_path()) -> term().
get(KeyPath) -> do_get(?CONF, KeyPath).

-spec get(emqx_utils_maps:config_key_path(), term()) -> term().
-spec get(runtime_config_key_path(), term()) -> term().
get(KeyPath, Default) -> do_get(?CONF, KeyPath, Default).

-spec find(emqx_utils_maps:config_key_path()) ->
-spec find(runtime_config_key_path()) ->
{ok, term()} | {not_found, emqx_utils_maps:config_key_path(), term()}.
find([]) ->
case do_get(?CONF, [], ?CONFIG_NOT_FOUND_MAGIC) of
?CONFIG_NOT_FOUND_MAGIC -> {not_found, []};
Res -> {ok, Res}
end;
find(KeyPath) ->
atom_conf_path(
KeyPath,
fun(AtomKeyPath) -> emqx_utils_maps:deep_find(AtomKeyPath, get_root(KeyPath)) end,
{return, {not_found, KeyPath}}
).
find(AtomKeyPath) ->
emqx_utils_maps:deep_find(AtomKeyPath, get_root(AtomKeyPath)).

-spec find_raw(emqx_utils_maps:config_key_path()) ->
{ok, term()} | {not_found, emqx_utils_maps:config_key_path(), term()}.
Expand Down Expand Up @@ -712,21 +713,14 @@ do_put(Type, Putter, [RootName | KeyPath], DeepValue) ->
NewValue = do_deep_put(Type, Putter, KeyPath, OldValue, DeepValue),
persistent_term:put(?PERSIS_KEY(Type, RootName), NewValue).

do_deep_get(?CONF, KeyPath, Map, Default) ->
atom_conf_path(
KeyPath,
fun(AtomKeyPath) -> emqx_utils_maps:deep_get(AtomKeyPath, Map, Default) end,
{return, Default}
);
do_deep_get(?CONF, AtomKeyPath, Map, Default) ->
emqx_utils_maps:deep_get(AtomKeyPath, Map, Default);
do_deep_get(?RAW_CONF, KeyPath, Map, Default) ->
emqx_utils_maps:deep_get([bin(Key) || Key <- KeyPath], Map, Default).

do_deep_put(?CONF, Putter, KeyPath, Map, Value) ->
atom_conf_path(
KeyPath,
fun(AtomKeyPath) -> Putter(AtomKeyPath, Map, Value) end,
{raise_error, {not_found, KeyPath}}
);
AtomKeyPath = ensure_atom_conf_path(KeyPath, {raise_error, {not_found, KeyPath}}),
Putter(AtomKeyPath, Map, Value);
do_deep_put(?RAW_CONF, Putter, KeyPath, Map, Value) ->
Putter([bin(Key) || Key <- KeyPath], Map, Value).

Expand Down Expand Up @@ -773,15 +767,24 @@ conf_key(?CONF, RootName) ->
conf_key(?RAW_CONF, RootName) ->
bin(RootName).

atom_conf_path(Path, ExpFun, OnFail) ->
try [atom(Key) || Key <- Path] of
AtomKeyPath -> ExpFun(AtomKeyPath)
ensure_atom_conf_path(Path, OnFail) ->
case lists:all(fun erlang:is_atom/1, Path) of
true ->
%% Do not try to build new atom PATH if it already is.
Path;
_ ->
to_atom_conf_path(Path, OnFail)
end.

to_atom_conf_path(Path, OnFail) ->
try
[atom(Key) || Key <- Path]
catch
error:badarg ->
case OnFail of
{return, Val} ->
Val;
{raise_error, Err} ->
error(Err)
error(Err);
{return, V} ->
V
end
end.
18 changes: 15 additions & 3 deletions apps/emqx/src/emqx_shared_sub.erl
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,18 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->

-spec strategy(emqx_topic:group()) -> strategy().
strategy(Group) ->
case emqx:get_config([broker, shared_subscription_group, Group, strategy], undefined) of
undefined -> emqx:get_config([broker, shared_subscription_strategy]);
Strategy -> Strategy
try
emqx:get_config([
broker,
shared_subscription_group,
binary_to_existing_atom(Group),
strategy
])
catch
error:{config_not_found, _} ->
get_default_shared_subscription_strategy();
error:badarg ->
get_default_shared_subscription_strategy()
end.

-spec ack_enabled() -> boolean().
Expand Down Expand Up @@ -544,3 +553,6 @@ delete_route_if_needed({Group, Topic} = GroupTopic) ->
if_no_more_subscribers(GroupTopic, fun() ->
ok = emqx_router:do_delete_route(Topic, {Group, node()})
end).

get_default_shared_subscription_strategy() ->
emqx:get_config([broker, shared_subscription_strategy]).
13 changes: 13 additions & 0 deletions apps/emqx/test/emqx_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,19 @@ t_cluster_nodes(_) ->
?assertEqual(Expected, emqx:cluster_nodes(cores)),
?assertEqual([], emqx:cluster_nodes(stopped)).

t_get_config(_) ->
?assertEqual(false, emqx:get_config([overload_protection, enable])),
?assertEqual(false, emqx:get_config(["overload_protection", <<"enable">>])).

t_get_config_default_1(_) ->
?assertEqual(false, emqx:get_config([overload_protection, enable], undefined)),
?assertEqual(false, emqx:get_config(["overload_protection", <<"enable">>], undefined)).

t_get_config_default_2(_) ->
AtomPathRes = emqx:get_config([overload_protection, <<"_!no_@exist_">>], undefined),
NonAtomPathRes = emqx:get_config(["doesnotexist", <<"db_backend">>], undefined),
?assertEqual(undefined, NonAtomPathRes),
?assertEqual(undefined, AtomPathRes).
%%--------------------------------------------------------------------
%% Hook fun
%%--------------------------------------------------------------------
Expand Down
6 changes: 5 additions & 1 deletion apps/emqx_authn/src/emqx_authn_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,11 @@ with_listener(ListenerID, Fun) ->
find_listener(ListenerID) ->
case binary:split(ListenerID, <<":">>) of
[BType, BName] ->
case emqx_config:find([listeners, BType, BName]) of
case
emqx_config:find([
listeners, binary_to_existing_atom(BType), binary_to_existing_atom(BName)
])
of
{ok, _} ->
{ok, {BType, BName}};
{not_found, _, _} ->
Expand Down
6 changes: 5 additions & 1 deletion apps/emqx_bridge/src/emqx_bridge_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -687,11 +687,15 @@ get_metrics_from_local_node(BridgeType, BridgeName) ->
).

is_enabled_bridge(BridgeType, BridgeName) ->
try emqx:get_config([bridges, BridgeType, BridgeName]) of
try emqx:get_config([bridges, BridgeType, binary_to_existing_atom(BridgeName)]) of
ConfMap ->
maps:get(enable, ConfMap, false)
catch
error:{config_not_found, _} ->
throw(not_found);
error:badarg ->
%% catch non-existing atom,
%% none-existing atom means it is not available in config PT storage.
throw(not_found)
end.

Expand Down
4 changes: 2 additions & 2 deletions apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
]).

start_link() ->
MaxHistory = emqx_conf:get(["node", "cluster_call", "max_history"], 100),
CleanupMs = emqx_conf:get(["node", "cluster_call", "cleanup_interval"], 5 * 60 * 1000),
MaxHistory = emqx_conf:get([node, cluster_call, max_history], 100),
CleanupMs = emqx_conf:get([node, cluster_call, cleanup_interval], 5 * 60 * 1000),
start_link(MaxHistory, CleanupMs).

start_link(MaxHistory, CleanupMs) ->
Expand Down
1 change: 1 addition & 0 deletions changes/ce/perf-10698.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimize memory usage when accessing the configuration during runtime.
5 changes: 5 additions & 0 deletions changes/ce/perf-10698.zh.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
在运行时降低读取配置的内存占用。




Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, emqx_ee_schema_registry, [
{description, "EMQX Schema Registry"},
{vsn, "0.1.2"},
{vsn, "0.1.3"},
{registered, [emqx_ee_schema_registry_sup]},
{mod, {emqx_ee_schema_registry_app, []}},
{applications, [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ get_serde(SchemaName) ->

-spec get_schema(schema_name()) -> {ok, map()} | {error, not_found}.
get_schema(SchemaName) ->
case emqx_config:get([?CONF_KEY_ROOT, schemas, SchemaName], undefined) of
case
emqx_config:get(
[?CONF_KEY_ROOT, schemas, binary_to_atom(SchemaName)], undefined
)
of
undefined ->
{error, not_found};
Config ->
Expand Down