Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: use monotonic clock for replica fetcher DelayedItem #8517

Open
wants to merge 9 commits into
base: trunk
Choose a base branch
from

Conversation

lbradstreet
Copy link
Contributor

@lbradstreet lbradstreet commented Apr 19, 2020

Switches the replica fetcher to use a monotonic clock to calculate delays.

In doing so an unnecessary time conversion was removed the replica fetcher hot path. This is a small but significant (2.2%) cost when testing clusters with high replica counts when throttling has engaged (see attached profile).

timeunit convert

We perform an unnecessary conversion in the replica fetcher hotpath.
@@ -33,7 +33,11 @@ class DelayedItem(val delayMs: Long) extends Delayed with Logging {
* The remaining delay time
*/
def getDelay(unit: TimeUnit): Long = {
unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
unit.convert(getDelayMs, TimeUnit.MILLISECONDS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still used after this change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like it's only used in a test. So I suggest we remove it (as well as the Delayed implementation).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure thing. I'll drop it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma I have removed getDelay and we no longer implement Delayed. In doing so I also switched the fetcher to a monotonic clock, as our existing implementation is dangerous.

@ijuma
Copy link
Contributor

ijuma commented Apr 19, 2020

ok to test

@ijuma
Copy link
Contributor

ijuma commented Apr 20, 2020

Same test failed in both jobs:

kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

Seems unrelated, but will retest just in case.

@ijuma
Copy link
Contributor

ijuma commented Apr 20, 2020

retest this please

@ijuma
Copy link
Contributor

ijuma commented Apr 21, 2020

Unrelated flaky test:

org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_beta]

@lbradstreet lbradstreet changed the title MINOR: avoid unnecessary delay conversion in isDelayed check MINOR: use monotonic clock for replica fetcher DelayedItem Apr 21, 2020
@@ -21,24 +21,19 @@ import java.util.concurrent._

import org.apache.kafka.common.utils.Time

import scala.math._
class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this? Can we not pass the time from every caller?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, I was just trying to avoid the replica fetcher having to pass Time.SYSTEM in everywhere. If you prefer that I can change it. Is there any downside what I did though?

Copy link
Contributor

@ijuma ijuma Apr 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should pass the actual time instance so that we can mock if we want. It has one in its constructor already:

class ReplicaFetcherThread(name: String,
                           fetcherId: Int,
                           sourceBroker: BrokerEndPoint,
                           brokerConfig: KafkaConfig,
                           failedPartitions: FailedPartitions,
                           replicaMgr: ReplicaManager,
                           metrics: Metrics,
                           time: Time,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, yes, I was just coming back to say your suggestion was better for testability reasons.

@ijuma
Copy link
Contributor

ijuma commented Apr 22, 2020

ok to test

@@ -21,24 +21,19 @@ import java.util.concurrent._

import org.apache.kafka.common.utils.Time

import scala.math._
class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
Copy link
Contributor

@ijuma ijuma Apr 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does time need to be a public val?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I'll make it private.

@@ -60,6 +61,7 @@ class ReplicaAlterLogDirsThreadTest {

when(replicaManager.futureLogExists(t1p0)).thenReturn(false)

val time = new SystemTime()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use Time.SYSTEM, not create a new instance.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should consider making SystemTime package private.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I followed the abstract fetcher tests which created new SystemTime(s) too. I'll see if it's easy to make SystemTime package private.

@lbradstreet
Copy link
Contributor Author

lbradstreet commented Apr 23, 2020

I updated the PR to pass Time everywhere that's required. Note that I did not switch the fetcher tests to use MockTime as it appears they are timing dependent and may depend on AbstractFetcherThread's use of await, so I would want to make the change carefully.

Copy link
Contributor

@ijuma ijuma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

@ijuma
Copy link
Contributor

ijuma commented Apr 23, 2020

retest this please

@lbradstreet
Copy link
Contributor Author

@ijuma can this be merged?

@ijuma
Copy link
Contributor

ijuma commented May 13, 2020

ok to test

@ijuma
Copy link
Contributor

ijuma commented May 13, 2020

@lbradstreet Compile errors:

12:38:04 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java:29: error: SystemTime is not public in org.apache.kafka.common.utils; cannot be accessed from outside package
12:38:04 import org.apache.kafka.common.utils.SystemTime;
12:38:04 ^
12:38:04 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/tools/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionWorker.java:33: error: SystemTime is not public in org.apache.kafka.common.utils; cannot be accessed from outside package
12:38:04 import org.apache.kafka.common.utils.SystemTime;
12:38:04 ^
12:38:04 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/tools/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionWorker.java:63: error: SystemTime is not public in org.apache.kafka.common.utils; cannot be accessed from outside package
12:38:04 private static final SystemTime SYSTEM_TIME = new SystemTime();
12:38:04 ^
12:38:04 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java:98: error: SystemTime is not public in org.apache.kafka.common.utils; cannot be accessed from outside package
12:38:04 time = new SystemTime();
12:38:04 ^
12:38:04 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/tools/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionWorker.java:63: error: SystemTime is not public in org.apache.kafka.common.utils; cannot be accessed from outside package
12:38:04 private static final SystemTime SYSTEM_TIME = new SystemTime();

@lbradstreet
Copy link
Contributor Author

@ijuma thanks, I probably should have merged in before asking.

@lbradstreet
Copy link
Contributor Author

@ijuma the updated PR should compile OK. I removed the change to make SystemTime package private as it seems to be used in other tooling now.

@ijuma
Copy link
Contributor

ijuma commented May 13, 2020

ok to test

@ijuma
Copy link
Contributor

ijuma commented May 15, 2020

retest this please

@ijuma
Copy link
Contributor

ijuma commented May 17, 2020

ok to test

@ijuma
Copy link
Contributor

ijuma commented May 17, 2020

retest this please

@chia7712
Copy link
Contributor

EmbeddedKafkaCluster created by Stream UT uses MockTime to create KafkaServer(
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java#L81) so the check of delayed item gets impacted by MockTime.

@lbradstreet
Copy link
Contributor Author

EmbeddedKafkaCluster created by Stream UT uses MockTime to create KafkaServer(
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java#L81) so the check of delayed item gets impacted by MockTime.

@chia7712 thanks, nice catch. That does make things trickier.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants