Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in topic alias maximum #2074

Merged
merged 4 commits into from Dec 21, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
149 changes: 95 additions & 54 deletions src/emqx_protocol.erl
Expand Up @@ -64,7 +64,8 @@
send_stats,
connected,
connected_at,
ignore_loop
ignore_loop,
topic_alias_maximum
}).

-type(state() :: #pstate{}).
Expand All @@ -84,28 +85,29 @@
-spec(init(map(), list()) -> state()).
init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) ->
Zone = proplists:get_value(zone, Options),
#pstate{zone = Zone,
sendfun = SendFun,
peername = Peername,
peercert = Peercert,
proto_ver = ?MQTT_PROTO_V4,
proto_name = <<"MQTT">>,
client_id = <<>>,
is_assigned = false,
conn_pid = self(),
username = init_username(Peercert, Options),
is_super = false,
clean_start = false,
topic_aliases = #{},
packet_size = emqx_zone:get_env(Zone, max_packet_size),
mountpoint = emqx_zone:get_env(Zone, mountpoint),
is_bridge = false,
enable_ban = emqx_zone:get_env(Zone, enable_ban, false),
enable_acl = emqx_zone:get_env(Zone, enable_acl),
recv_stats = #{msg => 0, pkt => 0},
send_stats = #{msg => 0, pkt => 0},
connected = false,
ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false)}.
#pstate{zone = Zone,
sendfun = SendFun,
peername = Peername,
peercert = Peercert,
proto_ver = ?MQTT_PROTO_V4,
proto_name = <<"MQTT">>,
client_id = <<>>,
is_assigned = false,
conn_pid = self(),
username = init_username(Peercert, Options),
is_super = false,
clean_start = false,
topic_aliases = #{},
packet_size = emqx_zone:get_env(Zone, max_packet_size),
mountpoint = emqx_zone:get_env(Zone, mountpoint),
is_bridge = false,
enable_ban = emqx_zone:get_env(Zone, enable_ban, false),
enable_acl = emqx_zone:get_env(Zone, enable_acl),
recv_stats = #{msg => 0, pkt => 0},
send_stats = #{msg => 0, pkt => 0},
connected = false,
ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false),
topic_alias_maximum = #{to_client => 0, from_client => 0}}.

init_username(Peercert, Options) ->
case proplists:get_value(peer_cert_as_username, Options) of
Expand Down Expand Up @@ -212,12 +214,16 @@ received(?PACKET(?CONNECT), PState = #pstate{connected = true}) ->
{error, proto_unexpected_connect, PState};

received(Packet = ?PACKET(Type), PState) ->
PState1 = set_protover(Packet, PState),
PState1 = set_protover(Packet, PState),
trace(recv, Packet),
try emqx_packet:validate(Packet) of
true ->
{Packet1, PState2} = preprocess_properties(Packet, PState1),
process_packet(Packet1, inc_stats(recv, Type, PState2))
case preprocess_properties(Packet, PState1) of
{error, ReasonCode} ->
{error, ReasonCode, PState1};
{Packet1, PState2} ->
process_packet(Packet1, inc_stats(recv, Type, PState2))
end
catch
error : protocol_error ->
deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState1),
Expand All @@ -242,6 +248,13 @@ received(Packet = ?PACKET(Type), PState) ->
%%------------------------------------------------------------------------------
%% Preprocess MQTT Properties
%%------------------------------------------------------------------------------
preprocess_properties(Packet = #mqtt_packet{
variable = #mqtt_packet_connect{
properties = #{'Topic-Alias-Maximum' := ToClient}
}
},
PState = #pstate{topic_alias_maximum = TopicAliasMaximum}) ->
{Packet, PState#pstate{topic_alias_maximum = TopicAliasMaximum#{to_client => ToClient}}};

%% Subscription Identifier
preprocess_properties(Packet = #mqtt_packet{
Expand All @@ -255,30 +268,53 @@ preprocess_properties(Packet = #mqtt_packet{
{Packet#mqtt_packet{variable = Subscribe#mqtt_packet_subscribe{topic_filters = TopicFilters1}}, PState};

%% Topic Alias Mapping
preprocess_properties(#mqtt_packet{
variable = #mqtt_packet_publish{
properties = #{'Topic-Alias' := 0}}
},
PState) ->
deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState),
{error, ?RC_TOPIC_ALIAS_INVALID};

preprocess_properties(Packet = #mqtt_packet{
variable = Publish = #mqtt_packet_publish{
topic_name = <<>>,
properties = #{'Topic-Alias' := AliasId}}
},
PState = #pstate{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases}) ->
{Packet#mqtt_packet{variable = Publish#mqtt_packet_publish{
topic_name = maps:get(AliasId, Aliases, <<>>)}}, PState};
PState = #pstate{proto_ver = ?MQTT_PROTO_V5,
topic_aliases = Aliases,
topic_alias_maximum = #{from_client := TopicAliasMaximum}}) ->
case AliasId =< TopicAliasMaximum of
true ->
{Packet#mqtt_packet{variable = Publish#mqtt_packet_publish{
topic_name = maps:get(AliasId, Aliases, <<>>)}}, PState};
false ->
deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState),
{error, ?RC_TOPIC_ALIAS_INVALID}
end;

