Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x JavaFxScheduler rewrite, fix for #48 #50

Merged
merged 1 commit into from Feb 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
317 changes: 137 additions & 180 deletions src/main/java/rx/schedulers/JavaFxScheduler.java
@@ -1,12 +1,12 @@
/**
* Copyright 2016 Netflix, Inc.
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -18,194 +18,151 @@
import javafx.animation.KeyFrame;
import javafx.animation.Timeline;
import javafx.application.Platform;
import javafx.event.ActionEvent;
import javafx.event.EventHandler;
import javafx.util.Duration;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.subscriptions.Subscriptions;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
* Executes work on the JavaFx UI thread.
* This scheduler should only be used with actions that execute quickly.
*/
public final class JavaFxScheduler extends Scheduler {
private static final JavaFxScheduler INSTANCE = new JavaFxScheduler();

/* package for unit test */JavaFxScheduler() {
}

public static JavaFxScheduler getInstance() {
return INSTANCE;
}
public static JavaFxScheduler platform() {
return INSTANCE;
}

private static void assertThatTheDelayIsValidForTheJavaFxTimer(long delay) {
if (delay < 0 || delay > Integer.MAX_VALUE) {
throw new IllegalArgumentException(String.format("The JavaFx timer only accepts non-negative delays up to %d milliseconds.", Integer.MAX_VALUE));
}
}

@Override
public Worker createWorker() {
return new InnerJavaFxScheduler();
}

private static class InnerJavaFxScheduler extends Worker implements Runnable {

private final CompositeSubscription tracking = new CompositeSubscription();

/** Allows cheaper trampolining than invokeLater(). Accessed from EDT only. */
private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
/** Allows cheaper trampolining than invokeLater(). Accessed from EDT only. */
private int wip;

@Override
public void unsubscribe() {
tracking.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return tracking.isUnsubscribed();
}

@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
long delay = Math.max(0,unit.toMillis(delayTime));
assertThatTheDelayIsValidForTheJavaFxTimer(delay);

class DualAction implements EventHandler<ActionEvent>, Subscription, Runnable {
private Timeline timeline;
final SerialSubscription subs = new SerialSubscription();
boolean nonDelayed;

private void setTimer(Timeline timeline) {
this.timeline = timeline;
}

@Override
public void handle(ActionEvent event) {
run();
}

@Override
public void run() {
if (nonDelayed) {
try {
if (tracking.isUnsubscribed() || isUnsubscribed()) {
return;
}
action.call();
} finally {
subs.unsubscribe();
}
} else {
timeline.stop();
timeline = null;
nonDelayed = true;
trampoline(this);
}
}

@Override
public boolean isUnsubscribed() {
return subs.isUnsubscribed();
}

@Override
public void unsubscribe() {
subs.unsubscribe();
}
public void set(Subscription s) {
subs.set(s);
}
}

final DualAction executeOnce = new DualAction();
tracking.add(executeOnce);

final Timeline timer = new Timeline(new KeyFrame(Duration.millis(delay), executeOnce));
executeOnce.setTimer(timer);
timer.play();

executeOnce.set(Subscriptions.create(() -> {
timer.stop();
tracking.remove(executeOnce);
}));

return executeOnce;
}

@Override
public Subscription schedule(final Action0 action) {
final BooleanSubscription s = BooleanSubscription.create();
Runnable runnable = () -> {
try {
if (tracking.isUnsubscribed() || s.isUnsubscribed()) {
return;
}
action.call();
} finally {
tracking.remove(s);
}
};
tracking.add(s);

if (Platform.isFxApplicationThread()) {
if (trampoline(runnable)) {
return Subscriptions.unsubscribed();
}
}else {
queue.offer(runnable);
Platform.runLater(this);
}

// wrap for returning so it also removes it from the 'innerSubscription'
return Subscriptions.create(() -> tracking.remove(s));
}
/**
* Uses a fast-path/slow path trampolining and tries to run
* the given runnable directly.
* @param runnable
* @return true if the fast path was taken
*/
boolean trampoline(Runnable runnable) {
// fast path: if wip increments from 0 to 1
if (wip == 0) {
wip = 1;
runnable.run();
// but a recursive schedule happened
if (--wip > 0) {
do {
Runnable r = queue.poll();
r.run();
} while (--wip > 0);
}
return true;
}
queue.offer(runnable);
run();
return false;
}
@Override
public void run() {
if (wip++ == 0) {
do {
Runnable r = queue.poll();
r.run();
} while (--wip > 0);
}
}
}
private static final JavaFxScheduler INSTANCE = new JavaFxScheduler();

/* package for unit test */JavaFxScheduler() {
}

