Skip to content

Commit

Permalink
refactor: rename 'hint' to 'cause' for MQTT fram parse failure reason
Browse files Browse the repository at this point in the history
'reason' is maybe the wrapping field's name, so it was not used.
'hint' however, per our logging convention, is usually a free text
description for human to read.
change to 'cause' here because the field is always an atom and
it's use as shutdown counter in esockd_connection_sup
  • Loading branch information
zmstone committed Feb 19, 2024
1 parent 29c5c37 commit 8b0e15e
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 53 deletions.
8 changes: 4 additions & 4 deletions apps/emqx/src/emqx_channel.erl
Expand Up @@ -541,15 +541,15 @@ handle_in(?AUTH_PACKET(), Channel) ->
handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
shutdown(shutdown_count(frame_error, Reason), Channel);
handle_in(
{frame_error, #{hint := frame_too_large} = R}, Channel = #channel{conn_state = connecting}
{frame_error, #{cause := frame_too_large} = R}, Channel = #channel{conn_state = connecting}
) ->
shutdown(
shutdown_count(frame_error, R), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel
);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) ->
shutdown(shutdown_count(frame_error, Reason), ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel);
handle_in(
{frame_error, #{hint := frame_too_large}}, Channel = #channel{conn_state = ConnState}
{frame_error, #{cause := frame_too_large}}, Channel = #channel{conn_state = ConnState}
) when
ConnState =:= connected orelse ConnState =:= reauthenticating
->
Expand Down Expand Up @@ -2331,8 +2331,8 @@ shutdown(Reason, Reply, Packet, Channel) ->

%% process exits with {shutdown, #{shutdown_count := Kind}} will trigger
%% the connection supervisor (esockd) to keep a shutdown-counter grouped by Kind
shutdown_count(_Kind, #{hint := Hint} = Reason) when is_atom(Hint) ->
Reason#{shutdown_count => Hint};
shutdown_count(_Kind, #{cause := Cause} = Reason) when is_atom(Cause) ->
Reason#{shutdown_count => Cause};
shutdown_count(Kind, Reason) when is_map(Reason) ->
Reason#{shutdown_count => Kind};
shutdown_count(Kind, Reason) ->
Expand Down
60 changes: 30 additions & 30 deletions apps/emqx/src/emqx_frame.erl
Expand Up @@ -168,7 +168,7 @@ parse_remaining_len(Rest, Header, Options) ->
parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) when
Length > MaxSize
->
?PARSE_ERR(#{hint => frame_too_large, limit => MaxSize, received => Length});
?PARSE_ERR(#{cause => frame_too_large, limit => MaxSize, received => Length});
parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
{more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}};
%% Match DISCONNECT without payload
Expand All @@ -189,12 +189,12 @@ parse_remaining_len(
parse_remaining_len(
<<0:8, _Rest/binary>>, _Header = #mqtt_packet_header{type = ?PINGRESP}, 1, 0, _Options
) ->
?PARSE_ERR(#{hint => unexpected_packet, header_type => 'PINGRESP'});
?PARSE_ERR(#{cause => unexpected_packet, header_type => 'PINGRESP'});
%% All other types of messages should not have a zero remaining length.
parse_remaining_len(
<<0:8, _Rest/binary>>, Header, 1, 0, _Options
) ->
?PARSE_ERR(#{hint => zero_remaining_len, header_type => Header#mqtt_packet_header.type});
?PARSE_ERR(#{cause => zero_remaining_len, header_type => Header#mqtt_packet_header.type});
%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
parse_frame(Rest, Header, 2, Options);
Expand All @@ -213,7 +213,7 @@ parse_remaining_len(
) ->
FrameLen = Value + Len * Multiplier,
case FrameLen > MaxSize of
true -> ?PARSE_ERR(#{hint => frame_too_large, limit => MaxSize, received => FrameLen});
true -> ?PARSE_ERR(#{cause => frame_too_large, limit => MaxSize, received => FrameLen});
false -> parse_frame(Rest, Header, FrameLen, Options)
end.

Expand Down Expand Up @@ -267,7 +267,7 @@ packet(Header, Variable, Payload) ->
#mqtt_packet{header = Header, variable = Variable, payload = Payload}.

parse_connect(FrameBin, StrictMode) ->
{ProtoName, Rest} = parse_utf8_string_with_hint(FrameBin, StrictMode, invalid_proto_name),
{ProtoName, Rest} = parse_utf8_string_with_cause(FrameBin, StrictMode, invalid_proto_name),
case ProtoName of
<<"MQTT">> ->
ok;
Expand All @@ -277,7 +277,7 @@ parse_connect(FrameBin, StrictMode) ->
%% from spec: the server MAY send disconnect with reason code 0x84
%% we chose to close socket because the client is likely not talking MQTT anyway
?PARSE_ERR(#{
hint => invalid_proto_name,
cause => invalid_proto_name,
expected => <<"'MQTT' or 'MQIsdp'">>,
received => ProtoName
})
Expand All @@ -296,7 +296,7 @@ parse_connect2(
1 -> ?PARSE_ERR(reserved_connect_flag)
end,
{Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode),
{ClientId, Rest4} = parse_utf8_string_with_hint(Rest3, StrictMode, invalid_clientid),
{ClientId, Rest4} = parse_utf8_string_with_cause(Rest3, StrictMode, invalid_clientid),
ConnPacket = #mqtt_packet_connect{
proto_name = ProtoName,
proto_ver = ProtoVer,
Expand All @@ -315,14 +315,14 @@ parse_connect2(
{Username, Rest6} = parse_optional(
Rest5,
fun(Bin) ->
parse_utf8_string_with_hint(Bin, StrictMode, invalid_username)
parse_utf8_string_with_cause(Bin, StrictMode, invalid_username)
end,
bool(UsernameFlag)
),
{Password, Rest7} = parse_optional(
Rest6,
fun(Bin) ->
parse_utf8_string_with_hint(Bin, StrictMode, invalid_password)
parse_utf8_string_with_cause(Bin, StrictMode, invalid_password)
end,
bool(PasswordFlag)
),
Expand All @@ -331,13 +331,13 @@ parse_connect2(
ConnPacket1#mqtt_packet_connect{username = Username, password = Password};
_ ->
?PARSE_ERR(#{
hint => malformed_connect,
cause => malformed_connect,
unexpected_trailing_bytes => size(Rest7)
})
end;
parse_connect2(_ProtoName, Bin, _StrictMode) ->
%% sent less than 32 bytes
?PARSE_ERR(#{hint => malformed_connect, header_bytes => Bin}).
?PARSE_ERR(#{cause => malformed_connect, header_bytes => Bin}).

parse_packet(
#mqtt_packet_header{type = ?CONNECT},
Expand Down Expand Up @@ -366,7 +366,7 @@ parse_packet(
Bin,
#{strict_mode := StrictMode, version := Ver}
) ->
{TopicName, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_topic),
{TopicName, Rest} = parse_utf8_string_with_cause(Bin, StrictMode, invalid_topic),
{PacketId, Rest1} =
case QoS of
?QOS_0 -> {undefined, Rest};
Expand Down Expand Up @@ -478,7 +478,7 @@ parse_packet(
{Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5, StrictMode),
#mqtt_packet_auth{reason_code = ReasonCode, properties = Properties};
parse_packet(Header, _FrameBin, _Options) ->
?PARSE_ERR(#{hint => malformed_packet, header_type => Header#mqtt_packet_header.type}).
?PARSE_ERR(#{cause => malformed_packet, header_type => Header#mqtt_packet_header.type}).

parse_will_message(
Packet = #mqtt_packet_connect{
Expand All @@ -489,7 +489,7 @@ parse_will_message(
StrictMode
) ->
{Props, Rest} = parse_properties(Bin, Ver, StrictMode),
{Topic, Rest1} = parse_utf8_string_with_hint(Rest, StrictMode, invalid_topic),
{Topic, Rest1} = parse_utf8_string_with_cause(Rest, StrictMode, invalid_topic),
{Payload, Rest2} = parse_will_payload(Rest1),
{
Packet#mqtt_packet_connect{
Expand Down Expand Up @@ -522,7 +522,7 @@ parse_properties(Bin, ?MQTT_PROTO_V5, StrictMode) ->
{parse_property(PropsBin, #{}, StrictMode), Rest1};
_ ->
?PARSE_ERR(#{
hint => user_property_not_enough_bytes,
cause => user_property_not_enough_bytes,
parsed_key_length => Len,
remaining_bytes_length => byte_size(Rest)
})
Expand All @@ -535,10 +535,10 @@ parse_property(<<16#01, Val, Bin/binary>>, Props, StrictMode) ->
parse_property(<<16#02, Val:32/big, Bin/binary>>, Props, StrictMode) ->
parse_property(Bin, Props#{'Message-Expiry-Interval' => Val}, StrictMode);
parse_property(<<16#03, Bin/binary>>, Props, StrictMode) ->
{Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_content_type),
{Val, Rest} = parse_utf8_string_with_cause(Bin, StrictMode, invalid_content_type),
parse_property(Rest, Props#{'Content-Type' => Val}, StrictMode);
parse_property(<<16#08, Bin/binary>>, Props, StrictMode) ->
{Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_response_topic),
{Val, Rest} = parse_utf8_string_with_cause(Bin, StrictMode, invalid_response_topic),
parse_property(Rest, Props#{'Response-Topic' => Val}, StrictMode);
parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>, Props, StrictMode) ->
parse_property(Bin, Props#{'Correlation-Data' => Val}, StrictMode);
Expand All @@ -548,12 +548,12 @@ parse_property(<<16#0B, Bin/binary>>, Props, StrictMode) ->
parse_property(<<16#11, Val:32/big, Bin/binary>>, Props, StrictMode) ->
parse_property(Bin, Props#{'Session-Expiry-Interval' => Val}, StrictMode);
parse_property(<<16#12, Bin/binary>>, Props, StrictMode) ->
{Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_assigned_client_id),
{Val, Rest} = parse_utf8_string_with_cause(Bin, StrictMode, invalid_assigned_client_id),
parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val}, StrictMode);
parse_property(<<16#13, Val:16, Bin/binary>>, Props, StrictMode) ->
parse_property(Bin, Props#{'Server-Keep-Alive' => Val}, StrictMode);
parse_property(<<16#15, Bin/binary>>, Props, StrictMode) ->
{Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_authn_method),
{Val, Rest} = parse_utf8_string_with_cause(Bin, StrictMode, invalid_authn_method),
parse_property(Rest, Props#{'Authentication-Method' => Val}, StrictMode);
parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>, Props, StrictMode) ->
parse_property(Bin, Props#{'Authentication-Data' => Val}, StrictMode);
Expand All @@ -564,13 +564,13 @@ parse_property(<<16#18, Val:32, Bin/binary>>, Props, StrictMode) ->
parse_property(<<16#19, Val, Bin/binary>>, Props, StrictMode) ->
parse_property(Bin, Props#{'Request-Response-Information' => Val}, StrictMode);
parse_property(<<16#1A, Bin/binary>>, Props, StrictMode) ->
{Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_response_info),
{Val, Rest} = parse_utf8_string_with_cause(Bin, StrictMode, invalid_response_info),
parse_property(Rest, Props#{'Response-Information' => Val}, StrictMode);
parse_property(<<16#1C, Bin/binary>>, Props, StrictMode) ->
{Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_server_reference),
{Val, Rest} = parse_utf8_string_with_cause(Bin, StrictMode, invalid_server_reference),
parse_property(Rest, Props#{'Server-Reference' => Val}, StrictMode);
parse_property(<<16#1F, Bin/binary>>, Props, StrictMode) ->
{Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_reason_string),
{Val, Rest} = parse_utf8_string_with_cause(Bin, StrictMode, invalid_reason_string),
parse_property(Rest, Props#{'Reason-String' => Val}, StrictMode);
parse_property(<<16#21, Val:16/big, Bin/binary>>, Props, StrictMode) ->
parse_property(Bin, Props#{'Receive-Maximum' => Val}, StrictMode);
Expand Down Expand Up @@ -639,7 +639,7 @@ parse_utf8_pair(<<LenK:16/big, Rest/binary>>, _StrictMode) when
LenK > byte_size(Rest)
->
?PARSE_ERR(#{
hint => user_property_not_enough_bytes,
cause => user_property_not_enough_bytes,
parsed_key_length => LenK,
remaining_bytes_length => byte_size(Rest)
});
Expand All @@ -648,7 +648,7 @@ parse_utf8_pair(<<LenK:16/big, _Key:LenK/binary, LenV:16/big, Rest/binary>>, _St
LenV > byte_size(Rest)
->
?PARSE_ERR(#{
hint => malformed_user_property_value,
cause => malformed_user_property_value,
parsed_key_length => LenK,
parsed_value_length => LenV,
remaining_bytes_length => byte_size(Rest)
Expand All @@ -657,16 +657,16 @@ parse_utf8_pair(Bin, _StrictMode) when
4 > byte_size(Bin)
->
?PARSE_ERR(#{
hint => user_property_not_enough_bytes,
cause => user_property_not_enough_bytes,
total_bytes => byte_size(Bin)
}).

parse_utf8_string_with_hint(Bin, StrictMode, Hint) ->
parse_utf8_string_with_cause(Bin, StrictMode, Cause) ->
try
parse_utf8_string(Bin, StrictMode)
catch
throw:{?FRAME_PARSE_ERROR, Reason} when is_map(Reason) ->
?PARSE_ERR(Reason#{hint => Hint})
?PARSE_ERR(Reason#{cause => Cause})
end.

parse_optional(Bin, F, true) ->
Expand All @@ -682,7 +682,7 @@ parse_utf8_string(<<Len:16/big, Rest/binary>>, _) when
Len > byte_size(Rest)
->
?PARSE_ERR(#{
hint => malformed_utf8_string,
cause => malformed_utf8_string,
parsed_length => Len,
remaining_bytes_length => byte_size(Rest)
});
Expand All @@ -697,15 +697,15 @@ parse_will_payload(<<Len:16/big, Rest/binary>>) when
Len > byte_size(Rest)
->
?PARSE_ERR(#{
hint => malformed_will_payload,
cause => malformed_will_payload,
parsed_length => Len,
remaining_bytes => byte_size(Rest)
});
parse_will_payload(Bin) when
2 > byte_size(Bin)
->
?PARSE_ERR(#{
hint => malformed_will_payload,
cause => malformed_will_payload,
length_bytes => size(Bin),
expected_bytes => 2
}).
Expand Down
4 changes: 2 additions & 2 deletions apps/emqx/src/emqx_packet.erl
Expand Up @@ -493,8 +493,8 @@ format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, Pa
"" -> [HeaderIO, ")"];
VarIO -> [HeaderIO, ", ", VarIO, ")"]
end;
%% receive a frame error packet, such as {frame_error,#{hint := frame_too_large}} or
%% {frame_error,#{expected => <<"'MQTT' or 'MQIsdp'">>,hint => invalid_proto_name,received => <<"bad_name">>}}
%% receive a frame error packet, such as {frame_error,#{cause := frame_too_large}} or
%% {frame_error,#{expected => <<"'MQTT' or 'MQIsdp'">>,cause => invalid_proto_name,received => <<"bad_name">>}}
format(FrameError, _PayloadEncode) ->
lists:flatten(io_lib:format("~tp", [FrameError])).

Expand Down
16 changes: 9 additions & 7 deletions apps/emqx/test/emqx_channel_SUITE.erl
Expand Up @@ -427,30 +427,32 @@ t_handle_in_auth(_) ->

t_handle_in_frame_error(_) ->
IdleChannel = channel(#{conn_state => idle}),
{shutdown, #{shutdown_count := frame_too_large, hint := frame_too_large}, _Chan} =
emqx_channel:handle_in({frame_error, #{reason => frame_too_large}}, IdleChannel),
{shutdown, #{shutdown_count := frame_too_large, cause := frame_too_large}, _Chan} =
emqx_channel:handle_in({frame_error, #{cause => frame_too_large}}, IdleChannel),
ConnectingChan = channel(#{conn_state => connecting}),
ConnackPacket = ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE),
{shutdown,
#{
shutdown_count := frame_too_large,
hint := frame_too_large,
cause := frame_too_large,
limit := 100,
received := 101
},
ConnackPacket,
_} =
emqx_channel:handle_in(
{frame_error, #{reason => frame_too_large, received => 101, limit => 100}},
{frame_error, #{cause => frame_too_large, received => 101, limit => 100}},
ConnectingChan
),
DisconnectPacket = ?DISCONNECT_PACKET(?RC_PACKET_TOO_LARGE),
ConnectedChan = channel(#{conn_state => connected}),
{ok, [{outgoing, DisconnectPacket}, {close, frame_too_large}], _} =
emqx_channel:handle_in({frame_error, #{reason => frame_too_large}}, ConnectedChan),
?assertMatch(
{ok, [{outgoing, DisconnectPacket}, {close, frame_too_large}], _},
emqx_channel:handle_in({frame_error, #{cause => frame_too_large}}, ConnectedChan)
),
DisconnectedChan = channel(#{conn_state => disconnected}),
{ok, DisconnectedChan} =
emqx_channel:handle_in({frame_error, #{reason => frame_too_large}}, DisconnectedChan).
emqx_channel:handle_in({frame_error, #{cause => frame_too_large}}, DisconnectedChan).

t_handle_in_expected_packet(_) ->
Packet = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR),
Expand Down
20 changes: 11 additions & 9 deletions apps/emqx/test/emqx_frame_SUITE.erl
Expand Up @@ -139,8 +139,8 @@ t_parse_cont(_) ->

t_parse_frame_too_large(_) ->
Packet = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, payload(1000)),
?ASSERT_FRAME_THROW(#{hint := frame_too_large}, parse_serialize(Packet, #{max_size => 256})),
?ASSERT_FRAME_THROW(#{hint := frame_too_large}, parse_serialize(Packet, #{max_size => 512})),
?ASSERT_FRAME_THROW(#{cause := frame_too_large}, parse_serialize(Packet, #{max_size => 256})),
?ASSERT_FRAME_THROW(#{cause := frame_too_large}, parse_serialize(Packet, #{max_size => 512})),
?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).

t_parse_frame_malformed_variable_byte_integer(_) ->
Expand Down Expand Up @@ -298,13 +298,13 @@ t_serialize_parse_connect_with_malformed_will(_) ->
%% too short
BadBin1 = <<16, 45, Body/binary, 0>>,
?ASSERT_FRAME_THROW(
#{hint := malformed_will_payload, length_bytes := 1, expected_bytes := 2},
#{cause := malformed_will_payload, length_bytes := 1, expected_bytes := 2},
emqx_frame:parse(BadBin1)
),
%% too long
BadBin2 = <<16, 47, Body/binary, 0, 2, 0>>,
?ASSERT_FRAME_THROW(
#{hint := malformed_will_payload, parsed_length := 2, remaining_bytes := 1},
#{cause := malformed_will_payload, parsed_length := 2, remaining_bytes := 1},
emqx_frame:parse(BadBin2)
),
ok.
Expand Down Expand Up @@ -617,7 +617,7 @@ t_serialize_parse_pingresp(_) ->
Packet = serialize_to_binary(PingResp),
?assertException(
throw,
{frame_parse_error, #{hint := unexpected_packet, header_type := 'PINGRESP'}},
{frame_parse_error, #{cause := unexpected_packet, header_type := 'PINGRESP'}},
emqx_frame:parse(Packet)
).

Expand Down Expand Up @@ -664,7 +664,9 @@ t_serialize_parse_auth_v5(_) ->

t_parse_invalid_remaining_len(_) ->
?assertException(
throw, {frame_parse_error, #{hint := zero_remaining_len}}, emqx_frame:parse(<<?CONNECT, 0>>)
throw,
{frame_parse_error, #{cause := zero_remaining_len}},
emqx_frame:parse(<<?CONNECT, 0>>)
).

t_parse_malformed_properties(_) ->
Expand All @@ -676,13 +678,13 @@ t_parse_malformed_properties(_) ->

t_malformed_connect_header(_) ->
?ASSERT_FRAME_THROW(
#{hint := malformed_connect, header_bytes := _},
#{cause := malformed_connect, header_bytes := _},
emqx_frame:parse(<<16, 11, 0, 6, 77, 81, 73, 115, 100, 112, 3, 130, 1, 6>>)
).

t_malformed_connect_data(_) ->
?ASSERT_FRAME_THROW(
#{hint := malformed_connect, unexpected_trailing_bytes := _},
#{cause := malformed_connect, unexpected_trailing_bytes := _},
emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 0, 0, 0, 0, 0, 0>>)
).

Expand All @@ -696,7 +698,7 @@ t_reserved_connect_flag(_) ->
t_invalid_clientid(_) ->
?assertException(
throw,
{frame_parse_error, #{hint := invalid_clientid}},
{frame_parse_error, #{cause := invalid_clientid}},
emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 0, 0, 0, 1, 0, 0>>)
).

Expand Down
2 changes: 1 addition & 1 deletion apps/emqx/test/emqx_ws_connection_SUITE.erl
Expand Up @@ -532,7 +532,7 @@ t_parse_incoming_frame_error(_) ->
{incoming,
{frame_error, #{
header_type := _,
hint := malformed_packet
cause := malformed_packet
}}}
],
Packets
Expand Down

0 comments on commit 8b0e15e

Please sign in to comment.