Skip to content

Commit

Permalink
chore: add tag for logs from MQTT connection modules
Browse files Browse the repository at this point in the history
  • Loading branch information
zmstone committed Mar 4, 2024
1 parent 6c9eb16 commit 6863127
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 23 deletions.
22 changes: 12 additions & 10 deletions apps/emqx/src/emqx_connection.erl
Expand Up @@ -186,6 +186,8 @@
-define(LIMITER_BYTES_IN, bytes).
-define(LIMITER_MESSAGE_IN, messages).

-define(LOG(Level, Data), ?SLOG(Level, (Data)#{tag => "MQTT"})).

-dialyzer({no_match, [info/2]}).
-dialyzer(
{nowarn_function, [
Expand Down Expand Up @@ -282,7 +284,7 @@ async_set_keepalive(OS, Pid, Idle, Interval, Probes) ->
{ok, Options} ->
async_set_socket_options(Pid, Options);
{error, {unsupported_os, OS}} ->
?SLOG(warning, #{
?LOG(warning, #{
msg => "Unsupported operation: set TCP keepalive",
os => OS
}),
Expand Down Expand Up @@ -774,7 +776,7 @@ handle_timeout(TRef, Msg, State) ->
%% Parse incoming data
-compile({inline, [when_bytes_in/3]}).
when_bytes_in(Oct, Data, State) ->
?SLOG(debug, #{
?LOG(debug, #{
msg => "raw_bin_received",
size => Oct,
bin => binary_to_list(binary:encode_hex(Data)),
Expand Down Expand Up @@ -810,15 +812,15 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
parse_incoming(Rest, [Packet | Packets], NState)
catch
throw:{?FRAME_PARSE_ERROR, Reason} ->
?SLOG(info, #{
?LOG(info, #{
reason => Reason,
at_state => emqx_frame:describe_state(ParseState),
input_bytes => Data,
parsed_packets => Packets
}),
{[{frame_error, Reason} | Packets], State};
error:Reason:Stacktrace ->
?SLOG(error, #{
?LOG(error, #{
at_state => emqx_frame:describe_state(ParseState),
input_bytes => Data,
parsed_packets => Packets,
Expand Down Expand Up @@ -873,7 +875,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
fun(Packet) ->
try emqx_frame:serialize_pkt(Packet, Serialize) of
<<>> ->
?SLOG(warning, #{
?LOG(warning, #{
msg => "packet_is_discarded",
reason => "frame_is_too_large",
packet => emqx_packet:format(Packet, hidden)
Expand All @@ -889,13 +891,13 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
catch
%% Maybe Never happen.
throw:{?FRAME_SERIALIZE_ERROR, Reason} ->
?SLOG(info, #{
?LOG(info, #{
reason => Reason,
input_packet => Packet
}),
erlang:error({?FRAME_SERIALIZE_ERROR, Reason});
error:Reason:Stacktrace ->
?SLOG(error, #{
?LOG(error, #{
input_packet => Packet,
exception => Reason,
stacktrace => Stacktrace
Expand Down Expand Up @@ -938,7 +940,7 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
end;
handle_info({sock_error, Reason}, State) ->
case Reason =/= closed andalso Reason =/= einval of
true -> ?SLOG(warning, #{msg => "socket_error", reason => Reason});
true -> ?LOG(warning, #{msg => "socket_error", reason => Reason});
false -> ok
end,
handle_info({sock_closed, Reason}, close_socket(State));
Expand Down Expand Up @@ -1017,7 +1019,7 @@ check_limiter(
{ok, Limiter2} ->
WhenOk(Data, Msgs, State#state{limiter = Limiter2});
{pause, Time, Limiter2} ->
?SLOG(debug, #{
?LOG(debug, #{
msg => "pause_time_dueto_rate_limit",
needs => Needs,
time_in_ms => Time
Expand Down Expand Up @@ -1069,7 +1071,7 @@ retry_limiter(#state{limiter = Limiter} = State) ->
}
);
{pause, Time, Limiter2} ->
?SLOG(debug, #{
?LOG(debug, #{
msg => "pause_time_dueto_rate_limit",
types => Types,
time_in_ms => Time
Expand Down
28 changes: 15 additions & 13 deletions apps/emqx/src/emqx_ws_connection.erl
Expand Up @@ -128,6 +128,8 @@
-dialyzer({no_match, [info/2]}).
-dialyzer({nowarn_function, [websocket_init/1]}).

-define(LOG(Level, Data), ?SLOG(Level, (Data)#{tag => "MQTT"})).

%%--------------------------------------------------------------------
%% Info, Stats
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -210,7 +212,7 @@ init(Req, #{listener := {Type, Listener}} = Opts) ->
},
case check_origin_header(Req, Opts) of
{error, Reason} ->
?SLOG(error, #{msg => "invalid_origin_header", reason => Reason}),
?LOG(error, #{msg => "invalid_origin_header", reason => Reason}),
{ok, cowboy_req:reply(403, Req), WsOpts};
ok ->
parse_sec_websocket_protocol(Req, Opts, WsOpts)
Expand Down Expand Up @@ -362,10 +364,10 @@ get_ws_cookie(Req) ->
cowboy_req:parse_cookies(Req)
catch
error:badarg ->
?SLOG(error, #{msg => "bad_cookie"}),
?LOG(error, #{msg => "bad_cookie"}),
undefined;
Error:Reason ->
?SLOG(error, #{
?LOG(error, #{
msg => "failed_to_parse_cookie",
exception => Error,
reason => Reason
Expand Down Expand Up @@ -401,7 +403,7 @@ get_peer_info(Type, Listener, Req, Opts) ->
websocket_handle({binary, Data}, State) when is_list(Data) ->
websocket_handle({binary, iolist_to_binary(Data)}, State);
websocket_handle({binary, Data}, State) ->
?SLOG(debug, #{
?LOG(debug, #{
msg => "raw_bin_received",
size => iolist_size(Data),
bin => binary_to_list(binary:encode_hex(Data)),
Expand All @@ -428,7 +430,7 @@ websocket_handle({Frame, _}, State) when Frame =:= ping; Frame =:= pong ->
return(State);
websocket_handle({Frame, _}, State) ->
%% TODO: should not close the ws connection
?SLOG(error, #{msg => "unexpected_frame", frame => Frame}),
?LOG(error, #{msg => "unexpected_frame", frame => Frame}),
shutdown(unexpected_ws_frame, State).
websocket_info({call, From, Req}, State) ->
handle_call(From, Req, State);
Expand Down Expand Up @@ -604,7 +606,7 @@ check_limiter(
{ok, Limiter2} ->
WhenOk(Data, Msgs, State#state{limiter = Limiter2});
{pause, Time, Limiter2} ->
?SLOG(debug, #{
?LOG(debug, #{
msg => "pause_time_due_to_rate_limit",
needs => Needs,
time_in_ms => Time
Expand Down Expand Up @@ -657,7 +659,7 @@ retry_limiter(#state{limiter = Limiter} = State) ->
}
);
{pause, Time, Limiter2} ->
?SLOG(debug, #{
?LOG(debug, #{
msg => "pause_time_due_to_rate_limit",
types => Types,
time_in_ms => Time
Expand Down Expand Up @@ -714,15 +716,15 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
parse_incoming(Rest, [{incoming, Packet} | Packets], NState)
catch
throw:{?FRAME_PARSE_ERROR, Reason} ->
?SLOG(info, #{
?LOG(info, #{
reason => Reason,
at_state => emqx_frame:describe_state(ParseState),
input_bytes => Data
}),
FrameError = {frame_error, Reason},
{[{incoming, FrameError} | Packets], State};
error:Reason:Stacktrace ->
?SLOG(error, #{
?LOG(error, #{
at_state => emqx_frame:describe_state(ParseState),
input_bytes => Data,
exception => Reason,
Expand Down Expand Up @@ -812,7 +814,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
fun(Packet) ->
try emqx_frame:serialize_pkt(Packet, Serialize) of
<<>> ->
?SLOG(warning, #{
?LOG(warning, #{
msg => "packet_discarded",
reason => "frame_too_large",
packet => emqx_packet:format(Packet)
Expand All @@ -828,13 +830,13 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
catch
%% Maybe Never happen.
throw:{?FRAME_SERIALIZE_ERROR, Reason} ->
?SLOG(info, #{
?LOG(info, #{
reason => Reason,
input_packet => Packet
}),
erlang:error({?FRAME_SERIALIZE_ERROR, Reason});
error:Reason:Stacktrace ->
?SLOG(error, #{
?LOG(error, #{
input_packet => Packet,
exception => Reason,
stacktrace => Stacktrace
Expand Down Expand Up @@ -1050,7 +1052,7 @@ check_max_connection(Type, Listener) ->
current => Curr,
msg => "websocket_max_connections_limited"
},
?SLOG(warning, Reason),
?LOG(warning, Reason),
{denny, Reason}
end
end.
Expand Down

0 comments on commit 6863127

Please sign in to comment.