Skip to content

Commit

Permalink
Merge pull request #9104 from zmstone/1005-fix-shared-sub-dispatch
Browse files Browse the repository at this point in the history
fix(shared): re-dispatch inflight (QoS1) and mqueue messages
  • Loading branch information
zmstone committed Oct 9, 2022
2 parents d600c87 + 1c29e28 commit 63774ba
Show file tree
Hide file tree
Showing 6 changed files with 322 additions and 105 deletions.
1 change: 1 addition & 0 deletions CHANGES-5.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* Fix `GET /listeners` API crash when some nodes are still in their initial configuration. [#9002](https://github.com/emqx/emqx/pull/9002)
* Fix empty variable interpolation in authentication and authorization. Placeholders for undefined variables are rendered now as empty strings and do not cause errors anymore. [#8963](https://github.com/emqx/emqx/pull/8963)
* Fix the latency statistics error of the slow subscription module when `stats_type` is `internal` or `response`. [#8986](https://github.com/emqx/emqx/pull/8986)
* Redispatch shared subscription messages. [#9104](https://github.com/emqx/emqx/pull/9104)

# 5.0.8

Expand Down
14 changes: 12 additions & 2 deletions apps/emqx/src/emqx_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -997,8 +997,13 @@ maybe_nack(Delivers) ->
lists:filter(fun not_nacked/1, Delivers).

not_nacked({deliver, _Topic, Msg}) ->
not (emqx_shared_sub:is_ack_required(Msg) andalso
(ok == emqx_shared_sub:nack_no_connection(Msg))).
case emqx_shared_sub:is_ack_required(Msg) of
true ->
ok = emqx_shared_sub:nack_no_connection(Msg),
false;
false ->
true
end.

maybe_mark_as_delivered(Session, Delivers) ->
case emqx_session:info(is_persistent, Session) of
Expand Down Expand Up @@ -1222,6 +1227,8 @@ handle_call(
ChanInfo1 = info(NChannel),
emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}),
reply(ok, reset_timer(alive_timer, NChannel));
handle_call(get_mqueue, Channel) ->
reply({ok, get_mqueue(Channel)}, Channel);
handle_call(Req, Channel) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
reply(ignored, Channel).
Expand Down Expand Up @@ -2224,3 +2231,6 @@ get_mqtt_conf(Zone, Key, Default) ->
%% Replace a single field of a `channel' record, addressed by field name.
%% The field's position among the record's declared fields is shifted by one
%% because the first tuple element of a record holds the record tag.
%% NOTE(review): presumably emqx_misc:index_of/2 is 1-based - confirm.
set_field(Name, Value, Channel) ->
    Fields = record_info(fields, channel),
    Index = emqx_misc:index_of(Name, Fields) + 1,
    setelement(Index, Channel, Value).

%% Return the pending messages of the channel's session by delegating to
%% emqx_session:get_mqueue/1. Only the `session' field of the channel is read.
get_mqueue(#channel{session = Session}) ->
emqx_session:get_mqueue(Session).
22 changes: 18 additions & 4 deletions apps/emqx/src/emqx_mqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@
in/2,
out/1,
stats/1,
dropped/1
dropped/1,
to_list/1
]).

-define(NO_PRIORITY_TABLE, disabled).
Expand Down Expand Up @@ -109,7 +110,7 @@
dropped = 0 :: count(),
p_table = ?NO_PRIORITY_TABLE :: p_table(),
default_p = ?LOWEST_PRIORITY :: priority(),
q = ?PQUEUE:new() :: pq(),
q = emqx_pqueue:new() :: pq(),
shift_opts :: #shift_opts{},
last_prio :: non_neg_integer() | undefined,
p_credit :: non_neg_integer() | undefined
Expand All @@ -118,15 +119,15 @@
-type mqueue() :: #mqueue{}.

-spec init(options()) -> mqueue().
init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
init(Opts = #{max_len := MaxLen0, store_qos0 := Qos0}) ->
MaxLen =
case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
true -> MaxLen0;
false -> ?MAX_LEN_INFINITY
end,
#mqueue{
max_len = MaxLen,
store_qos0 = QoS_0,
store_qos0 = Qos0,
p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
default_p = get_priority_opt(Opts),
shift_opts = get_shift_opt(Opts)
Expand All @@ -152,6 +153,19 @@ len(#mqueue{len = Len}) -> Len.

%% Return the value of the `max_len' field stored in the queue record.
max_len(#mqueue{max_len = MaxLen}) -> MaxLen.

%% @doc Return all queued items in a list.
%% Items appear in the order in which successive out/1 calls yield them.
%% The queue value passed in is left as it was; the intermediate queues
%% produced while draining are discarded.
-spec to_list(mqueue()) -> list().
to_list(MQ) ->
    to_list(out(MQ), []).

%% Consume the result of out/1: stop on `empty', otherwise keep the item and
%% continue with the remaining queue. Items are collected in reverse and
%% flipped once at the end.
to_list({empty, _MQ}, Acc) ->
    lists:reverse(Acc);
to_list({{value, Item}, Rest}, Acc) ->
    to_list(out(Rest), [Item | Acc]).

%% @doc Return number of dropped messages, as held in the `dropped' field
%% of the queue record.
-spec dropped(mqueue()) -> count().
dropped(#mqueue{dropped = Dropped}) -> Dropped.
Expand Down
46 changes: 21 additions & 25 deletions apps/emqx/src/emqx_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@
info/2,
is_session/1,
stats/1,
obtain_next_pkt_id/1
obtain_next_pkt_id/1,
get_mqueue/1
]).

-export([
Expand Down Expand Up @@ -801,7 +802,8 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) ->
-spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok.
terminate(ClientInfo, Reason, Session) ->
run_terminate_hooks(ClientInfo, Reason, Session),
redispatch_shared_messages(Session),
Reason =/= takenover andalso
redispatch_shared_messages(Session),
ok.

run_terminate_hooks(ClientInfo, discarded, Session) ->
Expand All @@ -811,29 +813,20 @@ run_terminate_hooks(ClientInfo, takenover, Session) ->
run_terminate_hooks(ClientInfo, Reason, Session) ->
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).

redispatch_shared_messages(#session{inflight = Inflight}) ->
InflightList = emqx_inflight:to_list(Inflight),
lists:foreach(
fun
%% Only QoS1 messages get redispatched, because QoS2 messages
%% must be sent to the same client, once they're in flight
({_, #inflight_data{message = #message{qos = ?QOS_2} = Msg}}) ->
?SLOG(warning, #{msg => qos2_lost_no_redispatch}, #{message => Msg});
({_, #inflight_data{message = #message{topic = Topic, qos = ?QOS_1} = Msg}}) ->
case emqx_shared_sub:get_group(Msg) of
{ok, Group} ->
%% Note that dispatch is called with self() in failed subs
%% This is done to avoid dispatching back to caller
Delivery = #delivery{sender = self(), message = Msg},
emqx_shared_sub:dispatch(Group, Topic, Delivery, [self()]);
_ ->
false
end;
(_) ->
ok
end,
InflightList
).
%% Hand the session's pending messages over to emqx_shared_sub:redispatch/1:
%% first the QoS 1 messages found in the inflight window (listed with
%% sort_fun/2), then every item of the message queue.
%%
%% Inflight entries that are not QoS 1 messages are left out.
%% For QoS 2, here is what the spec says:
%% If the Client's Session terminates before the Client reconnects,
%% the Server MUST NOT send the Application Message to any other
%% subscribed Client [MQTT-4.8.2-5].
redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
    Sorted = emqx_inflight:to_list(fun sort_fun/2, Inflight),
    Qos1Msgs = lists:flatmap(
        fun
            ({_PacketId, #inflight_data{message = #message{qos = ?QOS_1} = Msg}}) ->
                [Msg];
            ({_PacketId, #inflight_data{}}) ->
                []
        end,
        Sorted
    ),
    Queued = emqx_mqueue:to_list(Q),
    emqx_shared_sub:redispatch(Qos1Msgs ++ Queued).

-compile({inline, [run_hook/2]}).
run_hook(Name, Args) ->
Expand Down Expand Up @@ -925,3 +918,6 @@ age(Now, Ts) -> Now - Ts.
%% Replace a single field of a `session' record, addressed by field name.
%% The field's position among the record's declared fields is shifted by one
%% because the first tuple element of a record holds the record tag.
%% NOTE(review): presumably emqx_misc:index_of/2 is 1-based - confirm.
set_field(Name, Value, Session) ->
    Fields = record_info(fields, session),
    Index = emqx_misc:index_of(Name, Fields) + 1,
    setelement(Index, Session, Value).

%% Return the items held in the session's message queue as a list
%% (via emqx_mqueue:to_list/1), not the queue structure itself.
get_mqueue(#session{mqueue = Q}) ->
emqx_mqueue:to_list(Q).
63 changes: 52 additions & 11 deletions apps/emqx/src/emqx_shared_sub.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@
-export([
dispatch/3,
dispatch/4,
do_dispatch_with_ack/4
do_dispatch_with_ack/4,
redispatch/1
]).

-export([
maybe_ack/1,
maybe_nack_dropped/1,
nack_no_connection/1,
is_ack_required/1,
get_group/1
is_ack_required/1
]).

%% for testing
Expand Down Expand Up @@ -96,6 +96,9 @@
-define(ACK, shared_sub_ack).
-define(NACK(Reason), {shared_sub_nack, Reason}).
-define(NO_ACK, no_ack).
-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).

-type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()).

-record(state, {pmon}).

Expand Down Expand Up @@ -144,7 +147,8 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
false ->
{error, no_subscribers};
{Type, SubPid} ->
case do_dispatch(SubPid, Group, Topic, Msg, Type) of
Msg1 = with_redispatch_to(Msg, Group, Topic),
case do_dispatch(SubPid, Group, Topic, Msg1, Type) of
ok ->
{ok, 1};
{error, _Reason} ->
Expand Down Expand Up @@ -223,16 +227,53 @@ without_group_ack(Msg) ->
%% Read the `shared_dispatch_ack' header of a message.
%% ?NO_ACK is passed to emqx_message:get_header/3 as the fallback value.
get_group_ack(Msg) ->
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).

-spec is_ack_required(emqx_types:message()) -> boolean().
is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
%% Record on the message which shared-subscription group and topic it is being
%% dispatched to, under the `redispatch_to' header.
%% QoS 0 messages are returned untouched.
with_redispatch_to(#message{qos = ?QOS_0} = Unchanged, _Group, _Topic) ->
    Unchanged;
with_redispatch_to(Msg, Group, Topic) ->
    Headers = #{redispatch_to => ?REDISPATCH_TO(Group, Topic)},
    emqx_message:set_headers(Headers, Msg).

%% @hidden Redispatch is needed only for messages that carry the
%% `redispatch_to' header, i.e. whose header value is a group-topic pair.
is_redispatch_needed(#message{} = Msg) ->
    Target = get_redispatch_to(Msg),
    case Target of
        ?REDISPATCH_TO(_Group, _Topic) ->
            true;
        _NoTarget ->
            false
    end.

-spec get_group(emqx_types:message()) -> {ok, any()} | error.
get_group(Msg) ->
case get_group_ack(Msg) of
?NO_ACK -> error;
{Group, _Sender, _Ref} -> {ok, Group}
%% @doc Redispatch shared deliveries to other members in the group.
%% Messages without a `redispatch_to' header are dropped from the list first.
%% If anything remains, the count is logged at info level and each message is
%% re-dispatched in list order; with nothing left to do this is a no-op.
%% Always returns `ok'.
-spec redispatch([emqx_types:message()]) -> ok.
redispatch(Messages0) ->
    case lists:filter(fun is_redispatch_needed/1, Messages0) of
        [] ->
            ok;
        Messages ->
            ?SLOG(info, #{
                msg => "redispatching_shared_subscription_message",
                count => length(Messages)
            }),
            lists:foreach(fun redispatch_shared_message/1, Messages)
    end.

%% Dispatch one message again to the group and topic recorded in its
%% `redispatch_to' header. The header must be present: the match below fails
%% with badmatch otherwise.
redispatch_shared_message(#message{} = Msg) ->
    %% As long as it's still a #message{} record in inflight,
    %% we should try to re-dispatch
    ?REDISPATCH_TO(Group, Topic) = get_redispatch_to(Msg),
    %% The calling process is both the sender of the delivery and the only
    %% entry in the failed-subscribers list, which avoids dispatching the
    %% message back to the caller.
    Self = self(),
    dispatch(Group, Topic, #delivery{sender = Self, message = Msg}, [Self]).

%% @hidden Return the `redispatch_to` group-topic pair stored in the message header.
%% `false` is returned when the header is absent, i.e. when the message is not
%% a shared dispatch or when it is a QoS 0 message.
-spec get_redispatch_to(emqx_types:message()) -> redispatch_to() | false.
get_redispatch_to(Msg) ->
emqx_message:get_header(redispatch_to, Msg, false).

%% @doc Return `true' when the message's `shared_dispatch_ack' header
%% (as read by get_group_ack/1) is anything other than ?NO_ACK.
-spec is_ack_required(emqx_types:message()) -> boolean().
is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).

%% @doc Negative ack dropped message due to inflight window or message queue being full.
-spec maybe_nack_dropped(emqx_types:message()) -> boolean().
maybe_nack_dropped(Msg) ->
Expand Down

0 comments on commit 63774ba

Please sign in to comment.