Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sink Connector should publish all outstanding messages on flush #283

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

jordanbull
Copy link
Contributor

When flush is called with a non-full batch, the publisher client may wait up to maxDelayThresholdMs before publishing the remaining messages introducing unnecessary tail latency on every flush. This change will force those messages to be published at the start of the flush call.

@google-cla
Copy link

google-cla bot commented Jun 17, 2021

Thanks for your pull request. It looks like this may be your first contribution to a Google open source project (if not, look below for help). Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

📝 Please visit https://cla.developers.google.com/ to sign.

Once you've signed (or fixed any issues), please reply here with @googlebot I signed it! and we'll verify it.


What to do if you already signed the CLA

Individual signers
Corporate signers

ℹ️ Googlers: Go here for more info.

@jordanbull
Copy link
Contributor Author

@googlebot I signed it!

@samarthsingal
Copy link
Collaborator

Can you elaborate why this is needed? As implemented the flush() call just processes the results of the publishing and the publisher does it own batching. flush() does not cause any publishing. With this, batching will be difficult to predict since it will at both flush time and when maxDelayThresholdMs elapses. If tail latency is a concern, have you tried just using a lower setting for maxDelayThresholdMs?

@jordanbull
Copy link
Contributor Author

Given Connect executes both the flush/commit and polls/puts in a synchronous loop, there will be no new messages arriving while waiting for flush to complete. Flush currently is awaiting completion of every message put in the producer anyways. Since there aren't any new messages coming, I figured we might as well send the batch immediately rather than wait for the time to elapse given the number of messages in that batch isn't going to change.

@samarthsingal
Copy link
Collaborator

My understanding was that poll/flush cycle was async and the sink would keep polling and delivering more messages regardless of whether the previous put() call had committed. That is my reading from https://github.com/a0x8o/kafka/blob/93185ee0de2349c7b8413f48611f3ce1e10dc951/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L215-L232- in each iteration() call it first tries to commit and then poll for new messages.

Is it documented that this is synchronous?

@jordanbull
Copy link
Contributor Author

My understanding was that poll/flush cycle was async and the sink would keep polling and delivering more messages regardless of whether the previous put() call had committed. That is my reading from https://github.com/a0x8o/kafka/blob/93185ee0de2349c7b8413f48611f3ce1e10dc951/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L215-L232- in each iteration() call it first tries to commit and then poll for new messages.

Is it documented that this is synchronous?

Unfortunately, I'm unable to find documentation that explicitly states that (maybe worth contributing to the java docs in connect), but as I read it, both the WorkerSinkTask code and the flush method of the PubSubSinkTask already depend on this fact for correctness.

IE PubSubSinkTask.flush would have a race in awaiting the future results and clearing them at https://github.com/GoogleCloudPlatform/pubsub/blob/master/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java#L369

Similarly in WorkerSinkTask polling adds offsets to the un-synchronized currentOffsets map and then the commit call clears that same map https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L571

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants