Skip to content

Commit

Permalink
Support validation of params
Browse files Browse the repository at this point in the history
  • Loading branch information
terry-xiaoyu committed Apr 23, 2019
1 parent 5094c43 commit b283184
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 41 deletions.
2 changes: 1 addition & 1 deletion docs/api_examples.md
Expand Up @@ -169,7 +169,7 @@ $ curl -XDELETE -v --basic -u $APPSECRET -k 'http://localhost:8080/api/v3/resour
``` shell

$ curl -v --basic -u $APPSECRET -k 'http://localhost:8080/api/v3/resources' -d \
'{"name":"webhook1", "type": "web_hook", "config": {"url": "http://127.0.0.1:9910", "headers": {"token": "axfw34y235wrq234t4ersgw4t"}, "method": "POST"}, "description": "web hook resource-1"}'
'{"name":"webhook1", "type": "web_hook", "config": {"url": "http://127.0.0.1:9910", "headers": [{"key":"token", "value":"axfw34y235wrq234t4ersgw4t"}], "method": "POST"}, "description": "web hook resource-1"}'

curl -v --basic -u $APPSECRET -k 'http://localhost:8080/api/v3/rules' -d \
'{"name":"connected_msg_to_http","for":"client.connected","rawsql":"select * from \"#\"","actions":[{"name":"web_hook:event_action","params":{"$resource": "web_hook:webhook1", "template": {"client": "${client_id}", "user": "${username}", "c": {"u": "${username}", "e": "${e}"}}}}],"description":"Forward connected events to webhook"}'
Expand Down
6 changes: 3 additions & 3 deletions docs/cli_examples.md
Expand Up @@ -5,7 +5,7 @@
### create

```shell
$ ./bin/emqx_ctl rules create 'steven_msg_to_http' 'message.publish' 'SELECT payload FROM "#" where user=Steven' '{"web_hook:publish_action": {"$resource": "web_hook:webhook1", "url": "http://www.baidu.com"}}' -d "Forward msgs from clientid=Steven to webhook"
$ ./bin/emqx_ctl rules create 'steven_msg_to_http' 'message.publish' 'SELECT payload FROM "#" where user=Steven' '[{"name":"web_hook:publish_action", "params": {"$resource": "web_hook:webhook1", "url": "http://www.baidu.com"}}]' -d "Forward msgs from clientid=Steven to webhook"

