Skip to content

Commit

Permalink
feat: add client mqueue/inflight messages API
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeTupchiy committed Feb 23, 2024
1 parent 5af01c0 commit 60000af
Show file tree
Hide file tree
Showing 12 changed files with 568 additions and 6 deletions.
4 changes: 4 additions & 0 deletions apps/emqx/src/emqx_channel.erl
Expand Up @@ -1198,6 +1198,10 @@ handle_call(
ChanInfo1 = info(NChannel),
emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}),
reply(ok, reset_timer(keepalive, NChannel));
handle_call({Type, _Meta} = MsgsReq, Channel = #channel{session = Session}) when
Type =:= mqueue_msgs; Type =:= inflight_msgs
->
{reply, emqx_session:info(MsgsReq, Session), Channel};
handle_call(Req, Channel) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
reply(ignored, Channel).
Expand Down
41 changes: 40 additions & 1 deletion apps/emqx/src/emqx_inflight.erl
Expand Up @@ -36,7 +36,8 @@
max_size/1,
is_full/1,
is_empty/1,
window/1
window/1,
query/2
]).

-export_type([inflight/0]).
Expand Down Expand Up @@ -138,3 +139,41 @@ size(?INFLIGHT(Tree)) ->
-spec max_size(inflight()) -> non_neg_integer().
max_size(?INFLIGHT(MaxSize, _Tree)) ->
MaxSize.

