Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add /rule_engine API endpoint #10336

3 changes: 3 additions & 0 deletions apps/emqx_rule_engine/src/emqx_rule_api_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,15 @@ check_params(Params, Tag) ->

roots() ->
[
{"rule_engine", sc(ref("rule_engine"), #{desc => ?DESC("root_rule_engine")})},
{"rule_creation", sc(ref("rule_creation"), #{desc => ?DESC("root_rule_creation")})},
{"rule_info", sc(ref("rule_info"), #{desc => ?DESC("root_rule_info")})},
{"rule_events", sc(ref("rule_events"), #{desc => ?DESC("root_rule_events")})},
{"rule_test", sc(ref("rule_test"), #{desc => ?DESC("root_rule_test")})}
].

fields("rule_engine") ->
emqx_rule_engine_schema:rule_engine_settings();
fields("rule_creation") ->
emqx_rule_engine_schema:fields("rules");
fields("rule_info") ->
Expand Down
92 changes: 81 additions & 11 deletions apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

%% API callbacks
-export([
'/rule_engine'/2,
'/rule_events'/2,
'/rule_test'/2,
'/rules'/2,
Expand All @@ -41,7 +42,7 @@
]).

%% query callback
-export([qs2ms/2, run_fuzzy_match/2, format_rule_resp/1]).
-export([qs2ms/2, run_fuzzy_match/2, format_rule_info_resp/1]).

-define(ERR_BADARGS(REASON), begin
R0 = err_msg(REASON),
Expand Down Expand Up @@ -134,6 +135,7 @@ api_spec() ->

paths() ->
[
"/rule_engine",
"/rule_events",
"/rule_test",
"/rules",
Expand All @@ -145,6 +147,9 @@ paths() ->
error_schema(Code, Message) when is_atom(Code) ->
emqx_dashboard_swagger:error_codes([Code], list_to_binary(Message)).

rule_engine_schema() ->
ref(emqx_rule_api_schema, "rule_engine").

rule_creation_schema() ->
ref(emqx_rule_api_schema, "rule_creation").

Expand Down Expand Up @@ -184,7 +189,7 @@ schema("/rules") ->
responses => #{
200 =>
[
{data, mk(array(rule_info_schema()), #{desc => ?DESC("desc9")})},
{data, mk(array(rule_info_schema()), #{desc => ?DESC("api1_resp")})},
{meta, mk(ref(emqx_dashboard_swagger, meta), #{})}
],
400 => error_schema('BAD_REQUEST', "Invalid Parameters")
Expand Down Expand Up @@ -289,6 +294,26 @@ schema("/rule_test") ->
200 => <<"Rule Test Pass">>
}
}
};
schema("/rule_engine") ->
#{
'operationId' => '/rule_engine',
get => #{
tags => [<<"rules">>],
description => ?DESC("api9"),
responses => #{
200 => rule_engine_schema()
}
},
put => #{
tags => [<<"rules">>],
description => ?DESC("api10"),
'requestBody' => rule_engine_schema(),
responses => #{
200 => rule_engine_schema(),
400 => error_schema('BAD_REQUEST', "Invalid request")
}
}
}.

param_path_id() ->
Expand All @@ -309,7 +334,7 @@ param_path_id() ->
QueryString,
?RULE_QS_SCHEMA,
fun ?MODULE:qs2ms/2,
fun ?MODULE:format_rule_resp/1
fun ?MODULE:format_rule_info_resp/1
)
of
{error, page_limit_invalid} ->
Expand All @@ -331,7 +356,7 @@ param_path_id() ->
case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
{ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
[Rule] = get_one_rule(AllRules, Id),
{201, format_rule_resp(Rule)};
{201, format_rule_info_resp(Rule)};
{error, Reason} ->
?SLOG(error, #{
msg => "create_rule_failed",
Expand Down Expand Up @@ -362,7 +387,7 @@ param_path_id() ->
'/rules/:id'(get, #{bindings := #{id := Id}}) ->
case emqx_rule_engine:get_rule(Id) of
{ok, Rule} ->
{200, format_rule_resp(Rule)};
{200, format_rule_info_resp(Rule)};
not_found ->
{404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
end;
Expand All @@ -372,7 +397,7 @@ param_path_id() ->
case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
{ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
[Rule] = get_one_rule(AllRules, Id),
{200, format_rule_resp(Rule)};
{200, format_rule_info_resp(Rule)};
{error, Reason} ->
?SLOG(error, #{
msg => "update_rule_failed",
Expand Down Expand Up @@ -419,6 +444,24 @@ param_path_id() ->
{404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
end.

'/rule_engine'(get, _Params) ->
{200, format_rule_engine_resp(emqx_conf:get([rule_engine]))};
thalesmg marked this conversation as resolved.
Show resolved Hide resolved
'/rule_engine'(put, #{body := Params}) ->
?CHECK_PARAMS(
Params,
rule_engine,
case emqx_conf:update([rule_engine], Params, #{override_to => cluster}) of
{ok, #{config := Config}} ->
{200, format_rule_engine_resp(Config)};
{error, Reason} ->
?SLOG(error, #{
msg => "update_rule_engine_failed",
reason => Reason
}),
{400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}}
end
).

%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
Expand All @@ -440,11 +483,11 @@ encode_nested_error(RuleError, Reason) ->
{RuleError, Reason}
end.

format_rule_resp(Rules) when is_list(Rules) ->
[format_rule_resp(R) || R <- Rules];
format_rule_resp({Id, Rule}) ->
format_rule_resp(Rule#{id => Id});
format_rule_resp(#{
format_rule_info_resp(Rules) when is_list(Rules) ->
[format_rule_info_resp(R) || R <- Rules];
format_rule_info_resp({Id, Rule}) ->
format_rule_info_resp(Rule#{id => Id});
format_rule_info_resp(#{
id := Id,
name := Name,
created_at := CreatedAt,
Expand All @@ -465,6 +508,33 @@ format_rule_resp(#{
description => Descr
}.

format_rule_engine_resp(#{rules := Rules} = Config) ->
Config#{rules => maps:map(fun format_rule_resp/2, Rules)}.

format_rule_resp(
_Id,
#{
name := Name,
actions := Action,
sql := SQL,
enable := Enable,
description := Descr
} = Rule
) ->
Format = #{
name => Name,
actions => format_action(Action),
sql => SQL,
enable => Enable,
description => Descr
},
case Rule of
#{metadata := MetaData = #{created_at := CreatedAt}} ->
Format#{metadata => MetaData#{created_at => format_datetime(CreatedAt, millisecond)}};
_ ->
Format
end.

format_datetime(Timestamp, Unit) ->
list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, Unit}])).

Expand Down
60 changes: 34 additions & 26 deletions apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
roots/0,
fields/1,
desc/1,
post_config_update/5
post_config_update/5,
rule_engine_settings/0
]).

-export([validate_sql/1]).
Expand All @@ -40,31 +41,13 @@ tags() ->
roots() -> ["rule_engine"].

fields("rule_engine") ->
[
{ignore_sys_message,
?HOCON(boolean(), #{default => true, desc => ?DESC("rule_engine_ignore_sys_message")})},
{rules,
?HOCON(hoconsc:map("id", ?R_REF("rules")), #{
desc => ?DESC("rule_engine_rules"), default => #{}
})},
{jq_function_default_timeout,
?HOCON(
emqx_schema:duration_ms(),
#{
default => <<"10s">>,
desc => ?DESC("rule_engine_jq_function_default_timeout")
}
)},
{jq_implementation_module,
?HOCON(
hoconsc:enum([jq_nif, jq_port]),
#{
default => jq_nif,
mapping => "jq.jq_implementation_module",
desc => ?DESC("rule_engine_jq_implementation_module")
}
)}
];
rule_engine_settings() ++
[
{rules,
?HOCON(hoconsc:map("id", ?R_REF("rules")), #{
desc => ?DESC("rule_engine_rules"), default => #{}
})}
];
fields("rules") ->
[
rule_name(),
Expand Down Expand Up @@ -227,6 +210,31 @@ actions() ->
qos() ->
?UNION([emqx_schema:qos(), binary()]).

rule_engine_settings() ->
[
{ignore_sys_message,
?HOCON(boolean(), #{default => true, desc => ?DESC("rule_engine_ignore_sys_message")})},
{jq_function_default_timeout,
?HOCON(
emqx_schema:duration_ms(),
#{
default => <<"10s">>,
desc => ?DESC("rule_engine_jq_function_default_timeout")
}
)},
{jq_implementation_module,
?HOCON(
hoconsc:enum([jq_nif, jq_port]),
#{
default => jq_nif,
mapping => "jq.jq_implementation_module",
desc => ?DESC("rule_engine_jq_implementation_module"),
deprecated => {since, "v5.0.22"},
importance => ?IMPORTANCE_HIDDEN
}
)}
].

validate_sql(Sql) ->
case emqx_rule_sqlparser:parse(Sql) of
{ok, _Result} -> ok;
Expand Down
18 changes: 18 additions & 0 deletions apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,21 @@ test_rule_params(Sql, Payload) ->
<<"sql">> => Sql
}
}.

t_rule_engine(_) ->
{200, _} = emqx_rule_engine_api:'/rule_engine'(get, foo),
{200, #{
%,
jq_function_default_timeout := 12000
% hidden! jq_implementation_module := jq_port
thalesmg marked this conversation as resolved.
Show resolved Hide resolved
}} = emqx_rule_engine_api:'/rule_engine'(put, #{
body => #{
<<"jq_function_default_timeout">> => <<"12s">>,
<<"jq_implementation_module">> => <<"jq_port">>
}
}),
SomeRule = #{<<"sql">> => <<"SELECT * FROM \"t/#\"">>},
{400, _} = emqx_rule_engine_api:'/rule_engine'(put, #{
body => #{<<"rules">> => #{<<"some_rule">> => SomeRule}}
}),
{400, _} = emqx_rule_engine_api:'/rule_engine'(put, #{body => #{<<"something">> => <<"weird">>}}).
1 change: 1 addition & 0 deletions changes/ce/feat-10336.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `/rule_engine` API endpoint to manage configuration of rule engine.
11 changes: 11 additions & 0 deletions rel/i18n/emqx_rule_api_schema.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,17 @@ emqx_rule_api_schema {
}
}

root_rule_engine {
desc {
en: "Rule engine configurations. This API can be used to change EMQX rule engine settings. But not for the rules. To list, create, or update rules, call the '/rules' API instead."
zh: "规则引擎配置。该 API 可用于查看和修改规则引擎相关的一些设置。但不可用于规则,请调用 '/rules' API 来对规则进行操作。"
sstrigler marked this conversation as resolved.
Show resolved Hide resolved
}
label: {
en: "Rule engine configuration"
zh: "规则引擎配置"
}
}

root_rule_creation {
desc {
en: "Schema for creating rules"
Expand Down
37 changes: 27 additions & 10 deletions rel/i18n/emqx_rule_engine_api.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,16 @@ emqx_rule_engine_api {
zh: "根据规则来源 Topic 过滤, 使用 MQTT Topic 匹配"
}
}

api1_resp {
desc {
en: "List of rules"
zh: "列出所有规则"
sstrigler marked this conversation as resolved.
Show resolved Hide resolved
}
label: {
en: "List Rules"
zh: "列出所有规则"
}
}
api2 {
desc {
en: "Create a new rule using given Id"
Expand Down Expand Up @@ -116,7 +125,6 @@ emqx_rule_engine_api {
zh: "删除集群规则"
sstrigler marked this conversation as resolved.
Show resolved Hide resolved
}
}

api7 {
desc {
en: "Reset a rule metrics"
Expand All @@ -127,7 +135,6 @@ emqx_rule_engine_api {
zh: "重置规则计数"
}
}

api8 {
desc {
en: "Test a rule"
Expand All @@ -138,14 +145,24 @@ emqx_rule_engine_api {
zh: "测试规则"
}
}
desc9 {
api9 {
desc {
en: "List of rules"
zh: "列出所有规则"
en: "Get rule engine configuration."
zh: "获取规则引擎配置。"
}
label: {
en: "List Rules"
zh: "列出所有规则"
label {
en: "Get configuration"
zh: "获取配置"
}
}
api10 {
desc {
en: "Update rule engine configuration."
zh: "更新规则引擎配置。"
}
label {
en: "Update configuration"
zh: "更新配置"
}
}
}
}