Skip to content

Commit

Permalink
Create/Destroy action and resources using rpc-call
Browse files Browse the repository at this point in the history
  • Loading branch information
terry-xiaoyu committed May 31, 2019
1 parent 85ef42c commit 4963b0e
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 22 deletions.
55 changes: 39 additions & 16 deletions src/emqx_rule_engine.erl
Expand Up @@ -15,6 +15,7 @@
-module(emqx_rule_engine).

-include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl").

-export([ load_providers/0
, unload_providers/0
Expand All @@ -29,6 +30,12 @@
, delete_resource/1
]).

-export([ init_resource/4
, init_action/4
, clear_resource/3
, clear_action/3
]).

-type(rule() :: #rule{}).
-type(action() :: #action{}).
-type(resource() :: #resource{}).
Expand Down Expand Up @@ -92,7 +99,7 @@ re_establish_resources() ->
lists:foreach(
fun(#resource{id = ResId, config = Config, type = Type}) ->
{ok, #resource_type{on_create = {M, F}}} = emqx_rule_registry:find_resource_type(Type),
init_resource(M, F, ResId, Config)
cluster_call(init_resource, [M, F, ResId, Config])
end, emqx_rule_registry:get_resources())
catch
_:Error:StackTrace ->
Expand All @@ -108,7 +115,7 @@ rebuild_rules() ->
[lists:foreach(
fun(#action_instance{id = Id, name = ActName, args = Args}) ->
{ok, #action{module = Mod, on_create = Create}} = emqx_rule_registry:find_action(ActName),
init_action(Mod, Create, Id, with_resource_params(Args))
cluster_call(init_action, [Mod, Create, Id, with_resource_params(Args)])
end, Actions)
|| #rule{actions = Actions} <- emqx_rule_registry:get_rules()],
ok
Expand Down Expand Up @@ -192,7 +199,7 @@ delete_rule(RuleId) ->
lists:foreach(
fun(#action_instance{id = Id, name = ActName}) ->
{ok, #action{module = Mod, on_destroy = Destory}} = emqx_rule_registry:find_action(ActName),
clear_action(Mod, Destory, Id)
cluster_call(clear_action, [Mod, Destory, Id])
end, Actions),
emqx_rule_registry:remove_rule(RuleId);
not_found ->
Expand All @@ -205,7 +212,7 @@ create_resource(#{type := Type, config := Config} = Params) ->
{ok, #resource_type{on_create = {M, F}, params_spec = ParamSpec}} ->
ok = emqx_rule_validator:validate_params(Config, ParamSpec),
ResId = resource_id(),
init_resource(M, F, ResId, Config),
cluster_call(init_resource, [M, F, ResId, Config]),
Resource = #resource{id = ResId,
type = Type,
config = Config,
Expand All @@ -223,8 +230,8 @@ test_resource(#{type := Type, config := Config}) ->
try
ok = emqx_rule_validator:validate_params(Config, ParamSpec),
ResId = resource_id(),
init_resource(ModC, Create, ResId, Config),
clear_resource(ModD, Destroy, ResId),
cluster_call(init_resource, [ModC, Create, ResId, Config]),
cluster_call(clear_resource, [ModD, Destroy, ResId]),
ok
catch Error:Reason ->
{error, {Error, Reason}}
Expand All @@ -240,7 +247,7 @@ delete_resource(ResId) ->
try
{ok, #resource_type{on_destroy = {ModD,Destroy}}}
= emqx_rule_registry:find_resource_type(ResType),
clear_resource(ModD, Destroy, ResId),
cluster_call(clear_resource, [ModD, Destroy, ResId]),
ok = emqx_rule_registry:remove_resource(ResId)
catch
Error:Reason ->
Expand All @@ -259,7 +266,7 @@ prepare_action({Name, Args}) ->
{ok, #action{module = Mod, on_create = Create, params_spec = ParamSpec}} ->
ok = emqx_rule_validator:validate_params(Args, ParamSpec),
ActionInstId = action_instance_id(Name),
init_action(Mod, Create, ActionInstId, with_resource_params(Args)),
cluster_call(init_action, [Mod, Create, ActionInstId, with_resource_params(Args)]),
#action_instance{id = ActionInstId, name = Name, args = Args};
not_found ->
throw({action_not_found, Name})
Expand Down Expand Up @@ -300,13 +307,27 @@ gen_id(Prefix, TestFun) ->
action_instance_id(ActionName) ->
iolist_to_binary([atom_to_list(ActionName), integer_to_list(erlang:system_time())]).

cluster_call(Func, Args) ->
case rpc:multicall([node() | nodes()], ?MODULE, Func, Args, 5000) of
{ResL, []} ->
case lists:filter(fun(ok) -> false; (_) -> true end, ResL) of
[] -> ok;
ErrL ->
?LOG(error, "cluster_call error found, ResL: ~p", [ResL]),
throw({cluster_call_failed, ErrL})
end;
{ResL, BadNodes} ->
?LOG(error, "cluster_call bad nodes found: ~p, ResL: ~p", [BadNodes, ResL]),
throw({cluster_call_failed, {nodes_not_exist, BadNodes}})
end.

init_resource(Module, OnCreate, ResId, Config) ->
Params = ?RAISE(Module:OnCreate(ResId, Config),
{init_resource_failure, {{Module, OnCreate}, _REASON_}}),
{{init_resource_failure, node()}, {{Module, OnCreate}, _REASON_}}),
emqx_rule_registry:add_resource_params(#resource_params{id = ResId, params = Params}).

init_action(Module, OnCreate, ActionInstId, Params) ->
case ?RAISE(Module:OnCreate(Params), {init_action_failure, {{Module,OnCreate},_REASON_}}) of
case ?RAISE(Module:OnCreate(Params), {{init_action_failure, node()}, {{Module,OnCreate},_REASON_}}) of
{Apply, NewParams} ->
ok = emqx_rule_registry:add_action_instance_params(
#action_instance_params{id = ActionInstId, params = NewParams, apply = Apply});
Expand All @@ -315,22 +336,24 @@ init_action(Module, OnCreate, ActionInstId, Params) ->
#action_instance_params{id = ActionInstId, params = Params, apply = Apply})
end.

clear_resource(_Module, undefined, _ResId) ->
clear_resource(_Module, undefined, ResId) ->
ok = emqx_rule_registry:remove_resource_params(ResId),
ok;
clear_resource(Module, Destroy, ResId) ->
ok = emqx_rule_registry:remove_resource_params(ResId),
case emqx_rule_registry:find_resource_params(ResId) of
{ok, #resource_params{params = Params}} ->
?RAISE(Module:Destroy(ResId, Params),
{destroy_resource_failure, {{Module, Destroy}, _REASON_}});
{{destroy_resource_failure, node()}, {{Module, Destroy}, _REASON_}});
not_found ->
ok
end.

clear_action(_Module, undefined, ActionInstId) ->
ok = emqx_rule_registry:delete_action_instance_params(ActionInstId);
ok = emqx_rule_registry:remove_action_instance_params(ActionInstId);
clear_action(Module, Destroy, ActionInstId) ->
{ok, #action_instance_params{params = Params}}
= emqx_rule_registry:get_action_instance_params(ActionInstId),
ok = emqx_rule_registry:delete_action_instance_params(ActionInstId),
?RAISE(Module:Destroy(ActionInstId, Params),
{destroy_action_failure, {{Module, Destroy}, _REASON_}}).
ok = emqx_rule_registry:remove_action_instance_params(ActionInstId),
?RAISE(Module:Destroy(Params),
{{destroy_action_failure, node()}, {{Module, Destroy}, _REASON_}}).
2 changes: 1 addition & 1 deletion src/emqx_rule_engine_api.erl
Expand Up @@ -394,7 +394,7 @@ rule_sql_test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) ->
apply = feedback_action()}),
emqx_rule_runtime:apply_rule(Rule, FullContext)
after
ok = emqx_rule_registry:delete_action_instance_params(ActInstId)
ok = emqx_rule_registry:remove_action_instance_params(ActInstId)
end,
wait_feedback();
Error -> error(Error)
Expand Down
15 changes: 10 additions & 5 deletions src/emqx_rule_registry.erl
Expand Up @@ -43,7 +43,7 @@
, remove_actions_of/1
, add_action_instance_params/1
, get_action_instance_params/1
, delete_action_instance_params/1
, remove_action_instance_params/1
]).

%% Resource Management
Expand All @@ -54,6 +54,7 @@
, find_resource_params/1
, get_resources_by_type/1
, remove_resource/1
, remove_resource_params/1
]).

%% Resource Types
Expand Down Expand Up @@ -297,8 +298,8 @@ get_action_instance_params(ActionInstId) ->
end.

%% @doc Delete an action instance params.
-spec(delete_action_instance_params(action_instance_id()) -> ok).
delete_action_instance_params(ActionInstId) ->
-spec(remove_action_instance_params(action_instance_id()) -> ok).
remove_action_instance_params(ActionInstId) ->
ets:delete(?ACTION_INST_PARAMS_TAB, ActionInstId),
ok.

Expand Down Expand Up @@ -334,16 +335,20 @@ find_resource_params(Id) ->
[] -> not_found
end.

-spec(remove_resource(emqx_rule_engine:resource() | binary()) -> ok).
-spec(remove_resource(emqx_rule_engine:resource() | emqx_rule_engine:resource_id()) -> ok).
remove_resource(Resource) when is_record(Resource, resource) ->
trans(fun delete_resource/1, [Resource#resource.id]);

remove_resource(ResId) when is_binary(ResId) ->
trans(fun delete_resource/1, [ResId]).

-spec(remove_resource_params(emqx_rule_engine:resource_id()) -> ok).
remove_resource_params(ResId) ->
ets:delete(?RES_PARAMS_TAB, ResId),
ok.

%% @private
delete_resource(ResId) ->
ets:delete(?RES_PARAMS_TAB, ResId),
[[ResId =:= ResId1 andalso throw({dependency_exists, {rule, Id}})
|| #{params := #{<<"$resource">> := ResId1}} <- Actions]
|| #rule{id = Id, actions = Actions} <- get_rules()],
Expand Down

0 comments on commit 4963b0e

Please sign in to comment.