Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EMQX 7751 support message properties #9401

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 45 additions & 0 deletions apps/emqx_management/i18n/emqx_mgmt_api_publish_i18n.conf
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,49 @@ MQTT 消息发布的错误码,这些错误码也是 MQTT 规范中 PUBACK 消
zh: "失败的详细原因。"
}
}
message_properties {
desc {
en: "The Properties of the PUBLISH message."
zh: "PUBLISH 消息里的 Property 字段。"
}
}
msg_payload_format_indicator {
desc {
en: """0 (0x00) Byte Indicates that the Payload is unspecified bytes, which is equivalent to not sending a Payload Format Indicator.

1 (0x01) Byte Indicates that the Payload is UTF-8 Encoded Character Data. The UTF-8 data in the Payload MUST be well-formed UTF-8 as defined by the Unicode specification and restated in RFC 3629.
lafirest marked this conversation as resolved.
Show resolved Hide resolved
"""
zh: "载荷格式指示标识符,0 表示载荷是未指定格式的数据,相当于没有发送载荷格式指示;1 表示载荷是 UTF-8 编码的字符数据,载荷中的 UTF-8 数据必须是按照 Unicode 的规范和 RFC 3629 的标准要求进行编码的。"
}
}
msg_message_expiry_interval {
desc {
en: "Identifier of the Message Expiry Interval. If the Message Expiry Interval has passed and the Server has not managed to start onward delivery to a matching subscriber, then it MUST delete the copy of the message for that subscriber."
zh: "消息过期间隔标识符,以秒为单位。当消失已经过期时,如果服务端还没有开始向匹配的订阅者投递该消息,则服务端会删除该订阅者的消息副本。如果不设置,则消息永远不会过期"
}
}
msg_response_topic {
desc {
en: "Identifier of the Response Topic.The Response Topic MUST be a UTF-8 Encoded, It MUST NOT contain wildcard characters."
zh: "响应主题标识符, UTF-8 编码的字符串,用作响应消息的主题名。响应主题不能包含通配符,也不能包含多个主题,否则将造成协议错误。当存在响应主题时,消息将被视作请求报文。服务端在收到应用消息时必须将响应主题原封不动的发送给所有的订阅者。"
}
}
msg_correlation_data {
desc {
en: "Identifier of the Correlation Data. The Server MUST send the Correlation Data unaltered to all subscribers receiving the Application Message."
zh: "对比数据标识符,服务端在收到应用消息时必须原封不动的把对比数据发送给所有的订阅者。对比数据只对请求消息(Request Message)的发送端和响应消息(Response Message)的接收端有意义。"
}
}
msg_user_properties {
desc {
en: "The User-Property key-value pairs. Note: in case there are duplicated keys, only the last one will be used."
zh: "指定 MQTT 消息的 User Property 键值对。注意,如果出现重复的键,只有最后一个会保留。"
}
}
msg_content_type {
desc {
en: "The Content Type MUST be a UTF-8 Encoded String."
zh: "内容类型标识符,以 UTF-8 格式编码的字符串,用来描述应用消息的内容,服务端必须把收到的应用消息中的内容类型原封不动的发送给所有的订阅者。"
}
}
}
68 changes: 67 additions & 1 deletion apps/emqx_management/src/emqx_mgmt_api_publish.erl
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ fields(message) ->
required => true,
example => <<"hello emqx api">>
})},
{properties,
hoconsc:mk(hoconsc:ref(?MODULE, message_properties), #{
desc => ?DESC(message_properties),
required => false
})},
{retain,
hoconsc:mk(boolean(), #{
desc => ?DESC(retain),
Expand All @@ -130,6 +135,43 @@ fields(publish_message) ->
default => plain
})}
] ++ fields(message);
fields(message_properties) ->
[
{'payload_format_indicator',
hoconsc:mk(typerefl:range(0, 1), #{
desc => ?DESC(msg_payload_format_indicator),
required => false,
example => 0
})},
{'message_expiry_interval',
hoconsc:mk(integer(), #{
desc => ?DESC(msg_message_expiry_interval),
required => false
})},
{'response_topic',
hoconsc:mk(binary(), #{
desc => ?DESC(msg_response_topic),
required => false,
example => <<"some_other_topic">>
})},
{'correlation_data',
hoconsc:mk(binary(), #{
desc => ?DESC(msg_correlation_data),
required => false
})},
{'user_properties',
hoconsc:mk(map(), #{
desc => ?DESC(msg_user_properties),
required => false,
example => #{<<"foo">> => <<"bar">>}
})},
{'content_type',
hoconsc:mk(binary(), #{
desc => ?DESC(msg_content_type),
required => false,
example => <<"text/plain">>
})}
];
fields(publish_ok) ->
[
{id,
Expand Down Expand Up @@ -288,20 +330,44 @@ make_message(Map) ->
QoS = maps:get(<<"qos">>, Map, 0),
Topic = maps:get(<<"topic">>, Map),
Retain = maps:get(<<"retain">>, Map, false),
Headers =
case maps:get(<<"properties">>, Map, #{}) of
Properties when
is_map(Properties) andalso
map_size(Properties) > 0
->
#{properties => to_msg_properties(Properties)};
_ ->
#{}
end,
try
_ = emqx_topic:validate(name, Topic)
catch
error:_Reason ->
throw(invalid_topic_name)
end,
Message = emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{}),
Message = emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, Headers),
Size = emqx_message:estimate_size(Message),
(Size > size_limit()) andalso throw(packet_too_large),
{ok, Message};
{error, R} ->
{error, R}
end.

to_msg_properties(Properties) ->
maps:fold(
fun to_property/3,
#{},
Properties
).

to_property(<<"payload_format_indicator">>, V, M) -> M#{'Payload-Format-Indicator' => V};
to_property(<<"message_expiry_interval">>, V, M) -> M#{'Message-Expiry-Interval' => V};
to_property(<<"response_topic">>, V, M) -> M#{'Response-Topic' => V};
to_property(<<"correlation_data">>, V, M) -> M#{'Correlation-Data' => V};
to_property(<<"user_properties">>, V, M) -> M#{'User-Property' => maps:to_list(V)};
to_property(<<"content_type">>, V, M) -> M#{'Content-Type' => V}.

sstrigler marked this conversation as resolved.
Show resolved Hide resolved
%% get the global packet size limit since HTTP API does not belong to any zone.
size_limit() ->
try
Expand Down
99 changes: 65 additions & 34 deletions apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@

-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").

-define(CLIENTID, <<"api_clientid">>).
-define(USERNAME, <<"api_username">>).
-include_lib("common_test/include/ct.hrl").

-define(TOPIC1, <<"api_topic1">>).
-define(TOPIC2, <<"api_topic2">>).
Expand All @@ -44,25 +42,56 @@ end_per_testcase(Case, Config) ->
?MODULE:Case({'end', Config}).

t_publish_api({init, Config}) ->
Config;
t_publish_api({'end', _Config}) ->
ok;
t_publish_api(_) ->
{ok, Client} = emqtt:start_link(#{
username => <<"api_username">>, clientid => <<"api_clientid">>
}),
{ok, Client} = emqtt:start_link(
#{
username => <<"api_username">>,
clientid => <<"api_clientid">>,
proto_ver => v5
}
),
{ok, _} = emqtt:connect(Client),
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
[{client, Client} | Config];
t_publish_api({'end', Config}) ->
Client = ?config(client, Config),
emqtt:stop(Client),
ok;
t_publish_api(_) ->
Payload = <<"hello">>,
Path = emqx_mgmt_api_test_util:api_path(["publish"]),
Auth = emqx_mgmt_api_test_util:auth_header_(),
Body = #{topic => ?TOPIC1, payload => Payload},
UserProperties = #{<<"foo">> => <<"bar">>},
Properties =
#{
<<"payload_format_indicator">> => 0,
<<"message_expiry_interval">> => 1000,
<<"response_topic">> => ?TOPIC2,
<<"correlation_data">> => <<"some_correlation_id">>,
<<"user_properties">> => UserProperties,
<<"content_type">> => <<"application/json">>
},
zmstone marked this conversation as resolved.
Show resolved Hide resolved
Body = #{topic => ?TOPIC1, payload => Payload, properties => Properties},
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
ResponseMap = decode_json(Response),
?assertEqual([<<"id">>], lists:sort(maps:keys(ResponseMap))),
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
emqtt:stop(Client).
{ok, Message} = receive_assert(?TOPIC1, 0, Payload),
RecvProperties = maps:get(properties, Message),
UserPropertiesList = maps:to_list(UserProperties),
#{
'Payload-Format-Indicator' := 0,
'Message-Expiry-Interval' := RecvMessageExpiry,
'Correlation-Data' := <<"some_correlation_id">>,
'User-Property' := UserPropertiesList,
'Content-Type' := <<"application/json">>
} = RecvProperties,
?assert(RecvMessageExpiry =< 1000),
%% note: without props this time
Body2 = #{topic => ?TOPIC2, payload => Payload},
{ok, Response2} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body2),
ResponseMap2 = decode_json(Response2),
?assertEqual([<<"id">>], lists:sort(maps:keys(ResponseMap2))),
?assertEqual(ok, element(1, receive_assert(?TOPIC2, 0, Payload))).

