KAFKA-9312: Wait for splitted batches to be processed after a KafkaProducer#flush() #7877

Open
wants to merge 6 commits into base: trunk

Conversation

jonathansantilli
Contributor

This commit adds the logic to wait for split batches to be processed after a Message Too Large exception has been received.
It also adds a new test class to cover the IncompleteBatches class.

This code adds a new package-private constructor to both ProducerBatch and RecordAccumulator. This decouples the dependencies on ProduceRequestResult and IncompleteBatches, respectively, and makes it possible to test RecordAccumulator#awaitFlushCompletion().

The Jira ticket provides a test here that proves the error without these changes.
This PR does not include that specific test since it involves sleeping the thread and could lead to indeterminate behavior.
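
To make the intended behavior concrete, here is a minimal sketch of the idea (an illustration with simplified stand-in names, not the actual diff): a flush must keep waiting until every batch created by a split, recursively, is also done.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Simplified model, not Kafka code: 'done' stands in for a batch's
// ProduceRequestResult, and 'children' for the batches produced by a split.
class BatchModel {
    final CountDownLatch done = new CountDownLatch(1);
    final List<BatchModel> children = new ArrayList<>();

    // Previously flush effectively awaited only 'done', so it could return while
    // the split children were still in flight; the fix also waits for them.
    void awaitIncludingChildren() throws InterruptedException {
        done.await();
        for (BatchModel child : children)
            child.awaitIncludingChildren();   // children may themselves be split again
    }
}
```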

@jonathansantilli
Contributor Author

Hi @lbradstreet, this solves the bug you reported in Jira. Can you take a look, please?

@lbradstreet
Contributor

@jonathansantilli thanks for the PR, and some much better test cases!

One potential problem with this approach is that in relatively degenerate cases, I do not think we are guaranteed that a record batch will not have to be split again. Given that we only wait for batches that are split after the first phase, I think we could still end up returning from the flush early. These cases are not ideal, given that splitting record batches is a relatively expensive process requiring multiple round trips, but I believe it's still possible in cases where compression predictions are way off.

A second lesser problem is that we can end up waiting on batches that were not sent before the flush call, but ended up being split.

The only way I can think of to solve the above problems is to have the produceFuture(s) be considered completed when their children (split) batches are all completed. In this design, awaitFlushCompletion could likely stay as it is, and we would not end up waiting for batches that had not been produced at the time of the flush call, or end up returning too early in cases where multiple splits have occurred.

@jonathansantilli
Contributor Author

Thanks a lot for your reply, @lbradstreet. About your comments:

  • "...I do not think we are guaranteed that a record batch will not have to be split again."
    Oh, this is not the behavior I was expecting; I guess the design has a reason for that. Although I do not have much experience with this code, I thought that after a batch is split, it would be split into N batches, each smaller than the configured/desired batchSize.
    I think I need to check that part of the code to understand it better. Thanks for the heads-up.

  • "A second lesser problem is that we can end up waiting on batches that were not sent before the flush call, but ended up being split."
    I think even today we have the same issue/behavior
    According to the documentation:
    "Other threads can continue sending records while one thread is blocked waiting for a flush call to complete, however no guarantee is made about the completion of records sent after the flush call begins."

About your proposal: if I got it right, we need a way to track each batch and the children (split batches) created from the original batch.

Something like:

  • Batch_1 creates Batch_2 and Batch_3
    -- Batch_1 needs to wait for Batch_2 and Batch_3 to be finished.
  • Batch_2 creates Batch_4
    -- Batch_2 needs to wait for Batch_4 to finish

So each ProducerBatch.produceFuture would have associated with it every child ProducerBatch created after receiving a RecordBatchTooLargeException.
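
To make the scenario concrete, the same dependency tree can be written out with plain CompletableFutures (illustration only; ProduceRequestResult is not a CompletableFuture, and the class name below is made up): Batch_1 must not report completion until Batch_2, Batch_3, and Batch_4 are all done.

```java
import java.util.concurrent.CompletableFuture;

// The tree above, encoded with plain futures (not Kafka code).
public final class SplitScenario {
    public static void main(String[] args) {
        CompletableFuture<Void> batch2 = new CompletableFuture<>();
        CompletableFuture<Void> batch3 = new CompletableFuture<>();
        CompletableFuture<Void> batch4 = new CompletableFuture<>();

        // Batch_2's overall completion also depends on its child Batch_4,
        // and Batch_1 depends on Batch_2 (plus child) and Batch_3.
        CompletableFuture<Void> batch2Done = batch2.thenCompose(v -> batch4);
        CompletableFuture<Void> batch1Done = CompletableFuture.allOf(batch2Done, batch3);

        batch2.complete(null);
        batch3.complete(null);
        System.out.println("Batch_1 done before Batch_4? " + batch1Done.isDone()); // false
        batch4.complete(null);
        System.out.println("Batch_1 done after Batch_4?  " + batch1Done.isDone()); // true
    }
}
```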

I will revise the code; please tell me whatever comes to mind after this comment, or any future thoughts.

BTW, Happy new year!

@lbradstreet
Contributor

lbradstreet commented Dec 31, 2019

Thanks a lot for your reply, @lbradstreet. About your comments:

  • "...I do not think we are guaranteed that a record batch will not have to be split again."
    Oh, this is not the behavior I was expecting; I guess the design has a reason for that. Although I do not have much experience with this code, I thought that after a batch is split, it would be split into N batches, each smaller than the configured/desired batchSize.
    I think I need to check that part of the code to understand it better. Thanks for the heads-up.

Actually, you may be right that it will only split by the producer's configured batch.size. If that size is still too large, then it may not be able to split it further anyway, since it may end up making the same choices as before. I'm not positive though; I believe the compression estimates make this pretty non-deterministic, so it would be good to investigate this further before going ahead with any of the changes I've suggested.

  • "A second lesser problem is that we can end up waiting on batches that were not sent before the flush call, but ended up being split."
    I think even today we have the same issue/behavior
    According to the documentation:
    "Other threads can continue sending records while one thread is blocked waiting for a flush call to complete, however no guarantee is made about the completion of records sent after the flush call begins."

That is true, though I think we may end up waiting longer in this case, since we will wait for the first set of futures to complete before building the list of the second (split) set of futures. On the positive side, your code doesn't retry the overall awaitFlushCompletion, which wouldn't be guaranteed to ever complete if other threads continue to produce messages at the same time.

About your proposal: if I got it right, we need a way to track each batch and the children (split batches) created from the original batch.

Something like:

  • Batch_1 creates Batch_2 and Batch_3
    -- Batch_1 needs to wait for Batch_2 and Batch_3 to be finished.
  • Batch_2 creates Batch_4
    -- Batch_2 needs to wait for Batch_4 to finish

So each ProducerBatch.produceFuture would have associated with it every child ProducerBatch created after receiving a RecordBatchTooLargeException.

Yes, that's the idea. Essentially when Batch_2 and Batch_3 are completed, Batch_1's future would end up being completed too. It'd be worth checking whether any code other than awaitFlushCompletion depends on the split batch's future being completed to ensure that it isn't processed further. It would obviously be bad if the producer code could end up retrying and re-splitting a batch more than once because we did not set the original batch to done after the first split.

I will revise the code; please tell me whatever comes to mind after this comment, or any future thoughts.

BTW, Happy new year!

Happy new year!

@jonathansantilli
Contributor Author

Great! Thanks for the reply. I am making some changes and will take this into consideration:

It'd be worth checking whether any code other than awaitFlushCompletion depends on the split batch's future being completed to ensure that it isn't processed further

@jonathansantilli
Contributor Author

Hello @lbradstreet, please take a look at the additions/updates and let me know what you think.

@lbradstreet
Contributor

@jonathansantilli thanks, I'll give it a look :).

}

ProducerBatch(final TopicPartition tp, final MemoryRecordsBuilder recordsBuilder, final long createdMs,
final boolean isSplitBatch, final ProduceRequestResult produceFuture, final List<ProducerBatch> childrenProducerBatch) {
Contributor

Why is childrenProducerBatch part of this constructor, if the only user of it is the other constructor? getChildrenProducerBatch is public, so I don't think it needs to be this way for testing either?

Contributor Author

That way we decouple the children List as a collaborator (the constructor is package-private); otherwise, we would need to mimic the behavior of
public Deque<ProducerBatch> split(int splitBatchSize) in order to populate the list of child batches created as a consequence of the split process.
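
As a generic illustration of that pattern (not the PR's actual code; the class name below is made up), a package-private constructor lets a test hand in the children list as a collaborator instead of reproducing everything split(int splitBatchSize) does:

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: the production path starts with an empty list, while a test
// can inject a pre-built children list through the package-private constructor.
class SplittableThing {
    private final List<SplittableThing> children;

    SplittableThing() {
        this(new ArrayList<>());                         // production path
    }

    SplittableThing(List<SplittableThing> children) {    // package-private: test injection point
        this.children = children;
    }

    List<SplittableThing> children() {
        return children;
    }
}
```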

@abbccdda
Contributor

abbccdda commented Jan 6, 2020

Thanks for the PR. I will try to take a look before the end of the day.

@ijuma
Contributor

ijuma commented Jan 7, 2020

retest this please

Contributor

@abbccdda left a comment

The fix looks good to me. Do you think it's also necessary to add another integration test for more complex scenarios, such as further splits of a batch, or is Lucas' test sufficient?

this.retry = false;
this.isSplitBatch = isSplitBatch;
this.childrenProducerBatch = childrenProducerBatch;
Contributor

We have multiple null checks for childrenProducerBatch which are not necessary; instead, we could just reject a null childrenProducerBatch here to ensure it's non-null.

Contributor Author

(If I got your idea correctly.)
We were doing that previously, but in that case we would always be creating an ArrayList, reserving memory that may never be used if a batch does not get split.

Contributor

The other way we could go is to create a new ArrayList(0) if we think that all these null checks are a pain. I do think we should avoid creating a new ArrayList with pre-allocated elements for every batch, though. I'm happy with how things are now, but we should probably leave a comment next to https://github.com/apache/kafka/pull/7877/files#diff-e735b14a0fe9e843e528c2ce721602cbR72 with our reasoning, or we risk it being changed inadvertently.
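
One way to write down that trade-off, with the reasoning captured in a comment as suggested (a sketch of one option, not the PR diff; the class name is made up): most batches are never split, so the list is created lazily, and readers get an empty list instead of null.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustration only: lazy allocation avoids creating an ArrayList for every
// batch, while the getter hides the null case from callers.
class ChildTracking {
    private List<ChildTracking> children;       // null until the first split

    synchronized void addChild(ChildTracking child) {
        if (children == null)
            children = new ArrayList<>();       // allocated only when a split actually happens
        children.add(child);
    }

    synchronized List<ChildTracking> children() {
        return children == null ? Collections.emptyList() : children;
    }
}
```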

@@ -45,7 +46,7 @@ public void remove(ProducerBatch batch) {
}
}

public Iterable<ProducerBatch> copyAll() {
public List<ProducerBatch> copyAll() {
Contributor

q: why do we need to change it to type List?

Contributor Author

To make it testable, or at least easier to test; the List can still be used as an Iterable. In fact, that class did not have any tests, and this PR adds tests for it.
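
For reference, the method in question is small; a sketch of the changed signature, assuming the existing body (a defensive copy taken under the lock) stays as it is:

```java
public List<ProducerBatch> copyAll() {
    synchronized (incomplete) {
        // Same defensive snapshot as before; only the declared return type changes,
        // so tests can assert on List operations such as size() and contains().
        return new ArrayList<>(this.incomplete);
    }
}
```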

childrenProducerBatch.add(batch);
}

private List<ProducerBatch> getChildrenProducerBatch() {
Contributor

If we could get rid of the null checks, addChildrenProducerBatch and getChildrenProducerBatch could be removed as well.

@jonathansantilli
Contributor Author

Thanks for looking at the PR, @abbccdda. About your comment: I did not add @lbradstreet's test to this PR, as I explained in the PR description:

The Jira ticket provides a test here that proves the error without these changes.
This PR does not include that specific test since it involves sleeping the thread and could lead to indeterminate behavior.

However, I can add another test where the batch gets split twice.

@jonathansantilli
Contributor Author

retest this please

@jonathansantilli
Contributor Author

Hello @abbccdda, @lbradstreet, do you think this PR is ready to be merged? Is something else missing?

Cheers!

This commit adds the logic to wait for split batches to be processed after a Message Too Large exception has been received.
It also adds a new test class to cover the IncompleteBatches class.
…it batches

With this commit, when a KafkaProducer#flush() call starts, RecordAccumulator#awaitFlushCompletion()
will wait for all subsequent split batches to be marked as done before returning.
Create the list of ProducerBatch children only if needed, after the batch gets split.
Also, avoid exposing the ProduceRequestResult (produceFuture), managing the operations related to it internally in ProducerBatch.
@ijuma
Contributor

ijuma commented Jan 15, 2020

ok to test

@ijuma
Contributor

ijuma commented Jan 15, 2020

This may have been discussed already, but why can't we use a chained future to handle all of this transparently?

@jonathansantilli
Contributor Author

Thanks for your comment, @ijuma.

The future is not chained: new ProducerBatches are created after ProducerBatch#split has been called, and the current future is completed with a RecordBatchTooLargeException set on it.

We are handling the subsequent calls to future#await() in the new method ProducerBatch#await, and no longer exposing the internal future of ProducerBatch.

I hope I got your point correctly; if not, please let me know your point of view about a possible different solution. Looking forward to it.

Contributor

@abbccdda left a comment

LGTM~

@jonathansantilli
Contributor Author

Thanks, @abbccdda. Do you know who can help us merge this?

Contributor

@guozhangwang left a comment

I think the above question is whether we can avoid completing the current future with the RecordBatchTooLargeException (it seems no one is interested in knowing whether it ever hit this exception, and it is swallowed internally anyway) and instead chain it with its children's futures: the parent future would keep a count and would only complete after all of its children's futures have completed and decremented it to 0. This is done in a few places nowadays, where we chain a second request after a first response and only let the returned future complete once the second step is done. Personally, though, I feel that since we do not know in advance whether we will be splitting, chaining the future from the beginning is doable but may be a bit more complex to implement.
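
A rough sketch of that counting approach (an illustration, not ProduceRequestResult's real API; the class name is made up): the result completes only once the parent and every registered child have reported completion.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration of the counter-based chaining described above (not Kafka code).
final class ChainedResult {
    private final CountDownLatch completed = new CountDownLatch(1);
    // The parent counts as one pending completion; each split child adds one more.
    private final AtomicInteger pending = new AtomicInteger(1);

    void registerChild() {
        pending.incrementAndGet();
    }

    void complete() {                    // called once for the parent and once per child
        if (pending.decrementAndGet() == 0)
            completed.countDown();       // everything is done; release flush waiters
    }

    void await() throws InterruptedException {
        completed.await();
    }
}
```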

I made a pass over this PR and it lgtm overall (just a qq about the local field memory barrier). @lbradstreet if you feel it is good to go too I can merge it as-is.

batches.add(batch);
addChildrenProducerBatch(batch);
Contributor

The ordering here is critical since we need to make sure the children batches are constructed before completing the parent's future below. I'm wondering if the JIT would ever reorder it, and thus should we make childrenProducerBatch volatile?

Contributor

Is childrenProducerBatch ever accessed without a lock?

Contributor Author

Hello @guozhangwang, I am not sure whether the JIT would ever reorder it. I can change the List<ProducerBatch> childrenProducerBatch to volatile; however, making the field volatile means that the List reference will always be read fresh and never thread-cached, but the ProducerBatch elements within the List will not be. Is that what we are looking for? If so, I can update it for sure.
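
A small generic illustration of that distinction (not the PR's classes): volatile publishes the reference, not the list's contents.

```java
import java.util.ArrayList;
import java.util.List;

// 'volatile' makes reads/writes of the *reference* visible across threads,
// but it does not make the list's element operations thread-safe.
class VolatileHolder<T> {
    volatile List<T> items = new ArrayList<>();

    void publishNewList(List<T> fresh) {
        items = fresh;                  // safely published: readers see the new reference
    }

    void add(T item) {
        items.add(item);                // still a plain, unsynchronized mutation
    }
}
```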

Contributor Author

Hello @ijuma, although all methods that interact with childrenProducerBatch are synchronized, I notice now that the method private synchronized List<ProducerBatch> getChildrenProducerBatch() returns a reference to the List childrenProducerBatch, hence two different threads could interact with it without waiting for each other to finish their interaction with the List.

I will make the method private void waitForPossibleSplittedBatches() synchronized, which is the only place where getChildrenProducerBatch() is called; that way, access to the childrenProducerBatch List will always be synchronized.
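
A compact illustration of the escaping-reference concern and the proposed fix (generic code with made-up names, not the PR diff):

```java
import java.util.ArrayList;
import java.util.List;

// Generic illustration: the monitor is released as soon as getChildren() returns,
// so a caller iterating the returned (internal) list does so without a lock while
// another thread may still be add()-ing to it.
class ChildList {
    private final List<String> children = new ArrayList<>();

    synchronized List<String> getChildren() { return children; }     // internal reference escapes

    synchronized void addChild(String child) { children.add(child); }

    // Direction proposed above: keep the whole read path inside one synchronized
    // method so iteration happens under the same lock as addChild().
    synchronized void waitForChildren() throws InterruptedException {
        for (String child : getChildren()) {
            // await each child's completion here
        }
    }
}
```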

Please let me know how that sounds to you.

@jonathansantilli
Contributor Author

Hello @guozhangwang, thanks for the clarification in your comment; I think I got it now.
I agree that we can change the current behavior and chain the future call (produceFuture.done()); I also agree that it could be a little more complex to implement.
If we think that's the way to go, I will change the code to accomplish what you described in your comment.

Please, let me know what you all think.

@jonathansantilli
Contributor Author

@ijuma with the latest commit, childrenProducerBatch will never be accessed without a lock.

clearChildrenProducerBatch();
}

private synchronized void waitForPossibleSplittedBatches() throws InterruptedException {
Contributor

This is synchronized and will await on the produceFuture. await() is called by awaitFlushCompletion(), which is called when a user calls flush(). I am concerned that a user can call flush() and end up effectively deadlocking other operations on the ProducerBatch, as getChildrenProducerBatch and addChildrenProducerBatch will not be able to be called by other threads. My concern is that the sender thread may become deadlocked in splitAndReenqueue in this state.
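
A minimal illustration of that deadlock risk (not Kafka code; the class name is made up): the flushing thread parks on the latch while still owning the batch's monitor, and the sender thread blocks on that same monitor before it can ever complete the batch.

```java
import java.util.concurrent.CountDownLatch;

// Simplified model of the hazard being described.
class MonitorDeadlock {
    private final CountDownLatch done = new CountDownLatch(1);

    synchronized void waitForSplitChildren() throws InterruptedException {
        done.await();                       // parks while still owning 'this'
    }

    synchronized void addChild(MonitorDeadlock child) {
        // The sender thread blocks here, so done.countDown() is never reached.
    }

    void complete() { done.countDown(); }
}
```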

Contributor Author

I see your point, @lbradstreet: if the sender thread is working on a split batch and producer#flush() is called, the two will block each other, since we need to synchronize access to the childrenProducerBatch List.

In that case, we should use another mechanism to keep track of the ProducerBatches created when a split occurs.
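
One possible direction, purely as an assumption and not something settled in this PR (the class name below is made up): track the children in a concurrent collection so the flushing thread can iterate and await them without holding the monitor that splitAndReenqueue needs.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

// Illustration only: adding a child never needs a monitor, and the flushing
// thread iterates the queue's weakly consistent view while it waits.
class SplitTracker {
    private final CountDownLatch done = new CountDownLatch(1);
    private final Queue<SplitTracker> children = new ConcurrentLinkedQueue<>();

    void addChild(SplitTracker child) { children.add(child); }   // lock-free

    void complete() { done.countDown(); }

    void await() throws InterruptedException {
        done.await();
        for (SplitTracker child : children)   // children added before completion are seen here
            child.await();
    }
}
```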

@ctan888
Contributor

ctan888 commented Sep 2, 2020

Hi @jonathansantilli. Are you still working on this PR?

@jonathansantilli
Contributor Author

Hi there @d8tltanc, sorry about the delay with my reply.

I would like to continue working on this PR. There was some consensus at the beginning about a possible solution, the one implemented in this PR, but then @lbradstreet raised some valid concerns about possible locking.

I did ask for some guidance on implementing the needed changes, but I guess this PR got forgotten.

It would be fantastic to get some input from the people mentioned in this PR.

@kirktrue
Contributor

@jonathansantilli are you willing/able to work on this still?

@jonathansantilli
Contributor Author

Hi @kirktrue, yes. I was waiting on guidance from @lbradstreet or anyone else, since there was a potential blocking issue.
