diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 192335a251..51b66f4f9d 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -117,7 +117,17 @@ atom() => term() }. --type conn_state() :: idle | connecting | connected | reauthenticating | disconnected. +%% init +-type conn_state() :: + idle + %% mqtt connect recved but not acked + | connecting + %% mqtt connect acked + | connected + %% mqtt connected but reauthenticating + | reauthenticating + %% keepalive timeout or connection terminated + | disconnected. -type reply() :: {outgoing, emqx_types:packet()} @@ -135,6 +145,7 @@ ). -define(LIMITER_ROUTING, message_routing). +-define(chan_terminating, chan_terminating). -dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}). @@ -863,6 +874,7 @@ do_unsubscribe( %% MQTT-v5.0: 3.14.4 DISCONNECT Actions maybe_clean_will_msg(?RC_SUCCESS, Channel) -> + %% [MQTT-3.14.4-3] Channel#channel{will_msg = undefined}; maybe_clean_will_msg(_ReasonCode, Channel) -> Channel. @@ -1134,16 +1146,14 @@ handle_call( kick, Channel = #channel{ conn_state = ConnState, - will_msg = WillMsg, - clientinfo = ClientInfo, conninfo = #{proto_ver := ProtoVer} } ) -> - (WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg), + Channel0 = maybe_publish_will_msg(kicked, Channel), Channel1 = case ConnState of - connected -> ensure_disconnected(kicked, Channel); - _ -> Channel + connected -> ensure_disconnected(kicked, Channel0); + _ -> Channel0 end, case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of true -> @@ -1157,7 +1167,8 @@ handle_call( shutdown(kicked, ok, Channel1) end; handle_call(discard, Channel) -> - disconnect_and_shutdown(discarded, ok, Channel); + Channel0 = maybe_publish_will_msg(discarded, Channel), + disconnect_and_shutdown(discarded, ok, Channel0); %% Session Takeover handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> reply(Session, Channel#channel{takeover = true}); @@ -1180,7 +1191,8 @@ handle_call( emqx_channel_takeover_end, #{clientid => ClientId} ), - disconnect_and_shutdown(takenover, AllPendings, Channel); + Channel0 = maybe_publish_will_msg(takenover, Channel), + disconnect_and_shutdown(takenover, AllPendings, Channel0); handle_call(list_authz_cache, Channel) -> {reply, emqx_authz_cache:list_authz_cache(), Channel}; handle_call( @@ -1233,7 +1245,7 @@ handle_info( ConnState =:= connected orelse ConnState =:= reauthenticating -> {Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session), - Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), + Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)), Channel2 = Channel1#channel{session = Session1}, case maybe_shutdown(Reason, Intent, Channel2) of {ok, Channel3} -> {ok, ?REPLY_EVENT(disconnected), Channel3}; @@ -1345,8 +1357,9 @@ handle_timeout( handle_out(publish, Replies, Channel#channel{session = NSession}) end; handle_timeout(_TRef, expire_session, Channel = #channel{session = Session}) -> + Channel0 = maybe_publish_will_msg(expired, Channel), ok = emqx_session:destroy(Session), - shutdown(expired, Channel); + shutdown(expired, Channel0); handle_timeout( _TRef, will_message = TimerName, @@ -1424,19 +1437,16 @@ terminate(_, #channel{conn_state = idle} = _Channel) -> ok; terminate(normal, Channel) -> run_terminate_hook(normal, Channel); -terminate({shutdown, kicked}, Channel) -> - run_terminate_hook(kicked, Channel); terminate({shutdown, Reason}, Channel) when - Reason =:= discarded; - Reason =:= takenover + Reason =:= expired orelse + Reason =:= takenover orelse + Reason =:= kicked orelse + Reason =:= discarded -> run_terminate_hook(Reason, Channel); -terminate(Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) -> - %% since will_msg is set to undefined as soon as it is published, - %% if will_msg still exists when the session is terminated, it - %% must be published immediately. - WillMsg =/= undefined andalso publish_will_msg(ClientInfo, WillMsg), - run_terminate_hook(Reason, Channel). +terminate(Reason, Channel) -> + Channel1 = maybe_publish_will_msg(?chan_terminating, Channel), + run_terminate_hook(Reason, Channel1). run_terminate_hook(_Reason, #channel{session = undefined}) -> ok; @@ -2227,15 +2237,139 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) -> %%-------------------------------------------------------------------- %% Maybe Publish will msg - -maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) -> +%% @doc Maybe publish will message [MQTT-3.1.2-8] +%% When willmsg presents the decision whether or when to publish the Will Message are effected by +%% the followings: +%% - connecion state +%% - If it is MQTT normal disconnection (RC: 0) or abnormal (RC != 0) from the *client* +%% - will delay interval (MQTT 5.0 only) +%% - session expire Session Expiry (MQTT 5.0 only) +%% - EMQX operations on the client +%% @NOTE: +%% Connection close with session expiry interval = 0 means session close. +%% @NOTE: +%% The caller does not need to take care of the case when process terminates while will_msg is published +%% as it is designed by the spec. +%% @NOTE: +%% this function should be safe to be called multiple times in the life time of the connecion process, the willmsg +%% must be delete from the state if it is published or cleared. +-spec maybe_publish_will_msg(Reason, channel()) -> channel() when + Reason :: + %% Connection is terminating because session is taken over by another process. + takenover + %% Connection is terminating because of EMQX mgmt operation, the session state is deleted with none-zero RC code + | kicked + %% Connection is terminating because of client clean start new session. + | discarded + %% Connection is terminating because session is expired + | expired + %% Connection is terminating because of socket close/error + | sock_closed + %% Session is terminating, delay willmsg publish is impossible. + | ?chan_terminating. +maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) -> + %% No will message to publish + Channel; +maybe_publish_will_msg( + _Reason, + Channel = #channel{ + conn_state = ConnState, + conninfo = #{clientid := ClientId} + } +) when + ConnState =:= idle orelse + ConnState =:= connecting orelse + ConnState =:= reauthenticating +-> + %% Wrong state to publish, they are intermediate state + ?tp(debug, willmsg_wrong_state, #{clientid => ClientId}), Channel; -maybe_publish_will_msg(Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) -> +maybe_publish_will_msg( + _Reason, + Channel = #channel{ + conninfo = #{proto_ver := ?MQTT_PROTO_V3, clientid := ClientId}, will_msg = WillMsg + } +) -> + %% Unconditionally publish will message for MQTT 3.1.1 + ?tp(debug, maybe_publish_willmsg_v3, #{clientid => ClientId}), + _ = publish_will_msg(Channel#channel.clientinfo, WillMsg), + Channel#channel{will_msg = undefined}; +maybe_publish_will_msg( + Reason, + Channel = #channel{ + clientinfo = ClientInfo, + conninfo = #{clientid := ClientId}, + will_msg = WillMsg + } +) when + Reason =:= expired orelse + Reason =:= discarded orelse + Reason =:= kicked orelse + Reason =:= ?chan_terminating orelse + %% Depends on the session backend, we may lost the session + Reason =:= {shutdown, internal_error} +-> + %% For the cases that session MUST be gone impiles that the will message MUST be published + %% a. expired (session expired) + %% b. discarded (Session ends because of clean start) + %% c. kicked. (kicked by operation, abnormal conn close) + %% d. internal_error (maybe not recoverable) + %% This ensures willmsg will be published if the willmsg timer is scheduled but not fired + %% OR fired but not yet handled + ?tp(debug, maybe_publish_willmsg_session_ends, #{clientid => ClientId, reason => Reason}), + _ = publish_will_msg(ClientInfo, WillMsg), + remove_willmsg(Channel); +maybe_publish_will_msg( + takenover, + Channel = #channel{ + clientinfo = ClientInfo, + will_msg = WillMsg, + conninfo = #{clientid := ClientId} + } +) -> + %% TAKEOVER [MQTT-3.1.4-3] + %% MQTT 5, Non-normative comment: + %% """" + %% If a Network Connection uses a Client Identifier of an existing Network Connection to the Server, + %% the Will Message for the exiting connection is sent unless the new connection specifies Clean Start + %% of 0 and the Will Delay is greater than zero. If the Will Delay is 0 the Will Message is sent at + %% the close of the existing Network Connection, and if Clean Start is 1 the Will Message is sent + %% because the Session ends. + %% """" + %% NOTE, above clean start=1 is `discard' scenarios not `takeover' scenario. case will_delay_interval(WillMsg) of 0 -> - ok = publish_will_msg(ClientInfo, WillMsg), - Channel#channel{will_msg = undefined}; - I -> + ?tp(debug, maybe_publish_willmsg_takenover_pub, #{clientid => ClientId}), + _ = publish_will_msg(ClientInfo, WillMsg); + I when I > 0 -> + %% @NOTE Non-normative comment in MQTT 5.0 spec + %% """ + %% One use of this is to avoid publishing Will Messages if there is a temporary network + %% disconnection and the Client succeeds in reconnecting and continuing its Session + %% before the Will Message is published. + %% """ + ?tp(debug, maybe_publish_willmsg_takenover_skip, #{clientid => ClientId}), + skip + end, + remove_willmsg(Channel); +maybe_publish_will_msg( + Reason, + Channel = #channel{ + clientinfo = ClientInfo, + will_msg = WillMsg, + conninfo = #{clientid := ClientId} + } +) -> + %% Default to handle other reasons + case will_delay_interval(WillMsg) of + 0 -> + ?tp(debug, maybe_publish_will_msg_other_publish, #{ + clientid => ClientId, reason => Reason + }), + _ = publish_will_msg(ClientInfo, WillMsg), + remove_willmsg(Channel); + I when I > 0 -> + ?tp(debug, maybe_publish_will_msg_other_delay, #{clientid => ClientId, reason => Reason}), ensure_timer(will_message, timer:seconds(I), Channel) end. @@ -2368,6 +2502,19 @@ get_mqtt_conf(Zone, Key) -> get_mqtt_conf(Zone, Key, Default) -> emqx_config:get_zone_conf(Zone, [mqtt, Key], Default). +%% @doc unset will_msg and cancel the will_message timer +-spec remove_willmsg(Old :: channel()) -> New :: channel(). +remove_willmsg(Channel = #channel{timers = Timers}) -> + case maps:get(will_message, Timers, undefined) of + undefined -> + Channel#channel{will_msg = undefined}; + DelayedWillTimer -> + ok = erlang:cancel_timer(DelayedWillTimer, [{async, true}, {info, false}]), + Channel#channel{ + will_msg = undefined, + timers = maps:remove(will_message, Timers) + } + end. %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 96e4f54c79..c4671bc32e 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -728,7 +728,8 @@ handle_call(_From, Req, State = #state{channel = Channel}) -> {shutdown, Reason, Reply, OutPacket, NChannel} -> NState = State#state{channel = NChannel}, ok = handle_outgoing(OutPacket, NState), - shutdown(Reason, Reply, NState) + NState2 = graceful_shutdown_transport(Reason, NState), + shutdown(Reason, Reply, NState2) end. %%-------------------------------------------------------------------- @@ -1234,6 +1235,12 @@ set_tcp_keepalive({Type, Id}) -> async_set_keepalive(Idle, Interval, Probes) end. +-spec graceful_shutdown_transport(atom(), state()) -> state(). +graceful_shutdown_transport(_Reason, S = #state{transport = Transport, socket = Socket}) -> + %% @TODO Reason is reserved for future use, quic transport + Transport:shutdown(Socket, read_write), + S#state{sockstate = closed}. + %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index 42f153b28d..f27009888a 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -32,6 +32,7 @@ wait/1, getstat/2, fast_close/1, + shutdown/2, ensure_ok_or_exit/2, async_send/3, setopts/2, @@ -147,6 +148,10 @@ fast_close({quic, _Conn, Stream, _Info}) -> % quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), ok. +shutdown({quic, _Conn, Stream, _Info}, read_write) -> + %% A graceful shutdown means both side shutdown the read and write gracefully. + quicer:shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 1, 5000). + -spec ensure_ok_or_exit(atom(), list(term())) -> term(). ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) -> case erlang:apply(?MODULE, Fun, Args) of diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index 98d4e31021..74038a8fc8 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -32,6 +32,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> %% Meck Transport ok = meck:new(emqx_transport, [non_strict, passthrough, no_history, no_link]), + ok = meck:expect(emqx_transport, shutdown, fun(_, _) -> ok end), %% Meck Channel ok = meck:new(emqx_channel, [passthrough, no_history, no_link]), %% Meck Cm diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index b732735bd2..14617d95fd 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -908,6 +908,8 @@ t_session_takeover(Config) when is_list(Config) -> ?assertMatch([_], emqx:publish(Message3)), ?assertMatch([_], emqx:publish(Message4)), {true, _} = last_message(<<"hello2">>, [ConnPid2]), + %% We may or may not recv dup hello2 due to QoS1 redelivery + _ = last_message(<<"hello2">>, [ConnPid2]), {true, _} = last_message(<<"hello3">>, [ConnPid2]), {true, _} = last_message(<<"hello4">>, [ConnPid2]), ?assertEqual([], collect_msgs(timer:seconds(2))), @@ -951,6 +953,8 @@ t_session_kicked(Config) when is_list(Config) -> %% on if it's picked as the first one for round_robin MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1), MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2), + + ct:pal("MsgRec1: ~p MsgRec2 ~p ~n", [MsgRec1, MsgRec2]), case MsgRec2 of <<"hello3">> -> ?assertEqual(<<"hello1">>, MsgRec1); diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index 03a48e1743..4575634a9e 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -24,47 +24,83 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(TOPIC, <<"t">>). -define(CNT, 100). -define(SLEEP, 10). %%-------------------------------------------------------------------- %% Initial funcs -all() -> emqx_common_test_helpers:all(?MODULE). +all() -> + [ + {group, mqttv3}, + {group, mqttv5} + ]. init_per_suite(Config) -> Apps = emqx_cth_suite:start( [emqx], #{work_dir => emqx_cth_suite:work_dir(Config)} ), + emqx_logger:set_log_level(debug), [{apps, Apps} | Config]. end_per_suite(Config) -> Apps = ?config(apps, Config), ok = emqx_cth_suite:stop(Apps), ok. + +groups() -> + [ + {mqttv3, [], emqx_common_test_helpers:all(?MODULE) -- tc_v5_only()}, + {mqttv5, [], emqx_common_test_helpers:all(?MODULE)} + ]. + +tc_v5_only() -> + [ + t_session_expire_with_delayed_willmsg, + t_no_takeover_with_delayed_willmsg, + t_takeover_before_session_expire, + t_takeover_before_willmsg_expire, + t_takeover_before_session_expire_willdelay0, + t_takeover_session_then_normal_disconnect, + t_takeover_session_then_abnormal_disconnect, + t_takeover_session_then_abnormal_disconnect_2 + ]. + +init_per_group(mqttv3, Config) -> + lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v3}); +init_per_group(mqttv5, Config) -> + lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v5}). + +end_per_group(_Group, _Config) -> + ok. + %%-------------------------------------------------------------------- %% Testcases -t_takeover(_) -> +t_takeover(Config) -> process_flag(trap_exit, true), - ClientId = <<"clientid">>, + ClientId = atom_to_binary(?FUNCTION_NAME), + ClientOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false} + ], Middle = ?CNT div 2, - Client1Msgs = messages(0, Middle), - Client2Msgs = messages(Middle, ?CNT div 2), + Client1Msgs = messages(ClientId, 0, Middle), + Client2Msgs = messages(ClientId, Middle, ?CNT div 2), AllMsgs = Client1Msgs ++ Client2Msgs, - meck:new(emqx_cm, [non_strict, passthrough]), meck:expect(emqx_cm, takeover_session_end, fun(Arg) -> + %% trigger more complex takeover conditions during 2-phase takeover protocol: + %% when messages are accumulated in 2 processes simultaneously, + %% and need to be properly ordered / deduplicated after the protocol commences. ok = timer:sleep(?SLEEP * 2), meck:passthrough([Arg]) end), - Commands = - [{fun start_client/4, [ClientId, <<"t">>, ?QOS_1]}] ++ + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++ [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ - [{fun start_client/4, [ClientId, <<"t">>, ?QOS_1]}] ++ + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++ [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++ [{fun stop_client/1, []}], @@ -78,30 +114,751 @@ t_takeover(_) -> ), #{client := [CPid2, CPid1]} = FCtx, - ?assertReceive({'EXIT', CPid1, {disconnected, ?RC_SESSION_TAKEN_OVER, _}}), - ?assertReceive({'EXIT', CPid2, normal}), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + ?assertReceive({'EXIT', CPid2, normal}), Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], ct:pal("middle: ~p", [Middle]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), assert_messages_missed(AllMsgs, Received), assert_messages_order(AllMsgs, Received), - meck:unload(emqx_cm), ok. -t_takover_in_cluster(_) -> - todo. +t_takeover_willmsg(Config) -> + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Middle = ?CNT div 2, + Client1Msgs = messages(ClientId, 0, Middle), + Client2Msgs = messages(ClientId, Middle, ?CNT div 2), + AllMsgs = Client1Msgs ++ Client2Msgs, + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload">>}, + {will_qos, 0} + ], + Commands = + %% GIVEN client connect with will message + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + %% WHEN client reconnect with clean_start = false + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload">>), + assert_messages_missed(AllMsgs, ReceivedNoWill), + assert_messages_order(AllMsgs, ReceivedNoWill), + %% THEN will message should be received + ?assert(IsWill), + emqtt:stop(CPidSub), + emqtt:stop(CPid2), + ?assertReceive({'EXIT', CPid2, normal}), + ?assert(not is_process_alive(CPid1)), + ok. + +t_takeover_willmsg_clean_session(Config) -> + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Middle = ?CNT div 2, + Client1Msgs = messages(ClientId, 0, Middle), + Client2Msgs = messages(ClientId, Middle, ?CNT div 2), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_1">>}, + {will_qos, 1} + ], + WillOptsClean = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, true}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_2">>}, + {will_qos, 1} + ], + + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_1">> + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + %% WHEN: client connects with clean_start=true and willmsg payload <<"willpayload_2">> + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOptsClean]}] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_1">>), + {IsWill2, _ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), + %% THEN: payload <<"willpayload_1">> should be published instead of <<"willpayload_2">> + ?assert(IsWill1), + ?assertNot(IsWill2), + emqtt:stop(CPid2), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ok. + +t_takeover_clean_session_with_delayed_willmsg(Config) -> + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Middle = ?CNT div 2, + Client1Msgs = messages(ClientId, 0, Middle), + Client2Msgs = messages(ClientId, Middle, ?CNT div 2), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + %% mqttv5 only + {will_props, #{'Will-Delay-Interval' => 10}} + ], + WillOptsClean = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, true}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_2">>}, + {will_qos, 1} + ], + + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> and delay-interval 10s + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + %% WHEN: client connects with clean_start=true and willmsg payload <<"willpayload_2">> + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOptsClean]}] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay10">>), + {IsWill2, _ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), + %% THEN: payload <<"willpayload_delay10">> should be published without delay + ?assert(IsWill1), + ?assertNot(IsWill2), + emqtt:stop(CPid2), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ok. + +t_no_takeover_with_delayed_willmsg(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay3">>}, + {will_qos, 1}, + %secs + {will_props, #{'Will-Delay-Interval' => 3}}, + % secs + {properties, #{'Session-Expiry-Interval' => 10}} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay3">> and delay-interval 3s + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + assert_messages_missed(Client1Msgs, Received), + {IsWill0, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay3">>), + ?assertNot(IsWill0), + ?assertNotEqual([], ReceivedNoWill0), + #{client := [CPidSub, CPid1]} = FCtx, + %% WHEN: client disconnects abnormally AND no reconnect after 3s. + exit(CPid1, kill), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), killed), + + Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + {IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay3">>), + ?assertNot(IsWill1), + ?assertEqual([], ReceivedNoWill1), + %% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after WILL delay (3 secs). + Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], + {IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay3">>), + ?assertEqual([], ReceivedNoWill11), + ?assert(IsWill11), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ok. + +t_session_expire_with_delayed_willmsg(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 10}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and delay-interval 10s > session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + ?assertNot(IsWill), + ?assertNotEqual([], ReceivedNoWill), + assert_messages_missed(Client1Msgs, Received), + #{client := [CPidSub, CPid1]} = FCtx, + %% WHEN: client disconnects abnormally AND no reconnect after 3s. + exit(CPid1, kill), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), killed), + + Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + {IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay10">>), + ?assertNot(IsWill1), + ?assertEqual([], ReceivedNoWill1), + %% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after session expiry. + Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], + {IsWill12, ReceivedNoWill2} = filter_payload(Received2, <<"willpayload_delay10">>), + ?assertEqual([], ReceivedNoWill2), + ?assert(IsWill12), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ok. + +%% @TODO 'Server-Keep-Alive' +%% t_no_takeover_keepalive_fired(Config) -> +%% ok. + +t_takeover_before_session_expire_willdelay0(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 0}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and delay-interval 0s session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [ + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(1000), + CTX + end, + [] + } + ] ++ + %% WHEN: client session is taken over within 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + %% THEN: willmsg is published + ?assert(IsWill), + emqtt:stop(CPidSub), + emqtt:stop(CPid2), + ?assert(not is_process_alive(CPid1)), + ok. + +t_takeover_before_session_expire(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 10}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and delay-interval 10s > session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [ + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + } + ] ++ + %% WHEN: client session is taken over within 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + ct:pal("FCtx: ~p", [FCtx]), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + %% THEN: No Willmsg is published + ?assertNot(IsWill), + ?assertNotEqual([], ReceivedNoWill), + emqtt:stop(CPidSub), + emqtt:stop(CPid2), + ?assert(not is_process_alive(CPid1)), + ok. + +t_takeover_session_then_normal_disconnect(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 10}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + Commands = + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [ + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + } + ] ++ + %% GIVEN: client reconnect with willmsg payload <<"willpayload_delay10">> + %% and delay-interval 10s > session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + %% WHEN: client disconnect normally. + emqtt:disconnect(CPid2, ?RC_SUCCESS), + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + %% THEN: willmsg is not published. + ?assertNot(IsWill), + ?assertNotEqual([], ReceivedNoWill), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ?assert(not is_process_alive(CPid2)), + ok. + +t_takeover_session_then_abnormal_disconnect(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 10}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and will-delay-interval 10s > session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [ + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + } + ] ++ + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + %% WHEN: client disconnect abnormally + emqtt:disconnect(CPid2, ?RC_DISCONNECT_WITH_WILL_MESSAGE), + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + %% THEN: willmsg is not published before session expiry + ?assertNot(IsWill), + ?assertNotEqual([], ReceivedNoWill), + Received1 = [Msg || {publish, Msg} <- ?drainMailbox(3000)], + {IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay10">>), + %% AND THEN: willmsg is published after session expiry + ?assert(IsWill1), + ?assertEqual([], ReceivedNoWill1), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ?assert(not is_process_alive(CPid2)), + ok. + +t_takeover_session_then_abnormal_disconnect_2(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay1">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 1}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + WillOpts2 = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay2">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 0}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + Commands = + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [ + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + } + ] ++ + %% GIVEN: client *reconnect* with willmsg payload <<"willpayload_delay2">> + %% and will-delay-interval 0s, session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts2]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + %% WHEN: client disconnect abnormally + emqtt:disconnect(CPid2, ?RC_DISCONNECT_WITH_WILL_MESSAGE), + Received = [Msg || {publish, Msg} <- ?drainMailbox(2000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay1">>), + %% THEN: willmsg1 of old conn is not published because will-delay-interval > 0 + ?assertNot(IsWill), + ?assertNotEqual([], ReceivedNoWill), + %% THEN: willmsg1 is published because will-delay-interval is 0 + {IsWill2, _} = filter_payload(Received, <<"willpayload_delay2">>), + ?assert(IsWill2), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ?assert(not is_process_alive(CPid2)), + ok. + +t_takeover_before_willmsg_expire(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 3}}, + {properties, #{'Session-Expiry-Interval' => 10}} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and will-delay-interval 3s < session expiry 10s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [ + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + } + ] ++ + %% WHEN: another client takeover the session with in 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + ct:pal("FCtx: ~p", [FCtx]), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + assert_messages_missed(Client1Msgs, Received), + + Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + {IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay10">>), + ?assertNot(IsWill), + ?assertEqual([], ReceivedNoWill), + %% THEN: for MQTT v5, payload <<"willpayload_delay10">> should NOT be published after 3s. + Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], + {IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay10">>), + ?assertEqual([], ReceivedNoWill11), + ?assertNot(IsWill11), + emqtt:stop(CPidSub), + emqtt:stop(CPid2), + ?assert(not is_process_alive(CPid1)), + ok. + +t_kick_session(Config) -> + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_kick">>}, + {will_qos, 1} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_kick">> + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [ + %% kick may fail (not found) without this delay + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + } + ] ++ + %% WHEN: client is kicked with kick_session + [{fun kick_client/2, [ClientId]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), kicked), + Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + %% THEN: payload <<"willpayload_kick">> should be published + {IsWill, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_kick">>), + ?assert(IsWill), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ok. + +%% t_takover_in_cluster(_) -> +%% todo. %%-------------------------------------------------------------------- %% Commands - -start_client(Ctx, ClientId, Topic, Qos) -> - {ok, CPid} = emqtt:start_link([ - {clientid, ClientId}, - {proto_ver, v5}, - {clean_start, false} - ]), +start_client(Ctx, ClientId, Topic, Qos, Opts) -> + {ok, CPid} = emqtt:start_link([{clientid, ClientId} | Opts]), _ = erlang:spawn_link(fun() -> {ok, _} = emqtt:connect(CPid), ct:pal("CLIENT: connected ~p", [CPid]), @@ -109,6 +866,10 @@ start_client(Ctx, ClientId, Topic, Qos) -> end), Ctx#{client => [CPid | maps:get(client, Ctx, [])]}. +kick_client(Ctx, ClientId) -> + ok = emqx_cm:kick_session(ClientId), + Ctx. + publish_msg(Ctx, Msg) -> ok = timer:sleep(rand:uniform(?SLEEP)), case emqx:publish(Msg) of @@ -157,8 +918,8 @@ assert_messages_order([Msg | Expected], Received) -> assert_messages_order(Expected, Rest) end. -messages(Offset, Cnt) -> - [emqx_message:make(ct, ?QOS_1, ?TOPIC, payload(Offset + I)) || I <- lists:seq(1, Cnt)]. +messages(Topic, Offset, Cnt) -> + [emqx_message:make(ct, ?QOS_1, Topic, payload(Offset + I)) || I <- lists:seq(1, Cnt)]. payload(I) -> % NOTE @@ -170,3 +931,29 @@ payload(I) -> emqx_utils_calendar:now_to_rfc3339(millisecond) ]) ). + +%% @doc Filter out the message with matching target payload from the list of messages. +%% return '{IsTargetFound, ListOfOtherMessages}' +%% @end +-spec filter_payload(List :: [#{payload := binary()}], Payload :: binary()) -> + {IsPayloadFound :: boolean(), OtherPayloads :: [#{payload := binary()}]}. +filter_payload(List, Payload) when is_binary(Payload) -> + Filtered = [ + Msg + || #{payload := P} = Msg <- List, + P =/= Payload + ], + {length(List) =/= length(Filtered), Filtered}. + +%% @doc assert emqtt *client* process exits as expected. +assert_client_exit(Pid, v5, takenover) -> + %% @ref: MQTT 5.0 spec [MQTT-3.1.4-3] + ?assertReceive({'EXIT', Pid, {disconnected, ?RC_SESSION_TAKEN_OVER, _}}); +assert_client_exit(Pid, v3, takenover) -> + ?assertReceive({'EXIT', Pid, {shutdown, tcp_closed}}); +assert_client_exit(Pid, v3, kicked) -> + ?assertReceive({'EXIT', Pid, _}); +assert_client_exit(Pid, v5, kicked) -> + ?assertReceive({'EXIT', Pid, {disconnected, ?RC_ADMINISTRATIVE_ACTION, _}}); +assert_client_exit(Pid, _, killed) -> + ?assertReceive({'EXIT', Pid, killed}). diff --git a/changes/ce/fix-11868.en.md b/changes/ce/fix-11868.en.md new file mode 100644 index 0000000000..da0a09dff6 --- /dev/null +++ b/changes/ce/fix-11868.en.md @@ -0,0 +1,2 @@ +Fix a bug that willmsg is not published after session takeover. +