Skip to content

Commit

Permalink
feat(ds): list disconnected persistent sessions in clients API
Browse files Browse the repository at this point in the history
Fixes https://emqx.atlassian.net/browse/EMQX-11540

Note that not all information provided by disconnected in-memory sessions is available to
disconnected persistent sessions, nor does all of them make sense.
  • Loading branch information
thalesmg committed Feb 20, 2024
1 parent 0515c55 commit 3db3fd6
Show file tree
Hide file tree
Showing 6 changed files with 683 additions and 34 deletions.
32 changes: 23 additions & 9 deletions apps/emqx/src/emqx_persistent_session_ds_state.erl
Expand Up @@ -36,10 +36,16 @@
-export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]).
-export([get_subscriptions/1, put_subscription/4, del_subscription/3]).

-export([make_session_iterator/0, session_iterator_next/2]).
-export([make_session_iterator/0, session_iterator_next/2, session_count/0]).

-export_type([
t/0, metadata/0, subscriptions/0, seqno_type/0, stream_key/0, rank_key/0, session_iterator/0
t/0,
metadata/0,
subscriptions/0,
seqno_type/0,
stream_key/0,
rank_key/0,
session_iterator/0
]).

-include("emqx_mqtt.hrl").
Expand All @@ -53,7 +59,7 @@

-type subscriptions() :: emqx_topic_gbt:t(_SubId, emqx_persistent_session_ds:subscription()).

-opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'.
-type session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'.

%% Generic key-value wrapper that is used for exporting arbitrary
%% terms to mnesia:
Expand Down Expand Up @@ -359,14 +365,15 @@ del_rank(Key, Rec) ->
fold_ranks(Fun, Acc, Rec) ->
gen_fold(ranks, Fun, Acc, Rec).

