Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple clientid/username Qs params in clients API #12719

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
Original file line number Diff line number Diff line change
Expand Up @@ -444,20 +444,79 @@ check_parameter(
check_parameter([], _Bindings, _QueryStr, _Module, NewBindings, NewQueryStr) ->
{NewBindings, NewQueryStr};
check_parameter([{Name, Type} | Spec], Bindings, QueryStr, Module, BindingsAcc, QueryStrAcc) ->
Schema = ?INIT_SCHEMA#{roots => [{Name, Type}]},
case hocon_schema:field_schema(Type, in) of
path ->
Schema = ?INIT_SCHEMA#{roots => [{Name, Type}]},
Option = #{atom_key => true},
NewBindings = hocon_tconf:check_plain(Schema, Bindings, Option),
NewBindingsAcc = maps:merge(BindingsAcc, NewBindings),
check_parameter(Spec, Bindings, QueryStr, Module, NewBindingsAcc, QueryStrAcc);
query ->
Type1 = maybe_wrap_array_qs_param(Type),
Schema = ?INIT_SCHEMA#{roots => [{Name, Type1}]},
Option = #{},
NewQueryStr = hocon_tconf:check_plain(Schema, QueryStr, Option),
NewQueryStrAcc = maps:merge(QueryStrAcc, NewQueryStr),
check_parameter(Spec, Bindings, QueryStr, Module, BindingsAcc, NewQueryStrAcc)
end.

%% Compatibility layer for minirest 1.4.0 that parses repetitive QS params into lists.
%% Previous minirest releases dropped all but the last repetitive params.

maybe_wrap_array_qs_param(FieldSchema) ->
Conv = hocon_schema:field_schema(FieldSchema, converter),
Type = hocon_schema:field_schema(FieldSchema, type),
case array_or_single_qs_param(Type, Conv) of
any ->
FieldSchema;
array ->
override_conv(FieldSchema, fun wrap_array_conv/2, Conv);
single ->
override_conv(FieldSchema, fun unwrap_array_conv/2, Conv)
end.

array_or_single_qs_param(?ARRAY(_Type), undefined) ->
array;
%% Qs field schema is an array and defines a converter:
%% don't change (wrap/unwrap) the original value, and let the converter handle it.
%% For example, it can be a CSV list.
array_or_single_qs_param(?ARRAY(_Type), _Conv) ->
any;
array_or_single_qs_param(?UNION(Types), _Conv) ->
HasArray = lists:any(
fun
(?ARRAY(_)) -> true;
(_) -> false
end,
Types
),
case HasArray of
true -> any;
false -> single
end;
array_or_single_qs_param(_, _Conv) ->
single.

override_conv(FieldSchema, NewConv, OldConv) ->
Conv = compose_converters(NewConv, OldConv),
hocon_schema:override(FieldSchema, FieldSchema#{converter => Conv}).

compose_converters(NewFun, undefined = _OldFun) ->
NewFun;
compose_converters(NewFun, OldFun) ->
case erlang:fun_info(OldFun, arity) of
{_, 2} ->
fun(V, Opts) -> OldFun(NewFun(V, Opts), Opts) end;
{_, 1} ->
fun(V, Opts) -> OldFun(NewFun(V, Opts)) end
end.

wrap_array_conv(Val, _Opts) when is_list(Val); Val =:= undefined -> Val;
wrap_array_conv(SingleVal, _Opts) -> [SingleVal].

unwrap_array_conv([HVal | _], _Opts) -> HVal;
unwrap_array_conv(SingleVal, _Opts) -> SingleVal.

check_request_body(#{body := Body}, Schema, Module, CheckFun, true) ->
Type0 = hocon_schema:field_schema(Schema, type),
Type =
Expand Down
18 changes: 13 additions & 5 deletions apps/emqx_management/src/emqx_mgmt_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
finalize_query/2,
mark_complete/2,
format_query_result/3,
format_query_result/4,
maybe_collect_total_from_tail_nodes/2
]).

Expand Down Expand Up @@ -619,10 +620,13 @@ is_fuzzy_key(<<"match_", _/binary>>) ->
is_fuzzy_key(_) ->
false.

format_query_result(_FmtFun, _MetaIn, Error = {error, _Node, _Reason}) ->
format_query_result(FmtFun, MetaIn, ResultAcc) ->
format_query_result(FmtFun, MetaIn, ResultAcc, #{}).

format_query_result(_FmtFun, _MetaIn, Error = {error, _Node, _Reason}, _Opts) ->
Error;
format_query_result(
FmtFun, MetaIn, ResultAcc = #{hasnext := HasNext, rows := RowsAcc}
FmtFun, MetaIn, ResultAcc = #{hasnext := HasNext, rows := RowsAcc}, Opts
) ->
Meta =
case ResultAcc of
Expand All @@ -638,18 +642,22 @@ format_query_result(
data => lists:flatten(
lists:foldl(
fun({Node, Rows}, Acc) ->
[lists:map(fun(Row) -> exec_format_fun(FmtFun, Node, Row) end, Rows) | Acc]
[
lists:map(fun(Row) -> exec_format_fun(FmtFun, Node, Row, Opts) end, Rows)
| Acc
]
end,
[],
RowsAcc
)
)
}.

exec_format_fun(FmtFun, Node, Row) ->
exec_format_fun(FmtFun, Node, Row, Opts) ->
case erlang:fun_info(FmtFun, arity) of
{arity, 1} -> FmtFun(Row);
{arity, 2} -> FmtFun(Node, Row)
{arity, 2} -> FmtFun(Node, Row);
{arity, 3} -> FmtFun(Node, Row, Opts)
end.

parse_pager_params(Params) ->
Expand Down
157 changes: 115 additions & 42 deletions apps/emqx_management/src/emqx_mgmt_api_clients.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
qs2ms/2,
run_fuzzy_filter/2,
format_channel_info/1,
format_channel_info/2
format_channel_info/2,
format_channel_info/3
]).

