KAFKA-4485; Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader #2208
Conversation
cc @junrao
if(!inSyncReplicas.contains(replica) &&
assignedReplicas.map(_.brokerId).contains(replicaId) &&
replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
logReadResult.info.fetchOffsetMetadata.messageOffset >= Math.max(replica.lastLeaderLogEndOffset, logReadResult.hw)) {
This approach solves the problem stated in the ticket when the replication factor is >= 3, but it does not seem to solve the problem for replication factor = 2. When the replication factor is 2 and the ISR contains only the leader, the high watermark is essentially the log end offset of the leader. If we compare the fetch starting offset with the max of lastLeaderLogEndOffset and the current high watermark, the follower may still always appear to be lagging behind if there are small, frequent produce requests.
This might not be that bad: as long as the replica is fetching faster than the produce rate, it will eventually catch up. I think this patch is an improvement but does not solve all the issues. Let me think a bit more to see if we can have a more thorough solution.
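To make the concern concrete, here is a toy Python simulation (illustrative only, not Kafka code; all names are made up) of the replication-factor-2 case, where the HW tracks the leader's LEO and steady small produce requests keep the follower's fetch offset below it:

```python
# Illustrative simulation: with replication factor 2 and ISR = {leader},
# the HW equals the leader's LEO, so a follower whose fetch offset is
# compared against the HW can appear to lag forever while small produce
# requests keep arriving between its fetches.

def follower_catches_up(produce_per_tick, ticks):
    leader_leo = 100
    follower_offset = 90
    for _ in range(ticks):
        hw = leader_leo                  # ISR = {leader} => HW == leader LEO
        if follower_offset >= hw:        # the expansion check under discussion
            return True
        follower_offset = leader_leo     # follower fetches everything seen so far
        leader_leo += produce_per_tick   # new messages arrive before the next fetch
    return False

print(follower_catches_up(produce_per_tick=1, ticks=1000))  # False: never admitted
print(follower_catches_up(produce_per_tick=0, ticks=1000))  # True: catches up once produces stop
```

The point is not the exact numbers but that any nonzero steady produce rate keeps the comparison failing forever under this rule.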
I just updated the solution in https://issues.apache.org/jira/browse/KAFKA-4485. I think it should fix the problem for replication factor = 2 as well. Can you take a look?
@junrao @ijuma I have implemented the solution described in https://issues.apache.org/jira/browse/KAFKA-4485. Since this patch will change the way we determine the ISR set and the high watermark, it is better to get more eyes on it. Can you take a look?
private def maybeIncrementLeaderHW(leaderReplica: Replica): Boolean = {
val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
private def maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long = time.milliseconds): Boolean = {
val allLogEndOffsets = assignedReplicas.filter(curTime - _.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs).map(_.logEndOffset)
Hmm, not sure about this. One of the things that we have to guarantee is that a committed message (i.e., any message with offset < HW) must be present in every ISR. With this change, do we still guarantee that? It seems that assignedReplicas.filter(curTime - _.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs) could be a subset of ISR.
@junrao Yes, it is still guaranteed, by making sure that HW >= LEO of every replica in the ISR in maybeExpandIsr(). assignedReplicas.filter(curTime - _.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs) will be a superset of the ISR, because the lag of any replica in the ISR should not exceed replicaManager.config.replicaLagTimeMaxMs.
Ah, I see your point. You are right, it may be a subset of the ISR because maybeShrinkIsr() is called periodically. I just fixed this problem by explicitly including the ISR in the set considered here.
@lindong28 : Thanks for the patch. A few comments.
if(!inSyncReplicas.contains(replica) &&
assignedReplicas.map(_.brokerId).contains(replicaId) &&
replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.hw &&
logReadResult.fetchTimeMs - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs) {
Now that we changed how HW is advanced, do we still need the second test?
Yes, it is still needed. Otherwise the high watermark may decrease when a replica catches up.
For example, let's say replication factor = 2 and the isr includes only the leader. At time t1, hw = 100 and the leader's LEO = 100. The leader receives the follower's fetch request at t1 with offset = 90, and sends back a fetch response covering the offset range [90, 100). At time t2, hw = 120 and the leader's LEO = 120. The leader receives the follower's fetch request at t2 with offset = 100. Suppose t2 - t1 < replicaLagTimeMaxMs. Without the 2nd check, the follower would be added to the isr set and the hw would decrease from 120 to 100.
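The scenario above can be checked with a toy calculation (a hypothetical sketch, not the actual Kafka code): if the follower were admitted to the ISR at t2 and the HW were then recomputed as the minimum LEO over the ISR, the HW would move backwards.

```python
# Hypothetical sketch: HW taken as the smallest log end offset among
# ISR members. Admitting a lagging follower would pull the HW back.

def hw_after_expand(isr_leos, follower_leo):
    # recompute HW as the minimum LEO over the ISR plus the new member
    return min(isr_leos + [follower_leo])

# t2: leader LEO = 120, hw = 120; follower fetches at offset 100
hw_before = 120
hw_after = hw_after_expand(isr_leos=[120], follower_leo=100)
print(hw_before, "->", hw_after)  # 120 -> 100: the HW would decrease
```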
Will it? The follower can only be added to ISR if its fetch offset is >= HW, right?
Wait... I misunderstood the 2nd check.
@junrao I just realized that the second test is assignedReplicas.map(_.brokerId).contains(replicaId). My comment above is related to the 3rd test, logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.hw.
I think we still need the 2nd test, and its purpose is not affected by this patch. The 2nd test prevents a broker from being added to the ISR if that broker is not in the replica set of the partition. A broker outside the replica set may send fetch requests to the leader during partition reassignment, if the controller has shrunk the replica set of the partition but the out-of-date follower isn't aware of this yet. That scenario can still happen after this patch.
Ah, I see. If we do the test suggested above, maybeExpandIsr() will be slightly inaccurate in theory, depending on how often maybeShrinkIsr() is executed. Say replicaLagTimeMaxMs = 10 seconds and maybeShrinkIsr() is executed every 2 seconds. Then a replica in the ISR may lag behind the leader's LEO by up to 12 seconds, which means the HW may lag behind the leader's LEO by up to 12 seconds. Thus a replica may lag behind the leader's LEO by up to 12 seconds even if its fetch offset >= hw. Applying this example to the test above, logReadResult.fetchTimeMs - replica.lastCaughtUpTimeMs can be larger than replicaManager.config.replicaLagTimeMaxMs even if logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.hw.
Therefore I think this check is nice to have. On the other hand it should be OK to remove it as well, since the 20% inaccuracy happens with low probability and is probably not a big deal in practice.
Another benefit is readability. Because logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.hw doesn't guarantee logReadResult.fetchTimeMs - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs, having this in the check allows us to explicitly enforce the requirement of the ISR (i.e. a replica is in the ISR iff its replica lag <= replicaLagTimeMaxMs) in the code on a best-effort basis (because maybeShrinkIsr() doesn't enforce it strictly).
@junrao Thanks for the review. Is there anything I need to do for this patch?
@lindong28 : Thanks for the explanation. It makes sense. The only thing is that currently in ReplicaFetcherThread.shouldFollowerThrottle(), we need to know if a follower is in ISR or not, and this is purely based on the fetch offset and the HW. So, just checking the HW here makes the decision on whether a replica is in-sync more consistent between the leader and the follower.
@junrao I see. I just removed this check to make them more consistent, and I added a comment in maybeExpandIsr() to explain it. I think both implementations will work, with or without this check.
* Else if the FetchRequest reads up to the log end offset of the leader when the previous fetch request was received,
* set the lastCaughtUpTimeMsUnderlying to the time when the previous fetch request was received.
*/
def maybeUpdateCatchUpTimestamp(logReadResult: LogReadResult) {
If we don't need to change the logic in maybeExpandIsr(), could the logic in this method be just folded into updateLogReadResult()?
You are right, this can be folded into updateLogReadResult(). They needed to be separate in the first version of this patch; I forgot to change it in the second version. I will update it now.
* set the lastCaughtUpTimeMsUnderlying to the time when the previous fetch request was received.
*/
def maybeUpdateCatchUpTimestamp(logReadResult: LogReadResult) {
if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset)
If this check is true, the check in line 68 must also be true. So, it seems just doing the check in line 68 is enough?
It makes a difference when this is the first fetch request from this follower to this leader after the leader starts. Also, this can make lastCaughtUpTimeMsUnderlying slightly more accurate in general. Thus I think this check is nice to have, but it probably still works even if we remove it.
@@ -214,7 +216,7 @@ class ReplicaManager(val config: KafkaConfig,
def startup() {
// start ISR expiration thread
scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs/5, unit = TimeUnit.MILLISECONDS)
Is this change intended?
Yes, it is intended. I think this improves the guarantee of replicaLagTimeMaxMs. Otherwise, a replica can stay in the ISR for up to 2 x replicaLagTimeMaxMs even if it is out of sync. Say replicaLagTimeMaxMs = 100 sec and the follower's lastCatchUpTime = t1. maybeShrinkIsr() is executed at time t1 + 99 and at time t1 + 199. The follower will be able to stay in the ISR for 199 seconds and is only removed from the ISR at time t1 + 199. We can reduce this inaccuracy to 20% by running maybeShrinkIsr once every replicaLagTimeMaxMs/5.
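As a sanity check on the arithmetic (an illustrative sketch; the helper name is made up, not part of the patch), the worst-case time a stale replica stays in the ISR is roughly the configured max lag plus one scheduling period:

```python
# A shrink run just before the lag threshold is crossed misses the
# replica; it is only removed on the following run, so the worst case
# is the configured max lag plus one scheduling period.

def max_stay_in_isr_ms(replica_lag_time_max_ms, shrink_period_ms):
    return replica_lag_time_max_ms + shrink_period_ms

lag_max = 100_000
print(max_stay_in_isr_ms(lag_max, lag_max))       # 200000: up to 2x with period = lag max
print(max_stay_in_isr_ms(lag_max, lag_max // 5))  # 120000: 20% overshoot with period = lag max / 5
```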
@lindong28 : Thanks for the updated patch. Left a few more comments. Do you have any test results to share? Basically, how effective is this change in dealing with the issue of constant small requests at the leader, and is there any degradation in other situations?
@@ -298,7 +300,8 @@ class Partition(val topic: String,
// check if the HW of the partition can now be incremented
// since the replica maybe now be in the ISR and its LEO has just incremented
maybeIncrementLeaderHW(leaderReplica)
// TODO: is this maybeIncrementLeaderHW() necessary?
I think we need this since every time the follower's fetch offset advances, we may need to advance the HW.
I see. So we call maybeIncrementLeaderHW() here not because the replica may have just been added to the ISR, but because its LEO may have just advanced. I was misled by the comment, because it says ...since the replica maybe now be in the ISR and its LEO has just incremented. Having a replica newly added to the ISR should not increment the HW. It seems clear enough to keep the call to maybeIncrementLeaderHW() here. It's not a big deal; I will simply update the comment.
@@ -36,25 +36,41 @@ class Replica(val brokerId: Int,
// for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
@volatile private[this] var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
// The log end offset value at the time the leader receives last FetchRequest from this follower.
// This is used to determine the last catch-up time of the follower
@volatile var lastFetchLeaderLogEndOffset: Long = 0L
Should we initialize this to MAX_LONG? Otherwise, the first fetch request from a follower will make it a caught-up replica.
I agree that we should by default consider a replica to be out-of-sync. I have made the following two fixes. I have kept lastFetchLeaderLogEndOffset as 0L, because the first fetch request from a follower will set lastCaughtUpTimeMsUnderlying to lastFetchTimeMs, which is initialized to 0L in Replica.java.
- Initialize lastCaughtUpTimeMsUnderlying to AtomicLong(0L) in Replica.java.
- When a broker receives a LeaderAndIsrRequest making it the leader for a partition, reset lastCaughtUpTimeMsUnderlying of all replicas of this partition to 0L.
// The time when the leader receives last FetchRequest from this follower
// This is used to determine the last catch-up time of the follower
@volatile var lastFetchTimeMs: Long = 0L
It doesn't seem this is used?
It is used in updateLogReadResult(). When the offset of a fetch request >= lastFetchLeaderLogEndOffset, lastCaughtUpTime is set to lastFetchTimeMs (the time the previous fetch request was received).
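For readers following along, here is a rough Python sketch of the update rule being discussed (field and method names approximate the patch; this is not the actual Scala code):

```python
# Sketch of the lastCaughtUpTimeMs update rule: a follower is "caught up"
# as of time T if its fetch offset reaches the leader's LEO as of time T.

class ReplicaState:
    def __init__(self):
        self.last_caught_up_time_ms = 0
        self.last_fetch_leader_leo = 0   # leader LEO when the previous fetch arrived
        self.last_fetch_time_ms = 0      # when the previous fetch arrived

    def update_on_fetch(self, fetch_offset, leader_leo, fetch_time_ms):
        if fetch_offset >= leader_leo:
            # caught up to the leader's current LEO
            self.last_caught_up_time_ms = fetch_time_ms
        elif fetch_offset >= self.last_fetch_leader_leo:
            # caught up to where the leader was at the previous fetch
            self.last_caught_up_time_ms = self.last_fetch_time_ms
        self.last_fetch_leader_leo = leader_leo
        self.last_fetch_time_ms = fetch_time_ms

r = ReplicaState()
r.update_on_fetch(fetch_offset=100, leader_leo=110, fetch_time_ms=1000)
r.update_on_fetch(fetch_offset=110, leader_leo=120, fetch_time_ms=2000)
print(r.last_caught_up_time_ms)  # 1000: caught up to the leader's LEO as of the first fetch
```

The second branch is what lets a follower make progress on lastCaughtUpTimeMs even when constant small produces keep it from ever matching the leader's current LEO.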
* set the lastCaughtUpTimeMsUnderlying to the time when the current fetch request was received.
*
* Else if the FetchRequest reads up to the log end offset of the leader when the previous fetch request was received,
* set the lastCaughtUpTimeMsUnderlying to the time when the previous fetch request was received.
Perhaps we can add a comment why we need to do this? Basically the situation where the leader gets constant small produce requests.
Sure. Added comment now.
@@ -217,7 +219,8 @@ class ReplicaManager(val config: KafkaConfig,
def startup() {
// start ISR expiration thread
scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
// A follower can lag behind leader for up to config.replicaLagTimeMaxMs x (1 + 20%) before it is removed from ISR
scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 5, unit = TimeUnit.MILLISECONDS)
Thanks for the explanation. It makes sense. I am just a bit concerned about the overhead of calling maybeShrinkIsr() too frequently. Perhaps we can do /2?
Sure. I will change this to /2.
On the other hand, I am not sure it is expensive. When the ISR of most partitions doesn't have to be shrunk, the cost of maybeShrinkIsr() is comparing the lastCaughtUpTimeMs of all replicas with the current time. Would this cost still be a concern if all of this is done in memory without a system call? I can update the patch to call time.milliseconds() only once.
Now that we removed the check in maybeExpandIsr(), it is probably more important to run maybeShrinkIsr() sooner rather than later. Note that if we don't shrink the ISR soon enough, message duplication is more likely. Here is the scenario:
- Replication factor = 3 and min isr = 2. The current ISR includes all 3 replicas.
- Replica max lag = 10 seconds, request timeout = 12 seconds, and we run maybeShrink() every 5 seconds.
- At time t, a follower stops fetching but doesn't go offline.
- At time t, a produce request with ack = -1 is received by the leader and the message is appended to the log. The leader now waits for all replicas in the ISR to fetch this message before replying to the producer.
- At time t + 12, the leader sends a request timeout error to the producer. The producer retries, and the message is appended to the log again.
- At time t + 15, that follower is removed from the ISR. The HW is advanced to include the message twice, so a consumer will receive that message twice.
To prevent this scenario we need the request timeout to be larger than the max replica lag. If we allow maybeShrink() to run infrequently, the producer's request timeout needs to be larger, which is not desirable.
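The timeline above can be sketched as a small simulation (illustrative only; function and parameter names are made up, and the numbers are the ones from the scenario):

```python
# Rough timeline check: the producer times out and retries before the
# stalled follower leaves the ISR, so the message lands in the log twice.

def message_copies(request_timeout_s, shrink_interval_s, replica_lag_max_s, t_stall=0):
    # follower is removed at the first shrink run after its lag exceeds the max
    removal = ((t_stall + replica_lag_max_s) // shrink_interval_s + 1) * shrink_interval_s
    copies = 1
    if t_stall + request_timeout_s < removal:
        copies += 1  # producer retry appended before the ack could be sent
    return copies

print(message_copies(request_timeout_s=12, shrink_interval_s=5, replica_lag_max_s=10))  # 2: duplicated
print(message_copies(request_timeout_s=20, shrink_interval_s=5, replica_lag_max_s=10))  # 1: timeout outlasts removal
```

This matches the argument: either raise the request timeout above the worst-case removal delay, or shrink the ISR more often.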
@junrao Yes, this patch can observably reduce the rolling bounce time of a Kafka cluster in my experiments, since we only bounce a new broker when there is no URP in the cluster. Initially, when I was testing the rolling bounce time of a cluster of 150 brokers on 15 machines, I found that the rolling bounce could not finish and got killed by the deployment tool after 1 hour, because one broker just kept reporting URPs. This happened consistently for more than 5 tries. With this patch, the rolling bounce of the entire cluster finishes in 45 minutes. I haven't observed any specific degradation due to this patch.
@@ -77,7 +77,8 @@
public static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the client will wait "
+ "for the response of a request. If the response is not received before the timeout "
+ "elapses the client will resend the request if necessary or fail the request if "
+ "retries are exhausted.";
+ "retries are exhausted. request.timeout.ms should be larger than replica.lag.time.max.ms "
+ "to reduce message duplication caused by unnecessary producer retry.";
Perhaps make it clear that replica.lag.time.max.ms is a broker side config. Also, since this is specific to the producer, could we add it only to producer config?
Sure. Fixed now.
@@ -273,20 +273,23 @@ class Partition(val topic: String,
}
/**
* Check and maybe expand the ISR of the partition.
* Check and maybe expand the ISR of the partition. A replica is in ISR of the partition if and only if
* replica's LEO >= HW and replica's lag <= replicaLagTimeMaxMs.
Is the comment on replicaLagTimeMaxMs still relevant?
It is outdated. The comment is updated now.
replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
// This approximates the requirement logReadResult.fetchTimeMs - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs.
// We don't directly specify the above requirement in order to make maybeExpandIsr() consistent with ReplicaFetcherThread.shouldFollowerThrottle()
// A offset replica whose lag > ReplicaFetcherThread may still exceed hw because maybeShrinkIsr() is called periodically
The comment reads a bit verbose. How about the following?
Technically, a replica shouldn't be in ISR if it hasn't caught up for longer than replicaLagTimeMaxMs, even if its log end offset is >= HW. However, to be consistent with how the follower determines whether a replica is in-sync, we only check HW.
Sure. Comment is updated.
// This approximates the requirement logReadResult.fetchTimeMs - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs.
// We don't directly specify the above requirement in order to make maybeExpandIsr() consistent with ReplicaFetcherThread.shouldFollowerThrottle()
// A offset replica whose lag > ReplicaFetcherThread may still exceed hw because maybeShrinkIsr() is called periodically
logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.hw) {
I think we should use leaderReplica.highWatermark instead of logReadResult.hw since the former is more precise. Actually, could we just keep the original code?
Sure. Replaced with replica.logEndOffset.offsetDiff(leaderHW) >= 0
@@ -362,12 +365,18 @@ class Partition(val topic: String,
* 1. Partition ISR changed
* 2. Any replica's LEO changed
*
* We only increase HW if HW is smaller than LEO of all replicas whose lag <= replicaLagTimeMaxMs.
* This means that if a replica does not lag much behind leader and the replica's LEO is smaller than HW, HW will
* wait for this replica to catch up so that this replica can be added to ISR set.
Tweak the comment a bit to the following. What do you think?
The HW is determined by the smallest log end offset among all replicas that are in sync or are considered caught-up. This way, if a replica is considered caught-up, but its log end offset is smaller than HW, we will wait for this replica to catch up to the HW before advancing the HW. This helps the situation when the ISR only includes the leader replica and a follower tries to catch up. If we don't wait for the follower when advancing the HW, the follower's log end offset may keep falling behind the HW (determined by the leader's log end offset) and therefore will never be added to ISR.
Sure. Comment is updated now.
def resetLastCatchUpTime() {
lastFetchLeaderLogEndOffset = 0L
lastFetchTimeMs = 0L
lastCaughtUpTimeMsUnderlying.set(0L)
Hmm, when the leadership switches to a replica, this could cause an in-sync replica to be out of ISR immediately before it can make a fetch request to the new leader. In this case, we want to give every ISR the benefit of doubt and assume that it's caught up at this moment. For non-in-sync replicas, we want to assume that initially it's not caught up.
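A minimal sketch of the initialization policy suggested here (hypothetical helper, not the actual patch): on leadership change, ISR members get the benefit of the doubt and everyone else starts out of sync.

```python
# On becoming leader: treat current ISR members as caught up right now,
# and non-ISR replicas as never having caught up.

def initial_last_caught_up_ms(replica_id, isr, now_ms):
    return now_ms if replica_id in isr else 0

isr = {1, 2}
print(initial_last_caught_up_ms(2, isr, now_ms=5000))  # 5000: in-sync replica starts caught up
print(initial_last_caught_up_ms(3, isr, now_ms=5000))  # 0: out-of-sync until it actually catches up
```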
Since maybeShrinkIsr() is executed periodically on the order of seconds, most likely the follower's lastCaughtUpTime will be updated before maybeShrinkIsr() can be called to remove it from the ISR. This method is called so that the hw can be incremented despite a follower that has just gone offline.
But I just realized that we only need to call resetLastCatchUpTime() for those replicas that are not in the ISR. I will update the patch to fix it.
@@ -476,7 +476,8 @@ object KafkaConfig {
val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels"
val DefaultReplicationFactorDoc = "default replication factors for automatically created topics"
val ReplicaLagTimeMaxMsDoc = "If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time," +
" the leader will remove the follower from isr. replica.lag.time.max.ms should be smaller than request.timeout.ms" +
" to reduce message duplication caused by unnecessary producer retry."
Is this necessary given the change in client config?
I thought it was good to have it in both places, but it is not necessary. I will remove it.
val readToEndOfLog = initialLogEndOffset.messageOffset - logReadInfo.fetchOffsetMetadata.messageOffset <= 0
LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, partitionFetchSize, readToEndOfLog, None)
LogReadResult(logReadInfo, initialHighWatermark, initialLogEndOffset, fetchTimeMs, partitionFetchSize, None)
Now that there are quite a few params of type long, could we use named params to instantiate LogReadResult? Ditto below.
I am not sure we need named params here, because the variable names are already very close to the parameter names and should explain what they stand for. I can update it to use named parameters as well; please let me know.
The reason for using named parameters is really to avoid accidentally passing in params in the wrong order since the types are the same. I recommend that we use that for code safety.
I see. Fixed now.
@@ -126,7 +123,7 @@ class IsrExpirationTest {
// Make the remote replica not read to the end of log. It should be not be out of sync for at least 100 ms
for(replica <- partition0.assignedReplicas() - leaderReplica)
replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY), -1L, -1, false))
replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY), 10L, 15L, time.milliseconds, -1))
Could we use named parameters when initializing LogReadResult? Ditto below.
Sure. Fixed now.
@@ -165,6 +162,10 @@ class IsrExpirationTest {
allReplicas.foreach(r => partition.addReplicaIfNotExists(r))
// set in sync replicas for this partition to all the assigned replicas
partition.inSyncReplicas = allReplicas.toSet
// make the remote replica read to the end of log
for(replica <- partition.assignedReplicas() - leaderReplica)
replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY), 10L, 10L, time.milliseconds, -1))
It's a bit weird to update LogReadResult to a specific offset in a util function. Should the caller do this or at least pass in a required offset?
I agree. How about I replace 10L with 0L here? This seems more general.
Refer to this link for build results (access rights to CI server needed).
private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
+ " request.timeout.ms should be larger than replica.lag.time.max.ms "
+ "to reduce message duplication caused by unnecessary producer retry.";
Hmm, this should be in the producer config, right? Also, could we mention that replica.lag.time.max.ms is a broker side config? Otherwise, people would assume this is another producer side config.
Sorry, my mistake. It is fixed now.
val r = getOrCreateReplica(replica)
if (!partitionStateInfo.isr.contains(replica))
  r.resetLastCatchUpTime()
})
For replicas in ISR, it seems that we need to set lastCaughtUpTime to be now so that they don't get dropped out of ISR immediately since shrinkIsr() can be called at any time.
If the broker is already the leader of this partition, then the broker already has an up-to-date lastCaughtUpTime for each follower. If the broker was not the leader of this partition, in the worst case the isr will temporarily drop below the min isr of this partition, making this partition unavailable for produce operations. The producer will retry upon NotEnoughReplicasException and should succeed soon. It seems OK, because when there is a leadership change from one broker to another, the producer needs to update metadata and retry anyway.
On the other hand, if we set lastCaughtUpTime to now, then in the worst case a replica will stay in ISR for 2 x ReplicaLagTimeMaxMs, which seems worse because it breaks the semantics of ReplicaLagTimeMaxMs (in addition to how we execute maybeShrinkIsr()).
What do you think? I can set lastCaughtUpTime to now if you think that is better.
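The trade-off being debated hinges on when a follower counts as out of sync. A minimal illustrative sketch of that check (class and method names are hypothetical, not the actual Kafka implementation):

```java
// Illustrative lag check in the spirit of maybeShrinkIsr(): a follower is
// considered out of sync once it has not been caught up for replicaLagTimeMaxMs.
// If lastCaughtUpTimeMs were reset to "now" on a leadership change, a lagging
// replica could survive in the ISR for up to 2 x replicaLagTimeMaxMs.
class IsrLagCheckSketch {
    static boolean isOutOfSync(long nowMs, long lastCaughtUpTimeMs, long replicaLagTimeMaxMs) {
        return nowMs - lastCaughtUpTimeMs > replicaLagTimeMaxMs;
    }
}
```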
@junrao There is another optimization I will make after your suggestion. That is, when broker receives LeaderAndIsrRequest to become leader of a partition, for each follower of this partition, the lastFetchTimeMs will be set to curTime and lastFetchLeaderLogEndOffset will be set to current LEO.
Yes, it makes sense to set lastFetchTimeMs to curTime. Could we just leave lastFetchLeaderLogEndOffset at 0 or -1?
Another thing is that we probably only need to do this optimization when the leader is indeed changing.
if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset)
  lastCaughtUpTimeMsUnderlying.set(logReadResult.fetchTimeMs)
else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
  lastCaughtUpTimeMsUnderlying.set(lastFetchTimeMs)
I was thinking what if on the first follower fetch request, lastCaughtUpTimeMsUnderlying is set to logReadResult.fetchTimeMs and in the 2nd call, it's set to lastFetchTimeMs. But this is probably not a problem since lastFetchTimeMs in the 2nd call will be the same as logReadResult.fetchTimeMs in the first call.
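To make the two-step update in this thread concrete, here is a hedged sketch of the caught-up bookkeeping (class and field names mirror the diff but this is not the actual Replica code):

```java
// Hypothetical sketch: the leader updates a follower's lastCaughtUpTimeMs on
// each fetch, using the (leader LEO, time) snapshot taken at the previous fetch.
class ReplicaStateSketch {
    private long lastCaughtUpTimeMs = 0L;
    private long lastFetchLeaderLogEndOffset = 0L;
    private long lastFetchTimeMs = 0L;

    // Called when the leader processes a fetch from this follower.
    void onFetch(long fetchOffset, long leaderLogEndOffset, long fetchTimeMs) {
        if (fetchOffset >= leaderLogEndOffset) {
            // Follower has read up to the leader's current LEO: caught up as of this fetch.
            lastCaughtUpTimeMs = fetchTimeMs;
        } else if (fetchOffset >= lastFetchLeaderLogEndOffset) {
            // Follower has reached the leader's LEO as of the previous fetch,
            // so it was caught up at that earlier time.
            lastCaughtUpTimeMs = lastFetchTimeMs;
        }
        lastFetchLeaderLogEndOffset = leaderLogEndOffset;
        lastFetchTimeMs = fetchTimeMs;
    }

    long lastCaughtUpTime() {
        return lastCaughtUpTimeMs;
    }
}
```

As the comment above notes, the first-fetch case is benign: the lastFetchTimeMs used by the second fetch equals the fetchTimeMs of the first fetch.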
if(logReadResult.isReadFromLogEnd) {
  lastCaughtUpTimeMsUnderlying.set(time.milliseconds)
}
def resetLastCatchUpTime() {
resetLastCatchUpTime => resetLastCaughtUpTime
Fixed now.
hw = 0L,
leaderLogEndOffset = 0L,
fetchTimeMs =time.milliseconds,
readSize = -1))
Hmm, is this right? It seems that we need to set lastCaughtUpTime only on the leader replica.
I think it is right. lastCaughtUpTime doesn't matter for the leader replica since the leader is always the most up-to-date replica. getPartitionWithAllReplicasInIsr() is used in IsrExpirationTest.java to initialize the replicas of a given partition so that all its replicas are in ISR. Thus we need to set lastCaughtUpTime of all these replicas to now.
Ok, thanks for the explanation. Could you add a space after = in =time.milliseconds?
Sure. Fixed now.
@@ -210,7 +210,9 @@
/** <code>request.timeout.ms</code> */
public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
+ " request.timeout.ms should be larger than replica.lag.time.max.ms "
Could we mention that replica.lag.time.max.ms is a broker side config?
Sorry, I forgot that. It is updated now.
Hmm, did you push?
val r = getOrCreateReplica(replica)
if (!partitionStateInfo.isr.contains(replica))
  r.resetLastCaughtUpTime()
})
I think we just need to optimize for the common case. The common case is that a replica is switching from follower to leader and all existing ISRs are expected to continue to be caught up. So, setting lastCaughtUpTime to now for ISRs avoids ISR churn. This is what the original code was trying to do (by initializing lastCaughtUpTimeMsUnderlying to the current time when a replica is created). It is true that in the worst case, a replica will be removed from ISR after 2 x ReplicaLagTimeMaxMs, but that should be rare.
I see. It is fixed as you suggested. Thanks for the explanation.
@@ -592,13 +600,13 @@ class ReplicaManager(val config: KafkaConfig,
_: ReplicaNotAvailableException |
_: OffsetOutOfRangeException) =>
LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), -1L,
partitionFetchSize, false, Some(e))
-1L, -1L, partitionFetchSize, Some(e))
Could we use named params here and below?
Sure. All instantiations of LogReadResult are using named params now.
I didn't change this one because most values are already -1L in this instance.
(assignedReplicas() - leaderReplica).foreach(replica => {
  val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
  replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
})
Nit: a multi-line foreach can be written more clearly like this (eliminating some parens):
foreach { replica =>
  ...
}
There's one other case like this in the diff.
Thanks. I have fixed this in two places.
@lindong28 : Thanks for the patience. A few more comments on the latest patch.
(assignedReplicas() - leaderReplica).foreach{replica =>
  val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
  replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
}
A couple of things.
- I think we only need to call resetLastCaughtUpTime() if isNewLeader is true.
- It's a bit confusing to explicitly pass in lastFetchLeaderLogEndOffset and lastFetchTimeMs. I am thinking perhaps it's better to have two separate methods resetAsCaughtUp(currentTime: Long) and resetAsNotCaughtUp(). For in-sync replicas, we call the former and for other replicas, we call the latter. In resetAsCaughtUp(), we set lastCaughtUpTime to currentTime, and also set lastFetchLeaderLogEndOffset = 0 and lastFetchTimeMs = currentTime, which makes it a bit hard for the replica to get dropped out of ISR. In resetAsNotCaughtUp(), we set the lastCaughtUpTime to 0 and also set lastFetchLeaderLogEndOffset = MAX_LONG and lastFetchTimeMs = 0, which makes it a bit hard for the replica to get added back to ISR.
- I think we need to call resetLastCaughtUpTime() even if isNewLeader is false. For example, say a follower is fully caught up before it goes offline. If the leader doesn't reset lastCaughtUpTime of this follower, then for the following ReplicaLagTimeMaxMs the replica will stay in ISR and the hw of this partition cannot increase, which essentially makes this partition unavailable to producers.
- I think we should initialize lastFetchLeaderLogEndOffset to the current LEO of the leader and lastFetchTimeMs to the current time regardless of whether the follower is in ISR. Although the variables are named lastFetch* and updated when a fetch request is received, they actually record the latest snapshot of a (leader LEO, time) pair. They should be interpreted this way: the leader's LEO is x at time t; if a follower's LEO >= x, then its lastCaughtUpTimeMs >= t. Given this interpretation, it probably doesn't make sense to set lastFetchLeaderLogEndOffset = MAX_LONG and lastFetchTimeMs = 0.
Thanks for the explanation. Both make sense.
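A hedged sketch of the leadership-change reset agreed on above (names are illustrative, not the actual Partition/Replica code): every follower gets the (current leader LEO, now) snapshot, and only followers already in the ISR keep a fresh lastCaughtUpTimeMs so they are not dropped from the ISR before their first fetch:

```java
// Hypothetical sketch of resetting follower state when a broker becomes leader.
class FollowerResetSketch {
    long lastFetchLeaderLogEndOffset;
    long lastFetchTimeMs;
    long lastCaughtUpTimeMs;

    // Snapshot the leader's current LEO and the current time for every follower;
    // in-sync followers are also treated as caught up as of "now".
    static FollowerResetSketch resetOnBecomeLeader(boolean inIsr, long curLeaderLogEndOffset, long curTimeMs) {
        FollowerResetSketch s = new FollowerResetSketch();
        s.lastFetchLeaderLogEndOffset = curLeaderLogEndOffset;
        s.lastFetchTimeMs = curTimeMs;
        s.lastCaughtUpTimeMs = inIsr ? curTimeMs : 0L;
        return s;
    }
}
```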
@@ -36,25 +36,51 @@ class Replica(val brokerId: Int,
// for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
@volatile private[this] var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
// The log end offset value at the time the leader receives last FetchRequest from this follower.
// This is used to determine the last catch-up time of the follower
catch-up => caught up; There are a few other places like that.
Sorry. Previously I corrected all places by doing grep catchup -Iirn but missed this. Just now I corrected two places after doing grep catch-up -Iirn. I hope I have caught everything like this.
@@ -36,25 +36,51 @@ class Replica(val brokerId: Int,
// for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
@volatile private[this] var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
// The log end offset value at the time the leader receives last FetchRequest from this follower.
// This is used to determine the last catch-up time of the follower
@volatile private var lastFetchLeaderLogEndOffset: Long = 0L
I think it's better to initialize the replica to the notCaughtUp state, i.e., lastCaughtUpTime = 0, lastFetchLeaderLogEndOffset = MAX_LONG and lastFetchTimeMs = 0?
@@ -210,7 +210,9 @@
/** <code>request.timeout.ms</code> */
public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
+ " request.timeout.ms should be larger than value of broker side config replica.lag.time.max.ms"
should be larger than value of broker side config replica.lag.time.max.ms => should be larger than replica.lag.time.max.ms, a broker side configuration ?
Sure. Fixed now.
@lindong28 : Thanks for the patch. LGTM |
Finally had a chance to go through this in detail, seems like a nice improvement. |
…ched up to the logEndOffset of leader Author: Dong Lin <lindong28@gmail.com> Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com> Closes apache#2208 from lindong28/KAFKA-4485
No description provided.