Skip to content

Commit

Permalink
Fix issue#1874 (#1964)
Browse files Browse the repository at this point in the history
* Fix issue#1874
Prior to this change, if user use one client connect emqx with mqtt
v3.1.1, the client subscribe the topic and publish message to this
topic, it would receive this message itself published, this commit
provide a configure option to let user ignore the message itself published.

This change fix issue 1874.

* Small Fix

* Fix bug

* Better design

* Fix compile warning and improve coverage

* Better design to solve the performance issue

* Fix typo

* Fix typo

* Delete spaces in end of lines.

* Do not use anonymous function

* Better performance
  • Loading branch information
Gilbert authored and turtleDeng committed Nov 19, 2018
1 parent bc1464a commit 1682149
Show file tree
Hide file tree
Showing 19 changed files with 68 additions and 35 deletions.
5 changes: 5 additions & 0 deletions etc/emqx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,11 @@ mqtt.wildcard_subscription = true
## Value: boolean
mqtt.shared_subscription = true

## Whether to ignore loop delivery of messages.(for mqtt v3.1.1)
##
## Value: true | false
mqtt.ignore_loop_deliver = false

##--------------------------------------------------------------------
## Zones
##--------------------------------------------------------------------
Expand Down
6 changes: 6 additions & 0 deletions priv/emqx.schema
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,12 @@ end}.
{datatype, {enum, [true, false]}}
]}.

%% @doc Whether to ignore loop delivery of messages.(for mqtt v3.1.1)
{mapping, "mqtt.ignore_loop_deliver", "emqx.mqtt_ignore_loop_deliver", [
{default, true},
{datatype, {enum, [true, false]}}
]}.

%%--------------------------------------------------------------------
%% Zones
%%--------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/emqx_broker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ aggre(Routes) ->
lists:foldl(
fun(#route{topic = To, dest = Node}, Acc) when is_atom(Node) ->
[{To, Node} | Acc];
(#route{topic = To, dest = {Group, _Node}}, Acc) ->
(#route{topic = To, dest = {Group, _Node}}, Acc) ->
lists:usort([{To, Group} | Acc])
end, [], Routes).

Expand Down
2 changes: 1 addition & 1 deletion src/emqx_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ populate(_App) ->
%% @doc Read the configuration of an application.
-spec(read(atom()) -> {ok, list(env())} | {error, term()}).
read(App) ->
%% TODO:
%% TODO:
%% 1. Read the app.conf from etc folder
%% 2. Cuttlefish to read the conf
%% 3. Return the terms and schema
Expand Down
2 changes: 1 addition & 1 deletion src/emqx_plugins.erl
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ find_plugin(Name) ->
find_plugin(Name, list()).

find_plugin(Name, Plugins) ->
lists:keyfind(Name, 2, Plugins).
lists:keyfind(Name, 2, Plugins).

%% @doc UnLoad a Plugin
-spec(unload(atom()) -> ok | {error, term()}).
Expand Down
23 changes: 14 additions & 9 deletions src/emqx_protocol.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,16 @@
recv_stats,
send_stats,
connected,
connected_at
connected_at,
ignore_loop
}).

-type(state() :: #pstate{}).
-export_type([state/0]).

-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.

-define(NO_PROPS, undefined).
Expand Down Expand Up @@ -102,7 +104,8 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options)
enable_acl = emqx_zone:get_env(Zone, enable_acl),
recv_stats = #{msg => 0, pkt => 0},
send_stats = #{msg => 0, pkt => 0},
connected = false}.
connected = false,
ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false)}.

init_username(Peercert, Options) ->
case proplists:get_value(peer_cert_as_username, Options) of
Expand Down Expand Up @@ -385,11 +388,14 @@ process_packet(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session =
{ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState};

process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
PState = #pstate{session = SPid, mountpoint = Mountpoint, proto_ver = ProtoVer, is_bridge = IsBridge}) ->
PState = #pstate{session = SPid, mountpoint = Mountpoint,
proto_ver = ProtoVer, is_bridge = IsBridge,
ignore_loop = IgnoreLoop}) ->
RawTopicFilters1 = if ProtoVer < ?MQTT_PROTO_V5 ->
IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end,
case IsBridge of
true -> [{RawTopic, SubOpts#{rap => 1}} || {RawTopic, SubOpts} <- RawTopicFilters];
false -> [{RawTopic, SubOpts#{rap => 0}} || {RawTopic, SubOpts} <- RawTopicFilters]
true -> [{RawTopic, SubOpts#{rap => 1, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters];
false -> [{RawTopic, SubOpts#{rap => 0, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters]
end;
true ->
RawTopicFilters
Expand Down Expand Up @@ -626,7 +632,6 @@ try_open_session(PState = #pstate{zone = Zone,
clean_start => CleanStart,
will_msg => WillMsg
},

SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}, {topic_alias_maximum, PState}]),
case emqx_sm:open_session(SessAttrs1) of
{ok, SPid} ->
Expand Down Expand Up @@ -685,12 +690,12 @@ get_property(Name, Props, Default) ->
maps:get(Name, Props, Default).

make_will_msg(#mqtt_packet_connect{proto_ver = ProtoVer,
will_props = WillProps} = Connect) ->
emqx_packet:will_msg(if
will_props = WillProps} = Connect) ->
emqx_packet:will_msg(if
ProtoVer =:= ?MQTT_PROTO_V5 ->
WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0),
Connect#mqtt_packet_connect{will_props = set_property('Will-Delay-Interval', WillDelayInterval, WillProps)};
true ->
true ->
Connect
end).

Expand Down
5 changes: 4 additions & 1 deletion src/emqx_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
will_msg :: emqx:message(),

will_delay_timer :: reference() | undefined

}).

