Closed
Labels
bug: Something isn't working
Description
While running the following E2E test, the controller leader changed and a child response inside the BatchResponse returned a NOT_CONTROLLER error, but the channel did not switch to the new leader.
TC_PATHS="tests/kafkatest/tests/core/snapshot_test.py::TestSnapshots.test_controller" bash tests/docker/run_tests.sh
The errorCounts() of the new APIs added by AutoMQ does not include errors from child responses. As a result, the NOT_CONTROLLER check in NodeToControllerChannelManager.handleResponse never matches, and the channel is not updated.
automq/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
Lines 332 to 355 in 7372871
    private[server] def handleResponse(queueItem: NodeToControllerQueueItem)(response: ClientResponse): Unit = {
      debug(s"Request ${queueItem.request} received $response")
      if (response.authenticationException != null) {
        error(s"Request ${queueItem.request} failed due to authentication error with controller. Disconnecting the " +
          s"connection to the stale controller ${activeControllerAddress().map(_.idString).getOrElse("null")}",
          response.authenticationException)
        maybeDisconnectAndUpdateController()
        queueItem.callback.onComplete(response)
      } else if (response.versionMismatch != null) {
        error(s"Request ${queueItem.request} failed due to unsupported version error",
          response.versionMismatch)
        queueItem.callback.onComplete(response)
      } else if (response.wasDisconnected()) {
        updateControllerAddress(null)
        requestQueue.putFirst(queueItem)
      } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
        debug(s"Request ${queueItem.request} received NOT_CONTROLLER exception. Disconnecting the " +
          s"connection to the stale controller ${activeControllerAddress().map(_.idString).getOrElse("null")}")
        maybeDisconnectAndUpdateController()
        requestQueue.putFirst(queueItem)
      } else {
        queueItem.callback.onComplete(response)
      }
    }
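To illustrate why the `errorCounts().containsKey(Errors.NOT_CONTROLLER)` branch is skipped, here is a minimal, self-contained Java sketch. It does not use the real Kafka or AutoMQ classes: `Errors`, `ChildResponse`, `BatchResponse`, and `topLevelOnlyErrorCounts` are hypothetical stand-ins invented for this example. It assumes a batch response carries one top-level error plus a list of child responses, and compares counting only the top-level error with counting the child errors as well. One possible fix along these lines would be for each affected response class to fold its child errors into `errorCounts()`.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class BatchErrorCountsSketch {
    // Hypothetical stand-in for Kafka's Errors enum; only the values needed here.
    enum Errors { NONE, NOT_CONTROLLER }

    // Hypothetical child response carrying its own error code.
    static final class ChildResponse {
        final Errors error;
        ChildResponse(Errors error) { this.error = error; }
    }

    // Hypothetical batch response: one top-level error plus child responses.
    static final class BatchResponse {
        final Errors topLevelError;
        final List<ChildResponse> children;

        BatchResponse(Errors topLevelError, List<ChildResponse> children) {
            this.topLevelError = topLevelError;
            this.children = children;
        }

        // Counts only the top-level error, so a child NOT_CONTROLLER stays hidden.
        Map<Errors, Integer> topLevelOnlyErrorCounts() {
            Map<Errors, Integer> counts = new EnumMap<>(Errors.class);
            counts.merge(topLevelError, 1, Integer::sum);
            return counts;
        }

        // Counts the top-level error and the error of every child response.
        Map<Errors, Integer> errorCounts() {
            Map<Errors, Integer> counts = topLevelOnlyErrorCounts();
            for (ChildResponse child : children) {
                counts.merge(child.error, 1, Integer::sum);
            }
            return counts;
        }
    }

    public static void main(String[] args) {
        BatchResponse response = new BatchResponse(
            Errors.NONE,
            Arrays.asList(new ChildResponse(Errors.NONE),
                          new ChildResponse(Errors.NOT_CONTROLLER)));

        // Top-level only: the NOT_CONTROLLER check would not match.
        System.out.println(response.topLevelOnlyErrorCounts().containsKey(Errors.NOT_CONTROLLER)); // false
        // Including children: the NOT_CONTROLLER check would match.
        System.out.println(response.errorCounts().containsKey(Errors.NOT_CONTROLLER)); // true
    }
}
```

With child errors included, the existing check in handleResponse would reach the branch that calls maybeDisconnectAndUpdateController() and re-queues the request, without any change to NodeToControllerChannelManager itself.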