Skip to content

Commit

Permalink
Add a simplified API for wrapping scheduled tasks. (reactor#1546)
Browse files Browse the repository at this point in the history
`Schedulers.onScheduleHook(String, Function<Runnable,Runnable>)` is a new
API for wrapping the tasks submitted to Reactor's schedulers.

The hooks intercepts `Runnable`s before they passed to the executor.
  • Loading branch information
bsideup authored and OlegDokuka committed Apr 24, 2019
1 parent ba0d1ac commit 9608c33
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 2 deletions.
Expand Up @@ -588,5 +588,4 @@ static <T, P extends Publisher<T>> Publisher<T> addAssemblyInfo(P publisher, Ass
}
return new FluxOnAssembly<>((Flux<T>) publisher, stacktrace);
}

}
99 changes: 99 additions & 0 deletions reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

import reactor.core.Disposable;
Expand Down Expand Up @@ -458,6 +459,7 @@ public static void setFactory(Factory factoryInstance) {
* for this key.
* @see #setExecutorServiceDecorator(String, BiFunction)
* @see #removeExecutorServiceDecorator(String)
* @see Schedulers#onScheduleHook(String, Function)
*/
public static boolean addExecutorServiceDecorator(String key, BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService> decorator) {
synchronized (DECORATORS) {
Expand All @@ -479,6 +481,7 @@ public static boolean addExecutorServiceDecorator(String key, BiFunction<Schedul
* @param decorator the executor service decorator to add, if key not already present.
* @see #addExecutorServiceDecorator(String, BiFunction)
* @see #removeExecutorServiceDecorator(String)
* @see Schedulers#onScheduleHook(String, Function)
*/
public static void setExecutorServiceDecorator(String key, BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService> decorator) {
synchronized (DECORATORS) {
Expand Down Expand Up @@ -558,6 +561,93 @@ else if (owner instanceof DelegateServiceScheduler) {
return factory.decorateExecutorService(schedulerType, () -> beforeFactory);
}

/**
* Add or replace a named scheduling {@link Function decorator}. With subsequent calls
* to this method, the onScheduleHook hook can be a composite of several sub-hooks, each
* with a different key.
* <p>
* The sub-hook is a {@link Function} taking the scheduled {@link Runnable}.
* It returns the decorated {@link Runnable}.
*
* @param key the key under which to set up the onScheduleHook sub-hook
* @param decorator the {@link Runnable} decorator to add (or replace, if key is already present)
* @see #resetOnScheduleHook(String)
* @see #resetOnScheduleHooks()
*/
public static void onScheduleHook(String key, Function<Runnable, Runnable> decorator) {
synchronized (onScheduleHooks) {
onScheduleHooks.put(key, decorator);
Function<Runnable, Runnable> newHook = null;
for (Function<Runnable, Runnable> function : onScheduleHooks.values()) {
if (newHook == null) {
newHook = function;
}
else {
newHook = newHook.andThen(function);
}
}
onScheduleHook = newHook;
}
}

/**
* Reset a specific onScheduleHook {@link Function sub-hook} if it has been set up
* via {@link #onScheduleHook(String, Function)}.
*
* @param key the key for onScheduleHook sub-hook to remove
* @see #onScheduleHook(String, Function)
* @see #resetOnScheduleHooks()
*/
public static void resetOnScheduleHook(String key) {
synchronized (onScheduleHooks) {
onScheduleHooks.remove(key);
if (onScheduleHooks.isEmpty()) {
onScheduleHook = Function.identity();
}
else {
Function<Runnable, Runnable> newHook = null;
for (Function<Runnable, Runnable> function : onScheduleHooks.values()) {
if (newHook == null) {
newHook = function;
}
else {
newHook = newHook.andThen(function);
}
}
onScheduleHook = newHook;
}
}
}

/**
* Remove all onScheduleHook {@link Function sub-hooks}.
*
* @see #onScheduleHook(String, Function)
* @see #resetOnScheduleHook(String)
*/
public static void resetOnScheduleHooks() {
synchronized (onScheduleHooks) {
onScheduleHooks.clear();
onScheduleHook = null;
}
}

/**
* Applies the hooks registered with {@link Schedulers#onScheduleHook(String, Function)}.
*
* @param runnable a {@link Runnable} submitted to a {@link Scheduler}
* @return decorated {@link Runnable} if any hook is registered, the original otherwise.
*/
public static Runnable onSchedule(Runnable runnable) {
Function<Runnable, Runnable> hook = onScheduleHook;
if (hook != null) {
return hook.apply(runnable);
}
else {
return runnable;
}
}

/**
* Clear any cached {@link Scheduler} and call dispose on them.
*/
Expand Down Expand Up @@ -698,6 +788,11 @@ default Scheduler newSingle(ThreadFactory threadFactory) {

static volatile Factory factory = DEFAULT;

private static final LinkedHashMap<String, Function<Runnable, Runnable>> onScheduleHooks = new LinkedHashMap<>(1);

@Nullable
private static Function<Runnable, Runnable> onScheduleHook;

/**
* Get a {@link CachedScheduler} out of the {@code reference} or create one using the
* {@link Supplier} if the reference is empty, effectively creating a single instance
Expand Down Expand Up @@ -829,6 +924,7 @@ static Disposable directSchedule(ScheduledExecutorService exec,
Runnable task,
long delay,
TimeUnit unit) {
task = onSchedule(task);
SchedulerTask sr = new SchedulerTask(task);
Future<?> f;
if (delay <= 0L) {
Expand All @@ -847,6 +943,7 @@ static Disposable directSchedulePeriodically(ScheduledExecutorService exec,
long initialDelay,
long period,
TimeUnit unit) {
task = onSchedule(task);

if (period <= 0L) {
InstantPeriodicWorkerTask isr =
Expand Down Expand Up @@ -876,6 +973,7 @@ static Disposable workerSchedule(ScheduledExecutorService exec,
Runnable task,
long delay,
TimeUnit unit) {
task = onSchedule(task);

WorkerTask sr = new WorkerTask(task, tasks);
if (!tasks.add(sr)) {
Expand Down Expand Up @@ -907,6 +1005,7 @@ static Disposable workerSchedulePeriodically(ScheduledExecutorService exec,
long initialDelay,
long period,
TimeUnit unit) {
task = onSchedule(task);

if (period <= 0L) {
InstantPeriodicWorkerTask isr =
Expand Down
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;

Expand Down
@@ -0,0 +1,172 @@
/*
* Copyright (c) 2019-Present Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.scheduler;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.junit.After;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;

public class SchedulersHooksTest {

@After
public void resetAllHooks() {
Schedulers.resetOnScheduleHooks();
}

@Test
public void onScheduleIsAdditive() throws Exception {
AtomicInteger tracker = new AtomicInteger();
Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 1));
Schedulers.onScheduleHook("k2", new TrackingDecorator(tracker, 10));
Schedulers.onScheduleHook("k3", new TrackingDecorator(tracker, 100));

CountDownLatch latch = new CountDownLatch(3);
Schedulers.newElastic("foo").schedule(latch::countDown);
latch.await(5, TimeUnit.SECONDS);

assertThat(tracker).as("3 decorators invoked").hasValue(111);
}

@Test
public void onScheduleReplaces() throws Exception {
AtomicInteger tracker = new AtomicInteger();
Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 1));
Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 10));
Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 100));

CountDownLatch latch = new CountDownLatch(1);
Schedulers.newElastic("foo").schedule(latch::countDown);
latch.await(5, TimeUnit.SECONDS);

assertThat(tracker).hasValue(100);
}

@Test
public void onScheduleWorksWhenEmpty() throws Exception {
AtomicInteger tracker = new AtomicInteger();
Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 1));
Schedulers.resetOnScheduleHook("k1");

CountDownLatch latch = new CountDownLatch(1);
Schedulers.newElastic("foo").schedule(latch::countDown);
latch.await(5, TimeUnit.SECONDS);

assertThat(tracker).hasValue(0);
}

@Test
public void onScheduleIgnoresUnknownRemovals() {
assertThatCode(() -> Schedulers.resetOnScheduleHook("k1"))
.doesNotThrowAnyException();
}

@Test
public void onScheduleResetOne() throws InterruptedException {
AtomicInteger tracker = new AtomicInteger();
Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 1));
Schedulers.onScheduleHook("k2", new TrackingDecorator(tracker, 10));
Schedulers.onScheduleHook("k3", new TrackingDecorator(tracker, 100));
Schedulers.resetOnScheduleHook("k2");

CountDownLatch latch = new CountDownLatch(3);
Schedulers.newElastic("foo").schedule(latch::countDown);
latch.await(5, TimeUnit.SECONDS);

assertThat(tracker).hasValue(101);
}

@Test
public void onScheduleResetAll() throws InterruptedException {
AtomicInteger tracker = new AtomicInteger();
Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 1));
Schedulers.onScheduleHook("k2", new TrackingDecorator(tracker, 10));
Schedulers.onScheduleHook("k3", new TrackingDecorator(tracker, 100));
Schedulers.resetOnScheduleHooks();

CountDownLatch latch = new CountDownLatch(1);
Schedulers.newElastic("foo").schedule(latch::countDown);
latch.await(5, TimeUnit.SECONDS);

assertThat(tracker).hasValue(0);
}

@Test
public void onSchedulesAreOrdered() throws Exception {
CopyOnWriteArrayList<String> items = new CopyOnWriteArrayList<>();
Schedulers.onScheduleHook("k1", new ApplicationOrderRecordingDecorator(items, "k1"));
Schedulers.onScheduleHook("k2", new ApplicationOrderRecordingDecorator(items, "k2"));
Schedulers.onScheduleHook("k3", new ApplicationOrderRecordingDecorator(items, "k3"));

CountDownLatch latch = new CountDownLatch(1);
Schedulers.newElastic("foo").schedule(latch::countDown);
latch.await(5, TimeUnit.SECONDS);

assertThat(items).containsExactly(
"k1#0",
"k2#0",
"k3#0"
);
}

private static class TrackingDecorator implements Function<Runnable, Runnable> {
final AtomicInteger tracker;
final int dx;

private TrackingDecorator(AtomicInteger tracker, int dx) {
this.tracker = tracker;
this.dx = dx;
}

@Override
public Runnable apply(Runnable runnable) {
return () -> {
tracker.addAndGet(dx);
runnable.run();
};
}
}

private static class ApplicationOrderRecordingDecorator
implements Function<Runnable, Runnable> {

final List<String> items;

final String id;

final AtomicInteger counter = new AtomicInteger();

public ApplicationOrderRecordingDecorator(List<String> items, String id) {
this.items = items;
this.id = id;
}

@Override
public Runnable apply(Runnable runnable) {
items.add(id + "#" + counter.getAndIncrement());
return runnable;
}
}
}

0 comments on commit 9608c33

Please sign in to comment.