Skip to content

Commit

Permalink
Merge branch 'shutdown-policy' into emqx30
Browse files Browse the repository at this point in the history
  • Loading branch information
emqplus committed Sep 21, 2018
2 parents 721f237 + 71c556b commit a4cbe4d
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 15 deletions.
24 changes: 22 additions & 2 deletions priv/emqx.schema
Expand Up @@ -828,10 +828,21 @@ end}.
%% messages | bytes passed through.
%% Numbers delimited by `|'. Zero or negative is to disable.
{mapping, "zone.$name.force_gc_policy", "emqx.zones", [
{default, "0|0"},
{default, "0 | 0MB"},
{datatype, string}
]}.

%% @doc Maximum message queue length and total heap size allowed before a
%% connection/session process is forcibly shut down.
%% The message queue here is the Erlang process mailbox, not the number
%% of queued MQTT messages of QoS 1 and 2.
%% Total heap size is measured in Erlang 'words', not in 'bytes'.
%% NOTE(review): the default below carries an "MB" suffix and the translation
%% parses the second field with cuttlefish_bytesize:parse/1 - confirm whether
%% the resulting number is really meant to be compared against words.
%% Zero or negative disables the corresponding limit.
{mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [
{default, "0 | 0MB"},
{datatype, string}
]}.

{translation, "emqx.zones", fun(Conf) ->
Mapping = fun("retain_available", Val) ->
{mqtt_retain_available, Val};
Expand All @@ -843,6 +854,16 @@ end}.
[Count, Bytes] = string:tokens(Val, "| "),
{force_gc_policy, #{count => list_to_integer(Count),
bytes => list_to_integer(Bytes)}};
("force_shutdown_policy", Val) ->
[Len, Siz] = string:tokens(Val, "| "),
ShutdownPolicy = case cuttlefish_bytesize:parse(Siz) of
{error, Reason} ->
error(Reason);
Siz1 ->
#{message_queue_len => list_to_integer(Len),
total_heap_size => Siz1}
end,
{force_shutdown_policy, ShutdownPolicy};
(Opt, Val) ->
{list_to_atom(Opt), Val}
end,
Expand Down Expand Up @@ -1763,4 +1784,3 @@ end}.
{busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
end}.


16 changes: 13 additions & 3 deletions src/emqx_connection.erl
Expand Up @@ -152,6 +152,7 @@ init([Transport, RawSocket, Options]) ->
}),
GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
ok = emqx_gc:init(GcPolicy),
erlang:put(force_shutdown_policy, emqx_zone:get_env(Zone, force_shutdown_policy)),
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
State, self(), IdleTimout);
{error, Reason} ->
Expand Down Expand Up @@ -214,9 +215,18 @@ handle_info({timeout, Timer, emit_stats},
proto_state = ProtoState
}) ->
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
ok = emqx_gc:reset(),
{noreply, State#state{stats_timer = undefined}, hibernate};

NewState = State#state{stats_timer = undefined},
Limits = erlang:get(force_shutdown_policy),
case emqx_misc:conn_proc_mng_policy(Limits) of
continue ->
{noreply, NewState};
hibernate ->
ok = emqx_gc:reset(),
{noreply, NewState, hibernate};
{shutdown, Reason} ->
?LOG(warning, "shutdown due to ~p", [Reason], NewState),
shutdown(Reason, NewState)
end;
handle_info(timeout, State) ->
shutdown(idle_timeout, State);

Expand Down
58 changes: 52 additions & 6 deletions src/emqx_misc.erl
Expand Up @@ -15,7 +15,7 @@
-module(emqx_misc).

-export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1,
proc_name/2, proc_stats/0, proc_stats/1]).
proc_name/2, proc_stats/0, proc_stats/1, conn_proc_mng_policy/1]).

%% @doc Merge options
-spec(merge_opts(list(), list()) -> list()).
Expand All @@ -36,14 +36,13 @@ start_timer(Interval, Dest, Msg) ->
erlang:start_timer(Interval, Dest, Msg).

-spec(cancel_timer(undefined | reference()) -> ok).
cancel_timer(undefined) ->
ok;
cancel_timer(Timer) ->
case catch erlang:cancel_timer(Timer) of
cancel_timer(Timer) when is_reference(Timer) ->
case erlang:cancel_timer(Timer) of
false ->
receive {timeout, Timer, _} -> ok after 0 -> ok end;
_ -> ok
end.
end;
cancel_timer(_) -> ok.

-spec(proc_name(atom(), pos_integer()) -> atom()).
proc_name(Mod, Id) ->
Expand All @@ -59,3 +58,50 @@ proc_stats(Pid) ->
{value, {_, V}, Stats1} = lists:keytake(message_queue_len, 1, Stats),
[{mailbox_len, V} | Stats1].

-define(DISABLED, 0).

%% @doc Inspect the calling process (self()) against the connection/session
%% process management policy and return the action the caller should take:
%% `continue': no limit is exceeded and the mailbox still holds messages,
%%             so keep processing.
%% `hibernate': no limit is exceeded and the mailbox is empty. This check is
%%              triggered by a timer, so the process is assumed likely to
%%              stay idle and may as well hibernate.
%% `{shutdown, Reason}': the message queue length or the total heap size is
%%              above its limit, so the process should stop for the sake of
%%              system stability.
%% A limit that is zero or negative is disabled. Any argument that is not a
%% map carrying both keys is treated as "both limits disabled".
-spec(conn_proc_mng_policy(#{message_queue_len := integer(),
                             total_heap_size := integer()
                            } | undefined) -> continue | hibernate | {shutdown, _}).
