Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0111 unregister session with timestamp #12326

Merged
merged 9 commits into from
Feb 2, 2024
9 changes: 8 additions & 1 deletion apps/emqx/include/emqx_cm.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
-define(CHAN_INFO_TAB, emqx_channel_info).
-define(CHAN_LIVE_TAB, emqx_channel_live).

%% Mria/Mnesia Tables for channel management.
%% Mria table for session registraition.
zmstone marked this conversation as resolved.
Show resolved Hide resolved
-define(CHAN_REG_TAB, emqx_channel_registry).

-define(T_KICK, 5_000).
Expand All @@ -32,4 +32,11 @@

-define(CM_POOL, emqx_cm_pool).

%% Registered sessions.
-record(channel, {
chid :: emqx_types:clientid() | '_',
%% pid field is extended in 5.6.0 to support recording unregistration timestamp.
pid :: pid() | non_neg_integer() | '$1'
}).

-endif.
3 changes: 2 additions & 1 deletion apps/emqx/src/emqx_cm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@
{?CHAN_TAB, 'channels.count', 'channels.max'},
{?CHAN_TAB, 'sessions.count', 'sessions.max'},
{?CHAN_CONN_TAB, 'connections.count', 'connections.max'},
{?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'}
{?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'},
{?CHAN_REG_TAB, 'cluster_sessions.count', 'cluster_sessions.max'}
]).

%% Batch drain
Expand Down
150 changes: 132 additions & 18 deletions apps/emqx/src/emqx_cm_registry.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2019-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
Expand All @@ -19,14 +19,9 @@

-behaviour(gen_server).

-include("emqx.hrl").
-include("emqx_cm.hrl").
-include("logger.hrl").
-include("types.hrl").

-export([start_link/0]).

-export([is_enabled/0]).
-export([is_enabled/0, is_hist_enabled/0]).

-export([
register_channel/1,
Expand All @@ -50,10 +45,13 @@
do_cleanup_channels/1
]).

-define(REGISTRY, ?MODULE).
-define(LOCK, {?MODULE, cleanup_down}).
-include("emqx.hrl").
-include("emqx_cm.hrl").
-include("logger.hrl").
-include("types.hrl").

-record(channel, {chid, pid}).
-define(REGISTRY, ?MODULE).
-define(NODE_DOWN_CLEANUP_LOCK, {?MODULE, cleanup_down}).

%% @doc Start the global channel registry.
-spec start_link() -> startlink_ret().
Expand All @@ -69,6 +67,11 @@ start_link() ->
is_enabled() ->
emqx:get_config([broker, enable_session_registry]).

%% @doc Is the global session registration history enabled?
-spec is_hist_enabled() -> boolean().
is_hist_enabled() ->
retain_duration() > 0.

%% @doc Register a global channel.
-spec register_channel(
emqx_types:clientid()
Expand All @@ -78,8 +81,11 @@ register_channel(ClientId) when is_binary(ClientId) ->
register_channel({ClientId, self()});
register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
case is_enabled() of
true -> mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid));
false -> ok
true ->
ok = when_hist_enabled(fun() -> delete_hist_d(ClientId) end),
mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid));
false ->
ok
end.

%% @doc Unregister a global channel.
Expand All @@ -91,18 +97,45 @@ unregister_channel(ClientId) when is_binary(ClientId) ->
unregister_channel({ClientId, self()});
unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
case is_enabled() of
true -> mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid));
false -> ok
true ->
mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid)),
zmstone marked this conversation as resolved.
Show resolved Hide resolved
%% insert unregistration history after unregstration
ok = when_hist_enabled(fun() -> insert_hist_d(ClientId) end);
false ->
ok
end.

