Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed OutOfMemoryError with CPU scheduler in recursive mode. #643

Closed

Conversation

akarnokd
Copy link
Member

Found an issue in Rx.NET regarding an out-of-memory situation due to long chained subscriptions when using recursive scheduling.

The following test program crashes or just hangs indefinitely:

public class RunRecursiveScheduling {
    /**
     * Generates an observable sequence by iterating a state from an initial 
     * state until the condition returns false.
     */
    public static <TState, R> OnSubscribeFunc<R> generate(
            final TState initialState,
            final Func1<TState, Boolean> condition,
            final Func1<TState, TState> iterate,
            final Func1<TState, R> resultSelector,
            final Scheduler scheduler) {
        return new OnSubscribeFunc<R>() {
            @Override
            public Subscription onSubscribe(final Observer<? super R> observer) {
                return scheduler.schedule(initialState, new Func2<Scheduler, TState, Subscription>() {
                    @Override
                    public Subscription call(Scheduler s, TState state) {
                        boolean hasNext;
                        try {
                            hasNext = condition.call(state);
                        } catch (Throwable t) {
                            observer.onError(t);
                            return Subscriptions.empty();
                        }
                        if (hasNext) {
                            R result;
                            try {
                                result = resultSelector.call(state);
                            } catch (Throwable t) {
                                observer.onError(t);
                                return Subscriptions.empty();
                            }
                            observer.onNext(result);

                            TState nextState;
                            try {
                                nextState = iterate.call(state);
                            } catch (Throwable t) {
                                observer.onError(t);
                                return Subscriptions.empty();
                            }

                            return s.schedule(nextState, this);
                        }
                        observer.onCompleted();
                        return Subscriptions.empty();
                    }
                });
            }
        };
    }

    public static void main(String[] args) throws Exception {
        Observable<Integer> source = Observable.create(generate(
                0, a -> true, a -> a + 1, a -> a, Schedulers.threadPoolForComputation()));

        CountDownLatch latch = new CountDownLatch(1);
        Subscription s = source.subscribe(v -> { 
                    if (v % 100_000 == 0) {
                        System.out.println(v);
                    }
                    if (v >= 10_000_000) { 
                        latch.countDown(); 
                    } 
                }, 
                e -> { e.printStackTrace(); latch.countDown(); }, 
                latch::countDown);

        latch.await();

        System.out.println("Wait done.");

        s.unsubscribe();

        System.out.println("Unsubscribe done.");
    }
}

The issue lies in the fact that CPU scheduler (and perhaps the others) are not really reentrant, therefore, they create new composite subscriptions whenever a recursive scheduling happens. So instead of showing a simple subscription to the outside world, a chain of subscription is extended on every recursive schedule call.

A working solution is to have a different scheduler shown to the Func2<Scheduler, T, Subscription> than the actual CPU scheduler. This new scheduler, called ReentrantScheduler, maintains internal subscriptions which get replaced if a recursive scheduling is executed.

I found three issues with this new approach and the old tests:

  • In SchedulersTest.testRecursiveScheduler2, the logic relied on the expectation that the call on L338 is executed at least once if the outer subscription is unsubscribed. The new logic stops the schedule chain instantly, so it is very unlikely the call on L338 gets executed after this.
  • In OperationObserveOn.Observation tries to solve the underlying issue as well with its own subscription replacer logic. I haven't changed the operator as it might not work with non CPU scheduler after that.
  • The ReentrantSchehduler maintains two subscriptions. One for the result of the schedule calls, and one for the the DiscardableActions. The former subscriptions can be swapped out without unsubscribing the previous completed schedule, but I'm not sure about the DiscardableActions; if I unsubscribe them, tests hang because it basically cancels itself and no further task is executed. This might be a conceptual error in ReentrantScheduler; perhaps it should not return the entire composite on each schedule call, but rather return the content of the actionSub only.

@cloudbees-pull-request-builder

RxJava-pull-requests #573 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member

whenever a recursive scheduling happens

That sounds like a simple issue, as an internal scheduler once complete should have no reference to anything else.

If CompositeSubscription is being held, it sounds like a use for SerialSubscription or MultipleAssignmentSubscription.

I have not yet grokked why this PR is changing so much when recursion already works fine on Schedulers.

@benjchristensen
Copy link
Member

Here is infinite recursion happening inside observeOn using inner schedulers correctly and without StackOverflow or OutOfMemory. Thus, I don't believe the actual interfaces are incorrect, just something we're doing wrong with the subscriptions inside the Executor implementation, similar to how when BlockingObservable is used it doesn't correctly cleanup (#632).

    protected static void testRecursion() throws InterruptedException {
        Observable<Long> source = Observable.interval(1, TimeUnit.MILLISECONDS).observeOn(Schedulers.newThread());
        final CountDownLatch latch = new CountDownLatch(1);
        source.subscribe(new Action1<Long>() {

            @Override
            public void call(Long l) {
                if (l % 1000 == 0) {
                    System.out.println(l);
                }
            }
        });
        // wait indefinitely
        latch.await();
    }
