Scheduler Improvements 507 #1166

Closed
akarnokd wants to merge 10 commits into ReactiveX:master from akarnokd:SchedulerImprovements507

@akarnokd

akarnokd commented May 7, 2014

Member

NewThread/EventLoop scheduler improvements proposal.

NewThreadScheduler

There is no need for an innerSubscription there, because the underlying Executor knows which tasks are in its queue and a shutdown will cancel them anyway. In the original version, I made a small mistake by leaving out an innerSubscription.add(s) call, which was one of the main contributors to the speed improvement (it did not affect correctness).

EventLoopScheduler

An EventLoopScheduler needs to track its tasks so it can selectively cancel them in the NewThreadScheduler. Since an ELS worker is single threaded, the addition and removal of the completed tasks are more like queue operations; adding a last item and removing a first item in CompositeSubscription can be expensive even if the size is small due to the copying and state machine overhead.

SubscriptionQueue

Therefore, I've built a special array-based (ring buffer) queue called SubscriptionQueue. It can resize itself as needed, similar to ArrayDeque, but it behaves like a composite: queued items can be unsubscribed all at once, and it provides the usual cancellation policy. When benchmarked with a simple loop of add/remove pairs, it gives ~532 Mops/s, whereas CompositeSubscription gives 16 Mops/s. It uses synchronized because one generally needs to synchronize the producer(s) of tasks with the completion of the tasks on the worker thread, and the producer count is likely small. The initial queue capacity is 8, which favors fast tasks, and the array fits nicely into a typical 64-byte cache line. One unique property is that it dequeues based on object identity rather than from the head of the queue. The reason is that with multiple producers, their subscriptions might be queued in a different order than their tasks are scheduled (i.e., the head of the queue points to s1 while t2 gets scheduled first).
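The description above can be illustrated with a minimal sketch. This is not the PR's code: Cancellable and FlagCancellable are hypothetical stand-ins for rx.Subscription so the example compiles without RxJava, RingSubscriptionQueue is an invented name, and shifting entries on out-of-order removal is an assumption, since the conversation does not show how the real SubscriptionQueue handles that case.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for rx.Subscription (assumption, not RxJava API). */
interface Cancellable {
    void unsubscribe();
    boolean isUnsubscribed();
}

/** Trivial Cancellable used only by the demo. */
final class FlagCancellable implements Cancellable {
    private final AtomicBoolean done = new AtomicBoolean();
    @Override public void unsubscribe() { done.set(true); }
    @Override public boolean isUnsubscribed() { return done.get(); }
}

/** Sketch: synchronized ring buffer that acts as a composite and removes by identity. */
final class RingSubscriptionQueue implements Cancellable {
    private Cancellable[] items = new Cancellable[8]; // capacity stays a power of two
    private int head;             // index of the oldest entry
    private int size;             // number of queued entries
    private boolean unsubscribed;

    /** Enqueues s, or cancels it right away if the queue was already unsubscribed. */
    void add(Cancellable s) {
        synchronized (this) {
            if (!unsubscribed) {
                if (size == items.length) {
                    grow();
                }
                items[(head + size) & (items.length - 1)] = s;
                size++;
                return;
            }
        }
        s.unsubscribe(); // called outside the lock
    }

    /** Removes s by identity without unsubscribing it; O(1) when s is at the head. */
    synchronized boolean remove(Cancellable s) {
        int mask = items.length - 1;
        for (int i = 0; i < size; i++) {
            if (items[(head + i) & mask] == s) {
                // Shift the older entries one slot toward the tail, then drop the head slot.
                for (int j = i; j > 0; j--) {
                    items[(head + j) & mask] = items[(head + j - 1) & mask];
                }
                items[head] = null;
                head = (head + 1) & mask;
                size--;
                return true;
            }
        }
        return false;
    }

    /** Doubles the array and re-packs the entries starting at index 0. */
    private void grow() {
        Cancellable[] bigger = new Cancellable[items.length * 2];
        for (int i = 0; i < size; i++) {
            bigger[i] = items[(head + i) & (items.length - 1)];
        }
        items = bigger;
        head = 0;
    }

    /** Unsubscribes every queued entry exactly once. */
    @Override public void unsubscribe() {
        Cancellable[] snapshot;
        int h;
        int n;
        synchronized (this) {
            if (unsubscribed) {
                return;
            }
            unsubscribed = true;
            snapshot = items;
            h = head;
            n = size;
            items = new Cancellable[8];
            head = 0;
            size = 0;
        }
        for (int i = 0; i < n; i++) {
            snapshot[(h + i) & (snapshot.length - 1)].unsubscribe();
        }
    }

    @Override public synchronized boolean isUnsubscribed() { return unsubscribed; }

    synchronized int size() { return size; }
}

final class SubscriptionQueueSketch {
    public static void main(String[] args) {
        RingSubscriptionQueue q = new RingSubscriptionQueue();
        FlagCancellable s1 = new FlagCancellable();
        FlagCancellable s2 = new FlagCancellable();
        q.add(s1);
        q.add(s2);
        q.remove(s2);    // t2 finished first although s1 is at the head
        q.unsubscribe(); // cancels what is still queued (s1 only)
        System.out.println(s1.isUnsubscribed() + " " + s2.isUnsubscribed()); // true false
    }
}
```

In this sketch a completed task calls remove with its own subscription, so a task that finishes out of order is still found, while the common in-order case touches only the head slot.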

Perf tests

I did some perf testing with ComputationSchedulerPerf but ran into some trouble: the initial 512M of memory for the benchmark doesn't seem to be enough, especially on a 4/8 core machine. Both master and this PR run really slowly or fail with GC errors because the internal queues of the Executors get flooded with tasks. Each task is about 650 bytes, and having tens of thousands queued up consumes a lot of memory. I ran the perf tests with 1300M, which was enough, although it was still pounding on the GC. (Btw, I don't understand the perf code: does the subscribeOn test do 1M one-time subscriptions, or does it subscribe to a stream of 1M elements? If the latter, what are the tasks that hammer the executor?)

Master (https://gist.github.com/akarnokd/1fe0fb74f896c48c61a8)

Benchmark                                   (size)   Mode   Samples         Mean   Mean error    Units
r.s.ComputationSchedulerPerf.observeOn           1   avgt         5     1944,238       64,954    ns/op
r.s.ComputationSchedulerPerf.observeOn        1024   avgt         5   119353,236     1968,424    ns/op
r.s.ComputationSchedulerPerf.observeOn     1048576   avgt         5 129596502,875 11464347,421    ns/op
r.s.ComputationSchedulerPerf.subscribeOn         1   avgt         5     1659,862      462,385    ns/op
r.s.ComputationSchedulerPerf.subscribeOn      1024   avgt         5  4643038,650 39587231,914    ns/op
r.s.ComputationSchedulerPerf.subscribeOn   1048576   avgt         5   317353,904   566003,080    ns/op

This proposal (https://gist.github.com/akarnokd/6d9ba66761c5bdb8ecd7)

Benchmark                                   (size)   Mode   Samples         Mean   Mean error    Units
r.s.ComputationSchedulerPerf.observeOn           1   avgt         5     1858,486       73,600    ns/op
r.s.ComputationSchedulerPerf.observeOn        1024   avgt         5   117680,475     2305,467    ns/op
r.s.ComputationSchedulerPerf.observeOn     1048576   avgt         5 129298288,453 14093401,564    ns/op
r.s.ComputationSchedulerPerf.subscribeOn         1   avgt         5      839,078      293,626    ns/op
r.s.ComputationSchedulerPerf.subscribeOn      1024   avgt         5    51253,754   150446,345    ns/op
r.s.ComputationSchedulerPerf.subscribeOn   1048576   avgt         5   371325,179   692740,819    ns/op

The observeOn test benefits minimally from the changes. For subscribeOn with size 1, the mean drops from about 1660 ns to about 840 ns. For the other sizes, I suspect hectic GC overhead, so I can't declare a winner.

(Generally, since we use a lot of AtomicXYZ classes, they add 24 bytes to the memory footprint every time a Subscription is present. To get rid of them, one would need to replace them with volatile fields and Unsafe calls to get the CAS functionality back.)

Benchmarked on an i7 920 @ 2.66GHz, 4/8 hyperthreaded cores, 6GB total RAM, Windows 7 x64, Java 7u55 x64.

@daschl

daschl commented May 7, 2014

Contributor

@akarnokd you can use the atomic field updaters :) http://normanmaurer.me/blog/2013/10/28/Lesser-known-concurrent-classes-Part-1/
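For readers unfamiliar with the field updaters mentioned here, a minimal sketch follows. It is not taken from RxJava or from this PR: UpdaterBackedFlag and its State enum are hypothetical names. It shows how a single static AtomicReferenceFieldUpdater provides CAS on a plain volatile field, so each instance avoids carrying a separate AtomicReference object.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/** Hypothetical example class (assumption); illustrates the technique only. */
final class UpdaterBackedFlag {
    enum State { ACTIVE, UNSUBSCRIBED }

    // One updater is shared by all instances; each instance pays only for the volatile field.
    private static final AtomicReferenceFieldUpdater<UpdaterBackedFlag, State> STATE =
            AtomicReferenceFieldUpdater.newUpdater(UpdaterBackedFlag.class, State.class, "state");

    // The target field must be volatile for the updater to accept it.
    private volatile State state = State.ACTIVE;

    /** Atomically moves ACTIVE to UNSUBSCRIBED; returns true only for the caller that wins the CAS. */
    boolean unsubscribe() {
        return STATE.compareAndSet(this, State.ACTIVE, State.UNSUBSCRIBED);
    }

    boolean isUnsubscribed() {
        return state == State.UNSUBSCRIBED;
    }
}

final class FieldUpdaterSketch {
    public static void main(String[] args) {
        UpdaterBackedFlag flag = new UpdaterBackedFlag();
        System.out.println(flag.unsubscribe()); // true: first caller performs the transition
        System.out.println(flag.unsubscribe()); // false: already unsubscribed
    }
}
```

The trade-off is a reflective lookup when the updater is created and extra checks on each call, which is the cost being weighed in the replies that follow.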

@akarnokd

akarnokd commented May 7, 2014

Member Author

Thanks, I barely remembered them. I'd go for them, but I have doubts about their performance due to a lot of security checks (and about how Android would behave). I found this thread which says even Java 7 moved away.

@daschl

daschl commented May 7, 2014

Contributor

Oh ok. They use it in netty, which also works on Android IIRC. Might be worth giving it a shot and comparing GC. Also, memory use in your case will go down?

@akarnokd

akarnokd commented May 7, 2014

Member Author

I'll continue experimenting with the memory footprint tomorrow.

@benjchristensen

Member

"the addition and removal of the completed tasks are more like queue operations"

I believe this also covers throttleLast and debounce use cases where we are unscheduling previously scheduled tasks, correct?

@akarnokd

akarnokd commented May 8, 2014

Member Author

I thought about this and found a very common case which makes this still slow and/or a memory hog: let's schedule a task T1 with a long delay, then start scheduling immediate tasks. Because T1 is not removed from the head of the queue, dequeueing each subsequent task ends up doing an O(n) lookup. In addition, since the head pointer is pinned, once the tail wraps around, the queue will grow. I have a few ideas to resolve this:

  • have a CompositeSubscription track the delayed tasks and SubscriptionQueue track the immediate tasks. With the improvements from PR #1145 (Large CompositeSubscription performance improvements), both sides should perform well.
  • in SubscriptionQueue do a compaction when (head == tail && size < capacity) and grow otherwise.

Unfortunately, throttleLast and debounce fall into this problem as well. If there are no other tasks on the scheduler, both perform well. If interleaved with other tasks, they get slow and wasteful. The resolution may help them both.
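The pinned-head problem can be reproduced with a small model. This is a sketch under a stated assumption, not the PR's code: HoleyRing is a hypothetical ring buffer whose remove only nulls out the matching slot and advances the head past leading holes, which is one way the behaviour described above could arise. The counters scanned and grows exist only for the demonstration.

```java
/** Hypothetical model (assumption): identity removal leaves a hole instead of shifting entries. */
final class HoleyRing {
    private Object[] slots = new Object[8]; // capacity stays a power of two
    private int head;   // logical index of the oldest slot (entry or hole)
    private int tail;   // logical index one past the newest slot
    long scanned;       // total slots inspected by remove()
    int grows;          // number of times the array was doubled

    void add(Object o) {
        if (tail - head == slots.length) {
            grow();
        }
        slots[tail & (slots.length - 1)] = o;
        tail++;
    }

    boolean remove(Object o) {
        int mask = slots.length - 1;
        for (int i = head; i < tail; i++) {
            scanned++;
            if (slots[i & mask] == o) {
                slots[i & mask] = null;
                // The head can only move past leading holes; a live head entry pins it.
                while (head < tail && slots[head & mask] == null) {
                    head++;
                }
                return true;
            }
        }
        return false;
    }

    /** Doubles the array, copying entries and holes alike (no compaction). */
    private void grow() {
        Object[] bigger = new Object[slots.length * 2];
        int n = tail - head;
        for (int i = 0; i < n; i++) {
            bigger[i] = slots[(head + i) & (slots.length - 1)];
        }
        slots = bigger;
        head = 0;
        tail = n;
        grows++;
    }

    int capacity() { return slots.length; }
}

final class PinnedHeadDemo {
    /** Runs n add/remove pairs, optionally after queueing one task that is never removed. */
    static HoleyRing run(boolean pinHead, int n) {
        HoleyRing q = new HoleyRing();
        if (pinHead) {
            q.add(new Object()); // T1: long delay, stays at the head
        }
        for (int i = 0; i < n; i++) {
            Object task = new Object();
            q.add(task);
            q.remove(task);
        }
        return q;
    }

    public static void main(String[] args) {
        HoleyRing free = run(false, 1000);
        HoleyRing pinned = run(true, 1000);
        System.out.println("free:   scanned=" + free.scanned + " capacity=" + free.capacity());
        System.out.println("pinned: scanned=" + pinned.scanned + " capacity=" + pinned.capacity());
    }
}
```

Without T1 every removal inspects one slot and the array never grows. With T1 pinned at the head, each removal walks past all earlier holes, so the total work grows quadratically with the number of tasks and the array keeps doubling. Compacting the holes, as in the second idea above, would address both effects.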

@cloudbees-pull-request-builder


RxJava-pull-requests #1086 SUCCESS
This pull request looks good

@akarnokd

akarnokd commented May 8, 2014

Member Author

Did some changes and experiments.

Switching to AtomicReferenceFieldUpdater in CompositeSubscription, along with the Set switchover logic, sped the class up from 15.6 Mops/s to 17.1 Mops/s while reducing the memory footprint of 1M scheduled tasks from 289 MB to 269 MB. In addition, scheduling 1M tasks with long delays takes 12 minutes in master and 12 seconds with this construct. Benchmarking with 512M of memory still takes very long; therefore, I quit most apps and gave 3GB to the benchmark. The detailed results are here.

r.s.ComputationSchedulerPerf.observeOn           1   avgt         5     1942,807       61,622    ns/op
r.s.ComputationSchedulerPerf.observeOn        1024   avgt         5   118349,757      347,097    ns/op
r.s.ComputationSchedulerPerf.observeOn     1048576   avgt         5 127616477,514 11356451,968    ns/op
r.s.ComputationSchedulerPerf.subscribeOn         1   avgt         5      966,550      475,269    ns/op
r.s.ComputationSchedulerPerf.subscribeOn      1024   avgt         5    40487,316   249552,416    ns/op
r.s.ComputationSchedulerPerf.subscribeOn   1048576   avgt         5    92865,282   351942,131    ns/op

The single call case got a tiny bit more expensive because the emission of a single item unsubscribes the worker with two subscription containers.

The memory consumption was measured via heap dump when this program pauses:

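import java.io.IOException;
import java.util.concurrent.TimeUnit;

import rx.Scheduler.Worker;
import rx.functions.Actions;
import rx.schedulers.Schedulers;

public class CompositeSubscriptionMemoryOverhead {
    public static void main(String[] args) throws IOException {
        int n = 1024 * 1024;
        Worker w = Schedulers.computation().createWorker();

        // Schedule n no-op tasks far in the future so they all stay queued.
        for (int i = 0; i < n; i++) {
            if (i % (128 * 1024) == 0) {
                System.out.println(i); // progress marker
            }
            w.schedule(Actions.empty(), 1, TimeUnit.DAYS);
        }

        // Keep the JVM alive so a heap dump can be taken.
        System.out.print("Press ENTER to quit");
        System.in.read();
    }
}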

@daschl

daschl commented May 8, 2014

Contributor

@akarnokd did you also check with your proposed Unsafe call? Would be interesting how that changes in comparison to the atomic field updater :)

Contributor

Don't know if it makes much difference, but if you switch these if-statements, there's one less method call if unsubscribed.

Member Author

If I switch them, then if delayTime <= 0 there will be two isUnsubscribed calls, which is likely more expensive.

@akarnokd

akarnokd commented May 8, 2014

Member Author

I started out with Unsafe on CompositeSubscription and it gave 17.2 Mops/s for add/remove, but the build started to complain about using it. I guessed @benjchristensen wouldn't want that, and the performance difference is so small that it was not worth it.

@akarnokd

Member Author

I'll post a new PR to catch up with master.

@akarnokd akarnokd closed this May 12, 2014
@akarnokd akarnokd deleted the SchedulerImprovements507 branch January 20, 2015 15:38

5 participants