diff --git a/docs/api_examples.md b/docs/api_examples.md index 5cc0c9fe5..90e0b59e9 100644 --- a/docs/api_examples.md +++ b/docs/api_examples.md @@ -6,6 +6,12 @@ APPSECRET="88ebdd6569afc:Mjg3MzUyNTI2Mjk2NTcyOTEwMDEwMDMzMTE2NTM1MTkzNjA" ## Rules +### test sql +$ curl -v --basic -u $APPSECRET -k 'http://localhost:8080/api/v3/rules?test' -d \ +'{"rawsql":"select * from \"message.publish\" where topic=\"t/a\"","ctx":{}}' + + + ### create ```shell $ curl -v --basic -u $APPSECRET -k 'http://localhost:8080/api/v3/rules' -d \ diff --git a/include/rule_engine.hrl b/include/rule_engine.hrl index 9fdc193e6..88e2f6df9 100644 --- a/include/rule_engine.hrl +++ b/include/rule_engine.hrl @@ -99,126 +99,3 @@ begin try (_EXP_) catch _:_REASON_ -> throw(_ERROR_) end end). - --define(EVENT_ALIAS(ALIAS), - case ALIAS of - '$message' -> - [ '$message' - , '$any' - , 'message.publish' - , 'message.deliver' - , 'message.acked' - , 'message.dropped' - ]; - '$client' -> - [ '$client' - , '$any' - , 'client.connected' - , 'client.disconnected' - , 'client.subscribe' - , 'client.unsubscribe' - ]; - _ -> ['$any', ALIAS] - end). - --define(COLUMNS(EVENT), - case EVENT of - 'message.publish' -> - [ <<"client_id">> - , <<"username">> - , <<"event">> - , <<"flags">> - , <<"id">> - , <<"payload">> - , <<"peername">> - , <<"qos">> - , <<"timestamp">> - , <<"topic">> - ]; - 'message.deliver' -> - [ <<"client_id">> - , <<"username">> - , <<"event">> - , <<"auth_result">> - , <<"mountpoint">> - , <<"flags">> - , <<"id">> - , <<"payload">> - , <<"peername">> - , <<"topic">> - , <<"qos">> - , <<"timestamp">> - ]; - 'message.acked' -> - [ <<"client_id">> - , <<"username">> - , <<"event">> - , <<"flags">> - , <<"id">> - , <<"payload">> - , <<"peername">> - , <<"topic">> - , <<"qos">> - , <<"timestamp">> - ]; - 'message.dropped' -> - [ <<"client_id">> - , <<"username">> - , <<"event">> - , <<"flags">> - , <<"id">> - , <<"node">> - , <<"payload">> - , <<"peername">> - , <<"qos">> - , <<"timestamp">> - , <<"topic">> - ]; - 'client.connected' -> - [ <<"client_id">> - , <<"username">> - , <<"event">> - , <<"auth_result">> - , <<"clean_start">> - , <<"connack">> - , <<"connected_at">> - , <<"is_bridge">> - , <<"keepalive">> - , <<"mountpoint">> - , <<"peername">> - , <<"proto_ver">> - ]; - 'client.disconnected' -> - [ <<"client_id">> - , <<"username">> - , <<"event">> - , <<"auth_result">> - , <<"mountpoint">> - , <<"peername">> - , <<"reason_code">> - ]; - 'client.subscribe' -> - [ <<"client_id">> - , <<"username">> - , <<"event">> - , <<"auth_result">> - , <<"mountpoint">> - , <<"peername">> - , <<"topic_filters">> - , <<"topic">> - , <<"qos">> - ]; - 'client.unsubscribe' -> - [ <<"client_id">> - , <<"username">> - , <<"event">> - , <<"auth_result">> - , <<"mountpoint">> - , <<"peername">> - , <<"topic_filters">> - , <<"topic">> - , <<"qos">> - ]; - RuleType -> - error({unknown_rule_type, RuleType}) - end). \ No newline at end of file diff --git a/include/rule_events.hrl b/include/rule_events.hrl new file mode 100644 index 000000000..9809d8ece --- /dev/null +++ b/include/rule_events.hrl @@ -0,0 +1,254 @@ +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-define(EVENT_ALIAS(ALIAS), + case ALIAS of + '$message' -> + [ '$message' + , '$any' + , 'message.publish' + , 'message.deliver' + , 'message.acked' + , 'message.dropped' + ]; + '$client' -> + [ '$client' + , '$any' + , 'client.connected' + , 'client.disconnected' + , 'client.subscribe' + , 'client.unsubscribe' + ]; + _ -> ['$any', ALIAS] + end). + +-define(COLUMNS(EVENT), + case EVENT of + 'message.publish' -> + [ <<"client_id">> + , <<"username">> + , <<"event">> + , <<"flags">> + , <<"id">> + , <<"payload">> + , <<"peername">> + , <<"qos">> + , <<"timestamp">> + , <<"topic">> + ]; + 'message.deliver' -> + [ <<"client_id">> + , <<"username">> + , <<"event">> + , <<"auth_result">> + , <<"mountpoint">> + , <<"flags">> + , <<"id">> + , <<"payload">> + , <<"peername">> + , <<"topic">> + , <<"qos">> + , <<"timestamp">> + ]; + 'message.acked' -> + [ <<"client_id">> + , <<"username">> + , <<"event">> + , <<"flags">> + , <<"id">> + , <<"payload">> + , <<"peername">> + , <<"topic">> + , <<"qos">> + , <<"timestamp">> + ]; + 'message.dropped' -> + [ <<"client_id">> + , <<"username">> + , <<"event">> + , <<"flags">> + , <<"id">> + , <<"node">> + , <<"payload">> + , <<"peername">> + , <<"qos">> + , <<"timestamp">> + , <<"topic">> + ]; + 'client.connected' -> + [ <<"client_id">> + , <<"username">> + , <<"event">> + , <<"auth_result">> + , <<"clean_start">> + , <<"connack">> + , <<"connected_at">> + , <<"is_bridge">> + , <<"keepalive">> + , <<"mountpoint">> + , <<"peername">> + , <<"proto_ver">> + ]; + 'client.disconnected' -> + [ <<"client_id">> + , <<"username">> + , <<"event">> + , <<"auth_result">> + , <<"mountpoint">> + , <<"peername">> + , <<"reason_code">> + ]; + 'client.subscribe' -> + [ <<"client_id">> + , <<"username">> + , <<"event">> + , <<"auth_result">> + , <<"mountpoint">> + , <<"peername">> + , <<"topic_filters">> + , <<"topic">> + , <<"qos">> + ]; + 'client.unsubscribe' -> + [ <<"client_id">> + , <<"username">> + , <<"event">> + , <<"auth_result">> + , <<"mountpoint">> + , <<"peername">> + , <<"topic_filters">> + , <<"topic">> + , <<"qos">> + ]; + RuleType -> + error({unknown_rule_type, RuleType}) + end). + +-define(EG_ENVS(EVENT), + case EVENT of + 'message.publish' -> + #{event => 'message.publish', + flags => #{dup => false,retain => false}, + from => <<"c_emqx">>, + headers => + #{allow_publish => true, + peername => {{127,0,0,1},50891}, + username => <<"u_emqx">>}, + id => <<0,5,137,164,41,233,87,47,180,75,0,0,5,124,0,1>>, + payload => <<"{\"id\": 1, \"name\": \"ha\"}">>,qos => 1, + timestamp => {1558,713054,353201}, + topic => <<"t1">>}; + 'message.deliver' -> + #{anonymous => true,auth_result => success, + client_id => <<"c_emqx">>, + event => 'message.deliver', + flags => #{dup => false,retain => false}, + from => <<"c_emqx">>, + headers => + #{allow_publish => true, + peername => {{127,0,0,1},50891}, + username => <<"u_emqx">>}, + id => <<0,5,137,164,41,233,87,47,180,75,0,0,5,124,0,1>>, + mountpoint => undefined, + payload => <<"{\"id\": 1, \"name\": \"ha\"}">>, + peername => {{127,0,0,1},50891}, + qos => 1, + sockname => {{127,0,0,1},1883}, + timestamp => {1558,713054,353201}, + topic => <<"t1">>,username => <<"u_emqx">>, + ws_cookie => undefined,zone => external}; + 'message.acked' -> + #{client_id => <<"c_emqx">>, + event => 'message.acked', + flags => #{dup => false,retain => false}, + from => <<"c_emqx">>, + headers => + #{allow_publish => true, + peername => {{127,0,0,1},50891}, + username => <<"u_emqx">>}, + id => <<0,5,137,164,41,233,87,47,180,75,0,0,5,124,0,1>>, + payload => <<"{\"id\": 1, \"name\": \"ha\"}">>,qos => 1, + timestamp => {1558,713054,353201}, + topic => <<"t1">>,username => <<"u_emqx">>}; + 'message.dropped' -> + #{event => 'message.dropped', + flags => #{dup => false,retain => false}, + from => <<"c_emqx">>, + headers => + #{allow_publish => true, + peername => {{127,0,0,1},50891}, + username => <<"u_emqx">>}, + id => <<0,5,137,164,41,236,124,3,180,75,0,0,5,124,0,2>>, + node => nonode@nohost, + payload => <<"{\"id\": 1, \"name\": \"ha\"}">>,qos => 1, + timestamp => {1558,713054,559236}, + topic => <<"t1">>}; + 'client.connected' -> + #{anonymous => true,auth_result => success, + client_id => <<"c_emqx">>, + connack => 0, + connattrs => + #{clean_start => true, + client_id => <<"c_emqx">>, + conn_mod => emqx_connection, + connected_at => {1558,713054,248245}, + credentials => + #{anonymous => true,auth_result => success, + client_id => + <<"c_emqx">>, + mountpoint => undefined, + peername => {{127,0,0,1},50891}, + sockname => {{127,0,0,1},1883}, + username => <<"u_emqx">>,ws_cookie => undefined, + zone => external}, + is_bridge => false,keepalive => 60,peercert => nossl, + peername => {{127,0,0,1},50891}, + proto_name => <<"MQTT">>,proto_ver => 4, + username => <<"u_emqx">>,zone => external}, + event => 'client.connected',mountpoint => undefined, + peername => {{127,0,0,1},50891}, + sockname => {{127,0,0,1},1883}, + username => <<"u_emqx">>,ws_cookie => undefined, + zone => external}; + 'client.disconnected' -> + #{anonymous => true,auth_result => success, + client_id => <<"c_emqx">>, + event => 'client.disconnected',mountpoint => undefined, + peername => {{127,0,0,1},50891}, + reason_code => closed, + sockname => {{127,0,0,1},1883}, + username => <<"u_emqx">>,ws_cookie => undefined, + zone => external}; + 'client.subscribe' -> + #{anonymous => true,auth_result => success, + client_id => <<"c_emqx">>, + event => 'client.subscribe',mountpoint => undefined, + peername => {{127,0,0,1},50891}, + sockname => {{127,0,0,1},1883}, + topic_filters => + [{<<"t1">>,#{nl => 0,qos => 1,rap => 0,rc => 1,rh => 0}}], + username => <<"u_emqx">>,ws_cookie => undefined, + zone => external}; + 'client.unsubscribe' -> + #{anonymous => true,auth_result => success, + client_id => <<"c_emqx">>, + event => 'client.unsubscribe',mountpoint => undefined, + peername => {{127,0,0,1},50891}, + sockname => {{127,0,0,1},1883}, + topic_filters => [{<<"t1">>,#{}}], + username => <<"u_emqx">>,ws_cookie => undefined, + zone => external}; + RuleType -> + error({unknown_rule_type, RuleType}) + end). \ No newline at end of file diff --git a/src/emqx_rule_engine.erl b/src/emqx_rule_engine.erl index bf079e3a6..e0b507fc7 100644 --- a/src/emqx_rule_engine.erl +++ b/src/emqx_rule_engine.erl @@ -172,8 +172,7 @@ module_attributes(Module) -> %%------------------------------------------------------------------------------ -spec(create_rule(#{}) -> {ok, rule()} | no_return()). -create_rule(Params = #{rawsql := Sql, - actions := Actions}) -> +create_rule(Params = #{rawsql := Sql, actions := Actions}) -> case emqx_rule_sqlparser:parse_select(Sql) of {ok, Select} -> Rule = #rule{id = rule_id(), diff --git a/src/emqx_rule_engine_api.erl b/src/emqx_rule_engine_api.erl index 680fcc8b1..6d75d3466 100644 --- a/src/emqx_rule_engine_api.erl +++ b/src/emqx_rule_engine_api.erl @@ -15,8 +15,9 @@ -module(emqx_rule_engine_api). -include("rule_engine.hrl"). +-include("rule_events.hrl"). --import(minirest, [return/0, return/1]). +-import(minirest, [return/1]). -rest_api(#{name => create_rule, method => 'POST', @@ -132,7 +133,7 @@ -define(ERR_NO_ACTION(NAME), list_to_binary(io_lib:format("Action ~s Not Found", [(NAME)]))). -define(ERR_NO_RESOURCE(RESID), list_to_binary(io_lib:format("Resource ~s Not Found", [(RESID)]))). --define(ERR_NO_HOOK(HOOK), list_to_binary(io_lib:format("Hook ~s Not Found", [(HOOK)]))). +-define(ERR_NO_HOOK(HOOK), list_to_binary(io_lib:format("Event ~s Not Found", [(HOOK)]))). -define(ERR_NO_RESOURCE_TYPE(TYPE), list_to_binary(io_lib:format("Resource Type ~s Not Found", [(TYPE)]))). -define(ERR_BADARGS(REASON), begin @@ -143,8 +144,25 @@ %%------------------------------------------------------------------------------ %% Rules API %%------------------------------------------------------------------------------ - create_rule(_Bindings, Params) -> + case proplists:get_value(<<"test">>, Params) of + true -> + test_rule_sql(Params); + _ -> + do_create_rule(Params) + end. + +test_rule_sql(Params) -> + try rule_sql_test(jsx:decode(jsx:encode(Params), [return_maps])) of + Result -> return({ok, Result}) + catch + throw:{invalid_hook, Hook} -> + return({error, 400, ?ERR_NO_HOOK(Hook)}); + _Error:Reason -> + return({error, 400, ?ERR_BADARGS(Reason)}) + end. + +do_create_rule(Params) -> try emqx_rule_engine:create_rule(parse_rule_params(Params)) of {ok, Rule} -> return({ok, record_to_map(Rule)}); @@ -335,6 +353,53 @@ parse_action(Actions) -> Params} end. +-spec(rule_sql_test(#{}) -> {ok, Result::map()} | no_return()). +rule_sql_test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) -> + case emqx_rule_sqlparser:parse_select(Sql) of + {ok, Select} -> + Event = emqx_rule_sqlparser:select_from(Select), + Rule = #rule{rawsql = Sql, + for = Event, + selects = emqx_rule_sqlparser:select_fields(Select), + conditions = emqx_rule_sqlparser:select_where(Select), + actions = [#{name => test_rule_sql, + apply => feedback_action()}]}, + FullContext = fill_default_values(hd(Event), emqx_rule_maps:atom_key_map(Context)), + emqx_rule_runtime:apply_rule(Rule, FullContext), + wait_feedback(); + Error -> error(Error) + end. + +feedback_action() -> + fun(Data, _Envs) -> + erlang:put(rule_sql_test_result, Data) + end. + +wait_feedback() -> + case erlang:get(rule_sql_test_result) of + undefined -> #{}; + Data -> Data + end. + +fill_default_values(Event, #{<<"topic_filters">> := TopicFilters} = Context) -> + do_fill_default_values(Event, Context#{ + <<"topic_filters">> => parse_topic_filters(TopicFilters)}); +fill_default_values(Event, Context) -> + do_fill_default_values(Event, Context). + +do_fill_default_values(Event, Context) -> + maps:merge(?EG_ENVS(Event), Context). + +parse_topic_filters(TopicFilters) -> + [ case TpcFtl of + #{<<"topic">> := Topic, <<"qos">> := QoS} -> + {Topic, #{qos => QoS}}; + #{<<"topic">> := Topic} -> + {Topic, #{}}; + Topic -> + {Topic, #{}} + end || TpcFtl <- jsx:decode(TopicFilters, [return_maps])]. + parse_resource_params(Params) -> parse_resource_params(Params, #{config => #{}, description => <<"">>}). parse_resource_params([], Res) -> diff --git a/src/emqx_rule_engine_cli.erl b/src/emqx_rule_engine_cli.erl index 19bb8a67a..90fc42769 100644 --- a/src/emqx_rule_engine_cli.erl +++ b/src/emqx_rule_engine_cli.erl @@ -32,7 +32,7 @@ -define(OPTSPEC_RESOURCE_TYPE, [{type, $t, "type", {atom, undefined}, "Resource Type"}]). -define(OPTSPEC_ACTION_TYPE, - [ {hook, $k, "hook", {atom, undefined}, "Hook Type"} + [ {hook, $k, "hook", {atom, undefined}, "Event Type"} ]). -define(OPTSPEC_RESOURCES_CREATE, diff --git a/src/emqx_rule_registry.erl b/src/emqx_rule_registry.erl index 0791d9e4a..ed21eb8a2 100644 --- a/src/emqx_rule_registry.erl +++ b/src/emqx_rule_registry.erl @@ -17,6 +17,7 @@ -behaviour(gen_server). -include("rule_engine.hrl"). +-include("rule_events.hrl"). -include_lib("emqx/include/logger.hrl"). -export([start_link/0]). diff --git a/src/emqx_rule_runtime.erl b/src/emqx_rule_runtime.erl index 192a9e2d7..7593409cb 100644 --- a/src/emqx_rule_runtime.erl +++ b/src/emqx_rule_runtime.erl @@ -30,6 +30,8 @@ , on_message_acked/3 ]). +-export([apply_rule/2]). + -import(emqx_rule_maps, [ get_value/2 , get_value/3 @@ -204,7 +206,7 @@ number(Bin) -> %% Step3 -> Take actions take_actions(Actions, Selected, Envs) -> - lists:foreach(fun(Action) -> take_action(Action, Selected, Envs) end, Actions). + lists:map(fun(Action) -> take_action(Action, Selected, Envs) end, Actions). take_action(#{apply := Apply}, Selected, Envs) -> Apply(Selected, Envs). @@ -302,6 +304,9 @@ columns(Input = #{timestamp := Timestamp}, Result) -> columns(Input = #{peername := Peername}, Result) -> columns(maps:remove(peername, Input), Result#{peername => peername(Peername)}); +columns(Input = #{sockname := Peername}, Result) -> + columns(maps:remove(sockname, Input), + Result#{sockname => peername(Peername)}); columns(Input = #{connattrs := Conn}, Result) -> ConnAt = maps:get(connected_at, Conn, null), columns(maps:remove(connattrs, Input), @@ -311,10 +316,13 @@ columns(Input = #{connattrs := Conn}, Result) -> keepalive => maps:get(keepalive, Conn, null), proto_ver => maps:get(proto_ver, Conn, null) })); -columns(Input = #{topic_filters := [{Topic, #{qos := QoS}} | _] = Filters}, Result) -> +columns(Input = #{topic_filters := [{Topic, Opts} | _] = Filters}, Result) -> + Rusult1 = case maps:find(qos, Opts) of + {ok, QoS} -> Result#{qos => QoS}; + error -> Result + end, columns(maps:remove(topic_filters, Input), - Result#{topic => Topic, qos => QoS, - topic_filters => Filters}); + Rusult1#{topic => Topic, topic_filters => Filters}); columns(Input, Result) -> maps:merge(Result, Input). diff --git a/src/emqx_rule_sqlparser.erl b/src/emqx_rule_sqlparser.erl index 6f606aa5b..bd9323bea 100644 --- a/src/emqx_rule_sqlparser.erl +++ b/src/emqx_rule_sqlparser.erl @@ -15,6 +15,7 @@ -module(emqx_rule_sqlparser). -include("rule_engine.hrl"). +-include("rule_events.hrl"). -export([parse_select/1]). @@ -23,6 +24,10 @@ , select_where/1 ]). +-export([ hook/1 + , unquote/1 + ]). + -record(select, {fields, from, where}). -opaque(select() :: #select{}).