From 00c57de4c3ebef0f076952a4f62dfe1d92f64695 Mon Sep 17 00:00:00 2001 From: kraftwerk28 Date: Wed, 26 Oct 2022 15:57:17 +0300 Subject: [PATCH 1/2] feat: do not drop MQTTv5 properties in rule/bridge --- apps/emqx/src/emqx_misc.erl | 17 +++- .../emqx_connector/src/emqx_connector.app.src | 2 +- .../src/mqtt/emqx_connector_mqtt_msg.erl | 10 ++- .../src/emqx_rule_actions.erl | 27 +++++- .../src/emqx_rule_engine.app.src | 2 +- .../emqx_rule_engine/src/emqx_rule_events.erl | 2 +- .../test/emqx_rule_engine_SUITE.erl | 88 +++++++++++++++++-- changes/v5.0.11-en.md | 2 + changes/v5.0.11-zh.md | 2 + 9 files changed, 134 insertions(+), 18 deletions(-) diff --git a/apps/emqx/src/emqx_misc.erl b/apps/emqx/src/emqx_misc.erl index 674465e6a5..9683b1a8b0 100644 --- a/apps/emqx/src/emqx_misc.erl +++ b/apps/emqx/src/emqx_misc.erl @@ -54,7 +54,8 @@ pmap/3, readable_error_msg/1, safe_to_existing_atom/1, - safe_to_existing_atom/2 + safe_to_existing_atom/2, + pub_props_to_packet/1 ]). -export([ @@ -568,3 +569,17 @@ ipv6_probe_test() -> end. -endif. + +pub_props_to_packet(Properties) -> + F = fun + ('User-Property', M) -> + case is_map(M) andalso map_size(M) > 0 of + true -> {true, maps:to_list(M)}; + false -> false + end; + ('User-Property-Pairs', _) -> + false; + (_, _) -> + true + end, + maps:filtermap(F, Properties). diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index 64003209fd..547a37b8e5 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ {description, "An OTP application"}, - {vsn, "0.1.7"}, + {vsn, "0.1.8"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [ diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index 43700506b3..469dd952b8 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -77,17 +77,20 @@ to_remote_msg(MapMsg, #{ Payload = process_payload(PayloadToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), Retain = replace_simple_var(RetainToken, MapMsg), + PubProps = maps:get(pub_props, MapMsg, #{}), #mqtt_msg{ qos = QoS, retain = Retain, topic = topic(Mountpoint, Topic), - props = #{}, + props = emqx_misc:pub_props_to_packet(PubProps), payload = Payload }; to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> Msg#message{topic = topic(Mountpoint, Topic)}. %% published from remote node over a MQTT connection +to_broker_msg(Msg, Vars, undefined) -> + to_broker_msg(Msg, Vars, #{}); to_broker_msg( #{dup := Dup} = MapMsg, #{ @@ -103,8 +106,9 @@ to_broker_msg( Payload = process_payload(PayloadToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), Retain = replace_simple_var(RetainToken, MapMsg), + PubProps = maps:get(pub_props, MapMsg, #{}), set_headers( - Props, + Props#{properties => emqx_misc:pub_props_to_packet(PubProps)}, emqx_message:set_flags( #{dup => Dup, retain => Retain}, emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload) @@ -151,8 +155,6 @@ estimate_size(#{topic := Topic, payload := Payload}) -> estimate_size(Term) -> erlang:external_size(Term). -set_headers(undefined, Msg) -> - Msg; set_headers(Val, Msg) -> emqx_message:set_headers(Val, Msg). topic(undefined, Topic) -> Topic; diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index 998fc1a5e3..dd26f0a29a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -110,8 +110,9 @@ republish( Payload = format_msg(PayloadTks, Selected), QoS = replace_simple_var(QoSTks, Selected, 0), Retain = replace_simple_var(RetainTks, Selected, false), + PubProps = format_pub_props(maps:get(<<"pub_props">>, Selected, #{})), ?TRACE("RULE", "republish_message", #{topic => Topic, payload => Payload}), - safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload); + safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload, PubProps); %% in case this is a "$events/" event republish( Selected, @@ -129,8 +130,9 @@ republish( Payload = format_msg(PayloadTks, Selected), QoS = replace_simple_var(QoSTks, Selected, 0), Retain = replace_simple_var(RetainTks, Selected, false), + PubProps = maps:get(pub_props, Selected, #{}), ?TRACE("RULE", "republish_message_with_flags", #{topic => Topic, payload => Payload}), - safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload). + safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload, PubProps). %%-------------------------------------------------------------------- %% internal functions @@ -168,13 +170,16 @@ pre_process_args(Mod, Func, Args) -> false -> Args end. -safe_publish(RuleId, Topic, QoS, Flags, Payload) -> +safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) -> Msg = #message{ id = emqx_guid:gen(), qos = QoS, from = RuleId, flags = Flags, - headers = #{republish_by => RuleId}, + headers = #{ + republish_by => RuleId, + properties => emqx_misc:pub_props_to_packet(PubProps) + }, topic = Topic, payload = Payload, timestamp = erlang:system_time(millisecond) @@ -201,3 +206,17 @@ format_msg([], Selected) -> emqx_json:encode(Selected); format_msg(Tokens, Selected) -> emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected). + +format_pub_props(Props) -> + maps:fold(fun format_pub_prop/3, #{}, Props). + +format_pub_prop(K, V, Acc) when is_atom(K) -> + Acc#{K => V}; +format_pub_prop(K, V, Acc) when is_binary(K) -> + try + K1 = erlang:binary_to_existing_atom(K), + format_pub_prop(K1, V, Acc) + catch + _:_ -> + Acc#{K => V} + end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 6bb9ad0101..6419e41844 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -2,7 +2,7 @@ {application, emqx_rule_engine, [ {description, "EMQX Rule Engine"}, % strict semver, bump manually! - {vsn, "5.0.3"}, + {vsn, "5.0.4"}, {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {applications, [kernel, stdlib, rulesql, getopt]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 2935aeeb9c..b80b9777a0 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -1060,7 +1060,7 @@ printable_maps(Headers) -> (K, V, AccIn) -> AccIn#{K => V} end, - #{}, + #{'User-Property' => #{}}, Headers ). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 17fe1a36cd..087a73c345 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -59,11 +59,13 @@ groups() -> t_sqlselect_0, t_sqlselect_00, t_sqlselect_001, + t_sqlselect_inject_props, t_sqlselect_01, t_sqlselect_02, t_sqlselect_1, t_sqlselect_2, t_sqlselect_3, + t_sqlselect_message_publish_event, t_sqlparse_event_1, t_sqlparse_event_2, t_sqlparse_event_3, @@ -936,9 +938,41 @@ t_sqlselect_001(_Config) -> ) ). +t_sqlselect_inject_props(_Config) -> + SQL = + "SELECT json_decode(payload) as p, payload, " + "map_put('discard', 'discard', pub_props) as pub_props, " + "map_put('inject_key', 'inject_val', pub_props.'User-Property') as pub_props.'User-Property' " + "FROM \"t3/#\", \"t1\" " + "WHERE p.x = 1", + Repub = republish_action(<<"t2">>), + {ok, TopicRule1} = emqx_rule_engine:create_rule( + #{ + sql => SQL, + id => ?TMP_RULEID, + actions => [Repub] + } + ), + Props = user_properties(#{<<"inject_key">> => <<"inject_val">>}), + {ok, Client} = emqtt:start_link([{username, <<"emqx">>}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), + emqtt:publish(Client, <<"t1">>, #{}, <<"{\"x\":1}">>, [{qos, 0}]), + ct:sleep(100), + receive + {publish, #{topic := T, payload := Payload, properties := Props2}} -> + ?assertEqual(Props, Props2), + ?assertEqual(<<"t2">>, T), + ?assertEqual(<<"{\"x\":1}">>, Payload) + after 1000 -> + ct:fail(wait_for_t2) + end, + emqtt:stop(Client), + delete_rule(TopicRule1). + t_sqlselect_01(_Config) -> SQL = - "SELECT json_decode(payload) as p, payload " + "SELECT json_decode(payload) as p, payload, pub_props " "FROM \"t3/#\", \"t1\" " "WHERE p.x = 1", Repub = republish_action(<<"t2">>), @@ -949,10 +983,11 @@ t_sqlselect_01(_Config) -> actions => [Repub] } ), - {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), + Props = user_properties(#{<<"mykey">> => <<"myval">>}), + {ok, Client} = emqtt:start_link([{username, <<"emqx">>}, {proto_ver, v5}]), {ok, _} = emqtt:connect(Client), {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), - emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0), + emqtt:publish(Client, <<"t1">>, Props, <<"{\"x\":1}">>, [{qos, 0}]), ct:sleep(100), receive {publish, #{topic := T, payload := Payload}} -> @@ -962,7 +997,7 @@ t_sqlselect_01(_Config) -> ct:fail(wait_for_t2) end, - emqtt:publish(Client, <<"t1">>, <<"{\"x\":2}">>, 0), + emqtt:publish(Client, <<"t1">>, Props, <<"{\"x\":2}">>, [{qos, 0}]), receive {publish, #{topic := <<"t2">>, payload := _}} -> ct:fail(unexpected_t2) @@ -970,9 +1005,10 @@ t_sqlselect_01(_Config) -> ok end, - emqtt:publish(Client, <<"t3/a">>, <<"{\"x\":1}">>, 0), + emqtt:publish(Client, <<"t3/a">>, Props, <<"{\"x\":1}">>, [{qos, 0}]), receive - {publish, #{topic := T3, payload := Payload3}} -> + {publish, #{topic := T3, payload := Payload3, properties := Props2}} -> + ?assertEqual(Props, Props2), ?assertEqual(<<"t2">>, T3), ?assertEqual(<<"{\"x\":1}">>, Payload3) after 1000 -> @@ -1135,6 +1171,43 @@ t_sqlselect_3(_Config) -> emqtt:stop(Client), delete_rule(TopicRule). +t_sqlselect_message_publish_event(_Config) -> + %% republish the client.connected msg + Topic = <<"foo/bar/1">>, + SQL = << + "SELECT clientid, pub_props " + "FROM \"$events/message_dropped\" " + >>, + + %"WHERE topic = \"", Topic/binary, "\"">>, + Repub = republish_action(<<"t2">>, <<"clientid=${clientid}">>), + {ok, TopicRule} = emqx_rule_engine:create_rule( + #{ + sql => SQL, + id => ?TMP_RULEID, + actions => [Repub] + } + ), + {ok, Client1} = emqtt:start_link([{clientid, <<"sub-01">>}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, _} = emqtt:subscribe(Client1, <<"t2">>, 1), + ct:sleep(200), + {ok, Client2} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(Client2), + Props = user_properties(#{<<"mykey">> => <<"222222222222">>}), + emqtt:publish(Client2, Topic, Props, <<"{\"x\":1}">>, [{qos, 1}]), + receive + {publish, #{topic := T, payload := Payload, properties := Props1}} -> + ?assertEqual(Props1, Props), + ?assertEqual(<<"t2">>, T), + ?assertEqual(<<"clientid=pub-02">>, Payload) + after 1000 -> + ct:fail(wait_for_t2) + end, + emqtt:stop(Client2), + emqtt:stop(Client1), + delete_rule(TopicRule). + t_sqlparse_event_1(_Config) -> Sql = "select topic as tp " @@ -2869,6 +2942,9 @@ verify_ipaddr(IPAddrS) -> init_events_counters() -> ets:new(events_record_tab, [named_table, bag, public]). +user_properties(PairsMap) -> + #{'User-Property' => maps:to_list(PairsMap)}. + %%------------------------------------------------------------------------------ %% Start Apps %%------------------------------------------------------------------------------ diff --git a/changes/v5.0.11-en.md b/changes/v5.0.11-en.md index def22e17cd..e53c5785eb 100644 --- a/changes/v5.0.11-en.md +++ b/changes/v5.0.11-en.md @@ -21,6 +21,8 @@ - Set the default value for the maximum level of a topic to 128 [#9406](https://github.com/emqx/emqx/pull/9406). +- Keep MQTT v5 User-Property pairs from bridge ingested MQTT messsages to bridge target [#9398](https://github.com/emqx/emqx/pull/9398). + ## Bug fixes - Fix `ssl.existingName` option of helm chart not working [#9307](https://github.com/emqx/emqx/issues/9307). diff --git a/changes/v5.0.11-zh.md b/changes/v5.0.11-zh.md index 7a4e770f97..3ea516dad3 100644 --- a/changes/v5.0.11-zh.md +++ b/changes/v5.0.11-zh.md @@ -19,6 +19,8 @@ - 将主题的最大层级限制的默认值设置为128 [#9406](https://github.com/emqx/emqx/pull/9406)。 +- 为桥接收到的 MQTT v5 消息再转发时保留 User-Property 列表 [#9398](https://github.com/emqx/emqx/pull/9398)。 + ## 修复 - 修复 helm chart 的 `ssl.existingName` 选项不起作用 [#9307](https://github.com/emqx/emqx/issues/9307)。 From f3df2c80d8b3a5b604ec8518091146d4e7cbb671 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 22 Nov 2022 19:39:21 +0100 Subject: [PATCH 2/2] feat: add user_properties arg for republish action --- .../i18n/emqx_rule_engine_schema.conf | 26 +++++- .../src/emqx_rule_actions.erl | 88 ++++++++++--------- .../src/emqx_rule_engine_schema.erl | 9 ++ .../test/emqx_rule_engine_SUITE.erl | 87 +++++++++++++----- 4 files changed, 148 insertions(+), 62 deletions(-) diff --git a/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf b/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf index c0009a0404..bc5735c676 100644 --- a/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf +++ b/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf @@ -218,7 +218,7 @@ Defaults to ${payload}. If variable ${payload} is not found from the selected re of the rule, then the string "undefined" is used. """ zh: """ -要重新发布的消息的有效负载。允许使用带有变量的模板,请参阅“republish_args”的描述。。 +要重新发布的消息的有效负载。允许使用带有变量的模板,请参阅“republish_args”的描述。 默认为 ${payload}。 如果从所选结果中未找到变量 ${payload},则使用字符串 "undefined"。 """ } @@ -227,6 +227,30 @@ of the rule, then the string "undefined" is used. zh: "消息负载" } } + republish_args_user_properties { + desc { + en: """ +From which variable should the MQTT message's User-Property pairs be taken from. +The value must be a map. +You may configure it to ${pub_props.'User-Property'} or +use SELECT *,pub_props.'User-Property' as user_properties +to forward the original user properties to the republished message. +You may also call map_put function like +map_put('my-prop-name', 'my-prop-value', user_properties) as user_properties +to inject user properties. +NOTE: MQTT spec allows duplicated user property names, but EMQX Rule-Engine does not. +""" + zh: """ +指定使用哪个变量来填充 MQTT 消息的 User-Property 列表。这个变量的值必须是一个 map 类型。 +可以设置成 ${pub_props.'User-Property'} 或者 +使用 SELECT *,pub_props.'User-Property' as user_properties 来把源 MQTT 消息 +的 User-Property 列表用于填充。 +也可以使用 map_put 函数来添加新的 User-Property, +map_put('my-prop-name', 'my-prop-value', user_properties) as user_properties +注意:MQTT 协议允许一个消息中出现多次同一个 property 名,但是 EMQX 的规则引擎不允许。 +""" + } + } rule_engine_ignore_sys_message { desc { diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index dd26f0a29a..8971159e76 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -37,6 +37,8 @@ -callback pre_process_action_args(FuncName :: atom(), action_fun_args()) -> action_fun_args(). +-define(ORIGINAL_USER_PROPERTIES, original). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -57,7 +59,8 @@ pre_process_action_args( topic := Topic, qos := QoS, retain := Retain, - payload := Payload + payload := Payload, + user_properties := UserProperties } = Args ) -> Args#{ @@ -65,7 +68,8 @@ pre_process_action_args( topic => emqx_plugin_libs_rule:preproc_tmpl(Topic), qos => preproc_vars(QoS), retain => preproc_vars(Retain), - payload => emqx_plugin_libs_rule:preproc_tmpl(Payload) + payload => emqx_plugin_libs_rule:preproc_tmpl(Payload), + user_properties => preproc_user_properties(UserProperties) } }; pre_process_action_args(_, Args) -> @@ -93,16 +97,16 @@ republish( _Args ) -> ?SLOG(error, #{msg => "recursive_republish_detected", topic => Topic}); -%% republish a PUBLISH message republish( Selected, - #{flags := Flags, metadata := #{rule_id := RuleId}}, + #{metadata := #{rule_id := RuleId}} = Env, #{ preprocessed_tmpl := #{ qos := QoSTks, retain := RetainTks, topic := TopicTks, - payload := PayloadTks + payload := PayloadTks, + user_properties := UserPropertiesTks } } ) -> @@ -110,29 +114,22 @@ republish( Payload = format_msg(PayloadTks, Selected), QoS = replace_simple_var(QoSTks, Selected, 0), Retain = replace_simple_var(RetainTks, Selected, false), - PubProps = format_pub_props(maps:get(<<"pub_props">>, Selected, #{})), - ?TRACE("RULE", "republish_message", #{topic => Topic, payload => Payload}), - safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload, PubProps); -%% in case this is a "$events/" event -republish( - Selected, - #{metadata := #{rule_id := RuleId}}, - #{ - preprocessed_tmpl := #{ - qos := QoSTks, - retain := RetainTks, - topic := TopicTks, - payload := PayloadTks + %% 'flags' is set for message re-publishes or message related + %% events such as message.acked and message.dropped + Flags0 = maps:get(flags, Env, #{}), + Flags = Flags0#{retain => Retain}, + PubProps = format_pub_props(UserPropertiesTks, Selected, Env), + ?TRACE( + "RULE", + "republish_message", + #{ + flags => Flags, + topic => Topic, + payload => Payload, + pub_props => PubProps } - } -) -> - Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected), - Payload = format_msg(PayloadTks, Selected), - QoS = replace_simple_var(QoSTks, Selected, 0), - Retain = replace_simple_var(RetainTks, Selected, false), - PubProps = maps:get(pub_props, Selected, #{}), - ?TRACE("RULE", "republish_message_with_flags", #{topic => Topic, payload => Payload}), - safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload, PubProps). + ), + safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps). %%-------------------------------------------------------------------- %% internal functions @@ -192,6 +189,19 @@ preproc_vars(Data) when is_binary(Data) -> preproc_vars(Data) -> Data. +preproc_user_properties(<<"${pub_props.'User-Property'}">>) -> + %% keep the original + %% avoid processing this special variable because + %% we do not want to force users to select the value + %% the value will be taken from Env.pub_props directly + ?ORIGINAL_USER_PROPERTIES; +preproc_user_properties(<<"${", _/binary>> = V) -> + %% use a variable + emqx_plugin_libs_rule:preproc_tmpl(V); +preproc_user_properties(_) -> + %% invalid, discard + undefined. + replace_simple_var(Tokens, Data, Default) when is_list(Tokens) -> [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}), case Var of @@ -207,16 +217,14 @@ format_msg([], Selected) -> format_msg(Tokens, Selected) -> emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected). -format_pub_props(Props) -> - maps:fold(fun format_pub_prop/3, #{}, Props). - -format_pub_prop(K, V, Acc) when is_atom(K) -> - Acc#{K => V}; -format_pub_prop(K, V, Acc) when is_binary(K) -> - try - K1 = erlang:binary_to_existing_atom(K), - format_pub_prop(K1, V, Acc) - catch - _:_ -> - Acc#{K => V} - end. +format_pub_props(UserPropertiesTks, Selected, Env) -> + UserProperties = + case UserPropertiesTks of + ?ORIGINAL_USER_PROPERTIES -> + maps:get('User-Property', maps:get(pub_props, Env, #{}), #{}); + undefined -> + #{}; + _ -> + replace_simple_var(UserPropertiesTks, Selected, #{}) + end, + #{'User-Property' => UserProperties}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index eec41bde89..d299a6bb4b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -173,6 +173,15 @@ fields("republish_args") -> default => <<"${payload}">>, example => <<"${payload}">> } + )}, + {user_properties, + ?HOCON( + binary(), + #{ + desc => ?DESC("republish_args_user_properties"), + default => <<"${user_properties}">>, + example => <<"${pub_props.'User-Property'}">> + } )} ]. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 087a73c345..50bb55fe1f 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -65,7 +65,8 @@ groups() -> t_sqlselect_1, t_sqlselect_2, t_sqlselect_3, - t_sqlselect_message_publish_event, + t_sqlselect_message_publish_event_keep_original_props_1, + t_sqlselect_message_publish_event_keep_original_props_2, t_sqlparse_event_1, t_sqlparse_event_2, t_sqlparse_event_3, @@ -941,8 +942,7 @@ t_sqlselect_001(_Config) -> t_sqlselect_inject_props(_Config) -> SQL = "SELECT json_decode(payload) as p, payload, " - "map_put('discard', 'discard', pub_props) as pub_props, " - "map_put('inject_key', 'inject_val', pub_props.'User-Property') as pub_props.'User-Property' " + "map_put('inject_key', 'inject_val', user_properties) as user_properties " "FROM \"t3/#\", \"t1\" " "WHERE p.x = 1", Repub = republish_action(<<"t2">>), @@ -958,13 +958,12 @@ t_sqlselect_inject_props(_Config) -> {ok, _} = emqtt:connect(Client), {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), emqtt:publish(Client, <<"t1">>, #{}, <<"{\"x\":1}">>, [{qos, 0}]), - ct:sleep(100), receive {publish, #{topic := T, payload := Payload, properties := Props2}} -> ?assertEqual(Props, Props2), ?assertEqual(<<"t2">>, T), ?assertEqual(<<"{\"x\":1}">>, Payload) - after 1000 -> + after 2000 -> ct:fail(wait_for_t2) end, emqtt:stop(Client), @@ -972,10 +971,10 @@ t_sqlselect_inject_props(_Config) -> t_sqlselect_01(_Config) -> SQL = - "SELECT json_decode(payload) as p, payload, pub_props " + "SELECT json_decode(payload) as p, payload " "FROM \"t3/#\", \"t1\" " "WHERE p.x = 1", - Repub = republish_action(<<"t2">>), + Repub = republish_action(<<"t2">>, <<"${payload}">>, <<"${pub_props.'User-Property'}">>), {ok, TopicRule1} = emqx_rule_engine:create_rule( #{ sql => SQL, @@ -988,12 +987,11 @@ t_sqlselect_01(_Config) -> {ok, _} = emqtt:connect(Client), {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), emqtt:publish(Client, <<"t1">>, Props, <<"{\"x\":1}">>, [{qos, 0}]), - ct:sleep(100), receive {publish, #{topic := T, payload := Payload}} -> ?assertEqual(<<"t2">>, T), ?assertEqual(<<"{\"x\":1}">>, Payload) - after 1000 -> + after 2000 -> ct:fail(wait_for_t2) end, @@ -1001,7 +999,7 @@ t_sqlselect_01(_Config) -> receive {publish, #{topic := <<"t2">>, payload := _}} -> ct:fail(unexpected_t2) - after 1000 -> + after 2000 -> ok end, @@ -1011,8 +1009,8 @@ t_sqlselect_01(_Config) -> ?assertEqual(Props, Props2), ?assertEqual(<<"t2">>, T3), ?assertEqual(<<"{\"x\":1}">>, Payload3) - after 1000 -> - ct:fail(wait_for_t2) + after 2000 -> + ct:fail(wait_for_t3) end, emqtt:stop(Client), @@ -1080,13 +1078,12 @@ t_sqlselect_1(_Config) -> {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), {ok, _} = emqtt:connect(Client), {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), - ct:sleep(200), emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":2}">>, 0), receive {publish, #{topic := T, payload := Payload}} -> ?assertEqual(<<"t2">>, T), ?assertEqual(<<"{\"x\":1,\"y\":2}">>, Payload) - after 1000 -> + after 2000 -> ct:fail(wait_for_t2) end, @@ -1149,14 +1146,13 @@ t_sqlselect_3(_Config) -> {ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]), {ok, _} = emqtt:connect(Client), {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), - ct:sleep(200), {ok, Client1} = emqtt:start_link([{clientid, <<"c_emqx1">>}, {username, <<"emqx1">>}]), {ok, _} = emqtt:connect(Client1), receive {publish, #{topic := T, payload := Payload}} -> ?assertEqual(<<"t2">>, T), ?assertEqual(<<"clientid=c_emqx1">>, Payload) - after 1000 -> + after 2000 -> ct:fail(wait_for_t2) end, @@ -1171,11 +1167,51 @@ t_sqlselect_3(_Config) -> emqtt:stop(Client), delete_rule(TopicRule). -t_sqlselect_message_publish_event(_Config) -> +t_sqlselect_message_publish_event_keep_original_props_1(_Config) -> + %% republish the client.connected msg + Topic = <<"foo/bar/1">>, + SQL = << + "SELECT clientid " + "FROM \"$events/message_dropped\" " + >>, + + %"WHERE topic = \"", Topic/binary, "\"">>, + Repub = republish_action( + <<"t2">>, + <<"clientid=${clientid}">>, + <<"${pub_props.'User-Property'}">> + ), + {ok, TopicRule} = emqx_rule_engine:create_rule( + #{ + sql => SQL, + id => ?TMP_RULEID, + actions => [Repub] + } + ), + {ok, Client1} = emqtt:start_link([{clientid, <<"sub-01">>}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, _} = emqtt:subscribe(Client1, <<"t2">>, 1), + {ok, Client2} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(Client2), + Props = user_properties(#{<<"mykey">> => <<"111111">>}), + emqtt:publish(Client2, Topic, Props, <<"{\"x\":1}">>, [{qos, 1}]), + receive + {publish, #{topic := T, payload := Payload, properties := Props1}} -> + ?assertEqual(Props1, Props), + ?assertEqual(<<"t2">>, T), + ?assertEqual(<<"clientid=pub-02">>, Payload) + after 2000 -> + ct:fail(wait_for_t2) + end, + emqtt:stop(Client2), + emqtt:stop(Client1), + delete_rule(TopicRule). + +t_sqlselect_message_publish_event_keep_original_props_2(_Config) -> %% republish the client.connected msg Topic = <<"foo/bar/1">>, SQL = << - "SELECT clientid, pub_props " + "SELECT clientid, pub_props.'User-Property' as user_properties " "FROM \"$events/message_dropped\" " >>, @@ -1191,7 +1227,6 @@ t_sqlselect_message_publish_event(_Config) -> {ok, Client1} = emqtt:start_link([{clientid, <<"sub-01">>}, {proto_ver, v5}]), {ok, _} = emqtt:connect(Client1), {ok, _, _} = emqtt:subscribe(Client1, <<"t2">>, 1), - ct:sleep(200), {ok, Client2} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]), {ok, _} = emqtt:connect(Client2), Props = user_properties(#{<<"mykey">> => <<"222222222222">>}), @@ -1201,7 +1236,7 @@ t_sqlselect_message_publish_event(_Config) -> ?assertEqual(Props1, Props), ?assertEqual(<<"t2">>, T), ?assertEqual(<<"clientid=pub-02">>, Payload) - after 1000 -> + after 2000 -> ct:fail(wait_for_t2) end, emqtt:stop(Client2), @@ -2553,10 +2588,20 @@ t_get_basic_usage_info_1(_Config) -> republish_action(Topic) -> republish_action(Topic, <<"${payload}">>). + republish_action(Topic, Payload) -> + republish_action(Topic, Payload, <<"${user_properties}">>). + +republish_action(Topic, Payload, UserProperties) -> #{ function => republish, - args => #{payload => Payload, topic => Topic, qos => 0, retain => false} + args => #{ + payload => Payload, + topic => Topic, + qos => 0, + retain => false, + user_properties => UserProperties + } }. make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) ->