-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-5793: Tighten up the semantics of the OutOfOrderSequenceException #3865
KAFKA-5793: Tighten up the semantics of the OutOfOrderSequenceException #3865
Conversation
da658a6
to
08c9ca3
Compare
@@ -62,7 +62,7 @@ public Builder(byte magic, | |||
int timeout, | |||
Map<TopicPartition, MemoryRecords> partitionRecords, | |||
String transactionalId) { | |||
super(ApiKeys.PRODUCE, (short) (magic == RecordBatch.MAGIC_VALUE_V2 ? 3 : 2)); | |||
super(ApiKeys.PRODUCE, (short) (magic == RecordBatch.MAGIC_VALUE_V2 ? ApiKeys.PRODUCE.latestVersion() : 2)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the 3
being hard coded here was a bug. Once the desired version is set, that is always the protocol version used. I think we should always use the latestVersion
if we are using v2 of the message format.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, makes sense.
a42286b
to
103de6b
Compare
This is ready for review now. cc @hachikuji |
@apurvam, I just merged a refactor from @hachikuji (0cf7708) that moves the protocol definition to the request/response classes so you'll have to rebase this PR. Hopefully it's straightforward. :) |
tests for insertion in order.
2a39853
to
1ec1b5b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly minor comments. Haven't looked at tests yet.
@@ -591,7 +593,7 @@ private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse re | |||
|
|||
private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime, RuntimeException exception, boolean adjustSequenceNumbers) { | |||
if (transactionManager != null) { | |||
if (exception instanceof OutOfOrderSequenceException | |||
if ((exception instanceof OutOfOrderSequenceException || exception instanceof UnknownProducerException) | |||
&& !transactionManager.isTransactional() | |||
&& transactionManager.hasProducerId(batch.producerId())) { | |||
log.error("The broker received an out of order sequence number for topic-partition " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably need to update this log message to mention the specific failure.
@@ -19,3 +19,4 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout | |||
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n | |||
|
|||
log4j.logger.org.apache.kafka=ERROR | |||
log4j.logger.org.apache.kafka.clients.producer.internals=TRACE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inadvertent I assume.
@@ -62,7 +62,7 @@ public Builder(byte magic, | |||
int timeout, | |||
Map<TopicPartition, MemoryRecords> partitionRecords, | |||
String transactionalId) { | |||
super(ApiKeys.PRODUCE, (short) (magic == RecordBatch.MAGIC_VALUE_V2 ? 3 : 2)); | |||
super(ApiKeys.PRODUCE, (short) (magic == RecordBatch.MAGIC_VALUE_V2 ? ApiKeys.PRODUCE.latestVersion() : 2)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, makes sense.
@@ -524,7 +525,15 @@ public ApiException build(String message) { | |||
public ApiException build(String message) { | |||
return new AuthenticationFailedException(message); | |||
} | |||
}); | |||
}), | |||
UNKNOWN_PRODUCER(59, "The specified producerId does not exist on the broker. This indicates data loss on the broker " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may indicate data loss, but not necessarily, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea was that if it is raised to the user, then it definitely indicates data loss. However, based on our offline conversation, we will return the OutOfOrderSequenceException
in place of this one to indicate data loss. So I will change the wording of this comment.
@@ -156,8 +178,11 @@ public ProduceResponse(Struct struct) { | |||
Errors error = Errors.forCode(partRespStruct.get(ERROR_CODE)); | |||
long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME); | |||
long logAppendTime = partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME); | |||
long logStartOffset = INVALID_OFFSET; | |||
if (partRespStruct.hasField(LOG_START_OFFSET_KEY_NAME)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could use the fancy new getOrElse
method in Struct
.
@@ -486,6 +508,20 @@ synchronized void adjustSequencesDueToFailedBatch(ProducerBatch batch) { | |||
} | |||
} | |||
|
|||
private synchronized void startSequencesAtBeginning(TopicPartition topicPartition) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline. There are some edge cases where resetting the sequence number may not be safe. In particular, since retention policies are enforced asynchronously on the brokers, it could be possible to see an UnknownProducer exception on one broker and then a leader failover could effectively restore some of producer state. This case is rare, but maybe the safest way to handle it is to reset the producerId.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The one offshoot of this is that if we reset the producer id here, then the user is certain to hit https://issues.apache.org/jira/browse/KAFKA-5870, which will result in OutOfOrderSequencExceptions
for all in flight requests.
I wonder if that inconvenience is worth it. I am on the fence at this point, but leaning slightly toward leaving the code as is until we fix KAFKA-5870.
|
||
package org.apache.kafka.common.errors; | ||
|
||
public class UnknownProducerException extends ApiException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A javadoc would be helpful.
@@ -188,6 +213,8 @@ protected Struct toStruct(short version) { | |||
.set(BASE_OFFSET_KEY_NAME, part.baseOffset); | |||
if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME)) | |||
partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime); | |||
if (partStruct.hasField(LOG_START_OFFSET_KEY_NAME)) | |||
partStruct.set(LOG_START_OFFSET_KEY_NAME, part.logStartOffset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use setIfExists
?
// We don't want to return the UnknownProducerException to the user because we already have the | ||
// OutOfOrderSequenceException to indicate data loss, and there is no point having two of them | ||
// indicating the same thing. | ||
exception = Errors.OUT_OF_ORDER_SEQUENCE_NUMBER.exception(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess another option might be to let UnknownProducerException
extend from OutOfOrderSequenceException
. The user would still only need to handle the latter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM pending a few minor changes discussed offline.
Jason and I were discussing the subclassing vs just changing the exception. Technically speaking, We disambiguate this in the client using the additional metadata, and then only raise the exception to the user if it is still data loss. So the user will only have to handle |
OK, that's fine. Note that you could retain the additional data by passing |
}); | ||
}), | ||
UNKNOWN_PRODUCER(59, "The specified producerId does not exist on the broker. This may indicate data loss on the broker " + | ||
"and should be investigated.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This message is more alarming than the exception Javadoc. Either we should make the other one more alarming or this one less. :)
Yes, the mores specific name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few more random comments of things I noticed while quickly looking through the PR. Not blockers, can be addressed separately, if necessary.
@@ -518,6 +518,17 @@ class Partition(val topic: String, | |||
info | |||
} | |||
|
|||
def logStartOffset : Long = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: there should be no space before :
.
return leaderReplica.log.get.logStartOffset | ||
case None => | ||
-1 | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leaderReplicaIfLocal.map(_.log.get.logStartOffset).getOrElse(-1)
is more concise.
@@ -114,9 +114,9 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness { | |||
val correlationId = -1 | |||
TestUtils.createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers) | |||
|
|||
val version = 2: Short | |||
val version = ApiKeys.PRODUCE.latestVersion() : Short |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: no need for ()
and there should be no space before :
.
1. Make the error message for the UnknownProducerIdException less drastic. 2. Rename UnknownProducerException to UnknownProducerIdException
Thanks for addressing the comments. Seem fine to me. And feel free to merge whenever you're ready btw. :) |
@@ -591,12 +593,12 @@ private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse re | |||
|
|||
private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime, RuntimeException exception, boolean adjustSequenceNumbers) { | |||
if (transactionManager != null) { | |||
if (exception instanceof OutOfOrderSequenceException | |||
if ((exception instanceof OutOfOrderSequenceException || exception instanceof UnknownProducerIdException) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the || needed ?
UnknownProducerIdException is subclass of OutOfOrderSequenceException
synchronized long lastAckedOffset(TopicPartition topicPartition) { | ||
Long offset = lastAckedOffset.get(topicPartition); | ||
if (offset == null) | ||
return -1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: using ProduceResponse.INVALID_OFFSET is better
if (response.baseOffset == ProduceResponse.INVALID_OFFSET) | ||
return; | ||
long lastOffset = response.baseOffset + batch.recordCount - 1; | ||
if (lastOffset > lastAckedOffset(batch.topicPartition)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should log be added for the else case ?
@tedyu The PR has been merged. Feel free to submit another PR if you see room for improvement. |
return false; | ||
|
||
Errors error = response.error; | ||
if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER && !hasUnresolvedSequence(batch.topicPartition) && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't Errors.UNKNOWN_PRODUCER_ID be included here as well ?
Considering the UnknownProducerIdException is a subclass of OutOfOrderSequenceException.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, no. This is the part of the code where we want to disambiguate between UNKNOWN_PRODUCER_ID
and OUT_OF_ORDER_SEQUENCE_EXCEPTION
.
Description of the solution can be found here: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Exactly+Once+-+Solving+the+problem+of+spurious+OutOfOrderSequence+errors