KAFKA-12677: return not_controller error in envelope response itself #10794
This is awesome, I also notice that there is no logic about |
@hachikuji @mumrah @abbccdda @cmccabe, could you help review this PR? I'll add tests later. Thank you.
Tests added. Thanks.
It seems the problem has been fixed. You could push a few small changes to trigger the QA build several more times and confirm that this approach is right.
   */
  def sendRequest(
    request: AbstractRequest.Builder[_ <: AbstractRequest],
-   callback: ControllerRequestCompletionHandler
+   callback: ControllerRequestCompletionHandler,
+   requestHeader: RequestHeader
Add a `requestHeader` parameter to do envelope response parsing.
Jenkins PR build results proved the
@dengziming, thanks for the comments. I've updated. Thanks.
@hachikuji @mumrah @abbccdda @cmccabe, call for review since the tests keep failing. Thank you.
@showuon Great finding, thanks for your PR, a trivial suggestion below.
@hachikuji @mumrah @abbccdda @cmccabe, please help review when available. Thank you.
@showuon Thanks for the patch. One detail in the description does not seem right to me.
The client is not necessarily trying to reach the controller. It is sending a request to a broker and the fact that we forward it to a controller is an implementation detail. So the NOT_CONTROLLER error does not make sense for the client. That is why we do not return it. We decided instead that if the controller could not be reached before expiration of the timeout, then we would return REQUEST_TIMED_OUT instead.
That aside, it is not very clear to me why we need to parse the envelope response. The intent was to return NOT_CONTROLLER as the error code in the EnvelopeResponse itself. Perhaps there is a race condition in the controller handling where this is not working? Probably that is what we should fix.
@hachikuji , thanks for your explanation.
That makes sense! I'll look into it and let you know. Thank you.
@hachikuji , I checked and there's no race condition there. The reason why we didn't put So, to fix this issue, I need to build EnvelopeResponse with That's my solution. Please help review. Thank you very much!
        Throwable throwableToBeEncode = t;
        // Future will wrap the original exception with completionException, which will return unexpected UNKNOWN_SERVER_ERROR.
        if (t instanceof CompletionException) {
            throwableToBeEncode = t.getCause();
fix 1: unwrap the `CompletionException` to get the original exception inside. Even if we don't want the `NotControllerException` returned to the client, we need to know about it to perform the check.
@hachikuji, call for review. Thanks.
@showuon Thanks for the updates. Makes sense overall, but left a couple comments.
        // Since it's Not Controller error response, we need to make envelope response with Not Controller error
        // to notify the requester (i.e. BrokerToControllerRequestThread) to update active controller
        new EnvelopeResponse(new EnvelopeResponseData()
          .setErrorCode(Errors.NOT_CONTROLLER.code()))
Ok, I think this makes sense. Basically we were missing the same logic we have in `KafkaApis.handleEnvelope` to verify the controller status. Instead, we just send the request into the controller thread. Do I have that right? I guess another way we could do it is to send the full envelope into the controller and let it set the NOT_CONTROLLER error at the right level, but that might take some refactoring and this solution seems reasonable as well.
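The envelope-level decision discussed in this thread can be sketched in isolation. This is a minimal Java sketch under stated assumptions, not the patch itself: `SketchError` and `EnvelopeErrorSketch` are hypothetical stand-ins for Kafka's `Errors` enum and the response-building code, and the map argument plays the role of `abstractResponse.errorCounts()`.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for Kafka's Errors enum (illustration only).
enum SketchError { NONE, NOT_CONTROLLER, INVALID_REQUEST }

final class EnvelopeErrorSketch {
    private EnvelopeErrorSketch() { }

    // Chooses the error to place on the envelope itself.
    // Only NOT_CONTROLLER is lifted to the envelope level, so that the forwarding
    // broker can notice it and refresh its view of the active controller.
    // Every other inner error stays inside the wrapped response.
    static SketchError envelopeErrorFor(Map<SketchError, Integer> innerErrorCounts) {
        return innerErrorCounts.containsKey(SketchError.NOT_CONTROLLER)
            ? SketchError.NOT_CONTROLLER
            : SketchError.NONE;
    }
}
```

With this shape, an inner response carrying only some other error (here the hypothetical `INVALID_REQUEST`) still yields an envelope with no error, and the inner error reaches the client unchanged.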
@@ -36,10 +37,16 @@
    private final String message;

    public static ApiError fromThrowable(Throwable t) {
        Throwable throwableToBeEncode = t;
        // Future will wrap the original exception with completionException, which will return unexpected UNKNOWN_SERVER_ERROR.
Hmm, a little strange to handle this case explicitly and not others. Would we want to handle `ExecutionException` as well for example?
Also, the comment seems to be missing some context. Which future does it refer to? Maybe it would be helpful to point to the specific usage that requires this.
`ExecutionException` -> good point! Updated. Thanks.
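The wrapping behaviour behind this thread can be reproduced with the JDK alone. The following is a minimal sketch, assuming a hypothetical `NotControllerSketchException` in place of Kafka's `NotControllerException`; `UnwrapSketch` is illustrative and is not the actual `ApiError.fromThrowable` code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-in for Kafka's NotControllerException (illustration only).
class NotControllerSketchException extends RuntimeException {
    NotControllerSketchException(String message) {
        super(message);
    }
}

final class UnwrapSketch {
    private UnwrapSketch() { }

    // Returns the underlying cause for the wrapper exceptions used by
    // java.util.concurrent; any other throwable is returned unchanged.
    static Throwable unwrap(Throwable t) {
        boolean isWrapper = t instanceof CompletionException || t instanceof ExecutionException;
        return (isWrapper && t.getCause() != null) ? t.getCause() : t;
    }

    // Fails a future, then observes the failure through a dependent stage.
    // Dependent stages report the failure wrapped in a CompletionException.
    static Throwable observedFailure() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new NotControllerSketchException("not the active controller"));
        Throwable[] seen = new Throwable[1];
        future.thenApply(String::length).whenComplete((value, error) -> seen[0] = error);
        return seen[0];
    }
}
```

Mapping the wrapper directly to an error code would not match the original exception type, which is why the cause has to be extracted before encoding. Calling `get()` on a failed future surfaces the same cause inside an `ExecutionException`, so the sketch treats both wrappers alike.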
@hachikuji, I've addressed your comments. Please take a look again. Thank you.
Failed tests are unrelated. Thanks.
Thanks for the updates. Left a couple minor comments.
@@ -36,10 +38,21 @@
    private final String message;

    public static ApiError fromThrowable(Throwable t) {
        Throwable throwableToBeEncode = t;
nit: `throwableToBeEncoded`?
Updated.
       val responseBytes = context.buildResponseEnvelopePayload(abstractResponse)
-      val envelopeResponse = new EnvelopeResponse(responseBytes, Errors.NONE)
+      val envelopeResponse = if (abstractResponse.errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+        // Since it's a Not Controller error response, we need to make envelope response with Not Controller error
Do we have a test case which fails without this fix?
Added
nit: we're a little inconsistent in the comments when referring to the NOT_CONTROLLER error. Here we use "Not Controller" while in the tests we use "Not_Controller." I would suggest using "NOT_CONTROLLER" consistently.
@hachikuji, thanks for the reminder. Yes, I forgot to add tests for that area. Added. Thank you.
@@ -29,7 +29,7 @@ import java.util
 import java.util.Collections
 import scala.jdk.CollectionConverters._

-@Timeout(120000)
+@Timeout(120)
Hmm, I don't think 0.12 seconds is a good timeout for these integration tests. I would be OK cutting the time down to 60 seconds per test, though, rather than 120.
I think the default unit is seconds, so this is 120s.
@showuon Thanks for the updates. Just a few more small comments.
    Mockito.doReturn(mockSend).when(channelRequest.envelope.get.context).buildResponseSend(envelopResponseArgumentCaptor.capture())

    // create an inner response without error
    val ResponseWithoutError = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))
nit: use lower case ResponseWithoutError
  }

  @Test
  def testEnvelopeBuildResponseSendShouldReturnNotControllerErrorIfInnerResponseHasOne(): Unit = {
Another useful test case is when the inner response has an error which is different from NOT_CONTROLLER. In this case, we expect no envelope error.
Good suggestion! Added testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoNotControllerError
@@ -179,6 +184,108 @@ class RequestChannelTest {
    assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
  }

  @Test
  def testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoError(): Unit = {
    val channelRequest = buildRequestWithEnvelope()
nit: it's a bit confusing that `buildRequestWithEnvelope` creates an alter config request, which we then respond to with a metadata response. Could we let `buildRequestWithEnvelope` provide the `AbstractRequest` so that we can use a `MetadataRequest`? For example, see `KafkaApis.buildRequestWithEnvelope`. Probably we could factor out a common method.
Put the `buildRequestWithEnvelope` method in the TestUtils class so that it is shared between `KafkaApisTest` and `RequestChannelTest`, and also used `MetadataRequest` as the input. Thank you.
    val requestContextSpy: RequestContext = Mockito.spy(envelopeContext)

    RequestHeader.parse(envelopeBuffer)
    val EnvelopeBuffer2 = envelopeBuffer.duplicate()
nit: lower case envelopeBuffer2
@@ -36,10 +38,21 @@
    private final String message;

    public static ApiError fromThrowable(Throwable t) {
        Throwable throwableToBeEncoded = t;
        // Get the original cause from `CompletionException` and `ExecutionException`, otherwise, it will return unexpected UNKNOWN_SERVER_ERROR.
nit: I think we can make this a little more concise. How about this?
// Get the underlying cause for common exception types from the concurrent library.
// This is useful to handle cases where exceptions may be raised from a future or a
// completion stage (as might be the case for requests sent to the controller in
// `ControllerApis`)
Thanks for the suggestion! Updated.
@hachikuji, I've addressed your comments. Please take a look again. Failed tests are unrelated. Thanks.
LGTM. Thanks for the patch!
…in KRaft mode (#10794)

In Kafka Raft mode, the flow for sending a request from the client to the controller is:

1. The client sends a request to a random controller (e.g. A-controller).
2. A-controller forwards the request to the active controller (e.g. B-controller) to handle it.
3. After the active B-controller completes the request, A-controller receives the response and checks it:
   3.1. If the response has a "disconnected" or "NOT_CONTROLLER" error, the cached active controller has changed. Clear the cached active controller and wait for the next retry to get the updated active controller from `controllerNodeProvider`.
   3.2. Otherwise, complete the request and respond to the client.

This bug consists of 2 issues:

1. The "NOT_CONTROLLER" exception is not correctly sent back to the requester; `UNKNOWN_SERVER_ERROR` is returned instead. The reason is that the `NotControllerException` is wrapped in a `CompletionException` when the `Future` completes exceptionally. The `CompletionException` does not match any of the Errors we defined, so `UNKNOWN_SERVER_ERROR` is returned. Even if we don't want the `NotControllerException` returned to the client, we need to know about it to perform the check.

   fix 1: unwrap the `CompletionException` before encoding the exception to an error.

2. Fixing the 1st bug alone does not fix the issue. With the 1st bug fixed, the client successfully gets the `NotControllerException` and keeps retrying... until timeout. So why doesn't it hit flow `3.1` above, given that it has the `NotControllerException`? The reason is that we wrapped the original request in an `EnvelopeRequest` and forwarded it to the active controller. After the active controller completed the request and responded with `NotControllerException`, that response was wrapped into an `EnvelopeResponse` **with no error**, and the `EnvelopeResponse` was sent back. That is, in flow `3.1` we only got "no error" from the `EnvelopeResponse`, not the `NotControllerException` inside.

   fix 2: Make the envelope response return `NotControllerException` if the controller response has `NotControllerException`, so that we can catch the `NotControllerException` on the envelope response and update the active controller.

Reviewers: wenbingshen <oliver.shen999@gmail.com>, Ismael Juma <ismael@juma.me.uk>, dengziming <dengziming1993@gmail.com>, Jason Gustafson <jason@confluent.io>
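The requester-side check in flows 3.1 and 3.2 can be sketched as follows. This is a minimal illustration under stated assumptions, not Kafka's `BrokerToControllerRequestThread`: `ForwardOutcome` and `ActiveControllerCacheSketch` are hypothetical names, the supplier stands in for `controllerNodeProvider`, and the outcome is the error seen on the envelope response.

```java
import java.util.OptionalInt;
import java.util.function.Supplier;

// Hypothetical outcome of one forwarded request, as seen on the envelope response.
enum ForwardOutcome { OK, DISCONNECTED, NOT_CONTROLLER }

final class ActiveControllerCacheSketch {
    // Stand-in for controllerNodeProvider: yields the current active controller id, if known.
    private final Supplier<OptionalInt> controllerNodeProvider;
    private OptionalInt cachedController = OptionalInt.empty();

    ActiveControllerCacheSketch(Supplier<OptionalInt> controllerNodeProvider) {
        this.controllerNodeProvider = controllerNodeProvider;
    }

    // Returns the cached controller id, consulting the provider only when the cache is empty.
    OptionalInt activeController() {
        if (!cachedController.isPresent()) {
            cachedController = controllerNodeProvider.get();
        }
        return cachedController;
    }

    // Flow 3.1: on "disconnected" or NOT_CONTROLLER, drop the cached controller and
    // report that the request must be retried. Flow 3.2: otherwise the request is done.
    boolean shouldRetry(ForwardOutcome outcome) {
        if (outcome == ForwardOutcome.DISCONNECTED || outcome == ForwardOutcome.NOT_CONTROLLER) {
            cachedController = OptionalInt.empty();
            return true;
        }
        return false;
    }
}
```

The sketch makes the second issue visible: if the envelope always reports no error, `shouldRetry` never clears the cache, and every retry goes to the stale controller until the request times out.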
In Kafka Raft mode, the flow for sending a request from the client to the controller is:

1. The client sends a request to a random controller (e.g. A-controller).
2. A-controller forwards the request to the active controller (e.g. B-controller) to handle it.
3. After the active B-controller completes the request, A-controller receives the response and checks it:
   3.1. If the response has a "disconnected" or "NOT_CONTROLLER" error, the cached active controller has changed. Clear the cached active controller and wait for the next retry to get the updated active controller from `controllerNodeProvider`.
   3.2. Otherwise, complete the request and respond to the client.

This bug consists of 2 issues:

1. The "NOT_CONTROLLER" exception is not correctly sent back to the requester; `UNKNOWN_SERVER_ERROR` is returned instead. The reason is that the `NotControllerException` is wrapped in a `CompletionException` when the `Future` completes exceptionally. The `CompletionException` does not match any of the Errors we defined, so `UNKNOWN_SERVER_ERROR` is returned. Even if we don't want the `NotControllerException` returned to the client, we need to know about it to perform the check.

   fix 1: unwrap the `CompletionException` before encoding the exception to an error.

2. Fixing the 1st bug alone does not fix the issue. With the 1st bug fixed, the client successfully gets the `NotControllerException` and keeps retrying... until timeout. So why doesn't it hit flow `3.1` above, given that it has the `NotControllerException`? The reason is that we wrapped the original request in an `EnvelopeRequest` and forwarded it to the active controller. After the active controller completed the request and responded with `NotControllerException`, that response was wrapped into an `EnvelopeResponse` with no error, and the `EnvelopeResponse` was sent back. That is, in flow `3.1` we only got "no error" from the `EnvelopeResponse`, not the `NotControllerException` inside.

   new: fix 2: Make the envelope response return `NotControllerException` if the controller response has `NotControllerException`, so that we can catch the `NotControllerException` on the envelope response and update the active controller.

   old: fix 2: in flow `3.1`, parse the `EnvelopeResponse` to check whether there is a `NotControllerException` inside.

Note: in the Jira ticket we assumed that the `Recorded new controller` log meant the broker had already switched to the new active controller. That is true, but after further investigation I found that it had only changed for this thread: `broker0_:BrokerToControllerChannelManager broker=0 name=heartbeat`. The topic creation thread is the forwarding thread, `broker0_:BrokerToControllerChannelManager broker=0 name=forwarding`, which was still using the old active controller.