Skip to content

Commit

Permalink
Merge branch 'release-57' into sync-r57-m-20240508
Browse files Browse the repository at this point in the history
  • Loading branch information
thalesmg committed May 9, 2024
2 parents 6e5d04e + 55f2728 commit 401f0fa
Show file tree
Hide file tree
Showing 119 changed files with 2,779 additions and 674 deletions.
5 changes: 5 additions & 0 deletions apps/emqx/include/emqx_trace.hrl
Expand Up @@ -35,6 +35,11 @@
end_at :: integer() | undefined | '_'
}).

-record(emqx_trace_format_func_data, {
function :: fun((any()) -> any()),
data :: any()
}).

-define(SHARD, ?COMMON_SHARD).
-define(MAX_SIZE, 30).

Expand Down
12 changes: 11 additions & 1 deletion apps/emqx/src/emqx_broker_helper.erl
Expand Up @@ -110,14 +110,24 @@ reclaim_seq(Topic) ->

stats_fun() ->
safe_update_stats(subscriber_val(), 'subscribers.count', 'subscribers.max'),
safe_update_stats(table_size(?SUBSCRIPTION), 'subscriptions.count', 'subscriptions.max'),
safe_update_stats(subscription_count(), 'subscriptions.count', 'subscriptions.max'),
safe_update_stats(table_size(?SUBOPTION), 'suboptions.count', 'suboptions.max').

safe_update_stats(undefined, _Stat, _MaxStat) ->
ok;
safe_update_stats(Val, Stat, MaxStat) when is_integer(Val) ->
emqx_stats:setstat(Stat, MaxStat, Val).

subscription_count() ->
NonPSCount = table_size(?SUBSCRIPTION),
PSCount = emqx_persistent_session_bookkeeper:get_subscription_count(),
case is_integer(NonPSCount) of
true ->
NonPSCount + PSCount;
false ->
PSCount
end.

subscriber_val() ->
sum_subscriber(table_size(?SUBSCRIBER), table_size(?SHARED_SUBSCRIBER)).

