Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(emqx_rule_engine): set inc_action_metrics as async_reply_fun #11126

Merged
merged 10 commits into from
Jun 30, 2023
8 changes: 4 additions & 4 deletions apps/emqx_bridge/src/emqx_bridge.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@

-export([
send_message/2,
send_message/4
send_message/5
]).

-export([config_key_path/0]).
Expand Down Expand Up @@ -220,14 +220,14 @@ send_to_matched_egress_bridges(Topic, Msg) ->
send_message(BridgeId, Message) ->
{BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
ResId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
send_message(BridgeType, BridgeName, ResId, Message).
send_message(BridgeType, BridgeName, ResId, Message, #{}).

send_message(BridgeType, BridgeName, ResId, Message) ->
send_message(BridgeType, BridgeName, ResId, Message, QueryOpts0) ->
case emqx:get_config([?ROOT_KEY, BridgeType, BridgeName], not_found) of
not_found ->
{error, bridge_not_found};
#{enable := true} = Config ->
QueryOpts = query_opts(Config),
QueryOpts = maps:merge(query_opts(Config), QueryOpts0),
emqx_resource:query(ResId, {send_message, Message}, QueryOpts);
#{enable := false} ->
{error, bridge_stopped}
Expand Down
5 changes: 2 additions & 3 deletions apps/emqx_bridge/test/emqx_bridge_testlib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,10 @@ delete_all_bridges() ->
).

%% test helpers
parse_and_check(Config, ConfigString, Name) ->
BridgeType = ?config(bridge_type, Config),
parse_and_check(BridgeType, BridgeName, ConfigString) ->
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
#{<<"bridges">> := #{BridgeType := #{Name := BridgeConfig}}} = RawConf,
#{<<"bridges">> := #{BridgeType := #{BridgeName := BridgeConfig}}} = RawConf,
BridgeConfig.

resource_id(Config) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,14 @@ bridge_config(TestCase, _TestGroup, Config) ->
Host = ?config(bridge_host, Config),
Port = ?config(bridge_port, Config),
Version = ?config(iotdb_version, Config),
Type = ?config(bridge_type, Config),
Name = <<
(atom_to_binary(TestCase))/binary, UniqueNum/binary
>>,
ServerURL = iotdb_server_url(Host, Port),
ConfigString =
io_lib:format(
"bridges.iotdb.~s {\n"
"bridges.~s.~s {\n"
" enable = true\n"
" base_url = \"~s\"\n"
" authentication = {\n"
Expand All @@ -142,12 +143,13 @@ bridge_config(TestCase, _TestGroup, Config) ->
" }\n"
"}\n",
[
Type,
Name,
ServerURL,
Version
]
),
{Name, ConfigString, emqx_bridge_testlib:parse_and_check(Config, ConfigString, Name)}.
{Name, ConfigString, emqx_bridge_testlib:parse_and_check(Type, Name, ConfigString)}.

make_iotdb_payload(DeviceId, Measurement, Type, Value) ->
#{
Expand Down
3 changes: 2 additions & 1 deletion apps/emqx_resource/include/emqx_resource.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
expire_at => infinity | integer(),
async_reply_fun => reply_fun(),
simple_query => boolean(),
is_buffer_supported => boolean()
is_buffer_supported => boolean(),
reply_to => reply_fun()
}.
-type resource_data() :: #{
id := resource_id(),
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_resource/src/emqx_resource.erl
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ query(ResId, Request, Opts) ->
{simple_sync, _} ->
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_sync_query(ResId, Request);
emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts);
{sync, _} ->
emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
{async, _} ->
Expand Down
51 changes: 38 additions & 13 deletions apps/emqx_resource/src/emqx_resource_buffer_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

-export([
simple_sync_query/2,
simple_sync_query/3,
simple_async_query/3
]).

Expand All @@ -61,7 +62,7 @@
-define(COLLECT_REQ_LIMIT, 1000).
-define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
-define(SIMPLE_QUERY(REQUEST), ?QUERY(undefined, REQUEST, false, infinity)).
-define(SIMPLE_QUERY(FROM, REQUEST), ?QUERY(FROM, REQUEST, false, infinity)).
-define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}).
-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef),
{Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef}
Expand Down Expand Up @@ -133,6 +134,10 @@ async_query(Id, Request, Opts0) ->
%% simple query the resource without batching and queuing.
-spec simple_sync_query(id(), request()) -> term().
simple_sync_query(Id, Request) ->
simple_sync_query(Id, Request, #{}).

-spec simple_sync_query(id(), request(), query_opts()) -> term().
simple_sync_query(Id, Request, QueryOpts0) ->
%% Note: since calling this function implies in bypassing the
%% buffer workers, and each buffer worker index is used when
%% collecting gauge metrics, we use this dummy index. If this
Expand All @@ -141,10 +146,11 @@ simple_sync_query(Id, Request) ->
%% `emqx_resource_metrics:*_shift/3'.
?tp(simple_sync_query, #{id => Id, request => Request}),
Index = undefined,
QueryOpts = simple_query_opts(),
QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
emqx_resource_metrics:matched_inc(Id),
Ref = make_request_ref(),
Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: now we have 2 different query option keys to specify async reply functions. Maybe we could choose only one of async_reply_fun and reply_to for this query option to avoid confusion?

Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts),
_ = handle_query_result(Id, Result, _HasBeenSent = false),
Result.

Expand All @@ -156,7 +162,10 @@ simple_async_query(Id, Request, QueryOpts0) ->
QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
emqx_resource_metrics:matched_inc(Id),
Ref = make_request_ref(),
Result = call_query(async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
Result = call_query(
async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts
),
_ = handle_query_result(Id, Result, _HasBeenSent = false),
Result.

Expand Down Expand Up @@ -308,31 +317,31 @@ code_change(_OldVsn, State, _Extra) ->
end
).

pick_call(Id, Key, Query, Timeout) ->
pick_call(Id, Key, Query = {_, _, QueryOpts}, Timeout) ->
?PICK(Id, Key, Pid, begin
MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
ReplyTo = {fun ?MODULE:reply_call/2, [MRef]},
erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
receive
{MRef, Response} ->
erlang:demonitor(MRef, [flush]),
Response;
maybe_reply_to(Response, QueryOpts);
{'DOWN', MRef, process, Pid, Reason} ->
error({worker_down, Reason})
after Timeout ->
erlang:demonitor(MRef, [flush]),
receive
{MRef, Response} ->
Response
maybe_reply_to(Response, QueryOpts)
after 0 ->
error(timeout)
end
end
end).

pick_cast(Id, Key, Query) ->
pick_cast(Id, Key, Query = {query, _Request, QueryOpts}) ->
?PICK(Id, Key, Pid, begin
ReplyTo = undefined,
ReplyTo = maps:get(reply_to, QueryOpts, undefined),
erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
ok
end).
Expand Down Expand Up @@ -1051,9 +1060,14 @@ do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
end
).

apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, _QueryOpts) ->
apply_query_fun(
sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, QueryOpts
) ->
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
maybe_reply_to(
?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request),
QueryOpts
);
apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) ->
?tp(call_query_async, #{
id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
Expand Down Expand Up @@ -1081,12 +1095,17 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re
end,
Request
);
apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, _QueryOpts) ->
apply_query_fun(
sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts
) ->
?tp(call_batch_query, #{
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
}),
Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch),
?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
maybe_reply_to(
?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch),
QueryOpts
);
apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) ->
?tp(call_batch_query_async, #{
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async
Expand Down Expand Up @@ -1118,6 +1137,12 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re
Batch
).

maybe_reply_to(Result, #{reply_to := ReplyTo}) ->
do_reply_caller(ReplyTo, Result),
Result;
maybe_reply_to(Result, _) ->
Result.

handle_async_reply(
#{
request_ref := Ref,
Expand Down
36 changes: 22 additions & 14 deletions apps/emqx_rule_engine/src/emqx_rule_runtime.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
-export([
apply_rule/3,
apply_rules/3,
clear_rule_payload/0
clear_rule_payload/0,
inc_action_metrics/2
]).

-import(
Expand Down Expand Up @@ -323,9 +324,7 @@ handle_action_list(RuleId, Actions, Selected, Envs) ->
handle_action(RuleId, ActId, Selected, Envs) ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'),
try
Result = do_handle_action(ActId, Selected, Envs),
inc_action_metrics(Result, RuleId),
Result
do_handle_action(RuleId, ActId, Selected, Envs)
catch
throw:out_of_service ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
Expand All @@ -345,21 +344,29 @@ handle_action(RuleId, ActId, Selected, Envs) ->
})
end.

do_handle_action({bridge, BridgeType, BridgeName, ResId}, Selected, _Envs) ->
-define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found; R == unhealthy_target).
do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Envs) ->
?TRACE(
"BRIDGE",
"bridge_action",
#{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)}
),
case emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected) of
ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId]},
case
emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{reply_to => ReplyTo})
of
{error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped ->
throw(out_of_service);
?RESOURCE_ERROR_M(R, _) when ?IS_RES_DOWN(R) ->
throw(out_of_service);
Result ->
Result
end;
do_handle_action(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
do_handle_action(RuleId, #{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
%% the function can also throw 'out_of_service'
Mod:Func(Selected, Envs, Args).
Result = Mod:Func(Selected, Envs, Args),
inc_action_metrics(RuleId, Result),
Result.

eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
nested_get({path, Path}, may_decode_payload(Payload));
Expand Down Expand Up @@ -512,14 +519,15 @@ nested_put(Alias, Val, Columns0) ->
Columns = handle_alias(Alias, Columns0),
emqx_rule_maps:nested_put(Alias, Val, Columns).

-define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found).
inc_action_metrics({error, {recoverable_error, _}}, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) ->
inc_action_metrics(RuleId, Result) ->
_ = do_inc_action_metrics(RuleId, Result),
Result.

do_inc_action_metrics(RuleId, {error, {recoverable_error, _}}) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
inc_action_metrics({error, {unrecoverable_error, _}}, RuleId) ->
do_inc_action_metrics(RuleId, {error, {unrecoverable_error, _}}) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed');
inc_action_metrics(R, RuleId) ->
do_inc_action_metrics(RuleId, R) ->
case is_ok_result(R) of
false ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
Expand Down