public static JavaFxScheduler getInstance() {
return INSTANCE;
}

public static JavaFxScheduler platform() {
return INSTANCE;
}

private static void assertThatTheDelayIsValidForTheJavaFxTimer(long delay) {
if (delay < 0 || delay > Integer.MAX_VALUE) {
throw new IllegalArgumentException(String.format("The JavaFx timer only accepts non-negative delays up to %d milliseconds.", Integer.MAX_VALUE));
}
}

@Override
public Worker createWorker() {
return new JavaFxWorker();
}

/**
* A Worker implementation which manages a queue of QueuedRunnable for execution on the Java FX Application thread
* For a simpler implementation the queue always contains at least one element.
* {@link #head} is the element, which is in execution or was last executed
* {@link #tail} is an atomic reference to the last element in the queue, or null when the worker was disposed
* Recursive actions are not preferred and inserted at the tail of the queue as any other action would be
* The Worker will only schedule a single job with {@link Platform#runLater(Runnable)} for when the queue was previously empty
*/
private static class JavaFxWorker extends Worker implements Runnable {
private volatile QueuedRunnable head = new QueuedRunnable(null); /// only advanced in run(), initialised with a starter element
private final AtomicReference<QueuedRunnable> tail = new AtomicReference<>(head); /// points to the last element, null when disposed

private static class QueuedRunnable extends AtomicReference<QueuedRunnable> implements Subscription, Action0 {
private volatile Action0 action;

private QueuedRunnable(Action0 action) {
this.action = action;
}

@Override
public void unsubscribe() {
action = null;
}

@Override
public boolean isUnsubscribed() {
return action == null;
}

@Override
public void call() {
Action0 action = this.action;
if (action != null) {
action.call();
}
this.action = null;
}
}

@Override
public void unsubscribe() {
tail.set(null);
QueuedRunnable qr = this.head;
while (qr != null) {
qr.unsubscribe();
qr = qr.getAndSet(null);
}
}

@Override
public boolean isUnsubscribed() {
return tail.get() == null;
}

@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
long delay = Math.max(0, unit.toMillis(delayTime));
assertThatTheDelayIsValidForTheJavaFxTimer(delay);

final QueuedRunnable queuedRunnable = new QueuedRunnable(action);
if (delay == 0) { // delay is too small for the java fx timer, schedule it without delay
return schedule(queuedRunnable);
}

final Timeline timer = new Timeline(new KeyFrame(Duration.millis(delay), event -> schedule(queuedRunnable)));
timer.play();

return Subscriptions.create(() -> {
queuedRunnable.unsubscribe();
timer.stop();
});
}

@Override
public Subscription schedule(final Action0 action) {
if (isUnsubscribed()) {
return Subscriptions.unsubscribed();
}

final QueuedRunnable queuedRunnable = action instanceof QueuedRunnable ? (QueuedRunnable) action : new QueuedRunnable(action);

QueuedRunnable tailPivot;
do {
tailPivot = tail.get();
} while (tailPivot != null && !tailPivot.compareAndSet(null, queuedRunnable));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This loop appends elements and is the only critical operation for the queue.
It's a simple, atomic, linked list.
The current tail element is fetched from the pointer tail and remembered as tailPivot for later evaluation.
Iff tailPivot==null the worker was disposed/unsubscribed and we bail out (see line 146 as well).
Otherwise we try to append the new queuedRunnable to the tailPivot, which we expect to have no successor and thus do CAS with null. This fails iff some other thread successfully appended its element. In this case we start over.
When we successfully appended our element to the queue it is available for execution, but we need to advance tail to point to the new element as all other threads will be busy waiting for the correct tailPivot. This is done in line 148. The CAS there will fail iff the worker was disposed/unsubscribed which means tail has been cleared and all waiting threads will bail out anyways.

If a thread fails to append its element another succeeded. Deadlock and Livelock are not possible but a very unlucky thread might starve in this loop. But it is extremely unlikely and requires an never ending stream of scheduling requests. I could introduce counter measures against starvation, but I don't believe the overhead and added complexity is worth it when the worker is busy with an infinite amount of work.


if (tailPivot == null) {
queuedRunnable.unsubscribe();
} else {
tail.compareAndSet(tailPivot, queuedRunnable); // can only fail with a concurrent dispose and we don't want to override the disposed value
if (tailPivot == head) {
if (Platform.isFxApplicationThread()) {
run();
} else {
Platform.runLater(this);
}
}
}
return queuedRunnable;
}

@Override
public void run() {
for (QueuedRunnable qr = head.get(); qr != null; qr = qr.get()) {
qr.call();
head = qr;
}
}
}
}