Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add monitors and alarm handler #2266

Merged
merged 9 commits into from Feb 28, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Expand Up @@ -37,7 +37,8 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \
emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \
emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message
emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_os_mon emqx_vm_mon \
emqx_alarm_handler

CT_NODE_NAME = emqxct@127.0.0.1
CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
Expand Down
57 changes: 57 additions & 0 deletions etc/emqx.conf
Expand Up @@ -2044,4 +2044,61 @@ sysmon.busy_port = false
## Value: true | false
sysmon.busy_dist_port = true

## The time interval for the periodic cpu check
##
## Value: Duration
## -h: hour, e.g. '2h' for 2 hours
## -m: minute, e.g. '5m' for 5 minutes
## -s: second, e.g. '30s' for 30 seconds
##
## Default: 60s
os_mon.cpu_check_interval = 60s

## The threshold, as a percentage of system CPU, for how much system CPU can be used before the corresponding alarm is set.
##
## Default: 80%
os_mon.cpu_high_watermark = 80%

## The threshold, as a percentage of system CPU, that system CPU usage must fall below before the corresponding alarm is cleared.
##
## Default: 60%
os_mon.cpu_low_watermark = 60%

## The time interval for the periodic memory check
##
## Value: Duration
## -h: hour, e.g. '2h' for 2 hours
## -m: minute, e.g. '5m' for 5 minutes
## -s: second, e.g. '30s' for 30 seconds
##
## Default: 60s
os_mon.mem_check_interval = 60s

## The threshold, as a percentage of system memory, for how much system memory can be allocated before the corresponding alarm is set.
##
## Default: 70%
os_mon.sysmem_high_watermark = 70%

## The threshold, as a percentage of system memory, for how much system memory can be allocated by a single Erlang process before the corresponding alarm is set.
##
## Default: 5%
os_mon.procmem_high_watermark = 5%

## The time interval for the periodic process limit check
##
## Value: Duration
##
## Default: 30s
vm_mon.check_interval = 30s

## The threshold, as a percentage of the process limit, for how many processes can simultaneously exist on the local node before the corresponding alarm is set.
##
## Default: 80%
vm_mon.process_high_watermark = 80%

## The threshold, as a percentage of the process limit, that the number of processes on the local node must fall below before the corresponding alarm is cleared.
##
## Default: 60%
vm_mon.process_low_watermark = 60%

{{ additional_configs }}
2 changes: 1 addition & 1 deletion include/emqx.hrl
Expand Up @@ -113,7 +113,7 @@
%% Alarm
%%--------------------------------------------------------------------

-record(alarm, {
-record(alarm, {
id :: binary(),
severity :: notice | warning | error | critical,
title :: iolist(),
Expand Down
67 changes: 67 additions & 0 deletions priv/emqx.schema
Expand Up @@ -1840,3 +1840,70 @@ end}.
{busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)},
{busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
end}.

%%--------------------------------------------------------------------
%% Operating System Monitor
%%--------------------------------------------------------------------

%% Interval between CPU checks. Declared as a duration in seconds; default 60.
{mapping, "os_mon.cpu_check_interval", "emqx.os_mon", [
{default, 60},
{datatype, {duration, s}}
]}.

%% CPU high watermark. Declared as a percent value (float); default "80%".
{mapping, "os_mon.cpu_high_watermark", "emqx.os_mon", [
{default, "80%"},
{datatype, {percent, float}}
]}.

%% CPU low watermark. Declared as a percent value (float); default "60%".
{mapping, "os_mon.cpu_low_watermark", "emqx.os_mon", [
{default, "60%"},
{datatype, {percent, float}}
]}.

%% Interval between memory checks. Declared as a duration in seconds; default 60.
{mapping, "os_mon.mem_check_interval", "emqx.os_mon", [
{default, 60},
{datatype, {duration, s}}
]}.

%% System memory high watermark. Declared as a percent value (float); default "70%".
{mapping, "os_mon.sysmem_high_watermark", "emqx.os_mon", [
{default, "70%"},
{datatype, {percent, float}}
]}.

%% Per-process memory high watermark. Declared as a percent value (float); default "5%".
{mapping, "os_mon.procmem_high_watermark", "emqx.os_mon", [
{default, "5%"},
{datatype, {percent, float}}
]}.

%% Gathers the six os_mon.* settings above into a single proplist, which
%% becomes the value of the `emqx.os_mon' application variable. Each key is
%% the setting name without the "os_mon." prefix.
{translation, "emqx.os_mon", fun(Conf) ->
[{cpu_check_interval, cuttlefish:conf_get("os_mon.cpu_check_interval", Conf)},
{cpu_high_watermark, cuttlefish:conf_get("os_mon.cpu_high_watermark", Conf)},
{cpu_low_watermark, cuttlefish:conf_get("os_mon.cpu_low_watermark", Conf)},
{mem_check_interval, cuttlefish:conf_get("os_mon.mem_check_interval", Conf)},
{sysmem_high_watermark, cuttlefish:conf_get("os_mon.sysmem_high_watermark", Conf)},
{procmem_high_watermark, cuttlefish:conf_get("os_mon.procmem_high_watermark", Conf)}]
end}.

%%--------------------------------------------------------------------
%% VM Monitor
%%--------------------------------------------------------------------
%% Interval between process-count checks. Declared as a duration in seconds; default 30.
{mapping, "vm_mon.check_interval", "emqx.vm_mon", [
{default, 30},
{datatype, {duration, s}}
]}.

%% Process-count high watermark. Declared as a percent value (float); default "80%".
{mapping, "vm_mon.process_high_watermark", "emqx.vm_mon", [
{default, "80%"},
{datatype, {percent, float}}
]}.

%% Process-count low watermark. Declared as a percent value (float); default "60%".
{mapping, "vm_mon.process_low_watermark", "emqx.vm_mon", [
{default, "60%"},
{datatype, {percent, float}}
]}.

%% Gathers the three vm_mon.* settings above into a single proplist, which
%% becomes the value of the `emqx.vm_mon' application variable. Each key is
%% the setting name without the "vm_mon." prefix.
{translation, "emqx.vm_mon", fun(Conf) ->
[{check_interval, cuttlefish:conf_get("vm_mon.check_interval", Conf)},
{process_high_watermark, cuttlefish:conf_get("vm_mon.process_high_watermark", Conf)},
{process_low_watermark, cuttlefish:conf_get("vm_mon.process_low_watermark", Conf)}]
end}.
2 changes: 1 addition & 1 deletion src/emqx.app.src
Expand Up @@ -4,7 +4,7 @@
{modules,[]},
{registered,[emqx_sup]},
{applications,[kernel,stdlib,jsx,gproc,gen_rpc,esockd,cowboy,
replayq]},
replayq, sasl, os_mon]},
{env,[]},
{mod,{emqx_app,[]}},
{maintainers,["Feng Lee <feng@emqx.io>"]},
Expand Down
151 changes: 151 additions & 0 deletions src/emqx_alarm_handler.erl
@@ -0,0 +1,151 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.

-module(emqx_alarm_handler).

-behaviour(gen_event).

-include("emqx.hrl").
-include("logger.hrl").

-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2]).

-export([load/0, get_alarms/0]).

-record(common_alarm, {id, desc}).
-record(alarm_history, {id, clear_at}).

-define(ALARMS_TAB, emqx_alarms).
-define(ALARM_HISTORY_TAB, emqx_alarm_history).

%%----------------------------------------------------------------------
%% API
%%----------------------------------------------------------------------

%% Swap the `alarm_handler' handler module registered in the `alarm_handler'
%% event manager for this module. The outgoing handler is terminated with
%% the argument `swap'; this module is initialised with `[]' as its argument.
load() ->
    Outgoing = {alarm_handler, swap},
    Incoming = {?MODULE, []},
    gen_event:swap_handler(alarm_handler, Outgoing, Incoming).

%% Synchronously ask this handler (installed in the `alarm_handler' event
%% manager) for the alarms it currently holds. The request is answered by
%% the `get_alarms' clause of handle_call/2.
get_alarms() ->
    Request = get_alarms,
    gen_event:call(alarm_handler, ?MODULE, Request).

%%----------------------------------------------------------------------
%% gen_event callbacks
%%----------------------------------------------------------------------

%% gen_event init callback. The handler keeps no state of its own (`[]');
%% alarms live in the tables created by create_tables/0.
%%
%% First clause: the handler was given `{alarm_handler, Alarms}' as
%% hand-over data (presumably the state returned by the stock alarm_handler
%% on swap -- confirm). Every `{Id, _Desc}' entry is written to the alarm
%% history table.
%% Second clause: any other start-up argument; only the tables are created.
init({_Args, {alarm_handler, ExistingAlarms}}) ->
    create_tables(),
    Archive = fun({AlarmId, _Desc}) -> set_alarm_history(AlarmId) end,
    lists:foreach(Archive, ExistingAlarms),
    {ok, []};
init(_Args) ->
    create_tables(),
    {ok, []}.

%% gen_event event callback.
%%
%% {set_alarm, {Id, #alarm{}}} with no timestamp: stamp it with the current
%%     os:timestamp() and re-dispatch.
%% {set_alarm, {Id, Desc}}: log the alarm, publish its JSON encoding on the
%%     "alert" system topic (an encoding failure is only logged), then store
%%     the alarm in the alarms table.
%% {clear_alarm, Id}: log, publish an empty payload on the "clear" system
%%     topic, then remove the alarm from the alarms table.
%% Anything else is ignored.
handle_event({set_alarm, {AlarmId, #alarm{timestamp = undefined} = Desc}}, State) ->
    Stamped = Desc#alarm{timestamp = os:timestamp()},
    handle_event({set_alarm, {AlarmId, Stamped}}, State);
handle_event({set_alarm, {AlarmId, AlarmDesc} = Alarm}, State) ->
    ?LOG(notice, "Alarm report: set ~p", [Alarm]),
    case encode_alarm(Alarm) of
        {ok, Json} ->
            AlertTopic = topic(alert, maybe_to_binary(AlarmId)),
            emqx_broker:safe_publish(alarm_msg(AlertTopic, Json));
        {error, Reason} ->
            ?LOG(error, "Failed to encode alarm: ~p", [Reason])
    end,
    set_alarm_(AlarmId, AlarmDesc),
    {ok, State};
handle_event({clear_alarm, AlarmId}, State) ->
    ?LOG(notice, "Alarm report: clear ~p", [AlarmId]),
    ClearTopic = topic(clear, maybe_to_binary(AlarmId)),
    emqx_broker:safe_publish(alarm_msg(ClearTopic, <<"">>)),
    clear_alarm_(AlarmId),
    {ok, State};
handle_event(_Other, State) ->
    {ok, State}.

%% gen_event info callback: every non-event message is ignored and the
%% state is returned unchanged.
handle_info(_Info, State) ->
    {ok, State}.

%% gen_event call callback.
%% `get_alarms' replies with the list of `{Id, Desc}' pairs read from the
%% alarms table; any other request replies `{error, bad_query}'.
handle_call(get_alarms, State) ->
    Alarms = get_alarms_(),
    {ok, Alarms, State};
handle_call(_Unknown, State) ->
    {ok, {error, bad_query}, State}.

%% gen_event terminate callback.
%% When terminated with `swap', the current alarms are returned tagged with
%% `emqx_alarm_handler' as hand-over data; any other reason returns `ok'.
terminate(swap, _State) ->
    Alarms = get_alarms_(),
    {emqx_alarm_handler, Alarms};
terminate(_Reason, _State) ->
    ok.

%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------

%% Create the two mnesia tables used by this handler:
%%   ?ALARMS_TAB        - holds `#common_alarm{}' records
%%   ?ALARM_HISTORY_TAB - holds `#alarm_history{}' records
%% Both are `set' tables with a disc copy on the local node and
%% `local_content' enabled. Crashes with badmatch if either creation
%% does not return `ok'.
create_tables() ->
    ok = ekka_mnesia:create_table(?ALARMS_TAB, [
                {type, set},
                {disc_copies, [node()]},
                {local_content, true},
                {record_name, common_alarm},
                {attributes, record_info(fields, common_alarm)}]),
    ok = ekka_mnesia:create_table(?ALARM_HISTORY_TAB, [
                {type, set},
                {disc_copies, [node()]},
                {local_content, true},
                {record_name, alarm_history},
                {attributes, record_info(fields, alarm_history)}]).
tigercl marked this conversation as resolved.
Show resolved Hide resolved

%% Encode an `{AlarmId, Desc}' pair via emqx_json:safe_encode/1 and return
%% whatever that call returns.
%%
%% When Desc is an `#alarm{}' record, the "desc" member carries its
%% severity, title, summary and timestamp (passed through
%% emqx_time:now_secs/1). Any other Desc is rendered with maybe_to_binary/1.
encode_alarm({AlarmId, #alarm{} = Alarm}) ->
    Id = maybe_to_binary(AlarmId),
    DescFields = [{severity, Alarm#alarm.severity},
                  {title, iolist_to_binary(Alarm#alarm.title)},
                  {summary, iolist_to_binary(Alarm#alarm.summary)},
                  {ts, emqx_time:now_secs(Alarm#alarm.timestamp)}],
    emqx_json:safe_encode([{id, Id}, {desc, DescFields}]);
encode_alarm({AlarmId, OtherDesc}) ->
    Id = maybe_to_binary(AlarmId),
    Desc = maybe_to_binary(OtherDesc),
    emqx_json:safe_encode([{id, Id}, {desc, Desc}]).

%% Build the message published for an alarm: created with this module as
%% the sender, flagged `sys', and given a JSON 'Content-Type' header.
alarm_msg(Topic, Payload) ->
    Headers = #{'Content-Type' => <<"application/json">>},
    Msg = emqx_message:make(?MODULE, Topic, Payload),
    SysMsg = emqx_message:set_flag(sys, Msg),
    emqx_message:set_headers(Headers, SysMsg).

%% Build the system topic for an alarm notification. AlarmId must be a
%% binary; the result is emqx_topic:systop/1 applied to
%% "alarms/<AlarmId>/alert" or "alarms/<AlarmId>/clear".
topic(alert, AlarmId) ->
    Path = <<"alarms/", AlarmId/binary, "/alert">>,
    emqx_topic:systop(Path);
topic(clear, AlarmId) ->
    Path = <<"alarms/", AlarmId/binary, "/clear">>,
    emqx_topic:systop(Path).

%% Return a binary for any term: binaries pass through unchanged, every
%% other term is pretty-printed with "~p" and the result converted to a
%% binary.
maybe_to_binary(Bin) when is_binary(Bin) ->
    Bin;
maybe_to_binary(Term) ->
    Printed = io_lib:format("~p", [Term]),
    iolist_to_binary(Printed).

%% Store (or overwrite) the alarm `Id' with description `Desc' in the
%% alarms table using a dirty write. Crashes with badmatch unless the
%% write returns `ok'.
set_alarm_(Id, Desc) ->
    Record = #common_alarm{id = Id, desc = Desc},
    ok = mnesia:dirty_write(?ALARMS_TAB, Record).

%% Remove the alarm `Id' from the alarms table with a dirty delete, then
%% record it in the alarm history table.
clear_alarm_(Id) ->
    ok = mnesia:dirty_delete({?ALARMS_TAB, Id}),
    set_alarm_history(Id).

%% Read every record from the alarms table and return them as a list of
%% `{Id, Desc}' pairs. Entries that are not `#common_alarm{}' records are
%% skipped.
%%
%% The result is in reverse order of ets:tab2list/1, matching the previous
%% implementation, but is built by prepending in a left fold so the work is
%% linear instead of appending to the accumulator on every step.
get_alarms_() ->
    Collect = fun(#common_alarm{id = Id, desc = Desc}, Acc) ->
                      [{Id, Desc} | Acc];
                 (_Other, Acc) ->
                      Acc
              end,
    lists:foldl(Collect, [], ets:tab2list(?ALARMS_TAB)).

tigercl marked this conversation as resolved.
Show resolved Hide resolved
%% Record alarm `Id' in the alarm history table with a dirty write.
%%
%% `clear_at' is stamped with the current os:timestamp() so the history
%% says when the alarm left the active set; previously it was always
%% written as `undefined', which made the field useless.
%% Crashes with badmatch unless the write returns `ok'.
set_alarm_history(Id) ->
    History = #alarm_history{id = Id, clear_at = os:timestamp()},
    ok = mnesia:dirty_write(?ALARM_HISTORY_TAB, History).