Skip to content
Permalink
Browse files

Re-start resources manually

  • Loading branch information...
terry-xiaoyu committed Jun 3, 2019
1 parent 222eaaf commit ccbffd7d5db514adf6cd20e8d139e73f80bc1c96
Showing with 110 additions and 54 deletions.
  1. +77 −52 src/emqx_rule_engine.erl
  2. +31 −0 src/emqx_rule_engine_api.erl
  3. +2 −2 src/emqx_rule_engine_app.erl
@@ -19,14 +19,15 @@

-export([ load_providers/0
, unload_providers/0
, re_establish_resources/0
, rebuild_rules/0
, refresh_resources/0
, refresh_rules/0
]).

-export([ create_rule/1
, delete_rule/1
, create_resource/1
, test_resource/1
, start_resource/1
, delete_resource/1
]).

@@ -89,44 +90,6 @@ load_resource_types(App) ->
ResourceTypes = find_resource_types(App),
emqx_rule_registry:register_resource_types(ResourceTypes).

%%------------------------------------------------------------------------------
%% Re-establish resources
%%------------------------------------------------------------------------------

-spec(re_establish_resources() -> ok).
re_establish_resources() ->
try
lists:foreach(
fun(#resource{id = ResId, config = Config, type = Type}) ->
{ok, #resource_type{on_create = {M, F}}} = emqx_rule_registry:find_resource_type(Type),
cluster_call(init_resource, [M, F, ResId, Config])
end, emqx_rule_registry:get_resources())
catch
_:Error:StackTrace ->
logger:critical("Can not re-stablish resource: ~p,"
"Fix the issue and establish it manually.\n"
"Stacktrace: ~p",
[Error, StackTrace])
end.

-spec(rebuild_rules() -> ok).
rebuild_rules() ->
try
[lists:foreach(
fun(#action_instance{id = Id, name = ActName, args = Args}) ->
{ok, #action{module = Mod, on_create = Create}} = emqx_rule_registry:find_action(ActName),
cluster_call(init_action, [Mod, Create, Id, with_resource_params(Args)])
end, Actions)
|| #rule{actions = Actions} <- emqx_rule_registry:get_rules()],
ok
catch
_:Error:StackTrace ->
logger:critical("Can not re-build rule: ~p,"
"Fix the issue and establish it manually.\n"
"Stacktrace: ~p",
[Error, StackTrace])
end.

-spec(find_actions(App :: atom()) -> list(action())).
find_actions(App) ->
lists:map(fun new_action/1, find_attrs(App, rule_action)).
@@ -212,30 +175,39 @@ create_resource(#{type := Type, config := Config} = Params) ->
{ok, #resource_type{on_create = {M, F}, params_spec = ParamSpec}} ->
ok = emqx_rule_validator:validate_params(Config, ParamSpec),
ResId = resource_id(),
cluster_call(init_resource, [M, F, ResId, Config]),
Resource = #resource{id = ResId,
type = Type,
config = Config,
description = iolist_to_binary(maps:get(description, Params, ""))},
ok = emqx_rule_registry:add_resource(Resource),
cluster_call(init_resource, [M, F, ResId, Config]),
{ok, Resource};
not_found ->
{error, {resource_type_not_found, Type}}
end.

-spec(start_resource(resource_id()) -> ok | {error, Reason :: term()}).
start_resource(ResId) ->
case emqx_rule_registry:find_resource(ResId) of
{ok, #resource{type = ResType, config = Config}} ->
{ok, #resource_type{on_create = {Mod, Create}}}
= emqx_rule_registry:find_resource_type(ResType),
init_resource(Mod, Create, ResId, Config),
refresh_actions_of_a_resource(ResId),
ok;
not_found ->
{error, {resource_not_found, ResId}}
end.

-spec(test_resource(#{}) -> ok | {error, Reason :: term()}).
test_resource(#{type := Type, config := Config}) ->
case emqx_rule_registry:find_resource_type(Type) of
{ok, #resource_type{on_create = {ModC,Create}, on_destroy = {ModD,Destroy}, params_spec = ParamSpec}} ->
try
ok = emqx_rule_validator:validate_params(Config, ParamSpec),
ResId = resource_id(),
cluster_call(init_resource, [ModC, Create, ResId, Config]),
cluster_call(clear_resource, [ModD, Destroy, ResId]),
ok
catch Error:Reason ->
{error, {Error, Reason}}
end;
ok = emqx_rule_validator:validate_params(Config, ParamSpec),
ResId = resource_id(),
cluster_call(init_resource, [ModC, Create, ResId, Config]),
cluster_call(clear_resource, [ModD, Destroy, ResId]),
ok;
not_found ->
{error, {resource_type_not_found, Type}}
end.
@@ -252,6 +224,44 @@ delete_resource(ResId) ->
{error, {resource_not_found, ResId}}
end.

%%------------------------------------------------------------------------------
%% Re-establish resources
%%------------------------------------------------------------------------------

-spec(refresh_resources() -> ok).
refresh_resources() ->
try
lists:foreach(
fun(#resource{id = ResId, config = Config, type = Type}) ->
{ok, #resource_type{on_create = {M, F}}} = emqx_rule_registry:find_resource_type(Type),
cluster_call(init_resource, [M, F, ResId, Config])
end, emqx_rule_registry:get_resources())
catch
_:Error:StackTrace ->
logger:critical("Can not re-stablish resource: ~p,"
"Fix the issue and establish it manually.\n"
"Stacktrace: ~p",
[Error, StackTrace])
end.

-spec(refresh_rules() -> ok).
refresh_rules() ->
try
[lists:foreach(
fun(#action_instance{id = Id, name = ActName, args = Args}) ->
{ok, #action{module = Mod, on_create = Create}} = emqx_rule_registry:find_action(ActName),
cluster_call(init_action, [Mod, Create, Id, with_resource_params(Args)])
end, Actions)
|| #rule{actions = Actions} <- emqx_rule_registry:get_rules()],
ok
catch
_:Error:StackTrace ->
logger:critical("Can not re-build rule: ~p,"
"Fix the issue and establish it manually.\n"
"Stacktrace: ~p",
[Error, StackTrace])
end.

%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------
@@ -309,11 +319,11 @@ cluster_call(Func, Args) ->
[] -> ok;
ErrL ->
?LOG(error, "cluster_call error found, ResL: ~p", [ResL]),
throw({cluster_call_failed, ErrL})
throw({func_fail(Func), ErrL})
end;
{ResL, BadNodes} ->
?LOG(error, "cluster_call bad nodes found: ~p, ResL: ~p", [BadNodes, ResL]),
throw({cluster_call_failed, {nodes_not_exist, BadNodes}})
throw({func_fail(Func), {nodes_not_exist, BadNodes}})
end.

init_resource(Module, OnCreate, ResId, Config) ->
@@ -355,3 +365,18 @@ clear_action(Module, Destroy, ActionInstId) ->
not_found ->
ok
end.

refresh_actions_of_a_resource(ResId) ->
[lists:foreach(
fun(#action_instance{args = Args = #{<<"$resource">> := ResId0},
id = Id, name = ActName}) when ResId0 =:= ResId ->
{ok, #action{module = Mod, on_create = Create}}
= emqx_rule_registry:find_action(ActName),
init_action(Mod, Create, Id, with_resource_params(Args));
(#action_instance{}) ->
ok
end, Actions)
|| #rule{actions = Actions} <- emqx_rule_registry:get_rules()].

func_fail(Func) when is_atom(Func) ->
list_to_atom(atom_to_list(Func) ++ "_failure").
@@ -83,6 +83,13 @@
descr => "Show a resource"
}).

-rest_api(#{name => start_resource,
method => 'POST',
path => "/resources/:bin:id",
func => start_resource,
descr => "Start a resource"
}).

-rest_api(#{name => delete_resource,
method => 'DELETE',
path => "/resources/:bin:id",
@@ -124,6 +131,7 @@
-export([ create_resource/2
, list_resources/2
, show_resource/2
, start_resource/2
, delete_resource/2
]).

@@ -137,6 +145,7 @@
-define(ERR_NO_HOOK(HOOK), list_to_binary(io_lib:format("Event ~s Not Found", [(HOOK)]))).
-define(ERR_NO_RESOURCE_TYPE(TYPE), list_to_binary(io_lib:format("Resource Type ~s Not Found", [(TYPE)]))).
-define(ERR_UNKNOWN_COLUMN(COLUMN), list_to_binary(io_lib:format("Unknown Column: ~s", [(COLUMN)]))).
-define(ERR_START_RESOURCE(RESID), list_to_binary(io_lib:format("Start Resource ~s Failed", [(RESID)]))).
-define(ERR_BADARGS(REASON),
begin
R0 = list_to_binary(io_lib:format("~0p", [REASON])),
@@ -236,6 +245,11 @@ do_create_resource(Create, Params) ->
catch
throw:{resource_type_not_found, Type} ->
return({error, 400, ?ERR_NO_RESOURCE_TYPE(Type)});
throw:{init_resource_failure, Reason} ->
%% Note that we will return OK in case of resource creation failure,
%% users can always re-start the resource later.
?LOG(error, "[RuleEngineAPI] init_resource_failure: ~p", [Reason]),
return(ok);
throw:Reason ->
return({error, 400, ?ERR_BADARGS(Reason)});
_Error:Reason:StackT ->
@@ -252,6 +266,23 @@ list_resources_by_type(#{type := Type}, _Params) ->
show_resource(#{id := Id}, _Params) ->
reply_with(fun emqx_rule_registry:find_resource/1, Id).

start_resource(#{id := Id}, _Params) ->
try emqx_rule_engine:start_resource(Id) of
ok ->
return(ok);
{error, {resource_not_found, ResId}} ->
return({error, 400, ?ERR_NO_RESOURCE(ResId)})
catch
throw:{{init_resource_failure, _}, Reason} ->
?LOG(error, "[RuleEngineAPI] init_resource_failure: ~p", [Reason]),
return({error, 400, ?ERR_START_RESOURCE(Id)});
throw:Reason ->
return({error, 400, ?ERR_BADARGS(Reason)});
_Error:Reason:StackT ->
?LOG(error, "[RuleEngineAPI] ~p failed: ~0p", [?FUNCTION_NAME, {Reason, StackT}]),
return({error, 400, ?ERR_BADARGS(Reason)})
end.

delete_resource(#{id := Id}, _Params) ->
try
emqx_rule_engine:delete_resource(Id),
@@ -27,8 +27,8 @@
start(_Type, _Args) ->
{ok, Sup} = emqx_rule_engine_sup:start_link(),
ok = emqx_rule_engine:load_providers(),
ok = emqx_rule_engine:re_establish_resources(),
ok = emqx_rule_engine:rebuild_rules(),
ok = emqx_rule_engine:refresh_resources(),
ok = emqx_rule_engine:refresh_rules(),
ok = emqx_rule_engine_cli:load(),
ok = emqx_rule_runtime:start(env()),
{ok, Sup}.

0 comments on commit ccbffd7

Please sign in to comment.
You can’t perform that action at this time.