Skip to content

Commit

Permalink
fix: maybe send willmsg
Browse files Browse the repository at this point in the history
  • Loading branch information
qzhuyan committed Nov 22, 2023
1 parent b5d4b60 commit e11bd4a
Show file tree
Hide file tree
Showing 2 changed files with 422 additions and 90 deletions.
187 changes: 139 additions & 48 deletions apps/emqx/src/emqx_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
).

-define(LIMITER_ROUTING, message_routing).
-define(chan_terminating, chan_terminating).

-dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).

Expand Down Expand Up @@ -856,6 +857,7 @@ do_unsubscribe(

%% MQTT-v5.0: 3.14.4 DISCONNECT Actions
maybe_clean_will_msg(?RC_SUCCESS, Channel) ->
%% [MQTT-3.14.4-3]
Channel#channel{will_msg = undefined};
maybe_clean_will_msg(_ReasonCode, Channel) ->
Channel.
Expand Down Expand Up @@ -1143,7 +1145,8 @@ handle_call(
shutdown(kicked, ok, Channel1)
end;
handle_call(discard, Channel) ->
disconnect_and_shutdown(discarded, ok, Channel);
Channel0 = maybe_publish_will_msg(discarded, Channel),
disconnect_and_shutdown(discarded, ok, Channel0);
%% Session Takeover
handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
reply(Session, Channel#channel{takeover = true});
Expand All @@ -1166,7 +1169,8 @@ handle_call(
emqx_channel_takeover_end,
#{clientid => ClientId}
),
disconnect_and_shutdown(takenover, AllPendings, Channel);
Channel0 = maybe_publish_will_msg(takenover, Channel),
disconnect_and_shutdown(takenover, AllPendings, Channel0);
handle_call(list_authz_cache, Channel) ->
{reply, emqx_authz_cache:list_authz_cache(), Channel};
handle_call(
Expand Down Expand Up @@ -1218,7 +1222,7 @@ handle_info(
ConnState =:= connected orelse ConnState =:= reauthenticating
->
{Intent, Session1} = emqx_session:disconnect(ClientInfo, Session),
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)),
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Reason, Channel)),
Channel2 = Channel1#channel{session = Session1},
case maybe_shutdown(Reason, Intent, Channel2) of
{ok, Channel3} -> {ok, ?REPLY_EVENT(disconnected), Channel3};
Expand Down Expand Up @@ -1328,7 +1332,8 @@ handle_timeout(
handle_out(publish, Replies, Channel#channel{session = NSession})
end;
handle_timeout(_TRef, expire_session, Channel) ->
shutdown(expired, Channel);
Channel0 = maybe_publish_will_msg(expired, Channel),
shutdown(expired, Channel0);
handle_timeout(
_TRef,
will_message = TimerName,
Expand Down Expand Up @@ -1412,10 +1417,9 @@ terminate({shutdown, Reason}, Channel) when
Reason =:= kicked orelse
Reason =:= discarded
->
Channel1 = maybe_publish_will_msg(Reason, Channel),
run_terminate_hook(Reason, Channel1);
run_terminate_hook(Reason, Channel);
terminate(Reason, Channel) ->
Channel1 = maybe_publish_will_msg(Reason, Channel),
Channel1 = maybe_publish_will_msg(?chan_terminating, Channel),
run_terminate_hook(Reason, Channel1).

run_terminate_hook(_Reason, #channel{session = undefined}) ->
Expand Down Expand Up @@ -2194,74 +2198,149 @@ ensure_disconnected(

%%--------------------------------------------------------------------
%% Maybe Publish will msg
%% @doc May publish will message [MQTT-3.1.2-8]
%% When willmsg presents the decision whether or when to publish the Will Message are effected by
%% the followings:
%% - connecion state
%% - If it is MQTT normal disconnection (RC: 0)
%% - If it is MQTT normal disconnection (RC: 4)
%% - will delay interval (MQTT 5.0 only)
%% - session expire Session Expiry (MQTT 5.0 only)
%% - EMQX operations on the client
%% @NOTE:
%% Connection close with session expiry interval = 0 means session close.
-spec maybe_publish_will_msg(Reason, channel()) -> channel() when
Reason :: takenover | kicked | discarded | expired | sock_closed | {shutdown, atom()}.
maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) ->
Channel;
maybe_publish_will_msg({shutdown, not_authorized}, Channel) ->
Reason ::
%% Connection will terminate because session is taken over by another process.
takenover
%% Connection will terminate because of EMQX mgmt operation, also delete the session.
| kicked
%% Connection will terminate because session is taken over by another process.
| discarded
%% Connection will terminate because session is expired
| expired
%% Connection will terminate because of socket close/error
| sock_closed
%% Connection will terminate with Reasons
| {shutdown, atom()}
%% Connection will terminate soon, delay willmsg publish is impossible.
| ?chan_terminating
%% Connection will terminate because of normal MQTT disconnection, implies delete the session.
| normal.
maybe_publish_will_msg(normal, Channel) ->
%% [MQTT-3.1.2-8]
Channel;
maybe_publish_will_msg(not_authorized, Channel) ->
maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) ->
%% No will message to publish
Channel;
maybe_publish_will_msg(_Reason, Channel = #channel{conn_state = ConnState}) when
maybe_publish_will_msg(
_Reason,
Channel = #channel{
conn_state = ConnState,
conninfo = #{clientid := ClientId}
}
) when
ConnState =:= idle orelse
ConnState =:= connecting orelse
ConnState =:= reauthenticating
->
%% Wrong state to publish
?tp(debug, willmsg_wrong_state, #{clientid => ClientId}),
Channel;
maybe_publish_will_msg(
_Reason,
Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V3}, will_msg = WillMsg}
Channel = #channel{
conninfo = #{proto_ver := ?MQTT_PROTO_V3, clientid := ClientId}, will_msg = WillMsg
}
) ->
%% Unconditionally publish will message for MQTT 3.1.1
ok = publish_will_msg(Channel#channel.clientinfo, WillMsg),
?tp(debug, willmsg_v3, #{clientid => ClientId}),
_ = publish_will_msg(Channel#channel.clientinfo, WillMsg),
Channel#channel{will_msg = undefined};
maybe_publish_will_msg(
Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers}
Reason,
Channel = #channel{
clientinfo = ClientInfo,
conninfo = #{clientid := ClientId},
will_msg = WillMsg
}
) when
Reason =:= expired orelse
Reason =:= discarded orelse
%% Unsure...
Reason =:= {shutdown, internal_error} orelse
Reason =:= kicked
Reason =:= kicked orelse
Reason =:= ?chan_terminating orelse
%% Depends on the session backend, we may lost the session
Reason =:= {shutdown, internal_error}
->
%% For the cases that session MUST be gone.
%% For the cases that session MUST be gone impiles that the will message MUST be published
%% a. expired (session expired)
%% c. discarded (Session ends because another process starts new session with the same clientid)
%% b. kicked. (kicked by operation)
%% b. discarded (Session ends because another process starts new session with the same clientid)
%% c. kicked. (kicked by operation)
%% d. internal_error (maybe not recoverable)
%% This ensures willmsg will be published if the willmsg timer is scheduled but not fired
%% OR fired but not yet handled
DelayedWillTimer = maps:get(will_message, Timers, undefined),
DelayedWillTimer =/= undefined andalso erlang:cancel_timer(DelayedWillTimer, [{async, true}]),
?tp(debug, willmsg_session_ends, #{clientid => ClientId}),
_ = publish_will_msg(ClientInfo, WillMsg),
Channel#channel{will_msg = undefined, timers = maps:remove(will_message, Timers)};
remove_willmsg(Channel);
maybe_publish_will_msg(
Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers}
takenover,
Channel = #channel{
clientinfo = ClientInfo,
will_msg = WillMsg,
conninfo = #{clientid := ClientId}
}
) ->
%% For the cases that session MAY/MAY NOT be gone, we don't care about session expired or not.
%% willmsg publish could be defered.
IsSessionExpirationInProgress = maps:is_key(expire_session, Timers),
IsWillmsgScheduled = maps:is_key(will_message, Timers),
%% TAKEOVER [MQTT-3.1.4-3]
?tp(debug, willmsg_takeover, #{clientid => ClientId}),
case will_delay_interval(WillMsg) of
0 ->
%% [MQTT-3.1.2-8], 0 means will delay Will Delay Interval has elapsed
false = IsWillmsgScheduled,
ok = publish_will_msg(ClientInfo, WillMsg),
Channel#channel{will_msg = undefined};
I when IsSessionExpirationInProgress andalso not IsWillmsgScheduled ->
%% We delay the will message publishing
%% Willmsg will be published whatever which timer fired first
ensure_timer(will_message, timer:seconds(I), Channel);
_ when IsSessionExpirationInProgress andalso IsWillmsgScheduled ->
%% Willmsg will be published whatever which timer fired first [MQTT-3.1.3-9].
Channel;
_I when Reason =:= takenover ->
%% don't see the point to delay the willmsg
Channel;
_I when not IsSessionExpirationInProgress andalso IsWillmsgScheduled ->
Channel;
I when not IsSessionExpirationInProgress andalso not IsWillmsgScheduled ->
%% @FIXME: process may terminate before the timer fired
%% MQTT 5, Non-normative comment:
%% """"
%% If a Network Connection uses a Client Identifier of an existing Network Connection to the Server,
%% the Will Message for the exiting connection is sent unless the new connection specifies Clean Start
%% of 0 and the Will Delay is greater than zero. If the Will Delay is 0 the Will Message is sent at
%% the close of the existing Network Connection, and if Clean Start is 1 the Will Message is sent
%% because the Session ends.
%% """"
_ = publish_will_msg(ClientInfo, WillMsg);
I when I > 0 ->
%% @NOTE: We do not publish willmsg when EI > 0 but that does not mean we shall delay the willmsg publish
%% because the session is already takenover by another process. If we delay the willmsg publish, the willmsg
%% will be published as there is no chance to get it cancelled. This is not stated clearly in the MQTT spec.
skip
end,
remove_willmsg(Channel);
maybe_publish_will_msg(
{shutdown, _},
Channel = #channel{
conninfo = #{expiry_interval := 0, clientid := ClientId},
clientinfo = ClientInfo,
will_msg = WillMsg
}
) ->
%% MQTT 5: 3.1.2.11.2 Session Expiry Interval
%% If the Session Expiry Interval is absent the value 0 is used.
%% If it is set to 0, or is absent, the Session ends when the Network Connection is closed.
%% Expire_interval == 0, means session is over at the time of calling with shutdown.
?tp(debug, willmsg_takeover, #{clientid => ClientId}),
_ = publish_will_msg(ClientInfo, WillMsg),
remove_willmsg(Channel);
maybe_publish_will_msg(
Reason,
Channel = #channel{
clientinfo = ClientInfo,
will_msg = WillMsg,
conninfo = #{clientid := ClientId}
}
) ->
%% Handles other Unknown Reasons.
case will_delay_interval(WillMsg) of
0 ->
?tp(debug, willmsg_other_publish, #{clientid => ClientId, reason => Reason}),
_ = publish_will_msg(ClientInfo, WillMsg),
remove_willmsg(Channel);
I when I > 0 ->
?tp(debug, willmsg_other_delay, #{clientid => ClientId, reason => Reason}),
ensure_timer(will_message, timer:seconds(I), Channel)
end.

Expand Down Expand Up @@ -2392,6 +2471,18 @@ get_mqtt_conf(Zone, Key) ->
get_mqtt_conf(Zone, Key, Default) ->
emqx_config:get_zone_conf(Zone, [mqtt, Key], Default).

-spec remove_willmsg(Old :: channel()) -> New :: channel().
remove_willmsg(Channel = #channel{timers = Timers}) ->
case maps:get(will_message, Timers, undefined) of
undefined ->
Channel#channel{will_msg = undefined};
DelayedWillTimer ->
ok = erlang:cancel_timer(DelayedWillTimer, [{async, true}, {info, false}]),
Channel#channel{
will_msg = undefined,
timers = maps:remove(will_message, Timers)
}
end.
%%--------------------------------------------------------------------
%% For CT tests
%%--------------------------------------------------------------------
Expand Down

0 comments on commit e11bd4a

Please sign in to comment.