New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add client mqueue/inflight messages API #12561
feat: add client mqueue/inflight messages API #12561
Conversation
6a46eef
to
a870dc0
Compare
from_clientid => emqx_utils_conv:bin(From), | ||
from_username => maps:get(username, Headers, <<>>) | ||
}, | ||
case erlang:byte_size(Payload) =< ?MAX_MSG_PAYLOAD_SIZE of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NOTE: message formatter/schema, payload size limit is copied from emqx_retainer_api.
Perhaps some extra info should be added when payload is omitted due to exceeding the limit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add an option like ?payload=none | base64 | plain
to the API.
Return 400 if payload is not JSON serializable and plain
is requested.
About size limit, maybe alongside the page
limit, there can be a max_payload_bytes
limit and each response try to accumulate not greater than this limit however ensure at least one message is returned even if the size is above limit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -268,6 +268,9 @@ info(inflight_cnt, #session{inflight = Inflight}) -> | |||
emqx_inflight:size(Inflight); | |||
info(inflight_max, #session{inflight = Inflight}) -> | |||
emqx_inflight:max_size(Inflight); | |||
info({inflight_msgs, PagerParams}, #session{inflight = Inflight}) -> | |||
{InflightList, Meta} = emqx_inflight:query(Inflight, PagerParams), | |||
{[I#inflight_data.message || {_, I} <- InflightList], Meta}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we enrich message with some inflight details (wait_ack | wait_comp
, timestamp
)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe no need for now
1a76e0f
to
60000af
Compare
qos => Qos, | ||
topic => Topic, | ||
publish_at => | ||
emqx_utils_calendar:epoch_to_rfc3339(Timestamp), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe should not format it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I followed retainer messages formatting...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed time formatting
from_clientid => emqx_utils_conv:bin(From), | ||
from_username => maps:get(username, Headers, <<>>) | ||
}, | ||
case erlang:byte_size(Payload) =< ?MAX_MSG_PAYLOAD_SIZE of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add an option like ?payload=none | base64 | plain
to the API.
Return 400 if payload is not JSON serializable and plain
is requested.
About size limit, maybe alongside the page
limit, there can be a max_payload_bytes
limit and each response try to accumulate not greater than this limit however ensure at least one message is returned even if the size is above limit.
60000af
to
e0d13ed
Compare
<<"meta">> => #{ | ||
<<"count">> => 1, | ||
<<"continuation">> => <<"AAYS53qRa0n07AAABFIACg==">>, | ||
<<"limit">> => 50, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is maybe no need to return limit
in response ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed limit
from meta
changes/ce/feat-12561.en.md
Outdated
@@ -0,0 +1,3 @@ | |||
Implement HTTP API to get the list of client's inflight and mqueue messages: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implement HTTP API to get the list of client's inflight and mqueue messages: | |
Implement HTTP APIs to get the list of client's inflight and mqueue messages: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
changes/ce/feat-12561.en.md
Outdated
@@ -0,0 +1,3 @@ | |||
Implement HTTP API to get the list of client's inflight and mqueue messages: | |||
- GET /clients/{clientid}/mqueue_messages?page=1&limit=100 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
page is now continuation ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed, thanks
changes/ce/feat-12561.en.md
Outdated
@@ -0,0 +1,3 @@ | |||
Implement HTTP API to get the list of client's inflight and mqueue messages: | |||
- GET /clients/{clientid}/mqueue_messages?page=1&limit=100 | |||
- GET /clients/{clientid}/inflight_messages?page=1&limit=100 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add:
- he ordering of the messages in each page?
- how to start the first page
- how to continue the next pages
- what happens if continuation is invalid
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
a186ee3
to
5606257
Compare
Resp = #{meta => Meta1, data => format_msgs(Msgs, PayloadFmt, MaxBytes)}, | ||
%% Make sure minirest won't set another content-type for self-encoded JSON response body | ||
Headers = #{<<"content-type">> => <<"application/json">>}, | ||
case emqx_utils_json:safe_encode(Resp) of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: this can actually serialize pretty much every payload, as it is not parsed and validated.
For example, if message.payload is invalid JSON string (missing enclosing }
), it won't be detected:
emqx_utils_json:safe_encode(#{message => <<"{\"foo\": \"bar\"">>}).
{ok,<<"{\"message\":\"{\\\"foo\\\": \\\"bar\\\"\"}">>}
```
5606257
to
a2e7616
Compare
Fixes EMQX-11861
Release version: v/e5.6.0
NOTE: session_mem only implementation, persistent_session is planned to be addressed in a separate PR.
Summary
PR Checklist
Please convert it to a draft if any of the following conditions are not met. Reviewers may skip over until all the items are checked:
changes/(ce|ee)/(feat|perf|fix|breaking)-<PR-id>.en.md
filesChecklist for CI (.github/workflows) changes
changes/
dir for user-facing artifacts update