Skip to content

Commit

Permalink
3.x: Introduce property rx3.scheduler.use-nanotime (#7169)
Browse files Browse the repository at this point in the history
Issue-Id: #7154

Co-authored-by: Sergej Isbrecht <sergej.isbrecht@gmail.com>
  • Loading branch information
SergejIsbrecht and Sergej Isbrecht committed Jan 28, 2021
1 parent a99b2e0 commit 171c846
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 6 deletions.
42 changes: 36 additions & 6 deletions src/main/java/io/reactivex/rxjava3/core/Scheduler.java
Expand Up @@ -60,8 +60,9 @@
* interface which can grant access to the original or hooked {@code Runnable}, thus, a repeated {@code RxJavaPlugins.onSchedule}
* can detect the earlier hook and not apply a new one over again.
* <p>
* The default implementation of {@link #now(TimeUnit)} and {@link Worker#now(TimeUnit)} methods to return current
* {@link System#currentTimeMillis()} value in the desired time unit. Custom {@code Scheduler} implementations can override this
* The default implementation of {@link #now(TimeUnit)} and {@link Worker#now(TimeUnit)} methods to return current {@link System#currentTimeMillis()}
* value in the desired time unit, unless {@code rx3.scheduler.use-nanotime} (boolean) is set. When the property is set to
* {@code true}, the method uses {@link System#nanoTime()} as its basis instead. Custom {@code Scheduler} implementations can override this
* to provide specialized time accounting (such as virtual time to be advanced programmatically).
* Note that operators requiring a {@code Scheduler} may rely on either of the {@code now()} calls provided by
* {@code Scheduler} or {@code Worker} respectively, therefore, it is recommended they represent a logically
Expand All @@ -88,6 +89,34 @@
* All methods on the {@code Scheduler} and {@code Worker} classes should be thread safe.
*/
public abstract class Scheduler {
/**
* Value representing whether to use {@link System#nanoTime()}, or default as clock for {@link #now(TimeUnit)}
* and {@link Scheduler.Worker#now(TimeUnit)}
* <p>
* Associated system parameter:
* <ul>
* <li>{@code rx3.scheduler.use-nanotime}, boolean, default {@code false}
* </ul>
*/
static boolean IS_DRIFT_USE_NANOTIME = Boolean.getBoolean("rx3.scheduler.use-nanotime");

/**
* Returns the current clock time depending on state of {@link Scheduler#IS_DRIFT_USE_NANOTIME} in given {@code unit}
* <p>
* By default {@link System#currentTimeMillis()} will be used as the clock. When the property is set
* {@link System#nanoTime()} will be used.
* <p>
* @param unit the time unit
* @return the 'current time' in given unit
* @throws NullPointerException if {@code unit} is {@code null}
*/
static long computeNow(TimeUnit unit) {
if(!IS_DRIFT_USE_NANOTIME) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
return unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
}

/**
* The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
* <p>
Expand Down Expand Up @@ -156,7 +185,7 @@ public static long clockDriftTolerance() {
* @since 2.0
*/
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return computeNow(unit);
}

/**
Expand Down Expand Up @@ -362,8 +391,9 @@ public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flow
* track the individual {@code Runnable} tasks while they are waiting to be executed (with or without delay) so that
* {@link #dispose()} can prevent their execution or potentially interrupt them if they are currently running.
* <p>
* The default implementation of the {@link #now(TimeUnit)} method returns current
* {@link System#currentTimeMillis()} value in the desired time unit. Custom {@code Worker} implementations can override this
* The default implementation of the {@link #now(TimeUnit)} method returns current {@link System#currentTimeMillis()}
* value in the desired time unit, unless {@code rx3.scheduler.use-nanotime} (boolean) is set. When the property is set to
* {@code true}, the method uses {@link System#nanoTime()} as its basis instead. Custom {@code Worker} implementations can override this
* to provide specialized time accounting (such as virtual time to be advanced programmatically).
* Note that operators requiring a scheduler may rely on either of the {@code now()} calls provided by
* {@code Scheduler} or {@code Worker} respectively, therefore, it is recommended they represent a logically
Expand Down Expand Up @@ -482,7 +512,7 @@ public Disposable schedulePeriodically(@NonNull Runnable run, final long initial
* @since 2.0
*/
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return computeNow(unit);
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java
Expand Up @@ -40,6 +40,8 @@
* <li>{@code rx3.single-priority} (int): sets the thread priority of the {@link #single()} {@code Scheduler}, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx3.purge-enabled} (boolean): enables periodic purging of all {@code Scheduler}'s backing thread pools, default is {@code false}</li>
* <li>{@code rx3.purge-period-seconds} (int): specifies the periodic purge interval of all {@code Scheduler}'s backing thread pools, default is 1 second</li>
* <li>{@code rx3.scheduler.use-nanotime} (boolean): {@code true} instructs {@code Scheduler} to use {@link System#nanoTime()} for {@link Scheduler#now(TimeUnit)},
* instead of default {@link System#currentTimeMillis()} ({@code false})</li>
* </ul>
*/
public final class Schedulers {
Expand Down
38 changes: 38 additions & 0 deletions src/test/java/io/reactivex/rxjava3/core/SchedulerTest.java
Expand Up @@ -16,10 +16,48 @@
package io.reactivex.rxjava3.core;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.junit.After;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

public class SchedulerTest {
private static final String DRIFT_USE_NANOTIME = "rx3.scheduler.use-nanotime";

@After
public void cleanup() {
// reset value to default in order to not influence other tests
Scheduler.IS_DRIFT_USE_NANOTIME = false;
}

@Test
public void driftUseNanoTimeNotSetByDefault() {
assertFalse(Scheduler.IS_DRIFT_USE_NANOTIME);
assertFalse(Boolean.getBoolean(DRIFT_USE_NANOTIME));
}

@Test
public void computeNow_currentTimeMillis() {
TimeUnit unit = TimeUnit.MILLISECONDS;
assertTrue(isInRange(System.currentTimeMillis(), Scheduler.computeNow(unit), unit, 250, TimeUnit.MILLISECONDS));
}

@Test
public void computeNow_nanoTime() {
TimeUnit unit = TimeUnit.NANOSECONDS;
Scheduler.IS_DRIFT_USE_NANOTIME = true;

assertFalse(isInRange(System.currentTimeMillis(), Scheduler.computeNow(unit), unit, 250, TimeUnit.MILLISECONDS));
assertTrue(isInRange(System.nanoTime(), Scheduler.computeNow(unit), TimeUnit.NANOSECONDS, 250, TimeUnit.MILLISECONDS));
}

private boolean isInRange(long start, long stop, TimeUnit source, long maxDiff, TimeUnit diffUnit) {
long diff = Math.abs(stop - start);
return diffUnit.convert(diff, source) <= maxDiff;
}

@Test
public void clockDriftCalculation() {
Expand Down

0 comments on commit 171c846

Please sign in to comment.