New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-5259: TransactionalId auth implies ProducerId auth #3075
Conversation
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Btw, you may want to update the |
@ijuma Yes, thanks for reminding me. |
f5823e8
to
5cfedd9
Compare
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
5cfedd9
to
185b4b1
Compare
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
8b300aa
to
08d4783
Compare
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
08d4783
to
030839d
Compare
@ijuma I know you're busy, but perhaps you can help review some of the changes in |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
internalTopics.map { tp => (tp, Errors.TOPIC_AUTHORIZATION_FAILED) } | ||
// Any failed partition check causes the entire request to fail. We only send back error responses | ||
// for the partitions that failed to avoid needing to send an ambiguous error code for the partitions | ||
// which succeeded. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will take a deeper look. However, I had one question: would it be clearer if we had an error code for this? Something referring to the fact that the partition failed because the request failed (i.e. the request is transactional, so either all of it succeeds or none succeeds).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I debated this, but I couldn't think of a suitable error. Maybe OPERATION_NOT_ATTEMPTED or something like that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think an OPERATION_NOT_ATTEMPTED
for the partitions which were abandoned would be nice to have.
Another way to deal with this is to make it clear that the operation for a specific partition was successfully completed only if the status for that partition is Errors.NONE
. If there is no entry for a partition, then you can assume that the particular operation was not attempted. This is the status quo today, and I think it is sufficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had forgotten to reply to this. It should not block this PR and if there is no time to change it before the release, it's fine. However, the OPERATION_NOT_ATTEMPTED option seems a little better.
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
cc @apurvam |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed the client side code and left comments.
@@ -605,7 +606,9 @@ public void abortTransaction() throws ProducerFencedException { | |||
* Implementation of asynchronously send a record to a topic. | |||
*/ | |||
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { | |||
ensureProperTransactionalState(); | |||
if (transactionManager != null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may be a matter of preference, but I find it easier to do such checks in the callee: it generally creates fewer branches in the top level method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree it's subjective. The reason I preferred moving the check here is that it's easier to dismiss the function if you know you are not concerned about the transactional/idempotent cases. Otherwise, you have to descend into it to find that it just returns. My preference is not too strong either way though.
@@ -640,7 +643,7 @@ public void abortTransaction() throws ProducerFencedException { | |||
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp(); | |||
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); | |||
// producer callback will make sure to call both 'callback' and interceptor callback | |||
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp, transactionManager); | |||
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before adding transactions, we only had an interceptor callback if an interceptor was defined. We should probably go back to that pattern now that we have moved the call of transactionManager.setError
to Sender.failBatch
.
|
||
if (transactionManager.isInErrorState() && accumulator.hasUnflushedBatches()) { | ||
log.error("Aborting producer batches due to fatal error", transactionManager.lastError()); | ||
accumulator.abortBatches(transactionManager.lastError()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So in this case, we will still call sendProducerData
which will do nothing, and then we will poll again. Would it be better to just return here?
@@ -512,12 +495,10 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons | |||
final RuntimeException exception; | |||
if (error == Errors.TOPIC_AUTHORIZATION_FAILED) | |||
exception = new TopicAuthorizationException(batch.topicPartition.topic()); | |||
else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) | |||
exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to use the Errors.CLUSTER_AUTHORIZATION_FAILED.execption()
here and elsewhere. THis way the error messages can be updated consistently everywhere for the same error type.
I have already started doing this for the ProducerFencedException
in an upcoming patch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand the point, but the problem is that the message for CLUSTER_AUTHORIZATION_FAILED is very generic since there are a number of cases that this handles. I wanted to be able to give a better message since we know the specific operation that would have failed.
requestHandler.fatal(new KafkaException(errorMessage, lastError)); | ||
else | ||
requestHandler.fatal(new KafkaException(errorMessage)); | ||
private boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: In this class, all the private methods come after the public and package-private methods. It would be nice to maintain that consistency.
@@ -596,6 +606,8 @@ public void handleResponse(AbstractResponse response) { | |||
reenqueue(); | |||
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) { | |||
reenqueue(); | |||
} else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) { | |||
fatal(new ClusterAuthorizationException("The producer is not authorized to generate a producerId for idempotence")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned before, we should be using the Errors.CLUSTER_AUTHORIZATION_FAILED.exception()
here so that the messages are consistent and can be updated consistently in the future.
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji : Thanks for the updated patch. Just one more comment.
// which succeeded. | ||
val partitionErrors = (unauthorizedForWriteRequestInfo.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++ | ||
nonExistingOrUnauthorizedForDescribeTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION) ++ | ||
internalTopics.map(_ ->Errors.TOPIC_AUTHORIZATION_FAILED)).toMap |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be better to just return a response level error instead of a partition level error since the failure always happens to all partitions? We can probably have an error message in the response that describes the specific topics that caused the error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That might be the simplest way to convey the intended semantic. One minor annoyance is that we wouldn't be able to set the unauthorized topic list in TopicAuthorizationException
(unless we tried to parse the error message). I don't have a strong preference. What do you think @ijuma?
9e024ca
to
8d41288
Compare
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
I've opened https://issues.apache.org/jira/browse/KAFKA-5322 to resolve the question of the AddPartitions error codes. I'm going to go ahead and merge this to trunk and 0.11.0. |
Author: Jason Gustafson <jason@confluent.io> Reviewers: Apurva Mehta <apurva@confluent.io>, Jun Rao <junrao@gmail.com> Closes #3075 from hachikuji/KAFKA-5259-FIXED (cherry picked from commit 38f6cae) Signed-off-by: Jason Gustafson <jason@confluent.io>
No description provided.