Skip to content

Commit

Permalink
feat(config): unsafe force_put
Browse files Browse the repository at this point in the history
plus fixed shared sub
  • Loading branch information
Georgy Sychev committed Apr 22, 2022
1 parent f75d82d commit d34c087
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 24 deletions.
12 changes: 1 addition & 11 deletions apps/emqx/etc/emqx.conf
Expand Up @@ -1035,7 +1035,7 @@ broker {

## Per-group dispatch strategy for shared subscription
##
## @doc broker.group.$group_name.shared_subscription_strategy
## @doc broker.shared_subscription_group.$group_name.strategy
## ValueType: random | round_robin | sticky | hash | local
## - random: dispatch the message to a random selected subscriber
## - round_robin: select the subscribers in a round-robin manner
Expand All @@ -1045,17 +1045,7 @@ broker {
## - hash: select the subscribers by the hash of clientIds
## Default: round_robin
shared_subscription_group {
local_group {
strategy = round_robin
}

sticky_group {
strategy = sticky
}

round_robin_group {
strategy = round_robin
}
}

## Enable/disable shared dispatch acknowledgement for QoS1 and QoS2 messages
Expand Down
61 changes: 51 additions & 10 deletions apps/emqx/src/emqx_config.erl
Expand Up @@ -46,6 +46,8 @@
find_raw/1,
put/1,
put/2,
force_put/2,
force_put/3,
erase/1
]).

Expand Down Expand Up @@ -90,6 +92,13 @@
-define(ZONE_CONF_PATH(ZONE, PATH), [zones, ZONE | PATH]).
-define(LISTENER_CONF_PATH(TYPE, LISTENER, PATH), [listeners, TYPE, LISTENER | PATH]).

-define(UNSAFE_ATOM_CONF_PATH(PATH, EXP, EXP_ON_FAIL),
try [unsafe_atom(Key) || Key <- PATH] of
AtomKeyPath -> EXP
catch
error:badarg -> EXP_ON_FAIL
end
).
-define(ATOM_CONF_PATH(PATH, EXP, EXP_ON_FAIL),
try [atom(Key) || Key <- PATH] of
AtomKeyPath -> EXP
Expand Down Expand Up @@ -238,7 +247,22 @@ erase(RootName) ->
persistent_term:erase(?PERSIS_KEY(?RAW_CONF, bin(RootName))).

-spec put(emqx_map_lib:config_key_path(), term()) -> ok.
put(KeyPath, Config) -> do_put(?CONF, KeyPath, Config).
put(KeyPath, Config) ->
Putter = fun(Path, Map, Value) ->
emqx_map_lib:deep_put(Path, Map, Value)
end,
do_put(?CONF, safe, Putter, KeyPath, Config).

-spec force_put(emqx_map_lib:config_key_path(), term()) -> ok.
force_put(KeyPath, Config) ->
force_put(KeyPath, Config, safe).

-spec force_put(emqx_map_lib:config_key_path(), term(), safe | unsafe) -> ok.
force_put(KeyPath, Config, Safety) ->
Putter = fun(Path, Map, Value) ->
emqx_map_lib:deep_force_put(Path, Map, Value)
end,
do_put(?CONF, Safety, Putter, KeyPath, Config).

-spec get_default_value(emqx_map_lib:config_key_path()) -> {ok, term()} | {error, term()}.
get_default_value([RootName | _] = KeyPath) ->
Expand Down Expand Up @@ -276,7 +300,11 @@ put_raw(Config) ->
).

-spec put_raw(emqx_map_lib:config_key_path(), term()) -> ok.
put_raw(KeyPath, Config) -> do_put(?RAW_CONF, KeyPath, Config).
put_raw(KeyPath, Config) ->
Putter = fun(Path, Map, Value) ->
emqx_map_lib:deep_force_put(Path, Map, Value)
end,
do_put(?RAW_CONF, safe, Putter, KeyPath, Config).