preprocess_properties(Packet = #mqtt_packet{
variable = #mqtt_packet_publish{
topic_name = Topic,
properties = #{'Topic-Alias' := AliasId}}
},
PState = #pstate{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases}) ->
{Packet, PState#pstate{topic_aliases = maps:put(AliasId, Topic, Aliases)}};
variable = #mqtt_packet_publish{
topic_name = Topic,
properties = #{'Topic-Alias' := AliasId}}
},
PState = #pstate{proto_ver = ?MQTT_PROTO_V5,
topic_aliases = Aliases,
topic_alias_maximum = #{from_client := TopicAliasMaximum}}) ->
case AliasId =< TopicAliasMaximum of
true ->
{Packet, PState#pstate{topic_aliases = maps:put(AliasId, Topic, Aliases)}};
false ->
deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState),
{error, ?RC_TOPIC_ALIAS_INVALID}
end;

preprocess_properties(Packet, PState) ->
{Packet, PState}.

%%------------------------------------------------------------------------------
%% Process MQTT Packet
%%------------------------------------------------------------------------------

process_packet(?CONNECT_PACKET(
#mqtt_packet_connect{proto_name = ProtoName,
proto_ver = ProtoVer,
Expand Down Expand Up @@ -308,6 +344,7 @@ process_packet(?CONNECT_PACKET(
will_msg = WillMsg,
is_bridge = IsBridge,
connected_at = os:timestamp()}),

connack(
case check_connect(Connect, PState1) of
{ok, PState2} ->
Expand Down Expand Up @@ -342,9 +379,6 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PSt
case check_publish(Packet, PState) of
{ok, PState1} ->
do_publish(Packet, PState1);
{error, ?RC_TOPIC_ALIAS_INVALID} ->
?LOG(error, "Protocol error - ~p", [?RC_TOPIC_ALIAS_INVALID]),
{error, ?RC_TOPIC_ALIAS_INVALID, PState};
{error, ReasonCode} ->
?LOG(warning, "Cannot publish qos0 message to ~s for ~s",
[Topic, emqx_reason_codes:text(ReasonCode)]),
Expand Down Expand Up @@ -523,7 +557,8 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
proto_ver = ?MQTT_PROTO_V5,
client_id = ClientId,
conn_props = ConnProps,
is_assigned = IsAssigned}) ->
is_assigned = IsAssigned,
topic_alias_maximum = TopicAliasMaximum}) ->
ResponseInformation = case maps:find('Request-Response-Information', ConnProps) of
{ok, 1} ->
iolist_to_binary(emqx_config:get_env(response_topic_prefix));
Expand Down Expand Up @@ -561,17 +596,30 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
undefined -> Props2;
Keepalive -> Props2#{'Server-Keep-Alive' => Keepalive}
end,
send(?CONNACK_PACKET(?RC_SUCCESS, SP, Props3), PState);

PState1 = PState#pstate{topic_alias_maximum = TopicAliasMaximum#{from_client => MaxAlias}},

send(?CONNACK_PACKET(?RC_SUCCESS, SP, Props3), PState1);

deliver({connack, ReasonCode, SP}, PState) ->
send(?CONNACK_PACKET(ReasonCode, SP), PState);

deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) ->
_ = emqx_hooks:run('message.delivered', [credentials(PState)], Msg),
Msg1 = emqx_message:update_expiry(Msg),
Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
send(emqx_packet:from_message(PacketId, Msg2), PState);

deliver({publish, PacketId, Msg = #message{headers = Headers}},
PState = #pstate{topic_alias_maximum = #{to_client := TopicAliasMaximum},
mountpoint = MountPoint}) ->
TopicAlias = maps:get('Topic-Alias', Headers, undefined),
case TopicAlias =:= undefined
orelse (TopicAlias =/= 0 andalso TopicAlias =< TopicAliasMaximum) of
true ->
_ = emqx_hooks:run('message.delivered', [credentials(PState)], Msg),
Msg1 = emqx_message:update_expiry(Msg),
Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
send(emqx_packet:from_message(PacketId, Msg2), PState);
false ->
deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState),
{error, topic_alias_invalid}
end;