new thread EventLoopScheduler
0
1000
2000
3000
4000
5000
...
173000

@akarnokd
Copy link
Member Author

Recursion only works when the observeOn is used, but many operators use schedulers directly which are not reentrant so they will chain up the subscriptions.

* Simple scheduler API used by the ReentrantScheduler to
* communicate with the actual scheduler implementation.
*/
public interface ReentrantSchedulerHelper {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels wrong that we have a "helper" that schedulers extend from. That implies that the Scheduler interface is wrong.

/cc @headinthebox

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I needed this as the scheduler working with a reentrant scheduler should provide some core scheduling operations without creating chained subscriptions through the standard API. But ExecutorScheduler could implement this privately so it doesn't show up in the signature.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the default (Executor)Scheduler would schedule the stateless tasks without turning them into a recursive call, there were no need to use a helper:

/**
 * Schedules an action to be executed.
 * 
 * @param action
 *            action
 * @return a subscription to be able to unsubscribe from action.
 */
public Subscription schedule(final Action0 action) {
    return schedule(null, new Func2<Scheduler, Void, Subscription>() {
        @Override
        public Subscription call(Scheduler scheduler, Void state) {
            action.call();
            return Subscriptions.empty();
        }
    });
}

@akarnokd akarnokd mentioned this pull request Dec 20, 2013
@akarnokd
Copy link
Member Author

Closing: Revised in PR #648

@akarnokd akarnokd closed this Dec 20, 2013
@benjchristensen
Copy link
Member

This seems like it's a bug inside current Schedulers as recursion should work without memory leaks. Here is the histogram showing the leak:

JVM version is 24.45-b08
Iterating over heap. This may take a while...
Object Histogram:

num       #instances    #bytes  Class description
--------------------------------------------------------------------------
1:      1488649 83360752    java.lang.Object[]
2:      2976652 47626432    java.util.concurrent.atomic.AtomicReference
3:      1488325 47626400    rx.schedulers.DiscardableAction
4:      1488324 47626368    java.util.concurrent.FutureTask
5:      1488330 35719920    java.util.ArrayList
6:      1488324 35719776    rx.subscriptions.CompositeSubscription$State
7:      1488456 23815296    java.lang.Integer
8:      1488326 23813216    java.util.concurrent.atomic.AtomicBoolean
9:      1488326 23813216    rx.operators.SafeObservableSubscription
10:     1488325 23813200    rx.subscriptions.CompositeSubscription
11:     1488324 23813184    rx.subscriptions.Subscriptions$3
12:     7036    905920  * MethodKlass
13:     7036    813896  * ConstMethodKlass
14:     499 567648  * ConstantPoolKlass
15:     499 348832  * InstanceKlassKlass
16:     447 339168  * ConstantPoolCacheKlass
17:     2051    148592  char[]
18:     702 106856  byte[]
19:     569 68872   java.lang.Class
20:     830 52568   * System ObjArray
21:     2027    48648   java.lang.String
22:     769 44128   short[]
23:     124 40800   * MethodDataKlass
24:     785 31400   java.util.TreeMap$Entry
25:     53  28408   * ObjArrayKlassKlass
26:     138 9936    java.lang.reflect.Field
27:     218 6976    java.util.concurrent.ConcurrentHashMap$HashEntry
28:     192 6568    java.lang.String[]
29:     138 4416    java.util.HashMap$Entry
30:     8   4288    * TypeArrayKlassKlass
31:     178 4272    java.util.LinkedList$Node
32:     116 3712    java.util.Hashtable$Entry
33:     97  3104    java.util.LinkedList
34:     193 3088    java.lang.Object
35:     46  2944    java.net.URL
36:     30  2848    java.util.HashMap$Entry[]
37:     66  2704    java.util.concurrent.ConcurrentHashMap$HashEntry[]
38:     66  2640    java.util.concurrent.ConcurrentHashMap$Segment
39:     72  2304    java.util.concurrent.locks.ReentrantLock$NonfairSync
40:     11  2288    * KlassKlass
41:     38  1824    sun.util.locale.LocaleObjectCache$CacheEntry
42:     36  1728    java.util.HashMap
43:     5   1696    int[]
44:     36  1440    java.util.LinkedHashMap$Entry
45:     18  1296    java.lang.reflect.Constructor
46:     16  1280    java.util.WeakHashMap$Entry[]
47:     1   1040    java.lang.Integer[]
48:     26  1040    java.lang.ref.SoftReference
49:     6   992 java.util.Hashtable$Entry[]
50:     16  896 java.util.WeakHashMap
51:     21  840 java.lang.ref.Finalizer

I will dig in to this and get it resolved. We do not need new subscription or scheduler types to solve this.

@akarnokd akarnokd deleted the RecursiveSchedulingSubscrChainFix branch January 13, 2014 09:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants