KAFKA-1215: Rack-Aware replica assignment option #132

Closed · wants to merge 61 commits into base: trunk

Conversation

@allenxwang
Contributor

allenxwang commented Aug 11, 2015

Please see https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment for the overall design.

The update to TopicMetadataRequest/TopicMetadataResponse will be done in a different PR.

@asfbot


asfbot commented Aug 11, 2015

kafka-trunk-git-pr #127 SUCCESS
This pull request looks good

Merge branch 'trunk' of github.com:apache/kafka into KAFKA-1215
Conflicts:
	core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala

Updated test to remove JUnit3Suite
@asfbot


asfbot commented Aug 14, 2015

kafka-trunk-git-pr #145 SUCCESS
This pull request looks good

* 1 -> 3,1,5
* 2 -> 1,5,4
* 3 -> 5,4,2
* 4 -> 4,2,1


@hachikuji

hachikuji Aug 18, 2015

Contributor

4,2,0?


@allenxwang

allenxwang Aug 19, 2015

Contributor

That's correct. Will fix.

val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
val leader = arrangedBrokerList(firstReplicaIndex)
var replicaList = List(leader)
if (!useRackAware) {


@hachikuji

hachikuji Aug 18, 2015

Contributor

It might make this code a little easier to follow if you split the rack-aware and default assignments into separate functions. What do you think?


@allenxwang

allenxwang Aug 19, 2015

Contributor

It will be difficult to separate them out as they actually share quite a lot of common logic, specifically around choosing the leader of the partition. The code change may seem like a lot, but very little actually changes for the default assignment algorithm other than renaming brokerList to arrangedBrokerList.

I can try separating out the logic of choosing followers into different functions for default vs. rack-aware assignment and see how it looks.


@hachikuji

hachikuji Aug 19, 2015

Contributor

Yep, there would probably be some redundancy, but at least the default path would be uncluttered with all the rack-aware logic. I don't think it's too bad as it is, but clearer separation would be nice if possible.
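For what it's worth, a rough sketch of the kind of split being discussed might look like the following. This is illustrative only (function names, parameters, and the shift arithmetic are assumptions, not the PR's code); it assumes the replication factor does not exceed the number of brokers and that every broker's rack is known.

// Illustrative sketch only, not the PR's code: one way follower selection
// could be split into a default path and a rack-aware path.
private def defaultFollowers(brokerList: Seq[Int],
                             firstReplicaIndex: Int,
                             replicationFactor: Int,
                             shift: Int): Seq[Int] =
  (1 until replicationFactor).map { k =>
    // cycle sequentially from the leader; the offset is never 0,
    // so a follower never lands on the leader's broker
    val offset = 1 + (shift + k - 1) % (brokerList.size - 1)
    brokerList((firstReplicaIndex + offset) % brokerList.size)
  }

private def rackAwareFollowers(brokerList: Seq[Int],
                               brokerRack: Map[Int, String],
                               leader: Int,
                               replicationFactor: Int): Seq[Int] = {
  val numRacks = brokerRack.values.toSet.size
  var racksUsed = Set(brokerRack(leader))
  var brokersUsed = Set(leader)
  val followers = scala.collection.mutable.ArrayBuffer.empty[Int]
  var i = brokerList.indexOf(leader)
  while (followers.size < replicationFactor - 1) {
    i = (i + 1) % brokerList.size
    val candidate = brokerList(i)
    val rack = brokerRack(candidate)
    // skip brokers on an already-used rack until every rack holds a replica,
    // and never place two replicas of the partition on the same broker
    val rackOk = !racksUsed.contains(rack) || racksUsed.size == numRacks
    if (rackOk && !brokersUsed.contains(candidate)) {
      followers += candidate
      racksUsed += rack
      brokersUsed += candidate
    }
  }
  followers.toList
}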

result.reverse
}

private[admin] def getReverseMap(brokerRackMap: Map[Int, String]): Map[String, List[Int]] = {


@hachikuji

hachikuji Aug 18, 2015

Contributor

Maybe getInverseMap instead?


@allenxwang

allenxwang Aug 19, 2015

Contributor

sure.
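For reference, a minimal sketch of such an inverse (rack -> brokers) map, purely illustrative and not necessarily the PR's implementation:

// Sketch: invert a broker -> rack map into a rack -> sorted broker-ids map.
private[admin] def getInverseMap(brokerRackMap: Map[Int, String]): Map[String, List[Int]] =
  brokerRackMap.toList
    .map { case (brokerId, rack) => rack -> brokerId }
    .groupBy { case (rack, _) => rack }
    .map { case (rack, pairs) => rack -> pairs.map(_._2).sorted }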

Merge branch 'trunk' of github.com:apache/kafka into KAFKA-1215
Conflicts:
	core/src/main/scala/kafka/server/KafkaApis.scala
	core/src/main/scala/kafka/server/KafkaConfig.scala
	core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

Separate out function to get replica list for replica assignment.
@asfbot


asfbot commented Aug 21, 2015

kafka-trunk-git-pr #187 FAILURE
Looks like there's a problem with this pull request

@allenxwang

Contributor

allenxwang commented Aug 21, 2015

The failed test in the last PR build passed on my laptop locally. It seems to be flaky as it also failed on an earlier build without my changes.

@asfbot


asfbot commented Aug 21, 2015

kafka-trunk-git-pr #193 SUCCESS
This pull request looks good

@joestein


Can you comment this a bit more please?

TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
val assignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, Seq(topic))
.map(p => p._1.partition -> p._2)
val brokerRackMap = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack2", 3 -> "rack2");


@joestein

joestein Jan 11, 2016

Contributor

Can you add some negative testing please? Folks do weird and odd things in their properties by accident, and we want to guard against that too.

class SimpleRackLocator(zkClient: ZkClient, props: Properties) extends RackLocator(zkClient, props) {
override def getRackInfo(): Map[Int, String] = {
val brokerRackMap = props.asScala.map {
case (key, value) => (key.toInt, value)


@joestein

joestein Jan 11, 2016

Contributor

Something about the Scala here makes me want to say it should be an implicit. That is a much bigger topic and change, so I would say maybe not introduce it now, but this is one of many places where we could reduce code without losing readability or performance. Maybe even try it with this change, since you're converting the type only in RackLocator.
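To make the suggestion concrete, an implicit along these lines might look as follows; this is purely illustrative (the PR keeps the explicit conversion, and the object and method names here are made up):

import java.util.Properties
import scala.collection.JavaConverters._
import scala.language.implicitConversions

object RackLocatorConversions {
  // Treat a Properties of "brokerId=rack" entries as a Map[Int, String].
  implicit def propsToBrokerRackMap(props: Properties): Map[Int, String] =
    props.asScala.map { case (key, value) => key.toInt -> value }.toMap
}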

"--partitions", "18",
"--replication-factor", "3",
"--rack-locator-class", "kafka.admin.SimpleRackLocator",
"--rack-locator-properties", "0=rack1,1=rack2,2=rack2,3=rack1,4=rack3,5=rack3",


@joestein

joestein Jan 11, 2016

Contributor

rack-locator might be a bit confusing to a user just coming to look at the new command / API changes in a release. Why not rack-aware or rack-placement-class (keeping the SimpleRackLocator class as is) and then rack-placement-properties? Or something of the sort?

@@ -486,6 +486,8 @@ class KafkaConfigTest {
case KafkaConfig.MetricNumSamplesProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
case KafkaConfig.MetricReporterClassesProp => // ignore string
case KafkaConfig.RackLocatorClassProp => // ignore


@joestein

joestein Jan 11, 2016

Contributor

What/why are we ignoring here? Not having seen the entire class, just the diff, it's hard to say whether it makes sense to ignore this.

@joestein

Contributor

joestein commented Jan 11, 2016

One overall general comment on the implementation: the brokers' properties themselves could carry this information, so that the topic creator doesn't have to know it. The fact is that a lot of humans still run the topic command, but in many cases it is some software system doing it operationally. In either case, if the broker had a property like rack=0 or whatever, topic creation could simply distribute information the broker is already able to gather. Granted, this implementation avoids having to store it in ZooKeeper, so rationally speaking this is better than putting code in for that.

Sorry if I missed the entire discussion thread on this; I'm just seeing it for the first time. I like it and would love to see this get into trunk, start to be used, and make it into the next release.

Nice work so far!!!

@allenxwang

Contributor

allenxwang commented Jan 11, 2016

Please see KIP-36 for the latest proposal. (https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment)

The biggest difference is that the rack information is added as broker metadata in ZooKeeper. Consequently, the inter-broker protocol (UpdateMetadataRequest) and the client-to-broker metadata query protocol (TopicMetadataResponse) will be changed to include rack information.

Once the KIP is accepted, I will update this PR to incorporate these new ideas.

@sslavic

Member

sslavic commented Jan 19, 2016

Wouldn't it be nice to have a metric per topic partition showing how many different racks its ISRs live in?
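Such a metric would be cheap to compute; a sketch of the underlying calculation, with illustrative names only:

// Sketch: number of distinct racks covered by a partition's ISR, given the
// ISR broker ids and a broker -> rack map. Brokers with no known rack are ignored.
def isrRackCount(isr: Seq[Int], brokerRack: Map[Int, String]): Int =
  isr.flatMap(brokerRack.get).toSet.size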

allenxwang added some commits Jan 29, 2016

Merge branch 'trunk' of github.com:apache/kafka into KAFKA-1215
Conflicts:
	clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
	core/src/main/scala/kafka/admin/AdminUtils.scala
	core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
	core/src/main/scala/kafka/admin/TopicCommand.scala
	core/src/main/scala/kafka/server/KafkaApis.scala
	core/src/main/scala/kafka/server/KafkaConfig.scala
First pass of adding rack as a configuration and a member for Broker. Updated ZkUtils for serializing and deserializing broker information with rack. Refactoring AdminUtils.assignReplicasToBrokers to support rack aware assignment.
Use NULLABLE_STRING for rack in UpdateMetadataRequest protocol. Make ReassignPartitionsCommand rack aware. Fix JUnit tests.
for (_ <- 0 until replicationFactor - 1) {
var done = false
while (!done) {
val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))


@junrao

junrao Mar 12, 2016

Contributor

This doesn't seem to be 100% safe in that we can potentially assign 2 replicas to the same broker. Consider the following example.

rack : a b a a
broker: 0 1 2 3

At some point, we assign the 1st replica to broker 1. Suppose that nextReplicaShift is 0. We then assign the 2nd replica to broker 2. When assigning the 3rd replica, we will skip 3 and 0 and assign the 3rd replica to broker 2 again.

Before, we didn't have this issue since, when assigning replicas other than the 1st, we cycled through the brokers sequentially without skipping. The new logic allows skipping, so it's possible for us to hit the same broker twice.


@allenxwang

allenxwang Mar 12, 2016

Contributor

That's a very good point. I will address this in my next update.


@allenxwang

allenxwang Mar 12, 2016

Contributor

Thinking a little bit more on this, I think this situation is actually covered by the algorithm. In this case, there are three replicas and only two racks. Once replicas are assigned to 1 and 2, we know that all racks have replicas for the partition and skipping behavior will stop.


@junrao

junrao Mar 12, 2016

Contributor

Right, this example actually works. But the following won't. Consider the following broker to rack mapping.

rack : a b c a a
broker: 0 1 2 3 4

Let's say you want to have 4 replicas and the first replica is assigned to broker 2. Then you assign 2nd replica to 3. Then you skip broker 4 and 0 since both are on rack a and not all racks are filled yet. Then you assign 3rd replica to 1. Finally, you will assign 4th replica to broker 3 again.


@allenxwang

allenxwang Mar 12, 2016

Contributor

Excellent example. I added logic to prevent assigning two replicas of the same partition to the same broker.
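For reference, the two per-partition invariants discussed in this thread could be checked with something like the following illustrative helper (not the PR's code; names are made up):

// Sketch: given one partition's replica list and a broker -> rack map, check
// that no broker appears twice and that the replicas span as many racks as possible.
def checkPartitionAssignment(replicas: Seq[Int], brokerRack: Map[Int, String]): Unit = {
  require(replicas.distinct.size == replicas.size,
    s"a broker holds more than one replica of the partition: $replicas")
  val expectedRacks = math.min(brokerRack.values.toSet.size, replicas.size)
  val actualRacks = replicas.map(brokerRack).toSet.size
  require(actualRacks == expectedRacks,
    s"replicas $replicas span $actualRacks racks, expected $expectedRacks")
}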

import org.junit.Test
import scala.collection.Map

class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwareTest {


@junrao

junrao Mar 12, 2016

Contributor

I still think that this test and AdminRackAwareTest (except for testGetBrokerMetadatas) can be simplified. For example, if we restructure the code a bit by wrapping assignReplicasToBrokers in another helper method with the following signature, then we should be able to test all kinds of rack/rackAwareMode combinations without needing to start a broker/ZK, right?

(brokerMetadatas: Seq[Broker],
rackAwareMode: RackAwareMode,
nPartitions: Int,
replicationFactor: Int,
fixedStartIndex: Int = -1,
startPartitionId: Int = -1)


@allenxwang

allenxwang Mar 12, 2016

Contributor

I don't think that will help from the test's perspective. Even if we add rackAwareMode here, we still need to make sure that the right RackAwareMode is used for auto topic creation and for the command line tools (where you can disable rack awareness).

The tests that depend on broker/ZK make sure that no matter how the underlying API is structured, the end result is correct. So I think there is value in these tests.
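As a side note, a ZK-free check along the lines being discussed might look roughly like this; the assignment map below is hard-coded purely to show the shape of such a test (the class name and values are made up, and the suggested helper is not invoked here):

import org.junit.Assert.assertEquals
import org.junit.Test

class RackAwareAssignmentSketchTest {
  private val brokerRack = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack1", 3 -> "rack2")

  @Test
  def replicasOfEachPartitionSpanBothRacks(): Unit = {
    // In the suggested restructuring this map would come from the helper that
    // wraps assignReplicasToBrokers; here it is hard-coded for illustration.
    val assignment = Map(0 -> Seq(0, 1), 1 -> Seq(1, 2), 2 -> Seq(2, 3), 3 -> Seq(3, 0))
    assignment.values.foreach { replicas =>
      assertEquals(s"replicas $replicas should span both racks",
        2, replicas.map(brokerRack).toSet.size)
    }
  }
}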

}

@Test
def testAssignmentWithRackAwareWithNotEnoughPartitions() {


@junrao

junrao Mar 12, 2016

Contributor

Hmm, what does NotEnoughPartitions mean?


@allenxwang

allenxwang Mar 12, 2016

Contributor

I will fix the confusing name. The test makes sure the algorithm works when the number of partitions is not a multiple of the number of brokers.


if (verifyRackAware) {
val partitionRackMap = distribution.partitionRacks
assertEquals(List.fill(numPartitions)(replicationFactor), partitionRackMap.values.toList.map(_.size))


@junrao

junrao Mar 12, 2016

Contributor

Is filling the same value expensive? Would it be more efficient to just iterate each size and do a check?


@ijuma

ijuma Mar 12, 2016

Contributor

This is really cheap compared to other things we do in our tests and it gives better error messages.

replicationFactor: Int,
verifyRackAware: Boolean = true,
verifyLeaderDistribution: Boolean = true,
verifyReplicasDistribution: Boolean = true) {


@junrao

junrao Mar 12, 2016

Contributor

Should we also verify that no two replicas from the same partition are assigned to the same broker?


@allenxwang

allenxwang Mar 12, 2016

Contributor

Yes, I will add that check.



@Test
def testAssignmentWithRackAwareWith12Partitions() {


@junrao

junrao Mar 12, 2016

Contributor

Is there anything special with 12 partitions?


@allenxwang

allenxwang Mar 12, 2016

Contributor

Probably not. :)

In general, these tests run very fast since all they do is operate on collections in memory. So I have not thought about reducing the number of tests.

@junrao

Contributor

junrao commented Mar 12, 2016

@granthenke : For your previous question on the json version for ZK registration, my preference is still to do the version change now. This way, our hands are not tied for potential future changes and it's also easier to document this. As for compatibility, most people will probably be on 0.9.0.1 before they upgrade to 0.10.0. So the impact should be limited.

allenxwang added some commits Mar 12, 2016

Merge branch 'KAFKA-1215' of github.com:allenxwang/kafka into KAFKA-1215
Conflicts:
	core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala

Added logic to prevent assigning replicas twice to the same broker for the same partition and enhanced tests for that.
val numPartitions = 6
val replicationFactor = 4
val brokerMetadatas = toBrokerMetadata(rackInfo)
assertEquals(brokerList, brokerMetadatas.map(_.id))


@ijuma

ijuma Mar 13, 2016

Contributor

Nitpick: we don't really need this assertEquals or the brokerList val since that is checking that toBrokerMetadata works correctly, which is not the purpose of this test.


@allenxwang

allenxwang Mar 14, 2016

Contributor

I think there is value in checking this to make sure the test setup is correct. Otherwise, if toBrokerMetadata is changed, there are two possibilities:

  • The test fails and it is difficult to debug why
  • The test passes but is actually weakened


@ijuma

ijuma Mar 14, 2016

Contributor

@allenxwang, we use toBrokerMetadata in many other tests and we don't check its behaviour in those cases, so it looks a bit inconsistent. In my opinion, if we want to be sure about its behaviour, we should write a test for it instead of checking it inside other tests. In any case, this is a very minor point and I'm fine if we leave it as is.

@@ -304,7 +461,8 @@ object AdminUtils extends Logging {

/**
* Update the config for a client and create a change notification so the change will propagate to other brokers
* @param zkUtils Zookeeper utilities used to write the config to ZK
*
* @param zkUtils Zookeeper utilities used to write the config to ZK


@ijuma

ijuma Mar 13, 2016

Contributor

This was probably an accidental reformatting by IntelliJ.

@@ -316,7 +474,8 @@ object AdminUtils extends Logging {

/**
* Update the config for an existing topic and create a change notification so the change will propagate to other brokers
* @param zkUtils Zookeeper utilities used to write the config to ZK
*
* @param zkUtils Zookeeper utilities used to write the config to ZK


@ijuma

ijuma Mar 13, 2016

Contributor

This was probably an accidental reformatting by IntelliJ.

}

@Test
def testGetBrokerMetadatas() {


@junrao

junrao Mar 14, 2016

Contributor

Only this test needs ZK. Could we pull this test to a different class and remove the ZK dependency from this class? Otherwise, each test will unnecessarily start a ZK server, which will slow down the test.


@allenxwang

allenxwang Mar 14, 2016

Contributor

Will do.

verifyReplicasDistribution: Boolean = true) {
// always verify that no broker is assigned more than one replica of the same partition
for ((_, brokerList) <- assignment) {
assertEquals(brokerList.toSet.size, brokerList.size)


@junrao

junrao Mar 14, 2016

Contributor

Could we add an error message in assertEquals? Ditto in the assertEquals below.
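For instance (sketch only), the existing check could carry a message along these lines:

assertEquals(s"Duplicate broker in replica list $brokerList",
  brokerList.toSet.size, brokerList.size)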

@granthenke

Contributor

granthenke commented Mar 14, 2016

@junrao Thanks for the confirmation. I understand the json version will change.

allenxwang and others added some commits Mar 14, 2016

Merge remote-tracking branch 'apache/trunk' into KAFKA-1215
* apache/trunk:
  KAFKA-3013: Display the topic-partition in the exception message for expired batches in recordAccumulator
  KAFKA-3375; Suppress deprecated warnings where reasonable and tweak compiler settings
  KAFKA-3373; add 'log' prefix to configurations in KIP-31/32
  MINOR: Remove unused method, redundant in interface definition and add final for object used in synchronization
  KAFKA-3395: prefix job id to internal topic names
  KAFKA-2551; Update Unclean leader election docs
  KAFKA-3047: Explicit offset assignment in Log.append can corrupt the log
@junrao

Contributor

junrao commented Mar 15, 2016

Thanks for the patch. LGTM. Could you rebase?

@ijuma

Contributor

ijuma commented Mar 15, 2016

@allenxwang, the following merges master into your branch: allenxwang#5

Merge pull request #5 from ijuma/KAFKA-1215
KAFKA-1215: Merge master and fix conflicts

@asfgit asfgit closed this in 951e30a Mar 15, 2016

hachikuji added a commit to hachikuji/kafka that referenced this pull request Feb 27, 2017

hachikuji added a commit to hachikuji/kafka that referenced this pull request Mar 2, 2017

hachikuji added a commit to hachikuji/kafka that referenced this pull request Mar 3, 2017

hachikuji added a commit to hachikuji/kafka that referenced this pull request Mar 9, 2017

hachikuji added a commit to hachikuji/kafka that referenced this pull request Mar 10, 2017

apurvam pushed a commit to apurvam/kafka that referenced this pull request Mar 15, 2017

ijuma pushed a commit to ijuma/kafka that referenced this pull request Mar 22, 2017

hachikuji added a commit to hachikuji/kafka that referenced this pull request Mar 22, 2017
