Skip to content

Commit

Permalink
feat: add topic_metrics and slow_subs configuration to data impor…
Browse files Browse the repository at this point in the history
…t/export

Fixes: EMQX-10590
  • Loading branch information
SergeTupchiy committed Jul 21, 2023
1 parent a1a7c4f commit b37920d
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 8 deletions.
3 changes: 2 additions & 1 deletion apps/emqx_management/src/emqx_mgmt_data_backup.erl
Expand Up @@ -57,7 +57,8 @@
<<"flapping_detect">>,
<<"broker">>,
<<"force_gc">>,
<<"zones">>
<<"zones">>,
<<"slow_subs">>
]).

-define(DEFAULT_OPTS, #{}).
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_modules/src/emqx_modules.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_modules, [
{description, "EMQX Modules"},
{vsn, "5.0.17"},
{vsn, "5.0.18"},
{modules, []},
{applications, [kernel, stdlib, emqx, emqx_ctl]},
{mod, {emqx_modules_app, []}},
Expand Down
41 changes: 39 additions & 2 deletions apps/emqx_modules/src/emqx_modules_conf.erl
Expand Up @@ -18,6 +18,7 @@
-module(emqx_modules_conf).

-behaviour(emqx_config_handler).
-behaviour(emqx_config_backup).

%% Load/Unload
-export([
Expand All @@ -37,6 +38,11 @@
post_config_update/5
]).

%% Data backup
-export([
import_config/1
]).

%%--------------------------------------------------------------------
%% Load/Unload
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -78,6 +84,20 @@ remove_topic_metrics(Topic) ->
{error, Reason} -> {error, Reason}
end.

%%--------------------------------------------------------------------
%% Data backup (Topic-Metrics)
%%--------------------------------------------------------------------

import_config(#{<<"topic_metrics">> := Topics}) ->
case emqx_conf:update([topic_metrics], {merge_topics, Topics}, #{override_to => cluster}) of
{ok, _} ->
{ok, #{root_key => topic_metrics, changed => []}};
Error ->
{error, #{root_key => topic_metrics, reason => Error}}
end;
import_config(_RawConf) ->
{ok, #{root_key => topic_metrics, changed => []}}.

%%--------------------------------------------------------------------
%% Config Handler
%%--------------------------------------------------------------------
Expand All @@ -103,7 +123,13 @@ pre_config_update(_, {remove_topic_metrics, Topic0}, RawConf) ->
{ok, RawConf -- [Topic]};
_ ->
{error, not_found}
end.
end;
pre_config_update(_, {merge_topics, NewConf}, OldConf) ->
KeyFun = fun(#{<<"topic">> := T}) -> T end,
MergedConf = emqx_utils:merge_lists(OldConf, NewConf, KeyFun),
{ok, MergedConf};
pre_config_update(_, NewConf, _OldConf) ->
{ok, NewConf}.

-spec post_config_update(
list(atom()),
Expand All @@ -113,7 +139,6 @@ pre_config_update(_, {remove_topic_metrics, Topic0}, RawConf) ->
emqx_config:app_envs()
) ->
ok | {ok, Result :: any()} | {error, Reason :: term()}.

post_config_update(
_,
{add_topic_metrics, Topic},
Expand All @@ -135,6 +160,18 @@ post_config_update(
case emqx_topic_metrics:deregister(Topic) of
ok -> ok;
{error, Reason} -> {error, Reason}
end;
post_config_update(_, _UpdateReq, NewConfig, OldConfig, _AppEnvs) ->
#{
removed := Removed,
added := Added
} = emqx_utils:diff_lists(NewConfig, OldConfig, fun(#{topic := T}) -> T end),
Deregistered = [emqx_topic_metrics:deregister(T) || #{topic := T} <- Removed],
Registered = [emqx_topic_metrics:register(T) || #{topic := T} <- Added],
Errs = [Res || Res <- Registered ++ Deregistered, Res =/= ok],
case Errs of
[] -> ok;
_ -> {error, Errs}
end.

%%--------------------------------------------------------------------
Expand Down
42 changes: 38 additions & 4 deletions apps/emqx_modules/test/emqx_modules_conf_SUITE.erl
Expand Up @@ -39,12 +39,46 @@ end_per_suite(_Conf) ->
init_per_testcase(_CaseName, Conf) ->
Conf.

end_per_testcase(_CaseName, _Conf) ->
[emqx_modules_conf:remove_topic_metrics(T) || T <- emqx_modules_conf:topic_metrics()],
ok.

%%--------------------------------------------------------------------
%% Cases
%%--------------------------------------------------------------------

t_topic_metrics_list(_) ->
ok.

t_topic_metrics_add_remove(_) ->
ok.
?assertEqual([], emqx_modules_conf:topic_metrics()),
?assertMatch({ok, _}, emqx_modules_conf:add_topic_metrics(<<"test-topic">>)),
?assertEqual([<<"test-topic">>], emqx_modules_conf:topic_metrics()),
?assertEqual(ok, emqx_modules_conf:remove_topic_metrics(<<"test-topic">>)),
?assertEqual([], emqx_modules_conf:topic_metrics()),
?assertMatch({error, _}, emqx_modules_conf:remove_topic_metrics(<<"test-topic">>)).

t_topic_metrics_merge_update(_) ->
?assertEqual([], emqx_modules_conf:topic_metrics()),
?assertMatch({ok, _}, emqx_modules_conf:add_topic_metrics(<<"test-topic-before-import1">>)),
?assertMatch({ok, _}, emqx_modules_conf:add_topic_metrics(<<"test-topic-before-import2">>)),
ImportConf = #{
<<"topic_metrics">> =>
[
#{<<"topic">> => <<"imported_topic1">>},
#{<<"topic">> => <<"imported_topic2">>}
]
},
?assertMatch({ok, _}, emqx_modules_conf:import_config(ImportConf)),
ExpTopics = [
<<"test-topic-before-import1">>,
<<"test-topic-before-import2">>,
<<"imported_topic1">>,
<<"imported_topic2">>
],
?assertEqual(ExpTopics, emqx_modules_conf:topic_metrics()).

t_topic_metrics_update(_) ->
?assertEqual([], emqx_modules_conf:topic_metrics()),
?assertMatch({ok, _}, emqx_modules_conf:add_topic_metrics(<<"test-topic-before-update1">>)),
?assertMatch({ok, _}, emqx_modules_conf:add_topic_metrics(<<"test-topic-before-update2">>)),
UpdConf = [#{<<"topic">> => <<"new_topic1">>}, #{<<"topic">> => <<"new_topic2">>}],
?assertMatch({ok, _}, emqx_conf:update([topic_metrics], UpdConf, #{override_to => cluster})),
?assertEqual([<<"new_topic1">>, <<"new_topic2">>], emqx_modules_conf:topic_metrics()).
4 changes: 4 additions & 0 deletions changes/ce/fix-11322.en.md
@@ -0,0 +1,4 @@
Import additional configurations from EMQX backup file (`emqx ctl import` command):
- rule_engine (previously not imported due to the bug)
- topic_metrics (previously not implemented)
- slow_subs (previously not implemented).

0 comments on commit b37920d

Please sign in to comment.