%% for batch operation
Expand All @@ -66,7 +67,10 @@

-define(CLIENT_QSCHEMA, [
{<<"node">>, atom},
%% list
{<<"username">>, binary},
%% list
{<<"clientid">>, binary},
{<<"ip_address">>, ip},
{<<"conn_state">>, atom},
{<<"clean_start">>, atom},
Expand Down Expand Up @@ -125,10 +129,13 @@ schema("/clients") ->
example => <<"emqx@127.0.0.1">>
})},
{username,
hoconsc:mk(binary(), #{
hoconsc:mk(hoconsc:array(binary()), #{
in => query,
required => false,
desc => <<"User name">>
desc => <<
"User name, multiple values can be specified by"
" repeating the parameter: username=u1&username=u2"
>>
})},
{ip_address,
hoconsc:mk(binary(), #{
Expand Down Expand Up @@ -202,7 +209,17 @@ schema("/clients") ->
"Search client connection creation time by less"
" than or equal method, rfc3339 or timestamp(millisecond)"
>>
})}
})},
{clientid,
hoconsc:mk(hoconsc:array(binary()), #{
in => query,
required => false,
desc => <<
"Client ID, multiple values can be specified by"
" repeating the parameter: clientid=c1&clientid=c2"
>>
})},
?R_REF(requested_client_fields)
],
responses => #{
200 =>
Expand Down Expand Up @@ -656,6 +673,30 @@ fields(message) ->
{from_clientid, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_clientid)})},
{from_username, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_username)})},
{payload, hoconsc:mk(binary(), #{desc => ?DESC(msg_payload)})}
];
fields(requested_client_fields) ->
%% NOTE: some Client fields actually returned in response are missing in schema:
%% enable_authn, is_persistent, listener, peerport
zmstone marked this conversation as resolved.
Show resolved Hide resolved
ClientFields = [element(1, F) || F <- fields(client)],
[
{fields,
hoconsc:mk(
hoconsc:union([all, hoconsc:array(hoconsc:enum(ClientFields))]),
#{
in => query,
required => false,
default => all,
desc => <<"Comma separated list of client fields to return in the response">>,
converter => fun
(all, _Opts) ->
all;
(<<"all">>, _Opts) ->
all;
(CsvFields, _Opts) when is_binary(CsvFields) ->
binary:split(CsvFields, <<",">>, [global, trim_all])
end
}
)}
].

%%%==============================================================================================
Expand Down Expand Up @@ -971,7 +1012,10 @@ list_clients_cluster_query(QString, Options) ->
?CHAN_INFO_TAB, NQString, fun ?MODULE:qs2ms/2, Meta, Options
),
Res = do_list_clients_cluster_query(Nodes, QueryState, ResultAcc),
emqx_mgmt_api:format_query_result(fun ?MODULE:format_channel_info/2, Meta, Res)
Opts = #{fields => maps:get(<<"fields">>, QString, all)},
emqx_mgmt_api:format_query_result(
fun ?MODULE:format_channel_info/3, Meta, Res, Opts
)
catch
throw:{bad_value_type, {Key, ExpectedType, AcutalValue}} ->
{error, invalid_query_string_param, {Key, ExpectedType, AcutalValue}}
Expand Down Expand Up @@ -1023,7 +1067,8 @@ list_clients_node_query(Node, QString, Options) ->
?CHAN_INFO_TAB, NQString, fun ?MODULE:qs2ms/2, Meta, Options
),
Res = do_list_clients_node_query(Node, QueryState, ResultAcc),
emqx_mgmt_api:format_query_result(fun ?MODULE:format_channel_info/2, Meta, Res)
Opts = #{fields => maps:get(<<"fields">>, QString, all)},
emqx_mgmt_api:format_query_result(fun ?MODULE:format_channel_info/3, Meta, Res, Opts)
end.

add_persistent_session_count(QueryState0 = #{total := Totals0}) ->
Expand Down Expand Up @@ -1190,19 +1235,36 @@ qs2ms(_Tab, {QString, FuzzyQString}) ->
-spec qs2ms(list()) -> ets:match_spec().
qs2ms(Qs) ->
{MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}),
[{{'$1', MtchHead, '_'}, Conds, ['$_']}].
[{{{'$1', '_'}, MtchHead, '_'}, Conds, ['$_']}].