%% @doc Lookup the global channels.
-spec lookup_channels(emqx_types:clientid()) -> list(pid()).
lookup_channels(ClientId) ->
[ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?CHAN_REG_TAB, ClientId)].
lists:filtermap(
fun
(#channel{pid = ChanPid}) when is_pid(ChanPid) ->
case is_pid_down(ChanPid) of
true ->
false;
_ ->
{true, ChanPid}
end;
(_) ->
false
end,
mnesia:dirty_read(?CHAN_REG_TAB, ClientId)
).

%% Return 'true' or 'false' if it's a local pid.
%% Otherwise return 'unknown'.
is_pid_down(Pid) when node(Pid) =:= node() ->
not erlang:is_process_alive(Pid);
is_pid_down(_) ->
unknown.

record(ClientId, ChanPid) ->
#channel{chid = ClientId, pid = ChanPid}.

hist(ClientId) ->
#channel{chid = ClientId, pid = now_ts()}.

%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -158,15 +191,96 @@ code_change(_OldVsn, State, _Extra) ->

cleanup_channels(Node) ->
global:trans(
{?LOCK, self()},
{?NODE_DOWN_CLEANUP_LOCK, self()},
fun() ->
mria:transaction(?CM_SHARD, fun ?MODULE:do_cleanup_channels/1, [Node])
end
).

do_cleanup_channels(Node) ->
Pat = [{#channel{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}],
Pat = [
{
#channel{pid = '$1', _ = '_'},
_Match = [{'andalso', {is_pid, '$1'}, {'==', {node, '$1'}, Node}}],
_Return = ['$_']
}
],
lists:foreach(fun delete_channel/1, mnesia:select(?CHAN_REG_TAB, Pat, write)).

delete_channel(Chan) ->
mnesia:delete_object(?CHAN_REG_TAB, Chan, write).
mnesia:delete_object(?CHAN_REG_TAB, Chan, write),
ok = when_hist_enabled(fun() -> insert_hist_t(Chan#channel.chid) end).

%%--------------------------------------------------------------------
%% History entry operations
%%--------------------------------------------------------------------

when_hist_enabled(F) ->
case is_hist_enabled() of
true ->
_ = F();
false ->
ok
end,
ok.

%% Insert unregistration history in a transaction when unregistering the last channel for a clientid.
insert_hist_t(ClientId) ->
case delete_hist_t(ClientId) of
true ->
ok;
false ->
mnesia:write(?CHAN_REG_TAB, hist(ClientId), write)
end.

%% Dirty insert unregistration history.
%% Since dirty opts are used, async pool workers may race deletes and inserts,
%% so there could be more than one history records for a clientid,
%% but it should be eventually consistent after the client re-registers or the periodic cleanup.
insert_hist_d(ClientId) ->
%% delete old hist records first
case delete_hist_d(ClientId) of
true ->
ok;
false ->
mria:dirty_write(?CHAN_REG_TAB, hist(ClientId))
end.

%% Current timestamp in seconds.
now_ts() ->
erlang:system_time(seconds).

%% Delete all history records for a clientid, return true if there is a Pid found.
delete_hist_t(ClientId) ->
fold_hist(
fun(Hist) -> mnesia:delete_object(?CHAN_REG_TAB, Hist, write) end,
mnesia:read(?CHAN_REG_TAB, ClientId, write)
).

%% Delete all history records for a clientid, return true if there is a Pid found.
delete_hist_d(ClientId) ->
fold_hist(
fun(Hist) -> mria:dirty_delete_object(?CHAN_REG_TAB, Hist) end,
mnesia:dirty_read(?CHAN_REG_TAB, ClientId)
).

%% Fold over the history records, return true if there is a Pid found.
fold_hist(F, List) ->
lists:foldl(
fun(#channel{pid = Ts} = Record, HasPid) ->
case is_integer(Ts) of
true ->
ok = F(Record),
HasPid;
false ->
true
end
end,
false,
List
).

%% Return the session registration history retain duration.
-spec retain_duration() -> non_neg_integer().
retain_duration() ->
emqx:get_config([broker, session_history_retain]).