Skip to content

Commit

Permalink
Merge pull request #12326 from zmstone/0111-unregister-session-with-t…
Browse files Browse the repository at this point in the history
…imestamp

0111 unregister session with timestamp
  • Loading branch information
zmstone committed Feb 2, 2024
2 parents e0b5d9f + f0569d8 commit b1a05c7
Show file tree
Hide file tree
Showing 23 changed files with 623 additions and 65 deletions.
9 changes: 8 additions & 1 deletion apps/emqx/include/emqx_cm.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
-define(CHAN_INFO_TAB, emqx_channel_info).
-define(CHAN_LIVE_TAB, emqx_channel_live).

%% Mria/Mnesia Tables for channel management.
%% Mria table for session registration.
-define(CHAN_REG_TAB, emqx_channel_registry).

-define(T_KICK, 5_000).
Expand All @@ -32,4 +32,11 @@

-define(CM_POOL, emqx_cm_pool).

%% Registered sessions.
-record(channel, {
chid :: emqx_types:clientid() | '_',
%% pid field is extended in 5.6.0 to support recording unregistration timestamp.
pid :: pid() | non_neg_integer() | '$1'
}).

-endif.
3 changes: 2 additions & 1 deletion apps/emqx/src/emqx_cm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@
{?CHAN_TAB, 'channels.count', 'channels.max'},
{?CHAN_TAB, 'sessions.count', 'sessions.max'},
{?CHAN_CONN_TAB, 'connections.count', 'connections.max'},
{?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'}
{?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'},
{?CHAN_REG_TAB, 'cluster_sessions.count', 'cluster_sessions.max'}
]).

%% Batch drain
Expand Down
172 changes: 151 additions & 21 deletions apps/emqx/src/emqx_cm_registry.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2019-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
Expand All @@ -19,18 +19,15 @@

-behaviour(gen_server).

-include("emqx.hrl").
-include("emqx_cm.hrl").
-include("logger.hrl").
-include("types.hrl").

-export([start_link/0]).

-export([is_enabled/0]).
-export([is_enabled/0, is_hist_enabled/0]).

-export([
register_channel/1,
unregister_channel/1
register_channel2/1,
unregister_channel/1,
unregister_channel2/1
]).

-export([lookup_channels/1]).
Expand All @@ -50,10 +47,13 @@
do_cleanup_channels/1
]).

-define(REGISTRY, ?MODULE).
-define(LOCK, {?MODULE, cleanup_down}).
-include("emqx.hrl").
-include("emqx_cm.hrl").
-include("logger.hrl").
-include("types.hrl").

-record(channel, {chid, pid}).
-define(REGISTRY, ?MODULE).
-define(NODE_DOWN_CLEANUP_LOCK, {?MODULE, cleanup_down}).

%% @doc Start the global channel registry.
-spec start_link() -> startlink_ret().
Expand All @@ -69,6 +69,11 @@ start_link() ->
is_enabled() ->
emqx:get_config([broker, enable_session_registry]).

%% @doc Is the global session registration history enabled?
-spec is_hist_enabled() -> boolean().
is_hist_enabled() ->
retain_duration() > 0.

%% @doc Register a global channel.
-spec register_channel(
emqx_types:clientid()
Expand All @@ -77,11 +82,21 @@ is_enabled() ->
register_channel(ClientId) when is_binary(ClientId) ->
register_channel({ClientId, self()});
register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
IsHistEnabled = is_hist_enabled(),
case is_enabled() of
true -> mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid));
false -> ok
true when IsHistEnabled ->
mria:async_dirty(?CM_SHARD, fun ?MODULE:register_channel2/1, [record(ClientId, ChanPid)]);
true ->
mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid));
false ->
ok
end.

%% @private
register_channel2(#channel{chid = ClientId} = Record) ->
_ = delete_hist_d(ClientId),
mria:dirty_write(?CHAN_REG_TAB, Record).

%% @doc Unregister a global channel.
-spec unregister_channel(
emqx_types:clientid()
Expand All @@ -90,19 +105,54 @@ register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid)
unregister_channel(ClientId) when is_binary(ClientId) ->
unregister_channel({ClientId, self()});
unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
IsHistEnabled = is_hist_enabled(),
case is_enabled() of
true -> mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid));
false -> ok
true when IsHistEnabled ->
mria:async_dirty(?CM_SHARD, fun ?MODULE:unregister_channel2/1, [
record(ClientId, ChanPid)
]);
true ->
mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid));
false ->
ok
end.