qs2ms([], _, {MtchHead, Conds}) ->
{MtchHead, lists:reverse(Conds)};
qs2ms([{Key, '=:=', Value} | Rest], N, {MtchHead, Conds}) when is_list(Value) ->
{Holder, NxtN} = holder_and_nxt(Key, N),
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(Key, Holder)),
qs2ms(Rest, NxtN, {NMtchHead, [orelse_cond(Holder, Value) | Conds]});
qs2ms([{Key, '=:=', Value} | Rest], N, {MtchHead, Conds}) ->
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(Key, Value)),
qs2ms(Rest, N, {NMtchHead, Conds});
qs2ms([Qs | Rest], N, {MtchHead, Conds}) ->
Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8),
Holder = holder(N),
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(element(1, Qs), Holder)),
NConds = put_conds(Qs, Holder, Conds),
qs2ms(Rest, N + 1, {NMtchHead, NConds}).

%% This is a special case: clientid is a part of the key (ClientId, Pid}, as the table is ordered_set,
%% using partially bound key optimizes traversal.
holder_and_nxt(clientid, N) ->
{'$1', N};
holder_and_nxt(_, N) ->
{holder(N), N + 1}.

holder(N) -> list_to_atom([$$ | integer_to_list(N)]).

orelse_cond(Holder, ValuesList) ->
Conds = [{'=:=', Holder, V} || V <- ValuesList],
erlang:list_to_tuple(['orelse' | Conds]).

put_conds({_, Op, V}, Holder, Conds) ->
[{Op, Holder, V} | Conds];
put_conds({_, Op1, V1, Op2, V2}, Holder, Conds) ->
Expand All @@ -1212,8 +1274,8 @@ put_conds({_, Op1, V1, Op2, V2}, Holder, Conds) ->
| Conds
].

ms(clientid, X) ->
#{clientinfo => #{clientid => X}};
ms(clientid, _X) ->
#{};
ms(username, X) ->
#{clientinfo => #{username => X}};
ms(conn_state, X) ->
Expand Down Expand Up @@ -1257,7 +1319,11 @@ format_channel_info({ClientId, PSInfo}) ->
%% offline persistent session
format_persistent_session_info(ClientId, PSInfo).

format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) ->
format_channel_info(WhichNode, ChanInfo) ->
DefaultOpts = #{fields => all},
format_channel_info(WhichNode, ChanInfo, DefaultOpts).

format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}, Opts) ->
Node = maps:get(node, ClientInfo0, WhichNode),
ClientInfo1 = emqx_utils_maps:deep_remove([conninfo, clientid], ClientInfo0),
ClientInfo2 = emqx_utils_maps:deep_remove([conninfo, username], ClientInfo1),
Expand All @@ -1276,45 +1342,17 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) ->
ClientInfoMap5 = convert_expiry_interval_unit(ClientInfoMap4),
ClientInfoMap = maps:put(connected, Connected, ClientInfoMap5),

RemoveList =
[
auth_result,
peername,
sockname,
peerhost,
conn_state,
send_pend,
conn_props,
peercert,
sockstate,
subscriptions,
receive_maximum,
protocol,
is_superuser,
sockport,
anonymous,
socktype,
active_n,
await_rel_timeout,
conn_mod,
sockname,
retry_interval,
upgrade_qos,
zone,
%% session_id, defined in emqx_session.erl
id,
acl
],
#{fields := RequestedFields} = Opts,
TimesKeys = [created_at, connected_at, disconnected_at],
%% format timestamp to rfc3339
result_format_undefined_to_null(
lists:foldl(
fun result_format_time_fun/2,
maps:without(RemoveList, ClientInfoMap),
with_client_info_fields(ClientInfoMap, RequestedFields),
TimesKeys
)
);
format_channel_info(undefined, {ClientId, PSInfo0 = #{}}) ->
format_channel_info(undefined, {ClientId, PSInfo0 = #{}}, _Opts) ->
format_persistent_session_info(ClientId, PSInfo0).

format_persistent_session_info(ClientId, PSInfo0) ->
Expand All @@ -1337,6 +1375,41 @@ format_persistent_session_info(ClientId, PSInfo0) ->
),
result_format_undefined_to_null(PSInfo).

with_client_info_fields(ClientInfoMap, all) ->
RemoveList =
[
auth_result,
peername,
sockname,
peerhost,
peerport,
conn_state,
send_pend,
conn_props,
peercert,
sockstate,
subscriptions,
receive_maximum,
protocol,
is_superuser,
sockport,
anonymous,
socktype,
active_n,
await_rel_timeout,
conn_mod,
sockname,
retry_interval,
upgrade_qos,
zone,
%% session_id, defined in emqx_session.erl
id,
acl
],
maps:without(RemoveList, ClientInfoMap);
with_client_info_fields(ClientInfoMap, RequestedFields) when is_list(RequestedFields) ->
maps:with(RequestedFields, ClientInfoMap).

format_msgs_resp(MsgType, Msgs, Meta, QString) ->
#{
<<"payload">> := PayloadFmt,
Expand Down
Loading
Loading