Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Format the listeners API fields #8571

Merged
merged 6 commits into from
Jul 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES-5.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
* Remove `/configs/listeners` API, use `/listeners/` instead. [#8485](https://github.com/emqx/emqx/pull/8485)
* Optimize performance of builtin database operations in processes with long message queue [#8439](https://github.com/emqx/emqx/pull/8439)
* Improve authentication tracing. [#8554](https://github.com/emqx/emqx/pull/8554)
* Standardize the '/listeners' and `/gateway/<name>/listeners` API fields.
It will introduce some incompatible updates, see [#8571](https://github.com/emqx/emqx/pull/8571)

# 5.0.3

Expand Down
45 changes: 30 additions & 15 deletions apps/emqx/src/emqx_listeners.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@

-export([pre_config_update/3, post_config_update/5]).

-export([format_addr/1]).
-export([format_bind/1]).

-define(CONF_KEY_PATH, [listeners, '?', '?']).
-define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]).
Expand Down Expand Up @@ -201,14 +201,14 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
?tp(listener_started, #{type => Type, bind => Bind}),
console_print(
"Listener ~ts on ~ts started.~n",
[listener_id(Type, ListenerName), format_addr(Bind)]
[listener_id(Type, ListenerName), format_bind(Bind)]
),
ok;
{error, {already_started, Pid}} ->
{error, {already_started, Pid}};
{error, Reason} ->
ListenerId = listener_id(Type, ListenerName),
BindStr = format_addr(Bind),
BindStr = format_bind(Bind),
?ELOG(
"Failed to start listener ~ts on ~ts: ~0p.~n",
[ListenerId, BindStr, Reason]
Expand Down Expand Up @@ -261,19 +261,19 @@ stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
ok ->
console_print(
"Listener ~ts on ~ts stopped.~n",
[listener_id(Type, ListenerName), format_addr(Bind)]
[listener_id(Type, ListenerName), format_bind(Bind)]
),
ok;
{error, not_found} ->
?ELOG(
"Failed to stop listener ~ts on ~ts: ~0p~n",
[listener_id(Type, ListenerName), format_addr(Bind), already_stopped]
[listener_id(Type, ListenerName), format_bind(Bind), already_stopped]
),
ok;
{error, Reason} ->
?ELOG(
"Failed to stop listener ~ts on ~ts: ~0p~n",
[listener_id(Type, ListenerName), format_addr(Bind), Reason]
[listener_id(Type, ListenerName), format_bind(Bind), Reason]
),
{error, Reason}
end.
Expand Down Expand Up @@ -492,17 +492,32 @@ merge_default(Options) ->
[{tcp_options, ?MQTT_SOCKOPTS} | Options]
end.

format_addr(Port) when is_integer(Port) ->
io_lib:format("~w", [Port]);
-spec format_bind(
integer() | {tuple(), integer()} | string() | binary()
) -> io_lib:chars().
format_bind(Port) when is_integer(Port) ->
io_lib:format(":~w", [Port]);
%% Print only the port number when bound on all interfaces
format_addr({{0, 0, 0, 0}, Port}) ->
format_addr(Port);
format_addr({{0, 0, 0, 0, 0, 0, 0, 0}, Port}) ->
format_addr(Port);
format_addr({Addr, Port}) when is_list(Addr) ->
format_bind({{0, 0, 0, 0}, Port}) ->
format_bind(Port);
format_bind({{0, 0, 0, 0, 0, 0, 0, 0}, Port}) ->
format_bind(Port);
format_bind({Addr, Port}) when is_list(Addr) ->
io_lib:format("~ts:~w", [Addr, Port]);
format_addr({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]).
format_bind({Addr, Port}) when is_tuple(Addr), tuple_size(Addr) == 4 ->
io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]);
format_bind({Addr, Port}) when is_tuple(Addr), tuple_size(Addr) == 8 ->
io_lib:format("[~ts]:~w", [inet:ntoa(Addr), Port]);
%% Support string, binary type for Port or IP:Port
format_bind(Str) when is_list(Str) ->
case emqx_schema:to_ip_port(Str) of
{ok, {Ip, Port}} ->
format_bind({Ip, Port});
{error, _} ->
format_bind(list_to_integer(Str))
end;
format_bind(Bin) when is_binary(Bin) ->
format_bind(binary_to_list(Bin)).

listener_id(Type, ListenerName) ->
list_to_atom(lists:append([str(Type), ":", str(ListenerName)])).
Expand Down
6 changes: 3 additions & 3 deletions apps/emqx_dashboard/etc/emqx_dashboard.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
dashboard {
listeners.http {
bind: 18083
bind = 18083
}
default_username: "admin"
default_password: "public"
default_username = "admin"
default_password = "public"
}
4 changes: 2 additions & 2 deletions apps/emqx_dashboard/src/emqx_dashboard.erl
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ start_listeners(Listeners) ->
case minirest:start(Name, RanchOptions, Minirest) of
{ok, _} ->
?ULOG("Listener ~ts on ~ts started.~n", [
Name, emqx_listeners:format_addr(Bind)
Name, emqx_listeners:format_bind(Bind)
]),
Acc;
{error, _Reason} ->
Expand All @@ -114,7 +114,7 @@ stop_listeners(Listeners) ->
case minirest:stop(Name) of
ok ->
?ULOG("Stop listener ~ts on ~ts successfully.~n", [
Name, emqx_listeners:format_addr(Port)
Name, emqx_listeners:format_bind(Port)
]);
{error, not_found} ->
?SLOG(warning, #{msg => "stop_listener_failed", name => Name, port => Port})
Expand Down
7 changes: 7 additions & 0 deletions apps/emqx_gateway/i18n/emqx_gateway_api_listeners_i18n.conf
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ emqx_gateway_api_listeners {
}
}

listener_status {
desc {
en: """listener status """
zh: """监听器状态"""
}
}

listener_node_status {
desc {
en: """listener status of each node in the cluster"""
Expand Down
83 changes: 62 additions & 21 deletions apps/emqx_gateway/src/emqx_gateway_api_listeners.erl
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ paths() ->

listeners(get, #{bindings := #{name := Name0}}) ->
with_gateway(Name0, fun(GwName, _) ->
Result = get_cluster_listeners_info(GwName),
Result = lists:map(fun bind2str/1, get_cluster_listeners_info(GwName)),
{200, Result}
end);
listeners(post, #{bindings := #{name := Name0}, body := LConf}) ->
Expand Down Expand Up @@ -119,7 +119,7 @@ listeners_insta(get, #{bindings := #{name := Name0, id := ListenerId0}}) ->
with_gateway(Name0, fun(_GwName, _) ->
case emqx_gateway_conf:listener(ListenerId) of
{ok, Listener} ->
{200, Listener};
{200, bind2str(Listener)};
{error, not_found} ->
return_http_error(404, "Listener not found");
{error, Reason} ->
Expand Down Expand Up @@ -266,11 +266,14 @@ get_cluster_listeners_info(GwName) ->
ClusterStatus
),

{MaxCons, CurrCons} = emqx_gateway_http:sum_cluster_connections(NodeStatus),
{MaxCons, CurrCons, Running} = aggregate_listener_status(NodeStatus),

Listener#{
max_connections => MaxCons,
current_connections => CurrCons,
status => #{
running => Running,
max_connections => MaxCons,
current_connections => CurrCons
},
node_status => NodeStatus
}
end,
Expand All @@ -292,20 +295,23 @@ do_listeners_cluster_status(Listeners) ->
fun({Id, ListenOn}, Acc) ->
BinId = erlang:atom_to_binary(Id),
{ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId),
Curr =
{Running, Curr} =
try esockd:get_current_connections({Id, ListenOn}) of
Int -> Int
Int -> {true, Int}
catch
%% not started
error:not_found ->
0
{false, 0}
end,
Acc#{
Id => #{
node => Node,
current_connections => Curr,
%% XXX: Since it is taken from raw-conf, it is possible a string
max_connections => int(Max)
status => #{
running => Running,
current_connections => Curr,
%% XXX: Since it is taken from raw-conf, it is possible a string
max_connections => int(Max)
}
}
}
end,
Expand All @@ -317,6 +323,31 @@ int(B) when is_binary(B) ->
binary_to_integer(B);
int(I) when is_integer(I) ->
I.
aggregate_listener_status(NodeStatus) ->
aggregate_listener_status(NodeStatus, 0, 0, undefined).

aggregate_listener_status(
[
#{status := #{running := Running, max_connections := Max, current_connections := Current}}
| T
],
MaxAcc,
CurrAcc,
RunningAcc
) ->
NRunning = aggregate_running(Running, RunningAcc),
aggregate_listener_status(T, MaxAcc + Max, Current + CurrAcc, NRunning);
aggregate_listener_status([], MaxAcc, CurrAcc, RunningAcc) ->
{MaxAcc, CurrAcc, RunningAcc}.

aggregate_running(R, R) -> R;
aggregate_running(R, undefined) -> R;
aggregate_running(_, _) -> inconsistent.

bind2str(Listener = #{bind := Bind}) ->
Listener#{bind := iolist_to_binary(emqx_listeners:format_bind(Bind))};
bind2str(Listener = #{<<"bind">> := Bind}) ->
Listener#{<<"bind">> := iolist_to_binary(emqx_listeners:format_bind(Bind))}.

%%--------------------------------------------------------------------
%% Swagger defines
Expand Down Expand Up @@ -590,22 +621,25 @@ params_paging_in_qs() ->
roots() ->
[listener].

fields(listener_node_status) ->
fields(listener_status) ->
[
{current_connections, mk(non_neg_integer(), #{desc => ?DESC(current_connections)})},
{status,
mk(ref(emqx_mgmt_api_listeners, status), #{
desc => ?DESC(listener_status)
})},
{node_status,
mk(hoconsc:array(ref(emqx_mgmt_api_listeners, node_status)), #{
desc => ?DESC(listener_node_status)
})}
];
fields(tcp_listener) ->
emqx_gateway_api:fields(tcp_listener) ++ fields(listener_node_status);
emqx_gateway_api:fields(tcp_listener) ++ fields(listener_status);
fields(ssl_listener) ->
emqx_gateway_api:fields(ssl_listener) ++ fields(listener_node_status);
emqx_gateway_api:fields(ssl_listener) ++ fields(listener_status);
fields(udp_listener) ->
emqx_gateway_api:fields(udp_listener) ++ fields(listener_node_status);
emqx_gateway_api:fields(udp_listener) ++ fields(listener_status);
fields(dtls_listener) ->
emqx_gateway_api:fields(dtls_listener) ++ fields(listener_node_status);
emqx_gateway_api:fields(dtls_listener) ++ fields(listener_status);
fields(_) ->
[].

Expand All @@ -623,12 +657,19 @@ listener_node_status_schema() ->
examples_listener_list() ->
Convert = fun(Cfg) ->
Cfg#{
current_connections => 0,
status => #{
running => true,
max_connections => 1024000,
current_connections => 10
},
node_status => [
#{
node => <<"127.0.0.1">>,
current_connections => 0,
max_connections => 1024000
node => <<"emqx@127.0.0.1">>,
status => #{
running => true,
current_connections => 10,
max_connections => 1024000
}
}
]
}
Expand Down
23 changes: 5 additions & 18 deletions apps/emqx_gateway/src/emqx_gateway_conf.erl
Original file line number Diff line number Diff line change
Expand Up @@ -181,24 +181,11 @@ do_convert_listener(GwName, LType, Conf) ->

