Skip to content

Commit

Permalink
feat: add client mqueue/inflight messages API
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeTupchiy committed Mar 6, 2024
1 parent df3ebc5 commit 5606257
Show file tree
Hide file tree
Showing 14 changed files with 854 additions and 11 deletions.
4 changes: 4 additions & 0 deletions apps/emqx/src/emqx_channel.erl
Expand Up @@ -1210,6 +1210,10 @@ handle_call(
ChanInfo1 = info(NChannel),
emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}),
reply(ok, reset_timer(keepalive, NChannel));
handle_call({Type, _Meta} = MsgsReq, Channel = #channel{session = Session}) when
Type =:= mqueue_msgs; Type =:= inflight_msgs
->
{reply, emqx_session:info(MsgsReq, Session), Channel};
handle_call(Req, Channel) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
reply(ignored, Channel).
Expand Down
47 changes: 46 additions & 1 deletion apps/emqx/src/emqx_inflight.erl
Expand Up @@ -36,7 +36,8 @@
max_size/1,
is_full/1,
is_empty/1,
window/1
window/1,
query/2
]).

-export_type([inflight/0]).
Expand Down Expand Up @@ -138,3 +139,47 @@ size(?INFLIGHT(Tree)) ->
-spec max_size(inflight()) -> non_neg_integer().
max_size(?INFLIGHT(MaxSize, _Tree)) ->
MaxSize.

