Group and Txn metadata topics should be queried directly from the controller #7716
Conversation
@dhruvilshah3, @cmccabe, @ijuma I've published the broker-to-controller channel I've been working on. There are still a few things I'd like to do (mainly on the testing side), hence the PR is a draft for now, but I wanted to get your opinion on whether this approach is in sync with what you're thinking of. (And if the implementation is in line with your thinking, we could get into a deeper review.)
Yea I was looking at
Had a chat with @satishd yesterday and it seems like continuing with the
Force-pushed from f7b4ed4 to 722fe08.
retest this please
Force-pushed from 393a2d5 to 3d20888.
@ijuma @cmccabe @hachikuji I rebased this and brought it up to date a bit. Would you please review it? Does this need a more elaborate discussion, such as a KIP?
retest this please
If this change adds or mutates JMX metrics, I think we should do a KIP.
@abbccdda it shouldn't modify JMX metrics
   */
  private def getGroupMetadataTopicPartitionCount: Int = {
-   zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicNumPartitions)
+   controllerChannel.getPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).get
This one needs to be more robust; right now it's prone to timeout-related errors. Working on it.
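Regarding the bare `.get` above, a bounded wait with a configured fallback is one way to make the call more robust against a slow or unreachable controller. The sketch below is illustrative only: the names are hypothetical, and `CompletableFuture` merely stands in for Kafka's `KafkaFuture`.

```scala
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}

object PartitionCountLookup {
  // Bounded wait: return the future's value if it completes in time,
  // otherwise fall back to a configured default instead of blocking forever.
  def partitionCountOrDefault(f: CompletableFuture[Int],
                              default: Int,
                              timeoutMs: Long): Int =
    try f.get(timeoutMs, TimeUnit.MILLISECONDS)
    catch { case _: TimeoutException => default }
}
```

A completed future yields its value; a future that never completes (e.g. the controller is unreachable) yields the default after the timeout instead of hanging the caller.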
Thanks for the PR. I have a high-level question: does this change take the controller broker's version into account, i.e. can the targeted controller answer the topic metadata request in all scenarios?
@@ -169,6 +169,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   var metadataCache: MetadataCache = null
   var quotaManagers: QuotaFactory.QuotaManagers = null

+  var controllerChannel: BrokerToControllerChannelManager = _
What's the benefit of initializing with the default value (`_`) here vs. starting from null?
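For context on the question above: for a `var`, `= _` assigns the type's default value, which is `null` for reference types and zero/false for primitives, so for a reference type like `BrokerToControllerChannelManager` the two spellings are equivalent and the choice is stylistic. A minimal, non-Kafka illustration (`Holder` is a made-up class):

```scala
// `= _` gives each field its type's default value (Scala 2 syntax).
class Holder {
  var name: String = _ // reference type: defaults to null, same as `= null`
  var count: Int = _   // primitive type: defaults to 0
}

val h = new Holder
assert(h.name == null)
assert(h.count == 0)
```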
      }
    }
  } catch {
    case e: Exception =>
Should we be stricter about exception handling here? I don't think we should continue on every possible exception; we could brainstorm :)
Of course, let's brainstorm! :)
My main goal here was that ideally we should catch network-related exceptions when the request fails due to disconnect events. Do you have specific ideas about what to catch here?
Yea, I agree. We could keep it as it is here, as long as we bail out on any fatal exceptions.
Tomorrow I'll write test cases covering some specific scenarios.
I think, though, that we should usually try to handle most exceptions and reconnect to a controller when possible. On the other hand, we likely don't want to catch any non-Exception Throwables, as those are usually more serious cases (for instance OOM).
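One way to express the distinction discussed here is Scala's `NonFatal` extractor, which matches ordinary exceptions but deliberately excludes `OutOfMemoryError` and other fatal `Throwable`s. A minimal sketch, not the PR's code (`runOnce` is a hypothetical helper):

```scala
import scala.util.control.NonFatal

object RetryGuard {
  // Run one attempt; report recoverable failures to the caller so it can
  // reconnect/retry, but let fatal errors (OOM, etc.) propagate.
  def runOnce(work: () => Unit): Boolean =
    try { work(); true }
    catch {
      case NonFatal(_) =>
        // e.g. a network disconnect: safe to retry against a fresh controller
        false
    }
}
```

With this shape, a `RuntimeException` from a dropped connection yields `false` and the loop continues, while an `OutOfMemoryError` escapes the `catch` and kills the thread as it should.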
@hachikuji @cmccabe Could you also take a look?
…oControllerChannel instead of zkClient
Force-pushed from 3e1a304 to a9ec2bd.
@hachikuji, @cmccabe I rebased my solution. Would you please take a look and review it, and let me know whether it's fine, needs more tests, or how we can proceed with this?
Hey folks, I'm closing this PR due to the lack of interest. If anyone is interested, please feel free to pick up the related Jira.
GroupMetadataManager and TransactionStateManager should use a direct broker-to-controller channel to query the number of partitions instead of relying on Zookeeper.
This change introduces a new class that always sends the request to the active controller. If the cached controller isn't available, or is no longer the controller, it closes the connection and refreshes itself from the local metadataCache until it finds the active controller.
`BrokerToControllerChannelManager` manages the request queue that is consumed by the request thread, and also controls the thread's lifecycle. Lazy initialization is used to create the thread, so it isn't created before there is an actual need for it. The public methods of this class are supposed to implement the high-level functions queried by various classes (in this case `GroupMetadataManager` and `TransactionStateManager`) and return a `KafkaFuture`, so that users of this class can work asynchronously over the blocking connection that the `BrokerToControllerRequestThread` implements.

Committer Checklist (excluded from commit message)
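The lifecycle described in the summary (callers enqueue requests and receive futures; a lazily created thread drains the queue over a single blocking connection) could be sketched roughly as follows. This is not the PR's actual API: the class and method names are illustrative, `CompletableFuture` stands in for `KafkaFuture`, and the controller round trip is faked with a local computation.

```scala
import java.util.concurrent.{CompletableFuture, LinkedBlockingQueue}

class ChannelManagerSketch {
  // Pending requests: topic name paired with the promise to complete.
  private val queue =
    new LinkedBlockingQueue[(String, CompletableFuture[Int])]()

  // Lazy val: the request thread is only created and started on first use.
  private lazy val requestThread: Thread = {
    val t = new Thread(() =>
      while (true) {
        val (topic, promise) = queue.take()
        promise.complete(topic.length) // stand-in for the controller round trip
      })
    t.setDaemon(true)
    t.start()
    t
  }

  // Public, asynchronous API: enqueue and return a future immediately.
  def getPartitionCount(topic: String): CompletableFuture[Int] = {
    val promise = new CompletableFuture[Int]()
    queue.put((topic, promise))
    requestThread // force lazy initialization of the worker
    promise
  }
}
```

A caller can then either block with `get()` or chain continuations on the returned future, matching the "asynchronous callers over a blocking connection" design the description lays out.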