Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce Subscription Object Allocation #1281

Conversation

benjchristensen
Copy link
Member

Changes

Atomic State Machine -> Mutation & Locks

The CompositeSubscription implementation was changed from using an atomic state machine to using locks. The state machine requires immutable State that is swapped using CAS. This means an object allocation is needed each time.

It now uses locks to protect mutable state so very few objects are created.

ChainedSubscription

The CompositeSubscription requires support of randomly removing a Subscription via the remove method. The Subscriber type does not expose this so can be optimized. There is now a ChainedSubscription that is used by Subscriber instead. This allows using a LinkedList or ArrayList rather than HashSet as random access is never needed. This provides a slight performance boost and reduces memory usage (1 minute test shows 16.5GB allocation for Composite versus 14.4GC for Chained).

Allocation Comparison

This shows Java Flight Recorder output from master without these changes:

master-without-changing-compositesubscription

This shows object allocation after changing CompositeSubscription:

master-with-new-compositesubscription ong

Throughput Comparison

Testing this code:

    public void mapTransformation(UseCaseInput input) throws InterruptedException {
        input.observable.map(i -> {
            return String.valueOf(i);

        }).map(i -> {
            return Integer.parseInt(i);
        }).subscribe(input.observer);
        input.awaitCompletion();
    }
Rx 0.16.1
Run: 10 - 2,879,355 ops/sec 
Run: 11 - 3,236,245 ops/sec 
Run: 12 - 4,468,275 ops/sec 
Run: 13 - 3,237,293 ops/sec 
Run: 14 - 4,683,840 ops/sec 
Rx 0.17.6 - using OnSubscribeFunc
Run: 10 - 3,621,876 ops/sec 
Run: 11 - 6,702,412 ops/sec 
Run: 12 - 7,401,924 ops/sec 
Run: 13 - 6,653,359 ops/sec 
Run: 14 - 5,834,305 ops/sec 
Rx 0.17.6 - using OnSubscribe
Run: 10 - 3,320,053 ops/sec 
Run: 11 - 4,520,795 ops/sec 
Run: 12 - 7,107,320 ops/sec 
Run: 13 - 5,089,058 ops/sec 
Run: 14 - 5,534,034 ops/sec 

Run: 10 - 4,930,966 ops/sec 
Run: 11 - 6,119,951 ops/sec 
Run: 12 - 7,062,146 ops/sec 
Run: 13 - 6,514,657 ops/sec 
Run: 14 - 6,369,426 ops/sec 
Rx 0.18.3 - using OnSubscribe
Run: 10 - 4,178,854 ops/sec 
Run: 11 - 4,446,420 ops/sec 
Run: 12 - 5,458,515 ops/sec 
Run: 13 - 8,006,405 ops/sec 
Run: 14 - 7,849,293 ops/sec 

Run: 10 - 4,145,936 ops/sec 
Run: 11 - 6,553,079 ops/sec 
Run: 12 - 7,645,259 ops/sec 
Run: 13 - 7,385,524 ops/sec 
Run: 14 - 7,830,853 ops/sec 
Rx 0.19 master - CompositeSubscription state machine
Run: 10 - 10,576,302 ops/sec 
Run: 11 - 10,518,786 ops/sec 
Run: 12 - 10,554,646 ops/sec 
Run: 13 - 10,314,063 ops/sec 
Run: 14 - 10,666,439 ops/sec 
Rx 0.19 master - CompositeSubscription with synchronized HashSet
Run: 10 - 9,949,060 ops/sec 
Run: 11 - 10,122,379 ops/sec 
Run: 12 - 10,018,032 ops/sec 
Run: 13 - 10,072,522 ops/sec 
Run: 14 - 10,132,636 ops/sec 
Rx 0.19 master - ChainedSubscription with synchronized ArrayList
Run: 10 - 11,086,351 ops/sec 
Run: 11 - 10,932,426 ops/sec 
Run: 12 - 11,002,431 ops/sec 
Run: 13 - 10,888,620 ops/sec 
Run: 14 - 11,157,227 ops/sec 

Run: 10 - 9,371,192 ops/sec 
Run: 11 - 9,829,169 ops/sec 
Run: 12 - 10,139,005 ops/sec 
Run: 13 - 10,099,785 ops/sec 
Run: 14 - 10,017,631 ops/sec 
Rx 0.19 master - ChainedSubscription with synchronized LinkedList
Run: 10 - 10,619,431 ops/sec 
Run: 11 - 11,063,293 ops/sec 
Run: 12 - 11,001,463 ops/sec 
Run: 13 - 11,054,243 ops/sec 
Run: 14 - 10,898,826 ops/sec 

Run: 10 - 10,075,465 ops/sec 
Run: 11 - 9,780,716 ops/sec 
Run: 12 - 9,885,134 ops/sec 
Run: 13 - 9,584,143 ops/sec 
Run: 14 - 10,000,700 ops/sec 

@benjchristensen
Copy link
Member Author

Grrr, some unit tests are failing ...