%%============================================================================
%% Load/Update configs From/To files
Expand Down Expand Up @@ -536,17 +564,17 @@ do_get(Type, [RootName | KeyPath], Default) ->
RootV = persistent_term:get(?PERSIS_KEY(Type, bin(RootName)), #{}),
do_deep_get(Type, KeyPath, RootV, Default).

do_put(Type, [], DeepValue) ->
do_put(Type, Safety, Putter, [], DeepValue) ->
maps:fold(
fun(RootName, Value, _Res) ->
do_put(Type, [RootName], Value)
do_put(Type, Safety, Putter, [RootName], Value)
end,
ok,
DeepValue
);
do_put(Type, [RootName | KeyPath], DeepValue) ->
do_put(Type, Safety, Putter, [RootName | KeyPath], DeepValue) ->
OldValue = do_get(Type, [RootName], #{}),
NewValue = do_deep_put(Type, KeyPath, OldValue, DeepValue),
NewValue = do_deep_put(Type, Safety, Putter, KeyPath, OldValue, DeepValue),
persistent_term:put(?PERSIS_KEY(Type, bin(RootName)), NewValue).

do_deep_get(?CONF, KeyPath, Map, Default) ->
Expand All @@ -558,19 +586,32 @@ do_deep_get(?CONF, KeyPath, Map, Default) ->
do_deep_get(?RAW_CONF, KeyPath, Map, Default) ->
emqx_map_lib:deep_get([bin(Key) || Key <- KeyPath], Map, Default).

do_deep_put(?CONF, KeyPath, Map, Value) ->
do_deep_put(?CONF, safe, Putter, KeyPath, Map, Value) ->
?ATOM_CONF_PATH(
KeyPath,
emqx_map_lib:deep_put(AtomKeyPath, Map, Value),
Putter(AtomKeyPath, Map, Value),
error({not_found, KeyPath})
);
do_deep_put(?RAW_CONF, KeyPath, Map, Value) ->
emqx_map_lib:deep_put([bin(Key) || Key <- KeyPath], Map, Value).
do_deep_put(?CONF, unsafe, Putter, KeyPath, Map, Value) ->
?UNSAFE_ATOM_CONF_PATH(
KeyPath,
Putter(AtomKeyPath, Map, Value),
error({not_found, KeyPath})
);
do_deep_put(?RAW_CONF, _Safety, Putter, KeyPath, Map, Value) ->
Putter([bin(Key) || Key <- KeyPath], Map, Value).

root_names_from_conf(RawConf) ->
Keys = maps:keys(RawConf),
[Name || Name <- get_root_names(), lists:member(Name, Keys)].

unsafe_atom(Bin) when is_binary(Bin) ->
binary_to_atom(Bin, utf8);
unsafe_atom(Str) when is_list(Str) ->
list_to_atom(Str);
unsafe_atom(Atom) when is_atom(Atom) ->
Atom.

atom(Bin) when is_binary(Bin) ->
binary_to_existing_atom(Bin, utf8);
atom(Str) when is_list(Str) ->
Expand Down
21 changes: 21 additions & 0 deletions apps/emqx/src/emqx_map_lib.erl
Expand Up @@ -20,6 +20,7 @@
deep_get/3,
deep_find/2,
deep_put/3,
deep_force_put/3,
deep_remove/2,
deep_merge/2,
safe_atom_key_map/1,
Expand Down Expand Up @@ -73,6 +74,26 @@ deep_put([Key | KeyPath], Map, Data) ->
SubMap = maps:get(Key, Map, #{}),
Map#{Key => deep_put(KeyPath, SubMap, Data)}.

-spec deep_force_put(config_key_path(), map(), term()) -> map().
deep_force_put([], _Map, Data) ->
Data;
deep_force_put([Key | KeyPath] = FullPath, Map, Data) ->
case Map of
#{Key := InnerValue} ->
Map#{Key => deep_force_put(KeyPath, InnerValue, Data)};

#{} ->
maps:put(Key, path_to_map(KeyPath, Data), Map);

_ ->
path_to_map(FullPath, Data)
end.

-spec path_to_map(config_key_path(), term()) -> map().
path_to_map([], Data) -> Data;
path_to_map([Key | Tail], Data) ->
#{Key => path_to_map(Tail, Data)}.

-spec deep_remove(config_key_path(), map()) -> map().
deep_remove([], Map) ->
Map;
Expand Down
6 changes: 3 additions & 3 deletions apps/emqx/test/emqx_shared_sub_SUITE.erl
Expand Up @@ -456,7 +456,7 @@ ensure_config(Strategy, AckEnabled) ->
ensure_group_config(Group2Strategy) ->
maps:foreach(
fun(Group, Strategy) ->
emqx_config:put([broker, shared_subscription_group, Group, strategy], Strategy)
emqx_config:force_put([broker, shared_subscription_group, Group, strategy], Strategy, unsafe)
end,
Group2Strategy
).
Expand All @@ -467,8 +467,8 @@ ensure_group_config(Node, Group2Strategy) ->
rpc:call(
Node,
emqx_config,
put,
[[broker, shared_subscription_group, Group, strategy], Strategy]
force_put,
[[broker, shared_subscription_group, Group, strategy], Strategy, unsafe]
)
end,
Group2Strategy
Expand Down

0 comments on commit d34c087

Please sign in to comment.