
KAFKA-8443 Broker support for fetch from followers #6832

Merged: 56 commits into apache:trunk on Jul 4, 2019

Conversation

mumrah (Contributor) commented May 28, 2019

Follow-on to #6731, this PR adds broker-side support for KIP-392 (fetch from followers).

Changes:

  • All brokers will handle FetchRequest regardless of leadership
  • Leaders can compute a preferred replica to return to the client
  • New ReplicaSelector interface for determining the preferred replica
  • Incremental fetches will include partitions with no records if the preferred replica has been computed
  • Adds a new JMX attribute exposing the current preferred read replica of a partition in the consumer

Two new conditions were added for completing a delayed fetch. They both relate to communicating the high watermark to followers without waiting for a timeout:

  • For regular fetches, if the high watermark changes within a single fetch request
  • For incremental fetch sessions, if the follower's high watermark is lower than the leader's

A new JMX attribute, preferred-read-replica, was added to the kafka.consumer:type=consumer-fetch-manager-metrics,client-id=some-consumer,topic=my-topic,partition=0 object. It was added to support the new system test which verifies that the fetch-from-follower behavior works end-to-end, and it could also be useful in the future when debugging consumer problems.
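For readers who want a feel for the new ReplicaSelector plug-in point, here is a rough sketch of a custom selector written against the interface this PR introduces (a sketch only; it assumes PartitionView exposes replicas/leader and ClientMetadata exposes rackId, as the code quoted later in this review suggests — check the merged sources for the exact signatures):

import java.util.Optional
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.replica.{ClientMetadata, PartitionView, ReplicaSelector, ReplicaView}
import scala.collection.JavaConverters._

// Prefer a replica in the same rack as the client; otherwise fall back to the leader.
class RackMatchingReplicaSelector extends ReplicaSelector {
  override def select(topicPartition: TopicPartition,
                      clientMetadata: ClientMetadata,
                      partitionView: PartitionView): Optional[ReplicaView] = {
    val sameRack = partitionView.replicas.asScala
      .find(replica => replica.endpoint.hasRack && replica.endpoint.rack == clientMetadata.rackId)
    sameRack match {
      case Some(replica) => Optional.of(replica)
      case None          => Optional.of(partitionView.leader) // the leader is always a safe default
    }
  }
}

The selector only influences which replica the leader recommends back to the client; the broker-side fetch handling described above is otherwise unchanged.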

mumrah (Author) commented Jun 6, 2019

retest this please

mumrah (Author) commented Jun 10, 2019

retest this please

hachikuji (Contributor) left a comment

Thanks, looks pretty clean. Left a few comments.

mumrah (Author) commented Jun 11, 2019

Retest this please

hachikuji (Contributor) left a comment

Thanks for the updates. Did another pass.

/**
* The number of milliseconds (if any) since the last time this replica was caught up to the high watermark.
*/
Optional<Long> lastCaughtUpTimeMs();

hachikuji (Contributor) commented Jun 12, 2019

Perhaps we can document what this returns for the leader?

mumrah (Author) commented Jun 12, 2019

Hmm, what would that be exactly? The fetch time? Or would it remain at zero for the leader replica?

hachikuji (Contributor) commented Jun 12, 2019

Yes, that is what I was wondering 😉 . Possibly this should be a delta since the last caught up time or something. Then for the leader, it would always be 0.

mumrah (Author) commented Jun 14, 2019

Looks like this value only gets updated for follower replicas, so the leader will have the initial value of 0L. This maps to Optional.empty() for a ReplicaView. Sound reasonable?

hachikuji (Contributor) commented Jun 19, 2019

Did you not like my suggestion to treat this as a delta since the last time we were caught up? In that case, the value is just always 0 for a leader.

Additional review threads (outdated, resolved):
  • core/src/main/scala/kafka/server/MetadataCache.scala
  • core/src/main/scala/kafka/server/ReplicaManager.scala (3 threads)
mumrah (Author) commented Jun 13, 2019

retest this please

junrao (Contributor) left a comment

@mumrah: Thanks for the PR. Made a pass of the server-side changes. A few comments below.

case None =>
Node.noNode()
}}).toMap
.filter(pair => !pair._2.isEmpty)

junrao (Contributor) commented Jun 27, 2019

Could we use a case pattern to avoid the unnamed reference _2?
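A minimal sketch of that rewrite, naming the tuple parts instead of using _2:

.filter { case (_, node) => !node.isEmpty }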

metadata => findPreferredReadReplica(tp, metadata, replicaId, fetchInfo.fetchOffset))

if (preferredReadReplica.isDefined && !preferredReadReplica.contains(localBrokerId)) {
// If the a preferred read-replica is set and is not this replica (the leader), skip the read

junrao (Contributor) commented Jun 27, 2019

typo "the a"

.map(leaderNode => new DefaultReplicaView(leaderNode, partition.localLogOrException.logEndOffset, 0L))
.get

replicaInfoSet.add(leaderReplica)

junrao (Contributor) commented Jun 27, 2019

The leader replica has been added to replicaInfoSet earlier. Do we need to add it again?

mumrah (Author) commented Jun 27, 2019

It's actually not in there since Partition#getReplica returns None for the leader replica after a recent change. I'll update the code to be more clear.

@@ -329,7 +329,7 @@ public int hashCode() {
     result = 31 * result + Long.hashCode(highWatermark);
     result = 31 * result + Long.hashCode(lastStableOffset);
     result = 31 * result + Long.hashCode(logStartOffset);
-    result = 31 * result + (preferredReadReplica != null ? preferredReadReplica.hashCode() : 0);
+    result = 31 * result + Objects.hashCode(preferredReadReplica);

junrao (Contributor) commented Jun 27, 2019

In FetchRequest.java, it would be useful to add a comment above the following line to summarize the changes.

private static final Schema FETCH_REQUEST_V11 = new Schema(

@@ -357,12 +357,38 @@ class Log(@volatile var dir: File,

def lastStableOffsetLag: Long = highWatermark - lastStableOffset

/**
* Fully materialize and return an offset snapshot including segment position info. This method will update
* the LogOffsetMetadata for the high watermark and log start offset if they are message-only. Throws an

junrao (Contributor) commented Jun 27, 2019

It seems that we are updating LogOffsetMetadata for last stable offset, not log start offset?

mumrah (Author) commented Jun 27, 2019

Good catch! The "LSO" tripped me up

@@ -42,7 +43,8 @@ case class FetchMetadata(fetchMinBytes: Int,
     fetchIsolation: FetchIsolation,
     isFromFollower: Boolean,
     replicaId: Int,
-    fetchPartitionStatus: Seq[(TopicPartition, FetchPartitionStatus)]) {
+    fetchPartitionStatus: Seq[(TopicPartition, FetchPartitionStatus)],
+    hasFetchSession: Boolean) {

junrao (Contributor) commented Jun 27, 2019

Should we add the new param to toString()?
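A sketch of what including it might look like; the fields shown are the ones visible in the hunk above, and the real method would list the remaining FetchMetadata fields too:

override def toString: String =
  s"FetchMetadata(fetchMinBytes=$fetchMinBytes, fetchIsolation=$fetchIsolation, " +
  s"isFromFollower=$isFromFollower, replicaId=$replicaId, " +
  s"hasFetchSession=$hasFetchSession, fetchPartitionStatus=$fetchPartitionStatus)"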

if (fetchMetadata.isFromFollower && fetchMetadata.hasFetchSession) {
// Case G check if the follower has the correct HW from the leader
val followerHW = followerHighwatermarks(topicPartition)
if (followerHW.isEmpty || followerHW.exists(hw => offsetSnapshot.highWatermark.messageOffset > hw)) {

junrao (Contributor) commented Jun 27, 2019

Hmm, it's possible for a follower to always use FullFetchContext due to no available session slots. In that case, followerHW is always empty, but we don't always want to return a fetch response immediately.

mumrah added 4 commits on Jun 27, 2019
This adds a highWatermark field to Replica that gets updated along with the other fields
upon a successful read. It is used in ReplicaManager and DelayedFetch to determine
whether we should return early because the follower has an old HW.
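The Replica-side bookkeeping described above might look roughly like the following sketch (updateLastSentHighWatermark and lastSentHighWatermark appear later in this review; everything else here is illustrative):

// Inside kafka.cluster.Replica
@volatile private[this] var _lastSentHighWatermark: Long = 0L

def lastSentHighWatermark: Long = _lastSentHighWatermark

// Recorded by the leader when it sends a fetch response to this follower, so that
// DelayedFetch can tell whether the follower already knows the current high watermark.
def updateLastSentHighWatermark(highWatermark: Long): Unit = {
  _lastSentHighWatermark = highWatermark
}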
Review thread (outdated, resolved): core/src/main/scala/kafka/cluster/Replica.scala

// If the last sent HW for each requested partition is behind the HW of the local log,
// we should return immediately to ensure the follower has an up-to-date HW
val allPartitionsNeedHwUpdate: Boolean = if (isFromFollower) {

hachikuji (Contributor) commented Jun 28, 2019

Have you also considered doing this check in updateFollowerFetchState? It is a bit annoying to need another pass over the partitions when we already had the context we needed to make a decision.

hachikuji (Contributor) commented Jul 2, 2019

retest this please

1 similar comment
hachikuji (Contributor) commented Jul 2, 2019
retest this please

@@ -1521,6 +1527,21 @@ class ReplicaManager(val config: KafkaConfig,
}
}

private def updateFollowerHighWatermark(topicPartition: TopicPartition, followerId: Int, highWatermark: Long): Unit = {
nonOfflinePartition(topicPartition) match {

hachikuji (Contributor) commented Jul 2, 2019

We could probably simplify this a little bit. Maybe something like this:

nonOfflinePartition(topicPartition).flatMap(_.getReplica(followerId)) match {
  case Some(replica) =>
    replica.updateLastSentHighWatermark(highWatermark)
  case None =>
    warn(s"While updating the HW for follower $followerId for partition $topicPartition, " +
         s"the replica could not be found.")
}
def offsetSnapshot: LogOffsetSnapshot = {
val highWatermark: LogOffsetMetadata = if (_highWatermarkMetadata.messageOffsetOnly) {

hachikuji (Contributor) commented Jul 2, 2019

Technically _highWatermarkMetadata might change between the time we call messageOffsetOnly and the time we access it below. Maybe we could write it like this:

var highWatermark = _highWatermarkMetadata
if (highWatermark.messageOffsetOnly) {
  ...
@@ -226,6 +230,10 @@ case class FetchSession(val id: Int,
Option(partitionMap.find(new CachedPartition(topicPartition))).map(_.fetchOffset)
}

def getFollowerHighWatermark(topicPartition: TopicPartition): Option[Long] = synchronized {

hachikuji (Contributor) commented Jul 2, 2019

Seems we didn't need this?

snapshot.partitionStates.get(topic).flatMap(_.get(partitionId)).map(partitionInfo => {
val replicaIds = partitionInfo.basePartitionState.replicas
replicaIds.asScala
.map(replicaId => replicaId.intValue() -> {

hachikuji (Contributor) commented Jul 2, 2019

We could make this a flatMap and skip the filter below.
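A sketch of that flatMap form (getAliveEndpoint is a hypothetical helper standing in for whatever lookup resolves a live broker's Node for the given listener; the real MetadataCache code may differ):

replicaIds.asScala
  .flatMap(replicaId => getAliveEndpoint(replicaId.intValue, listenerName)
    .map(node => replicaId.intValue -> node))
  .toMap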

@@ -856,16 +869,31 @@ class ReplicaManager(val config: KafkaConfig,
logReadResultMap.put(topicPartition, logReadResult)
}

val allPartitionsNeedHwUpdate: Boolean = isFromFollower &&
logReadResults.forall {

hachikuji (Contributor) commented Jul 2, 2019

Wouldn't we want to respond if any of the high watermarks need updating?

Also, we can probably turn this into a var and check it in the pass above where we are accumulating bytesReadable.
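In other words, something roughly like the following, with the per-partition predicate elided exactly as in the quoted hunk:

val anyPartitionNeedsHwUpdate: Boolean = isFromFollower &&
  logReadResults.exists { ... }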


// Check if the HW known to the follower is behind the actual HW
val followerNeedsHwUpdate: Boolean = partition.getReplica(replicaId)
.exists(replica => replica.lastSentHighWatermark < readInfo.highWatermark)

hachikuji (Contributor) commented Jul 2, 2019

Just checking my understanding, but this doesn't account for hw changes as a result of the fetch, right?

mumrah (Author) commented Jul 3, 2019

Correct, since the highWatermark returned in the readInfo is the HW before the read occurs, that's what we're comparing here. If the read caused the HW to advance, we wouldn't know about it here.

@@ -195,6 +196,24 @@ class MetadataCache(brokerId: Int) extends Logging {
}
}

def getPartitionReplicaEndpoints(topic: String, partitionId: Int, listenerName: ListenerName): Map[Int, Node] = {
val snapshot = metadataSnapshot
snapshot.partitionStates.get(topic).flatMap(_.get(partitionId)).map(partitionInfo => {

hachikuji (Contributor) commented Jul 2, 2019

nit: conventionally, we write this as

.map { partitionInfo =>
...
val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)
replicaSelector.select(tp, clientMetadata, partitionInfo).asScala
.filter(!_.endpoint.isEmpty)
.filter(!_.equals(leaderReplica))

hachikuji (Contributor) commented Jul 2, 2019

nit: should be able to use !=

I think it's worth a comment explaining why we only return the preferred read replica if it is not the leader.
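Putting both nits together, the selection might read roughly like this (same identifiers as the quoted hunk; the comment wording is only a suggestion):

// Only propose a preferred read replica when it differs from the leader; if the
// selector picks the leader, the client simply keeps fetching from the leader as before.
replicaSelector.select(tp, clientMetadata, partitionInfo).asScala
  .filter(!_.endpoint.isEmpty)
  .filter(_ != leaderReplica)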

Option.empty
} else {
val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(tp.topic(), tp.partition(), new ListenerName(clientMetadata.listenerName))
val now = time.milliseconds

hachikuji (Contributor) commented Jul 2, 2019

nit: we can pass fetchTimeMs from readFromLocalLog

Additional review threads (resolved): core/src/main/scala/kafka/log/Log.scala (3 threads)
hachikuji (Contributor) left a comment

LGTM. Thanks for the patch!

hachikuji (Contributor) commented Jul 4, 2019

retest this please

hachikuji (Contributor) commented Jul 4, 2019

Submitted #7033 for previous failures.

hachikuji merged commit 23beeea into apache:trunk on Jul 4, 2019

1 of 3 checks passed:
  • JDK 11 and Scala 2.12 — FAILURE (10291 tests run, 65 skipped, 0 failed)
  • JDK 8 and Scala 2.11 — FAILURE (11627 tests run, 67 skipped, 1 failed)
  • JDK 11 and Scala 2.13 — SUCCESS (11627 tests run, 67 skipped, 0 failed)