Skip to content

Commit

Permalink
fix: make rule cache update more reliable
Browse files Browse the repository at this point in the history
  • Loading branch information
terry-xiaoyu committed Jun 16, 2023
1 parent a2dae34 commit c1e4791
Showing 1 changed file with 34 additions and 3 deletions.
37 changes: 34 additions & 3 deletions apps/emqx_rule_engine/src/emqx_rule_registry.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@

-export([ update_rules_cache/0
, clear_rules_cache/0
, get_rules_from_cache/0
, update_rules_cache_locally/0
]).

%% for debug purposes
Expand Down Expand Up @@ -465,30 +467,45 @@ delete_resource_type(Type) ->
init([]) ->
_TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true},
{read_concurrency, true}]),
ok = ensure_table_subscribed(),
{ok, #{}}.

handle_call({add_rules, Rules}, _From, State) ->
trans(fun lists:foreach/2, [fun insert_rule/1, Rules]),
_ = ?CLUSTER_CALL(update_rules_cache, []),
%% the multicall is necessary, because the other nodes maybe running an older emqx version
%% so the table has not been subscribed
update_rules_cache_on_all_nodes(),
{reply, ok, State};

handle_call({remove_rules, Rules}, _From, State) ->
trans(fun lists:foreach/2, [fun delete_rule/1, Rules]),
_ = ?CLUSTER_CALL(update_rules_cache, []),
update_rules_cache_on_all_nodes(),
{reply, ok, State};

handle_call(Req, _From, State) ->
?LOG(error, "unexpected call - ~p", [Req]),
{reply, ignored, State}.

handle_cast(update_rules_cache, State) ->
_ = update_rules_cache(),
ok = ensure_table_subscribed(),
ok = update_rules_cache(),
{noreply, State};

handle_cast(Msg, State) ->
?LOG(error, "unexpected cast ~p", [Msg]),
{noreply, State}.

handle_info({mnesia_table_event, {write, _Tab, _NewRule, _OldRules, _Tid} = Event}, State) ->
?LOG(debug, "mnesia_table_event: ~p~n", [Event]),
ok = update_rules_cache_locally(),
{noreply, State};

handle_info({mnesia_table_event, {Delete, _Tab, _What, _OldRules, _Tid} = Event}, State)
when Delete =:= delete; Delete =:= delete_object ->
?LOG(debug, "mnesia_table_event: ~p~n", [Event]),
ok = update_rules_cache_locally(),
{noreply, State};

handle_info(Info, State) ->
?LOG(error, "unexpected info ~p", [Info]),
{noreply, State}.
Expand All @@ -502,6 +519,20 @@ code_change(_OldVsn, State, _Extra) ->
%%------------------------------------------------------------------------------
%% Private functions
%%------------------------------------------------------------------------------
update_rules_cache_on_all_nodes() ->
ok = update_rules_cache(),
case ekka_mnesia:running_nodes() -- [node()] of
[] -> ok;
OtherNodes ->
_ = rpc:multicall(OtherNodes, ?MODULE, update_rules_cache_locally, [], 5000),
ok
end.

ensure_table_subscribed() ->
case mnesia:subscribe({table, ?RULE_TAB, detailed}) of
{error, {already_exists, _}} -> ok;
{ok, _} -> ok
end.

get_all_records(Tab) ->
%mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).
Expand Down

0 comments on commit c1e4791

Please sign in to comment.