Skip to content

Commit

Permalink
Merge pull request #9398 from emqx/1121-rule-bridge-properties
Browse files Browse the repository at this point in the history
1121 rule bridge properties
  • Loading branch information
zmstone committed Nov 23, 2022
2 parents 6e106ed + f3df2c8 commit cded5fc
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 52 deletions.
17 changes: 16 additions & 1 deletion apps/emqx/src/emqx_misc.erl
Expand Up @@ -54,7 +54,8 @@
pmap/3,
readable_error_msg/1,
safe_to_existing_atom/1,
safe_to_existing_atom/2
safe_to_existing_atom/2,
pub_props_to_packet/1
]).

-export([
Expand Down Expand Up @@ -568,3 +569,17 @@ ipv6_probe_test() ->
end.

-endif.

pub_props_to_packet(Properties) ->
F = fun
('User-Property', M) ->
case is_map(M) andalso map_size(M) > 0 of
true -> {true, maps:to_list(M)};
false -> false
end;
('User-Property-Pairs', _) ->
false;
(_, _) ->
true
end,
maps:filtermap(F, Properties).
2 changes: 1 addition & 1 deletion apps/emqx_connector/src/emqx_connector.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_connector, [
{description, "An OTP application"},
{vsn, "0.1.7"},
{vsn, "0.1.8"},
{registered, []},
{mod, {emqx_connector_app, []}},
{applications, [
Expand Down
10 changes: 6 additions & 4 deletions apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl
Expand Up @@ -77,17 +77,20 @@ to_remote_msg(MapMsg, #{
Payload = process_payload(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg),
Retain = replace_simple_var(RetainToken, MapMsg),
PubProps = maps:get(pub_props, MapMsg, #{}),
#mqtt_msg{
qos = QoS,
retain = Retain,
topic = topic(Mountpoint, Topic),
props = #{},
props = emqx_misc:pub_props_to_packet(PubProps),
payload = Payload
};
to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
Msg#message{topic = topic(Mountpoint, Topic)}.

%% published from remote node over a MQTT connection
to_broker_msg(Msg, Vars, undefined) ->
to_broker_msg(Msg, Vars, #{});
to_broker_msg(
#{dup := Dup} = MapMsg,
#{
Expand All @@ -103,8 +106,9 @@ to_broker_msg(
Payload = process_payload(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg),
Retain = replace_simple_var(RetainToken, MapMsg),
PubProps = maps:get(pub_props, MapMsg, #{}),
set_headers(
Props,
Props#{properties => emqx_misc:pub_props_to_packet(PubProps)},
emqx_message:set_flags(
#{dup => Dup, retain => Retain},
emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload)
Expand Down Expand Up @@ -151,8 +155,6 @@ estimate_size(#{topic := Topic, payload := Payload}) ->
estimate_size(Term) ->
erlang:external_size(Term).

set_headers(undefined, Msg) ->
Msg;
set_headers(Val, Msg) ->
emqx_message:set_headers(Val, Msg).
topic(undefined, Topic) -> Topic;
Expand Down
26 changes: 25 additions & 1 deletion apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf
Expand Up @@ -218,7 +218,7 @@ Defaults to ${payload}. If variable ${payload} is not found from the selected re
of the rule, then the string "undefined" is used.
"""
zh: """
要重新发布的消息的有效负载。允许使用带有变量的模板,请参阅“republish_args”的描述。
要重新发布的消息的有效负载。允许使用带有变量的模板,请参阅“republish_args”的描述。
默认为 ${payload}。 如果从所选结果中未找到变量 ${payload},则使用字符串 "undefined"。
"""
}
Expand All @@ -227,6 +227,30 @@ of the rule, then the string "undefined" is used.
zh: "消息负载"
}
}
republish_args_user_properties {
desc {
en: """
From which variable should the MQTT message's User-Property pairs be taken from.
The value must be a map.
You may configure it to <code>${pub_props.'User-Property'}</code> or
use <code>SELECT *,pub_props.'User-Property' as user_properties</code>
to forward the original user properties to the republished message.
You may also call <code>map_put</code> function like
<code>map_put('my-prop-name', 'my-prop-value', user_properties) as user_properties</code>
to inject user properties.
NOTE: MQTT spec allows duplicated user property names, but EMQX Rule-Engine does not.
"""
zh: """
指定使用哪个变量来填充 MQTT 消息的 User-Property 列表。这个变量的值必须是一个 map 类型。
可以设置成 <code>${pub_props.'User-Property'}</code> 或者
使用 <code>SELECT *,pub_props.'User-Property' as user_properties</code> 来把源 MQTT 消息
的 User-Property 列表用于填充。
也可以使用 <code>map_put</code> 函数来添加新的 User-Property,
<code>map_put('my-prop-name', 'my-prop-value', user_properties) as user_properties</code>
注意:MQTT 协议允许一个消息中出现多次同一个 property 名,但是 EMQX 的规则引擎不允许。
"""
}
}

rule_engine_ignore_sys_message {
desc {
Expand Down
81 changes: 54 additions & 27 deletions apps/emqx_rule_engine/src/emqx_rule_actions.erl
Expand Up @@ -37,6 +37,8 @@

-callback pre_process_action_args(FuncName :: atom(), action_fun_args()) -> action_fun_args().

-define(ORIGINAL_USER_PROPERTIES, original).

%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
Expand All @@ -57,15 +59,17 @@ pre_process_action_args(
topic := Topic,
qos := QoS,
retain := Retain,
payload := Payload
payload := Payload,
user_properties := UserProperties
} = Args
) ->
Args#{
preprocessed_tmpl => #{
topic => emqx_plugin_libs_rule:preproc_tmpl(Topic),
qos => preproc_vars(QoS),
retain => preproc_vars(Retain),
payload => emqx_plugin_libs_rule:preproc_tmpl(Payload)
payload => emqx_plugin_libs_rule:preproc_tmpl(Payload),
user_properties => preproc_user_properties(UserProperties)
}
};
pre_process_action_args(_, Args) ->
Expand Down Expand Up @@ -93,44 +97,39 @@ republish(
_Args
) ->
?SLOG(error, #{msg => "recursive_republish_detected", topic => Topic});
%% republish a PUBLISH message
republish(
Selected,
#{flags := Flags, metadata := #{rule_id := RuleId}},
#{metadata := #{rule_id := RuleId}} = Env,
#{
preprocessed_tmpl := #{
qos := QoSTks,
retain := RetainTks,
topic := TopicTks,
payload := PayloadTks
payload := PayloadTks,
user_properties := UserPropertiesTks
}
}
) ->
Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
Payload = format_msg(PayloadTks, Selected),
QoS = replace_simple_var(QoSTks, Selected, 0),
Retain = replace_simple_var(RetainTks, Selected, false),
?TRACE("RULE", "republish_message", #{topic => Topic, payload => Payload}),
safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload);
%% in case this is a "$events/" event
republish(
Selected,
#{metadata := #{rule_id := RuleId}},
#{
preprocessed_tmpl := #{
qos := QoSTks,
retain := RetainTks,
topic := TopicTks,
payload := PayloadTks
%% 'flags' is set for message re-publishes or message related
%% events such as message.acked and message.dropped
Flags0 = maps:get(flags, Env, #{}),
Flags = Flags0#{retain => Retain},
PubProps = format_pub_props(UserPropertiesTks, Selected, Env),
?TRACE(
"RULE",
"republish_message",
#{
flags => Flags,
topic => Topic,
payload => Payload,
pub_props => PubProps
}
}
) ->
Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
Payload = format_msg(PayloadTks, Selected),
QoS = replace_simple_var(QoSTks, Selected, 0),
Retain = replace_simple_var(RetainTks, Selected, false),
?TRACE("RULE", "republish_message_with_flags", #{topic => Topic, payload => Payload}),
safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload).
),
safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps).

%%--------------------------------------------------------------------
%% internal functions
Expand Down Expand Up @@ -168,13 +167,16 @@ pre_process_args(Mod, Func, Args) ->
false -> Args
end.

safe_publish(RuleId, Topic, QoS, Flags, Payload) ->
safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) ->
Msg = #message{
id = emqx_guid:gen(),
qos = QoS,
from = RuleId,
flags = Flags,
headers = #{republish_by => RuleId},
headers = #{
republish_by => RuleId,
properties => emqx_misc:pub_props_to_packet(PubProps)
},
topic = Topic,
payload = Payload,
timestamp = erlang:system_time(millisecond)
Expand All @@ -187,6 +189,19 @@ preproc_vars(Data) when is_binary(Data) ->
preproc_vars(Data) ->
Data.

preproc_user_properties(<<"${pub_props.'User-Property'}">>) ->
%% keep the original
%% avoid processing this special variable because
%% we do not want to force users to select the value
%% the value will be taken from Env.pub_props directly
?ORIGINAL_USER_PROPERTIES;
preproc_user_properties(<<"${", _/binary>> = V) ->
%% use a variable
emqx_plugin_libs_rule:preproc_tmpl(V);
preproc_user_properties(_) ->
%% invalid, discard
undefined.

replace_simple_var(Tokens, Data, Default) when is_list(Tokens) ->
[Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
case Var of
Expand All @@ -201,3 +216,15 @@ format_msg([], Selected) ->
emqx_json:encode(Selected);
format_msg(Tokens, Selected) ->
emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected).

format_pub_props(UserPropertiesTks, Selected, Env) ->
UserProperties =
case UserPropertiesTks of
?ORIGINAL_USER_PROPERTIES ->
maps:get('User-Property', maps:get(pub_props, Env, #{}), #{});
undefined ->
#{};
_ ->
replace_simple_var(UserPropertiesTks, Selected, #{})
end,
#{'User-Property' => UserProperties}.
2 changes: 1 addition & 1 deletion apps/emqx_rule_engine/src/emqx_rule_engine.app.src
Expand Up @@ -2,7 +2,7 @@
{application, emqx_rule_engine, [
{description, "EMQX Rule Engine"},
% strict semver, bump manually!
{vsn, "5.0.3"},
{vsn, "5.0.4"},
{modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
{applications, [kernel, stdlib, rulesql, getopt]},
Expand Down
9 changes: 9 additions & 0 deletions apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
Expand Up @@ -173,6 +173,15 @@ fields("republish_args") ->
default => <<"${payload}">>,
example => <<"${payload}">>
}
)},
{user_properties,
?HOCON(
binary(),
#{
desc => ?DESC("republish_args_user_properties"),
default => <<"${user_properties}">>,
example => <<"${pub_props.'User-Property'}">>
}
)}
].

Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_rule_engine/src/emqx_rule_events.erl
Expand Up @@ -1060,7 +1060,7 @@ printable_maps(Headers) ->
(K, V, AccIn) ->
AccIn#{K => V}
end,
#{},
#{'User-Property' => #{}},
Headers
).

Expand Down

0 comments on commit cded5fc

Please sign in to comment.