
KAFKA-5886: Implement KIP-91 delivery.timeout.ms #3849

Closed
wants to merge 1 commit

Conversation

@sutambe (Contributor) commented Sep 13, 2017

First shot at implementing KIP-91 delivery.timeout.ms. Summary:

  1. Added the delivery.timeout.ms config. Default 120,000 (see the config sketch after this list).
  2. Changed retries to MAX_INT.
  3. Batches may expire whether they are in-flight or not, so muted is no longer used in RecordAccumulator.expiredBatches.
  4. In some rare situations batch.done may be called twice. Attempted transitions from failed to succeeded are logged. Succeeded to succeeded is an error (exception, as before). Other transitions (failed --> aborted, aborted --> failed) are ignored.
  5. The old test in RecordAccumulatorTest is removed and replaced with three new tests: testExpiredBatchSingle, testExpiredBatchesSize, and testExpiredBatchesRetry. All of them verify that expiry is independent of muted.
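
For reference, a minimal sketch of opting into the new timeout from application code. The config key and its 120 s default come from this patch; the rest is generic producer setup and the broker address is illustrative:

        import java.util.Properties;
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.Producer;

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Upper bound on the total time to report success or failure after send() returns.
        props.put("delivery.timeout.ms", "120000");
        // With KIP-91, retries can stay at the new default (effectively unlimited) and
        // delivery.timeout.ms becomes the knob that bounds total delivery time.
        Producer<String, String> producer = new KafkaProducer<>(props);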

@sutambe changed the title from "Implement KIP-91 delivery.timeout.ms" to "KAFKA-5886: Implement KIP-91 delivery.timeout.ms" on Sep 13, 2017
abortRecordAppends();
return expired;
boolean maybeExpire(long deliveryTimeoutMs, long now) {
if (deliveryTimeoutMs < (now - this.createdMs)) {
Contributor:
Should <= be used?

Contributor Author:
agreed

break;
}
synchronized (dq) {
// iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut
Contributor:
deliveryTimeoutMs should be mentioned

@apurvam (Contributor) commented Sep 13, 2017

Thanks for the PR, @sutambe. Looking over the changes, it seems that there are two cases from the KIP which don't appear to be covered:

  1. Setting the pollTimeout to be the expiryTime of the oldest batch being sent in the produce request (see the sketch below). I think we need this to make sure that we expire batches in a timely manner.
  2. Related to the previous point, the current patch doesn't seem to expire in-flight requests, which is another feature that the KIP seems to promise.

Have I missed something? Or are you planning on adding the functionality above?
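
For point 1, the idea is roughly the following (a sketch only; earliestInFlightExpiryMs and defaultPollTimeout are illustrative names, not code from this patch):

        // Bound how long the sender blocks in poll() so an in-flight batch can be
        // expired promptly once its delivery timeout elapses.
        long nextExpiryMs = accumulator.earliestInFlightExpiryMs(); // absolute time; Long.MAX_VALUE if none
        long pollTimeout = Math.min(defaultPollTimeout, Math.max(0L, nextExpiryMs - now));
        client.poll(pollTimeout, now);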

@becketqin (Contributor) left a comment
Thanks for the patch. Left some comments.

/** <code>delivery.timeout.ms</code> */
public static final String DELIVERY_TIMEOUT_MS_CONFIG = "delivery.timeout.ms";
private static final String DELIVERY_TIMEOUT_MS_DOC = "An upper bound on the time to report success or failure to send a message. "
+ "The time is measured from the point when send returns. "
Contributor:
It seems better to say Producer.send() instead of send.

@@ -76,13 +76,13 @@
private String expiryErrorMessage;
private boolean retry;

public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
this(tp, recordsBuilder, now, false);
public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createTime) {
Contributor:
We are passing now everywhere else. Maybe we can just keep the argument name the same.

Contributor Author:
The actual argument is now. However, I prefer the formal parameter name createTime because it is an immutable value while constructing a batch; now is, by definition, changing.

@@ -232,6 +238,7 @@
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
.define(DELIVERY_TIMEOUT_MS_CONFIG, Type.LONG, 120 * 1000, atLeast(0L), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
Contributor:
Should we validate that delivery.timeout.ms is greater than request.timeout.ms?
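
If we do add that validation, a minimal sketch of what it could look like during producer construction (the check itself is hypothetical; the config constants already exist):

        long deliveryTimeoutMs = config.getLong(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
        int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        if (deliveryTimeoutMs < requestTimeoutMs) {
            // org.apache.kafka.common.config.ConfigException
            throw new ConfigException("delivery.timeout.ms should be greater than or equal to request.timeout.ms ("
                    + deliveryTimeoutMs + " < " + requestTimeoutMs + ")");
        }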

@@ -164,25 +164,29 @@ public void abort(RuntimeException exception) {
* @param exception The exception that occurred (or null if the request was successful)
*/
public void done(long baseOffset, long logAppendTime, RuntimeException exception) {
final FinalState finalState;
if (exception == null) {
final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
Contributor:
It is probably cleaner to have an explicit EXPIRED state.

@sutambe (Contributor Author), Sep 14, 2017:
I did some digging around. An expired batch's final state is FAILED. I don't feel great about adding yet another final state; we already have ABORTED and FAILED, and ProducerBatch.done would get even more complicated.

Contributor:
Maybe it's not a big deal, but I just want to call out that this is a behavior change. Currently the producer will throw an exception when transitioning from the FAILED state to another state for a reason other than expiration. If we change this logic, we may miss cases that are not failed by expiration but still get a state update twice. It may not be that important if we do not have programming bugs.

Personally I think it is better to clearly define the states of the batches even if additional complexity is necessary.

The comments should probably also cover the force-close case for completeness.

if (this.finalState.get() == FinalState.ABORTED) {
log.debug("ProduceResponse returned for {} after batch had already been aborted.", topicPartition);
return;
if (!this.finalState.compareAndSet(null, tryFinalState)) {
Contributor:
The logic here probably needs more comments. We may have the following three cases where the state of a batch has been updated before the ProduceResponse returns:

  1. A transaction abort happens. The state of the batches would have been updated to ABORTED.
  2. The producer is closed forcefully. The state of the batches would have been updated to ABORTED.
  3. The batch is expired while it is in flight. The state of the batch would have been updated to EXPIRED.

In the other cases, we should throw an IllegalStateException.
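
A sketch of how done() could document and enforce those cases on top of the compareAndSet (the exact states and messages here are illustrative, not the final patch):

        if (!this.finalState.compareAndSet(null, tryFinalState)) {
            FinalState previousState = this.finalState.get();
            if (previousState == FinalState.ABORTED || previousState == FinalState.FAILED) {
                // Already completed by a transaction abort, a forced close, or an in-flight
                // expiration (FAILED here; EXPIRED if a dedicated state is introduced).
                // The callback has fired, so just ignore the late ProduceResponse.
                log.debug("ProduceResponse returned for {} after batch had already been completed as {}.",
                        topicPartition, previousState);
                return;
            }
            // Completing a batch that already SUCCEEDED indicates a bug.
            throw new IllegalStateException("Batch for " + topicPartition + " completed twice, "
                    + previousState + " -> " + tryFinalState);
        }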

Contributor Author:
Please review the updated method documentation.

// Stop at the first batch that has not expired.
break;
}
synchronized (dq) {
Contributor:
The batches still need to be expired in order if max.in.flight.requests.per.connection is set to 1, so we probably still want to check whether the partition is muted or not. That said, if we guarantee that when RecordAccumulator.expiredBatches() returns a non-empty list, all the earlier batches have already been expired, we can remove the muted check here.

BTW, I did not see the logic for expiring an in-flight batch in the current patch. Am I missing something?

Contributor Author:
It's there now

Contributor Author:
@becketqin Added the muted check back
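
For reference, the muted check discussed above would sit in RecordAccumulator.expiredBatches roughly like this (a sketch over the surrounding diff, not the exact code; tp and dq come from the per-partition iteration):

        // Skip partitions with an in-flight request so that, with
        // max.in.flight.requests.per.connection=1, batches are still expired strictly in order.
        if (!muted.contains(tp)) {
            synchronized (dq) {
                Iterator<ProducerBatch> batchIterator = dq.iterator();
                while (batchIterator.hasNext()) {
                    ProducerBatch batch = batchIterator.next();
                    if (!batch.maybeExpire(deliveryTimeoutMs, now))
                        break; // Stop at the first batch that has not expired.
                    expiredBatches.add(batch);
                    batchIterator.remove();
                }
            }
        }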

Iterator<ProducerBatch> batchIterator = dq.iterator();
while (batchIterator.hasNext()) {
ProducerBatch batch = batchIterator.next();
boolean isFull = batch != lastBatch || batch.isFull();
Contributor:
isFull is no longer used.

Contributor:
Still not used.

@apurvam (Contributor) commented Sep 14, 2017

Heads up @sutambe, the following PR just got merged and may have some conflicts with the current patch: #3743

There shouldn't be any impact on the logic, however.

@ijuma (Contributor) commented Sep 18, 2017

Friendly reminder that the feature freeze is this Wednesday.

@becketqin (Contributor):
@ijuma Just want to check. Do you think this feature is a "minor" feature?

@ijuma (Contributor) commented Sep 19, 2017

@becketqin, it is possible to classify this as a minor feature, but the fact that it affects a core part of the Producer puts it in a bit of a grey area. If the PR is almost ready and we miss the feature freeze, my take is that it would be OK to merge it by the end of this week. Later than that and it seems a bit risky.

It's a bit worrying that the merge conflicts haven't been resolved since last week.

@sutambe (Contributor Author) commented Sep 19, 2017

@ijuma @becketqin I have a new PR, but after a rebase I have to fix one more test. Working on that now.

@ijuma (Contributor) commented Sep 19, 2017

Thanks @sutambe!

@sutambe (Contributor Author) commented Sep 19, 2017

@apurvam It's not clear to me why testExpiryOfFirstBatchShouldCauseResetIfFutureBatchesFail and testExpiryOfFirstBatchShouldNotCauseUnresolvedSequencesIfFutureBatchesSucceed are failing. It looks like a batch that should be in IncompleteBatches isn't there. Any thoughts?

@@ -271,7 +276,8 @@ private long sendProducerData(long now) {
}
}

List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.deliveryTimeoutMs, now);
boolean needsTransactionStateReset = false;
Contributor:
This variable can be dropped.

@tedyu (Contributor) commented Sep 19, 2017

I added the following to testExpiryOfFirstBatchShouldCauseResetIfFutureBatchesFail before the first sender.run() call

        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
            new Metrics(), new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, DELIVERY_TIMEOUT, 50, transactionManager, apiVersions);

The test still fails.

@apurvam (Contributor) commented Sep 19, 2017

@sutambe where are those tests failing? The latest PR builder suggests that the clients and core tests all passed.

@sutambe (Contributor Author) commented Sep 19, 2017

@apurvam @ijuma @becketqin The Sender and RecordAccumulator tests are passing now. The failing tests are Connect tests that are unrelated.

@apurvam (Contributor) commented Sep 19, 2017

@sutambe I don't think the test failures are irrelevant, since the same 3 tests failed in all the runs. Further, the cause of the failure is:

java.lang.AssertionError: 
  Unexpected method call Listener.onFailure(job-0, org.apache.kafka.common.KafkaException: Failed to construct kafka producer):

I think their mocks may need to be updated to account for the new configs and attendant ConfigExceptions.

List<ProducerBatch> expiredBatches = new ArrayList<>();

// Expiration of inflight batches happens here in the order of createdMs if the final state of the
// batch in not known by (batch's creation time + deliveryTimeoutMs).
Contributor:
in not -> is not

Map.Entry<Long, List<ProducerBatch>> expiredInflightBatches = inFlightBatches.pollFirstEntry();
for (ProducerBatch batch: expiredInflightBatches.getValue()) {
if (batch.getFinalState() == null &&
batch.maybeExpire(deliveryTimeoutMs, now)) {
Contributor:
The check 'if (deliveryTimeoutMs <= (now - this.createdMs))' inside maybeExpire() would be true.
Looks like another method could be created inside ProducerBatch which expires the batch.

Contributor Author:
maybeExpire has a side effect of setting errorMessage internally, hence calling it again in the if.

Contributor:
Understood.
That part can be refactored; the goal is to reduce unnecessary comparisons.
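
One way to split the check from the side effect, along the lines suggested (a sketch; the names hasReachedDeliveryTimeout and expire are hypothetical here):

        // Pure check with no side effects, cheap enough to call from the hot path.
        boolean hasReachedDeliveryTimeout(long deliveryTimeoutMs, long now) {
            return deliveryTimeoutMs <= now - this.createdMs;
        }

        // Unconditionally marks the batch as expired; callers that already performed the
        // check above do not pay for a second timestamp comparison.
        void expire(long now) {
            this.expiryErrorMessage = (now - this.createdMs) + " ms has passed since batch creation";
            abortRecordAppends();
        }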

Contributor Author:
@apurvam Those tests don't even compile or run on my machine. What's up with those tests?

Contributor:
They can't construct a Kafka producer with the changes made in this PR.

Contributor:
Assuming inFlightBatches is a TreeSet as suggested above, this code can be simplified to:

        while (!inFlightBatches.isEmpty() &&
               inFlightBatches.first().maybeExpire(deliveryTimeoutMs, now)) {
            expiredBatches.add(inFlightBatches.pollFirst());
        }

@becketqin (Contributor) left a comment
@sutambe Thanks for updating the patch. Made a pass on the non-test files and left some comments. Will review the tests tomorrow. We may need a quick turnaround to get this into 1.0.0.

} else {
// A SUCCESSFUL batch must not succeed again.
Contributor:
Is this comment accurate? The new state is not necessarily SUCCEEDED.


* <li> the batch is not in retry AND request timeout has elapsed after it is ready (full or linger.ms has reached).
* <li> the batch is in retry AND request timeout has elapsed after the backoff period ended.
* </ol>
* Expire the batch if it no outcome is known in within delivery.timeout.ms.
Contributor:
Some typos in this comments. "Expire the batch if no outcome is known within delivery.timeout.ms"

@@ -85,6 +90,10 @@
private int drainIndex;
private final TransactionManager transactionManager;

// A mapping of batch's creation time to the batch for quick access of the oldest batch
// and lookup in the order of creation
private final NavigableMap<Long, List<ProducerBatch>> inFlightBatches;
Contributor:
Does this have to be a per-partition Map? Intuitively we just need a TreeSet<ProducerBatch> with a comparator?

Contributor:
Apparently my understanding of TreeSet is not accurate. It uses the comparator to decide whether the entries are the same or not. We can use a TreeMap<Long, Set> then. We may also want to bucket the timestamps a little, say by one second, to avoid a huge number of Sets being created for each ms in the TreeMap.

Contributor:
I was thinking about this too. Using milliseconds as the unit for the Map key is not prudent.

After the switch to seconds as the unit, we may need to check the two adjacent buckets keyed by ts-1 (sec) and ts+1 (sec).
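
A sketch of that bucketing idea, keyed by one-second buckets of the creation time (structure and names are illustrative, not code from the patch):

        // Hypothetical second-granularity index of in-flight batches by creation time.
        private final NavigableMap<Long, List<ProducerBatch>> inFlightBatches = new TreeMap<>();

        private void trackInFlight(ProducerBatch batch) {
            long bucket = batch.createdMs / 1000;
            inFlightBatches.computeIfAbsent(bucket, k -> new ArrayList<>()).add(batch);
        }

        // A whole bucket is certainly expired once even its newest possible createdMs has passed
        // the delivery timeout; the boundary bucket still needs a per-batch check against createdMs.
        private boolean bucketExpired(long bucket, long deliveryTimeoutMs, long now) {
            return (bucket + 1) * 1000 + deliveryTimeoutMs <= now;
        }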

@sutambe (Contributor Author), Sep 21, 2017:
As we discussed, TreeSet does not cut it. The naming is consistent: a TreeSet is a set; it's just that the equality criterion is different.


*/
public void reenqueue(ProducerBatch batch, long now) {
batch.reenqueued(now);
List<ProducerBatch> inflight = inFlightBatches.get(batch.createdMs);
Contributor:
This logic would become inFlightRequests.remove(batch) when a TreeSet is used for this.

@@ -572,6 +616,16 @@ public boolean hasUndrained() {
batch.close();
size += batch.records().sizeInBytes();
ready.add(batch);
// Put this batch in the list of inflight batches because we might have
Contributor:
This would be just inFlightBatches.add(batch)

@@ -586,6 +640,10 @@ public boolean hasUndrained() {
return batches;
}

public Long getEarliestDeliveryTimeoutMs() {
Contributor:
We usually just use earliestDeliveryTimeout in Kafka.

@@ -132,6 +135,7 @@ public Sender(LogContext logContext,
SenderMetricsRegistry metricsRegistry,
Time time,
int requestTimeout,
long deliveryTimeoutMs,
Contributor:
It seems we don't need the deliveryTimeoutMs in the sender. It is only used as an argument passed to the accumulator. But the accumulator already has the config.

@@ -271,7 +276,7 @@ private long sendProducerData(long now) {
}
}

List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.deliveryTimeoutMs, now);
Contributor:
This seems to be an existing issue. When we expire the batches here, the memory of those batches will be deallocated. It seems that we will deallocate the same batch again when the ProduceResponse returns?

@apurvam (Contributor) commented Sep 21, 2017

@sutambe I had a look at the failing Sender expiry tests. What is happening is that the tests were not modified to account for the fact that in-flight batches can be expired. In the tests, we used to expire a batch sitting in the accumulator but not the in-flight batch. When the in-flight batch returned, it would be re-queued.

But now, the test sends the response for the in-flight batch, but when it goes to requeue, it discovers that there shouldn't be an in-flight request and raises an exception.

The tests should be updated to account for the new behavior and make sure that the in-flight batch is not expired.

@apurvam (Contributor) commented Sep 21, 2017

Actually, the test reveals a bug in the current patch: the response for the in-flight batch which expired is not being handled correctly. We should not be trying to requeue it in the first place.

So we need two tests: one where the in-flight batch is not expired, and the current case. The reenqueue logic in the Sender needs to be updated to not reenqueue expired batches.
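
The fix would roughly be a guard like this in the Sender's response handling (a sketch; the surrounding completeBatch/canRetry/failBatch logic is simplified and the names are only indicative):

        if (batch.finalState() != null) {
            // The batch already expired (or was aborted) while in flight: its callback has
            // fired, so do not re-enqueue it; just drop the late response.
            log.debug("Ignoring produce response for already-completed batch for partition {}",
                    batch.topicPartition);
        } else if (canRetry(batch, response)) {
            reenqueueBatch(batch, now);
        } else {
            failBatch(batch, response);
        }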

@becketqin (Contributor) left a comment
@sutambe Thanks for updating the patch. A few comments:

  1. For a batch that got expired prematurely, we should not re-enqueue the batch (as @apurvam noticed), and we should not deallocate the memory twice.
  2. There are a few earlier review comments that are not addressed yet (such as unused local variables).
  3. We may want to revisit some of the tests and see if they still make sense.
  4. It would be good to add more unit tests to the patch. More specifically, we may want to have the following tests:
  • Test that a batch is correctly inserted into in.flight.batches when needed, and not inserted when not needed.
  • Test that the callback of an expired batch is fired in time when it is in-flight/not in-flight.
  • Test that when we expire an in-flight batch, we still wait for the request to finish before sending the next batch.
  • Test that we are not going to retry an already expired batch.
  • Test that when a batch is expired prematurely, the buffer pool is only deallocated after the response is returned (because we are still holding the batch until the response is returned).

* time is interpreted correctly as not expired when the linger time is larger than the difference
* between now and create time by {@link ProducerBatch#maybeExpire(int, long, long, long, boolean)}.
* A {@link ProducerBatch} configured using a timestamp preceding its create time is interpreted correctly
* as not expired by {@link ProducerBatch#maybeExpire(long, long)}.
*/
@Test
public void testLargeLingerOldNowExpire() {
Contributor:
This test has nothing to do with linger.ms anymore...

Contributor:
We should change the test name to something like testBatchExpiration, and the test below to testBatchExpirationAfterReenqueue.

* preceding its create time is interpreted correctly as not expired when the retryBackoff time is larger than the
* difference between now and create time by {@link ProducerBatch#maybeExpire(int, long, long, long, boolean)}.
* A {@link ProducerBatch} configured using a timestamp preceding its create time is interpreted correctly
* as not expired by {@link ProducerBatch#maybeExpire(long, long)}.
*/
@Test
public void testLargeRetryBackoffOldNowExpire() {
Contributor:
Similar to above we should rename this.

long lingerMs = Integer.MAX_VALUE / 16;
long retryBackoffMs = Integer.MAX_VALUE / 8;
int requestTimeoutMs = Integer.MAX_VALUE / 4;
long deliveryTimoutMs = Integer.MAX_VALUE;
Contributor:
typo: timeout

Contributor:
The typo is still there.

long lingerMs = 3000L;
int requestTimeout = 60;
long deliveryTimoutMs = 3200L;
Contributor:
typo: timeout

// If the final state of the batch is known (success or failed) just skip past it because
// the callback has already been invoked. This allows the clean up of the map in bulk right here.
if (batch.finalState() == null) {
batch.expire(now);
Contributor:
Should we still expire only the batches that are actually expired, instead of expiring the whole bucket? Having a second-granularity bucket does not prevent us from doing that, right?

@sutambe (Contributor Author) commented Dec 20, 2017

@apurvam @becketqin I updated the implementation to use ConcurrentMap<TopicPartition, Deque<ProducerBatch>>. Please take a look. I don't see the above test failures on my machine.

@becketqin (Contributor) left a comment
Thanks for updating the patch. Left some comments.

Iterator<ProducerBatch> batchIterator = dq.iterator();
while (batchIterator.hasNext()) {
ProducerBatch batch = batchIterator.next();
boolean isFull = batch != lastBatch || batch.isFull();
Contributor:
Still not used.

@@ -85,12 +89,21 @@
private int drainIndex;
private final TransactionManager transactionManager;

// A per-partition queue of batches ordered by creation time for quick access of the oldest batch
private final ConcurrentMap<TopicPartition, PriorityQueue<ProducerBatch>> soonToExpireInFlightBatches;
Contributor:
We don't need a PriorityQueue for this because the batches in the RecordAccumulator are already in order. So we just need to keep the draining order.

private void maybeRemoveFromSoonToExpire(ProducerBatch batch) {
PriorityQueue<ProducerBatch> soonToExpireQueue = soonToExpireInFlightBatches.get(batch.topicPartition);
if (soonToExpireQueue != null) {
soonToExpireQueue.remove(batch);
Contributor:
If we always insert the batch into the inFlightBatches queue and there is no bug, the batch to be removed should always be the first batch. Can we assert on that?

// If the expiry is farther than requestTimeoutMs, we don't have to keep
// track of this batch because it will either succeed or fail (due to
// request timeout) much sooner.
if (batch.soonToExpire(deliveryTimeoutMs, now, requestTimeoutMs)) {
Contributor:
The original reason we had this optimization is that we used to have a big sorted data structure, so avoiding inserting elements into it made sense. Given that the batch order in the RecordAccumulator is now already guaranteed, it seems we can just put all the drained batches into the inFlightBatches queue, which is simpler.
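
Under that simplification, drain() would just record every drained batch, e.g. (a sketch around the existing drain loop; the per-partition deque mirrors the ConcurrentMap<TopicPartition, Deque<ProducerBatch>> mentioned above):

        batch.close();
        size += batch.records().sizeInBytes();
        ready.add(batch);
        // Track every drained batch. Order per partition is preserved because drain() takes
        // batches from the head of each deque, so no "soon to expire" filtering is needed.
        inFlightBatches
            .computeIfAbsent(batch.topicPartition, tp -> new ArrayDeque<ProducerBatch>())
            .addLast(batch);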

// We assume that the batch at the front of the deque will always be the next to expire.
// This may not be true if max.in.flight.requests.per.connection > 1 and retries happen.
// Watch for overflow in createdMs + deliveryTimeoutMs when deliveryTimeoutMs is Long.MAX_VALUE
nextBatchExpiryTimeMs = (first.createdMs + deliveryTimeoutMs < 0) ? nextBatchExpiryTimeMs
Contributor:
The while loop may break if the request size limit has been reached, so there is no guarantee that it will iterate over all the partitions. One alternative is to find the nextBatchExpiryTimeMs in expiredBatches().

void maybeUpdateNextExpiryTime(long now) {
if (now >= nextBatchExpiryTimeMs) {
if (soonToExpireInFlightBatches.isEmpty()) {
nextBatchExpiryTimeMs = Long.MAX_VALUE;
Contributor:
It seems intuitively this should be the earliest batch in the entire record accumulator?

@@ -277,7 +277,7 @@ private long sendProducerData(long now) {
}
}

List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
Contributor:
It seems we may release the memory for the expired batches before the response is returned. This means the underlying ByteBuffer is still referenced by the ProducerBatch instance in the inFlightRequests. I am not sure if this would cause any problem, but it seems a little dangerous.

sender.run(time.milliseconds()); // send request
assertEquals(1, client.inFlightRequestCount());

Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
Contributor:
Is the response preparation needed in this case?

@apurvam (Contributor) commented Jan 16, 2018

retest this please

1 similar comment
@apurvam (Contributor) commented Jan 17, 2018

retest this please

@apurvam (Contributor) commented Jan 17, 2018

So the org.apache.kafka.clients.producer.internals.SenderTest.testMetadataTopicExpiry test has failed twice in a row with:

java.lang.ArrayIndexOutOfBoundsException
	at java.base/java.util.zip.CRC32C.update(CRC32C.java:151)
	at org.apache.kafka.common.utils.Checksums.update(Checksums.java:42)
	at org.apache.kafka.common.utils.Crc32C.compute(Crc32C.java:72)
	at org.apache.kafka.common.record.DefaultRecordBatch.writeHeader(DefaultRecordBatch.java:468)
	at org.apache.kafka.common.record.MemoryRecordsBuilder.writeDefaultBatchHeader(MemoryRecordsBuilder.java:357)
	at org.apache.kafka.common.record.MemoryRecordsBuilder.close(MemoryRecordsBuilder.java:311)
	at org.apache.kafka.clients.producer.internals.ProducerBatch.close(ProducerBatch.java:427)
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:614)
	at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:270)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
	at org.apache.kafka.clients.producer.internals.SenderTest.testMetadataTopicExpiry(SenderTest.java:473)

Given that these changes are on the same code, and given the consistent failure of this test, it is probably a regression. @sutambe can you reproduce the failure locally?

@apurvam (Contributor) commented Jan 17, 2018

Just looking at the stack trace and the test, it may be that an expired batch is being closed twice in some cases.
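
If that is the cause, one way to make the failure mode explicit is to make ProducerBatch.close() idempotent, e.g. (a hypothetical sketch; the closed flag does not exist in the current code and the real close() does more work):

        private boolean closed = false;

        public void close() {
            // Guard against closing twice, e.g. once when the batch is drained and once
            // when it is expired while in flight.
            if (closed)
                return;
            recordsBuilder.close();
            closed = true;
        }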

@hachikuji (Contributor):
@sutambe @becketqin It would be nice to unblock this. Can someone else pick up the work?

@becketqin (Contributor):
@hachikuji Yeah, this has been pending for too long. I have spoken to @sutambe and he said he still wants to finish the patch. He will figure out the ETA and see if that works.

@bbejeck (Contributor) commented May 1, 2018

@sutambe @becketqin is there any update on the status of this PR? It would be great if we could get this in the next release.

@Ishiihara (Contributor):
@guozhangwang @junrao @bbejeck @becketqin We also hit this issue when running Kafka Streams library with some high volume output topics. It would be nice to get this moving and push it to the next release.

@radai-rosenblatt (Contributor):
Becket can't load this page for some reason (some weird issue with his GitHub profile?).
We are OK with you taking over this patch.

@sutambe (Contributor Author) commented May 9, 2018

@bbejeck @apurvam @becketqin @hachikuji I don't have any update since Dec last year. Sorry, the work has stalled and it has been very hard to find cycles for this effort. I don't mind if Confluent wants to take this effort forward. Better later than never.

Avoiding overflow when deliveryTimeoutMs is MAX_VALUE
per-partition map for tracking soon to expire batches
Updated tests
@Ishiihara (Contributor):
cc @abbccdda @yuyang08

@yuyang08 (Contributor) commented May 31, 2018

@sutambe I made some changes based on your pull request to fix the style check and test failures. Do you mind if I amend the change to this pull request? cc @becketqin @apurvam @hachikuji
https://github.com/yuyang08/kafka/commit/69fc79a91d0556408c8037649f1e03aa56206ef2

@guozhangwang (Contributor):
@yuyang08 I'd suggest you create your own PR against Apache Kafka trunk and let the other reviewers continue reviewing that one.

@yuyang08 (Contributor) commented Jun 1, 2018

@guozhangwang Sure, will create a separate pull request.

@yuyang08 (Contributor):
@guozhangwang @apurvam @becketqin Created new PR #5270 for KAFKA-5886.

@ijuma (Contributor) commented Feb 18, 2019

This has been merged via a different PR, closing.

@ijuma ijuma closed this Feb 18, 2019