-spec query(inflight(), #{'after' => After, limit := L}) ->
{[{key(), term()}], #{'after' := After, count := C}}
when
After :: none | end_of_data | key(),
L :: non_neg_integer(),
C :: non_neg_integer().
query(?INFLIGHT(Tree), #{limit := Limit} = Pager) ->
Count = gb_trees:size(Tree),
AfterKey = maps:get('after', Pager, none),
{List, NextAfter} = sublist(iterator_from(AfterKey, Tree), Limit),
{List, #{'after' => NextAfter, count => Count}}.

iterator_from(none, Tree) ->
gb_trees:iterator(Tree);
iterator_from(AfterKey, Tree) ->
It = gb_trees:iterator_from(AfterKey, Tree),
case gb_trees:next(It) of
{AfterKey, _Val, ItNext} -> ItNext;
_ -> It
end.

sublist(_It, 0) ->
{[], none};
sublist(It, Len) ->
{ListAcc, HasNext} = sublist(It, Len, []),
{lists:reverse(ListAcc), next_after(ListAcc, HasNext)}.

sublist(It, 0, Acc) ->
{Acc, gb_trees:next(It) =/= none};
sublist(It, Len, Acc) ->
case gb_trees:next(It) of
none ->
{Acc, false};
{Key, Val, ItNext} ->
sublist(ItNext, Len - 1, [{Key, Val} | Acc])
end.

next_after(_Acc, false) ->
end_of_data;
next_after([{LastKey, _LastVal} | _Acc], _HasNext) ->
LastKey;
next_after([], _HasNext) ->
end_of_data.
52 changes: 51 additions & 1 deletion apps/emqx/src/emqx_mqueue.erl
Expand Up @@ -68,7 +68,8 @@
stats/1,
dropped/1,
to_list/1,
filter/2
filter/2,
query/2
]).

-define(NO_PRIORITY_TABLE, disabled).
Expand Down Expand Up @@ -171,6 +172,55 @@ filter(Pred, #mqueue{q = Q, len = Len, dropped = Droppend} = MQ) ->
MQ#mqueue{q = Q2, len = Len2, dropped = Droppend + Diff}
end.

-spec query(mqueue(), #{'after' => AfterMsgId, limit := L}) ->
{[message()], #{'after' := AfterMsgId, count := C}}
when
AfterMsgId :: none | end_of_data | binary(),
C :: non_neg_integer(),
L :: non_neg_integer().
query(MQ, #{limit := Limit} = Pager) ->
AfterMsgId = maps:get('after', Pager, none),
{List, NextAfter} = sublist(skip_until(MQ, AfterMsgId), Limit),
{List, #{'after' => NextAfter, count => len(MQ)}}.

skip_until(MQ, none = _MsgId) ->
MQ;
skip_until(MQ, MsgId) ->
do_skip_until(MQ, MsgId).

do_skip_until(MQ, MsgId) ->
case out(MQ) of
{empty, MQ} ->
MQ;
{{value, #message{id = MsgId}}, Q1} ->
Q1;
{{value, _Msg}, Q1} ->
do_skip_until(Q1, MsgId)
end.

sublist(_MQ, 0) ->
{[], none};
sublist(MQ, Len) ->
{ListAcc, HasNext} = sublist(MQ, Len, []),
{lists:reverse(ListAcc), next_after(ListAcc, HasNext)}.

sublist(MQ, 0, Acc) ->
{Acc, element(1, out(MQ)) =/= empty};
sublist(MQ, Len, Acc) ->
case out(MQ) of
{empty, _MQ} ->
{Acc, false};
{{value, Msg}, Q1} ->
sublist(Q1, Len - 1, [Msg | Acc])
end.

next_after(_Acc, false) ->
end_of_data;
next_after([#message{id = Id} | _Acc], _HasNext) ->
Id;
next_after([], _HasNext) ->
end_of_data.

to_list(MQ, Acc) ->
case out(MQ) of
{empty, _MQ} ->
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx/src/emqx_session.erl
Expand Up @@ -527,7 +527,7 @@ info(Session) ->

-spec info
([atom()], t()) -> [{atom(), _Value}];
(atom(), t()) -> _Value.
(atom() | {atom(), _Meta}, t()) -> _Value.
info(Keys, Session) when is_list(Keys) ->
[{Key, info(Key, Session)} || Key <- Keys];
info(impl, Session) ->
Expand Down
5 changes: 5 additions & 0 deletions apps/emqx/src/emqx_session_mem.erl
Expand Up @@ -268,6 +268,9 @@ info(inflight_cnt, #session{inflight = Inflight}) ->
emqx_inflight:size(Inflight);
info(inflight_max, #session{inflight = Inflight}) ->
emqx_inflight:max_size(Inflight);
info({inflight_msgs, PagerParams}, #session{inflight = Inflight}) ->
{InflightList, Meta} = emqx_inflight:query(Inflight, PagerParams),
{[I#inflight_data.message || {_, I} <- InflightList], Meta};
info(retry_interval, #session{retry_interval = Interval}) ->
Interval;
info(mqueue, #session{mqueue = MQueue}) ->
Expand All @@ -278,6 +281,8 @@ info(mqueue_max, #session{mqueue = MQueue}) ->
emqx_mqueue:max_len(MQueue);
info(mqueue_dropped, #session{mqueue = MQueue}) ->
emqx_mqueue:dropped(MQueue);
info({mqueue_msgs, PagerParams}, #session{mqueue = MQueue}) ->
emqx_mqueue:query(MQueue, PagerParams);
info(next_pkt_id, #session{next_pkt_id = PacketId}) ->
PacketId;
info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) ->
Expand Down
82 changes: 80 additions & 2 deletions apps/emqx/test/emqx_inflight_SUITE.erl
Expand Up @@ -116,5 +116,83 @@ t_window(_) ->
),
?assertEqual([a, b], emqx_inflight:window(Inflight)).

% t_to_list(_) ->
% error('TODO').
t_to_list(_) ->
Inflight = lists:foldl(
fun(Seq, InflightAcc) ->
emqx_inflight:insert(Seq, integer_to_binary(Seq), InflightAcc)
end,
emqx_inflight:new(100),
[1, 6, 2, 3, 10, 7, 9, 8, 4, 5]
),
ExpList = [{Seq, integer_to_binary(Seq)} || Seq <- lists:seq(1, 10)],
?assertEqual(ExpList, emqx_inflight:to_list(Inflight)).

t_query(_) ->
EmptyInflight = emqx_inflight:new(500),
?assertMatch(
{[], #{'after' := end_of_data}}, emqx_inflight:query(EmptyInflight, #{limit => 50})
),
?assertMatch(
{[], #{'after' := end_of_data}},
emqx_inflight:query(EmptyInflight, #{'after' => <<"empty">>, limit => 50})
),
?assertMatch(
{[], #{'after' := end_of_data}},
emqx_inflight:query(EmptyInflight, #{'after' => none, limit => 50})
),

Inflight = lists:foldl(
fun(Seq, QAcc) ->
emqx_inflight:insert(Seq, integer_to_binary(Seq), QAcc)
end,
EmptyInflight,
lists:reverse(lists:seq(1, 114))
),

LastAfter = lists:foldl(
fun(PageSeq, After) ->
Limit = 10,
PagerParams = #{'after' => After, limit => Limit},
{Page, #{'after' := NextAfter} = Meta} = emqx_inflight:query(Inflight, PagerParams),
?assertEqual(10, length(Page)),
ExpFirst = PageSeq * Limit - Limit + 1,
ExpLast = PageSeq * Limit,
?assertEqual({ExpFirst, integer_to_binary(ExpFirst)}, lists:nth(1, Page)),
?assertEqual({ExpLast, integer_to_binary(ExpLast)}, lists:nth(10, Page)),
?assertMatch(
#{count := 114, 'after' := IntAfter} when is_integer(IntAfter),
Meta
),
NextAfter
end,
none,
lists:seq(1, 11)
),
{LastPartialPage, LastMeta} = emqx_inflight:query(Inflight, #{
'after' => LastAfter, limit => 10
}),
?assertEqual(4, length(LastPartialPage)),
?assertEqual({111, <<"111">>}, lists:nth(1, LastPartialPage)),
?assertEqual({114, <<"114">>}, lists:nth(4, LastPartialPage)),
?assertMatch(#{'after' := end_of_data, count := 114}, LastMeta),

?assertMatch(
{[], #{'after' := end_of_data}},
emqx_inflight:query(Inflight, #{'after' => <<"not-existing-cont-id">>, limit => 10})
),

{LargePage, LargeMeta} = emqx_inflight:query(Inflight, #{limit => 1000}),
?assertEqual(114, length(LargePage)),
?assertEqual({1, <<"1">>}, hd(LargePage)),
?assertEqual({114, <<"114">>}, lists:last(LargePage)),
?assertMatch(#{'after' := end_of_data}, LargeMeta),

{FullPage, FullMeta} = emqx_inflight:query(Inflight, #{limit => 114}),
?assertEqual(114, length(FullPage)),
?assertEqual({1, <<"1">>}, hd(FullPage)),
?assertEqual({114, <<"114">>}, lists:last(FullPage)),
?assertMatch(#{'after' := end_of_data}, FullMeta),

{EmptyPage, EmptyMeta} = emqx_inflight:query(Inflight, #{limit => 0}),
?assertEqual([], EmptyPage),
?assertMatch(#{'after' := none, count := 114}, EmptyMeta).
68 changes: 68 additions & 0 deletions apps/emqx/test/emqx_mqueue_SUITE.erl
Expand Up @@ -282,6 +282,74 @@ t_dropped(_) ->
{Msg, Q2} = ?Q:in(Msg, Q1),
?assertEqual(1, ?Q:dropped(Q2)).

t_query(_) ->
EmptyQ = ?Q:init(#{max_len => 500, store_qos0 => true}),
?assertMatch({[], #{'after' := end_of_data}}, ?Q:query(EmptyQ, #{limit => 50})),
?assertMatch(
{[], #{'after' := end_of_data}},
?Q:query(EmptyQ, #{'after' => <<"empty">>, limit => 50})
),
?assertMatch(
{[], #{'after' := end_of_data}}, ?Q:query(EmptyQ, #{'after' => none, limit => 50})
),

Q = lists:foldl(
fun(Seq, QAcc) ->
Msg = emqx_message:make(<<"t">>, integer_to_binary(Seq)),
{_, QAcc1} = ?Q:in(Msg, QAcc),
QAcc1
end,
EmptyQ,
lists:seq(1, 114)
),

LastAfter = lists:foldl(
fun(PageSeq, After) ->
Limit = 10,
PagerParams = #{'after' => After, limit => Limit},
{Page, #{'after' := NextAfter} = Meta} = ?Q:query(Q, PagerParams),
?assertEqual(10, length(Page)),
ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1),
ExpLastPayload = integer_to_binary(PageSeq * Limit),
?assertEqual(
ExpFirstPayload,
emqx_message:payload(lists:nth(1, Page)),
#{page_seq => PageSeq, page => Page, meta => Meta}
),
?assertEqual(ExpLastPayload, emqx_message:payload(lists:nth(10, Page))),
?assertMatch(#{count := 114, 'after' := <<_/binary>>}, Meta),
NextAfter
end,
none,
lists:seq(1, 11)
),
{LastPartialPage, LastMeta} = ?Q:query(Q, #{'after' => LastAfter, limit => 10}),
?assertEqual(4, length(LastPartialPage)),
?assertEqual(<<"111">>, emqx_message:payload(lists:nth(1, LastPartialPage))),
?assertEqual(<<"114">>, emqx_message:payload(lists:nth(4, LastPartialPage))),
?assertMatch(#{'after' := end_of_data, count := 114}, LastMeta),

?assertMatch(
{[], #{'after' := end_of_data}},
?Q:query(Q, #{'after' => <<"not-existing-cont-id">>, limit => 10})
),

{LargePage, LargeMeta} = ?Q:query(Q, #{limit => 1000}),
?assertEqual(114, length(LargePage)),
?assertEqual(<<"1">>, emqx_message:payload(hd(LargePage))),
?assertEqual(<<"114">>, emqx_message:payload(lists:last(LargePage))),
?assertMatch(#{'after' := end_of_data}, LargeMeta),

{FullPage, FullMeta} = ?Q:query(Q, #{limit => 114}),
?assertEqual(114, length(FullPage)),
?assertEqual(<<"1">>, emqx_message:payload(hd(FullPage))),
?assertEqual(<<"114">>, emqx_message:payload(lists:last(FullPage))),
?assertMatch(#{'after' := end_of_data}, FullMeta),

{EmptyPage, EmptyMeta} = ?Q:query(Q, #{limit => 0}),
?assertEqual([], EmptyPage),
?assertMatch(#{'after' := none, count := 114}, EmptyMeta).

conservation_prop() ->
?FORALL(
{Priorities, Messages},
Expand Down
18 changes: 17 additions & 1 deletion apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
Expand Up @@ -178,8 +178,24 @@ fields(hasnext) ->
>>,
Meta = #{desc => Desc, required => true},
[{hasnext, hoconsc:mk(boolean(), Meta)}];
fields('after') ->
Desc = <<
"Opaque token returned in the previous response that can then be used"
" in subsequent requests to get the next chunk of results.<br/>"
"It is used instead of \"page\" parameter to traverse volatile data.<br/>"
"Can be omitted or set to \"none\" to get the first chunk of data.<br/>"
"\"end_of_data\" is returned, if there is no more data.<br/>"
"Sending \"after=end_of_table\" back to the server will result in \"400 Bad Request\""
" error response."
>>,
Meta = #{
in => query, desc => Desc, required => false, example => <<"AAYS53qRa0n07AAABFIACg">>
},
[{'after', hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}];
fields(meta) ->
fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext).
fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext);
fields(after_meta) ->
fields('after') ++ fields(count).

-spec schema_with_example(hocon_schema:type(), term()) -> hocon_schema:field_schema().
schema_with_example(Type, Example) ->
Expand Down
7 changes: 7 additions & 0 deletions apps/emqx_management/src/emqx_mgmt.erl
Expand Up @@ -52,6 +52,7 @@
kickout_clients/1,
list_authz_cache/1,
list_client_subscriptions/1,
list_client_msgs/3,
client_subscriptions/2,
clean_authz_cache/1,
clean_authz_cache/2,
Expand Down Expand Up @@ -417,6 +418,12 @@ list_client_subscriptions_mem(ClientId) ->
end
end.

list_client_msgs(MsgsType, ClientId, PagerParams) when
MsgsType =:= inflight_msgs;
MsgsType =:= mqueue_msgs
->
call_client(ClientId, {MsgsType, PagerParams}).

client_subscriptions(Node, ClientId) ->
{Node, unwrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId))}.

Expand Down

0 comments on commit 5606257

Please sign in to comment.