Skip to content

Commit

Permalink
Fix race-conditions when re-build rules
Browse files Browse the repository at this point in the history
  • Loading branch information
terry-xiaoyu committed Nov 14, 2019
1 parent 9c45213 commit af89677
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 13 deletions.
9 changes: 9 additions & 0 deletions include/rule_engine.hrl
Expand Up @@ -140,3 +140,12 @@
begin
try (_EXP_) catch _:_ -> throw(_ERROR_) end
end).

%% Tables
-define(RULE_TAB, emqx_rule).
-define(ACTION_TAB, emqx_rule_action).
-define(ACTION_INST_PARAMS_TAB, emqx_action_instance_params).
-define(RES_TAB, emqx_resource).
-define(RES_PARAMS_TAB, emqx_resource_params).
-define(RULE_HOOKS, emqx_rule_hooks).
-define(RES_TYPE_TAB, emqx_resource_type).
5 changes: 5 additions & 0 deletions src/emqx_rule_engine_sup.erl
Expand Up @@ -18,6 +18,8 @@

-behaviour(supervisor).

-include("rule_engine.hrl").

-export([start_link/0]).

-export([init/1]).
Expand All @@ -26,6 +28,9 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
Opts = [public, named_table, set, {read_concurrency, true}],
ets:new(?ACTION_INST_PARAMS_TAB, [{keypos, #action_instance_params.id}|Opts]),
ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]),
Registry = #{id => emqx_rule_registry,
start => {emqx_rule_registry, start_link, []},
restart => permanent,
Expand Down
12 changes: 0 additions & 12 deletions src/emqx_rule_registry.erl
Expand Up @@ -86,15 +86,6 @@

-define(REGISTRY, ?MODULE).

%% Tables
-define(RULE_TAB, emqx_rule).
-define(ACTION_TAB, emqx_rule_action).
-define(ACTION_INST_PARAMS_TAB, emqx_action_instance_params).
-define(RES_TAB, emqx_resource).
-define(RES_PARAMS_TAB, emqx_resource_params).
-define(RULE_HOOKS, emqx_rule_hooks).
-define(RES_TYPE_TAB, emqx_resource_type).

%% Statistics
-define(STATS,
[ {?RULE_TAB, 'rules.count', 'rules.max'}
Expand Down Expand Up @@ -418,9 +409,6 @@ delete_resource_type(Type) ->
%%------------------------------------------------------------------------------

init([]) ->
Opts = [public, named_table, set, {read_concurrency, true}],
ets:new(?ACTION_INST_PARAMS_TAB, [{keypos, #action_instance_params.id}|Opts]),
ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]),
%% Enable stats timer
ok = emqx_stats:update_interval(rule_registery_stats, fun update_stats/0),
{ok, #{}}.
Expand Down
8 changes: 7 additions & 1 deletion src/emqx_rule_runtime.erl
Expand Up @@ -142,6 +142,12 @@ apply_rules([Rule = #rule{id = RuleID}|More], Input) ->
_:{match_conditions_error, Error} ->
?LOG(debug, "WHERE clause exception for ~s failed: ~p",
[RuleID, Error]);
_:{select_and_collect_error, Error} ->
?LOG(debug, "FOREACH clause exception for ~s failed: ~p",
[RuleID, Error]);
_:{match_incase_error, Error} ->
?LOG(debug, "INCASE clause exception for ~s failed: ~p",
[RuleID, Error]);
_:Error:StkTrace ->
?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p",
[RuleID, Error, StkTrace])
Expand Down Expand Up @@ -242,7 +248,7 @@ filter_collection(Input, InCase, DoEach, {CollKey, CollVal}) ->
fun(Item) ->
InputAndItem = maps:merge(columns(Input), #{CollKey => Item}),
case ?RAISE(match_conditions(InCase, InputAndItem),
{match_conditions_error, _REASON_}) of
{match_incase_error, _REASON_}) of
true when DoEach == [] -> true;
true ->
{true, ?RAISE(select_and_transform(DoEach, InputAndItem),
Expand Down

0 comments on commit af89677

Please sign in to comment.