Skip to content

Commit

Permalink
Fix Message-Expiry-Interval not work (#2791)
Browse files Browse the repository at this point in the history
  • Loading branch information
terry-xiaoyu authored and turtleDeng committed Aug 16, 2019
1 parent ffef64a commit 31671f5
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 2 deletions.
8 changes: 7 additions & 1 deletion src/emqx_session.erl
Expand Up @@ -1020,7 +1020,13 @@ drain_q(Cnt, Msgs, Q) ->
case emqx_mqueue:out(Q) of case emqx_mqueue:out(Q) of
{empty, _Q} -> {Msgs, Q}; {empty, _Q} -> {Msgs, Q};
{{value, Msg}, Q1} -> {{value, Msg}, Q1} ->
drain_q(Cnt-1, [Msg|Msgs], Q1) case emqx_message:is_expired(Msg) of
true ->
ok = emqx_metrics:inc('messages.expired'),
drain_q(Cnt-1, Msgs, Q1);
false ->
drain_q(Cnt-1, [Msg|Msgs], Q1)
end
end. end.


%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
Expand Down
62 changes: 61 additions & 1 deletion test/emqx_session_SUITE.erl
Expand Up @@ -21,7 +21,7 @@


-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").


all() -> [ignore_loop, t_session_all]. all() -> [ignore_loop, t_session_all, t_message_expiry_interval_1, t_message_expiry_interval_2].


init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_helpers:start_apps([]), emqx_ct_helpers:start_apps([]),
Expand Down Expand Up @@ -66,3 +66,63 @@ t_session_all(_) ->
timer:sleep(200), timer:sleep(200),
[] = emqx:subscriptions(SPid), [] = emqx:subscriptions(SPid),
emqx_mock_client:close_session(ConnPid). emqx_mock_client:close_session(ConnPid).

t_message_expiry_interval_1(_) ->
ClientA = message_expiry_interval_init(),
[message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]].

t_message_expiry_interval_2(_) ->
ClientA = message_expiry_interval_init(),
[message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]].

message_expiry_interval_init() ->
{ok, ClientA} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
{ok, ClientB} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
{ok, _} = emqx_client:connect(ClientA),
{ok, _} = emqx_client:connect(ClientB),
%% subscribe and disconnect client-b
emqx_client:subscribe(ClientB, <<"t/a">>, 1),
emqx_client:stop(ClientB),
ClientA.

message_expiry_interval_exipred(ClientA, QoS) ->
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
%% publish to t/a and waiting for the message expired
emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]),
ct:sleep(1000),

%% resume the session for client-b
{ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
{ok, _} = emqx_client:connect(ClientB1),

%% verify client-b could not receive the publish message
receive
{publish,#{client_pid := ClientB1, topic := <<"t/a">>}} ->
ct:fail(should_have_expired)
after 300 ->
ok
end,
emqx_client:stop(ClientB1).

message_expiry_interval_not_exipred(ClientA, QoS) ->
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
%% publish to t/a
emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]),

%% wait for 1s and then resume the session for client-b, the message should not expires
%% as Message-Expiry-Interval = 20s
ct:sleep(1000),
{ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
{ok, _} = emqx_client:connect(ClientB1),

%% verify client-b could receive the publish message and the Message-Expiry-Interval is set
receive
{publish,#{client_pid := ClientB1, topic := <<"t/a">>,
properties := #{'Message-Expiry-Interval' := MsgExpItvl}}}
when MsgExpItvl < 20 -> ok;
{publish, _} = Msg ->
ct:fail({incorrect_publish, Msg})
after 300 ->
ct:fail(no_publish_received)
end,
emqx_client:stop(ClientB1).

0 comments on commit 31671f5

Please sign in to comment.