diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 192335a251..41dc5c65df 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1198,6 +1198,10 @@ handle_call( ChanInfo1 = info(NChannel), emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}), reply(ok, reset_timer(keepalive, NChannel)); +handle_call({Type, _Meta} = MsgsReq, Channel = #channel{session = Session}) when + Type =:= mqueue_msgs; Type =:= inflight_msgs +-> + {reply, emqx_session:info(MsgsReq, Session), Channel}; handle_call(Req, Channel) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), reply(ignored, Channel). diff --git a/apps/emqx/src/emqx_inflight.erl b/apps/emqx/src/emqx_inflight.erl index c342a846f1..c97e4f5593 100644 --- a/apps/emqx/src/emqx_inflight.erl +++ b/apps/emqx/src/emqx_inflight.erl @@ -36,7 +36,8 @@ max_size/1, is_full/1, is_empty/1, - window/1 + window/1, + query/2 ]). -export_type([inflight/0]). @@ -138,3 +139,41 @@ size(?INFLIGHT(Tree)) -> -spec max_size(inflight()) -> non_neg_integer(). max_size(?INFLIGHT(MaxSize, _Tree)) -> MaxSize. + +-spec query(inflight(), #{page := P, limit := L}) -> + {[{key(), term()}], #{page := P, limit := L, count := C, hasnext := boolean()}} +when + P :: pos_integer(), + L :: non_neg_integer(), + C :: non_neg_integer(). +query(?INFLIGHT(Tree), #{page := Page, limit := Limit} = Pager) -> + Start = Page * Limit - Limit + 1, + Count = gb_trees:size(Tree), + case Count < Start orelse Limit =:= 0 of + true -> + {[], Pager#{hasnext => false, count => Count}}; + false -> + HasNext = Start + Limit - 1 < Count, + TreeIter = gb_trees:iterator(Tree), + {sublist(TreeIter, Start, Limit), Pager#{hasnext => HasNext, count => Count}} + end. + +sublist(It, 1, L) when is_integer(L), L >= 0 -> + sublist(It, L); +sublist(It, S, L) when is_integer(S), S >= 2 -> + case gb_trees:next(It) of + none -> + []; + {_Key, _Val, ItNext} -> + sublist(ItNext, S - 1, L) + end. + +sublist(It, L) when L > 0 -> + case gb_trees:next(It) of + none -> + []; + {Key, Val, ItNext} -> + [{Key, Val} | sublist(ItNext, L - 1)] + end; +sublist(_, 0) -> + []. diff --git a/apps/emqx/src/emqx_mqueue.erl b/apps/emqx/src/emqx_mqueue.erl index d085a196b4..81735a2208 100644 --- a/apps/emqx/src/emqx_mqueue.erl +++ b/apps/emqx/src/emqx_mqueue.erl @@ -68,7 +68,8 @@ stats/1, dropped/1, to_list/1, - filter/2 + filter/2, + query/2 ]). -define(NO_PRIORITY_TABLE, disabled). @@ -171,6 +172,43 @@ filter(Pred, #mqueue{q = Q, len = Len, dropped = Droppend} = MQ) -> MQ#mqueue{q = Q2, len = Len2, dropped = Droppend + Diff} end. +-spec query(mqueue(), #{page := P, limit := L}) -> + {[message()], #{page := P, limit := L, count := C, hasnext := boolean()}} +when + P :: pos_integer(), + L :: non_neg_integer(), + C :: non_neg_integer(). +query(MQ, #{page := Page, limit := Limit} = Pager) -> + Start = Page * Limit - Limit + 1, + Count = len(MQ), + case Count < Start orelse Limit =:= 0 of + true -> + {[], Pager#{hasnext => false, count => Count}}; + false -> + HasNext = Start + Limit - 1 < Count, + {sublist(MQ, Start, Limit), Pager#{hasnext => HasNext, count => Count}} + end. + +sublist(MQ, 1, L) when is_integer(L), L >= 0 -> + sublist(MQ, L); +sublist(MQ, S, L) when is_integer(S), S >= 2 -> + case out(MQ) of + {empty, _MQ} -> + []; + {{value, _Msg}, Q1} -> + sublist(Q1, S - 1, L) + end. + +sublist(MQ, L) when L > 0 -> + case out(MQ) of + {empty, _MQ} -> + []; + {{value, Msg}, Q1} -> + [Msg | sublist(Q1, L - 1)] + end; +sublist(_, 0) -> + []. + to_list(MQ, Acc) -> case out(MQ) of {empty, _MQ} -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index a84ed4d83e..de9af53882 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -527,7 +527,7 @@ info(Session) -> -spec info ([atom()], t()) -> [{atom(), _Value}]; - (atom(), t()) -> _Value. + (atom() | {atom(), _Meta}, t()) -> _Value. info(Keys, Session) when is_list(Keys) -> [{Key, info(Key, Session)} || Key <- Keys]; info(impl, Session) -> diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index e5e60583fc..dbb440f412 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -268,6 +268,9 @@ info(inflight_cnt, #session{inflight = Inflight}) -> emqx_inflight:size(Inflight); info(inflight_max, #session{inflight = Inflight}) -> emqx_inflight:max_size(Inflight); +info({inflight_msgs, PagerParams}, #session{inflight = Inflight}) -> + {InflightList, Meta} = emqx_inflight:query(Inflight, PagerParams), + {[I#inflight_data.message || {_, I} <- InflightList], Meta}; info(retry_interval, #session{retry_interval = Interval}) -> Interval; info(mqueue, #session{mqueue = MQueue}) -> @@ -278,6 +281,8 @@ info(mqueue_max, #session{mqueue = MQueue}) -> emqx_mqueue:max_len(MQueue); info(mqueue_dropped, #session{mqueue = MQueue}) -> emqx_mqueue:dropped(MQueue); +info({mqueue_msgs, PagerParams}, #session{mqueue = MQueue}) -> + emqx_mqueue:query(MQueue, PagerParams); info(next_pkt_id, #session{next_pkt_id = PacketId}) -> PacketId; info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) -> diff --git a/apps/emqx/test/emqx_inflight_SUITE.erl b/apps/emqx/test/emqx_inflight_SUITE.erl index c3b7ca6fc5..fa0bfc57f0 100644 --- a/apps/emqx/test/emqx_inflight_SUITE.erl +++ b/apps/emqx/test/emqx_inflight_SUITE.erl @@ -116,5 +116,67 @@ t_window(_) -> ), ?assertEqual([a, b], emqx_inflight:window(Inflight)). -% t_to_list(_) -> -% error('TODO'). +t_to_list(_) -> + Inflight = lists:foldl( + fun(Seq, InflightAcc) -> + emqx_inflight:insert(Seq, integer_to_binary(Seq), InflightAcc) + end, + emqx_inflight:new(100), + [1, 6, 2, 3, 10, 7, 9, 8, 4, 5] + ), + ExpList = [{Seq, integer_to_binary(Seq)} || Seq <- lists:seq(1, 10)], + ?assertEqual(ExpList, emqx_inflight:to_list(Inflight)). + +t_query(_) -> + Inflight = lists:foldl( + fun(Seq, QAcc) -> + emqx_inflight:insert(Seq, integer_to_binary(Seq), QAcc) + end, + emqx_inflight:new(500), + lists:reverse(lists:seq(1, 114)) + ), + + lists:foreach( + fun(PageSeq) -> + Limit = 10, + {Page, Meta} = emqx_inflight:query(Inflight, #{page => PageSeq, limit => Limit}), + ?assertEqual(10, length(Page)), + ExpFirst = PageSeq * Limit - Limit + 1, + ExpLast = PageSeq * Limit, + ?assertEqual({ExpFirst, integer_to_binary(ExpFirst)}, lists:nth(1, Page)), + ?assertEqual({ExpLast, integer_to_binary(ExpLast)}, lists:nth(10, Page)), + ?assertMatch( + #{page := PageSeq, limit := Limit, count := 114, hasnext := true}, + Meta + ) + end, + lists:seq(1, 11) + ), + {LastPartialPage, LastMeta} = emqx_inflight:query(Inflight, #{page => 12, limit => 10}), + ?assertEqual(4, length(LastPartialPage)), + ?assertEqual({111, <<"111">>}, lists:nth(1, LastPartialPage)), + ?assertEqual({114, <<"114">>}, lists:nth(4, LastPartialPage)), + ?assertMatch(#{page := 12, limit := 10, count := 114, hasnext := false}, LastMeta), + + {LenExceedPage, LenExceedMeta} = emqx_inflight:query(Inflight, #{page => 13, limit => 10}), + ?assertEqual([], LenExceedPage), + ?assertMatch(#{hasnext := false}, LenExceedMeta), + + {LargePage, LargeMeta} = emqx_inflight:query(Inflight, #{page => 1, limit => 1000}), + ?assertEqual(114, length(LargePage)), + ?assertEqual({1, <<"1">>}, hd(LargePage)), + ?assertEqual({114, <<"114">>}, lists:last(LargePage)), + ?assertMatch(#{hasnext := false}, LargeMeta), + + {FullPage, FullMeta} = emqx_inflight:query(Inflight, #{page => 1, limit => 114}), + ?assertEqual(114, length(FullPage)), + ?assertEqual({1, <<"1">>}, hd(FullPage)), + ?assertEqual({114, <<"114">>}, lists:last(FullPage)), + ?assertMatch(#{hasnext := false}, FullMeta), + + {EmptyPage, EmptyMeta} = emqx_inflight:query(Inflight, #{page => 1, limit => 0}), + ?assertEqual([], EmptyPage), + ?assertMatch(#{hasnext := false, count := 114, limit := 0, page := 1}, EmptyMeta), + {EmptyExceedPage, EmptyExceedMeta} = emqx_inflight:query(Inflight, #{page => 200, limit => 0}), + ?assertEqual([], EmptyExceedPage), + ?assertMatch(#{hasnext := false, count := 114, limit := 0, page := 200}, EmptyExceedMeta). diff --git a/apps/emqx/test/emqx_mqueue_SUITE.erl b/apps/emqx/test/emqx_mqueue_SUITE.erl index 51db4b98a8..b18eb47dd8 100644 --- a/apps/emqx/test/emqx_mqueue_SUITE.erl +++ b/apps/emqx/test/emqx_mqueue_SUITE.erl @@ -282,6 +282,62 @@ t_dropped(_) -> {Msg, Q2} = ?Q:in(Msg, Q1), ?assertEqual(1, ?Q:dropped(Q2)). +t_query(_) -> + Q = lists:foldl( + fun(Seq, QAcc) -> + Msg = emqx_message:make(<<"t">>, integer_to_binary(Seq)), + {_, QAcc1} = ?Q:in(Msg, QAcc), + QAcc1 + end, + ?Q:init(#{max_len => 500, store_qos0 => true}), + lists:seq(1, 114) + ), + + lists:foreach( + fun(PageSeq) -> + Limit = 10, + {Page, Meta} = ?Q:query(Q, #{page => PageSeq, limit => Limit}), + ?assertEqual(10, length(Page)), + ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1), + ExpLastPayload = integer_to_binary(PageSeq * Limit), + ?assertEqual(ExpFirstPayload, emqx_message:payload(lists:nth(1, Page))), + ?assertEqual(ExpLastPayload, emqx_message:payload(lists:nth(10, Page))), + ?assertMatch( + #{page := PageSeq, limit := Limit, count := 114, hasnext := true}, + Meta + ) + end, + lists:seq(1, 11) + ), + {LastPartialPage, LastMeta} = ?Q:query(Q, #{page => 12, limit => 10}), + ?assertEqual(4, length(LastPartialPage)), + ?assertEqual(<<"111">>, emqx_message:payload(lists:nth(1, LastPartialPage))), + ?assertEqual(<<"114">>, emqx_message:payload(lists:nth(4, LastPartialPage))), + ?assertMatch(#{page := 12, limit := 10, count := 114, hasnext := false}, LastMeta), + + {LenExceedPage, LenExceedMeta} = ?Q:query(Q, #{page => 13, limit => 10}), + ?assertEqual([], LenExceedPage), + ?assertMatch(#{hasnext := false}, LenExceedMeta), + + {LargePage, LargeMeta} = ?Q:query(Q, #{page => 1, limit => 1000}), + ?assertEqual(114, length(LargePage)), + ?assertEqual(<<"1">>, emqx_message:payload(hd(LargePage))), + ?assertEqual(<<"114">>, emqx_message:payload(lists:last(LargePage))), + ?assertMatch(#{hasnext := false}, LargeMeta), + + {FullPage, FullMeta} = ?Q:query(Q, #{page => 1, limit => 114}), + ?assertEqual(114, length(FullPage)), + ?assertEqual(<<"1">>, emqx_message:payload(hd(FullPage))), + ?assertEqual(<<"114">>, emqx_message:payload(lists:last(FullPage))), + ?assertMatch(#{hasnext := false}, FullMeta), + + {EmptyPage, EmptyMeta} = ?Q:query(Q, #{page => 1, limit => 0}), + ?assertEqual([], EmptyPage), + ?assertMatch(#{hasnext := false, count := 114, limit := 0, page := 1}, EmptyMeta), + {EmptyExceedPage, EmptyExceedMeta} = ?Q:query(Q, #{page => 200, limit => 0}), + ?assertEqual([], EmptyExceedPage), + ?assertMatch(#{hasnext := false, count := 114, limit := 0, page := 200}, EmptyExceedMeta). + conservation_prop() -> ?FORALL( {Priorities, Messages}, diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index a1ab0bc3f5..e470805d82 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -52,6 +52,7 @@ kickout_clients/1, list_authz_cache/1, list_client_subscriptions/1, + list_client_msgs/3, client_subscriptions/2, clean_authz_cache/1, clean_authz_cache/2, @@ -417,6 +418,12 @@ list_client_subscriptions_mem(ClientId) -> end end. +list_client_msgs(MsgsType, ClientId, PagerParams) when + MsgsType =:= inflight_msgs; + MsgsType =:= mqueue_msgs +-> + call_client(ClientId, {MsgsType, PagerParams}). + client_subscriptions(Node, ClientId) -> {Node, unwrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId))}. diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 298e5a2db4..87cecc2826 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -47,7 +47,9 @@ unsubscribe/2, unsubscribe_batch/2, set_keepalive/2, - sessions_count/2 + sessions_count/2, + inflight_msgs/2, + mqueue_msgs/2 ]). -export([ @@ -84,6 +86,9 @@ message => <<"Client ID not found">> }). +%% 1MB = 1024 x 1024 +-define(MAX_MSG_PAYLOAD_SIZE, 1048576). + namespace() -> undefined. api_spec() -> @@ -101,6 +106,8 @@ paths() -> "/clients/:clientid/unsubscribe", "/clients/:clientid/unsubscribe/bulk", "/clients/:clientid/keepalive", + "/clients/:clientid/mqueue_messages", + "/clients/:clientid/inflight_messages", "/sessions_count" ]. @@ -391,6 +398,70 @@ schema("/clients/:clientid/keepalive") -> } } }; +schema("/clients/:clientid/mqueue_messages") -> + #{ + 'operationId' => mqueue_msgs, + get => #{ + description => ?DESC(get_client_mqueue_msgs), + tags => ?TAGS, + parameters => [ + {clientid, hoconsc:mk(binary(), #{in => path})}, + hoconsc:ref(emqx_dashboard_swagger, page), + hoconsc:ref(emqx_dashboard_swagger, limit) + ], + responses => #{ + 200 => + emqx_dashboard_swagger:schema_with_example(?R_REF(messages), #{ + <<"data">> => [message_example()], + <<"meta">> => #{ + <<"count">> => 1, + <<"limit">> => 50, + <<"page">> => 1, + <<"hasnext">> => false + } + }), + 400 => + emqx_dashboard_swagger:error_codes( + ['INVALID_PARAMETER'], <<"Invalid parameters">> + ), + 404 => emqx_dashboard_swagger:error_codes( + ['CLIENTID_NOT_FOUND'], <<"Client ID not found">> + ) + } + } + }; +schema("/clients/:clientid/inflight_messages") -> + #{ + 'operationId' => inflight_msgs, + get => #{ + description => ?DESC(get_client_inflight_msgs), + tags => ?TAGS, + parameters => [ + {clientid, hoconsc:mk(binary(), #{in => path})}, + hoconsc:ref(emqx_dashboard_swagger, page), + hoconsc:ref(emqx_dashboard_swagger, limit) + ], + responses => #{ + 200 => + emqx_dashboard_swagger:schema_with_example(?R_REF(messages), #{ + <<"data">> => [message_example()], + <<"meta">> => #{ + <<"count">> => 1, + <<"limit">> => 50, + <<"page">> => 1, + <<"hasnext">> => false + } + }), + 400 => + emqx_dashboard_swagger:error_codes( + ['INVALID_PARAMETER'], <<"Invalid parameters">> + ), + 404 => emqx_dashboard_swagger:error_codes( + ['CLIENTID_NOT_FOUND'], <<"Client ID not found">> + ) + } + } + }; schema("/sessions_count") -> #{ 'operationId' => sessions_count, @@ -621,6 +692,21 @@ fields(subscribe) -> fields(unsubscribe) -> [ {topic, hoconsc:mk(binary(), #{desc => <<"Topic">>, example => <<"testtopic/#">>})} + ]; +fields(messages) -> + [ + {data, hoconsc:mk(hoconsc:array(?REF(message)), #{desc => ?DESC(msgs_list)})}, + {meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, meta), #{})} + ]; +fields(message) -> + [ + {msgid, hoconsc:mk(binary(), #{desc => ?DESC(msg_id)})}, + {topic, hoconsc:mk(binary(), #{desc => ?DESC(msg_topic)})}, + {qos, hoconsc:mk(emqx_schema:qos(), #{desc => ?DESC(msg_qos)})}, + {publish_at, hoconsc:mk(string(), #{desc => ?DESC(msg_publish_at)})}, + {from_clientid, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_clientid)})}, + {from_username, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_username)})}, + {payload, hoconsc:mk(binary(), #{desc => ?DESC(msg_payload)})} ]. %%%============================================================================================== @@ -693,6 +779,15 @@ set_keepalive(put, #{bindings := #{clientid := ClientID}, body := Body}) -> end end. +mqueue_msgs(get, #{bindings := #{clientid := ClientID}, query_string := QString}) -> + list_client_msgs(mqueue_msgs, ClientID, QString). + +inflight_msgs(get, #{ + bindings := #{clientid := ClientID}, + query_string := QString +}) -> + list_client_msgs(inflight_msgs, ClientID, QString). + %%%============================================================================================== %% api apply @@ -1021,6 +1116,19 @@ remove_live_sessions(Rows) -> Rows ). +list_client_msgs(MsgType, ClientID, QString) -> + case emqx_mgmt_api:parse_pager_params(QString) of + false -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; + PagerParams = #{} -> + case emqx_mgmt:list_client_msgs(MsgType, ClientID, PagerParams) of + {error, not_found} -> + {404, ?CLIENTID_NOT_FOUND}; + {Msgs, Meta = #{}} when is_list(Msgs) -> + {200, #{meta => Meta, data => lists:map(fun format_msg/1, Msgs)}} + end + end. + %%-------------------------------------------------------------------- %% QueryString to Match Spec @@ -1181,6 +1289,31 @@ format_persistent_session_info(ClientId, PSInfo0) -> ), result_format_undefined_to_null(PSInfo). +format_msg(#message{ + id = ID, + qos = Qos, + topic = Topic, + from = From, + timestamp = Timestamp, + headers = Headers, + payload = Payload +}) -> + Msg = #{ + msgid => emqx_guid:to_hexstr(ID), + qos => Qos, + topic => Topic, + publish_at => + emqx_utils_calendar:epoch_to_rfc3339(Timestamp), + from_clientid => emqx_utils_conv:bin(From), + from_username => maps:get(username, Headers, <<>>) + }, + case erlang:byte_size(Payload) =< ?MAX_MSG_PAYLOAD_SIZE of + true -> + Msg#{payload => base64:encode(Payload)}; + _ -> + Msg + end. + %% format func helpers take_maps_from_inner(_Key, Value, Current) when is_map(Value) -> maps:merge(Current, Value); @@ -1282,6 +1415,17 @@ client_example() -> <<"recv_msg.qos0">> => 0 }. +message_example() -> + #{ + <<"msgid">> => <<"000611F460D57FA9F44500000D360002">>, + <<"topic">> => <<"t/test">>, + <<"qos">> => 0, + <<"publish_at">> => <<"2024-02-22T10:39:14.560+02:00">>, + <<"from_clientid">> => <<"mqttx_59ac0a87">>, + <<"from_username">> => <<"test-user">>, + <<"payload">> => <<"eyJmb28iOiAiYmFyIn0=">> + }. + sessions_count(get, #{query_string := QString}) -> Since = maps:get(<<"since">>, QString, 0), Count = emqx_cm_registry_keeper:count(Since), diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index f0da985af7..e973dcae2b 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -20,6 +20,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). all() -> AllTCs = emqx_common_test_helpers:all(?MODULE), @@ -41,10 +42,12 @@ persistent_session_testcases() -> ]. init_per_suite(Config) -> + ok = snabbkaffe:start_trace(), emqx_mgmt_api_test_util:init_suite(), Config. end_per_suite(_) -> + ok = snabbkaffe:stop(), emqx_mgmt_api_test_util:end_suite(). init_per_group(persistent_sessions, Config) -> @@ -756,6 +759,162 @@ t_client_id_not_found(_Config) -> ?assertMatch({error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe"]), UnsubBody)), ?assertMatch( {error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe", "bulk"]), [UnsubBody]) + ), + %% Mqueue messages + ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["mqueue_messages"]))), + %% Inflight messages + ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["inflight_messages"]))). + +t_mqueue_messages(_Config) -> + ClientId = "client_mqueue_msgs", + Topic = <<"t/test_mqueue_msgs">>, + Count = emqx_mgmt:default_row_limit(), + ok = client_with_mqueue(ClientId, Topic, Count), + Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]), + ?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])), + test_messages(Path, Topic, Count). + +t_inflight_messages(_Config) -> + ClientId = "client_inflight_msgs", + Topic = <<"t/test_inflight_msgs">>, + PubCount = emqx_mgmt:default_row_limit(), + ok = client_with_inflight(ClientId, Topic, PubCount), + Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "inflight_messages"]), + InflightLimit = emqx:get_config([mqtt, max_inflight]), + test_messages(Path, Topic, InflightLimit). + +client_with_mqueue(ClientId, Topic, Count) -> + {ok, Client} = emqtt:start_link([ + {proto_ver, v5}, + {clientid, ClientId}, + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 120}} + ]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, Topic, 1), + ok = emqtt:disconnect(Client), + publish_msgs(Topic, Count). + +client_with_inflight(ClientId, Topic, Count) -> + {ok, Client} = emqtt:start_link([ + {proto_ver, v5}, + {clientid, ClientId}, + {clean_start, true}, + {auto_ack, never} + ]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, Topic, 1), + publish_msgs(Topic, Count). + +publish_msgs(Topic, Count) -> + lists:foreach( + fun(Seq) -> + emqx_broker:publish(emqx_message:make(undefined, ?QOS_1, Topic, integer_to_binary(Seq))) + end, + lists:seq(1, Count) + ). + +test_messages(Path, Topic, Count) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + DefaultLimit = emqx_mgmt:default_row_limit(), + {ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path), + #{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp), + ?assertEqual( + #{ + <<"page">> => 1, + <<"count">> => Count, + <<"limit">> => DefaultLimit, + <<"hasnext">> => false + }, + Meta + ), + ?assertEqual(length(Msgs), Count), + lists:foreach( + fun({Seq, #{<<"payload">> := P} = M}) -> + ?assertEqual(Seq, binary_to_integer(base64:decode(P))), + ?assertMatch( + #{ + <<"msgid">> := _, + <<"topic">> := Topic, + <<"qos">> := _, + <<"publish_at">> := _, + <<"from_clientid">> := _, + <<"from_username">> := _ + }, + M + ) + end, + lists:zip(lists:seq(1, Count), Msgs) + ), + + Limit = 19, + lists:foreach( + fun(PageSeq) -> + Qs = io_lib:format("page=~p&limit=~p", [PageSeq, Limit]), + {ok, MsgsRespP} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, AuthHeader), + #{<<"meta">> := MetaP, <<"data">> := MsgsP} = emqx_utils_json:decode(MsgsRespP), + ?assertEqual( + #{ + <<"page">> => PageSeq, + <<"count">> => Count, + <<"limit">> => Limit, + <<"hasnext">> => true + }, + MetaP + ), + ?assertEqual(length(MsgsP), Limit), + ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1), + ExpLastPayload = integer_to_binary(PageSeq * Limit), + ?assertEqual(ExpFirstPayload, base64:decode(maps:get(<<"payload">>, hd(MsgsP)))), + ?assertEqual(ExpLastPayload, base64:decode(maps:get(<<"payload">>, lists:last(MsgsP)))) + end, + lists:seq(1, Count div 19) + ), + LastPartialPage = Count div 19 + 1, + LastQs = io_lib:format("page=~p&limit=~p", [LastPartialPage, Limit]), + {ok, MsgsRespLastP} = emqx_mgmt_api_test_util:request_api(get, Path, LastQs, AuthHeader), + #{<<"meta">> := MetaLastP, <<"data">> := MsgsLastP} = emqx_utils_json:decode(MsgsRespLastP), + ?assertEqual( + #{ + <<"page">> => LastPartialPage, + <<"count">> => Count, + <<"limit">> => Limit, + <<"hasnext">> => false + }, + MetaLastP + ), + + ?assertEqual( + integer_to_binary(LastPartialPage * Limit - Limit + 1), + base64:decode(maps:get(<<"payload">>, hd(MsgsLastP))) + ), + ?assertEqual( + integer_to_binary(Count), base64:decode(maps:get(<<"payload">>, lists:last(MsgsLastP))) + ), + + ExceedPage = LastPartialPage + 1, + ExceedQs = io_lib:format("page=~p&limit=~p", [ExceedPage, Limit]), + {ok, MsgsRespExceedP} = emqx_mgmt_api_test_util:request_api(get, Path, ExceedQs, AuthHeader), + #{<<"meta">> := #{<<"hasnext">> := false}, <<"data">> := []} = emqx_utils_json:decode( + MsgsRespExceedP + ), + + %% Invalid page params + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(get, Path, "limit=0&page=1", AuthHeader) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(get, Path, "limit=limit&page=page", AuthHeader) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(get, Path, "limit=10&page=-1", AuthHeader) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(get, Path, "limit=-5&page=5", AuthHeader) ). time_string_to_epoch_millisecond(DateTime) -> diff --git a/changes/ce/feat-12561.en.md b/changes/ce/feat-12561.en.md new file mode 100644 index 0000000000..7c5a597f6c --- /dev/null +++ b/changes/ce/feat-12561.en.md @@ -0,0 +1,3 @@ +Implement HTTP API to get the list of client's inflight and mqueue messages: + - GET /clients/{clientid}/mqueue_messages?page=1&limit=100 + - GET /clients/{clientid}/inflight_messages?page=1&limit=100 diff --git a/rel/i18n/emqx_mgmt_api_clients.hocon b/rel/i18n/emqx_mgmt_api_clients.hocon index 2431c09ecb..b866dad0d1 100644 --- a/rel/i18n/emqx_mgmt_api_clients.hocon +++ b/rel/i18n/emqx_mgmt_api_clients.hocon @@ -35,6 +35,51 @@ get_client_subs.desc: get_client_subs.label: """Get client subscriptions""" +get_client_mqueue_msgs.desc: +"""Get client mqueue messages""" +get_client_mqueue_msgs.label: +"""Get client mqueue messages""" + +get_client_inflight_msgs.desc: +"""Get client inflight messages""" +get_client_inflight_msgs.label: +"""Get client inflight messages""" + +msgs_list.desc: +"""Client's inflight or mqueue messages list.""" +msgs_list.label: +"""Client's inflight or mqueue messages""" + +msg_id.desc: +"""Message ID.""" +msg_id.label: +"""Message ID""" + +msg_topic.desc: +"""Message topic.""" +msg_topic.label: +"""Message Topic""" + +msg_qos.desc: +"""Message QoS.""" +msg_topic.label: +"""Message Qos""" + +msg_publish_at.desc: +"""Message publish time, RFC 3339 format.""" +msg_publish_at.label: +"""Message Publish Time.""" + +msg_from_clientid.desc: +"""Message publisher's client ID.""" +msg_from_clientid.desc: +"""Message publisher's Client ID""" + +msg_from_username.desc: +"""Message publisher's username.""" +msg_from_username.label: +"""Message Publisher's Username """ + subscribe.desc: """Subscribe""" subscribe.label: