Skip to content

Commit

Permalink
Merge pull request #10336 from sstrigler/EMQX-8507-rule-engine-need-a…
Browse files Browse the repository at this point in the history
…-new-api-to-crud-rule-engines-setting-configs

feat: add `/rule_engine` API endpoint
  • Loading branch information
sstrigler committed Apr 12, 2023
2 parents a49eea5 + e6f8682 commit f668ad7
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 104 deletions.
5 changes: 4 additions & 1 deletion apps/emqx_rule_engine/src/emqx_rule_api_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

-export([roots/0, fields/1]).

-type tag() :: rule_creation | rule_test.
-type tag() :: rule_creation | rule_test | rule_engine.

-spec check_params(map(), tag()) -> {ok, map()} | {error, term()}.
check_params(Params, Tag) ->
Expand All @@ -48,12 +48,15 @@ check_params(Params, Tag) ->

roots() ->
[
{"rule_engine", sc(ref("rule_engine"), #{desc => ?DESC("root_rule_engine")})},
{"rule_creation", sc(ref("rule_creation"), #{desc => ?DESC("root_rule_creation")})},
{"rule_info", sc(ref("rule_info"), #{desc => ?DESC("root_rule_info")})},
{"rule_events", sc(ref("rule_events"), #{desc => ?DESC("root_rule_events")})},
{"rule_test", sc(ref("rule_test"), #{desc => ?DESC("root_rule_test")})}
].

fields("rule_engine") ->
emqx_rule_engine_schema:rule_engine_settings();
fields("rule_creation") ->
emqx_rule_engine_schema:fields("rules");
fields("rule_info") ->
Expand Down
69 changes: 58 additions & 11 deletions apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

%% API callbacks
-export([
'/rule_engine'/2,
'/rule_events'/2,
'/rule_test'/2,
'/rules'/2,
Expand All @@ -41,7 +42,7 @@
]).

%% query callback
-export([qs2ms/2, run_fuzzy_match/2, format_rule_resp/1]).
-export([qs2ms/2, run_fuzzy_match/2, format_rule_info_resp/1]).

-define(ERR_BADARGS(REASON), begin
R0 = err_msg(REASON),
Expand Down Expand Up @@ -134,6 +135,7 @@ api_spec() ->

paths() ->
[
"/rule_engine",
"/rule_events",
"/rule_test",
"/rules",
Expand All @@ -145,6 +147,9 @@ paths() ->
error_schema(Code, Message) when is_atom(Code) ->
emqx_dashboard_swagger:error_codes([Code], list_to_binary(Message)).

rule_engine_schema() ->
ref(emqx_rule_api_schema, "rule_engine").

rule_creation_schema() ->
ref(emqx_rule_api_schema, "rule_creation").

Expand Down Expand Up @@ -184,7 +189,7 @@ schema("/rules") ->
responses => #{
200 =>
[
{data, mk(array(rule_info_schema()), #{desc => ?DESC("desc9")})},
{data, mk(array(rule_info_schema()), #{desc => ?DESC("api1_resp")})},
{meta, mk(ref(emqx_dashboard_swagger, meta), #{})}
],
400 => error_schema('BAD_REQUEST', "Invalid Parameters")
Expand Down Expand Up @@ -289,6 +294,26 @@ schema("/rule_test") ->
200 => <<"Rule Test Pass">>
}
}
};
schema("/rule_engine") ->
#{
'operationId' => '/rule_engine',
get => #{
tags => [<<"rules">>],
description => ?DESC("api9"),
responses => #{
200 => rule_engine_schema()
}
},
put => #{
tags => [<<"rules">>],
description => ?DESC("api10"),
'requestBody' => rule_engine_schema(),
responses => #{
200 => rule_engine_schema(),
400 => error_schema('BAD_REQUEST', "Invalid request")
}
}
}.

param_path_id() ->
Expand All @@ -309,7 +334,7 @@ param_path_id() ->
QueryString,
?RULE_QS_SCHEMA,
fun ?MODULE:qs2ms/2,
fun ?MODULE:format_rule_resp/1
fun ?MODULE:format_rule_info_resp/1
)
of
{error, page_limit_invalid} ->
Expand All @@ -331,7 +356,7 @@ param_path_id() ->
case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
{ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
[Rule] = get_one_rule(AllRules, Id),
{201, format_rule_resp(Rule)};
{201, format_rule_info_resp(Rule)};
{error, Reason} ->
?SLOG(error, #{
msg => "create_rule_failed",
Expand Down Expand Up @@ -362,7 +387,7 @@ param_path_id() ->
'/rules/:id'(get, #{bindings := #{id := Id}}) ->
case emqx_rule_engine:get_rule(Id) of
{ok, Rule} ->
{200, format_rule_resp(Rule)};
{200, format_rule_info_resp(Rule)};
not_found ->
{404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
end;
Expand All @@ -372,7 +397,7 @@ param_path_id() ->
case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
{ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
[Rule] = get_one_rule(AllRules, Id),
{200, format_rule_resp(Rule)};
{200, format_rule_info_resp(Rule)};
{error, Reason} ->
?SLOG(error, #{
msg => "update_rule_failed",
Expand Down Expand Up @@ -419,6 +444,16 @@ param_path_id() ->
{404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
end.

'/rule_engine'(get, _Params) ->
{200, format_rule_engine_resp(emqx_conf:get([rule_engine]))};
'/rule_engine'(put, #{body := Params}) ->
case rule_engine_update(Params) of
{ok, Config} ->
{200, format_rule_engine_resp(Config)};
{error, Reason} ->
{400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}}
end.

%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
Expand All @@ -440,11 +475,9 @@ encode_nested_error(RuleError, Reason) ->
{RuleError, Reason}
end.

format_rule_resp(Rules) when is_list(Rules) ->
[format_rule_resp(R) || R <- Rules];
format_rule_resp({Id, Rule}) ->
format_rule_resp(Rule#{id => Id});
format_rule_resp(#{
format_rule_info_resp({Id, Rule}) ->
format_rule_info_resp(Rule#{id => Id});
format_rule_info_resp(#{
id := Id,
name := Name,
created_at := CreatedAt,
Expand All @@ -465,6 +498,9 @@ format_rule_resp(#{
description => Descr
}.

format_rule_engine_resp(Config) ->
maps:remove(rules, Config).

format_datetime(Timestamp, Unit) ->
list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, Unit}])).

Expand Down Expand Up @@ -661,3 +697,14 @@ run_fuzzy_match(E = {_Id, #{from := Topics}}, [{from, like, Pattern} | Fuzzy]) -
run_fuzzy_match(E, Fuzzy);
run_fuzzy_match(E, [_ | Fuzzy]) ->
run_fuzzy_match(E, Fuzzy).

rule_engine_update(Params) ->
case emqx_rule_api_schema:check_params(Params, rule_engine) of
{ok, _CheckedParams} ->
{ok, #{config := Config}} = emqx_conf:update([rule_engine], Params, #{
override_to => cluster
}),
{ok, Config};
{error, Reason} ->
{error, Reason}
end.
60 changes: 34 additions & 26 deletions apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
roots/0,
fields/1,
desc/1,
post_config_update/5
post_config_update/5,
rule_engine_settings/0
]).

-export([validate_sql/1]).
Expand All @@ -40,31 +41,13 @@ tags() ->
roots() -> ["rule_engine"].

fields("rule_engine") ->
[
{ignore_sys_message,
?HOCON(boolean(), #{default => true, desc => ?DESC("rule_engine_ignore_sys_message")})},
{rules,
?HOCON(hoconsc:map("id", ?R_REF("rules")), #{
desc => ?DESC("rule_engine_rules"), default => #{}
})},
{jq_function_default_timeout,
?HOCON(
emqx_schema:duration_ms(),
#{
default => <<"10s">>,
desc => ?DESC("rule_engine_jq_function_default_timeout")
}
)},
{jq_implementation_module,
?HOCON(
hoconsc:enum([jq_nif, jq_port]),
#{
default => jq_nif,
mapping => "jq.jq_implementation_module",
desc => ?DESC("rule_engine_jq_implementation_module")
}
)}
];
rule_engine_settings() ++
[
{rules,
?HOCON(hoconsc:map("id", ?R_REF("rules")), #{
desc => ?DESC("rule_engine_rules"), default => #{}
})}
];
fields("rules") ->
[
rule_name(),
Expand Down Expand Up @@ -227,6 +210,31 @@ actions() ->
qos() ->
?UNION([emqx_schema:qos(), binary()]).

rule_engine_settings() ->
[
{ignore_sys_message,
?HOCON(boolean(), #{default => true, desc => ?DESC("rule_engine_ignore_sys_message")})},
{jq_function_default_timeout,
?HOCON(
emqx_schema:duration_ms(),
#{
default => <<"10s">>,
desc => ?DESC("rule_engine_jq_function_default_timeout")
}
)},
{jq_implementation_module,
?HOCON(
hoconsc:enum([jq_nif, jq_port]),
#{
default => jq_nif,
mapping => "jq.jq_implementation_module",
desc => ?DESC("rule_engine_jq_implementation_module"),
deprecated => {since, "v5.0.22"},
importance => ?IMPORTANCE_HIDDEN
}
)}
].

validate_sql(Sql) ->
case emqx_rule_sqlparser:parse(Sql) of
{ok, _Result} -> ok;
Expand Down

0 comments on commit f668ad7

Please sign in to comment.