Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1121 rule bridge properties #9398

Merged
merged 2 commits into from Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 16 additions & 1 deletion apps/emqx/src/emqx_misc.erl
Expand Up @@ -54,7 +54,8 @@
pmap/3,
readable_error_msg/1,
safe_to_existing_atom/1,
safe_to_existing_atom/2
safe_to_existing_atom/2,
pub_props_to_packet/1
]).

-export([
Expand Down Expand Up @@ -568,3 +569,17 @@ ipv6_probe_test() ->
end.

-endif.

pub_props_to_packet(Properties) ->
F = fun
('User-Property', M) ->
case is_map(M) andalso map_size(M) > 0 of
true -> {true, maps:to_list(M)};
false -> false
end;
('User-Property-Pairs', _) ->
false;
(_, _) ->
true
end,
maps:filtermap(F, Properties).
2 changes: 1 addition & 1 deletion apps/emqx_connector/src/emqx_connector.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_connector, [
{description, "An OTP application"},
{vsn, "0.1.7"},
{vsn, "0.1.8"},
{registered, []},
{mod, {emqx_connector_app, []}},
{applications, [
Expand Down
10 changes: 6 additions & 4 deletions apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl
Expand Up @@ -77,17 +77,20 @@ to_remote_msg(MapMsg, #{
Payload = process_payload(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg),
Retain = replace_simple_var(RetainToken, MapMsg),
PubProps = maps:get(pub_props, MapMsg, #{}),
#mqtt_msg{
qos = QoS,
retain = Retain,
topic = topic(Mountpoint, Topic),
props = #{},
props = emqx_misc:pub_props_to_packet(PubProps),
payload = Payload
};
to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
Msg#message{topic = topic(Mountpoint, Topic)}.

%% published from remote node over a MQTT connection
to_broker_msg(Msg, Vars, undefined) ->
to_broker_msg(Msg, Vars, #{});
to_broker_msg(
#{dup := Dup} = MapMsg,
#{
Expand All @@ -103,8 +106,9 @@ to_broker_msg(
Payload = process_payload(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg),
Retain = replace_simple_var(RetainToken, MapMsg),
PubProps = maps:get(pub_props, MapMsg, #{}),
set_headers(
Props,
Props#{properties => emqx_misc:pub_props_to_packet(PubProps)},
emqx_message:set_flags(
#{dup => Dup, retain => Retain},
emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload)
Expand Down Expand Up @@ -151,8 +155,6 @@ estimate_size(#{topic := Topic, payload := Payload}) ->
estimate_size(Term) ->
erlang:external_size(Term).

set_headers(undefined, Msg) ->
Msg;
set_headers(Val, Msg) ->
emqx_message:set_headers(Val, Msg).
topic(undefined, Topic) -> Topic;
Expand Down
26 changes: 25 additions & 1 deletion apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf
Expand Up @@ -218,7 +218,7 @@ Defaults to ${payload}. If variable ${payload} is not found from the selected re
of the rule, then the string "undefined" is used.
"""
zh: """
要重新发布的消息的有效负载。允许使用带有变量的模板,请参阅“republish_args”的描述。
要重新发布的消息的有效负载。允许使用带有变量的模板,请参阅“republish_args”的描述。
默认为 ${payload}。 如果从所选结果中未找到变量 ${payload},则使用字符串 "undefined"。
"""
}
Expand All @@ -227,6 +227,30 @@ of the rule, then the string "undefined" is used.
zh: "消息负载"
}
}
republish_args_user_properties {
desc {
en: """
From which variable should the MQTT message's User-Property pairs be taken from.
The value must be a map.
You may configure it to <code>${pub_props.'User-Property'}</code> or
use <code>SELECT *,pub_props.'User-Property' as user_properties</code>
to forward the original user properties to the republished message.
You may also call <code>map_put</code> function like
<code>map_put('my-prop-name', 'my-prop-value', user_properties) as user_properties</code>
to inject user properties.
NOTE: MQTT spec allows duplicated user property names, but EMQX Rule-Engine does not.
"""
zh: """
指定使用哪个变量来填充 MQTT 消息的 User-Property 列表。这个变量的值必须是一个 map 类型。
可以设置成 <code>${pub_props.'User-Property'}</code> 或者
使用 <code>SELECT *,pub_props.'User-Property' as user_properties</code> 来把源 MQTT 消息
的 User-Property 列表用于填充。
也可以使用 <code>map_put</code> 函数来添加新的 User-Property,
<code>map_put('my-prop-name', 'my-prop-value', user_properties) as user_properties</code>
注意:MQTT 协议允许一个消息中出现多次同一个 property 名,但是 EMQX 的规则引擎不允许。
"""
}
}

rule_engine_ignore_sys_message {
desc {
Expand Down
81 changes: 54 additions & 27 deletions apps/emqx_rule_engine/src/emqx_rule_actions.erl
Expand Up @@ -37,6 +37,8 @@

-callback pre_process_action_args(FuncName :: atom(), action_fun_args()) -> action_fun_args().

-define(ORIGINAL_USER_PROPERTIES, original).

%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
Expand All @@ -57,15 +59,17 @@ pre_process_action_args(
topic := Topic,
qos := QoS,
retain := Retain,
payload := Payload
payload := Payload,
user_properties := UserProperties
} = Args
) ->
Args#{
preprocessed_tmpl => #{
topic => emqx_plugin_libs_rule:preproc_tmpl(Topic),
qos => preproc_vars(QoS),
retain => preproc_vars(Retain),
payload => emqx_plugin_libs_rule:preproc_tmpl(Payload)
payload => emqx_plugin_libs_rule:preproc_tmpl(Payload),
user_properties => preproc_user_properties(UserProperties)
}
};
pre_process_action_args(_, Args) ->
Expand Down Expand Up @@ -93,44 +97,39 @@ republish(
_Args
) ->
?SLOG(error, #{msg => "recursive_republish_detected", topic => Topic});
%% republish a PUBLISH message
republish(
Selected,
#{flags := Flags, metadata := #{rule_id := RuleId}},
#{metadata := #{rule_id := RuleId}} = Env,
#{
preprocessed_tmpl := #{
qos := QoSTks,
retain := RetainTks,
topic := TopicTks,
payload := PayloadTks
payload := PayloadTks,
user_properties := UserPropertiesTks
}
}
) ->
Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
Payload = format_msg(PayloadTks, Selected),
QoS = replace_simple_var(QoSTks, Selected, 0),
Retain = replace_simple_var(RetainTks, Selected, false),
?TRACE("RULE", "republish_message", #{topic => Topic, payload => Payload}),
safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload);
%% in case this is a "$events/" event
republish(
Selected,
#{metadata := #{rule_id := RuleId}},
#{
preprocessed_tmpl := #{
qos := QoSTks,
retain := RetainTks,
topic := TopicTks,
payload := PayloadTks
%% 'flags' is set for message re-publishes or message related
%% events such as message.acked and message.dropped
Flags0 = maps:get(flags, Env, #{}),
Flags = Flags0#{retain => Retain},
PubProps = format_pub_props(UserPropertiesTks, Selected, Env),
?TRACE(
"RULE",
"republish_message",
#{
flags => Flags,
topic => Topic,
payload => Payload,
pub_props => PubProps
}
}
) ->
Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
Payload = format_msg(PayloadTks, Selected),
QoS = replace_simple_var(QoSTks, Selected, 0),
Retain = replace_simple_var(RetainTks, Selected, false),
Comment on lines -128 to -131
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these three lines duplicates line 113-116, so i merged the two function clauses.
the only special part is the flags field.

?TRACE("RULE", "republish_message_with_flags", #{topic => Topic, payload => Payload}),
safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload).
),
safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps).

%%--------------------------------------------------------------------
%% internal functions
Expand Down Expand Up @@ -168,13 +167,16 @@ pre_process_args(Mod, Func, Args) ->
false -> Args
end.

safe_publish(RuleId, Topic, QoS, Flags, Payload) ->
safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) ->
Msg = #message{
id = emqx_guid:gen(),
qos = QoS,
from = RuleId,
flags = Flags,
headers = #{republish_by => RuleId},
headers = #{
republish_by => RuleId,
properties => emqx_misc:pub_props_to_packet(PubProps)
},
topic = Topic,
payload = Payload,
timestamp = erlang:system_time(millisecond)
Expand All @@ -187,6 +189,19 @@ preproc_vars(Data) when is_binary(Data) ->
preproc_vars(Data) ->
Data.

preproc_user_properties(<<"${pub_props.'User-Property'}">>) ->
%% keep the original
%% avoid processing this special variable because
%% we do not want to force users to select the value
%% the value will be taken from Env.pub_props directly
?ORIGINAL_USER_PROPERTIES;
preproc_user_properties(<<"${", _/binary>> = V) ->
%% use a variable
emqx_plugin_libs_rule:preproc_tmpl(V);
preproc_user_properties(_) ->
%% invalid, discard
undefined.

replace_simple_var(Tokens, Data, Default) when is_list(Tokens) ->
[Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
case Var of
Expand All @@ -201,3 +216,15 @@ format_msg([], Selected) ->
emqx_json:encode(Selected);
format_msg(Tokens, Selected) ->
emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected).

format_pub_props(UserPropertiesTks, Selected, Env) ->
UserProperties =
case UserPropertiesTks of
?ORIGINAL_USER_PROPERTIES ->
maps:get('User-Property', maps:get(pub_props, Env, #{}), #{});
undefined ->
#{};
_ ->
replace_simple_var(UserPropertiesTks, Selected, #{})
end,
#{'User-Property' => UserProperties}.
2 changes: 1 addition & 1 deletion apps/emqx_rule_engine/src/emqx_rule_engine.app.src
Expand Up @@ -2,7 +2,7 @@
{application, emqx_rule_engine, [
{description, "EMQX Rule Engine"},
% strict semver, bump manually!
{vsn, "5.0.3"},
{vsn, "5.0.4"},
{modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
{applications, [kernel, stdlib, rulesql, getopt]},
Expand Down
9 changes: 9 additions & 0 deletions apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
Expand Up @@ -173,6 +173,15 @@ fields("republish_args") ->
default => <<"${payload}">>,
example => <<"${payload}">>
}
)},
{user_properties,
?HOCON(
binary(),
#{
desc => ?DESC("republish_args_user_properties"),
default => <<"${user_properties}">>,
example => <<"${pub_props.'User-Property'}">>
}
)}
].

Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_rule_engine/src/emqx_rule_events.erl
Expand Up @@ -1060,7 +1060,7 @@ printable_maps(Headers) ->
(K, V, AccIn) ->
AccIn#{K => V}
end,
#{},
#{'User-Property' => #{}},
Headers
).

Expand Down