Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add /rule_engine API endpoint #10336

5 changes: 4 additions & 1 deletion apps/emqx_rule_engine/src/emqx_rule_api_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

-export([roots/0, fields/1]).

-type tag() :: rule_creation | rule_test.
-type tag() :: rule_creation | rule_test | rule_engine.

-spec check_params(map(), tag()) -> {ok, map()} | {error, term()}.
check_params(Params, Tag) ->
Expand All @@ -48,12 +48,15 @@ check_params(Params, Tag) ->

roots() ->
[
{"rule_engine", sc(ref("rule_engine"), #{desc => ?DESC("root_rule_engine")})},
{"rule_creation", sc(ref("rule_creation"), #{desc => ?DESC("root_rule_creation")})},
{"rule_info", sc(ref("rule_info"), #{desc => ?DESC("root_rule_info")})},
{"rule_events", sc(ref("rule_events"), #{desc => ?DESC("root_rule_events")})},
{"rule_test", sc(ref("rule_test"), #{desc => ?DESC("root_rule_test")})}
].

fields("rule_engine") ->
emqx_rule_engine_schema:rule_engine_settings();
fields("rule_creation") ->
emqx_rule_engine_schema:fields("rules");
fields("rule_info") ->
Expand Down
69 changes: 58 additions & 11 deletions apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

%% API callbacks
-export([
'/rule_engine'/2,
'/rule_events'/2,
'/rule_test'/2,
'/rules'/2,
Expand All @@ -41,7 +42,7 @@
]).

%% query callback
-export([qs2ms/2, run_fuzzy_match/2, format_rule_resp/1]).
-export([qs2ms/2, run_fuzzy_match/2, format_rule_info_resp/1]).

-define(ERR_BADARGS(REASON), begin
R0 = err_msg(REASON),
Expand Down Expand Up @@ -134,6 +135,7 @@ api_spec() ->

paths() ->
[
"/rule_engine",
"/rule_events",
"/rule_test",
"/rules",
Expand All @@ -145,6 +147,9 @@ paths() ->
error_schema(Code, Message) when is_atom(Code) ->
emqx_dashboard_swagger:error_codes([Code], list_to_binary(Message)).

rule_engine_schema() ->
ref(emqx_rule_api_schema, "rule_engine").

rule_creation_schema() ->
ref(emqx_rule_api_schema, "rule_creation").

Expand Down Expand Up @@ -184,7 +189,7 @@ schema("/rules") ->
responses => #{
200 =>
[
{data, mk(array(rule_info_schema()), #{desc => ?DESC("desc9")})},
{data, mk(array(rule_info_schema()), #{desc => ?DESC("api1_resp")})},
{meta, mk(ref(emqx_dashboard_swagger, meta), #{})}
],
400 => error_schema('BAD_REQUEST', "Invalid Parameters")
Expand Down Expand Up @@ -289,6 +294,26 @@ schema("/rule_test") ->
200 => <<"Rule Test Pass">>
}
}
};
schema("/rule_engine") ->
#{
'operationId' => '/rule_engine',
get => #{
tags => [<<"rules">>],
description => ?DESC("api9"),
responses => #{
200 => rule_engine_schema()
}
},
put => #{
tags => [<<"rules">>],
description => ?DESC("api10"),
'requestBody' => rule_engine_schema(),
responses => #{
200 => rule_engine_schema(),
400 => error_schema('BAD_REQUEST', "Invalid request")
}
}
}.

param_path_id() ->
Expand All @@ -309,7 +334,7 @@ param_path_id() ->
QueryString,
?RULE_QS_SCHEMA,
fun ?MODULE:qs2ms/2,
fun ?MODULE:format_rule_resp/1
fun ?MODULE:format_rule_info_resp/1
)
of
{error, page_limit_invalid} ->
Expand All @@ -331,7 +356,7 @@ param_path_id() ->
case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
{ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
[Rule] = get_one_rule(AllRules, Id),
{201, format_rule_resp(Rule)};
{201, format_rule_info_resp(Rule)};
{error, Reason} ->
?SLOG(error, #{
msg => "create_rule_failed",
Expand Down Expand Up @@ -362,7 +387,7 @@ param_path_id() ->
'/rules/:id'(get, #{bindings := #{id := Id}}) ->
case emqx_rule_engine:get_rule(Id) of
{ok, Rule} ->
{200, format_rule_resp(Rule)};
{200, format_rule_info_resp(Rule)};
not_found ->
{404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
end;
Expand All @@ -372,7 +397,7 @@ param_path_id() ->
case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
{ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
[Rule] = get_one_rule(AllRules, Id),
{200, format_rule_resp(Rule)};
{200, format_rule_info_resp(Rule)};
{error, Reason} ->
?SLOG(error, #{
msg => "update_rule_failed",
Expand Down Expand Up @@ -419,6 +444,16 @@ param_path_id() ->
{404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
end.

'/rule_engine'(get, _Params) ->
{200, format_rule_engine_resp(emqx_conf:get([rule_engine]))};
thalesmg marked this conversation as resolved.
Show resolved Hide resolved
'/rule_engine'(put, #{body := Params}) ->
case rule_engine_update(Params) of
{ok, Config} ->
{200, format_rule_engine_resp(Config)};
{error, Reason} ->
{400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}}
end.

%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
Expand All @@ -440,11 +475,9 @@ encode_nested_error(RuleError, Reason) ->
{RuleError, Reason}
end.

format_rule_resp(Rules) when is_list(Rules) ->
[format_rule_resp(R) || R <- Rules];
format_rule_resp({Id, Rule}) ->
format_rule_resp(Rule#{id => Id});
format_rule_resp(#{
format_rule_info_resp({Id, Rule}) ->
format_rule_info_resp(Rule#{id => Id});
format_rule_info_resp(#{
id := Id,
name := Name,
created_at := CreatedAt,
Expand All @@ -465,6 +498,9 @@ format_rule_resp(#{
description => Descr
}.

format_rule_engine_resp(Config) ->
maps:remove(rules, Config).

format_datetime(Timestamp, Unit) ->
list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, Unit}])).

Expand Down Expand Up @@ -661,3 +697,14 @@ run_fuzzy_match(E = {_Id, #{from := Topics}}, [{from, like, Pattern} | Fuzzy]) -
run_fuzzy_match(E, Fuzzy);
run_fuzzy_match(E, [_ | Fuzzy]) ->
run_fuzzy_match(E, Fuzzy).

rule_engine_update(Params) ->
case emqx_rule_api_schema:check_params(Params, rule_engine) of
thalesmg marked this conversation as resolved.
Show resolved Hide resolved
{ok, _CheckedParams} ->
{ok, #{config := Config}} = emqx_conf:update([rule_engine], Params, #{
override_to => cluster
}),
{ok, Config};
{error, Reason} ->
{error, Reason}
end.
60 changes: 34 additions & 26 deletions apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
roots/0,
fields/1,
desc/1,
post_config_update/5
post_config_update/5,
rule_engine_settings/0
]).

-export([validate_sql/1]).
Expand All @@ -40,31 +41,13 @@ tags() ->
roots() -> ["rule_engine"].

fields("rule_engine") ->
[
{ignore_sys_message,
?HOCON(boolean(), #{default => true, desc => ?DESC("rule_engine_ignore_sys_message")})},
{rules,
?HOCON(hoconsc:map("id", ?R_REF("rules")), #{
desc => ?DESC("rule_engine_rules"), default => #{}
})},
{jq_function_default_timeout,
?HOCON(
emqx_schema:duration_ms(),
#{
default => <<"10s">>,
desc => ?DESC("rule_engine_jq_function_default_timeout")
}
)},
{jq_implementation_module,
?HOCON(
hoconsc:enum([jq_nif, jq_port]),
#{
default => jq_nif,
mapping => "jq.jq_implementation_module",
desc => ?DESC("rule_engine_jq_implementation_module")
}
)}
];
rule_engine_settings() ++
[
{rules,
?HOCON(hoconsc:map("id", ?R_REF("rules")), #{
desc => ?DESC("rule_engine_rules"), default => #{}
})}
];
fields("rules") ->
[
rule_name(),
Expand Down Expand Up @@ -227,6 +210,31 @@ actions() ->
qos() ->
?UNION([emqx_schema:qos(), binary()]).

rule_engine_settings() ->
[
{ignore_sys_message,
?HOCON(boolean(), #{default => true, desc => ?DESC("rule_engine_ignore_sys_message")})},
{jq_function_default_timeout,
?HOCON(
emqx_schema:duration_ms(),
#{
default => <<"10s">>,
desc => ?DESC("rule_engine_jq_function_default_timeout")
}
)},
{jq_implementation_module,
?HOCON(
hoconsc:enum([jq_nif, jq_port]),
#{
default => jq_nif,
mapping => "jq.jq_implementation_module",
desc => ?DESC("rule_engine_jq_implementation_module"),
deprecated => {since, "v5.0.22"},
importance => ?IMPORTANCE_HIDDEN
}
)}
].

validate_sql(Sql) ->
case emqx_rule_sqlparser:parse(Sql) of
{ok, _Result} -> ok;
Expand Down