diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 4912ebe957..158fe77954 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -36,10 +36,16 @@ -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]). -export([get_subscriptions/1, put_subscription/4, del_subscription/3]). --export([make_session_iterator/0, session_iterator_next/2]). +-export([make_session_iterator/0, session_iterator_next/2, session_count/0]). -export_type([ - t/0, metadata/0, subscriptions/0, seqno_type/0, stream_key/0, rank_key/0, session_iterator/0 + t/0, + metadata/0, + subscriptions/0, + seqno_type/0, + stream_key/0, + rank_key/0, + session_iterator/0 ]). -include("emqx_mqtt.hrl"). @@ -53,7 +59,7 @@ -type subscriptions() :: emqx_topic_gbt:t(_SubId, emqx_persistent_session_ds:subscription()). --opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'. +-type session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'. %% Generic key-value wrapper that is used for exporting arbitrary %% terms to mnesia: @@ -359,14 +365,15 @@ del_rank(Key, Rec) -> fold_ranks(Fun, Acc, Rec) -> gen_fold(ranks, Fun, Acc, Rec). +-spec session_count() -> non_neg_integer(). +session_count() -> + %% N.B.: this is potentially costly. Should not be called in hot paths. + %% `mnesia:table_info(_, size)' is always zero for rocksdb, so we need to traverse... + do_session_count(make_session_iterator(), 0). + -spec make_session_iterator() -> session_iterator(). make_session_iterator() -> - case mnesia:dirty_first(?session_tab) of - '$end_of_table' -> - '$end_of_table'; - Key -> - Key - end. + mnesia:dirty_first(?session_tab). -spec session_iterator_next(session_iterator(), pos_integer()) -> {[{emqx_persistent_session_ds:id(), metadata()}], session_iterator()}. @@ -564,6 +571,13 @@ ro_transaction(Fun) -> %% {atomic, Res} = mria:ro_transaction(?DS_MRIA_SHARD, Fun), %% Res. +%% + +do_session_count('$end_of_table', N) -> + N; +do_session_count(Cursor, N) -> + do_session_count(mnesia:dirty_next(?session_tab, Cursor), N + 1). + -compile({inline, check_sequence/1}). -ifdef(CHECK_SEQNO). diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index a5cf0b5847..a1ab0bc3f5 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -66,6 +66,9 @@ do_kickout_clients/1 ]). +%% Internal exports +-export([lookup_running_client/2]). + %% Internal functions -export([do_call_client/2]). @@ -314,10 +317,16 @@ nodes_info_count(PropList) -> %%-------------------------------------------------------------------- lookup_client({clientid, ClientId}, FormatFun) -> - lists:append([ - lookup_client(Node, {clientid, ClientId}, FormatFun) - || Node <- emqx:running_nodes() - ]); + IsPersistenceEnabled = emqx_persistent_message:is_persistence_enabled(), + case lookup_running_client(ClientId, FormatFun) of + [] when IsPersistenceEnabled -> + case emqx_persistent_session_ds_state:print_session(ClientId) of + undefined -> []; + Session -> [maybe_format(FormatFun, {ClientId, Session})] + end; + Res -> + Res + end; lookup_client({username, Username}, FormatFun) -> lists:append([ lookup_client(Node, {username, Username}, FormatFun) @@ -633,6 +642,16 @@ create_banned(Banned) -> delete_banned(Who) -> emqx_banned:delete(Who). +%%-------------------------------------------------------------------- +%% Internal exports +%%-------------------------------------------------------------------- + +lookup_running_client(ClientId, FormatFun) -> + lists:append([ + lookup_client(Node, {clientid, ClientId}, FormatFun) + || Node <- emqx:running_nodes() + ]). + %%-------------------------------------------------------------------- %% Internal Functions. %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index 4db37a7dcb..b254fd0cfb 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -39,7 +39,13 @@ parse_pager_params/1, parse_qstring/2, init_query_result/0, - accumulate_query_rows/4 + init_query_state/5, + reset_query_state/1, + accumulate_query_rows/4, + finalize_query/2, + mark_complete/2, + format_query_result/3, + maybe_collect_total_from_tail_nodes/2 ]). -ifdef(TEST). diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 8965f46339..f5f99d68e8 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -698,26 +698,13 @@ list_clients(QString) -> case maps:get(<<"node">>, QString, undefined) of undefined -> Options = #{fast_total_counting => true}, - emqx_mgmt_api:cluster_query( - ?CHAN_INFO_TAB, - QString, - ?CLIENT_QSCHEMA, - fun ?MODULE:qs2ms/2, - fun ?MODULE:format_channel_info/2, - Options - ); + list_clients_cluster_query(QString, Options); Node0 -> case emqx_utils:safe_to_existing_atom(Node0) of {ok, Node1} -> - QStringWithoutNode = maps:without([<<"node">>], QString), - emqx_mgmt_api:node_query( - Node1, - ?CHAN_INFO_TAB, - QStringWithoutNode, - ?CLIENT_QSCHEMA, - fun ?MODULE:qs2ms/2, - fun ?MODULE:format_channel_info/2 - ); + QStringWithoutNode = maps:remove(<<"node">>, QString), + Options = #{}, + list_clients_node_query(Node1, QStringWithoutNode, Options); {error, _} -> {error, Node0, {badrpc, <<"invalid node">>}} end @@ -851,6 +838,164 @@ do_unsubscribe(ClientID, Topic) -> Res end. +list_clients_cluster_query(QString, Options) -> + case emqx_mgmt_api:parse_pager_params(QString) of + false -> + {error, page_limit_invalid}; + Meta = #{} -> + try + {_CodCnt, NQString} = emqx_mgmt_api:parse_qstring(QString, ?CLIENT_QSCHEMA), + Nodes = emqx:running_nodes(), + ResultAcc = emqx_mgmt_api:init_query_result(), + QueryState = emqx_mgmt_api:init_query_state( + ?CHAN_INFO_TAB, NQString, fun ?MODULE:qs2ms/2, Meta, Options + ), + Res = do_list_clients_cluster_query(Nodes, QueryState, ResultAcc), + emqx_mgmt_api:format_query_result(fun ?MODULE:format_channel_info/2, Meta, Res) + catch + throw:{bad_value_type, {Key, ExpectedType, AcutalValue}} -> + {error, invalid_query_string_param, {Key, ExpectedType, AcutalValue}} + end + end. + +%% adapted from `emqx_mgmt_api:do_cluster_query' +do_list_clients_cluster_query( + [Node | Tail] = Nodes, + QueryState0, + ResultAcc +) -> + case emqx_mgmt_api:do_query(Node, QueryState0) of + {error, Error} -> + {error, Node, Error}; + {Rows, QueryState1 = #{complete := Complete0}} -> + case emqx_mgmt_api:accumulate_query_rows(Node, Rows, QueryState1, ResultAcc) of + {enough, NResultAcc} -> + %% TODO: add persistent session count? + %% TODO: this may return `{error, _, _}'... + QueryState2 = emqx_mgmt_api:maybe_collect_total_from_tail_nodes( + Tail, QueryState1 + ), + QueryState = add_persistent_session_count(QueryState2), + Complete = Complete0 andalso Tail =:= [] andalso no_persistent_sessions(), + emqx_mgmt_api:finalize_query( + NResultAcc, emqx_mgmt_api:mark_complete(QueryState, Complete) + ); + {more, NResultAcc} when not Complete0 -> + do_list_clients_cluster_query(Nodes, QueryState1, NResultAcc); + {more, NResultAcc} when Tail =/= [] -> + do_list_clients_cluster_query( + Tail, emqx_mgmt_api:reset_query_state(QueryState1), NResultAcc + ); + {more, NResultAcc} -> + QueryState = add_persistent_session_count(QueryState1), + do_persistent_session_query(NResultAcc, QueryState) + end + end. + +list_clients_node_query(Node, QString, Options) -> + case emqx_mgmt_api:parse_pager_params(QString) of + false -> + {error, page_limit_invalid}; + Meta = #{} -> + {_CodCnt, NQString} = emqx_mgmt_api:parse_qstring(QString, ?CLIENT_QSCHEMA), + ResultAcc = emqx_mgmt_api:init_query_result(), + QueryState = emqx_mgmt_api:init_query_state( + ?CHAN_INFO_TAB, NQString, fun ?MODULE:qs2ms/2, Meta, Options + ), + Res = do_list_clients_node_query(Node, QueryState, ResultAcc), + emqx_mgmt_api:format_query_result(fun ?MODULE:format_channel_info/2, Meta, Res) + end. + +add_persistent_session_count(QueryState0 = #{total := Totals0}) -> + case emqx_persistent_message:is_persistence_enabled() of + true -> + %% TODO: currently, counting persistent sessions can be not only costly (needs + %% to traverse the whole table), but also hard to deduplicate live connections + %% from it... So this count will possibly overshoot the true count of + %% sessions. + SessionCount = emqx_persistent_session_ds_state:session_count(), + Totals = Totals0#{undefined => SessionCount}, + QueryState0#{total := Totals}; + false -> + QueryState0 + end; +add_persistent_session_count(QueryState) -> + QueryState. + +%% adapted from `emqx_mgmt_api:do_node_query' +do_list_clients_node_query( + Node, + QueryState, + ResultAcc +) -> + case emqx_mgmt_api:do_query(Node, QueryState) of + {error, Error} -> + {error, Node, Error}; + {Rows, NQueryState = #{complete := Complete}} -> + case emqx_mgmt_api:accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of + {enough, NResultAcc} -> + FComplete = Complete andalso no_persistent_sessions(), + emqx_mgmt_api:finalize_query( + NResultAcc, emqx_mgmt_api:mark_complete(NQueryState, FComplete) + ); + {more, NResultAcc} when Complete -> + do_persistent_session_query(NResultAcc, NQueryState); + {more, NResultAcc} -> + do_list_clients_node_query(Node, NQueryState, NResultAcc) + end + end. + +init_persistent_session_iterator() -> + emqx_persistent_session_ds_state:make_session_iterator(). + +no_persistent_sessions() -> + case emqx_persistent_message:is_persistence_enabled() of + true -> + '$end_of_table' =:= init_persistent_session_iterator(); + false -> + false + end. + +do_persistent_session_query(ResultAcc, QueryState) -> + case emqx_persistent_message:is_persistence_enabled() of + true -> + do_persistent_session_query1( + ResultAcc, + QueryState, + init_persistent_session_iterator() + ); + false -> + emqx_mgmt_api:finalize_query(ResultAcc, QueryState) + end. + +do_persistent_session_query1(ResultAcc, QueryState, Iter0) -> + %% Since persistent session data is accessible from all nodes, there's no need to go + %% through all the nodes. + #{limit := Limit} = QueryState, + {Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit), + Rows = remove_live_sessions(Rows0), + case emqx_mgmt_api:accumulate_query_rows(undefined, Rows, QueryState, ResultAcc) of + {enough, NResultAcc} -> + emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true)); + {more, NResultAcc} when Iter =:= '$end_of_table' -> + emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true)); + {more, NResultAcc} -> + do_persistent_session_query1(NResultAcc, QueryState, Iter) + end. + +remove_live_sessions(Rows) -> + lists:filtermap( + fun({ClientId, _Session}) -> + case emqx_mgmt:lookup_running_client(ClientId, _FormatFn = undefined) of + [] -> + {true, {ClientId, emqx_persistent_session_ds_state:print_session(ClientId)}}; + [_ | _] -> + false + end + end, + Rows + ). + %%-------------------------------------------------------------------- %% QueryString to Match Spec @@ -925,7 +1070,11 @@ run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} | %% format funcs format_channel_info(ChannInfo = {_, _ClientInfo, _ClientStats}) -> - format_channel_info(node(), ChannInfo). + %% channel info from ETS table (live and/or in-memory session) + format_channel_info(node(), ChannInfo); +format_channel_info({ClientId, PSInfo}) -> + %% offline persistent session + format_persistent_session_info(ClientId, PSInfo). format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) -> Node = maps:get(node, ClientInfo0, WhichNode), @@ -983,7 +1132,29 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) -> maps:without(RemoveList, ClientInfoMap), TimesKeys ) - ). + ); +format_channel_info(undefined, {ClientId, PSInfo0 = #{}}) -> + format_persistent_session_info(ClientId, PSInfo0). + +format_persistent_session_info(ClientId, PSInfo0) -> + Metadata = maps:get(metadata, PSInfo0, #{}), + PSInfo1 = maps:with([created_at, expiry_interval], Metadata), + CreatedAt = maps:get(created_at, PSInfo1), + PSInfo2 = convert_expiry_interval_unit(PSInfo1), + PSInfo3 = PSInfo2#{ + clientid => ClientId, + connected => false, + connected_at => CreatedAt, + ip_address => undefined, + is_persistent => true, + port => undefined + }, + PSInfo = lists:foldl( + fun result_format_time_fun/2, + PSInfo3, + [created_at, connected_at] + ), + result_format_undefined_to_null(PSInfo). %% format func helpers take_maps_from_inner(_Key, Value, Current) when is_map(Value) -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 6daf918f10..e7e6f14fe6 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -18,10 +18,27 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). all() -> - emqx_common_test_helpers:all(?MODULE). + AllTCs = emqx_common_test_helpers:all(?MODULE), + [ + {group, persistent_sessions} + | AllTCs -- persistent_session_testcases() + ]. + +groups() -> + [{persistent_sessions, persistent_session_testcases()}]. + +persistent_session_testcases() -> + [ + t_persistent_sessions1, + t_persistent_sessions2, + t_persistent_sessions3, + t_persistent_sessions4, + t_persistent_sessions5 + ]. init_per_suite(Config) -> emqx_mgmt_api_test_util:init_suite(), @@ -30,6 +47,33 @@ init_per_suite(Config) -> end_per_suite(_) -> emqx_mgmt_api_test_util:end_suite(). +init_per_group(persistent_sessions, Config) -> + AppSpecs = [ + {emqx, "session_persistence.enable = true"}, + emqx_management + ], + Dashboard = emqx_mgmt_api_test_util:emqx_dashboard( + "dashboard.listeners.http { enable = true, bind = 18084 }" + ), + Cluster = [ + {emqx_mgmt_api_clients_SUITE1, #{role => core, apps => AppSpecs ++ [Dashboard]}}, + {emqx_mgmt_api_clients_SUITE2, #{role => core, apps => AppSpecs}} + ], + Nodes = emqx_cth_cluster:start( + Cluster, + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{nodes, Nodes} | Config]; +init_per_group(_Group, Config) -> + Config. + +end_per_group(persistent_sessions, Config) -> + Nodes = ?config(nodes, Config), + emqx_cth_cluster:stop(Nodes), + ok; +end_per_group(_Group, _Config) -> + ok. + t_clients(_) -> process_flag(trap_exit, true), @@ -171,6 +215,290 @@ t_clients(_) -> AfterKickoutResponse1 = emqx_mgmt_api_test_util:request_api(get, Client1Path), ?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse1). +t_persistent_sessions1(Config) -> + [N1, _N2] = ?config(nodes, Config), + APIPort = 18084, + Port1 = get_mqtt_port(N1, tcp), + + ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)), + + ?check_trace( + begin + %% Scenario 1 + %% 1) Client connects and is listed as connected. + ?tp(notice, "scenario 1", #{}), + O = #{api_port => APIPort}, + ClientId = <<"c1">>, + C1 = connect_client(#{port => Port1, clientid => ClientId}), + assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), + %% 2) Client disconnects and is listed as disconnected. + ok = emqtt:disconnect(C1), + assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}), + %% 3) Client reconnects and is listed as connected. + C2 = connect_client(#{port => Port1, clientid => ClientId}), + assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), + %% 4) Client disconnects. + ok = emqtt:stop(C2), + %% 5) Session is GC'ed, client is removed from list. + ?tp(notice, "gc", #{}), + %% simulate GC + ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]), + ?retry( + 100, + 20, + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"data">> := []}}}, + list_request(APIPort) + ) + ), + ok + end, + [] + ), + ok. + +t_persistent_sessions2(Config) -> + [N1, _N2] = ?config(nodes, Config), + APIPort = 18084, + Port1 = get_mqtt_port(N1, tcp), + + ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)), + + ?check_trace( + begin + %% Scenario 2 + %% 1) Client connects and is listed as connected. + ?tp(notice, "scenario 2", #{}), + O = #{api_port => APIPort}, + ClientId = <<"c2">>, + C1 = connect_client(#{port => Port1, clientid => ClientId}), + assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), + unlink(C1), + %% 2) Client connects to the same node and takes over, listed only once. + C2 = connect_client(#{port => Port1, clientid => ClientId}), + assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), + ok = emqtt:stop(C2), + ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]), + ?retry( + 100, + 20, + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"data">> := []}}}, + list_request(APIPort) + ) + ), + + ok + end, + [] + ), + ok. + +t_persistent_sessions3(Config) -> + [N1, N2] = ?config(nodes, Config), + APIPort = 18084, + Port1 = get_mqtt_port(N1, tcp), + Port2 = get_mqtt_port(N2, tcp), + + ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)), + + ?check_trace( + begin + %% Scenario 3 + %% 1) Client connects and is listed as connected. + ?tp(notice, "scenario 3", #{}), + O = #{api_port => APIPort}, + ClientId = <<"c3">>, + C1 = connect_client(#{port => Port1, clientid => ClientId}), + assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), + unlink(C1), + %% 2) Client connects to *another node* and takes over, listed only once. + C2 = connect_client(#{port => Port2, clientid => ClientId}), + assert_single_client(O#{node => N2, clientid => ClientId, status => connected}), + %% Doesn't show up in the other node while alive + ?retry( + 100, + 20, + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"data">> := []}}}, + list_request(APIPort, "node=" ++ atom_to_list(N1)) + ) + ), + ok = emqtt:stop(C2), + ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]), + + ok + end, + [] + ), + ok. + +t_persistent_sessions4(Config) -> + [N1, N2] = ?config(nodes, Config), + APIPort = 18084, + Port1 = get_mqtt_port(N1, tcp), + Port2 = get_mqtt_port(N2, tcp), + + ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)), + + ?check_trace( + begin + %% Scenario 4 + %% 1) Client connects and is listed as connected. + ?tp(notice, "scenario 4", #{}), + O = #{api_port => APIPort}, + ClientId = <<"c4">>, + C1 = connect_client(#{port => Port1, clientid => ClientId}), + assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), + %% 2) Client disconnects and is listed as disconnected. + ok = emqtt:stop(C1), + %% While disconnected, shows up in both nodes. + assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}), + assert_single_client(O#{node => N2, clientid => ClientId, status => disconnected}), + %% 3) Client reconnects to *another node* and is listed as connected once. + C2 = connect_client(#{port => Port2, clientid => ClientId}), + assert_single_client(O#{node => N2, clientid => ClientId, status => connected}), + %% Doesn't show up in the other node while alive + ?retry( + 100, + 20, + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"data">> := []}}}, + list_request(APIPort, "node=" ++ atom_to_list(N1)) + ) + ), + ok = emqtt:stop(C2), + ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]), + + ok + end, + [] + ), + ok. + +t_persistent_sessions5(Config) -> + [N1, N2] = ?config(nodes, Config), + APIPort = 18084, + Port1 = get_mqtt_port(N1, tcp), + Port2 = get_mqtt_port(N2, tcp), + + ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)), + + ?check_trace( + begin + %% Pagination with mixed clients + ClientId1 = <<"c5">>, + ClientId2 = <<"c6">>, + ClientId3 = <<"c7">>, + ClientId4 = <<"c8">>, + %% persistent + C1 = connect_client(#{port => Port1, clientid => ClientId1}), + C2 = connect_client(#{port => Port2, clientid => ClientId2}), + %% in-memory + C3 = connect_client(#{ + port => Port1, clientid => ClientId3, expiry => 0, clean_start => true + }), + C4 = connect_client(#{ + port => Port2, clientid => ClientId4, expiry => 0, clean_start => true + }), + + P1 = list_request(APIPort, "limit=3&page=1"), + P2 = list_request(APIPort, "limit=3&page=2"), + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"data">> := [_, _, _], + <<"meta">> := #{ + %% TODO: if/when we fix the persistent session count, this + %% should be 4. + <<"count">> := 6, + <<"hasnext">> := true + } + }}}, + P1 + ), + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"data">> := [_], + <<"meta">> := #{ + %% TODO: if/when we fix the persistent session count, this + %% should be 4. + <<"count">> := 6, + <<"hasnext">> := false + } + }}}, + P2 + ), + {ok, {_, _, #{<<"data">> := R1}}} = P1, + {ok, {_, _, #{<<"data">> := R2}}} = P2, + ?assertEqual( + lists:sort([ClientId1, ClientId2, ClientId3, ClientId4]), + lists:sort(lists:map(fun(#{<<"clientid">> := CId}) -> CId end, R1 ++ R2)) + ), + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"data">> := [_, _], + <<"meta">> := #{ + %% TODO: if/when we fix the persistent session count, this + %% should be 4. + <<"count">> := 6, + <<"hasnext">> := true + } + }}}, + list_request(APIPort, "limit=2&page=1") + ), + %% Disconnect persistent sessions + lists:foreach(fun emqtt:stop/1, [C1, C2]), + + P3 = + ?retry(200, 10, begin + P3_ = list_request(APIPort, "limit=3&page=1"), + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"data">> := [_, _, _], + <<"meta">> := #{ + <<"count">> := 4, + <<"hasnext">> := true + } + }}}, + P3_ + ), + P3_ + end), + P4 = + ?retry(200, 10, begin + P4_ = list_request(APIPort, "limit=3&page=2"), + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"data">> := [_], + <<"meta">> := #{ + <<"count">> := 4, + <<"hasnext">> := false + } + }}}, + P4_ + ), + P4_ + end), + {ok, {_, _, #{<<"data">> := R3}}} = P3, + {ok, {_, _, #{<<"data">> := R4}}} = P4, + ?assertEqual( + lists:sort([ClientId1, ClientId2, ClientId3, ClientId4]), + lists:sort(lists:map(fun(#{<<"clientid">> := CId}) -> CId end, R3 ++ R4)) + ), + + lists:foreach(fun emqtt:stop/1, [C3, C4]), + + ok + end, + [] + ), + ok. + t_clients_bad_value_type(_) -> %% get /clients AuthHeader = [emqx_common_test_http:default_auth_header()], @@ -442,3 +770,111 @@ time_string_to_epoch(DateTime, Unit) when is_binary(DateTime) -> binary_to_list(DateTime), [{unit, Unit}] ) end. + +get_mqtt_port(Node, Type) -> + {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), + Port. + +request(Method, Path, Params) -> + request(Method, Path, Params, _QueryParams = ""). + +request(Method, Path, Params, QueryParams) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + case emqx_mgmt_api_test_util:request_api(Method, Path, QueryParams, AuthHeader, Params, Opts) of + {ok, {Status, Headers, Body0}} -> + Body = maybe_json_decode(Body0), + {ok, {Status, Headers, Body}}; + {error, {Status, Headers, Body0}} -> + Body = + case emqx_utils_json:safe_decode(Body0, [return_maps]) of + {ok, Decoded0 = #{<<"message">> := Msg0}} -> + Msg = maybe_json_decode(Msg0), + Decoded0#{<<"message">> := Msg}; + {ok, Decoded0} -> + Decoded0; + {error, _} -> + Body0 + end, + {error, {Status, Headers, Body}}; + Error -> + Error + end. + +maybe_json_decode(X) -> + case emqx_utils_json:safe_decode(X, [return_maps]) of + {ok, Decoded} -> Decoded; + {error, _} -> X + end. + +list_request(Port) -> + list_request(Port, _QueryParams = ""). + +list_request(Port, QueryParams) -> + Host = "http://127.0.0.1:" ++ integer_to_list(Port), + Path = emqx_mgmt_api_test_util:api_path(Host, ["clients"]), + request(get, Path, [], QueryParams). + +lookup_request(ClientId) -> + lookup_request(ClientId, 18083). + +lookup_request(ClientId, Port) -> + Host = "http://127.0.0.1:" ++ integer_to_list(Port), + Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId]), + request(get, Path, []). + +assert_single_client(Opts) -> + #{ + api_port := APIPort, + clientid := ClientId, + node := Node, + status := Connected + } = Opts, + IsConnected = + case Connected of + connected -> true; + disconnected -> false + end, + ?retry( + 100, + 20, + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"connected">> := IsConnected}]}}}, + list_request(APIPort) + ) + ), + ?retry( + 100, + 20, + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"connected">> := IsConnected}]}}}, + list_request(APIPort, "node=" ++ atom_to_list(Node)), + #{node => Node} + ) + ), + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"connected">> := IsConnected}}}, + lookup_request(ClientId, APIPort) + ), + ok. + +connect_client(Opts) -> + Defaults = #{ + expiry => 30, + clean_start => false + }, + #{ + port := Port, + clientid := ClientId, + clean_start := CleanStart, + expiry := EI + } = maps:merge(Defaults, Opts), + {ok, C} = emqtt:start_link([ + {port, Port}, + {proto_ver, v5}, + {clientid, ClientId}, + {clean_start, CleanStart}, + {properties, #{'Session-Expiry-Interval' => EI}} + ]), + {ok, _} = emqtt:connect(C), + C. diff --git a/changes/ce/fix-12500.en.md b/changes/ce/fix-12500.en.md new file mode 100644 index 0000000000..d1763f57ba --- /dev/null +++ b/changes/ce/fix-12500.en.md @@ -0,0 +1,3 @@ +Now disconnected persistent sessions are returned in the `GET /clients` and `GET /client/:clientid` HTTP APIs. + +Known issue: the total count returned by this API may overestimate the total number of clients.