Rule steven_msg_to_http:1555138068602953000 created
```
Expand Down Expand Up @@ -122,9 +122,9 @@ resource_type(name=built_in, provider=emqx_rule_engine, params=#{}, on_create={e
``` shell
./bin/emqx_ctl resources create 'webhook1' 'web_hook' -c '{"url": "http://127.0.0.1:9910", "headers": {"token": "axfw34y235wrq234t4ersgw4t"}, "method": "POST"}'
./bin/emqx_ctl resources create 'webhook1' 'web_hook' -c '{"url": "http://127.0.0.1:9910", "headers": [{"key":"token", "value":"axfw34y235wrq234t4ersgw4t"}], "method": "POST"}'
./bin/emqx_ctl rules create 'connected_msg_to_http' 'client.connected' 'SELECT * FROM "#"' '{"web_hook:event_action": {"$resource": "web_hook:webhook1", "template": {"client": "${client_id}", "user": "${username}", "c": {"u": "${username}", "e": "${e}"}}}}' -d "Forward connected events to webhook"
./bin/emqx_ctl rules create 'connected_msg_to_http' 'client.connected' 'SELECT * FROM "#"' '[{"name":"web_hook:event_action", "params": {"$resource": "web_hook:webhook1", "template": {"client": "${client_id}", "user": "${username}", "c": {"u": "${username}", "e": "${e}"}}}}]' -d "Forward connected events to webhook"
```
Expand Down
13 changes: 12 additions & 1 deletion src/emqx_rule_actions.erl
Expand Up @@ -17,6 +17,17 @@

-include_lib("emqx/include/emqx.hrl").

-define(REPUBLISH_PARAMS_SPEC, #{
from => #{type => string,
format => topic,
required => true,
description => <<"From which topic">>},
to => #{type => string,
format => topic,
required => true,
description => <<"Repubilsh To which topic">>}
}).

-resource_type(#{name => built_in,
create => on_resource_create,
params => #{},
Expand All @@ -35,7 +46,7 @@
for => 'message.publish',
type => built_in,
func => republish_action,
params => #{from => topic, to => topic},
params => ?REPUBLISH_PARAMS_SPEC,
description => "Republish a MQTT message"
}).

Expand Down
25 changes: 13 additions & 12 deletions src/emqx_rule_engine.erl
Expand Up @@ -85,23 +85,25 @@ new_action({App, Mod, #{name := Name,
for := Hook,
type := Type,
func := Func,
params := Params,
params := ParamsSpec,
description := Descr}}) ->
%% Check if the action's function exported
case erlang:function_exported(Mod, Func, 1) of
true -> ok;
false -> error({action_func_not_found, Func})
end,
ok = emqx_rule_validator:validate_spec(ParamsSpec),
#action{name = action_name(Type, Name), for = Hook, app = App, type = Type,
module = Mod, func = Func, params = Params,
module = Mod, func = Func, params = ParamsSpec,
description = iolist_to_binary(Descr)}.

new_resource_type({App, Mod, #{name := Name,
params := Params,
params := ParamsSpec,
create := Create,
description := Descr}}) ->
ok = emqx_rule_validator:validate_spec(ParamsSpec),
#resource_type{name = Name, provider = App,
params = Params,
params = ParamsSpec,
on_create = {Mod, Create},
description = iolist_to_binary(Descr)}.

Expand Down Expand Up @@ -145,14 +147,13 @@ create_rule(Params = #{name := Name,
Error -> error(Error)
end.

prepare_action(Name) when is_atom(Name) ->
prepare_action({Name, #{}});
prepare_action({Name, Args}) ->
case emqx_rule_registry:find_action(Name) of
{ok, #action{module = M, func = F}} ->
{ok, #action{module = M, func = F, params = ParamSpec}} ->
ok = emqx_rule_validator:validate_params(Args, ParamSpec),
NewArgs = with_resource_config(Args),
#{name => Name, params => Args,
apply => ?RAISE(M:F(NewArgs), {init_action_failure,_REASON_,{M,F}})};
#{name => Name, params => NewArgs,
apply => ?RAISE(M:F(NewArgs), {init_action_failure,{{M,F},_REASON_}})};
not_found ->
throw({action_not_found, Name})
end.
Expand All @@ -173,13 +174,13 @@ create_resource(#{name := Name,
config := Config,
description := Descr}) ->
case emqx_rule_registry:find_resource_type(Type) of
{ok, #resource_type{on_create = {Mod, OnCreate}}} ->
NewConfig = ?RAISE(Mod:OnCreate(Name, Config), {init_resource_failure,_REASON_,{Mod,OnCreate}}),
{ok, #resource_type{on_create = {M, F}, params = ParamSpec}} ->
ok = emqx_rule_validator:validate_params(Config, ParamSpec),
ResId = iolist_to_binary([atom_to_list(Type), ":", Name]),
Resource = #resource{id = ResId,
name = Name,
type = Type,
config = NewConfig,
config = ?RAISE(M:F(Name, Config), {init_resource_failure,{{M,F},_REASON_}}),
description = iolist_to_binary(Descr)},
ok = emqx_rule_registry:add_resource(Resource),
{ok, Resource};
Expand Down
34 changes: 19 additions & 15 deletions src/emqx_rule_engine_api.erl
Expand Up @@ -142,7 +142,11 @@
-define(ERR_NO_RESOURCE(RESID), list_to_binary(io_lib:format("Resource ~s Not Found", [(RESID)]))).
-define(ERR_NO_HOOK(HOOK), list_to_binary(io_lib:format("Hook ~s Not Found", [(HOOK)]))).
-define(ERR_NO_RESOURCE_TYPE(TYPE), list_to_binary(io_lib:format("Resource Type ~s Not Found", [(TYPE)]))).
-define(ERR_BADARGS, <<"Bad Arguments">>).
-define(ERR_BADARGS(REASON),
begin
R0 = list_to_binary(io_lib:format("~0p", [REASON])),
<<"Bad Arguments: ", R0/binary>>
end).

%%------------------------------------------------------------------------------
%% Rules API
Expand All @@ -159,8 +163,8 @@ create_rule(_Bindings, Params) ->
return({error, 400, ?ERR_NO_RESOURCE(ResId)});
throw:{invalid_hook, Hook} ->
return({error, 400, ?ERR_NO_HOOK(Hook)});
_Error:_Reason ->
return({error, 400, ?ERR_BADARGS})
_Error:Reason ->
return({error, 400, ?ERR_BADARGS(Reason)})
end.

list_rules(_Bindings, _Params) ->
Expand Down Expand Up @@ -204,8 +208,8 @@ create_resource(#{}, Params) ->
catch
throw:{resource_type_not_found, Type} ->
return({error, 400, ?ERR_NO_RESOURCE_TYPE(Type)});
_Error:_Reason ->
return({error, 400, ?ERR_BADARGS})
_Error:Reason ->
return({error, 400, ?ERR_BADARGS(Reason)})
end.

list_resources(#{}, _Params) ->
Expand Down Expand Up @@ -315,19 +319,19 @@ parse_rule_params([{<<"for">>, Hook} | Params], Rule) ->
parse_rule_params([{<<"rawsql">>, RawSQL} | Params], Rule) ->
parse_rule_params(Params, Rule#{rawsql => RawSQL});
parse_rule_params([{<<"actions">>, Actions} | Params], Rule) ->
parse_rule_params(Params, Rule#{actions => [parse_action(A) || A <- Actions]});
parse_rule_params(Params, Rule#{actions => [parse_action(json_term_to_map(A)) || A <- Actions]});
parse_rule_params([{<<"description">>, Descr} | Params], Rule) ->
parse_rule_params(Params, Rule#{description => Descr});
parse_rule_params([_ | Params], Res) ->
parse_rule_params(Params, Res).

parse_action(Actions) ->
case proplists:get_value(<<"params">>, Actions) of
undefined ->
binary_to_existing_atom(proplists:get_value(<<"name">>, Actions), utf8);
Params ->
{binary_to_existing_atom(proplists:get_value(<<"name">>, Actions), utf8),
maps:from_list(atom_key_list(Params))}
case maps:find(<<"params">>, Actions) of
error ->
throw({action_param_missing, Actions});
{ok, Params} ->
{binary_to_existing_atom(maps:get(<<"name">>, Actions), utf8),
emqx_rule_maps:atom_key_map(Params)}
end.

parse_resource_params(Params) ->
Expand All @@ -342,11 +346,11 @@ parse_resource_params([{<<"type">>, Type} | Params], Res) ->
throw({resource_type_not_found, Type})
end;
parse_resource_params([{<<"config">>, Config} | Params], Res) ->
parse_resource_params(Params, Res#{config => maps:from_list(atom_key_list(Config))});
parse_resource_params(Params, Res#{config => emqx_rule_maps:atom_key_map(json_term_to_map(Config))});
parse_resource_params([{<<"description">>, Descr} | Params], Res) ->
parse_resource_params(Params, Res#{description => Descr});
parse_resource_params([_ | Params], Res) ->
parse_resource_params(Params, Res).

atom_key_list(BinKeyList) ->
[{binary_to_existing_atom(K, utf8), V} || {K, V} <- BinKeyList].
json_term_to_map(List) ->
jsx:decode(jsx:encode(List), [return_maps]).
12 changes: 4 additions & 8 deletions src/emqx_rule_engine_cli.erl
Expand Up @@ -240,25 +240,21 @@ make_rule(Opts) ->
for => get_value(hook, Opts),
rawsql => get_value(sql, Opts),
actions => [{?RAISE(binary_to_existing_atom(ActName, utf8), {action_not_found, ActName}),
maps:from_list(atom_key_list(ActParam))}
|| {ActName, ActParam} <- ?RAISE(jsx:decode(Actions), {invalid_action_params, Actions})],
?RAISE(emqx_rule_maps:atom_key_map(ActParam), {invalid_action_param, ActParam})}
|| #{<<"name">> := ActName, <<"params">> := ActParam} <- jsx:decode(Actions, [return_maps])],
description => get_value(descr, Opts)}.

make_resource(Opts) ->
Config = get_value(config, Opts),
#{name => get_value(name, Opts),
type => get_value(type, Opts),
config => maps:from_list(
atom_key_list(
?RAISE(jsx:decode(Config), {invalid_config, Config}))),
config => ?RAISE(emqx_rule_maps:atom_key_map(jsx:decode(Config, [return_maps])),
{invalid_config, Config}),
description => get_value(descr, Opts)}.

printable_actions(Actions) when is_list(Actions) ->
jsx:encode([maps:remove(apply, Act) || Act <- Actions]).

atom_key_list(BinKeyList) ->
[{binary_to_existing_atom(K, utf8), V} || {K, V} <- BinKeyList].

with_opts(Action, RawParams, OptSpecList, {CmdObject, CmdName}) ->
case getopt:parse_and_check(OptSpecList, RawParams) of
{ok, Params} ->
Expand Down
8 changes: 8 additions & 0 deletions src/emqx_rule_maps.erl
Expand Up @@ -19,6 +19,7 @@
, get_value/2
, get_value/3
, put_value/3
, atom_key_map/1
]).

nested_get(Key, Map) when not is_list(Key) ->
Expand Down Expand Up @@ -69,3 +70,10 @@ put_value(_Key, undefined, Map) ->
put_value(Key, Val, Map) ->
maps:put(Key, Val, Map).

atom_key_map(BinKeyMap) when is_map(BinKeyMap) ->
maps:fold(fun(K, V, Acc) ->
Acc#{binary_to_existing_atom(K, utf8) => atom_key_map(V)}
end, #{}, BinKeyMap);
atom_key_map(ListV) when is_list(ListV) ->
[atom_key_map(V) || V <- ListV];
atom_key_map(Val) -> Val.

0 comments on commit b283184

Please sign in to comment.