Expand Down
48 changes: 37 additions & 11 deletions apps/emqx/src/emqx_channel.erl
Expand Up @@ -1075,7 +1075,7 @@ handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel = ?IS_MQTT_V5) -
Packet = ?DISCONNECT_PACKET(ReasonCode, Props),
{ok, [?REPLY_OUTGOING(Packet), ?REPLY_CLOSE(ReasonName)], Channel};
handle_out(disconnect, {_ReasonCode, ReasonName, _Props}, Channel) ->
{ok, {close, ReasonName}, Channel};
{ok, ?REPLY_CLOSE(ReasonName), Channel};
handle_out(auth, {ReasonCode, Properties}, Channel) ->
{ok, ?AUTH_PACKET(ReasonCode, Properties), Channel};
handle_out(Type, Data, Channel) ->
Expand Down Expand Up @@ -1406,6 +1406,16 @@ handle_timeout(
{_, Quota2} ->
{ok, clean_timer(TimerName, Channel#channel{quota = Quota2})}
end;
handle_timeout(
_TRef,
connection_expire,
#channel{conn_state = ConnState} = Channel0
) ->
Channel1 = clean_timer(connection_expire, Channel0),
case ConnState of
disconnected -> {ok, Channel1};
_ -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel1)
end;
handle_timeout(TRef, Msg, Channel) ->
case emqx_hooks:run_fold('client.timeout', [TRef, Msg], []) of
[] ->
Expand Down Expand Up @@ -1810,18 +1820,23 @@ log_auth_failure(Reason) ->
%% Merge authentication result into ClientInfo
%% Authentication result may include:
%% 1. `is_superuser': The superuser flag from various backends
%% 2. `acl': ACL rules from JWT, HTTP auth backend
%% 3. `client_attrs': Extra client attributes from JWT, HTTP auth backend
%% 4. Maybe more non-standard fields used by hook callbacks
%% 2. `expire_at`: Authentication validity deadline, the client will be disconnected after this time
%% 3. `acl': ACL rules from JWT, HTTP auth backend
%% 4. `client_attrs': Extra client attributes from JWT, HTTP auth backend
%% 5. Maybe more non-standard fields used by hook callbacks
merge_auth_result(ClientInfo, AuthResult0) when is_map(ClientInfo) andalso is_map(AuthResult0) ->
IsSuperuser = maps:get(is_superuser, AuthResult0, false),
AuthResult = maps:without([client_attrs], AuthResult0),
ExpireAt = maps:get(expire_at, AuthResult0, undefined),
AuthResult = maps:without([client_attrs, expire_at], AuthResult0),
Attrs0 = maps:get(client_attrs, ClientInfo, #{}),
Attrs1 = maps:get(client_attrs, AuthResult0, #{}),
Attrs = maps:merge(Attrs0, Attrs1),
NewClientInfo = maps:merge(
ClientInfo#{client_attrs => Attrs},
AuthResult#{is_superuser => IsSuperuser}
AuthResult#{
is_superuser => IsSuperuser,
auth_expire_at => ExpireAt
}
),
fix_mountpoint(NewClientInfo).

Expand Down Expand Up @@ -2228,10 +2243,16 @@ ensure_connected(
) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
Channel#channel{
schedule_connection_expire(Channel#channel{
conninfo = trim_conninfo(NConnInfo),
conn_state = connected
}.
}).

schedule_connection_expire(Channel = #channel{clientinfo = #{auth_expire_at := undefined}}) ->
Channel;
schedule_connection_expire(Channel = #channel{clientinfo = #{auth_expire_at := ExpireAt}}) ->
Interval = max(0, ExpireAt - erlang:system_time(millisecond)),
ensure_timer(connection_expire, Interval, Channel).

trim_conninfo(ConnInfo) ->
maps:without(
Expand Down Expand Up @@ -2615,10 +2636,15 @@ disconnect_and_shutdown(
->
NChannel = ensure_disconnected(Reason, Channel),
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel);
%% mqtt v3/v4 sessions, mqtt v5 other conn_state sessions
disconnect_and_shutdown(Reason, Reply, Channel) ->
%% mqtt v3/v4 connected sessions
disconnect_and_shutdown(Reason, Reply, Channel = #channel{conn_state = ConnState}) when
ConnState =:= connected orelse ConnState =:= reauthenticating
->
NChannel = ensure_disconnected(Reason, Channel),
shutdown(Reason, Reply, NChannel).
shutdown(Reason, Reply, NChannel);
%% other conn_state sessions
disconnect_and_shutdown(Reason, Reply, Channel) ->
shutdown(Reason, Reply, Channel).

-compile({inline, [sp/1, flag/1]}).
sp(true) -> 1;
Expand Down
4 changes: 3 additions & 1 deletion apps/emqx/src/emqx_cm_sup.erl
Expand Up @@ -53,6 +53,7 @@ init([]) ->
RegistryKeeper = child_spec(emqx_cm_registry_keeper, 5000, worker),
Manager = child_spec(emqx_cm, 5000, worker),
DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor),
DSSessionBookkeeper = child_spec(emqx_persistent_session_bookkeeper, 5_000, worker),
Children =
[
Banned,
Expand All @@ -62,7 +63,8 @@ init([]) ->
Registry,
RegistryKeeper,
Manager,
DSSessionGCSup
DSSessionGCSup,
DSSessionBookkeeper
],
{ok, {SupFlags, Children}}.

Expand Down
4 changes: 2 additions & 2 deletions apps/emqx/src/emqx_listeners.erl
Expand Up @@ -1036,8 +1036,8 @@ to_quicer_listener_opts(Opts) ->
SSLOpts = maps:from_list(ssl_opts(Opts)),
Opts1 = maps:filter(
fun
(cacertfile, undefined) -> fasle;
(password, undefined) -> fasle;
(cacertfile, undefined) -> false;
(password, undefined) -> false;
(_, _) -> true
end,
Opts
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx/src/emqx_logger_jsonfmt.erl
Expand Up @@ -229,7 +229,7 @@ best_effort_json_obj(Map, Config) ->
do_format_msg("~p", [Map], Config)
end.

json(A, _) when is_atom(A) -> atom_to_binary(A, utf8);
json(A, _) when is_atom(A) -> A;
json(I, _) when is_integer(I) -> I;
json(F, _) when is_float(F) -> F;
json(P, C) when is_pid(P) -> json(pid_to_list(P), C);
Expand Down
107 changes: 107 additions & 0 deletions apps/emqx/src/emqx_persistent_session_bookkeeper.erl
@@ -0,0 +1,107 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_persistent_session_bookkeeper).

-behaviour(gen_server).

%% API
-export([
start_link/0,
get_subscription_count/0
]).

%% `gen_server' API
-export([
init/1,
handle_continue/2,
handle_call/3,
handle_cast/2,
handle_info/2
]).

%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------

%% call/cast/info events
-record(tally_subs, {}).
-record(get_subscription_count, {}).

%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------

-spec start_link() -> gen_server:start_ret().
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, _InitOpts = #{}, _Opts = []).

%% @doc Gets a cached view of the cluster-global count of persistent subscriptions.
-spec get_subscription_count() -> non_neg_integer().
get_subscription_count() ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
gen_server:call(?MODULE, #get_subscription_count{}, infinity);
false ->
0
end.

%%------------------------------------------------------------------------------
%% `gen_server' API
%%------------------------------------------------------------------------------

init(_Opts) ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
State = #{subs_count => 0},
{ok, State, {continue, #tally_subs{}}};
false ->
ignore
end.

handle_continue(#tally_subs{}, State0) ->
State = tally_persistent_subscriptions(State0),
ensure_subs_tally_timer(),
{noreply, State}.

handle_call(#get_subscription_count{}, _From, State) ->
#{subs_count := N} = State,
{reply, N, State};
handle_call(_Call, _From, State) ->
{reply, {error, bad_call}, State}.

handle_cast(_Cast, State) ->
{noreply, State}.

handle_info(#tally_subs{}, State0) ->
State = tally_persistent_subscriptions(State0),
ensure_subs_tally_timer(),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.

%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------

tally_persistent_subscriptions(State0) ->
N = emqx_persistent_session_ds_state:total_subscription_count(),
State0#{subs_count := N}.

ensure_subs_tally_timer() ->
Timeout = emqx_config:get([session_persistence, subscription_count_refresh_interval]),
_ = erlang:send_after(Timeout, self(), #tally_subs{}),
ok.
28 changes: 21 additions & 7 deletions apps/emqx/src/emqx_persistent_session_ds.erl
Expand Up @@ -658,16 +658,17 @@ replay_batch(Srs0, Session0, ClientInfo) ->
%%--------------------------------------------------------------------

-spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
disconnect(Session = #{s := S0}, ConnInfo) ->
S1 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S0),
S2 =
disconnect(Session = #{id := Id, s := S0}, ConnInfo) ->
S1 = maybe_set_offline_info(S0, Id),
S2 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S1),
S3 =
case ConnInfo of
#{expiry_interval := EI} when is_number(EI) ->
emqx_persistent_session_ds_state:set_expiry_interval(EI, S1);
emqx_persistent_session_ds_state:set_expiry_interval(EI, S2);
_ ->
S1
S2
end,
S = emqx_persistent_session_ds_state:commit(S2),
S = emqx_persistent_session_ds_state:commit(S3),
{shutdown, Session#{s => S}}.

-spec terminate(Reason :: term(), session()) -> ok.
Expand Down Expand Up @@ -702,7 +703,7 @@ list_client_subscriptions(ClientId) ->
maps:fold(
fun(Topic, #{current_state := CS}, Acc) ->
#{subopts := SubOpts} = maps:get(CS, SStates),
Elem = {Topic, SubOpts},
Elem = {Topic, SubOpts#{durable => true}},
[Elem | Acc]
end,
[],
Expand Down Expand Up @@ -1175,6 +1176,19 @@ try_get_live_session(ClientId) ->
not_found
end.

-spec maybe_set_offline_info(emqx_persistent_session_ds_state:t(), emqx_types:clientid()) ->
emqx_persistent_session_ds_state:t().
maybe_set_offline_info(S, Id) ->
case emqx_cm:lookup_client({clientid, Id}) of
[{_Key, ChannelInfo, Stats}] ->
emqx_persistent_session_ds_state:set_offline_info(
#{chan_info => ChannelInfo, stats => Stats},
S
);
_ ->
S
end.

%%--------------------------------------------------------------------
%% SeqNo tracking
%% --------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions apps/emqx/src/emqx_persistent_session_ds.hrl
Expand Up @@ -81,5 +81,6 @@
-define(will_message, will_message).
-define(clientinfo, clientinfo).
-define(protocol, protocol).
-define(offline_info, offline_info).

-endif.
12 changes: 12 additions & 0 deletions apps/emqx/src/emqx_persistent_session_ds_state.erl
Expand Up @@ -35,6 +35,7 @@
-export([get_expiry_interval/1, set_expiry_interval/2]).
-export([get_clientinfo/1, set_clientinfo/2]).
-export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]).
-export([set_offline_info/2]).
-export([get_peername/1, set_peername/2]).
-export([get_protocol/1, set_protocol/2]).
-export([new_id/1]).
Expand All @@ -53,6 +54,7 @@
cold_get_subscription/2,
fold_subscriptions/3,
n_subscriptions/1,
total_subscription_count/0,
put_subscription/3,
del_subscription/2
]).
Expand Down Expand Up @@ -372,6 +374,10 @@ clear_will_message_now(SessionId) when is_binary(SessionId) ->
clear_will_message(Rec) ->
set_will_message(undefined, Rec).

-spec set_offline_info(_Info :: map(), t()) -> t().
set_offline_info(Info, Rec) ->
set_meta(?offline_info, Info, Rec).

-spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}.
new_id(Rec) ->
LastId =
Expand Down Expand Up @@ -401,6 +407,12 @@ fold_subscriptions(Fun, Acc, Rec) ->
n_subscriptions(Rec) ->
gen_size(?subscriptions, Rec).

-spec total_subscription_count() -> non_neg_integer().
total_subscription_count() ->
mria:async_dirty(?DS_MRIA_SHARD, fun() ->
mnesia:foldl(fun(#kv{}, Acc) -> Acc + 1 end, 0, ?subscription_tab)
end).

-spec put_subscription(
emqx_persistent_session_ds:topic_filter(),
emqx_persistent_session_ds_subs:subscription(),
Expand Down
12 changes: 11 additions & 1 deletion apps/emqx/src/emqx_router_helper.erl
Expand Up @@ -189,7 +189,17 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------

stats_fun() ->
emqx_stats:setstat('topics.count', 'topics.max', emqx_router:stats(n_routes)).
PSRouteCount = persistent_route_count(),
NonPSRouteCount = emqx_router:stats(n_routes),
emqx_stats:setstat('topics.count', 'topics.max', PSRouteCount + NonPSRouteCount).

persistent_route_count() ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
emqx_persistent_session_ds_router:stats(n_routes);
false ->
0
end.

cleanup_routes(Node) ->
emqx_router:cleanup_routes(Node).

0 comments on commit 401f0fa

Please sign in to comment.