-spec query(inflight(), #{page := P, limit := L}) ->
{[{key(), term()}], #{page := P, limit := L, count := C, hasnext := boolean()}}
when
P :: pos_integer(),
L :: non_neg_integer(),
C :: non_neg_integer().
query(?INFLIGHT(Tree), #{page := Page, limit := Limit} = Pager) ->
Start = Page * Limit - Limit + 1,
Count = gb_trees:size(Tree),
case Count < Start orelse Limit =:= 0 of
true ->
{[], Pager#{hasnext => false, count => Count}};
false ->
HasNext = Start + Limit - 1 < Count,
TreeIter = gb_trees:iterator(Tree),
{sublist(TreeIter, Start, Limit), Pager#{hasnext => HasNext, count => Count}}
end.

sublist(It, 1, L) when is_integer(L), L >= 0 ->
sublist(It, L);
sublist(It, S, L) when is_integer(S), S >= 2 ->
case gb_trees:next(It) of
none ->
[];
{_Key, _Val, ItNext} ->
sublist(ItNext, S - 1, L)
end.

sublist(It, L) when L > 0 ->
case gb_trees:next(It) of
none ->
[];
{Key, Val, ItNext} ->
[{Key, Val} | sublist(ItNext, L - 1)]
end;
sublist(_, 0) ->
[].
40 changes: 39 additions & 1 deletion apps/emqx/src/emqx_mqueue.erl
Expand Up @@ -68,7 +68,8 @@
stats/1,
dropped/1,
to_list/1,
filter/2
filter/2,
query/2
]).

-define(NO_PRIORITY_TABLE, disabled).
Expand Down Expand Up @@ -171,6 +172,43 @@ filter(Pred, #mqueue{q = Q, len = Len, dropped = Droppend} = MQ) ->
MQ#mqueue{q = Q2, len = Len2, dropped = Droppend + Diff}
end.

-spec query(mqueue(), #{page := P, limit := L}) ->
{[message()], #{page := P, limit := L, count := C, hasnext := boolean()}}
when
P :: pos_integer(),
L :: non_neg_integer(),
C :: non_neg_integer().
query(MQ, #{page := Page, limit := Limit} = Pager) ->
Start = Page * Limit - Limit + 1,
Count = len(MQ),
case Count < Start orelse Limit =:= 0 of
true ->
{[], Pager#{hasnext => false, count => Count}};
false ->
HasNext = Start + Limit - 1 < Count,
{sublist(MQ, Start, Limit), Pager#{hasnext => HasNext, count => Count}}
end.

sublist(MQ, 1, L) when is_integer(L), L >= 0 ->
sublist(MQ, L);
sublist(MQ, S, L) when is_integer(S), S >= 2 ->
case out(MQ) of
{empty, _MQ} ->
[];
{{value, _Msg}, Q1} ->
sublist(Q1, S - 1, L)
end.

sublist(MQ, L) when L > 0 ->
case out(MQ) of
{empty, _MQ} ->
[];
{{value, Msg}, Q1} ->
[Msg | sublist(Q1, L - 1)]
end;
sublist(_, 0) ->
[].

to_list(MQ, Acc) ->
case out(MQ) of
{empty, _MQ} ->
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx/src/emqx_session.erl
Expand Up @@ -527,7 +527,7 @@ info(Session) ->

-spec info
([atom()], t()) -> [{atom(), _Value}];
(atom(), t()) -> _Value.
(atom() | {atom(), _Meta}, t()) -> _Value.
info(Keys, Session) when is_list(Keys) ->
[{Key, info(Key, Session)} || Key <- Keys];
info(impl, Session) ->
Expand Down
5 changes: 5 additions & 0 deletions apps/emqx/src/emqx_session_mem.erl
Expand Up @@ -268,6 +268,9 @@ info(inflight_cnt, #session{inflight = Inflight}) ->
emqx_inflight:size(Inflight);
info(inflight_max, #session{inflight = Inflight}) ->
emqx_inflight:max_size(Inflight);
info({inflight_msgs, PagerParams}, #session{inflight = Inflight}) ->
{InflightList, Meta} = emqx_inflight:query(Inflight, PagerParams),
{[I#inflight_data.message || {_, I} <- InflightList], Meta};
info(retry_interval, #session{retry_interval = Interval}) ->
Interval;
info(mqueue, #session{mqueue = MQueue}) ->
Expand All @@ -278,6 +281,8 @@ info(mqueue_max, #session{mqueue = MQueue}) ->
emqx_mqueue:max_len(MQueue);
info(mqueue_dropped, #session{mqueue = MQueue}) ->
emqx_mqueue:dropped(MQueue);
info({mqueue_msgs, PagerParams}, #session{mqueue = MQueue}) ->
emqx_mqueue:query(MQueue, PagerParams);
info(next_pkt_id, #session{next_pkt_id = PacketId}) ->
PacketId;
info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) ->
Expand Down
66 changes: 64 additions & 2 deletions apps/emqx/test/emqx_inflight_SUITE.erl
Expand Up @@ -116,5 +116,67 @@ t_window(_) ->
),
?assertEqual([a, b], emqx_inflight:window(Inflight)).

% t_to_list(_) ->
% error('TODO').
t_to_list(_) ->
Inflight = lists:foldl(
fun(Seq, InflightAcc) ->
emqx_inflight:insert(Seq, integer_to_binary(Seq), InflightAcc)
end,
emqx_inflight:new(100),
[1, 6, 2, 3, 10, 7, 9, 8, 4, 5]
),
ExpList = [{Seq, integer_to_binary(Seq)} || Seq <- lists:seq(1, 10)],
?assertEqual(ExpList, emqx_inflight:to_list(Inflight)).

t_query(_) ->
Inflight = lists:foldl(
fun(Seq, QAcc) ->
emqx_inflight:insert(Seq, integer_to_binary(Seq), QAcc)
end,
emqx_inflight:new(500),
lists:reverse(lists:seq(1, 114))
),

lists:foreach(
fun(PageSeq) ->
Limit = 10,
{Page, Meta} = emqx_inflight:query(Inflight, #{page => PageSeq, limit => Limit}),
?assertEqual(10, length(Page)),
ExpFirst = PageSeq * Limit - Limit + 1,
ExpLast = PageSeq * Limit,
?assertEqual({ExpFirst, integer_to_binary(ExpFirst)}, lists:nth(1, Page)),
?assertEqual({ExpLast, integer_to_binary(ExpLast)}, lists:nth(10, Page)),
?assertMatch(
#{page := PageSeq, limit := Limit, count := 114, hasnext := true},
Meta
)
end,
lists:seq(1, 11)
),
{LastPartialPage, LastMeta} = emqx_inflight:query(Inflight, #{page => 12, limit => 10}),
?assertEqual(4, length(LastPartialPage)),
?assertEqual({111, <<"111">>}, lists:nth(1, LastPartialPage)),
?assertEqual({114, <<"114">>}, lists:nth(4, LastPartialPage)),
?assertMatch(#{page := 12, limit := 10, count := 114, hasnext := false}, LastMeta),

{LenExceedPage, LenExceedMeta} = emqx_inflight:query(Inflight, #{page => 13, limit => 10}),
?assertEqual([], LenExceedPage),
?assertMatch(#{hasnext := false}, LenExceedMeta),

{LargePage, LargeMeta} = emqx_inflight:query(Inflight, #{page => 1, limit => 1000}),
?assertEqual(114, length(LargePage)),
?assertEqual({1, <<"1">>}, hd(LargePage)),
?assertEqual({114, <<"114">>}, lists:last(LargePage)),
?assertMatch(#{hasnext := false}, LargeMeta),

{FullPage, FullMeta} = emqx_inflight:query(Inflight, #{page => 1, limit => 114}),
?assertEqual(114, length(FullPage)),
?assertEqual({1, <<"1">>}, hd(FullPage)),
?assertEqual({114, <<"114">>}, lists:last(FullPage)),
?assertMatch(#{hasnext := false}, FullMeta),

{EmptyPage, EmptyMeta} = emqx_inflight:query(Inflight, #{page => 1, limit => 0}),
?assertEqual([], EmptyPage),
?assertMatch(#{hasnext := false, count := 114, limit := 0, page := 1}, EmptyMeta),
{EmptyExceedPage, EmptyExceedMeta} = emqx_inflight:query(Inflight, #{page => 200, limit => 0}),
?assertEqual([], EmptyExceedPage),
?assertMatch(#{hasnext := false, count := 114, limit := 0, page := 200}, EmptyExceedMeta).
56 changes: 56 additions & 0 deletions apps/emqx/test/emqx_mqueue_SUITE.erl
Expand Up @@ -282,6 +282,62 @@ t_dropped(_) ->
{Msg, Q2} = ?Q:in(Msg, Q1),
?assertEqual(1, ?Q:dropped(Q2)).

t_query(_) ->
Q = lists:foldl(
fun(Seq, QAcc) ->
Msg = emqx_message:make(<<"t">>, integer_to_binary(Seq)),
{_, QAcc1} = ?Q:in(Msg, QAcc),
QAcc1
end,
?Q:init(#{max_len => 500, store_qos0 => true}),
lists:seq(1, 114)
),

lists:foreach(
fun(PageSeq) ->
Limit = 10,
{Page, Meta} = ?Q:query(Q, #{page => PageSeq, limit => Limit}),
?assertEqual(10, length(Page)),
ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1),
ExpLastPayload = integer_to_binary(PageSeq * Limit),
?assertEqual(ExpFirstPayload, emqx_message:payload(lists:nth(1, Page))),
?assertEqual(ExpLastPayload, emqx_message:payload(lists:nth(10, Page))),
?assertMatch(
#{page := PageSeq, limit := Limit, count := 114, hasnext := true},
Meta
)
end,
lists:seq(1, 11)
),
{LastPartialPage, LastMeta} = ?Q:query(Q, #{page => 12, limit => 10}),
?assertEqual(4, length(LastPartialPage)),
?assertEqual(<<"111">>, emqx_message:payload(lists:nth(1, LastPartialPage))),
?assertEqual(<<"114">>, emqx_message:payload(lists:nth(4, LastPartialPage))),
?assertMatch(#{page := 12, limit := 10, count := 114, hasnext := false}, LastMeta),

{LenExceedPage, LenExceedMeta} = ?Q:query(Q, #{page => 13, limit => 10}),
?assertEqual([], LenExceedPage),
?assertMatch(#{hasnext := false}, LenExceedMeta),

{LargePage, LargeMeta} = ?Q:query(Q, #{page => 1, limit => 1000}),
?assertEqual(114, length(LargePage)),
?assertEqual(<<"1">>, emqx_message:payload(hd(LargePage))),
?assertEqual(<<"114">>, emqx_message:payload(lists:last(LargePage))),
?assertMatch(#{hasnext := false}, LargeMeta),

{FullPage, FullMeta} = ?Q:query(Q, #{page => 1, limit => 114}),
?assertEqual(114, length(FullPage)),
?assertEqual(<<"1">>, emqx_message:payload(hd(FullPage))),
?assertEqual(<<"114">>, emqx_message:payload(lists:last(FullPage))),
?assertMatch(#{hasnext := false}, FullMeta),

{EmptyPage, EmptyMeta} = ?Q:query(Q, #{page => 1, limit => 0}),
?assertEqual([], EmptyPage),
?assertMatch(#{hasnext := false, count := 114, limit := 0, page := 1}, EmptyMeta),
{EmptyExceedPage, EmptyExceedMeta} = ?Q:query(Q, #{page => 200, limit => 0}),
?assertEqual([], EmptyExceedPage),
?assertMatch(#{hasnext := false, count := 114, limit := 0, page := 200}, EmptyExceedMeta).

conservation_prop() ->
?FORALL(
{Priorities, Messages},
Expand Down
7 changes: 7 additions & 0 deletions apps/emqx_management/src/emqx_mgmt.erl
Expand Up @@ -52,6 +52,7 @@
kickout_clients/1,
list_authz_cache/1,
list_client_subscriptions/1,
list_client_msgs/3,
client_subscriptions/2,
clean_authz_cache/1,
clean_authz_cache/2,
Expand Down Expand Up @@ -417,6 +418,12 @@ list_client_subscriptions_mem(ClientId) ->
end
end.

list_client_msgs(MsgsType, ClientId, PagerParams) when
MsgsType =:= inflight_msgs;
MsgsType =:= mqueue_msgs
->
call_client(ClientId, {MsgsType, PagerParams}).

client_subscriptions(Node, ClientId) ->
{Node, unwrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId))}.

Expand Down

0 comments on commit 60000af

Please sign in to comment.