-type(spid() :: pid()).
Expand Down Expand Up @@ -575,7 +576,8 @@ handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) ->

%% Dispatch message
handle_info({dispatch, Topic, Msg = #message{headers = Headers}},
State = #state{subscriptions = SubMap, topic_alias_maximum = TopicAliasMaximum}) when is_record(Msg, message) ->
State = #state{subscriptions = SubMap,
topic_alias_maximum = TopicAliasMaximum}) when is_record(Msg, message) ->
TopicAlias = maps:get('Topic-Alias', Headers, undefined),
if
TopicAlias =:= undefined orelse TopicAlias =< TopicAliasMaximum ->
Expand All @@ -591,6 +593,7 @@ handle_info({dispatch, Topic, Msg = #message{headers = Headers}},
noreply(State)
end;


%% Do nothing if the client has been disconnected.
handle_info({timeout, Timer, retry_delivery}, State = #state{conn_pid = undefined, retry_timer = Timer}) ->
noreply(State#state{retry_timer = undefined});
Expand Down
2 changes: 1 addition & 1 deletion src/emqx_sm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid
end,
emqx_sm_locker:trans(ClientId, CleanStart);

open_session(SessAttrs = #{clean_start := false,
open_session(SessAttrs = #{clean_start := false,
client_id := ClientId}) ->
ResumeStart = fun(_) ->
case resume_session(ClientId, SessAttrs) of
Expand Down
1 change: 1 addition & 0 deletions src/emqx_ws_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ websocket_info(Info, State) ->
terminate(SockError, _Req, #state{keepalive = Keepalive,
proto_state = ProtoState,
shutdown = Shutdown}) ->

?LOG(debug, "Terminated for ~p, sockerror: ~p",
[Shutdown, SockError]),
emqx_keepalive:cancel(Keepalive),
Expand Down
2 changes: 1 addition & 1 deletion test/emqx_broker_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ subscribe_unsubscribe(_) ->
ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, #{ qos => 2 }),
true = emqx:subscribed(<<"topic">>, <<"clientId">>),
Topics = emqx:topics(),
lists:foreach(fun(Topic) ->
lists:foreach(fun(Topic) ->
?assert(lists:member(Topic, Topics))
end, Topics),
ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>),
Expand Down
3 changes: 1 addition & 2 deletions test/emqx_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ receive_messages(Count, Msgs) ->
receive
{publish, Msg} ->
receive_messages(Count-1, [Msg|Msgs]);
Other ->
ct:log("~p~n", [Other]),
_Other ->
receive_messages(Count, Msgs)
after 10 ->
Msgs
Expand Down
2 changes: 1 addition & 1 deletion test/emqx_mod_sup_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

all() -> [t_child_all].

t_child_all(_) ->
t_child_all(_) ->
{ok, _Pid} = emqx_mod_sup:start_link(),
{ok, _Child} = emqx_mod_sup:start_child(emqx_banned, worker),
timer:sleep(10),
Expand Down
4 changes: 2 additions & 2 deletions test/emqx_mqtt_caps_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ t_check_sub(_) ->
[{<<"client/stat">>, Opts}],
[{<<"client/stat">>, Opts#{rc => ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}}]),

ok = do_check_sub(Caps#{mqtt_wildcard_subscription => false},
[{<<"vlient/+/dsofi">>, Opts}],
ok = do_check_sub(Caps#{mqtt_wildcard_subscription => false},
[{<<"vlient/+/dsofi">>, Opts}],
[{<<"vlient/+/dsofi">>, Opts#{rc => ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}}]),
emqx_zone:stop().

Expand Down
2 changes: 1 addition & 1 deletion test/emqx_mqtt_packet_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@

all() -> [{group, connect}].

groups() -> [{connect, [sequence],
groups() -> [{connect, [sequence],
[case1_protocol_name,
case2_protocol_ver%,
%TOTO case3_invalid_reserved
Expand Down
2 changes: 1 addition & 1 deletion test/emqx_pqueue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ t_priority_queues(_) ->
PQueue6 = ?PQ:in(f, 1, PQueue5),

{{value, e}, PQueue7} = ?PQ:out(PQueue6),
{empty, _} = ?PQ:out(0, ?PQ:new()),
{empty, _} = ?PQ:out(0, ?PQ:new()),

{empty, Q0} = ?PQ:out_p(Q0),

Expand Down
2 changes: 1 addition & 1 deletion test/emqx_protocol_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ connect_v5(_) ->
raw_recv_parse(Data, ?MQTT_PROTO_V5)
end),

% test clean start
% test clean start
with_connection(fun(Sock) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
Expand Down
17 changes: 16 additions & 1 deletion test/emqx_session_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
-compile(export_all).
-compile(nowarn_export_all).

-include_lib("eunit/include/eunit.hrl").

-include_lib("common_test/include/ct.hrl").

all() -> [t_session_all].
all() -> [ignore_loop, t_session_all].

init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
Expand All @@ -29,6 +31,19 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().

ignore_loop(_Config) ->
application:set_env(emqx, mqtt_ignore_loop_deliver, true),
{ok, Client} = emqx_client:start_link(),
{ok, _} = emqx_client:connect(Client),
TestTopic = <<"Self">>,
{ok, _, [2]} = emqx_client:subscribe(Client, TestTopic, qos2),
ok = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 0),
{ok, _} = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 1),
{ok, _} = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 2),
?assertEqual(0, length(emqx_client_SUITE:receive_messages(3))),
ok = emqx_client:disconnect(Client),
application:set_env(emqx, mqtt_ignore_loop_deliver, false).

t_session_all(_) ->
ClientId = <<"ClientId">>,
{ok, ConnPid} = emqx_mock_client:start_link(ClientId),
Expand Down
13 changes: 6 additions & 7 deletions test/emqx_sm_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ all() -> [t_open_close_session].
t_open_close_session(_) ->
emqx_ct_broker_helpers:run_setup_steps(),
{ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
Attrs = #{clean_start => true,
client_id => <<"client">>,
Attrs = #{clean_start => true,
client_id => <<"client">>,
conn_pid => ClientPid,
zone => internal,
username => <<"zhou">>,
expiry_interval => 0,
max_inflight => 0,
zone => internal,
username => <<"emqx">>,
expiry_interval => 0,
max_inflight => 0,
topic_alias_maximum => 0,
will_msg => undefined},
{ok, SPid} = emqx_sm:open_session(Attrs),
Expand All @@ -47,4 +47,3 @@ t_open_close_session(_) ->
ok = emqx_sm:close_session(SPid),
[] = emqx_sm:lookup_session(<<"client">>),
emqx_ct_broker_helpers:run_teardown_steps().

8 changes: 4 additions & 4 deletions test/emqx_vm_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@


all() ->
[load, systeminfo, mem_info, process_list, process_info, process_gc,
[load, systeminfo, mem_info, process_list, process_info, process_gc,
get_ets_list, get_ets_info, get_ets_object, get_port_types, get_port_info,
scheduler_usage, get_memory, microsecs, schedulers, get_process_group_leader_info,
get_process_limit].
Expand All @@ -121,13 +121,13 @@ process_list(_Config) ->
true = lists:member({pid, Pid}, lists:concat(ProcessInfo)).

process_info(_Config) ->
ProcessInfos = emqx_vm:get_process_info(),
ProcessInfos = emqx_vm:get_process_info(),
ProcessInfo = lists:last(ProcessInfos),
Keys = [K || {K, _V}<- ProcessInfo],
?PROCESS_INFO = Keys.

process_gc(_Config) ->
ProcessGcs = emqx_vm:get_process_gc(),
ProcessGcs = emqx_vm:get_process_gc(),
ProcessGc = lists:last(ProcessGcs),
Keys = [K || {K, _V}<- ProcessGc],
?PROCESS_GC = Keys.
Expand All @@ -137,7 +137,7 @@ get_ets_list(_Config) ->
Ets = emqx_vm:get_ets_list(),
true = lists:member(test, Ets).

get_ets_info(_Config) ->
get_ets_info(_Config) ->
ets:new(test, [named_table]),
[] = emqx_vm:get_ets_info(test1),
EtsInfo = emqx_vm:get_ets_info(test),
Expand Down

0 comments on commit 1682149

Please sign in to comment.