@Override
public void unsubscribe() {
synchronized (this) {
if (unsubscribed || subscriptions == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the chain is never added, this won't set the chain to unsubscribed = true.

@benjchristensen
Copy link
Member Author

Fixed the failing unit tests ... silly mistake. Thanks @akarnokd for getting involved in this.

I agree we can further optimize this. For example, I think we should look at the data structure from #1235

return;
} else {
newState = oldState.clear();
unsubscribe = new ArrayList<Subscription>(subscriptions);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't clear the subscriptions, just copies it. You could just pass out the Set and replace it with null.

Set<Subscription> unsubscribe = null;
synchronized (this) {
    if (unsubscribed || subscriptions == null) {
        return;
    }
    unsubscribe = subscriptions;
    subscriptions = null;
}
unsubscribeFromAll(unsubscribe);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.

@benjchristensen
Copy link
Member Author

Anyone have a better name than ChainedSubscription before this gets released and is permanent forever?

if (oldState.isUnsubscribed) {
return;
synchronized (this) {
if (unsubscribed || subscriptions == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the composite is empty, this will never set the unsubscribed flag.

@akarnokd
Copy link
Member

SubscriptionList and SubscriptionSet perhaps.

@benjchristensen
Copy link
Member Author

I like SubscriptionSet

@benjchristensen
Copy link
Member Author

Do you have a different take on how to solve #1204 based on the findings I've posted there?

@headinthebox
Copy link
Contributor

What the difference with CompositeSubscription?

@akarnokd
Copy link
Member

For immediate results, using plain Set and List based composites is okay. We need to recheck the operators and use the appropriate one. For the schedulers, an MPSC-based queue still creates a lot of garbage; my SubscriptionQueue based on a ringbuffer avoids this issue at the expense of synchronization instead of atomics.

For even less allocation on the new composites, we would need to implement a more light-weight HashSet (e.g., a linear probing hashset, although we would lose the benefits from the Java 8 HashMap enhancements).

@cloudbees-pull-request-builder

RxJava-pull-requests #1179 SUCCESS
This pull request looks good

@Xorlev
Copy link

Xorlev commented May 29, 2014

Aside from the extra dependency, Disruptor seems like it would be a pretty decent model for the SubscriptionQueue. It would be interesting to see if it'd perform well non-contending cases vs. the MPSC queue.

@akarnokd
Copy link
Member

Disruptor's RingBuffer is bounded so you'd need to either set it to a decent size or parametrize the schedulers. In addition, it is prone to deadlock because the reader may put new tasks into the queue in case of a recursive schedule.

@benjchristensen
Copy link
Member Author

For even less allocation on the new composites, we would need to implement a more light-weight HashSet

Agreed, if we can come up with one that is better.

For the schedulers, an MPSC-based queue still creates a lot of garbage; my SubscriptionQueue based on a ringbuffer avoids this issue at the expense of synchronization instead of atomics.

Yes, I want to get those changes pulled in before we release.

For immediate results, using plain Set and List based composites is okay.

We can evaluate the MPSC queue to replace these before releasing.

We need to recheck the operators and use the appropriate one.

Yes, I only migrated the ones that directly injected a CompositeSubscription into Subscriber.

@benjchristensen
Copy link
Member Author

Talking with @headinthebox he was wondering if we should remove the remove/clear methods from CompositeSubcription and leave that the default, and have a more specific implementation for the few places needing remove functionality.

I'm averse to a breaking change of that kind, even though we're pre-1.0 as we're so late in the game that breaking changes are really painful.

What do you think? Anyone else have an opinion on this?

The two signatures are:

SubscriptionA {
 public synchronized boolean isUnsubscribed();
 public void add(final Subscription s)
 public void remove(final Subscription s)
 public void clear()
 public void unsubscribe()
}
SubscriptionB {
 public synchronized boolean isUnsubscribed();
 public void add(final Subscription s)
 public void unsubscribe()
}

The SubscriptionB signature is the one most people probably need when they think of CompositeSubscription.

@cloudbees-pull-request-builder

RxJava-pull-requests #1180 SUCCESS
This pull request looks good

- significant reduction in object allocations
- details on research available at ReactiveX#1204
@cloudbees-pull-request-builder

RxJava-pull-requests #1181 SUCCESS
This pull request looks good

@akarnokd
Copy link
Member

I never used clear and can't see a need for it. What I usually need is a remove variant that doesn't unsubscribe the subscription removed. Usually this comes into play when a scheduled action is finished and needs to remove itself from the tracking composite, or a multi-stage scheduling happens.

@abersnaze
Copy link
Contributor

Wouldn't the only use case SubscriptionB be for an entirely linear Observable chain (does not use merge, zip, groupBy, ...)?

@benjchristensen
Copy link
Member Author

Wouldn't the only use case SubscriptionB be for ...

Which is almost all the time. The merge operator for example will add a SubscriptionA to the SubscriptionB. This way the right data structure is used in the right places.

@benjchristensen
Copy link
Member Author

Should we deprecate remove() and clear() on CompositeSubscription in 0.18.x and leave something else to have the remove capability?

@benjchristensen
Copy link
Member Author

Merging ... then we can bike shed over naming before releasing 0.19.

benjchristensen added a commit that referenced this pull request May 29, 2014
…performance

Reduce Subscription Object Allocation
@benjchristensen benjchristensen merged commit 798fa7e into ReactiveX:master May 29, 2014
@benjchristensen benjchristensen deleted the composite-subscription-performance branch May 29, 2014 22:21
@benjchristensen
Copy link
Member Author

@akarnokd There are several things as discussed above that we can improve here, let's pick them up in separate pull requests and coordinate them with #1235

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants