diff --git a/include/rule_engine.hrl b/include/rule_engine.hrl index 73ec2895d..575bcce44 100644 --- a/include/rule_engine.hrl +++ b/include/rule_engine.hrl @@ -140,3 +140,12 @@ begin try (_EXP_) catch _:_ -> throw(_ERROR_) end end). + +%% Tables +-define(RULE_TAB, emqx_rule). +-define(ACTION_TAB, emqx_rule_action). +-define(ACTION_INST_PARAMS_TAB, emqx_action_instance_params). +-define(RES_TAB, emqx_resource). +-define(RES_PARAMS_TAB, emqx_resource_params). +-define(RULE_HOOKS, emqx_rule_hooks). +-define(RES_TYPE_TAB, emqx_resource_type). diff --git a/src/emqx_rule_engine_sup.erl b/src/emqx_rule_engine_sup.erl index c3a78cce3..3a51a5257 100644 --- a/src/emqx_rule_engine_sup.erl +++ b/src/emqx_rule_engine_sup.erl @@ -18,6 +18,8 @@ -behaviour(supervisor). +-include("rule_engine.hrl"). + -export([start_link/0]). -export([init/1]). @@ -26,6 +28,9 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> + Opts = [public, named_table, set, {read_concurrency, true}], + ets:new(?ACTION_INST_PARAMS_TAB, [{keypos, #action_instance_params.id}|Opts]), + ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]), Registry = #{id => emqx_rule_registry, start => {emqx_rule_registry, start_link, []}, restart => permanent, diff --git a/src/emqx_rule_registry.erl b/src/emqx_rule_registry.erl index ae3c35294..d1c094ddd 100644 --- a/src/emqx_rule_registry.erl +++ b/src/emqx_rule_registry.erl @@ -86,15 +86,6 @@ -define(REGISTRY, ?MODULE). -%% Tables --define(RULE_TAB, emqx_rule). --define(ACTION_TAB, emqx_rule_action). --define(ACTION_INST_PARAMS_TAB, emqx_action_instance_params). --define(RES_TAB, emqx_resource). --define(RES_PARAMS_TAB, emqx_resource_params). --define(RULE_HOOKS, emqx_rule_hooks). --define(RES_TYPE_TAB, emqx_resource_type). - %% Statistics -define(STATS, [ {?RULE_TAB, 'rules.count', 'rules.max'} @@ -418,9 +409,6 @@ delete_resource_type(Type) -> %%------------------------------------------------------------------------------ init([]) -> - Opts = [public, named_table, set, {read_concurrency, true}], - ets:new(?ACTION_INST_PARAMS_TAB, [{keypos, #action_instance_params.id}|Opts]), - ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]), %% Enable stats timer ok = emqx_stats:update_interval(rule_registery_stats, fun update_stats/0), {ok, #{}}. diff --git a/src/emqx_rule_runtime.erl b/src/emqx_rule_runtime.erl index b252edae0..1be315775 100644 --- a/src/emqx_rule_runtime.erl +++ b/src/emqx_rule_runtime.erl @@ -142,6 +142,12 @@ apply_rules([Rule = #rule{id = RuleID}|More], Input) -> _:{match_conditions_error, Error} -> ?LOG(debug, "WHERE clause exception for ~s failed: ~p", [RuleID, Error]); + _:{select_and_collect_error, Error} -> + ?LOG(debug, "FOREACH clause exception for ~s failed: ~p", + [RuleID, Error]); + _:{match_incase_error, Error} -> + ?LOG(debug, "INCASE clause exception for ~s failed: ~p", + [RuleID, Error]); _:Error:StkTrace -> ?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p", [RuleID, Error, StkTrace]) @@ -242,7 +248,7 @@ filter_collection(Input, InCase, DoEach, {CollKey, CollVal}) -> fun(Item) -> InputAndItem = maps:merge(columns(Input), #{CollKey => Item}), case ?RAISE(match_conditions(InCase, InputAndItem), - {match_conditions_error, _REASON_}) of + {match_incase_error, _REASON_}) of true when DoEach == [] -> true; true -> {true, ?RAISE(select_and_transform(DoEach, InputAndItem),