t_publish_no_subscriber({init, Config}) ->
Config;
Expand Down Expand Up @@ -163,16 +192,18 @@ t_publish_bad_topic_bulk(_Config) ->
).

t_publish_bulk_api({init, Config}) ->
Config;
t_publish_bulk_api({'end', _Config}) ->
ok;
t_publish_bulk_api(_) ->
{ok, Client} = emqtt:start_link(#{
username => <<"api_username">>, clientid => <<"api_clientid">>
}),
{ok, _} = emqtt:connect(Client),
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
[{client, Client} | Config];
t_publish_bulk_api({'end', Config}) ->
Client = ?config(client, Config),
emqtt:stop(Client),
ok;
t_publish_bulk_api(_) ->
Payload = <<"hello">>,
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
Auth = emqx_mgmt_api_test_util:auth_header_(),
Expand All @@ -199,9 +230,8 @@ t_publish_bulk_api(_) ->
end,
ResponseList
),
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)),
emqtt:stop(Client).
?assertEqual(ok, element(1, receive_assert(?TOPIC1, 0, Payload))),
?assertEqual(ok, element(1, receive_assert(?TOPIC2, 0, Payload))).

t_publish_no_subscriber_bulk({init, Config}) ->
Config;
Expand Down Expand Up @@ -232,8 +262,8 @@ t_publish_no_subscriber_bulk(_) ->
],
ResponseList
),
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)),
?assertEqual(ok, element(1, receive_assert(?TOPIC1, 0, Payload))),
?assertEqual(ok, element(1, receive_assert(?TOPIC2, 0, Payload))),
emqtt:stop(Client).

