diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 8f845a94ef..d3e0e9f974 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -64,7 +64,8 @@ send_stats, connected, connected_at, - ignore_loop + ignore_loop, + topic_alias_maximum }). -type(state() :: #pstate{}). @@ -84,28 +85,29 @@ -spec(init(map(), list()) -> state()). init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) -> Zone = proplists:get_value(zone, Options), - #pstate{zone = Zone, - sendfun = SendFun, - peername = Peername, - peercert = Peercert, - proto_ver = ?MQTT_PROTO_V4, - proto_name = <<"MQTT">>, - client_id = <<>>, - is_assigned = false, - conn_pid = self(), - username = init_username(Peercert, Options), - is_super = false, - clean_start = false, - topic_aliases = #{}, - packet_size = emqx_zone:get_env(Zone, max_packet_size), - mountpoint = emqx_zone:get_env(Zone, mountpoint), - is_bridge = false, - enable_ban = emqx_zone:get_env(Zone, enable_ban, false), - enable_acl = emqx_zone:get_env(Zone, enable_acl), - recv_stats = #{msg => 0, pkt => 0}, - send_stats = #{msg => 0, pkt => 0}, - connected = false, - ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false)}. + #pstate{zone = Zone, + sendfun = SendFun, + peername = Peername, + peercert = Peercert, + proto_ver = ?MQTT_PROTO_V4, + proto_name = <<"MQTT">>, + client_id = <<>>, + is_assigned = false, + conn_pid = self(), + username = init_username(Peercert, Options), + is_super = false, + clean_start = false, + topic_aliases = #{}, + packet_size = emqx_zone:get_env(Zone, max_packet_size), + mountpoint = emqx_zone:get_env(Zone, mountpoint), + is_bridge = false, + enable_ban = emqx_zone:get_env(Zone, enable_ban, false), + enable_acl = emqx_zone:get_env(Zone, enable_acl), + recv_stats = #{msg => 0, pkt => 0}, + send_stats = #{msg => 0, pkt => 0}, + connected = false, + ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false), + topic_alias_maximum = #{to_client => 0, from_client => 0}}. init_username(Peercert, Options) -> case proplists:get_value(peer_cert_as_username, Options) of @@ -212,12 +214,16 @@ received(?PACKET(?CONNECT), PState = #pstate{connected = true}) -> {error, proto_unexpected_connect, PState}; received(Packet = ?PACKET(Type), PState) -> - PState1 = set_protover(Packet, PState), + PState1 = set_protover(Packet, PState), trace(recv, Packet), try emqx_packet:validate(Packet) of true -> - {Packet1, PState2} = preprocess_properties(Packet, PState1), - process_packet(Packet1, inc_stats(recv, Type, PState2)) + case preprocess_properties(Packet, PState1) of + {error, ReasonCode} -> + {error, ReasonCode, PState1}; + {Packet1, PState2} -> + process_packet(Packet1, inc_stats(recv, Type, PState2)) + end catch error : protocol_error -> deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState1), @@ -242,6 +248,13 @@ received(Packet = ?PACKET(Type), PState) -> %%------------------------------------------------------------------------------ %% Preprocess MQTT Properties %%------------------------------------------------------------------------------ +preprocess_properties(Packet = #mqtt_packet{ + variable = #mqtt_packet_connect{ + properties = #{'Topic-Alias-Maximum' := ToClient} + } + }, + PState = #pstate{topic_alias_maximum = TopicAliasMaximum}) -> + {Packet, PState#pstate{topic_alias_maximum = TopicAliasMaximum#{to_client => ToClient}}}; %% Subscription Identifier preprocess_properties(Packet = #mqtt_packet{ @@ -255,22 +268,46 @@ preprocess_properties(Packet = #mqtt_packet{ {Packet#mqtt_packet{variable = Subscribe#mqtt_packet_subscribe{topic_filters = TopicFilters1}}, PState}; %% Topic Alias Mapping +preprocess_properties(#mqtt_packet{ + variable = #mqtt_packet_publish{ + properties = #{'Topic-Alias' := 0}} + }, + PState) -> + deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState), + {error, ?RC_TOPIC_ALIAS_INVALID}; + preprocess_properties(Packet = #mqtt_packet{ variable = Publish = #mqtt_packet_publish{ topic_name = <<>>, properties = #{'Topic-Alias' := AliasId}} }, - PState = #pstate{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases}) -> - {Packet#mqtt_packet{variable = Publish#mqtt_packet_publish{ - topic_name = maps:get(AliasId, Aliases, <<>>)}}, PState}; + PState = #pstate{proto_ver = ?MQTT_PROTO_V5, + topic_aliases = Aliases, + topic_alias_maximum = #{from_client := TopicAliasMaximum}}) -> + case AliasId =< TopicAliasMaximum of + true -> + {Packet#mqtt_packet{variable = Publish#mqtt_packet_publish{ + topic_name = maps:get(AliasId, Aliases, <<>>)}}, PState}; + false -> + deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState), + {error, ?RC_TOPIC_ALIAS_INVALID} + end; preprocess_properties(Packet = #mqtt_packet{ - variable = #mqtt_packet_publish{ - topic_name = Topic, - properties = #{'Topic-Alias' := AliasId}} - }, - PState = #pstate{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases}) -> - {Packet, PState#pstate{topic_aliases = maps:put(AliasId, Topic, Aliases)}}; + variable = #mqtt_packet_publish{ + topic_name = Topic, + properties = #{'Topic-Alias' := AliasId}} + }, + PState = #pstate{proto_ver = ?MQTT_PROTO_V5, + topic_aliases = Aliases, + topic_alias_maximum = #{from_client := TopicAliasMaximum}}) -> + case AliasId =< TopicAliasMaximum of + true -> + {Packet, PState#pstate{topic_aliases = maps:put(AliasId, Topic, Aliases)}}; + false -> + deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState), + {error, ?RC_TOPIC_ALIAS_INVALID} + end; preprocess_properties(Packet, PState) -> {Packet, PState}. @@ -278,7 +315,6 @@ preprocess_properties(Packet, PState) -> %%------------------------------------------------------------------------------ %% Process MQTT Packet %%------------------------------------------------------------------------------ - process_packet(?CONNECT_PACKET( #mqtt_packet_connect{proto_name = ProtoName, proto_ver = ProtoVer, @@ -308,6 +344,7 @@ process_packet(?CONNECT_PACKET( will_msg = WillMsg, is_bridge = IsBridge, connected_at = os:timestamp()}), + connack( case check_connect(Connect, PState1) of {ok, PState2} -> @@ -342,9 +379,6 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PSt case check_publish(Packet, PState) of {ok, PState1} -> do_publish(Packet, PState1); - {error, ?RC_TOPIC_ALIAS_INVALID} -> - ?LOG(error, "Protocol error - ~p", [?RC_TOPIC_ALIAS_INVALID]), - {error, ?RC_TOPIC_ALIAS_INVALID, PState}; {error, ReasonCode} -> ?LOG(warning, "Cannot publish qos0 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), @@ -523,7 +557,8 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, proto_ver = ?MQTT_PROTO_V5, client_id = ClientId, conn_props = ConnProps, - is_assigned = IsAssigned}) -> + is_assigned = IsAssigned, + topic_alias_maximum = TopicAliasMaximum}) -> ResponseInformation = case maps:find('Request-Response-Information', ConnProps) of {ok, 1} -> iolist_to_binary(emqx_config:get_env(response_topic_prefix)); @@ -561,17 +596,20 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, undefined -> Props2; Keepalive -> Props2#{'Server-Keep-Alive' => Keepalive} end, - send(?CONNACK_PACKET(?RC_SUCCESS, SP, Props3), PState); + + PState1 = PState#pstate{topic_alias_maximum = TopicAliasMaximum#{from_client => MaxAlias}}, + + send(?CONNACK_PACKET(?RC_SUCCESS, SP, Props3), PState1); deliver({connack, ReasonCode, SP}, PState) -> send(?CONNACK_PACKET(ReasonCode, SP), PState); -deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) -> +deliver({publish, PacketId, Msg = #message{headers = Headers}}, PState = #pstate{mountpoint = MountPoint}) -> _ = emqx_hooks:run('message.delivered', [credentials(PState)], Msg), Msg1 = emqx_message:update_expiry(Msg), Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1), - send(emqx_packet:from_message(PacketId, Msg2), PState); - + send(emqx_packet:from_message(PacketId, Msg2#message{headers = maps:remove('Topic-Alias', Headers)}), PState); + deliver({puback, PacketId, ReasonCode}, PState) -> send(?PUBACK_PACKET(PacketId, ReasonCode), PState); @@ -758,18 +796,11 @@ check_publish(Packet, PState) -> run_check_steps([fun check_pub_caps/2, fun check_pub_acl/2], Packet, PState). -check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Retain}, - variable = #mqtt_packet_publish{ - properties = #{'Topic-Alias' := TopicAlias} - }}, - #pstate{zone = Zone}) -> - emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic_alias => TopicAlias}); check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Retain}, variable = #mqtt_packet_publish{ properties = _Properties}}, #pstate{zone = Zone}) -> emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}). - check_pub_acl(_Packet, #pstate{is_super = IsSuper, enable_acl = EnableAcl}) when IsSuper orelse (not EnableAcl) -> ok; diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index ae308ea420..27137d9565 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -55,6 +55,7 @@ groups() -> init_per_suite(Config) -> emqx_ct_broker_helpers:run_setup_steps(), + emqx_zone:set_env(external, max_topic_alias, 20), Config. end_per_suite(_Config) -> @@ -154,6 +155,82 @@ connect_v5(_) -> #{'Response-Information' := _RespInfo}), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) end), + + % topic alias = 0 + with_connection(fun([Sock]) -> + emqx_client_sock:send(Sock, + raw_send_serialize( + ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + properties = + #{'Topic-Alias-Maximum' => 10}}), + #{version => ?MQTT_PROTO_V5} + )), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, + #{'Topic-Alias-Maximum' := 20}), _} = + raw_recv_parse(Data, ?MQTT_PROTO_V5), + + emqx_client_sock:send(Sock, + raw_send_serialize( + ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, 1, #{'Topic-Alias' => 0}, <<"hello">>), + #{version => ?MQTT_PROTO_V5} + )), + + {ok, Data2} = gen_tcp:recv(Sock, 0), + {ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5) + end), + + % topic alias maximum + with_connection(fun([Sock]) -> + emqx_client_sock:send(Sock, + raw_send_serialize( + ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + properties = + #{'Topic-Alias-Maximum' => 10}}), + #{version => ?MQTT_PROTO_V5} + )), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, + #{'Topic-Alias-Maximum' := 20}), _} = + raw_recv_parse(Data, ?MQTT_PROTO_V5), + + emqx_client_sock:send(Sock, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1, + qos => ?QOS_2, + rap => 0, + nl => 0, + rc => 0}}]), + #{version => ?MQTT_PROTO_V5})), + + {ok, Data2} = gen_tcp:recv(Sock, 0), + {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5), + + emqx_client_sock:send(Sock, + raw_send_serialize( + ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, 1, #{'Topic-Alias' => 15}, <<"hello">>), + #{version => ?MQTT_PROTO_V5} + )), + + {ok, Data3} = gen_tcp:recv(Sock, 0), + + {ok, ?PUBACK_PACKET(1, 0), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), + + {ok, Data4} = gen_tcp:recv(Sock, 0), + + {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"hello">>), _} = raw_recv_parse(Data4, ?MQTT_PROTO_V5), + + emqx_client_sock:send(Sock, + raw_send_serialize( + ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, 2, #{'Topic-Alias' => 21}, <<"hello">>), + #{version => ?MQTT_PROTO_V5} + )), + + {ok, Data5} = gen_tcp:recv(Sock, 0), + {ok, ?DISCONNECT_PACKET(?RC_TOPIC_ALIAS_INVALID), _} = raw_recv_parse(Data5, ?MQTT_PROTO_V5) + end), % test clean start with_connection(fun([Sock]) ->