conn_proc_mng_policy(#{message_queue_len := QueueLimit,
                       total_heap_size := HeapLimit}) ->
    %% Sample the mailbox once so every predicate sees the same length.
    MailboxLen = proc_info(message_queue_len),
    QueueTooLong = fun() -> is_message_queue_too_long(MailboxLen, QueueLimit) end,
    HeapTooLarge = fun() -> is_heap_size_too_large(HeapLimit) end,
    MailboxBusy = fun() -> MailboxLen > 0 end,
    Otherwise = fun() -> true end,
    %% Order matters: shutdown conditions win over continue/hibernate.
    check([{QueueTooLong, {shutdown, message_queue_too_long}},
           {HeapTooLarge, {shutdown, total_heap_size_too_large}},
           {MailboxBusy, continue},
           {Otherwise, hibernate}]);
conn_proc_mng_policy(_NoPolicy) ->
    %% No usable policy configured: run the same checks with both limits off.
    conn_proc_mng_policy(#{message_queue_len => 0, total_heap_size => 0}).

%% Evaluate the {Predicate, Outcome} pairs in order and return the Outcome
%% paired with the first predicate that yields `true'.
check([{Predicate, Outcome} | Remaining]) ->
    case Predicate() of
        false -> check(Remaining);
        true  -> Outcome
    end.

%% True only when the queue limit is enabled and the observed length exceeds it.
is_message_queue_too_long(Qlength, Max) ->
    case is_enabled(Max) of
        true  -> Qlength > Max;
        false -> false
    end.

%% True only when the heap limit is enabled and the calling process's
%% total_heap_size exceeds it. The heap size is read only if the limit is on.
is_heap_size_too_large(Max) ->
    case is_enabled(Max) of
        true  -> proc_info(total_heap_size) > Max;
        false -> false
    end.

%% A limit counts as enabled only when it is an integer above ?DISABLED.
is_enabled(Max) when is_integer(Max), Max > ?DISABLED -> true;
is_enabled(_) -> false.

%% Fetch a single process_info item for the calling process and return just
%% its value; the match asserts that the reply is tagged with the same item.
proc_info(Item) ->
    Self = self(),
    {Item, Info} = erlang:process_info(Self, Item),
    Info.

16 changes: 13 additions & 3 deletions src/emqx_session.erl
Expand Up @@ -357,6 +357,7 @@ init([Parent, #{zone := Zone,
emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
ok = emqx_gc:init(GcPolicy),
erlang:put(force_shutdown_policy, emqx_zone:get_env(Zone, force_shutdown_policy)),
ok = proc_lib:init_ack(Parent, {ok, self()}),
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).

Expand Down Expand Up @@ -574,9 +575,18 @@ handle_info({timeout, Timer, emit_stats},
State = #state{client_id = ClientId,
stats_timer = Timer}) ->
_ = emqx_sm:set_session_stats(ClientId, stats(State)),
ok = emqx_gc:reset(), %% going to hibernate, reset gc stats
{noreply, State#state{stats_timer = undefined}, hibernate};

NewState = State#state{stats_timer = undefined},
Limits = erlang:get(force_shutdown_policy),
case emqx_misc:conn_proc_mng_policy(Limits) of
continue ->
{noreply, NewState};
hibernate ->
ok = emqx_gc:reset(), %% going to hibernate, reset gc stats
{noreply, NewState, hibernate};
{shutdown, Reason} ->
?LOG(warning, "shutdown due to ~p", [Reason], NewState),
shutdown(Reason, NewState)
end;
handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
?LOG(info, "expired, shutdown now:(", [], State),
shutdown(expired, State);
Expand Down
46 changes: 46 additions & 0 deletions test/emqx_misc_tests.erl
@@ -0,0 +1,46 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.

-module(emqx_misc_tests).
-include_lib("eunit/include/eunit.hrl").

%% Cancelling a timer must leave no matching timeout message in the mailbox.
timer_cancel_flush_test() ->
    TRef = emqx_misc:start_timer(0, foo),
    ok = emqx_misc:cancel_timer(TRef),
    receive
        {timeout, TRef, foo} ->
            error(unexpected)
    after 0 ->
        ok
    end.

%% With both limits at zero the policy never asks for a shutdown.
shutdown_disabled_test() ->
    %% One pending message: the process should keep going.
    self() ! foo,
    WhileBusy = conn_proc_mng_policy(0, 0),
    ?assertEqual(continue, WhileBusy),
    %% Mailbox drained: the process should hibernate.
    receive foo -> ok end,
    WhileIdle = conn_proc_mng_policy(0, 0),
    ?assertEqual(hibernate, WhileIdle).

%% A queue limit of one is exceeded by two pending messages, but not by one.
message_queue_too_long_test() ->
    self() ! foo,
    self() ! bar,
    OverLimit = conn_proc_mng_policy(1, 0),
    ?assertEqual({shutdown, message_queue_too_long}, OverLimit),
    %% Consume one message so the queue length equals the limit.
    receive foo -> ok end,
    AtLimit = conn_proc_mng_policy(1, 0),
    ?assertEqual(continue, AtLimit),
    %% Drain the remaining message.
    receive bar -> ok end.

%% A heap limit of one is expected to be exceeded by the test process.
total_heap_size_too_large_test() ->
    Verdict = conn_proc_mng_policy(0, 1),
    ?assertEqual({shutdown, total_heap_size_too_large}, Verdict).

%% Build a policy map from the two limits (L = message queue length limit,
%% S = total heap size limit) and check the calling process against it.
conn_proc_mng_policy(L, S) ->
    Policy = #{message_queue_len => L, total_heap_size => S},
    emqx_misc:conn_proc_mng_policy(Policy).
2 changes: 1 addition & 1 deletion test/emqx_session_SUITE.erl
Expand Up @@ -25,7 +25,7 @@ all() -> [t_session_all].
%% Suite-level setup: runs the broker helper setup steps and returns Config
%% unchanged.
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
Config.

%% Suite-level teardown: runs the broker helper teardown steps; the suite
%% config is ignored.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().

Expand Down

0 comments on commit a4cbe4d

Please sign in to comment.