Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kafka_producer): send messages to wolff producer to buffer even when connector is in connecting state (sync query mode) #11722

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -3,6 +3,8 @@
%%--------------------------------------------------------------------
-module(emqx_bridge_kafka_impl_producer).

-behaviour(emqx_resource).

-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("snabbkaffe/include/trace.hrl").

Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_resource/src/emqx_resource.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_resource, [
{description, "Manager for all external resources"},
{vsn, "0.1.23"},
{vsn, "0.1.24"},
{registered, []},
{mod, {emqx_resource_app, []}},
{applications, [
Expand Down
3 changes: 1 addition & 2 deletions apps/emqx_resource/src/emqx_resource.erl
Expand Up @@ -306,8 +306,7 @@ query(ResId, Request, Opts) ->
{simple_async, _} ->
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
Opts1 = Opts#{is_buffer_supported => true},
emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1);
emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts);
{simple_sync, _} ->
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
Expand Down
6 changes: 4 additions & 2 deletions apps/emqx_resource/src/emqx_resource_buffer_worker.erl
Expand Up @@ -1048,7 +1048,9 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
?RESOURCE_ERROR(not_found, "resource not found")
end.

do_call_query(QM, Id, Index, Ref, Query, #{is_buffer_supported := true} = QueryOpts, Resource) ->
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do the authn authz resources end up calling this path?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, they end up calling this function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I introduce a new query mode type: #11724
Perhaps it would be more appropriate to restrict this behavior to that more specific type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I think we would also need to introduce a new query mode for the async version of a buffered connector, to have the correct semantics as well. 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ResQM =:= simple_async; ResQM =:= simple_sync
->
%% The connector supports buffer, send even in disconnected state
#{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,
CallMode = call_mode(QM, CBM),
Expand All @@ -1059,7 +1061,7 @@ do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Res
#{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,
CallMode = call_mode(QM, CBM),
apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Resource) ->
?RESOURCE_ERROR(not_connected, "resource not connected").

-define(APPLY_RESOURCE(NAME, EXPR, REQ),
Expand Down
1 change: 1 addition & 0 deletions changes/ee/fix-11722.en.md
@@ -0,0 +1 @@
Fixed an issue where a Kafka Producer bridge with `sync` query mode would not buffer messages when in the `connecting` state.