do_convert_listener2(GwName, LType, LName, LConf) ->
ListenerId = emqx_gateway_utils:listener_id(GwName, LType, LName),
Running = emqx_gateway_utils:is_running(ListenerId, LConf),
bind2str(
LConf#{
id => ListenerId,
type => LType,
name => LName,
running => Running
}
).

bind2str(LConf = #{bind := Bind}) when is_integer(Bind) ->
maps:put(bind, integer_to_binary(Bind), LConf);
bind2str(LConf = #{<<"bind">> := Bind}) when is_integer(Bind) ->
maps:put(<<"bind">>, integer_to_binary(Bind), LConf);
bind2str(LConf = #{bind := Bind}) when is_binary(Bind) ->
LConf;
bind2str(LConf = #{<<"bind">> := Bind}) when is_binary(Bind) ->
LConf.
LConf#{
id => ListenerId,
type => LType,
name => LName
}.

get_bind(#{bind := Bind}) ->
emqx_gateway_utils:parse_listenon(Bind);
Expand Down
12 changes: 2 additions & 10 deletions apps/emqx_gateway/src/emqx_gateway_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

-export([
apply/2,
format_listenon/1,
parse_listenon/1,
unix_ts_to_rfc3339/1,
unix_ts_to_rfc3339/2,
Expand Down Expand Up @@ -165,7 +164,7 @@ start_listener(
{Type, LisName, ListenOn, SocketOpts, Cfg},
ModCfg
) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
ListenOnStr = emqx_listeners:format_bind(ListenOn),
ListenerId = emqx_gateway_utils:listener_id(GwName, Type, LisName),

NCfg = maps:merge(Cfg, ModCfg),
Expand Down Expand Up @@ -243,7 +242,7 @@ stop_listeners(GwName, Listeners) ->
-spec stop_listener(GwName :: atom(), Listener :: tuple()) -> ok.
stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
ListenOnStr = emqx_listeners:format_bind(ListenOn),
case StopRet of
ok ->
console_print(
Expand Down Expand Up @@ -287,13 +286,6 @@ apply(F, A2) when
->
erlang:apply(F, A2).

format_listenon(Port) when is_integer(Port) ->
io_lib:format("0.0.0.0:~w", [Port]);
format_listenon({Addr, Port}) when is_list(Addr) ->
io_lib:format("~ts:~w", [Addr, Port]);
format_listenon({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]).

parse_listenon(Port) when is_integer(Port) ->
Port;
parse_listenon(IpPort) when is_tuple(IpPort) ->
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ start_grpc_server(GwName, Options = #{bind := ListenOn}) ->
)}
]
end,
ListenOnStr = emqx_listeners:format_addr(ListenOn),
ListenOnStr = emqx_listeners:format_bind(ListenOn),
case grpc:start_server(GwName, ListenOn, Services, SvrOptions) of
{ok, _SvrPid} ->
console_print(
Expand Down