%% @private
unregister_channel2(#channel{chid = ClientId} = Record) ->
mria:dirty_delete_object(?CHAN_REG_TAB, Record),
ok = insert_hist_d(ClientId).

%% @doc Lookup the global channels.
-spec lookup_channels(emqx_types:clientid()) -> list(pid()).
lookup_channels(ClientId) ->
[ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?CHAN_REG_TAB, ClientId)].
lists:filtermap(
fun
(#channel{pid = ChanPid}) when is_pid(ChanPid) ->
case is_pid_down(ChanPid) of
true ->
false;
_ ->
{true, ChanPid}
end;
(_) ->
false
end,
mnesia:dirty_read(?CHAN_REG_TAB, ClientId)
).

%% Return 'true' or 'false' if it's a local pid.
%% Otherwise return 'unknown'.
is_pid_down(Pid) when node(Pid) =:= node() ->
not erlang:is_process_alive(Pid);
is_pid_down(_) ->
unknown.

record(ClientId, ChanPid) ->
#channel{chid = ClientId, pid = ChanPid}.

hist(ClientId) ->
#channel{chid = ClientId, pid = now_ts()}.

%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -158,15 +208,95 @@ code_change(_OldVsn, State, _Extra) ->

cleanup_channels(Node) ->
global:trans(
{?LOCK, self()},
{?NODE_DOWN_CLEANUP_LOCK, self()},
fun() ->
mria:transaction(?CM_SHARD, fun ?MODULE:do_cleanup_channels/1, [Node])
end
).

do_cleanup_channels(Node) ->
Pat = [{#channel{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}],
lists:foreach(fun delete_channel/1, mnesia:select(?CHAN_REG_TAB, Pat, write)).
Pat = [
{
#channel{pid = '$1', _ = '_'},
_Match = [{'andalso', {is_pid, '$1'}, {'==', {node, '$1'}, Node}}],
_Return = ['$_']
}
],
IsHistEnabled = is_hist_enabled(),
lists:foreach(
fun(Chan) -> delete_channel(IsHistEnabled, Chan) end,
mnesia:select(?CHAN_REG_TAB, Pat, write)
).

delete_channel(IsHistEnabled, Chan) ->
mnesia:delete_object(?CHAN_REG_TAB, Chan, write),
case IsHistEnabled of
true ->
insert_hist_t(Chan#channel.chid);
false ->
ok
end.

%%--------------------------------------------------------------------
%% History entry operations

%% Insert unregistration history in a transaction when unregistering the last channel for a clientid.
insert_hist_t(ClientId) ->
case delete_hist_t(ClientId) of
true ->
ok;
false ->
mnesia:write(?CHAN_REG_TAB, hist(ClientId), write)
end.

%% Dirty insert unregistration history.
%% Since dirty opts are used, async pool workers may race deletes and inserts,
%% so there could be more than one history records for a clientid,
%% but it should be eventually consistent after the client re-registers or the periodic cleanup.
insert_hist_d(ClientId) ->
%% delete old hist records first
case delete_hist_d(ClientId) of
true ->
ok;
false ->
mria:dirty_write(?CHAN_REG_TAB, hist(ClientId))
end.

%% Current timestamp in seconds.
now_ts() ->
erlang:system_time(seconds).

%% Delete all history records for a clientid, return true if there is a Pid found.
delete_hist_t(ClientId) ->
fold_hist(
fun(Hist) -> mnesia:delete_object(?CHAN_REG_TAB, Hist, write) end,
mnesia:read(?CHAN_REG_TAB, ClientId, write)
).

%% Delete all history records for a clientid, return true if there is a Pid found.
delete_hist_d(ClientId) ->
fold_hist(
fun(Hist) -> mria:dirty_delete_object(?CHAN_REG_TAB, Hist) end,
mnesia:dirty_read(?CHAN_REG_TAB, ClientId)
).

%% Fold over the history records, return true if there is a Pid found.
fold_hist(F, List) ->
lists:foldl(
fun(#channel{pid = Ts} = Record, HasPid) ->
case is_integer(Ts) of
true ->
ok = F(Record),
HasPid;
false ->
true
end
end,
false,
List
).

delete_channel(Chan) ->
mnesia:delete_object(?CHAN_REG_TAB, Chan, write).
%% Return the session registration history retain duration.
-spec retain_duration() -> non_neg_integer().
retain_duration() ->
emqx:get_config([broker, session_history_retain]).

0 comments on commit b1a05c7

Please sign in to comment.