Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix({kafka,pulsar}_producer): correctly handle metrics for connectors that have internal buffers #11724

Merged
merged 5 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -32,9 +32,9 @@
-define(kafka_producers, kafka_producers).

query_mode(#{kafka := #{query_mode := sync}}) ->
simple_sync;
simple_sync_internal_buffer;
query_mode(_) ->
simple_async.
simple_async_internal_buffer.

callback_mode() -> async_if_possible.

Expand Down
Expand Up @@ -133,15 +133,15 @@ t_query_mode(CtConfig) ->
end,
fun(Trace) ->
%% We should have a sync Snabbkaffe trace
?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_sync_query, Trace))
?assertMatch([_], ?of_kind(simple_sync_internal_buffer_query, Trace))
end
),
?check_trace(
begin
publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "async"})
end,
fun(Trace) ->
%% We should have a sync Snabbkaffe trace
%% We should have an async Snabbkaffe trace
?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_async_query, Trace))
end
),
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src
@@ -1,6 +1,6 @@
{application, emqx_bridge_pulsar, [
{description, "EMQX Pulsar Bridge"},
{vsn, "0.1.6"},
{vsn, "0.1.7"},
{registered, []},
{applications, [
kernel,
Expand Down
Expand Up @@ -73,7 +73,7 @@
callback_mode() -> async_if_possible.

query_mode(_Config) ->
simple_async.
simple_async_internal_buffer.

-spec on_start(resource_id(), config()) -> {ok, state()}.
on_start(InstanceId, Config) ->
Expand Down
14 changes: 10 additions & 4 deletions apps/emqx_resource/include/emqx_resource.hrl
Expand Up @@ -22,11 +22,18 @@
-type resource_state() :: term().
-type resource_status() :: connected | disconnected | connecting | stopped.
-type callback_mode() :: always_sync | async_if_possible.
-type query_mode() :: simple_sync | simple_async | sync | async | no_queries.
-type query_mode() ::
simple_sync
| simple_async
| simple_sync_internal_buffer
| simple_async_internal_buffer
| sync
| async
| no_queries.
-type result() :: term().
-type reply_fun() ::
{fun((result(), Args :: term()) -> any()), Args :: term()}
| {fun((result(), Args :: term()) -> any()), Args :: term(), reply_context()}
{fun((...) -> any()), Args :: [term()]}
| {fun((...) -> any()), Args :: [term()], reply_context()}
| undefined.
-type reply_context() :: #{reply_dropped => boolean()}.
-type query_opts() :: #{
Expand All @@ -36,7 +43,6 @@
expire_at => infinity | integer(),
async_reply_fun => reply_fun(),
simple_query => boolean(),
is_buffer_supported => boolean(),
reply_to => reply_fun()
}.
-type resource_data() :: #{
Expand Down
14 changes: 14 additions & 0 deletions apps/emqx_resource/src/emqx_resource.erl
Expand Up @@ -311,6 +311,20 @@ query(ResId, Request, Opts) ->
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts);
{simple_async_internal_buffer, _} ->
%% This is for bridges/connectors that have internal buffering, such
%% as Kafka and Pulsar producers.
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts);
{simple_sync_internal_buffer, _} ->
%% This is for bridges/connectors that have internal buffering, such
%% as Kafka and Pulsar producers.
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_sync_internal_buffer_query(
ResId, Request, Opts
);
{sync, _} ->
emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
{async, _} ->
Expand Down
51 changes: 48 additions & 3 deletions apps/emqx_resource/src/emqx_resource_buffer_worker.erl
Expand Up @@ -39,7 +39,8 @@
-export([
simple_sync_query/2,
simple_sync_query/3,
simple_async_query/3
simple_async_query/3,
simple_sync_internal_buffer_query/3
]).

-export([
Expand All @@ -53,7 +54,9 @@

-export([queue_item_marshaller/1, estimate_size/1]).

-export([handle_async_reply/2, handle_async_batch_reply/2, reply_call/2]).
-export([
handle_async_reply/2, handle_async_batch_reply/2, reply_call/2, reply_call_internal_buffer/3
]).

-export([clear_disk_queue_dir/2]).

Expand Down Expand Up @@ -169,6 +172,42 @@ simple_async_query(Id, Request, QueryOpts0) ->
_ = handle_query_result(Id, Result, _HasBeenSent = false),
Result.

%% This is a hack to handle cases where the underlying connector has internal buffering
%% (e.g.: Kafka and Pulsar producers). Since the message may be internally retried at a
%% later time, we can't bump metrics immediately if the return value is not a success
%% (e.g.: if the call timed out, but the message was enqueued nevertheless).
%%
%% Issues the request through `simple_async_query/3' and then blocks the calling process
%% until a reply arrives on a process alias, or until the `timeout' query option elapses
%% (`simple_query_opts/0' seeds it with `infinity'; caller-supplied options override it).
%%
%% Returns one of:
%%   * the `{error, _}' produced by `simple_async_query/3', either bare or unwrapped
%%     from `{async_return, {error, _}}';
%%   * the response delivered through the reply alias;
%%   * `{error, timeout}' if no reply arrived in time.
-spec simple_sync_internal_buffer_query(id(), request(), query_opts()) -> term().
simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
?tp(simple_sync_internal_buffer_query, #{id => Id, request => Request, query_opts => QueryOpts0}),
%% The alias is the address the reply callback sends `{ReplyAlias, Response}' to.
ReplyAlias = alias([reply]),
try
%% Keep any `reply_to' the caller already set so it can be chained after our own
%% reply (see `reply_call_internal_buffer/3').
MaybeReplyTo = maps:get(reply_to, QueryOpts0, undefined),
QueryOpts1 = QueryOpts0#{
reply_to => {fun ?MODULE:reply_call_internal_buffer/3, [ReplyAlias, MaybeReplyTo]}
},
%% `maps:merge/2' gives precedence to the second map, so caller options win over
%% the simple-query defaults.
QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1),
case simple_async_query(Id, Request, QueryOpts) of
{error, _} = Error ->
Error;
{async_return, {error, _} = Error} ->
Error;
{async_return, {ok, _Pid}} ->
%% The request was handed over to the connector; wait for the reply callback.
receive
{ReplyAlias, Response} ->
Response
after Timeout ->
%% Deactivate the alias first so no further reply can be delivered, then
%% do a non-blocking check for a reply that raced with the timeout.
_ = unalias(ReplyAlias),
receive
{ReplyAlias, Response} ->
Response
after 0 -> {error, timeout}
end
end
end
after
%% Always deactivate the alias, whichever path was taken; the result is ignored.
_ = unalias(ReplyAlias)
end.

%% Default query options for the "simple" query APIs: marks the query as a simple
%% query with an infinite timeout, then passes the map through `ensure_expire_at/1'.
simple_query_opts() ->
    BaseOpts = #{timeout => infinity, simple_query => true},
    ensure_expire_at(BaseOpts).

Expand Down Expand Up @@ -1049,7 +1088,7 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
end.

do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
ResQM =:= simple_async; ResQM =:= simple_sync
ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer
->
%% The connector supports buffer, send even in disconnected state
#{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,
Expand Down Expand Up @@ -1908,6 +1947,12 @@ reply_call(Alias, Response) ->
erlang:send(Alias, {Alias, Response}),
ok.

%% Reply callback installed by `simple_sync_internal_buffer_query'.
%% First hands the result to the process waiting on the alias, then forwards the same
%% result to whatever `reply_to' callback the caller had originally configured (if any).
reply_call_internal_buffer(Alias, ChainedReplyTo, Result) ->
    ?MODULE:reply_call(Alias, Result),
    do_reply_caller(ChainedReplyTo, Result).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
adjust_batch_time_test_() ->
Expand Down
4 changes: 2 additions & 2 deletions apps/emqx_resource/src/emqx_resource_manager.erl
Expand Up @@ -147,9 +147,9 @@ create(ResId, Group, ResourceType, Config, Opts) ->
QueryMode = emqx_resource:query_mode(ResourceType, Config, Opts),
case QueryMode of
%% the resource has built-in buffer, so there is no need for resource workers
simple_sync ->
simple_sync_internal_buffer ->
ok;
simple_async ->
simple_async_internal_buffer ->
ok;
%% The resource is a consumer resource, so there is no need for resource workers
no_queries ->
Expand Down
1 change: 1 addition & 0 deletions changes/ee/fix-11724.en.md
@@ -0,0 +1 @@
Fixed a metrics issue where messages sent to Kafka were counted as failed even when they were eventually sent successfully after a delay caused by the producer's internal buffering.
zmstone marked this conversation as resolved.
Show resolved Hide resolved