deliver({puback, PacketId, ReasonCode}, PState) ->
send(?PUBACK_PACKET(PacketId, ReasonCode), PState);

Expand Down Expand Up @@ -758,18 +806,11 @@ check_publish(Packet, PState) ->
run_check_steps([fun check_pub_caps/2,
fun check_pub_acl/2], Packet, PState).

check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Retain},
variable = #mqtt_packet_publish{
properties = #{'Topic-Alias' := TopicAlias}
}},
#pstate{zone = Zone}) ->
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic_alias => TopicAlias});
check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Retain},
variable = #mqtt_packet_publish{ properties = _Properties}},
#pstate{zone = Zone}) ->
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}).


check_pub_acl(_Packet, #pstate{is_super = IsSuper, enable_acl = EnableAcl})
when IsSuper orelse (not EnableAcl) ->
ok;
Expand Down
107 changes: 107 additions & 0 deletions test/emqx_protocol_SUITE.erl
Expand Up @@ -55,6 +55,7 @@ groups() ->

init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
emqx_zone:set_env(external, max_topic_alias, 20),
Config.

end_per_suite(_Config) ->
Expand Down Expand Up @@ -154,6 +155,112 @@ connect_v5(_) ->
#{'Response-Information' := _RespInfo}), _} =
raw_recv_parse(Data, ?MQTT_PROTO_V5)
end),

% topic alias = 0
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
properties =
#{'Topic-Alias-Maximum' => 10}}),
#{version => ?MQTT_PROTO_V5}
)),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0,
#{'Topic-Alias-Maximum' := 20}), _} =
raw_recv_parse(Data, ?MQTT_PROTO_V5),

emqx_client_sock:send(Sock,
raw_send_serialize(
?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, 1, #{'Topic-Alias' => 0}, <<"hello">>),
#{version => ?MQTT_PROTO_V5}
)),

{ok, Data2} = gen_tcp:recv(Sock, 0),
{ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5)
end),

% topic alias maximum
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
properties =
#{'Topic-Alias-Maximum' => 10}}),
#{version => ?MQTT_PROTO_V5}
)),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0,
#{'Topic-Alias-Maximum' := 20}), _} =
raw_recv_parse(Data, ?MQTT_PROTO_V5),

emqx_client_sock:send(Sock, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1,
qos => ?QOS_2,
rap => 0,
nl => 0,
rc => 0}}]),
#{version => ?MQTT_PROTO_V5})),

{ok, Data2} = gen_tcp:recv(Sock, 0),
{ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5),

emqx_client_sock:send(Sock,
raw_send_serialize(
?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, 1, #{'Topic-Alias' => 9}, <<"hello">>),
#{version => ?MQTT_PROTO_V5}
)),

{ok, Data3} = gen_tcp:recv(Sock, 0),

{ok, ?PUBACK_PACKET(1, 0), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5),

{ok, Data4} = gen_tcp:recv(Sock, 0),

{ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"hello">>), _} = raw_recv_parse(Data4, ?MQTT_PROTO_V5),

emqx_client_sock:send(Sock,
raw_send_serialize(
?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, 2, #{'Topic-Alias' => 18}, <<"hello">>),
#{version => ?MQTT_PROTO_V5}
)),

{ok, Data5} = gen_tcp:recv(Sock, 0),

{ok, ?PUBACK_PACKET(2, 0), _} = raw_recv_parse(Data5, ?MQTT_PROTO_V5),

{ok, Data6} = gen_tcp:recv(Sock, 0),
{ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), _} = raw_recv_parse(Data6, ?MQTT_PROTO_V5)
end),

% topic alias maximum 2
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
properties =
#{'Topic-Alias-Maximum' => 10}}),
#{version => ?MQTT_PROTO_V5}
)),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0,
#{'Topic-Alias-Maximum' := 20}), _} =
raw_recv_parse(Data, ?MQTT_PROTO_V5),

emqx_client_sock:send(Sock,
raw_send_serialize(
?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, 1, #{'Topic-Alias' => 21}, <<"hello">>),
#{version => ?MQTT_PROTO_V5}
)),

{ok, Data2} = gen_tcp:recv(Sock, 0),
{ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5)
end),

% test clean start
with_connection(fun([Sock]) ->
Expand Down