-spec session_count() -> non_neg_integer().
session_count() ->
%% N.B.: this is potentially costly. Should not be called in hot paths.
%% `mnesia:table_info(_, size)' is always zero for rocksdb, so we need to traverse...
do_session_count(make_session_iterator(), 0).

-spec make_session_iterator() -> session_iterator().
make_session_iterator() ->
case mnesia:dirty_first(?session_tab) of
'$end_of_table' ->
'$end_of_table';
Key ->
Key
end.
mnesia:dirty_first(?session_tab).

-spec session_iterator_next(session_iterator(), pos_integer()) ->
{[{emqx_persistent_session_ds:id(), metadata()}], session_iterator()}.
Expand Down Expand Up @@ -564,6 +571,13 @@ ro_transaction(Fun) ->
%% {atomic, Res} = mria:ro_transaction(?DS_MRIA_SHARD, Fun),
%% Res.

%%

do_session_count('$end_of_table', N) ->
N;
do_session_count(Cursor, N) ->
do_session_count(mnesia:dirty_next(?session_tab, Cursor), N + 1).

-compile({inline, check_sequence/1}).

-ifdef(CHECK_SEQNO).
Expand Down
27 changes: 23 additions & 4 deletions apps/emqx_management/src/emqx_mgmt.erl
Expand Up @@ -66,6 +66,9 @@
do_kickout_clients/1
]).

%% Internal exports
-export([lookup_running_client/2]).

%% Internal functions
-export([do_call_client/2]).

Expand Down Expand Up @@ -314,10 +317,16 @@ nodes_info_count(PropList) ->
%%--------------------------------------------------------------------

lookup_client({clientid, ClientId}, FormatFun) ->
lists:append([
lookup_client(Node, {clientid, ClientId}, FormatFun)
|| Node <- emqx:running_nodes()
]);
IsPersistenceEnabled = emqx_persistent_message:is_persistence_enabled(),
case lookup_running_client(ClientId, FormatFun) of
[] when IsPersistenceEnabled ->
case emqx_persistent_session_ds_state:print_session(ClientId) of
undefined -> [];
Session -> [maybe_format(FormatFun, {ClientId, Session})]
end;
Res ->
Res
end;
lookup_client({username, Username}, FormatFun) ->
lists:append([
lookup_client(Node, {username, Username}, FormatFun)
Expand Down Expand Up @@ -633,6 +642,16 @@ create_banned(Banned) ->
delete_banned(Who) ->
emqx_banned:delete(Who).

%%--------------------------------------------------------------------
%% Internal exports
%%--------------------------------------------------------------------

lookup_running_client(ClientId, FormatFun) ->
lists:append([
lookup_client(Node, {clientid, ClientId}, FormatFun)
|| Node <- emqx:running_nodes()
]).

%%--------------------------------------------------------------------
%% Internal Functions.
%%--------------------------------------------------------------------
Expand Down
8 changes: 7 additions & 1 deletion apps/emqx_management/src/emqx_mgmt_api.erl
Expand Up @@ -39,7 +39,13 @@
parse_pager_params/1,
parse_qstring/2,
init_query_result/0,
accumulate_query_rows/4
init_query_state/5,
reset_query_state/1,
accumulate_query_rows/4,
finalize_query/2,
mark_complete/2,
format_query_result/3,
maybe_collect_total_from_tail_nodes/2
]).

-ifdef(TEST).
Expand Down
209 changes: 190 additions & 19 deletions apps/emqx_management/src/emqx_mgmt_api_clients.erl
Expand Up @@ -698,26 +698,13 @@ list_clients(QString) ->
case maps:get(<<"node">>, QString, undefined) of
undefined ->
Options = #{fast_total_counting => true},
emqx_mgmt_api:cluster_query(
?CHAN_INFO_TAB,
QString,
?CLIENT_QSCHEMA,
fun ?MODULE:qs2ms/2,
fun ?MODULE:format_channel_info/2,
Options
);
list_clients_cluster_query(QString, Options);
Node0 ->
case emqx_utils:safe_to_existing_atom(Node0) of
{ok, Node1} ->
QStringWithoutNode = maps:without([<<"node">>], QString),
emqx_mgmt_api:node_query(
Node1,
?CHAN_INFO_TAB,
QStringWithoutNode,
?CLIENT_QSCHEMA,
fun ?MODULE:qs2ms/2,
fun ?MODULE:format_channel_info/2
);
QStringWithoutNode = maps:remove(<<"node">>, QString),
Options = #{},
list_clients_node_query(Node1, QStringWithoutNode, Options);
{error, _} ->
{error, Node0, {badrpc, <<"invalid node">>}}
end
Expand Down Expand Up @@ -851,6 +838,164 @@ do_unsubscribe(ClientID, Topic) ->
Res
end.

list_clients_cluster_query(QString, Options) ->
case emqx_mgmt_api:parse_pager_params(QString) of
false ->
{error, page_limit_invalid};
Meta = #{} ->
try
{_CodCnt, NQString} = emqx_mgmt_api:parse_qstring(QString, ?CLIENT_QSCHEMA),
Nodes = emqx:running_nodes(),
ResultAcc = emqx_mgmt_api:init_query_result(),
QueryState = emqx_mgmt_api:init_query_state(
?CHAN_INFO_TAB, NQString, fun ?MODULE:qs2ms/2, Meta, Options
),
Res = do_list_clients_cluster_query(Nodes, QueryState, ResultAcc),
emqx_mgmt_api:format_query_result(fun ?MODULE:format_channel_info/2, Meta, Res)
catch
throw:{bad_value_type, {Key, ExpectedType, AcutalValue}} ->
{error, invalid_query_string_param, {Key, ExpectedType, AcutalValue}}
end
end.

%% adapted from `emqx_mgmt_api:do_cluster_query'
do_list_clients_cluster_query(
[Node | Tail] = Nodes,
QueryState0,
ResultAcc
) ->
case emqx_mgmt_api:do_query(Node, QueryState0) of
{error, Error} ->
{error, Node, Error};
{Rows, QueryState1 = #{complete := Complete0}} ->
case emqx_mgmt_api:accumulate_query_rows(Node, Rows, QueryState1, ResultAcc) of
{enough, NResultAcc} ->
%% TODO: add persistent session count?
%% TODO: this may return `{error, _, _}'...
QueryState2 = emqx_mgmt_api:maybe_collect_total_from_tail_nodes(
Tail, QueryState1
),
QueryState = add_persistent_session_count(QueryState2),
Complete = Complete0 andalso Tail =:= [] andalso no_persistent_sessions(),
emqx_mgmt_api:finalize_query(
NResultAcc, emqx_mgmt_api:mark_complete(QueryState, Complete)
);
{more, NResultAcc} when not Complete0 ->
do_list_clients_cluster_query(Nodes, QueryState1, NResultAcc);
{more, NResultAcc} when Tail =/= [] ->
do_list_clients_cluster_query(
Tail, emqx_mgmt_api:reset_query_state(QueryState1), NResultAcc
);
{more, NResultAcc} ->
QueryState = add_persistent_session_count(QueryState1),
do_persistent_session_query(NResultAcc, QueryState)
end
end.

list_clients_node_query(Node, QString, Options) ->
case emqx_mgmt_api:parse_pager_params(QString) of
false ->
{error, page_limit_invalid};
Meta = #{} ->
{_CodCnt, NQString} = emqx_mgmt_api:parse_qstring(QString, ?CLIENT_QSCHEMA),
ResultAcc = emqx_mgmt_api:init_query_result(),
QueryState = emqx_mgmt_api:init_query_state(
?CHAN_INFO_TAB, NQString, fun ?MODULE:qs2ms/2, Meta, Options
),
Res = do_list_clients_node_query(Node, QueryState, ResultAcc),
emqx_mgmt_api:format_query_result(fun ?MODULE:format_channel_info/2, Meta, Res)
end.

add_persistent_session_count(QueryState0 = #{total := Totals0}) ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
%% TODO: currently, counting persistent sessions can be not only costly (needs
%% to traverse the whole table), but also hard to deduplicate live connections
%% from it... So this count will possibly overshoot the true count of
%% sessions.
SessionCount = emqx_persistent_session_ds_state:session_count(),
Totals = Totals0#{undefined => SessionCount},
QueryState0#{total := Totals};
false ->
QueryState0
end;
add_persistent_session_count(QueryState) ->
QueryState.

%% adapted from `emqx_mgmt_api:do_node_query'
do_list_clients_node_query(
Node,
QueryState,
ResultAcc
) ->
case emqx_mgmt_api:do_query(Node, QueryState) of
{error, Error} ->
{error, Node, Error};
{Rows, NQueryState = #{complete := Complete}} ->
case emqx_mgmt_api:accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
{enough, NResultAcc} ->
FComplete = Complete andalso no_persistent_sessions(),
emqx_mgmt_api:finalize_query(
NResultAcc, emqx_mgmt_api:mark_complete(NQueryState, FComplete)
);
{more, NResultAcc} when Complete ->
do_persistent_session_query(NResultAcc, NQueryState);
{more, NResultAcc} ->
do_list_clients_node_query(Node, NQueryState, NResultAcc)
end
end.

init_persistent_session_iterator() ->
emqx_persistent_session_ds_state:make_session_iterator().

no_persistent_sessions() ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
'$end_of_table' =:= init_persistent_session_iterator();
false ->
false
end.

do_persistent_session_query(ResultAcc, QueryState) ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
do_persistent_session_query1(
ResultAcc,
QueryState,
init_persistent_session_iterator()
);
false ->
emqx_mgmt_api:finalize_query(ResultAcc, QueryState)
end.

do_persistent_session_query1(ResultAcc, QueryState, Iter0) ->
%% Since persistent session data is accessible from all nodes, there's no need to go
%% through all the nodes.
#{limit := Limit} = QueryState,
{Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit),
Rows = remove_live_sessions(Rows0),
case emqx_mgmt_api:accumulate_query_rows(undefined, Rows, QueryState, ResultAcc) of
{enough, NResultAcc} ->
emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true));
{more, NResultAcc} when Iter =:= '$end_of_table' ->
emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true));
{more, NResultAcc} ->
do_persistent_session_query1(NResultAcc, QueryState, Iter)
end.

