[KAFKA-9965] Fix accumulator tryAppend, so that fresh new producerBatch is created #12462
base: trunk
Conversation
@cmccabe @jonmcewen I see two previous PRs (#11326 and #8690) that fix the RoundRobinPartitioner. I think making the change in RecordAccumulator will naturally solve the issue for all affected partitioners. wdyt? Also tagging @ashwinpankaj @ijuma for reviews.
cc @artemlivshits @junrao since you both worked on recent partitioner changes.
Thanks for picking this up, @sudeshwasnik. LGTM.
```diff
         } else {
             int appendedBytes = last.estimatedSizeInBytes() - initialBytes;
             return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false, appendedBytes);
         }
     }
-    return null;
+    return new RecordAppendResult(null, false, true, false, 0);
```
nit: add a comment that new batch needs to be created
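A possible shape for that comment, mirroring the constructor call in the hunk above (a suggestion sketch, not committed code):

```java
// No batch exists for this partition, so signal the caller that a fresh
// ProducerBatch needs to be created before the record can be appended.
return new RecordAppendResult(null, false, true, false, 0);
```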
```java
// Return a result that will cause another call to append. Either:
// 1. the current topicPartition's producerBatch is full - return and prepare for another batch/partition, or
// 2. no producerBatch existed for this topicPartition - create a new producerBatch.
if (appendResult == null && abortOnNewBatch) {
```
It looks like there are still cases where the onNewBatch logic is invoked and the partition could be switched a second time, and it seems to change the behavior of DefaultPartitioner: if the batch is sent (removed from the queue) before a new record is produced, onNewBatch isn't going to get called and the partitioner will get stuck on the current partition.
QQ - if there are no batches in the current partition, are there cases where we would still want to switch partitions? Technically, the partition is completely empty at this point, and we can create and store new batches. I'll revisit the code and get back asap if there is some case where we wouldn't want to fill empty batches in a partition and would instead want to switch.
Apart from that, here is some context on the intended change, from my limited knowledge of the partitioning logic:
@artemlivshits IIUC onNewBatch should only be called when the existing batch is full.
The difference this PR is trying to make with this condition here is (see the sketch after this list):
- if appendResult is null, the existing batch for the current partition is full -> return and call onNewBatch.
- if appendResult isn't null, that implies appendResult.newBatchCreated is true and the producerBatch for the current partition isn't complete -> onNewBatch should not be called. Instead we should stick to this partition, create a producerBatch, and fill it; we shouldn't switch partitions here.
wdyt?
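To make the two cases concrete, here is a minimal sketch of the decision being proposed (a hypothetical helper, not the PR's exact code; `existingBatchFull` stands for `tryAppend` returning null):

```java
// Hypothetical model of the proposed branch; names mirror the discussion.
static boolean shouldInvokeOnNewBatch(boolean existingBatchFull, boolean abortOnNewBatch) {
    // Only a full existing batch should abort the append and trigger
    // partitioner.onNewBatch(...), possibly switching partitions.
    if (existingBatchFull)
        return abortOnNewBatch;
    // Otherwise a fresh batch was just created for this partition
    // (appendResult.newBatchCreated == true): fill it in place and do not
    // re-invoke the partitioner for the same record.
    return false;
}
```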
> if there are no batches in current partition, are there cases we would still want to switch partitions?

It could happen that the batch got sent out by the time the next record arrives. When linger.ms=0 (the default), the sender tries to send the batch out as soon as possible. So the sequence of events could look like this:
- batch queue is empty.
- append record: create a new batch, but don't switch partitions because queue is empty
- sender drains the queue, so now queue is empty
- append record: create a new batch, but don't switch partitions because queue is empty
...
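For illustration, a minimal producer that exercises this sequence (a sketch; the class name, broker address, and topic name are placeholders). With linger.ms=0 the sender drains each batch almost immediately, so the queue is usually empty again by the time the next record arrives:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;

public class RoundRobinRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0); // the default; shown explicitly
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++)
                producer.send(new ProducerRecord<>("test-topic", "value-" + i)); // no key, no partition
        }
    }
}
```

Checking the per-partition message counts after a run like this is one way to observe which partitions actually receive records.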
Makes sense! With the current approach we could get stuck on the same partition, leading to starvation.
There are now two constraints: 1. prevent even-numbered partitions from being skipped consistently when there are no producerBatches for the partition; 2. do not always select the same partition when the producerBatch queue is empty, i.e. detect whether a recent drain happened for this partition and skip it in that case.
With these conditions in mind, I have made some modifications to the original PR; I will summarize the new approach.
@artemlivshits the approach now is a little brittle, tbh. We could have used a timeout after a drain of a partition, within which the partition gets skipped, but that approach seemed to make the solution complex. Wdyt?
Any other approach you think we should consider instead?
@sudeshwasnik
I am following this fix because I want Kafka to do proper round-robin. I had a thought: why can't we put the partition id in the record itself? Then if the partitioner were called twice, the partition would already be on the record and the second partitioner call wouldn't happen.

```java
if (record.partition() != null)
    return record.partition();
```

This always returns null.
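For what it's worth, record.partition() is only non-null when the caller pins a partition on the record explicitly; a quick sketch (topic, key, and value are placeholders):

```java
// Partition pinned explicitly: record.partition() returns 3 and the
// configured Partitioner is bypassed entirely.
ProducerRecord<String, String> pinned =
    new ProducerRecord<>("test-topic", 3, "key", "value");

// No partition supplied: record.partition() stays null, the Partitioner
// decides, and that is exactly where the double call can happen.
ProducerRecord<String, String> unpinned =
    new ProducerRecord<>("test-topic", "value");
```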
Force-pushed efe62d2 to 772ffd8: "…main null when either partition is encountered first time, batches are cleaned up after expiration OR when sender clears deque"
hey @artemlivshits, thanks for pointing out that if we just check whether a partition's producerBatches are empty, we may get stuck selecting the same partition when producerBatches keep getting cleared by the sender during drain. Here is a new approach -
example -
@sudeshwasnik: Thanks for finding the issue and providing a PR. Left a comment below.
```java
// 2. DQ was removed because all batches were cleared due to expiration, or the sender cleared batches after draining.
// If so, abort and let the partitioner's onNewBatch select another partition.
// This prevents a single partition from getting re-selected right after a recent drain.
if (abortOnNewBatch && noDqForPartition) {
```
I am not sure this completely addresses the issue. For example, if the producer adds records relatively slowly, it's possible that by the time the RoundRobinPartitioner adds a record to the same partition again, the batches for that partition have already been drained.
Here is another possibility. We could change the computation for the following code in KafkaProducer: if a partitioner is defined but doesn't override the default implementation of onNewBatch (which is the case for RoundRobinPartitioner), we could also set abortOnNewBatch to false. We could use reflection to get the method and do the isDefault test.

```java
// A custom partitioner may take advantage of the onNewBatch callback.
boolean abortOnNewBatch = partitioner != null;
```

What do you think @jolshan ?
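A rough sketch of the reflection check being described (hedged: `onNewBatchIsDefault` is a hypothetical helper, not code from this PR; `Partitioner.onNewBatch(String, Cluster, int)` is a default method, so an implementation that never overrides it still resolves to the interface's declaration):

```java
import java.lang.reflect.Method;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

static boolean onNewBatchIsDefault(Partitioner partitioner) {
    try {
        Method m = partitioner.getClass()
            .getMethod("onNewBatch", String.class, Cluster.class, int.class);
        // If the resolved method is still declared on the interface, the
        // partitioner inherited the default no-op implementation.
        return m.getDeclaringClass() == Partitioner.class;
    } catch (NoSuchMethodException e) {
        return true; // unreachable for a valid Partitioner implementation
    }
}

// The hardcoded line above would then become something like:
// boolean abortOnNewBatch = partitioner != null && !onNewBatchIsDefault(partitioner);
```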
> if a partitioner is defined, but doesn't override the default implementation of onNewBatch

I think this is the only correct way to fix this without breaking partitioners that depend on onNewBatch being called to switch partitions. Given that onNewBatch is now deprecated, we should avoid the abortOnNewBatch logic unless onNewBatch is actually overridden.
Is this an issue for all partitioners or just RoundRobinPartitioner?
This is potentially an issue for any partitioner that doesn't override the default empty implementation of onNewBatch, since the partitioner is unnecessarily called a second time for the same record when appending to a new batch.
Yeah -- it seems like abortOnNewBatch being hardcoded has always been an issue. Any partitioner that doesn't implement onNewBatch (and essentially batches per partition like the old sticky partitioner) doesn't really have a use for it as far as I can see.
The old sticky partitioner changed the behavior such that partition is called one more time when a batch is created (which is not a problem for the old sticky partitioner, because partition isn't changing state; the state is changed by onNewBatch), but it changes the behavior of pretty much all other partitioners that change state on partition. The RoundRobinPartitioner probably has the most visible effects, but all partitioners that change state on partition are affected.
It's kind of surprising that this change of behavior wasn't noticed for a while (which probably indicates how often custom partitioners are used).
Eliminating the second call of partition would change the partition-switching pattern for the old sticky partitioner, and we didn't want to do that in case people want to switch back to the default logic. So doing the reflection lookup is probably the best way to separate when we need the onNewBatch & second partition call, vs. doing one partition call per tryAppend.
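A toy illustration of that distinction (hypothetical classes, not Kafka's real implementations): the sticky style keeps partition() side-effect-free and mutates state only in onNewBatch(), so a second partition() call for the same record is harmless, while the round-robin style mutates state inside partition() itself, so the second call silently consumes a partition:

```java
// Sticky style: partition() has no side effects; state moves in onNewBatch().
class StickyStyle {
    private int current = 0;
    int partition(int numPartitions) { return current; }  // safe to call twice
    void onNewBatch(int numPartitions) { current = (current + 1) % numPartitions; }
}

// Round-robin style: every partition() call advances the counter, so a
// second call for the same record skips over a partition.
class RoundRobinStyle {
    private int counter = 0;
    int partition(int numPartitions) { return counter++ % numPartitions; }
}
```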
It actually was noticed back in 2020. :( I just missed the comments. 717c55b#r39200381
We also saw a ticket filed for this 2 days ago. https://issues.apache.org/jira/browse/KAFKA-14573
Using reflection is a creative option, but it does mean that if someone overrides the method to, e.g., add a log line, then suddenly things will break, which would be quite surprising (it goes without saying).
Compatibility is tricky here. I think back when we implemented this, there were some concerns about the onNewBatch method, since we wanted to avoid too many changes to the interface and allow old partitioners to work as intended. I suppose if there were a way to avoid abortOnNewBatch and the subsequent call to partition, that would be ideal, but I'm not sure it is something we can add now.
onNewBatch is now deprecated, so hopefully people will take the hint not to override it :-)
But, yeah, I agree with calling it creative (I wouldn't have come up with that given my C++ background -- these ideas are blocked before they appear in my mind).
Where are we with this?
I think it's still a choice between breaking the old default sticky partitioner logic, vs. not fixing the even older round-robin partitioner logic (which has been broken for a few years), vs. using reflection to sort of eat the cake and have it too. One thing that might have changed with time is that if people have gained confidence in the new built-in partitioner logic, maybe it's OK to break the old default sticky partitioner, but it's hard to tell.
I want RoundRobinPartitioner to work again. When might this PR be merged and released?
@artemlivshits unfortunately, I won't be able to contribute to this PR anymore. Thank you everyone for the reviews; I apologize for the discontinuation.
Issue: partitioner.partition is called once. For this partition, the accumulator tries to find an existing batch to append the record to. If none exists, it aborts the append into this partition's buffer and sets the abortForNewBatch flag. With abortForNewBatch set, partitioner.partition is called again! RoundRobinPartitioner then increments its counter twice for the same record (when both the key and record.partition() are null), so with an even number of partitions only half of them are ever filled. And since every second partition keeps receiving records in batches, the accumulator always finds an empty queue for the current partition, so it always gets skipped.
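To see the arithmetic, a tiny hypothetical demo of the double increment (six partitions; the second partition() call's result is the one that wins):

```java
public class DoubleCallDemo {
    public static void main(String[] args) {
        int numPartitions = 6;
        int counter = 0;
        for (int record = 0; record < 6; record++) {
            int first = counter++ % numPartitions;  // initial partition() call
            int second = counter++ % numPartitions; // re-invoked after abortForNewBatch
            System.out.println("record " + record + " -> partition " + second
                + " (partition " + first + " skipped)");
        }
        // Output cycles 1, 3, 5, 1, 3, 5 ... partitions 0, 2, 4 never get records.
    }
}
```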
Solution: here is what I had in mind with the approach in this PR.