Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor and optimize persistent session #12251

Merged
merged 19 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
181 changes: 71 additions & 110 deletions apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ init_per_testcase(TestCase, Config) when
init_per_testcase(t_session_gc = TestCase, Config) ->
Opts = #{
n => 3,
roles => [core, core, replicant],
thalesmg marked this conversation as resolved.
Show resolved Hide resolved
roles => [core, core, core],
extra_emqx_conf =>
"\n session_persistence {"
"\n last_alive_update_interval = 500ms "
"\n session_gc_interval = 2s "
"\n session_gc_batch_size = 1 "
"\n session_gc_interval = 1s "
"\n session_gc_batch_size = 2 "
"\n }"
},
Cluster = cluster(Opts),
Expand Down Expand Up @@ -91,7 +91,7 @@ end_per_testcase(_TestCase, _Config) ->
ok.

%%------------------------------------------------------------------------------
%% Helper fns
%% Helper functions
%%------------------------------------------------------------------------------

cluster(#{n := N} = Opts) ->
Expand Down Expand Up @@ -147,9 +147,10 @@ start_client(Opts0 = #{}) ->
proto_ver => v5,
properties => #{'Session-Expiry-Interval' => 300}
},
Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)),
ct:pal("starting client with opts:\n ~p", [Opts]),
{ok, Client} = emqtt:start_link(Opts),
Opts = emqx_utils_maps:deep_merge(Defaults, Opts0),
?tp(notice, "starting client", Opts),
{ok, Client} = emqtt:start_link(maps:to_list(Opts)),
unlink(Client),
on_exit(fun() -> catch emqtt:stop(Client) end),
Client.

Expand All @@ -164,59 +165,27 @@ is_persistent_connect_opts(#{properties := #{'Session-Expiry-Interval' := EI}})
EI > 0.

list_all_sessions(Node) ->
erpc:call(Node, emqx_persistent_session_ds, list_all_sessions, []).
erpc:call(Node, emqx_persistent_session_ds_state, list_sessions, []).

list_all_subscriptions(Node) ->
erpc:call(Node, emqx_persistent_session_ds, list_all_subscriptions, []).
Sessions = list_all_sessions(Node),
lists:flatmap(
fun(ClientId) ->
#{s := #{subscriptions := Subs}} = erpc:call(
Node, emqx_persistent_session_ds, print_session, [ClientId]
),
maps:to_list(Subs)
end,
Sessions
).

list_all_pubranges(Node) ->
erpc:call(Node, emqx_persistent_session_ds, list_all_pubranges, []).

prop_only_cores_run_gc(CoreNodes) ->
{"only core nodes run gc", fun(Trace) -> ?MODULE:prop_only_cores_run_gc(Trace, CoreNodes) end}.
prop_only_cores_run_gc(Trace, CoreNodes) ->
GCNodes = lists:usort([
N
|| #{
?snk_kind := K,
?snk_meta := #{node := N}
} <- Trace,
lists:member(K, [ds_session_gc, ds_session_gc_lock_taken]),
N =/= node()
]),
?assertEqual(lists:usort(CoreNodes), GCNodes).

%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------

t_non_persistent_session_subscription(_Config) ->
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this testcase because it tests in-memory session. Not sure if it's relevant for this testsuite. Also, testcases that verify absence of certain trace events are unreliable on their own: if a trace event is renamed or removed, the test will always pass.

ClientId = atom_to_binary(?FUNCTION_NAME),
SubTopicFilter = <<"t/#">>,
?check_trace(
#{timetrap => 30_000},
begin
?tp(notice, "starting", #{}),
Client = start_client(#{
clientid => ClientId,
properties => #{'Session-Expiry-Interval' => 0}
}),
{ok, _} = emqtt:connect(Client),
?tp(notice, "subscribing", #{}),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client, SubTopicFilter, qos2),

ok = emqtt:stop(Client),

ok
end,
fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]),
?assertEqual([], ?of_kind(ds_session_subscription_added, Trace)),
ok
end
),
ok.

t_session_subscription_idempotency(Config) ->
[Node1Spec | _] = ?config(node_specs, Config),
[Node1] = ?config(nodes, Config),
Expand Down Expand Up @@ -288,10 +257,10 @@ t_session_unsubscription_idempotency(Config) ->
?check_trace(
#{timetrap => 30_000},
begin
#{timetrap => 20_000},
?force_ordering(
#{
?snk_kind := persistent_session_ds_subscription_delete,
?snk_span := {complete, _}
?snk_kind := persistent_session_ds_subscription_delete
},
_NEvents0 = 1,
#{?snk_kind := will_restart_node},
Expand Down Expand Up @@ -409,27 +378,26 @@ do_t_session_discard(Params) ->
?retry(
_Sleep0 = 100,
_Attempts0 = 50,
true = map_size(emqx_persistent_session_ds:list_all_streams()) > 0
#{} = emqx_persistent_session_ds_state:print_session(ClientId)
),
ok = emqtt:stop(Client0),
?tp(notice, "disconnected", #{}),

?tp(notice, "reconnecting", #{}),
%% we still have streams
?assert(map_size(emqx_persistent_session_ds:list_all_streams()) > 0),
%% we still have the session:
?assertMatch(#{}, emqx_persistent_session_ds_state:print_session(ClientId)),
Client1 = start_client(ReconnectOpts),
{ok, _} = emqtt:connect(Client1),
?assertEqual([], emqtt:subscriptions(Client1)),
case is_persistent_connect_opts(ReconnectOpts) of
true ->
?assertMatch(#{ClientId := _}, emqx_persistent_session_ds:list_all_sessions());
?assertMatch(#{}, emqx_persistent_session_ds_state:print_session(ClientId));
false ->
?assertEqual(#{}, emqx_persistent_session_ds:list_all_sessions())
?assertEqual(
undefined, emqx_persistent_session_ds_state:print_session(ClientId)
)
end,
?assertEqual(#{}, emqx_persistent_session_ds:list_all_subscriptions()),
?assertEqual([], emqx_persistent_session_ds_router:topics()),
?assertEqual(#{}, emqx_persistent_session_ds:list_all_streams()),
?assertEqual(#{}, emqx_persistent_session_ds:list_all_pubranges()),
ok = emqtt:stop(Client1),
?tp(notice, "disconnected", #{}),

Expand All @@ -443,6 +411,8 @@ do_t_session_discard(Params) ->
ok.

t_session_expiration1(Config) ->
%% This testcase verifies that the properties passed in the
%% CONNECT packet are respected by the GC process:
ClientId = atom_to_binary(?FUNCTION_NAME),
Opts = #{
clientid => ClientId,
Expand All @@ -455,6 +425,9 @@ t_session_expiration1(Config) ->
do_t_session_expiration(Config, Opts).

t_session_expiration2(Config) ->
%% This testcase updates the expiry interval for the session in
%% the _DISCONNECT_ packet. This setting should be respected by GC
%% process:
ClientId = atom_to_binary(?FUNCTION_NAME),
Opts = #{
clientid => ClientId,
Expand All @@ -469,6 +442,8 @@ t_session_expiration2(Config) ->
do_t_session_expiration(Config, Opts).

do_t_session_expiration(_Config, Opts) ->
%% Sequence is a list of pairs of properties passed through the
%% CONNECT and for the DISCONNECT for each session:
#{
clientid := ClientId,
sequence := [
Expand All @@ -486,7 +461,7 @@ do_t_session_expiration(_Config, Opts) ->
Client0 = start_client(Params0),
{ok, _} = emqtt:connect(Client0),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, Topic, ?QOS_2),
Subs0 = emqx_persistent_session_ds:list_all_subscriptions(),
#{s := #{subscriptions := Subs0}} = emqx_persistent_session_ds:print_session(ClientId),
?assertEqual(1, map_size(Subs0), #{subs => Subs0}),
Info0 = maps:from_list(emqtt:info(Client0)),
?assertEqual(0, maps:get(session_present, Info0), #{info => Info0}),
Expand All @@ -501,7 +476,7 @@ do_t_session_expiration(_Config, Opts) ->
?assertEqual([], Subs1),
emqtt:disconnect(Client1, ?RC_NORMAL_DISCONNECTION, SecondDisconn),

ct:sleep(1_500),
ct:sleep(2_500),

Params2 = maps:merge(CommonParams, ThirdConn),
Client2 = start_client(Params2),
Expand All @@ -513,9 +488,9 @@ do_t_session_expiration(_Config, Opts) ->
emqtt:publish(Client2, Topic, <<"payload">>),
?assertNotReceive({publish, #{topic := Topic}}),
%% ensure subscriptions are absent from table.
?assertEqual(#{}, emqx_persistent_session_ds:list_all_subscriptions()),
#{s := #{subscriptions := Subs3}} = emqx_persistent_session_ds:print_session(ClientId),
?assertEqual([], maps:to_list(Subs3)),
emqtt:disconnect(Client2, ?RC_NORMAL_DISCONNECTION, ThirdDisconn),

ok
end,
[]
Expand All @@ -531,6 +506,7 @@ t_session_gc(Config) ->
Port2,
Port3
] = lists:map(fun(N) -> get_mqtt_port(N, tcp) end, Nodes),
ct:pal("Ports: ~p", [[Port1, Port2, Port3]]),
CommonParams = #{
clean_start => false,
proto_ver => v5
Expand All @@ -549,14 +525,14 @@ t_session_gc(Config) ->
?check_trace(
#{timetrap => 30_000},
begin
ClientId0 = <<"session_gc0">>,
Client0 = StartClient(ClientId0, Port1, 30),

ClientId1 = <<"session_gc1">>,
Client1 = StartClient(ClientId1, Port2, 1),
Client1 = StartClient(ClientId1, Port1, 30),

ClientId2 = <<"session_gc2">>,
Client2 = StartClient(ClientId2, Port3, 1),
Client2 = StartClient(ClientId2, Port2, 1),

ClientId3 = <<"session_gc3">>,
Client3 = StartClient(ClientId3, Port3, 1),

lists:foreach(
fun(Client) ->
Expand All @@ -566,78 +542,63 @@ t_session_gc(Config) ->
{ok, _} = emqtt:publish(Client, Topic, Payload, ?QOS_1),
ok
end,
[Client0, Client1, Client2]
[Client1, Client2, Client3]
),

%% Clients are still alive; no session is garbage collected.
Res0 = ?block_until(
#{
?snk_kind := ds_session_gc,
?snk_span := {complete, _},
?snk_meta := #{node := N}
} when
N =/= node(),
3 * GCInterval + 1_000
?assertMatch(
{ok, _},
?block_until(
#{
?snk_kind := ds_session_gc,
?snk_span := {complete, _},
?snk_meta := #{node := N}
} when N =/= node()
)
),
?assertMatch({ok, _}, Res0),
{ok, #{?snk_meta := #{time := T0}}} = Res0,
Sessions0 = list_all_sessions(Node1),
Subs0 = list_all_subscriptions(Node1),
?assertEqual(3, map_size(Sessions0), #{sessions => Sessions0}),
?assertEqual(3, map_size(Subs0), #{subs => Subs0}),
?assertMatch([_, _, _], list_all_sessions(Node1), sessions),
?assertMatch([_, _, _], list_all_subscriptions(Node1), subscriptions),

%% Now we disconnect 2 of them; only those should be GC'ed.

?assertMatch(
{ok, {ok, _}},
?wait_async_action(
emqtt:stop(Client1),
#{?snk_kind := terminate},
1_000
emqtt:stop(Client2),
#{?snk_kind := terminate}
)
),
ct:pal("disconnected client1"),
?tp(notice, "disconnected client1", #{}),
?assertMatch(
{ok, {ok, _}},
?wait_async_action(
emqtt:stop(Client2),
#{?snk_kind := terminate},
1_000
emqtt:stop(Client3),
#{?snk_kind := terminate}
)
),
ct:pal("disconnected client2"),
?tp(notice, "disconnected client2", #{}),
?assertMatch(
{ok, _},
?block_until(
#{
?snk_kind := ds_session_gc_cleaned,
?snk_meta := #{node := N, time := T},
session_ids := [ClientId1]
} when
N =/= node() andalso T > T0,
4 * GCInterval + 1_000
session_id := ClientId2
}
)
),
?assertMatch(
{ok, _},
?block_until(
#{
?snk_kind := ds_session_gc_cleaned,
?snk_meta := #{node := N, time := T},
session_ids := [ClientId2]
} when
N =/= node() andalso T > T0,
4 * GCInterval + 1_000
session_id := ClientId3
}
)
),
Sessions1 = list_all_sessions(Node1),
Subs1 = list_all_subscriptions(Node1),
?assertEqual(1, map_size(Sessions1), #{sessions => Sessions1}),
?assertEqual(1, map_size(Subs1), #{subs => Subs1}),

?assertMatch([ClientId1], list_all_sessions(Node1), sessions),
?assertMatch([_], list_all_subscriptions(Node1), subscriptions),
ok
end,
[
prop_only_cores_run_gc(CoreNodes)
]
[]
),
ok.
6 changes: 4 additions & 2 deletions apps/emqx/src/emqx_channel.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2019-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -191,7 +191,9 @@ info(topic_aliases, #channel{topic_aliases = Aliases}) ->
info(alias_maximum, #channel{alias_maximum = Limits}) ->
Limits;
info(timers, #channel{timers = Timers}) ->
Timers.
Timers;
info(session_state, #channel{session = Session}) ->
Session.

set_conn_state(ConnState, Channel) ->
Channel#channel{conn_state = ConnState}.
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx/src/emqx_cm.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%-------------------------------------------------------------------
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
Expand Down