t_publish_bulk_dispatch_one_message_invalid_topic({init, Config}) ->
Expand Down Expand Up @@ -267,17 +297,19 @@ t_publish_bulk_dispatch_one_message_invalid_topic(Config) when is_list(Config) -
t_publish_bulk_dispatch_failure({init, Config}) ->
meck:new(emqx, [no_link, passthrough, no_history]),
meck:expect(emqx, is_running, fun() -> false end),
Config;
t_publish_bulk_dispatch_failure({'end', _Config}) ->
meck:unload(emqx),
ok;
t_publish_bulk_dispatch_failure(Config) when is_list(Config) ->
{ok, Client} = emqtt:start_link(#{
username => <<"api_username">>, clientid => <<"api_clientid">>
}),
{ok, _} = emqtt:connect(Client),
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
[{client, Client} | Config];
t_publish_bulk_dispatch_failure({'end', Config}) ->
meck:unload(emqx),
Client = ?config(client, Config),
emqtt:stop(Client),
ok;
t_publish_bulk_dispatch_failure(Config) when is_list(Config) ->
Payload = <<"hello">>,
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
Auth = emqx_mgmt_api_test_util:auth_header_(),
Expand All @@ -303,21 +335,20 @@ t_publish_bulk_dispatch_failure(Config) when is_list(Config) ->
#{<<"reason_code">> := ?RC_NO_MATCHING_SUBSCRIBERS}
],
decode_json(ResponseBody)
),
emqtt:stop(Client).
).

receive_assert(Topic, Qos, Payload) ->
receive
{publish, Message} ->
ReceiveTopic = maps:get(topic, Message),
ReceiveQos = maps:get(qos, Message),
ReceivePayload = maps:get(payload, Message),
?assertEqual(ReceiveTopic, Topic),
?assertEqual(ReceiveQos, Qos),
?assertEqual(ReceivePayload, Payload),
ok
?assertEqual(Topic, ReceiveTopic),
?assertEqual(Qos, ReceiveQos),
?assertEqual(Payload, ReceivePayload),
{ok, Message}
after 5000 ->
timeout
{error, timeout}
end.

decode_json(In) ->
Expand Down
2 changes: 2 additions & 0 deletions changes/v5.0.11-en.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

- Keep MQTT v5 User-Property pairs from bridge ingested MQTT messsages to bridge target [#9398](https://github.com/emqx/emqx/pull/9398).

- Support message properties in `/publish` API [#9401](https://github.com/emqx/emqx/pull/9401).

## Bug fixes

- Fix `ssl.existingName` option of helm chart not working [#9307](https://github.com/emqx/emqx/issues/9307).
Expand Down
2 changes: 2 additions & 0 deletions changes/v5.0.11-zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

- 为桥接收到的 MQTT v5 消息再转发时保留 User-Property 列表 [#9398](https://github.com/emqx/emqx/pull/9398)。

- 支持在 /publish API 中添加消息属性 [#9401](https://github.com/emqx/emqx/pull/9401)。

## 修复

- 修复 helm chart 的 `ssl.existingName` 选项不起作用 [#9307](https://github.com/emqx/emqx/issues/9307)。
Expand Down