
KAFKA-5494: enable idempotence with max.in.flight.requests.per.connection > 1 #3743

Conversation

@apurvam (Contributor) commented Aug 25, 2017

Here we introduce client and broker changes to support multiple inflight requests while still guaranteeing idempotence. Two major problems to be solved:

  1. Sequence number management on the client when there are request failures. When a batch fails, future inflight batches will also fail with OutOfOrderSequenceException. This must be handled on the client with intelligent sequence reassignment. We must also deal with the fatal failure of some batch: the future batches must get different sequence numbers when they come back.
  2. On the broker, when we have multiple inflights, we can get duplicates of multiple old batches. With this patch, we retain the record metadata for 5 older batches.
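
A minimal sketch of the broker-side idea in point 2, assuming a per-producer cache of the last 5 batches (plain Java for illustration; ProducerIdEntrySketch, BatchMetadata, and findDuplicate are illustrative names, not the patch's exact code):

import java.util.ArrayDeque;
import java.util.Deque;

class ProducerIdEntrySketch {
    static final int MAX_CACHED_BATCHES = 5; // retain metadata for the last 5 batches

    static class BatchMetadata {
        final int firstSeq;
        final int lastSeq;
        final long lastOffset;

        BatchMetadata(int firstSeq, int lastSeq, long lastOffset) {
            this.firstSeq = firstSeq;
            this.lastSeq = lastSeq;
            this.lastOffset = lastOffset;
        }
    }

    private final Deque<BatchMetadata> batchMetadata = new ArrayDeque<>();

    void addBatch(BatchMetadata batch) {
        if (batchMetadata.size() == MAX_CACHED_BATCHES)
            batchMetadata.removeFirst(); // evict the oldest cached batch
        batchMetadata.addLast(batch);
    }

    // If an incoming batch matches any cached batch by sequence range, it is a
    // duplicate, and the broker can answer with the cached offsets instead of
    // appending it again.
    BatchMetadata findDuplicate(int firstSeq, int lastSeq) {
        for (BatchMetadata cached : batchMetadata)
            if (cached.firstSeq == firstSeq && cached.lastSeq == lastSeq)
                return cached;
        return null;
    }
}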

I have added TODO(reviewers) comments for specific decisions in the code which are worth discussing.

TODO:

  1. Add more unit tests, especially for loading different snapshot versions correctly, more client side unit tests, more broker side tests to validate that we are caching the correct number of batches (some of this is already there).
  2. Update the system tests to check for ordering.
  3. Run a tight loop of system tests.
  4. Add comments about the assumptions made around the network client semantics of send/receive.
  5. Gracefully handle changes in the producer id. In particular, we should never change the producer id once a batch has been sent, only the sequence number. This protects against duplicates and is a generalization of the previous approach to not reset producer state on retry.
core/src/test/resources/log4j.properties Outdated
@@ -19,7 +19,6 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.logger.kafka=ERROR
log4j.logger.org.apache.kafka=ERROR

@ijuma (Contributor) · Aug 27, 2017
Probably unintentional.

@apurvam (Author, Contributor) · Sep 8, 2017

Yes. I will fix the log4j.properties as we near the end of the review process.

@apurvam (Contributor) commented Aug 29, 2017

Here are my todos for the patch, to help reviewers.

1. Snapshot load unit tests.
2. ProducerState tests to ensure that the correct number of batches are retained when truncation happens at head. 
3. Log tests to ensure that we handle duplicates of batches older than the last appended batch.
4. Review client tests. Do we need to add anything around batch expiry? We probably should add assertions around resetting the batch metadata on retry if we don’t do it already.
5. How does KIP-91 discussion impact this? We probably should retain the producer id and only set a new sequence if the producer id hasn’t changed. This is a generalization of the previous `isRetry` check, where we don’t change the producer id of a batch once assigned.
6. Need to upgrade the system test to enforce ordering.
@hachikuji (Contributor) left a comment

Thanks for the patch. Left a few comments. Note that I left out comments related to expiration of batches when their ultimate outcome is unknown since we discussed this in detail offline.

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java Outdated
private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled) {
if (idempotenceEnabled && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
" to use the idempotent producer. Otherwise we cannot guarantee idempotence.");

@hachikuji (Contributor) · Aug 29, 2017

Instead of saying we can't guarantee idempotence, maybe we can just say that idempotence is only supported for at most 5 in-flight requests. The current wording makes it sound like a correctness concern, but it isn't really.
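
A possible rewording, sketched against the excerpt above (this assumes the method returns the configured value; the final wording in the patch may differ):

private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled) {
    int maxInflight = config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
    if (idempotenceEnabled && maxInflight > 5)
        // State the limit as a supported range rather than a correctness claim.
        throw new ConfigException("The idempotent producer supports at most 5 in-flight requests per " +
                "connection, but " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION +
                " was set to " + maxInflight + ".");
    return maxInflight;
}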

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java Outdated
@@ -235,7 +235,7 @@ public RecordAppendResult append(TopicPartition tp,
}
}

private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
private MemoryRecordsBuilder recordsBuilder(TopicPartition topicPartition, ByteBuffer buffer, byte maxUsableMagic) {

@hachikuji (Contributor) · Aug 29, 2017

nit: seems we don't use topicPartition.

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java Outdated
batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
log.debug("Assigned producerId {} and producerEpoch {} to batch with sequence " +
"{} being sent to partition {}", producerIdAndEpoch.producerId,

@hachikuji (Contributor) · Aug 29, 2017

nit: might help if the log message mentions that this is the base sequence.

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java Outdated
@@ -68,6 +68,7 @@
public final String logPrefix;

private final Map<TopicPartition, Integer> sequenceNumbers;
private final Map<TopicPartition, Integer> lastAckedSequence;

@hachikuji (Contributor) · Aug 29, 2017

nit: would be nice to have consistent naming (e.g. sequences and lastAckedSequences). Also, a comment which clarifies what purpose each serves (especially for the former).

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java Outdated
@@ -472,17 +473,25 @@ public boolean hasUndrained() {
}

ProducerBatch batch = deque.pollFirst();
if (producerIdAndEpoch != null && !batch.inRetry()) {
if (producerIdAndEpoch != null) {

@hachikuji (Contributor) · Aug 29, 2017

Seems we need to update the comment below with this check gone.

core/src/main/scala/kafka/log/ProducerStateManager.scala Outdated
}

if (duplicate.size > 1)
throw new IllegalStateException(s"Found ${duplicate.size} duplicate batches in the cached producer metadata " +

@hachikuji (Contributor) · Aug 29, 2017

Not really sure this is worth checking here. Shouldn't we have ensured that duplicates weren't added to batchMetadata in the first place?

@apurvam (Author, Contributor) · Sep 8, 2017

Yes. This definitely seems spurious. I was debating not including this check, and your comment confirms that I should remove it.

core/src/main/scala/kafka/log/ProducerStateManager.scala Outdated
s"for producerId: $producerId and producerEpoch: $producerEpoch. This means that it's likely there are " +
s"duplicates in the log as well.")

if (duplicate.size == 1)

@hachikuji (Contributor) · Aug 29, 2017

You can replace this with duplicate.headOption.

core/src/main/scala/kafka/log/ProducerStateManager.scala Outdated
// of size 1 which uses the last appended sequence number.
this.firstSeq = this.lastSeq
}
// TODO(reviewers): The semantics of the ProducerIdEntry have changed so that it now explicitly caches the metadata

@hachikuji (Contributor) · Aug 29, 2017

Hmm.. It's not clear that you're handling the case that the marker has an epoch bump below. The old sequence numbers will not be valid for the new epoch, so we should clear them as the comment below says?

As for the last offset being potentially out of sync, I'm not sure it's a problem, but it makes me a tad uncomfortable. We use this to determine when a producer should be expired (due to retention enforcement), but it may be OK if we expire a producerId when the only record left is the marker. However, we might need to be a little careful when loading the producer state from the log to make sure that producer expiration is determined consistently. Maybe we should at least rename the lastOffset method to lastDataOffset or something like that to emphasize that it does not take control records into account.

@apurvam (Author, Contributor) · Sep 9, 2017

Actually, the case where the control batch has a new epoch is being handled in currentEntry.maybeUpdateEpoch. If the epoch changes, we update the epoch in the ProducerIdEntry and clear all the batch metadata, which resets all the sequence numbers for that producer id.

I am not sure I follow your point about producer id expiration. We expire producer ids by time. However, when we are deleting segments whose retention period has elapsed, we use offsets to determine whether the producer snapshot should go. This could mean that the producer id is removed even though a control message is left. This should be fine, because that message is orphaned, and will be deleted eventually once it violates retention. I don't think this is a correctness issue at all.

@hachikuji (Contributor) · Sep 12, 2017

Yes, I said it made me uncomfortable, not that it was necessarily a problem. I would be fine renaming the method to lastDataOffset as suggested above so that it's clear that it is not intended to include control records.

@hachikuji (Contributor) · Sep 13, 2017

We can remove the TODO now?

core/src/main/scala/kafka/tools/DumpLogSegments.scala Outdated
@@ -154,10 +154,10 @@ object DumpLogSegments {

private def dumpPidSnapshot(file: File): Unit = {

@hachikuji (Contributor) · Aug 29, 2017

nit: rename to dumpProducerIdSnapshot.

core/src/main/scala/kafka/log/ProducerStateManager.scala Outdated
new Field(ProducerEpochField, Type.INT16, "Current epoch of the producer"),
new Field(CoordinatorEpochField, Type.INT32, "The epoch of the last transaction coordinator to send an end transaction marker"),
new Field(CurrentTxnFirstOffsetField, Type.INT64, "The first offset of the on-going transaction (-1 if there is none)"),
new Field(RecordBatchMetadataField, new ArrayOf(RecordBatchMetadataSchema), "The record metadata for up to the last 5 batches appended by the producer")

@hachikuji (Contributor) · Aug 29, 2017

Related to my previous question, but I'm wondering how likely is it for us to hit a case where we need the additional entries after recovering the log. We would have to receive the produce requests, successfully write them to the log, and await enough time for a new snapshot to be written (which happens after rolling a segment and upon shutdown). All of that before the producer receives the responses. The cost of having this seems minor, but it's still useful to understand how much extra resilience it actually affords.

@apurvam (Contributor) commented Aug 30, 2017

I created a short writeup about these changes. It goes over the assumptions made, the basic solution, and also some follow up work that needs to be done to handle corner cases:

https://docs.google.com/document/d/1EBt5rDfsvpK6mAPOOWjxa9vY0hJ0s9Jx9Wpwciy0aVo/edit?usp=sharing

@apurvam apurvam force-pushed the apurvam:KAFKA-5494-increase-max-in-flight-for-idempotent-producer branch Sep 6, 2017
@apurvam (Contributor) commented Sep 6, 2017

Latest updates (copied from my commit message):

  1. Batch expiry on the producer. In this case, we need to mute the
    partition and make sure all the inflight requests are fully resolved. If
    they are successful, then all is well. If they are also expired or fail
    due to OutOfOrderSequence, we need to reset the producer id.

  2. We now deal correctly with out of order responses to in flight
    requests which may happen due to leadership changes. We keep track of
    all inflight batches, ordered by sequence. Whenever there is an error,
    we reduce to one inflight request by ensuring only the next batch
    by sequence can be drained. When we requeue a batch, we check to make
    sure that the queue remains in sequence order. When a batch completes,
    it is removed from the in flight list, and the next batch by sequence
    changes.
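
An illustrative sketch of this in-flight tracking, assuming Kafka's TopicPartition and ProducerBatch types and the usual java.util imports (the names mirror the review discussion later in this thread, not necessarily the final code):

private final Map<TopicPartition, PriorityQueue<ProducerBatch>> inflightBatchesBySequence = new HashMap<>();

synchronized void addInFlightBatch(ProducerBatch batch) {
    if (!batch.hasSequence())
        throw new IllegalStateException("Can't track batch for partition " + batch.topicPartition +
                " when sequence is not set.");
    // Order in-flight batches by base sequence so that, on error, draining can
    // be restricted to the next batch in sequence (one in-flight request).
    inflightBatchesBySequence
            .computeIfAbsent(batch.topicPartition,
                    tp -> new PriorityQueue<>(5, Comparator.comparingInt(ProducerBatch::baseSequence)))
            .offer(batch);
}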

Some other changes:

  1. We never change sequence numbers unless we are certain that a batch
    failed fatally on the broker (got a fatal error code), and future
    batches returned OutOfOrderSequence. Before the latest commit, we would
    always reset sequence numbers on retry, and have other mechanisms to
    ensure that a given batch got the same sequence as before on the next
    drain. This is now much tighter, and it needs to be in order to correctly
    handle the out of order response problem.

Todo:

Add unit tests for out of order responses.
Add unit tests for the new cases around batch expiry (all batches expired, all
batches succeeded, future batches failed with OutOfOrderSequence).

@hachikuji (Contributor) left a comment

Thanks for the updates. Mostly finished with the changes on the client side.

clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java Outdated
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId(), producerEpoch());
for (ProducerBatch newBatch : batches) {
newBatch.setProducerState(producerIdAndEpoch, sequence, isTransactional());
newBatch.retry = true;

@hachikuji (Contributor) · Sep 8, 2017

Alternatively, we could set retry to be true in the constructor when isSplitBatch is enabled? Or is there a reason we only want to set this when idempotence is in use?

@hachikuji (Contributor) · Sep 12, 2017

Would this suggestion work?

@hachikuji (Contributor) · Sep 13, 2017

Discussed offline, but after looking again, it seems we no longer need this at all.

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java Outdated
}
}
return numSplitBatches;
}

private void maybeEnsureQueueIsOrdered(Deque<ProducerBatch> deque, ProducerBatch batch) {
if (transactionManager != null) {

@hachikuji (Contributor) · Sep 8, 2017

nit: only two uses of this and one of them already has a check for transactionManager being null. Maybe we could change this to ensureQueueIsOrdered and add a null check for the other case.

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java Outdated
orderedBatches.add(deque.pollFirst());

log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " +
"position {}" + incomingBatch.baseSequence(), incomingBatch.topicPartition, orderedBatches.size());

@hachikuji (Contributor) · Sep 8, 2017

Inadvertent string append?

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java Outdated
}
}
return numSplitBatches;
}

private void maybeEnsureQueueIsOrdered(Deque<ProducerBatch> deque, ProducerBatch batch) {

@hachikuji (Contributor) · Sep 8, 2017

It might be more natural to add an insertInOrder function (which executes the logic below modulo the first pollFirst) rather than inserting out of order and fixing up later.

Another thing I was wondering is whether a deque is still the right data structure given that we now require ordering. Would a priority queue give us everything we need?

@apurvam (Author, Contributor) · Sep 12, 2017

I thought about this, and the deque still seems like the right structure for performance reasons. We use the priority queue just for in-flight batches for starters, so it is smaller. Also, we use the deque for a variety of purposes (like reducing to 1 in flight when there are retries). And we only need to reorder the deque extremely rarely, so it is worth making that operation slightly less efficient in the interest of keeping performance in the common case.

Another thing is that I wanted to keep the behavior when idempotence is not enabled just the same, which is another reason I went this route.
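
For reference, a hedged sketch of what the insertInOrder suggestion above might look like on the deque (hypothetical name and shape, not the merged code):

private void insertInSequenceOrder(Deque<ProducerBatch> deque, ProducerBatch batch) {
    // Pull batches with a lower base sequence off the front of the queue.
    List<ProducerBatch> lowerSequenceBatches = new ArrayList<>();
    while (deque.peekFirst() != null && deque.peekFirst().hasSequence()
            && deque.peekFirst().baseSequence() < batch.baseSequence())
        lowerSequenceBatches.add(deque.pollFirst());

    deque.addFirst(batch);

    // Push the skipped batches back in front, preserving their relative order.
    for (int i = lowerSequenceBatches.size() - 1; i >= 0; i--)
        deque.addFirst(lowerSequenceBatches.get(i));
}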

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java Outdated

ProducerBatch incomingBatch = deque.pollFirst();
List<ProducerBatch> orderedBatches = new ArrayList<>();
while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < incomingBatch.baseSequence())

@hachikuji (Contributor) · Sep 8, 2017

Maybe worth a comment that we depend on the invariant that assigned sequence numbers for batches on the queue are all from the same producerId.

@apurvam (Author, Contributor) · Sep 11, 2017

This is a good point. If the producer id changes, that means the transaction manager will stop tracking inflight batches for the previous producer id. So I updated the code to only try reordering if there are actually in flight batches by the current producer id.

Once we have batches by the current producer id, we know that all batches by the previous producer id are no longer in play, because that is the only way to get out of the unresolved state and start draining new batches.

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java Outdated
@@ -67,7 +67,27 @@

public final String logPrefix;

private final Map<TopicPartition, Integer> sequenceNumbers;
// The base sequence of the next batch bound for a given partition.
private final Map<TopicPartition, Integer> nextSequence;

@hachikuji (Contributor) · Sep 8, 2017

High level comment. There is a lot of per-partition state that we're tracking with separate collections. I wonder if this would be easier to manage with a PartitionState object and a single Map<TopicPartition, PartitionState>?
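
A hypothetical sketch of that consolidation (illustrative names, not the committed code):

private static class PartitionState {
    int nextSequence = 0;       // base sequence for the next batch bound for this partition
    int lastAckedSequence = -1; // -1 means nothing acked yet
    final PriorityQueue<ProducerBatch> inflightBatches =
            new PriorityQueue<>(5, Comparator.comparingInt(ProducerBatch::baseSequence));
}

private final Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();

private PartitionState stateFor(TopicPartition tp) {
    return partitionStates.computeIfAbsent(tp, k -> new PartitionState());
}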

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java Outdated
if (!batch.hasSequence())
throw new IllegalStateException("Can't track batch for partition " + batch.topicPartition + " when sequence is not set.");
if (!inflightBatchesBySequence.containsKey(batch.topicPartition)) {
inflightBatchesBySequence.put(batch.topicPartition, new PriorityQueue<>(5, new Comparator<ProducerBatch>() {

@hachikuji (Contributor) · Sep 8, 2017

nit: maybe we could add a private static extension of PriorityQueue which sets the comparator?

clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java Outdated
@@ -279,6 +279,10 @@ public void abort() {
aborted = true;
}

public void unsetProducerState() {
builtRecords = null;

@hachikuji (Contributor) · Sep 8, 2017

What should the state of the builder be for this? Maybe we should verify at least that it wasn't aborted?

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java Outdated
(lastAckedSequence.containsKey(topicPartition) && (sequence - lastAckedSequence.get(topicPartition) == 1));
}

synchronized void setNextSequence(TopicPartition topicPartition, int sequence) {

@hachikuji (Contributor) · Sep 8, 2017

nit: this could be private? Also, there's an unneeded space after synchronized.

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java Outdated
return inflightBatchesBySequence.get(topicPartition).peek();
}

synchronized void stopTrackingBatch(ProducerBatch batch) {

@hachikuji (Contributor) · Sep 8, 2017

nit: maybe a better name is something like addInFlightBatch?

@apurvam (Contributor) commented Sep 9, 2017

One new problem we have to tackle with the new approach to duplicates: when sequence numbers wrap around, it is possible to detect spurious duplicates. There doesn't seem to be an elegant, easy solution for this unless we put some bound on how old a duplicate can be. This is also worth discussing.
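
One hedged sketch of bounded wraparound handling (assumed helpers, not the patch's code): treat sequences as wrapping to 0 after Integer.MAX_VALUE and compare distances modulo the sequence space.

static int incrementSequence(int sequence, int increment) {
    if (sequence > Integer.MAX_VALUE - increment)
        return increment - (Integer.MAX_VALUE - sequence) - 1; // wrapped past MAX_VALUE
    return sequence + increment;
}

// A batch is "next" if its base sequence follows the last acked sequence,
// accounting for wraparound.
static boolean inSequence(int lastAckedSequence, int nextSequence) {
    return nextSequence == incrementSequence(lastAckedSequence, 1);
}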

@apurvam (Contributor) commented Sep 11, 2017

retest this please

@apurvam apurvam force-pushed the apurvam:KAFKA-5494-increase-max-in-flight-for-idempotent-producer branch Sep 11, 2017
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java Outdated
@@ -310,6 +310,7 @@ public void reenqueue(ProducerBatch batch, long now) {
Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);
synchronized (deque) {
deque.addFirst(batch);
maybeEnsureQueueIsOrdered(deque, batch);

@hachikuji (Contributor) · Sep 12, 2017

Mentioned previously, but I think it would be more intuitive to insert in order rather than fixing up the collection after the insertion.

@hachikuji (Contributor) · Sep 12, 2017

Would like to hear your thoughts on my previous comment as well:

Another thing I was wondering is whether a deque is still the right data structure given that we now require ordering. Would a priority queue give us everything we need?

@hachikuji (Contributor) commented Sep 12, 2017

retest this please

clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java Outdated
@@ -279,6 +279,13 @@ public void abort() {
aborted = true;
}

public void unsetProducerState() {

@hachikuji (Contributor) · Sep 12, 2017

Hmm.. This doesn't actually unset the producer state. Maybe reopen would be a better name?

@hachikuji (Contributor) left a comment

A few more (smaller) comments.

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java Outdated
Integer currentLastAckedSequence = lastAckedSequence.get(topicPartition);
if (currentLastAckedSequence == null)
return -1;
return lastAckedSequence.get(topicPartition);

@hachikuji (Contributor) · Sep 12, 2017

Can't we return currentLastAckedSequence?

@apurvam (Author, Contributor) · Sep 13, 2017

I chose to return -1 when there is no last ack'd sequence because it makes the logic elsewhere a bit cleaner; for instance, in maybeUpdateLastAckedSequence we don't need a null check with the current setup.

@hachikuji (Contributor) · Sep 13, 2017

That is fine, I am just questioning why we don't use the variable currentLastAckedSequence instead of calling lastAckedSequence.get(topicPartition) a second time.

@apurvam (Author, Contributor) · Sep 13, 2017

ack. Will update.

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java Outdated
}

synchronized void removeInFlightBatch(ProducerBatch batch) {
if (inflightBatchesBySequence.containsKey(batch.topicPartition))

@hachikuji (Contributor) · Sep 12, 2017

nit: Of the micro-optimization variety, but I think it's a little nicer pattern to do a get followed by a null check rather than two hash lookups.
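
A sketch of the suggested single-lookup pattern (illustrative):

synchronized void removeInFlightBatch(ProducerBatch batch) {
    PriorityQueue<ProducerBatch> queue = inflightBatchesBySequence.get(batch.topicPartition);
    if (queue != null)
        queue.remove(batch);
}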

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java Outdated
// id must have changed and the batches being re enqueued are from the old producer id. In this case
// we don't try to ensure ordering amongst them. They will eventually fail with an OutOfOrderSequence,
// or they will succeed.
if (transactionManager.nextBatchBySequence(batch.topicPartition) != null &&

@hachikuji (Contributor) · Sep 12, 2017

nit: annoying that we make two calls to nextBatchBySequence here. Maybe we can use a local variable?

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java Outdated


synchronized ProducerBatch nextBatchBySequence(TopicPartition topicPartition) {
return inflightBatchesBySequence.get(topicPartition).peek();

@hachikuji (Contributor) · Sep 12, 2017

Why do we not need the null check here, but we do in removeInFlightBatch? Should this be documented at least?

@apurvam (Author, Contributor) · Sep 13, 2017

Good catch. Actually, we do need the null check here. The reason it exists in both places is that when we reset the producer id, if there is a queued batch in retry from the previous producer id, we would hit an NPE when trying to drain it after the reset. I have fixed it.
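
A sketch of the lookup with the null check described above (hypothetical shape, not the exact committed code):

synchronized ProducerBatch nextBatchBySequence(TopicPartition topicPartition) {
    PriorityQueue<ProducerBatch> queue = inflightBatchesBySequence.get(topicPartition);
    return queue == null ? null : queue.peek(); // null if no in-flight batches for this partition
}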

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java Outdated
return inflightBatchesBySequence.containsKey(topicPartition) && !inflightBatchesBySequence.get(topicPartition).isEmpty();
}

synchronized boolean partitionHasUnresolvedSequence(TopicPartition topicPartition) {

@hachikuji (Contributor) · Sep 12, 2017

nit: since the argument is a partition, we could probably drop the partition prefix from the name.

transactionManager.sequenceNumber(batch.topicPartition));
if (transactionManager != null) {
if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
transactionManager.maybeUpdateLastAckedSequence(batch.topicPartition, batch.baseSequence() + batch.recordCount - 1);

@hachikuji (Contributor) · Sep 12, 2017

nit: if you use a local variable for lastSequence, then you can also use it in the log message below.

@apurvam (Author, Contributor) · Sep 13, 2017

Actually, the log message should contain the actual lastAckedSequence since the update is conditional: with out of order responses, it is possible that we might move the sequence backward, so maybeUpdateLastAckedSequence guards against that.

@tedyu (Contributor) · Sep 15, 2017

Pardon.

In the current code base, I don't see maybeUpdateLastAckedSequence.
What's the method called now?

core/src/main/scala/kafka/log/Log.scala Outdated
// if this is a client produce request, there will be upto 5 batches which could have been duplicated.
// If we find a duplicate, we return the metadata of the appended batch to the client.
if (isFromClient) {
maybeLastEntry match {

@hachikuji (Contributor) · Sep 12, 2017

Forgot to make this change?

clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java Outdated
failBatch(batch, response, exception);
this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
// tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
// its retries -- if it did, we don't know the whether the sequence number was accepted or not, and

@hachikuji (Contributor) · Sep 12, 2017

typo: drop "the" before "whether"

core/src/main/scala/kafka/log/ProducerStateManager.scala Outdated
if (this.producerEpoch != producerEpoch) {
batchMetadata.clear()
this.producerEpoch = producerEpoch
return true

@hachikuji (Contributor) · Sep 12, 2017

nit: explicit returns are frowned upon in scala. Maybe we can drop the return and add an else branch? Same for duplicateOf below.


@apurvam (Contributor) commented Sep 13, 2017

retest this please

@apurvam (Contributor) commented Sep 13, 2017

Some tests here are definitely leaking threads. Trying to reproduce by running tests in sequence locally.

@apurvam (Contributor, Author) left a comment

@hachikuji I addressed most of your comments.

I also found the cause of the thread leaks: in the new scheme, the in-memory batchMetadata only caches data batches. However, if an instance of the ProducerStateManager only got a control record for a producer, we would have a ProducerIdEntry with empty batch metadata. When we go to write a snapshot, the new code would NPE, causing threads to leak in shutdown, and causing other tests to fail.
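
A hypothetical sketch of the guard (reusing the BatchMetadata shape sketched near the top of this thread; -1 stands in for Kafka's NO_SEQUENCE sentinel):

// A producer seen only through a control/marker batch has no cached data
// batches, so the snapshot writer must tolerate an empty cache instead of
// assuming a head batch exists.
static int lastSeqOrNoSequence(Deque<BatchMetadata> batchMetadata) {
    BatchMetadata last = batchMetadata.peekLast();
    return last == null ? -1 : last.lastSeq;
}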

core/src/main/scala/kafka/log/Log.scala Outdated
// if this is a client produce request, there will be upto 5 batches which could have been duplicated.
// If we find a duplicate, we return the metadata of the appended batch to the client.
if (isFromClient) {
maybeLastEntry match {

@apurvam (Author, Contributor) · Sep 13, 2017

Actually, I did try your suggestion, but it didn't quite work: spurious duplicates were being detected for some reason. So I left the old code as is.



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java Outdated
}

synchronized boolean isNextSequence(TopicPartition topicPartition, int sequence) {
return (!lastAckedSequence.containsKey(topicPartition) && sequence == 0) ||

@apurvam (Author, Contributor) · Sep 13, 2017

Good suggestion. By returning -1 from lastAckedSequence if there isn't a last acked sequence, we can reduce this to sequence - lastAckedSequence(topicPartition) == 1. Much simpler!
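
That simplification, sketched (illustrative; the field and the method share the lastAckedSequence name, as in the excerpts above):

synchronized int lastAckedSequence(TopicPartition topicPartition) {
    Integer lastAcked = lastAckedSequence.get(topicPartition); // the Map field
    return lastAcked == null ? -1 : lastAcked;
}

synchronized boolean isNextSequence(TopicPartition topicPartition, int sequence) {
    return sequence - lastAckedSequence(topicPartition) == 1;
}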

@apurvam (Contributor) commented Sep 13, 2017

However, if an instance of the ProducerStateManager only got a control record for a producer, we would have a ProducerIdEntry with empty batch metadata. When we go to write a snapshot, the new code would NPE, causing threads to leak in shutdown, and causing other tests to fail.

The specific test causing the problem was TransactionsTest.testFencingOnSend. In the current patch, we drop the batch metadata when we bump the epoch because the previous sequence numbers no longer apply and shouldn't be checked. In this test, we write a snapshot immediately after the fence, which triggers the NPE.

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java Outdated
// The partition has been fully drained. At this point, the last ack'd sequence should be one less than
// the next sequence destined for the partition. If so, the partition is fully resolved. If not, we should
// reset the sequence number if necessary.
if (lastAckedSequence(topicPartition) == sequenceNumber(topicPartition) - 1) {

@hachikuji (Contributor) · Sep 13, 2017

I think we could use isNextSequence here.

assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());

// Send first ProduceRequest
Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;

@hachikuji (Contributor) · Sep 13, 2017

I think we should be asserting on the result of the send futures in these test cases.

@apurvam (Author, Contributor) · Sep 13, 2017

Just added the assertions for test cases in which they were missing.

clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java Outdated

assertEquals(1, client.inFlightRequestCount());
assertEquals(0, transactionManager.lastAckedSequence(tp0));
assertNotNull(request1.get());

@hachikuji (Contributor) · Sep 13, 2017

Can we assert the offset? This ensures that the requests have not been reordered. Also, the isDone check should probably come first.

@apurvam (Author, Contributor) · Sep 13, 2017

Hmm. The lastAckedSequence checks already ensure that the responses are coming in the right order. We also check that the sequence of the outgoing batch matches what we expect when the requests are sent, which is really what we want to check in these tests.

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java Outdated
// we don't try to ensure ordering amongst them. They will eventually fail with an OutOfOrderSequence,
// or they will succeed.
if (transactionManager.nextBatchBySequence(batch.topicPartition) != null &&
batch.baseSequence() != transactionManager.nextBatchBySequence(batch.topicPartition).baseSequence()) {

@hachikuji (Contributor) · Sep 14, 2017

nit: here we use sequence number equality to tell when two batches match. Below in drain we use equals: https://github.com/apache/kafka/pull/3743/files#diff-e67ae6379c50ac1d9c58cefaf71346ceR540. Unless there is a good reason, maybe we should be consistent?

@apurvam (Author, Contributor) · Sep 14, 2017

ack. I prefer to just use sequence number equality. Will update the drain method.

@hachikuji (Contributor) left a comment

A few more comments. Getting close!

clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java Outdated
@@ -228,6 +231,12 @@ void run(long now) {

private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();


if (transactionManager != null && transactionManager.shouldResetProducerStateAfterResolvingSequences()) {

@hachikuji (Contributor) · Sep 14, 2017

Seems like there's no longer any reason for this logic to be in sendProducerData. We may as well move it into run(now) with the rest of the periodic transaction/idempotence checks?

sender.run(time.milliseconds()); // send second request
sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 1);
sender.run(time.milliseconds()); // receive second response, the third request shouldn't be sent since we are in an unresolved state.
assertTrue(request2.isDone());

@hachikuji (Contributor) · Sep 14, 2017

Should we assert the failure exception?


assertEquals(2, client.inFlightRequestCount());

sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1);

@hachikuji (Contributor) · Sep 14, 2017

nit: maybe TIMEOUT_ERROR would be better for this case since NOT_LEADER_FOR_PARTITION means the record definitely wasn't written.

By the way, doesn't this expose a weakness in our error checking? We know for sure that the write never succeeded even though we had to expire it.

@apurvam (Author, Contributor) · Sep 14, 2017

Yes. I think this was brought up in the KIP-91 discussion as well: we could optimize for error codes which definitively indicate that the batch wasn't written, but we don't do that today.

This means we could get into situations where we reset the producer id where we don't strictly need to. I filed https://issues.apache.org/jira/browse/KAFKA-5897 to track this.
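
A hedged sketch of the kind of classification KAFKA-5897 points at (which errors belong in the "definitely not written" set is exactly the open question):

import org.apache.kafka.common.protocol.Errors;

static boolean definitelyNotWritten(Errors error) {
    switch (error) {
        case NOT_LEADER_FOR_PARTITION: // broker rejected the write before appending
        case NOT_ENOUGH_REPLICAS:
            return true;
        case REQUEST_TIMED_OUT:        // outcome unknown: the write may have landed
        case NETWORK_EXCEPTION:
        default:
            return false;
    }
}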

@@ -467,6 +469,583 @@ public void testClusterAuthorizationExceptionInInitProducerIdRequest() throws Ex
assertSendFailure(ClusterAuthorizationException.class);
}


@Test
public void testIdempotenceWithMultipleInflights() throws Exception {

@hachikuji (Contributor) · Sep 14, 2017

I was looking for a test case which exercised the sequence number adjustment logic, but couldn't find one. Specifically a test case which caused the sequence number of a batch to change.

@apurvam (Author, Contributor) · Sep 14, 2017

Good catch. There wasn't a test, and there was a bug in that logic. Specifically, when a batch fails fatally, the future in-flight batches are guaranteed to fail with an OutOfOrderSequenceException. We retry these exceptions as long as the batch in question is not the next-in-line batch. However, when we reset sequence numbers, the first future batch does become the next-in-line batch, and it is not retried when it fails with the expected OutOfOrderSequenceException.

I have the test case and the fix for the bug in an upcoming commit.

apurvam added 4 commits Aug 12, 2017
Todo:
  1) Write more unit tests.
  2) Handle deletion / retention / cleaning correctly.
and incremented during drain. If a batch is retried, its sequence
number is unset during the completion handler. If the first inflight
batch returns an error, the next sequence to assign is reset to the last
ack'd sequence + 1.
apurvam added 20 commits Sep 11, 2017
  1. A solution for detecting duplicates when sequence numbers
wrap around. This is essentially the same as the solution for detecting
out of order sequences with wraparound, but both are limited.
  2. Correctly handle duplicate sequence exceptions on the client.
…a. This makes accesses from the cleaner threads (log cleaner, producer id expiration, etc.) safe
  1. Fixed a potential NPE in ProducerStateManager.writeSnapshot where
we could have no batch metadata in memory (for instance, if only a
control batch was written), and yet write a snapshot. This was causing
tests to fail because an NPE was raised during shutdown, leaking
threads.
  2. Addressed the remaining PR comments.
If a batch has a sequence, but there are no recorded inflight requests
to that partition, it means that the producer id has changed, and the
batch under consideration was from a previous producerId. It should
always be drained (if it returns again, it will fail fatally).

The code before this patch would ensure that it would _never_ drain.
…ust the lastSeq per producer to the LogCleaner, hence simplifying the concurrency model
One notable update: We didn't have a test for reassigning sequence
numbers, and we had a bug in that logic. Essentially a newly reassigned
batch would fail with an out of order sequence exception because it
would be the next in line batch after its sequence was adjusted.

This is fixed by tracking whether the sequence of a batch has been
adjusted since the last drain, in which case the out of order sequence
exception for it is always retried. If it fails again, it will be fatal.
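
A hypothetical sketch of that retry rule (sequenceHasBeenReset is an illustrative flag, not the patch's exact field):

private boolean canRetryOutOfOrderSequence(ProducerBatch batch) {
    // Normally only batches that are not next in line retry OutOfOrderSequence.
    // A batch whose sequence was just reassigned is next in line, yet its first
    // OutOfOrderSequence is expected; give it one retry. A second failure is fatal.
    return batch.sequenceHasBeenReset || !isNextSequence(batch.topicPartition, batch.baseSequence());
}
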
@apurvam apurvam force-pushed the apurvam:KAFKA-5494-increase-max-in-flight-for-idempotent-producer branch to 2d6b5de Sep 14, 2017
  1. Move the 'reopen' flag from MemoryRecordsBuilder to ProducerBatch
  2. Move the logic to resolve sequence number state to the top of the
run loop.
  3. More canonical scala in Log.scala.
@hachikuji

I think you forgot to remove this from sendProducerData.

@hachikuji (Contributor) left a comment

LGTM. I'll merge after the builds complete.


// If there are no inflight batches being tracked by the transaction manager, it means that the producer
// id must have changed and the batches being re enqueued are from the old producer id. In this case
// we don't try to ensure ordering amongst them. They will eventually fail with an OutOfOrderSequence,

@tedyu (Contributor) · Sep 15, 2017

OutOfOrderSequence -> OutOfOrderSequenceException

}
}));
}
inflightBatchesBySequence.get(batch.topicPartition).offer(batch);

@tedyu (Contributor) · Sep 22, 2017

Shouldn't we add a check for the number of batches stored in the PriorityQueue?
What if more than 5 batches are stored?
