
MINOR: use monotonic clock for replica fetcher DelayedItem #8517

Open · wants to merge 9 commits into base: trunk
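The change replaces wall-clock based delay tracking with the monotonic clock. As a minimal sketch of why that matters (illustrative only, not code from this PR): a deadline computed from System.currentTimeMillis shifts whenever NTP or an operator adjusts the system clock, which can extend or cut short a fetcher backoff, while System.nanoTime only ever moves forward.

import java.util.concurrent.TimeUnit

// Illustrative sketch: a deadline on the monotonic clock is immune to
// wall-clock jumps (NTP corrections, manual changes), so a 500 ms backoff
// is always roughly 500 ms of elapsed time.
object MonotonicDeadlineSketch {
  def main(args: Array[String]): Unit = {
    val dueNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(500)
    def stillDelayed: Boolean = System.nanoTime() < dueNs

    println(s"delayed immediately: $stillDelayed")  // true
    Thread.sleep(600)
    println(s"delayed after 600 ms: $stillDelayed") // false
  }
}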
@@ -23,7 +23,6 @@
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.WindowedSum;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.junit.Test;

@@ -70,17 +69,17 @@ public void testShouldRecord() {
         MetricConfig debugConfig = new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG);
         MetricConfig infoConfig = new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO);

-        Sensor infoSensor = new Sensor(null, "infoSensor", null, debugConfig, new SystemTime(),
+        Sensor infoSensor = new Sensor(null, "infoSensor", null, debugConfig, Time.SYSTEM,
             0, Sensor.RecordingLevel.INFO);
         assertTrue(infoSensor.shouldRecord());
-        infoSensor = new Sensor(null, "infoSensor", null, debugConfig, new SystemTime(),
+        infoSensor = new Sensor(null, "infoSensor", null, debugConfig, Time.SYSTEM,
             0, Sensor.RecordingLevel.DEBUG);
         assertTrue(infoSensor.shouldRecord());

-        Sensor debugSensor = new Sensor(null, "debugSensor", null, infoConfig, new SystemTime(),
+        Sensor debugSensor = new Sensor(null, "debugSensor", null, infoConfig, Time.SYSTEM,
             0, Sensor.RecordingLevel.INFO);
         assertTrue(debugSensor.shouldRecord());
-        debugSensor = new Sensor(null, "debugSensor", null, infoConfig, new SystemTime(),
+        debugSensor = new Sensor(null, "debugSensor", null, infoConfig, Time.SYSTEM,
             0, Sensor.RecordingLevel.DEBUG);
         assertFalse(debugSensor.shouldRecord());
     }
8 changes: 5 additions & 3 deletions core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -43,6 +43,7 @@ import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
 import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
 import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.Time

 import scala.math._

@@ -52,6 +53,7 @@ import scala.math._
 abstract class AbstractFetcherThread(name: String,
                                      clientId: String,
                                      val sourceBroker: BrokerEndPoint,
+                                     val time: Time,
                                      failedPartitions: FailedPartitions,
                                      fetchBackOffMs: Int = 0,
                                      isInterruptible: Boolean = true,

@@ -633,7 +635,7 @@ abstract class AbstractFetcherThread(name: String,
     Option(partitionStates.stateValue(partition)).foreach { currentFetchState =>
       if (!currentFetchState.isDelayed) {
         partitionStates.updateAndMoveToEnd(partition, PartitionFetchState(currentFetchState.fetchOffset,
-          currentFetchState.lag, currentFetchState.currentLeaderEpoch, Some(new DelayedItem(delay)), currentFetchState.state))
+          currentFetchState.lag, currentFetchState.currentLeaderEpoch, Some(new DelayedItem(delay, time)), currentFetchState.state))
       }
     }
   }

@@ -787,14 +789,14 @@ case class PartitionFetchState(fetchOffset: Long,

   def isTruncating: Boolean = state == Truncating && !isDelayed

-  def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
+  def isDelayed: Boolean = delay.exists(_.isDelayed)

   override def toString: String = {
     s"FetchState(fetchOffset=$fetchOffset" +
       s", currentLeaderEpoch=$currentLeaderEpoch" +
       s", state=$state" +
       s", lag=$lag" +
-      s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" +
+      s", delay=$delay" +
       s")"
   }
 }
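To make the backoff flow concrete, here is a compact self-contained sketch (hypothetical names, not code from this PR) of the pattern above: a partition whose DelayedItem is still pending is skipped by the fetch loop until the injected clock passes the due time.

import java.util.concurrent.TimeUnit
import org.apache.kafka.common.utils.{MockTime, Time}

// Stand-in for kafka.utils.DelayedItem as reshaped by this PR.
class DelayedItemSketch(delayMs: Long, time: Time) {
  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
  def isDelayed: Boolean = time.nanoseconds < dueNs
}

// Same shape as PartitionFetchState.isDelayed above.
case class FetchStateSketch(fetchOffset: Long, delay: Option[DelayedItemSketch]) {
  def isDelayed: Boolean = delay.exists(_.isDelayed)
}

object FetcherLoopSketch extends App {
  val time = new MockTime()
  val states = Map(
    "tp-0" -> FetchStateSketch(0L, None),
    "tp-1" -> FetchStateSketch(0L, Some(new DelayedItemSketch(1000, time)))) // backing off

  def fetchable = states.collect { case (tp, s) if !s.isDelayed => tp }
  println(fetchable) // only tp-0; tp-1 is delayed
  time.sleep(1000)   // advance the mock clock past the due time
  println(fetchable) // both partitions are fetchable again
}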
@@ -19,8 +19,10 @@ package kafka.server

 import kafka.cluster.BrokerEndPoint
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Time

 class ReplicaAlterLogDirsManager(brokerConfig: KafkaConfig,
+                                 time: Time,
                                  replicaManager: ReplicaManager,
                                  quotaManager: ReplicationQuotaManager,
                                  brokerTopicStats: BrokerTopicStats)

@@ -31,7 +33,7 @@ class ReplicaAlterLogDirsManager(brokerConfig: KafkaConfig,

   override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaAlterLogDirsThread = {
     val threadName = s"ReplicaAlterLogDirsThread-$fetcherId"
-    new ReplicaAlterLogDirsThread(threadName, sourceBroker, brokerConfig, failedPartitions, replicaManager,
+    new ReplicaAlterLogDirsThread(threadName, sourceBroker, brokerConfig, time, failedPartitions, replicaManager,
       quotaManager, brokerTopicStats)
   }
@@ -33,20 +33,23 @@ import org.apache.kafka.common.record.Records
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests.FetchResponse.PartitionData
 import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
+import org.apache.kafka.common.utils.Time

 import scala.jdk.CollectionConverters._
 import scala.collection.{mutable, Map, Seq, Set}

 class ReplicaAlterLogDirsThread(name: String,
                                 sourceBroker: BrokerEndPoint,
                                 brokerConfig: KafkaConfig,
+                                time: Time,
                                 failedPartitions: FailedPartitions,
                                 replicaMgr: ReplicaManager,
                                 quota: ReplicationQuotaManager,
                                 brokerTopicStats: BrokerTopicStats)
   extends AbstractFetcherThread(name = name,
     clientId = name,
     sourceBroker = sourceBroker,
+    time = time,
     failedPartitions,
     fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
     isInterruptible = false,
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -22,9 +22,9 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time

 class ReplicaFetcherManager(brokerConfig: KafkaConfig,
+                            private val time: Time,
                             protected val replicaManager: ReplicaManager,
                             metrics: Metrics,
-                            time: Time,
                             threadNamePrefix: Option[String] = None,
                             quotaManager: ReplicationQuotaManager)
   extends AbstractFetcherManager[ReplicaFetcherThread](

@@ -35,8 +35,8 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig,
   override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
     val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
     val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
-    new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig, failedPartitions, replicaManager,
-      metrics, time, quotaManager)
+    new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig, time, failedPartitions,
+      replicaManager, metrics, quotaManager)
   }

   def shutdown(): Unit = {
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -41,15 +41,16 @@ class ReplicaFetcherThread(name: String,
                            fetcherId: Int,
                            sourceBroker: BrokerEndPoint,
                            brokerConfig: KafkaConfig,
+                           time: Time,
                            failedPartitions: FailedPartitions,
                            replicaMgr: ReplicaManager,
                            metrics: Metrics,
-                           time: Time,
                            quota: ReplicaQuota,
                            leaderEndpointBlockingSend: Option[BlockingSend] = None)
   extends AbstractFetcherThread(name = name,
     clientId = name,
     sourceBroker = sourceBroker,
+    time = time,
     failedPartitions,
     fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
     isInterruptible = false,
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -207,7 +207,7 @@ class ReplicaManager(val config: KafkaConfig,
   )
   private val replicaStateChangeLock = new Object
   val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower)
-  val replicaAlterLogDirsManager = createReplicaAlterLogDirsManager(quotaManagers.alterLogDirs, brokerTopicStats)
+  val replicaAlterLogDirsManager = createReplicaAlterLogDirsManager(time, quotaManagers.alterLogDirs, brokerTopicStats)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   @volatile var highWatermarkCheckpoints: Map[String, OffsetCheckpointFile] = logManager.liveLogDirs.map(dir =>
     (dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap

@@ -1754,11 +1754,11 @@
   }

   protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String], quotaManager: ReplicationQuotaManager) = {
-    new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager)
+    new ReplicaFetcherManager(config, time, this, metrics, threadNamePrefix, quotaManager)
   }

-  protected def createReplicaAlterLogDirsManager(quotaManager: ReplicationQuotaManager, brokerTopicStats: BrokerTopicStats) = {
-    new ReplicaAlterLogDirsManager(config, this, quotaManager, brokerTopicStats)
+  protected def createReplicaAlterLogDirsManager(time: Time, quotaManager: ReplicationQuotaManager, brokerTopicStats: BrokerTopicStats) = {
+    new ReplicaAlterLogDirsManager(config, time, this, quotaManager, brokerTopicStats)
   }

   protected def createReplicaSelector(): Option[ReplicaSelector] = {
21 changes: 7 additions & 14 deletions core/src/main/scala/kafka/utils/DelayedItem.scala
@@ -21,24 +21,17 @@ import java.util.concurrent._

 import org.apache.kafka.common.utils.Time

-import scala.math._
-
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+class DelayedItem(val delayMs: Long, private val time: Time) extends Logging {
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)

   /**
-   * The remaining delay time
+   * true if the item is still delayed
    */
-  def getDelay(unit: TimeUnit): Long = {
-    unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+    time.nanoseconds < dueNs
   }

-  def compareTo(d: Delayed): Int = {
-    val other = d.asInstanceOf[DelayedItem]
-    java.lang.Long.compare(dueMs, other.dueMs)
+  override def toString: String = {
+    s"DelayedItem(delayMs=${TimeUnit.NANOSECONDS.toMillis(dueNs - time.nanoseconds())})"
   }

 }
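And a sketch of the kind of deterministic unit test the injected Time enables (hypothetical test, not part of this PR, assuming the constructor shape above): with MockTime the test advances the clock itself instead of sleeping on the wall clock.

import kafka.utils.{DelayedItem, MockTime}
import org.junit.Assert.{assertFalse, assertTrue}
import org.junit.Test

class DelayedItemSketchTest {
  // MockTime lets the test control the clock, so DelayedItem expiry can be
  // asserted without real sleeps or flaky timing assumptions.
  @Test
  def testIsDelayedWithMockTime(): Unit = {
    val time = new MockTime()
    val item = new DelayedItem(100, time) // due 100 ms in the (mock) future
    assertTrue(item.isDelayed)
    time.sleep(99)                        // advance the mock clock by 99 ms
    assertTrue(item.isDelayed)            // 1 ms still to go
    time.sleep(1)
    assertFalse(item.isDelayed)           // at the due time, no longer delayed
  }
}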
6 changes: 3 additions & 3 deletions core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -18,7 +18,7 @@ package kafka.cluster

 import java.nio.ByteBuffer
 import java.util.{Optional, Properties}
-import java.util.concurrent.{CountDownLatch, Executors, Semaphore, TimeUnit, TimeoutException}
+import java.util.concurrent.{CountDownLatch, Executors, Semaphore, TimeoutException, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean

 import com.yammer.metrics.core.Metric

@@ -34,9 +34,9 @@ import org.apache.kafka.common.errors.{ApiException, OffsetNotAvailableException
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
-import org.apache.kafka.common.utils.SystemTime
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest}
+import org.apache.kafka.common.utils.Time
 import org.junit.Test
 import org.junit.Assert._
 import org.mockito.Mockito._

@@ -1549,7 +1549,7 @@ class PartitionTest extends AbstractPartitionTest {
     val topicPartition = new TopicPartition("test", 1)
     val partition = new Partition(
       topicPartition, 1000, ApiVersion.latestVersion, 0,
-      new SystemTime(), mock(classOf[PartitionStateStore]), mock(classOf[DelayedOperations]),
+      Time.SYSTEM, mock(classOf[PartitionStateStore]), mock(classOf[DelayedOperations]),
       mock(classOf[MetadataCache]), mock(classOf[LogManager]))

     val replicas = Seq(0, 1, 2, 3)
@@ -27,6 +27,7 @@ import kafka.message.NoCompressionCodec
 import kafka.metrics.KafkaYammerMetrics
 import kafka.server.AbstractFetcherThread.ReplicaFetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
+import kafka.utils.MockTime
 import kafka.utils.TestUtils
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.TopicPartition

@@ -819,6 +820,7 @@ class AbstractFetcherThreadTest {
     extends AbstractFetcherThread("mock-fetcher",
       clientId = "mock-fetcher",
       sourceBroker = new BrokerEndPoint(leaderId, host = "localhost", port = Random.nextInt()),
+      time = new MockTime(),
       failedPartitions,
       brokerTopicStats = new BrokerTopicStats) {