Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0304 improve text log formatter #12641

Merged
merged 3 commits into from Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 11 additions & 9 deletions apps/emqx/src/emqx_connection.erl
Expand Up @@ -186,6 +186,8 @@
-define(LIMITER_BYTES_IN, bytes).
-define(LIMITER_MESSAGE_IN, messages).

-define(LOG(Level, Data), ?SLOG(Level, (Data)#{tag => "MQTT"})).

-dialyzer({no_match, [info/2]}).
-dialyzer(
{nowarn_function, [
Expand Down Expand Up @@ -282,7 +284,7 @@ async_set_keepalive(OS, Pid, Idle, Interval, Probes) ->
{ok, Options} ->
async_set_socket_options(Pid, Options);
{error, {unsupported_os, OS}} ->
?SLOG(warning, #{
?LOG(warning, #{
msg => "Unsupported operation: set TCP keepalive",
os => OS
}),
Expand Down Expand Up @@ -774,7 +776,7 @@ handle_timeout(TRef, Msg, State) ->
%% Parse incoming data
-compile({inline, [when_bytes_in/3]}).
when_bytes_in(Oct, Data, State) ->
?SLOG(debug, #{
?LOG(debug, #{
msg => "raw_bin_received",
size => Oct,
bin => binary_to_list(binary:encode_hex(Data)),
Expand Down Expand Up @@ -810,15 +812,15 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
parse_incoming(Rest, [Packet | Packets], NState)
catch
throw:{?FRAME_PARSE_ERROR, Reason} ->
?SLOG(info, #{
?LOG(info, #{
reason => Reason,
at_state => emqx_frame:describe_state(ParseState),
input_bytes => Data,
parsed_packets => Packets
}),
{[{frame_error, Reason} | Packets], State};
error:Reason:Stacktrace ->
?SLOG(error, #{
?LOG(error, #{
at_state => emqx_frame:describe_state(ParseState),
input_bytes => Data,
parsed_packets => Packets,
Expand Down Expand Up @@ -873,7 +875,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
fun(Packet) ->
try emqx_frame:serialize_pkt(Packet, Serialize) of
<<>> ->
?SLOG(warning, #{
?LOG(warning, #{
msg => "packet_is_discarded",
reason => "frame_is_too_large",
packet => emqx_packet:format(Packet, hidden)
Expand All @@ -889,13 +891,13 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
catch
%% Maybe Never happen.
throw:{?FRAME_SERIALIZE_ERROR, Reason} ->
?SLOG(info, #{
?LOG(info, #{
reason => Reason,
input_packet => Packet
}),
erlang:error({?FRAME_SERIALIZE_ERROR, Reason});
error:Reason:Stacktrace ->
?SLOG(error, #{
?LOG(error, #{
input_packet => Packet,
exception => Reason,
stacktrace => Stacktrace
Expand Down Expand Up @@ -1018,7 +1020,7 @@ check_limiter(
WhenOk(Data, Msgs, State#state{limiter = Limiter2});
{pause, Time, Limiter2} ->
?SLOG(debug, #{
msg => "pause_time_dueto_rate_limit",
msg => "pause_time_due_to_rate_limit",
needs => Needs,
time_in_ms => Time
}),
Expand Down Expand Up @@ -1070,7 +1072,7 @@ retry_limiter(#state{limiter = Limiter} = State) ->
);
{pause, Time, Limiter2} ->
?SLOG(debug, #{
msg => "pause_time_dueto_rate_limit",
msg => "pause_time_due_to_rate_limit",
types => Types,
time_in_ms => Time
}),
Expand Down
20 changes: 15 additions & 5 deletions apps/emqx/src/emqx_logger_textfmt.erl
Expand Up @@ -22,7 +22,11 @@

check_config(X) -> logger_formatter:check_config(X).

%% Principle here is to delegate the formatting to logger_formatter:format/2
%% as much as possible, and only enrich the report with clientid, peername, topic, username
format(#{msg := {report, ReportMap}, meta := Meta} = Event, Config) when is_map(ReportMap) ->
%% The most common case, when entering from SLOG macro
%% i.e. logger:log(Level, #{msg => "my_msg", foo => bar})
ReportList = enrich_report(ReportMap, Meta),
Report =
case is_list_report_acceptable(Meta) of
Expand All @@ -33,13 +37,17 @@ format(#{msg := {report, ReportMap}, meta := Meta} = Event, Config) when is_map(
end,
logger_formatter:format(Event#{msg := {report, Report}}, Config);
format(#{msg := {string, String}} = Event, Config) ->
%% copied from logger_formatter:format/2
%% unsure how this case is triggered
format(Event#{msg => {"~ts ", [String]}}, Config);
%% trace
format(#{msg := Msg0, meta := Meta} = Event, Config) ->
%% For format strings like logger:log(Level, "~p", [Var])
%% and logger:log(Level, "message", #{key => value})
Msg1 = enrich_client_info(Msg0, Meta),
Msg2 = enrich_topic(Msg1, Meta),
logger_formatter:format(Event#{msg := Msg2}, Config).

%% Other report callbacks may only accept map() reports such as gen_server formatter
is_list_report_acceptable(#{report_cb := Cb}) ->
Cb =:= fun logger:format_otp_report/1 orelse Cb =:= fun logger:format_report/1;
is_list_report_acceptable(_) ->
Expand All @@ -61,19 +69,21 @@ enrich_report(ReportRaw, Meta) ->
ClientId = maps:get(clientid, Meta, undefined),
Peer = maps:get(peername, Meta, undefined),
Msg = maps:get(msg, ReportRaw, undefined),
Tag = maps:get(tag, ReportRaw, undefined),
%% turn it into a list so that the order of the fields is determined
lists:foldl(
fun
({_, undefined}, Acc) -> Acc;
(Item, Acc) -> [Item | Acc]
end,
maps:to_list(maps:without([topic, msg, clientid, username], ReportRaw)),
maps:to_list(maps:without([topic, msg, clientid, username, tag], ReportRaw)),
[
{username, try_format_unicode(Username)},
{topic, try_format_unicode(Topic)},
{clientid, try_format_unicode(ClientId)},
{username, try_format_unicode(Username)},
{peername, Peer},
{msg, Msg}
{msg, Msg},
{clientid, try_format_unicode(ClientId)},
{tag, Tag}
]
).

Expand Down
16 changes: 9 additions & 7 deletions apps/emqx/src/emqx_ws_connection.erl
Expand Up @@ -128,6 +128,8 @@
-dialyzer({no_match, [info/2]}).
-dialyzer({nowarn_function, [websocket_init/1]}).

-define(LOG(Level, Data), ?SLOG(Level, (Data)#{tag => "MQTT"})).

%%--------------------------------------------------------------------
%% Info, Stats
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -401,7 +403,7 @@ get_peer_info(Type, Listener, Req, Opts) ->
websocket_handle({binary, Data}, State) when is_list(Data) ->
websocket_handle({binary, iolist_to_binary(Data)}, State);
websocket_handle({binary, Data}, State) ->
?SLOG(debug, #{
?LOG(debug, #{
msg => "raw_bin_received",
size => iolist_size(Data),
bin => binary_to_list(binary:encode_hex(Data)),
Expand All @@ -428,7 +430,7 @@ websocket_handle({Frame, _}, State) when Frame =:= ping; Frame =:= pong ->
return(State);
websocket_handle({Frame, _}, State) ->
%% TODO: should not close the ws connection
?SLOG(error, #{msg => "unexpected_frame", frame => Frame}),
?LOG(error, #{msg => "unexpected_frame", frame => Frame}),
shutdown(unexpected_ws_frame, State).
websocket_info({call, From, Req}, State) ->
handle_call(From, Req, State);
Expand Down Expand Up @@ -714,15 +716,15 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
parse_incoming(Rest, [{incoming, Packet} | Packets], NState)
catch
throw:{?FRAME_PARSE_ERROR, Reason} ->
?SLOG(info, #{
?LOG(info, #{
reason => Reason,
at_state => emqx_frame:describe_state(ParseState),
input_bytes => Data
}),
FrameError = {frame_error, Reason},
{[{incoming, FrameError} | Packets], State};
error:Reason:Stacktrace ->
?SLOG(error, #{
?LOG(error, #{
at_state => emqx_frame:describe_state(ParseState),
input_bytes => Data,
exception => Reason,
Expand Down Expand Up @@ -812,7 +814,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
fun(Packet) ->
try emqx_frame:serialize_pkt(Packet, Serialize) of
<<>> ->
?SLOG(warning, #{
?LOG(warning, #{
msg => "packet_discarded",
reason => "frame_too_large",
packet => emqx_packet:format(Packet)
Expand All @@ -828,13 +830,13 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
catch
%% Maybe Never happen.
throw:{?FRAME_SERIALIZE_ERROR, Reason} ->
?SLOG(info, #{
?LOG(info, #{
reason => Reason,
input_packet => Packet
}),
erlang:error({?FRAME_SERIALIZE_ERROR, Reason});
error:Reason:Stacktrace ->
?SLOG(error, #{
?LOG(error, #{
input_packet => Packet,
exception => Reason,
stacktrace => Stacktrace
Expand Down
3 changes: 3 additions & 0 deletions changes/ce/feat-12641.en.md
@@ -0,0 +1,3 @@
Improve text log formatter fields order.

`tag` > `clientid` > `msg` > `peername` > `username` > `topic` > [other fields]