Skip to content

Commit

Permalink
feat: do not drop MQTTv5 properties in rule/bridge
Browse files Browse the repository at this point in the history
  • Loading branch information
kraftwerk28 authored and zmstone committed Nov 21, 2022
1 parent 966e6dd commit 9e45ad1
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 17 deletions.
17 changes: 16 additions & 1 deletion apps/emqx/src/emqx_misc.erl
Expand Up @@ -54,7 +54,8 @@
pmap/3,
readable_error_msg/1,
safe_to_existing_atom/1,
safe_to_existing_atom/2
safe_to_existing_atom/2,
pub_props_to_packet/1
]).

-export([
Expand Down Expand Up @@ -568,3 +569,17 @@ ipv6_probe_test() ->
end.

-endif.

pub_props_to_packet(Properties) ->
F = fun
('User-Property', M) ->
case is_map(M) andalso map_size(M) > 0 of
true -> {true, maps:to_list(M)};
false -> false
end;
('User-Property-Pairs', _) ->
false;
(_, _) ->
true
end,
maps:filtermap(F, Properties).
2 changes: 1 addition & 1 deletion apps/emqx_connector/src/emqx_connector.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_connector, [
{description, "An OTP application"},
{vsn, "0.1.7"},
{vsn, "0.1.8"},
{registered, []},
{mod, {emqx_connector_app, []}},
{applications, [
Expand Down
10 changes: 6 additions & 4 deletions apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl
Expand Up @@ -77,17 +77,20 @@ to_remote_msg(MapMsg, #{
Payload = process_payload(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg),
Retain = replace_simple_var(RetainToken, MapMsg),
PubProps = maps:get(pub_props, MapMsg, #{}),
#mqtt_msg{
qos = QoS,
retain = Retain,
topic = topic(Mountpoint, Topic),
props = #{},
props = emqx_misc:pub_props_to_packet(PubProps),
payload = Payload
};
to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
Msg#message{topic = topic(Mountpoint, Topic)}.

%% published from remote node over a MQTT connection
to_broker_msg(Msg, Vars, undefined) ->
to_broker_msg(Msg, Vars, #{});
to_broker_msg(
#{dup := Dup} = MapMsg,
#{
Expand All @@ -103,8 +106,9 @@ to_broker_msg(
Payload = process_payload(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg),
Retain = replace_simple_var(RetainToken, MapMsg),
PubProps = maps:get(pub_props, MapMsg, #{}),
set_headers(
Props,
Props#{properties => emqx_misc:pub_props_to_packet(PubProps)},
emqx_message:set_flags(
#{dup => Dup, retain => Retain},
emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload)
Expand Down Expand Up @@ -151,8 +155,6 @@ estimate_size(#{topic := Topic, payload := Payload}) ->
estimate_size(Term) ->
erlang:external_size(Term).

set_headers(undefined, Msg) ->
Msg;
set_headers(Val, Msg) ->
emqx_message:set_headers(Val, Msg).
topic(undefined, Topic) -> Topic;
Expand Down
27 changes: 23 additions & 4 deletions apps/emqx_rule_engine/src/emqx_rule_actions.erl
Expand Up @@ -110,8 +110,9 @@ republish(
Payload = format_msg(PayloadTks, Selected),
QoS = replace_simple_var(QoSTks, Selected, 0),
Retain = replace_simple_var(RetainTks, Selected, false),
PubProps = format_pub_props(maps:get(<<"pub_props">>, Selected, #{})),
?TRACE("RULE", "republish_message", #{topic => Topic, payload => Payload}),
safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload);
safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload, PubProps);
%% in case this is a "$events/" event
republish(
Selected,
Expand All @@ -129,8 +130,9 @@ republish(
Payload = format_msg(PayloadTks, Selected),
QoS = replace_simple_var(QoSTks, Selected, 0),
Retain = replace_simple_var(RetainTks, Selected, false),
PubProps = maps:get(pub_props, Selected, #{}),
?TRACE("RULE", "republish_message_with_flags", #{topic => Topic, payload => Payload}),
safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload).
safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload, PubProps).

%%--------------------------------------------------------------------
%% internal functions
Expand Down Expand Up @@ -168,13 +170,16 @@ pre_process_args(Mod, Func, Args) ->
false -> Args
end.

safe_publish(RuleId, Topic, QoS, Flags, Payload) ->
safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) ->
Msg = #message{
id = emqx_guid:gen(),
qos = QoS,
from = RuleId,
flags = Flags,
headers = #{republish_by => RuleId},
headers = #{
republish_by => RuleId,
properties => emqx_misc:pub_props_to_packet(PubProps)
},
topic = Topic,
payload = Payload,
timestamp = erlang:system_time(millisecond)
Expand All @@ -201,3 +206,17 @@ format_msg([], Selected) ->
emqx_json:encode(Selected);
format_msg(Tokens, Selected) ->
emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected).

format_pub_props(Props) ->
maps:fold(fun format_pub_prop/3, #{}, Props).

format_pub_prop(K, V, Acc) when is_atom(K) ->
Acc#{K => V};
format_pub_prop(K, V, Acc) when is_binary(K) ->
try
K1 = erlang:binary_to_existing_atom(K),
format_pub_prop(K1, V, Acc)
catch
_:_ ->
Acc#{K => V}
end.
2 changes: 1 addition & 1 deletion apps/emqx_rule_engine/src/emqx_rule_engine.app.src
Expand Up @@ -2,7 +2,7 @@
{application, emqx_rule_engine, [
{description, "EMQX Rule Engine"},
% strict semver, bump manually!
{vsn, "5.0.3"},
{vsn, "5.0.4"},
{modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
{applications, [kernel, stdlib, rulesql, getopt]},
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_rule_engine/src/emqx_rule_events.erl
Expand Up @@ -1060,7 +1060,7 @@ printable_maps(Headers) ->
(K, V, AccIn) ->
AccIn#{K => V}
end,
#{},
#{'User-Property' => #{}},
Headers
).

Expand Down
86 changes: 81 additions & 5 deletions apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
Expand Up @@ -59,11 +59,13 @@ groups() ->
t_sqlselect_0,
t_sqlselect_00,
t_sqlselect_001,
t_sqlselect_inject_props,
t_sqlselect_01,
t_sqlselect_02,
t_sqlselect_1,
t_sqlselect_2,
t_sqlselect_3,
t_sqlselect_message_publish_event,
t_sqlparse_event_1,
t_sqlparse_event_2,
t_sqlparse_event_3,
Expand Down Expand Up @@ -936,6 +938,38 @@ t_sqlselect_001(_Config) ->
)
).

t_sqlselect_inject_props(_Config) ->
SQL =
"SELECT json_decode(payload) as p, payload, "
"map_put('discard', 'discard', pub_props) as pub_props, "
"map_put('inject_key', 'inject_val', pub_props.'User-Property') as pub_props.'User-Property' "
"FROM \"t3/#\", \"t1\" "
"WHERE p.x = 1",
Repub = republish_action(<<"t2">>),
{ok, TopicRule1} = emqx_rule_engine:create_rule(
#{
sql => SQL,
id => ?TMP_RULEID,
actions => [Repub]
}
),
Props = user_properties(#{<<"inject_key">> => <<"inject_val">>}),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
emqtt:publish(Client, <<"t1">>, #{}, <<"{\"x\":1}">>, [{qos, 0}]),
ct:sleep(100),
receive
{publish, #{topic := T, payload := Payload, properties := Props2}} ->
?assertEqual(Props, Props2),
?assertEqual(<<"t2">>, T),
?assertEqual(<<"{\"x\":1}">>, Payload)
after 1000 ->
ct:fail(wait_for_t2)
end,
emqtt:stop(Client),
delete_rule(TopicRule1).

t_sqlselect_01(_Config) ->
SQL =
"SELECT json_decode(payload) as p, payload "
Expand All @@ -949,10 +983,11 @@ t_sqlselect_01(_Config) ->
actions => [Repub]
}
),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
Props = user_properties(#{<<"mykey">> => <<"myval">>}),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0),
emqtt:publish(Client, <<"t1">>, Props, <<"{\"x\":1}">>, [{qos, 0}]),
ct:sleep(100),
receive
{publish, #{topic := T, payload := Payload}} ->
Expand All @@ -962,17 +997,18 @@ t_sqlselect_01(_Config) ->
ct:fail(wait_for_t2)
end,

emqtt:publish(Client, <<"t1">>, <<"{\"x\":2}">>, 0),
emqtt:publish(Client, <<"t1">>, Props, <<"{\"x\":2}">>, [{qos, 0}]),
receive
{publish, #{topic := <<"t2">>, payload := _}} ->
ct:fail(unexpected_t2)
after 1000 ->
ok
end,

emqtt:publish(Client, <<"t3/a">>, <<"{\"x\":1}">>, 0),
emqtt:publish(Client, <<"t3/a">>, Props, <<"{\"x\":1}">>, [{qos, 0}]),
receive
{publish, #{topic := T3, payload := Payload3}} ->
{publish, #{topic := T3, payload := Payload3, properties := Props2}} ->
?assertEqual(Props, Props2),
?assertEqual(<<"t2">>, T3),
?assertEqual(<<"{\"x\":1}">>, Payload3)
after 1000 ->
Expand Down Expand Up @@ -1135,6 +1171,43 @@ t_sqlselect_3(_Config) ->
emqtt:stop(Client),
delete_rule(TopicRule).

t_sqlselect_message_publish_event(_Config) ->
%% republish the client.connected msg
Topic = <<"foo/bar/1">>,
SQL = <<
"SELECT clientid, pub_props "
"FROM \"$events/message_dropped\" "
>>,

%"WHERE topic = \"", Topic/binary, "\"">>,
Repub = republish_action(<<"t2">>, <<"clientid=${clientid}">>),
{ok, TopicRule} = emqx_rule_engine:create_rule(
#{
sql => SQL,
id => ?TMP_RULEID,
actions => [Repub]
}
),
{ok, Client1} = emqtt:start_link([{clientid, <<"sub-01">>}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(Client1),
{ok, _, _} = emqtt:subscribe(Client1, <<"t2">>, 1),
ct:sleep(200),
{ok, Client2} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(Client2),
Props = user_properties(#{<<"mykey">> => <<"222222222222">>}),
emqtt:publish(Client2, Topic, Props, <<"{\"x\":1}">>, [{qos, 1}]),
receive
{publish, #{topic := T, payload := Payload, properties := Props1}} ->
?assertEqual(Props1, Props),
?assertEqual(<<"t2">>, T),
?assertEqual(<<"clientid=pub-02">>, Payload)
after 1000 ->
ct:fail(wait_for_t2)
end,
emqtt:stop(Client2),
emqtt:stop(Client1),
delete_rule(TopicRule).

t_sqlparse_event_1(_Config) ->
Sql =
"select topic as tp "
Expand Down Expand Up @@ -2869,6 +2942,9 @@ verify_ipaddr(IPAddrS) ->
init_events_counters() ->
ets:new(events_record_tab, [named_table, bag, public]).

user_properties(PairsMap) ->
#{'User-Property' => maps:to_list(PairsMap)}.

%%------------------------------------------------------------------------------
%% Start Apps
%%------------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions changes/v5.0.11-en.md
Expand Up @@ -19,6 +19,7 @@

- Improve node name generation rules to avoid potential atom table overflow risk [#9387](https://github.com/emqx/emqx/pull/9387).

- Keep MQTT v5 User-Properties from bridge ingested MQTT messsages to bridge target [#9240](https://github.com/emqx/emqx/pull/9240).

## Bug fixes

Expand Down
2 changes: 2 additions & 0 deletions changes/v5.0.11-zh.md
Expand Up @@ -17,6 +17,8 @@

- 改进了节点名称生成规则,以避免潜在的原子表溢出风险 [#9387](https://github.com/emqx/emqx/pull/9387)

- 为桥接收到的 MQTT v5 消息再转发时保留 User-Properties [#9240](https://github.com/emqx/emqx/pull/9240)

## 修复

- 修复创建追踪日志时偶尔会报`end_at time has already passed`错误,导致创建失败。[#9303](https://github.com/emqx/emqx/pull/9303)
Expand Down

0 comments on commit 9e45ad1

Please sign in to comment.