remove_live_sessions(Rows) ->
lists:filtermap(
fun({ClientId, _Session}) ->
case emqx_mgmt:lookup_running_client(ClientId, _FormatFn = undefined) of
[] ->
{true, {ClientId, emqx_persistent_session_ds_state:print_session(ClientId)}};
[_ | _] ->
false
end
end,
Rows
).

%%--------------------------------------------------------------------
%% QueryString to Match Spec

Expand Down Expand Up @@ -925,7 +1070,11 @@ run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} |
%% format funcs

format_channel_info(ChannInfo = {_, _ClientInfo, _ClientStats}) ->
format_channel_info(node(), ChannInfo).
%% channel info from ETS table (live and/or in-memory session)
format_channel_info(node(), ChannInfo);
format_channel_info({ClientId, PSInfo}) ->
%% offline persistent session
format_persistent_session_info(ClientId, PSInfo).

format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) ->
Node = maps:get(node, ClientInfo0, WhichNode),
Expand Down Expand Up @@ -983,7 +1132,29 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) ->
maps:without(RemoveList, ClientInfoMap),
TimesKeys
)
).
);
format_channel_info(undefined, {ClientId, PSInfo0 = #{}}) ->
format_persistent_session_info(ClientId, PSInfo0).

format_persistent_session_info(ClientId, PSInfo0) ->
Metadata = maps:get(metadata, PSInfo0, #{}),
PSInfo1 = maps:with([created_at, expiry_interval], Metadata),
CreatedAt = maps:get(created_at, PSInfo1),
PSInfo2 = convert_expiry_interval_unit(PSInfo1),
PSInfo3 = PSInfo2#{
clientid => ClientId,
connected => false,
connected_at => CreatedAt,
ip_address => undefined,
is_persistent => true,
port => undefined
},
PSInfo = lists:foldl(
fun result_format_time_fun/2,
PSInfo3,
[created_at, connected_at]
),
result_format_undefined_to_null(PSInfo).

%% format func helpers
take_maps_from_inner(_Key, Value, Current) when is_map(Value) ->
Expand Down

0 comments on commit 3db3fd6

Please sign in to comment.