
1.x JavaFxScheduler rewrite, fix for #48 #50

Merged
merged 1 commit into ReactiveX:1.x from protogenes:feature/scheduler-cleanup-1.x on Feb 23, 2017

Conversation

protogenes
Contributor

This rewrite of the JavaFxScheduler is based on queued execution of every action, with a short path for the original Timeline issue in schedule(Runnable, long, TimeUnit).
It reliably solves the problems in #27 and #48, as it ensures that schedule calls from within the FX thread, as well as its recursive actions, are processed before returning control to the caller.
The pull request is for the fully queued version, which does not prefer recursive actions over pending or asynchronously scheduled actions. This implementation is simpler, and the behaviour matches that of a traditional Executor.
I also ran a minimalistic benchmark: the timings for one million tasks are 800 ms for the new version vs. 51,800 ms for the old version.
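
For illustration, here is a minimal sketch of the queued-execution idea (hypothetical class and names, not the actual PR source): every schedule call enqueues its action, and only the caller that bumps a work-in-progress counter from zero issues a single Platform.runLater that drains the queue on the FX thread.

    import javafx.application.Platform;

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical sketch; the actual PR inlines an MPSC linked list instead of ConcurrentLinkedQueue.
    final class QueuedFxExecutor {
        private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger wip = new AtomicInteger();

        void schedule(Runnable action) {
            queue.offer(action);
            // Only the caller that moves the counter from 0 to 1 triggers the drain,
            // so a burst of schedule calls costs a single Platform.runLater.
            if (wip.getAndIncrement() == 0) {
                Platform.runLater(this::drain);
            }
        }

        private void drain() {
            int missed = 1;
            for (;;) {
                Runnable r;
                while ((r = queue.poll()) != null) {
                    r.run(); // runs on the FX thread, in submission order
                }
                // Actions enqueued while draining are picked up here instead of
                // scheduling another Platform.runLater.
                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    }

In this shape, recursive schedule calls made from within a running action land in the same drain pass rather than spawning additional Platform.runLater calls.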

@thomasnield
Collaborator

Looking at these now.

@thomasnield
Collaborator

This 1.x branch looks good. All unit tests are passing so I can't object to anything about it. I created a test application too and tried to undermine it, but it looks like it is holding up. Nice work!

You'll have to forgive my hesitancy as a business-facing developer who often takes an "if it ain't broke, don't fix it" approach to these kinds of situations. But your approach does make more sense, and if it effectively eliminates overhead, I am a fan.

@akarnokd, before I merge this, can I recruit your wizardry to give it a quick look-over? A glance at the source code to flag anything that pops out as problematic would be valuable.

@akarnokd
Member

Honestly, that inlined MPSC linked-queue logic looks fishy. Do you have tests where multiple threads bombard the scheduler with tasks to schedule? It also looks like you have recursive scheduling/reentrancy problems again.

@thomasnield
Collaborator

thomasnield commented Feb 16, 2017

@akarnokd I am glad I roped you in, because that is exactly what I am trying to avoid. I created this "bombard" test case, but it seems to be working fine. Do you have a better way to train-wreck it?

    @Test
    public void bombardScheduler() {
        new JFXPanel(); //initialize JavaFX

        final int max = 10000;
        final CountDownLatch latch = new CountDownLatch(max);

        Observable.range(1, max)
                .observeOn(JavaFxScheduler.getInstance())
                .flatMap(i -> Observable.just(i).subscribeOn(Schedulers.io()).observeOn(JavaFxScheduler.platform()))
                .subscribe(i -> latch.countDown());

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

@akarnokd
Member

akarnokd commented Feb 16, 2017

Yes, cut out the operators and schedule directly on the same worker of JavaFxScheduler.platform() from multiple threads.

QueuedRunnable tailPivot;
do {
    tailPivot = tail.get();
} while (tailPivot != null && !tailPivot.compareAndSet(null, queuedRunnable));
@protogenes
Contributor (Author)

This loop appends elements and is the only critical operation for the queue.
It's a simple, atomic linked list.
The current tail element is fetched from the pointer tail and remembered as tailPivot for later evaluation.
If tailPivot == null, the worker was disposed/unsubscribed and we bail out (see line 146 as well).
Otherwise we try to append the new queuedRunnable to tailPivot, which we expect to have no successor, so we CAS its next reference from null. This fails iff some other thread successfully appended its element first; in that case we start over.
Once we have successfully appended our element to the queue it is available for execution, but we still need to advance tail to point to the new element, because all other threads will be busy-waiting for the correct tailPivot. This is done in line 148. The CAS there fails iff the worker was disposed/unsubscribed, which means tail has been cleared and all waiting threads will bail out anyway.

If a thread fails to append its element, another thread succeeded. Deadlock and livelock are not possible, but a very unlucky thread might starve in this loop. That is extremely unlikely, though, and requires a never-ending stream of scheduling requests. I could introduce countermeasures against starvation, but I don't believe the overhead and added complexity are worth it when the worker is busy with an infinite amount of work.
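
To make this concrete, here is a small self-contained sketch of the same append pattern (hypothetical names, not the PR source): each node extends AtomicReference so it doubles as its own next pointer, producers CAS their node onto the current tail, and the winner then swings tail forward.

    import java.util.concurrent.atomic.AtomicReference;

    // Hypothetical, simplified illustration of the append loop discussed above.
    final class MpscAppendSketch {
        // A node is its own "next" reference: get() returns the successor, or null if it is the last node.
        static final class QueuedRunnable extends AtomicReference<QueuedRunnable> {
            final Runnable action;
            QueuedRunnable(Runnable action) { this.action = action; }
        }

        private final QueuedRunnable head = new QueuedRunnable(null); // dummy node, drained by the FX thread
        private final AtomicReference<QueuedRunnable> tail = new AtomicReference<>(head);

        boolean offer(Runnable action) {
            QueuedRunnable queuedRunnable = new QueuedRunnable(action);
            QueuedRunnable tailPivot;
            do {
                tailPivot = tail.get();
                if (tailPivot == null) {
                    return false; // worker disposed/unsubscribed: bail out
                }
                // Try to hang the new node behind the presumed tail; this fails if another
                // producer appended first, in which case we re-read the tail and retry.
            } while (!tailPivot.compareAndSet(null, queuedRunnable));
            // Swing tail to the new node so other producers stop spinning on the old pivot.
            // A failure here means the worker was disposed and tail was cleared in the meantime.
            tail.compareAndSet(tailPivot, queuedRunnable);
            return true;
        }
    }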

@thomasnield
Collaborator

thomasnield commented Feb 18, 2017

@akarnokd It still seems to be holding up, if I implemented your suggested test correctly, as shown below. @protogenes is convinced that deadlock/livelock is not a concern and that thread starvation is too rare to warrant the overhead (see his comment above). I'm not good at this low-level engineering, but I can't seem to stump this Scheduler at all.

    @Test
    public void bombardScheduler() {
        new JFXPanel(); //initialize JavaFX

        final int max = 10000;
        final CountDownLatch latch = new CountDownLatch(max);

        Scheduler.Worker worker = JavaFxScheduler.getInstance().createWorker();

        worker.schedule(() -> {
            Observable.merge(
                    Observable.interval(1, TimeUnit.MILLISECONDS),
                    Observable.interval(2, TimeUnit.MILLISECONDS),
                    Observable.interval(3, TimeUnit.MILLISECONDS)
            ).observeOn(JavaFxScheduler.getInstance())
             .take(max)
             .subscribe(i -> latch.countDown());
        });

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void bombardScheduler2() {
        new JFXPanel(); //initialize JavaFX

        final int max = 10000;
        final CountDownLatch latch = new CountDownLatch(max);

        Scheduler.Worker worker = JavaFxScheduler.getInstance().createWorker();

        worker.schedule(() -> Observable.merge(
                Observable.interval(1, TimeUnit.MILLISECONDS).observeOn(JavaFxScheduler.getInstance()),
                Observable.interval(2, TimeUnit.MILLISECONDS).observeOn(JavaFxScheduler.getInstance()),
                Observable.interval(3, TimeUnit.MILLISECONDS).observeOn(JavaFxScheduler.getInstance())
        )
         .take(max)
         .subscribe(i -> latch.countDown()));

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

@thomasnield
Collaborator

Again, I'm really trying to make this thing go haywire, and it still runs just fine:

    @Test
    public void bombardScheduler() {
        new JFXPanel(); //initialize JavaFX

        final int max = 100000;
        final CountDownLatch latch = new CountDownLatch(max);

        Scheduler.Worker worker = JavaFxScheduler.getInstance().createWorker();

        worker.schedule(() -> Observable.merge(
                Observable.interval(1, TimeUnit.MILLISECONDS, JavaFxScheduler.getInstance()).observeOn(JavaFxScheduler.getInstance()),
                Observable.interval(1, TimeUnit.MILLISECONDS)
                        .flatMap(i -> i % 2 == 0 ? Observable.just(i) : Observable.just(i).subscribeOn(JavaFxScheduler.getInstance())),
                Observable.interval(1, TimeUnit.MILLISECONDS).observeOn(JavaFxScheduler.getInstance())
        )
         .take(max)
         .subscribe(i -> latch.countDown()));

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

@akarnokd
Member

Those tests are not stressing the same queue on the scheduler.

Scheduler.Worker w = JavaFxScheduler.platform().createWorker();

CountDownLatch cdl = new CountDownLatch(2);
int[] counter = { 0, 0 };

new Thread(() -> {
    for (int i = 0; i < 1_000_000; i++) {
        w.schedule(() -> { counter[0]++; });
    }
    w.schedule(cdl::countDown);
}).start();

for (int i = 0; i < 1_000_000; i++) {
    w.schedule(() -> { counter[1]++; });
}
w.schedule(cdl::countDown);

cdl.await();

assertEquals(1_000_000, counter[0]);
assertEquals(1_000_000, counter[1]);

w.unsubscribe();

Commit …ions:
implemented a queue-based JavaFxScheduler which issues only a single Platform.runLater for the first action
@thomasnield
Collaborator

Sorry it's taking me a while to get to this. I'm getting ready for an O'Reilly webcast tomorrow and will follow up not long after I'm done.

@thomasnield
Collaborator

Ran David's test case (pasted below) and it seems to run fine with @protogenes' latest commits. I'm not sure about this one. @akarnokd, is this something we should keep pursuing?

    @Test
    public void bombardScheduler() {
        Scheduler.Worker w = JavaFxScheduler.platform().createWorker();

        CountDownLatch cdl = new CountDownLatch(2);
        int[] counter = { 0, 0 };

        new Thread(() -> {
            for (int i = 0; i < 1_000_000; i++) {
                w.schedule(() -> counter[0]++);
            }
            w.schedule(cdl::countDown);
        }).start();

        for (int i = 0; i < 1_000_000; i++) {
            w.schedule(() -> counter[1]++);
        }
        w.schedule(cdl::countDown);

        try {
            cdl.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        assertEquals(1_000_000, counter[0]);
        assertEquals(1_000_000, counter[1]);

        w.unsubscribe();
    }

@akarnokd
Member

That's enough confirmation for me.

@thomasnield thomasnield merged commit 90a6ca4 into ReactiveX:1.x Feb 23, 2017
@protogenes protogenes deleted the feature/scheduler-cleanup-1.x branch February 24, 2017 07:45