Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: willmsg not published in takeover #11868

Merged
merged 14 commits into from Feb 29, 2024
Merged
199 changes: 173 additions & 26 deletions apps/emqx/src/emqx_channel.erl
Expand Up @@ -117,7 +117,17 @@
atom() => term()
}.

-type conn_state() :: idle | connecting | connected | reauthenticating | disconnected.
%% init
-type conn_state() ::
idle
%% mqtt connect recved but not acked
| connecting
%% mqtt connect acked
| connected
%% mqtt connected but reauthenticating
| reauthenticating
%% keepalive timeout or connection terminated
| disconnected.

-type reply() ::
{outgoing, emqx_types:packet()}
Expand All @@ -135,6 +145,7 @@
).

-define(LIMITER_ROUTING, message_routing).
-define(chan_terminating, chan_terminating).

-dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).

Expand Down Expand Up @@ -863,6 +874,7 @@ do_unsubscribe(

%% MQTT-v5.0: 3.14.4 DISCONNECT Actions
maybe_clean_will_msg(?RC_SUCCESS, Channel) ->
%% [MQTT-3.14.4-3]
Channel#channel{will_msg = undefined};
maybe_clean_will_msg(_ReasonCode, Channel) ->
Channel.
Expand Down Expand Up @@ -1134,16 +1146,14 @@ handle_call(
kick,
Channel = #channel{
conn_state = ConnState,
will_msg = WillMsg,
clientinfo = ClientInfo,
conninfo = #{proto_ver := ProtoVer}
}
) ->
(WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg),
Channel0 = maybe_publish_will_msg(kicked, Channel),
qzhuyan marked this conversation as resolved.
Show resolved Hide resolved
Channel1 =
case ConnState of
connected -> ensure_disconnected(kicked, Channel);
_ -> Channel
connected -> ensure_disconnected(kicked, Channel0);
_ -> Channel0
end,
case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of
true ->
Expand All @@ -1157,7 +1167,8 @@ handle_call(
shutdown(kicked, ok, Channel1)
end;
handle_call(discard, Channel) ->
disconnect_and_shutdown(discarded, ok, Channel);
Channel0 = maybe_publish_will_msg(discarded, Channel),
qzhuyan marked this conversation as resolved.
Show resolved Hide resolved
disconnect_and_shutdown(discarded, ok, Channel0);
%% Session Takeover
handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
reply(Session, Channel#channel{takeover = true});
Expand All @@ -1180,7 +1191,8 @@ handle_call(
emqx_channel_takeover_end,
#{clientid => ClientId}
),
disconnect_and_shutdown(takenover, AllPendings, Channel);
Channel0 = maybe_publish_will_msg(takenover, Channel),
disconnect_and_shutdown(takenover, AllPendings, Channel0);
handle_call(list_authz_cache, Channel) ->
{reply, emqx_authz_cache:list_authz_cache(), Channel};
handle_call(
Expand Down Expand Up @@ -1233,7 +1245,7 @@ handle_info(
ConnState =:= connected orelse ConnState =:= reauthenticating
->
{Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session),
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)),
Channel2 = Channel1#channel{session = Session1},
case maybe_shutdown(Reason, Intent, Channel2) of
{ok, Channel3} -> {ok, ?REPLY_EVENT(disconnected), Channel3};
Expand Down Expand Up @@ -1345,8 +1357,9 @@ handle_timeout(
handle_out(publish, Replies, Channel#channel{session = NSession})
end;
handle_timeout(_TRef, expire_session, Channel = #channel{session = Session}) ->
Channel0 = maybe_publish_will_msg(expired, Channel),
ok = emqx_session:destroy(Session),
shutdown(expired, Channel);
shutdown(expired, Channel0);
handle_timeout(
_TRef,
will_message = TimerName,
Expand Down Expand Up @@ -1424,19 +1437,16 @@ terminate(_, #channel{conn_state = idle} = _Channel) ->
ok;
terminate(normal, Channel) ->
run_terminate_hook(normal, Channel);
terminate({shutdown, kicked}, Channel) ->
run_terminate_hook(kicked, Channel);
terminate({shutdown, Reason}, Channel) when
Reason =:= discarded;
Reason =:= takenover
Reason =:= expired orelse
qzhuyan marked this conversation as resolved.
Show resolved Hide resolved
Reason =:= takenover orelse
Reason =:= kicked orelse
Reason =:= discarded
->
run_terminate_hook(Reason, Channel);
terminate(Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) ->
%% since will_msg is set to undefined as soon as it is published,
%% if will_msg still exists when the session is terminated, it
%% must be published immediately.
WillMsg =/= undefined andalso publish_will_msg(ClientInfo, WillMsg),
run_terminate_hook(Reason, Channel).
terminate(Reason, Channel) ->
qzhuyan marked this conversation as resolved.
Show resolved Hide resolved
Channel1 = maybe_publish_will_msg(?chan_terminating, Channel),
run_terminate_hook(Reason, Channel1).

run_terminate_hook(_Reason, #channel{session = undefined}) ->
ok;
Expand Down Expand Up @@ -2227,15 +2237,139 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) ->

%%--------------------------------------------------------------------
%% Maybe Publish will msg

maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
%% @doc Maybe publish will message [MQTT-3.1.2-8]
%% When willmsg presents the decision whether or when to publish the Will Message are effected by
%% the followings:
%% - connecion state
%% - If it is MQTT normal disconnection (RC: 0) or abnormal (RC != 0) from the *client*
%% - will delay interval (MQTT 5.0 only)
%% - session expire Session Expiry (MQTT 5.0 only)
%% - EMQX operations on the client
%% @NOTE:
%% Connection close with session expiry interval = 0 means session close.
%% @NOTE:
%% The caller does not need to take care of the case when process terminates while will_msg is published
%% as it is designed by the spec.
%% @NOTE:
%% this function should be safe to be called multiple times in the life time of the connecion process, the willmsg
%% must be delete from the state if it is published or cleared.
-spec maybe_publish_will_msg(Reason, channel()) -> channel() when
Reason ::
%% Connection is terminating because session is taken over by another process.
takenover
%% Connection is terminating because of EMQX mgmt operation, the session state is deleted with none-zero RC code
| kicked
%% Connection is terminating because of client clean start new session.
| discarded
%% Connection is terminating because session is expired
| expired
%% Connection is terminating because of socket close/error
| sock_closed
%% Session is terminating, delay willmsg publish is impossible.
| ?chan_terminating.
maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) ->
%% No will message to publish
Channel;
maybe_publish_will_msg(
_Reason,
Channel = #channel{
conn_state = ConnState,
conninfo = #{clientid := ClientId}
zmstone marked this conversation as resolved.
Show resolved Hide resolved
}
) when
ConnState =:= idle orelse
ConnState =:= connecting orelse
ConnState =:= reauthenticating
qzhuyan marked this conversation as resolved.
Show resolved Hide resolved
->
%% Wrong state to publish, they are intermediate state
?tp(debug, willmsg_wrong_state, #{clientid => ClientId}),
zmstone marked this conversation as resolved.
Show resolved Hide resolved
Channel;
maybe_publish_will_msg(Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) ->
maybe_publish_will_msg(
_Reason,
Channel = #channel{
conninfo = #{proto_ver := ?MQTT_PROTO_V3, clientid := ClientId}, will_msg = WillMsg
}
) ->
%% Unconditionally publish will message for MQTT 3.1.1
?tp(debug, maybe_publish_willmsg_v3, #{clientid => ClientId}),
_ = publish_will_msg(Channel#channel.clientinfo, WillMsg),
Channel#channel{will_msg = undefined};
maybe_publish_will_msg(
Reason,
Channel = #channel{
clientinfo = ClientInfo,
conninfo = #{clientid := ClientId},
will_msg = WillMsg
}
) when
Reason =:= expired orelse
Reason =:= discarded orelse
Reason =:= kicked orelse
Reason =:= ?chan_terminating orelse
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this keeps the old behavior.

%% Depends on the session backend, we may lost the session
Reason =:= {shutdown, internal_error}
->
%% For the cases that session MUST be gone impiles that the will message MUST be published
%% a. expired (session expired)
%% b. discarded (Session ends because of clean start)
%% c. kicked. (kicked by operation, abnormal conn close)
%% d. internal_error (maybe not recoverable)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. It seems a bit unsafe to assume that in the event of hitting internal error publishing should still work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just list them, may miss something, the entire call path, call stack is hard to follow.

Copy link
Contributor Author

@qzhuyan qzhuyan Nov 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found this in spec:

In the case of a Server shutdown or failure, the Server MAY defer publication of Will Messages until a subsequent restart. If this happens, there might be a delay between the time the Server experienced failure and when the Will Message is published.

So I think it is ok to either publish/not publish/delay publish willmsg when server error.

%% This ensures willmsg will be published if the willmsg timer is scheduled but not fired
%% OR fired but not yet handled
?tp(debug, maybe_publish_willmsg_session_ends, #{clientid => ClientId, reason => Reason}),
_ = publish_will_msg(ClientInfo, WillMsg),
remove_willmsg(Channel);
maybe_publish_will_msg(
takenover,
Channel = #channel{
clientinfo = ClientInfo,
will_msg = WillMsg,
conninfo = #{clientid := ClientId}
}
) ->
%% TAKEOVER [MQTT-3.1.4-3]
%% MQTT 5, Non-normative comment:
%% """"
%% If a Network Connection uses a Client Identifier of an existing Network Connection to the Server,
%% the Will Message for the exiting connection is sent unless the new connection specifies Clean Start
%% of 0 and the Will Delay is greater than zero. If the Will Delay is 0 the Will Message is sent at
%% the close of the existing Network Connection, and if Clean Start is 1 the Will Message is sent
%% because the Session ends.
%% """"
%% NOTE, above clean start=1 is `discard' scenarios not `takeover' scenario.
case will_delay_interval(WillMsg) of
0 ->
ok = publish_will_msg(ClientInfo, WillMsg),
Channel#channel{will_msg = undefined};
I ->
?tp(debug, maybe_publish_willmsg_takenover_pub, #{clientid => ClientId}),
_ = publish_will_msg(ClientInfo, WillMsg);
I when I > 0 ->
%% @NOTE Non-normative comment in MQTT 5.0 spec
%% """
%% One use of this is to avoid publishing Will Messages if there is a temporary network
%% disconnection and the Client succeeds in reconnecting and continuing its Session
%% before the Will Message is published.
%% """
?tp(debug, maybe_publish_willmsg_takenover_skip, #{clientid => ClientId}),
skip
end,
remove_willmsg(Channel);
maybe_publish_will_msg(
Reason,
Channel = #channel{
clientinfo = ClientInfo,
will_msg = WillMsg,
conninfo = #{clientid := ClientId}
}
) ->
%% Default to handle other reasons
case will_delay_interval(WillMsg) of
0 ->
?tp(debug, maybe_publish_will_msg_other_publish, #{
clientid => ClientId, reason => Reason
}),
_ = publish_will_msg(ClientInfo, WillMsg),
remove_willmsg(Channel);
I when I > 0 ->
qzhuyan marked this conversation as resolved.
Show resolved Hide resolved
?tp(debug, maybe_publish_will_msg_other_delay, #{clientid => ClientId, reason => Reason}),
ensure_timer(will_message, timer:seconds(I), Channel)
end.

Expand Down Expand Up @@ -2368,6 +2502,19 @@ get_mqtt_conf(Zone, Key) ->
get_mqtt_conf(Zone, Key, Default) ->
emqx_config:get_zone_conf(Zone, [mqtt, Key], Default).

%% @doc unset will_msg and cancel the will_message timer
-spec remove_willmsg(Old :: channel()) -> New :: channel().
remove_willmsg(Channel = #channel{timers = Timers}) ->
case maps:get(will_message, Timers, undefined) of
undefined ->
Channel#channel{will_msg = undefined};
DelayedWillTimer ->
ok = erlang:cancel_timer(DelayedWillTimer, [{async, true}, {info, false}]),
Channel#channel{
will_msg = undefined,
timers = maps:remove(will_message, Timers)
}
end.
%%--------------------------------------------------------------------
%% For CT tests
%%--------------------------------------------------------------------
Expand Down
9 changes: 8 additions & 1 deletion apps/emqx/src/emqx_connection.erl
Expand Up @@ -728,7 +728,8 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
{shutdown, Reason, Reply, OutPacket, NChannel} ->
NState = State#state{channel = NChannel},
ok = handle_outgoing(OutPacket, NState),
shutdown(Reason, Reply, NState)
NState2 = graceful_shutdown_transport(Reason, NState),
shutdown(Reason, Reply, NState2)
end.

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -1234,6 +1235,12 @@ set_tcp_keepalive({Type, Id}) ->
async_set_keepalive(Idle, Interval, Probes)
end.

-spec graceful_shutdown_transport(atom(), state()) -> state().
graceful_shutdown_transport(_Reason, S = #state{transport = Transport, socket = Socket}) ->
%% @TODO Reason is reserved for future use, quic transport
Transport:shutdown(Socket, read_write),
S#state{sockstate = closed}.

%%--------------------------------------------------------------------
%% For CT tests
%%--------------------------------------------------------------------
Expand Down
5 changes: 5 additions & 0 deletions apps/emqx/src/emqx_quic_stream.erl
Expand Up @@ -32,6 +32,7 @@
wait/1,
getstat/2,
fast_close/1,
shutdown/2,
ensure_ok_or_exit/2,
async_send/3,
setopts/2,
Expand Down Expand Up @@ -147,6 +148,10 @@ fast_close({quic, _Conn, Stream, _Info}) ->
% quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
ok.

shutdown({quic, _Conn, Stream, _Info}, read_write) ->
%% A graceful shutdown means both side shutdown the read and write gracefully.
quicer:shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 1, 5000).

-spec ensure_ok_or_exit(atom(), list(term())) -> term().
ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) ->
case erlang:apply(?MODULE, Fun, Args) of
Expand Down
1 change: 1 addition & 0 deletions apps/emqx/test/emqx_connection_SUITE.erl
Expand Up @@ -32,6 +32,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
%% Meck Transport
ok = meck:new(emqx_transport, [non_strict, passthrough, no_history, no_link]),
ok = meck:expect(emqx_transport, shutdown, fun(_, _) -> ok end),
%% Meck Channel
ok = meck:new(emqx_channel, [passthrough, no_history, no_link]),
%% Meck Cm
Expand Down
4 changes: 4 additions & 0 deletions apps/emqx/test/emqx_shared_sub_SUITE.erl
Expand Up @@ -908,6 +908,8 @@ t_session_takeover(Config) when is_list(Config) ->
?assertMatch([_], emqx:publish(Message3)),
?assertMatch([_], emqx:publish(Message4)),
{true, _} = last_message(<<"hello2">>, [ConnPid2]),
%% We may or may not recv dup hello2 due to QoS1 redelivery
_ = last_message(<<"hello2">>, [ConnPid2]),
{true, _} = last_message(<<"hello3">>, [ConnPid2]),
{true, _} = last_message(<<"hello4">>, [ConnPid2]),
?assertEqual([], collect_msgs(timer:seconds(2))),
Expand Down Expand Up @@ -951,6 +953,8 @@ t_session_kicked(Config) when is_list(Config) ->
%% on if it's picked as the first one for round_robin
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),

ct:pal("MsgRec1: ~p MsgRec2 ~p ~n", [MsgRec1, MsgRec2]),
case MsgRec2 of
<<"hello3">> ->
?assertEqual(<<"hello1">>, MsgRec1);
Expand Down