PublishSubject's HashMap makes groupBy and such non-deterministic. #282

Closed
Treora opened this issue May 30, 2013 · 17 comments

Comments

@Treora
Contributor

Treora commented May 30, 2013

In a few cases already I noticed unexpected and non-deterministic behaviour occurring when subscribing (directly or via a chain of observables) to a PublishSubject source. This happens when subscribing from within an onNext method that is directly or indirectly triggered by that same source. The newly subscribed observer may or may not directly receive the onNext call from the same event. An example where this is annoying:

connectToEventStream()
    .publish()
    .groupBy(event -> event.source).subscribe(eventStream -> {
        LogFile log = new LogFile(eventStream.getKey() + ".log");
        eventStream.subscribe(event -> log.write(event.getMessage()));
    });

In this example the first event of each source might be skipped and not be logged, but other times it will work fine. To me this seems undesired behaviour. There may be cases where it actually is preferable that the current item will be skipped when subscribing from an onNext method, but this happening unpredictably is never a good idea.

The cause of the unpredictability is the implementation of Map that is used in PublishSubject, which when iterating on the map's values sometimes includes new items:

Similarly, Iterators and Enumerations return elements reflecting the state of the hash table at some point at or since the creation of the iterator/enumeration.

Whether an item is iterated over presumably depends on the hashes of the subscriptions, which makes it totally unpredictable. A sensible option would be to use a different implementation of Map that always iterates over items added during the iteration loop, but it looks like such an implementation would first have to be written or found, as Java does not seem to provide anything like this.
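The behaviour described above can be reproduced without RxJava. The following is only a minimal sketch: `WeakIterationDemo` and `visitedWhileAdding` are hypothetical names, and the map holds plain strings instead of observers. It adds an entry to a ConcurrentHashMap from inside a loop over `values()`, on a single thread, and reports how many values the loop visited.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; not part of RxJava.
class WeakIterationDemo {

    // Iterates over a map holding one entry and inserts a second entry from
    // inside the loop, on the same thread. Returns how many values were visited.
    static int visitedWhileAdding(String existingKey, String addedKey) {
        Map<String, String> observers = new ConcurrentHashMap<String, String>();
        observers.put(existingKey, "first");
        int visited = 0;
        for (String value : observers.values()) {
            visited++;
            // Only the first pass actually inserts the new entry.
            observers.putIfAbsent(addedKey, "second");
        }
        return visited;
    }

    public static void main(String[] args) {
        // Each line may report 1 or 2: the iterator is weakly consistent, so
        // whether it sees the new entry depends on where the key is stored.
        for (int i = 0; i < 8; i++) {
            System.out.println("added-" + i + " -> visited "
                    + visitedWhileAdding("existing", "added-" + i));
        }
    }
}
```

No exception is thrown in either case; the only thing that varies is whether the entry added during the loop is visited.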

As a quick hack I added this code to the PublishSubject, but I considered this solution too ugly to be worth a pull request (commit a7fc861):

public void onNext(T args) {
    int observersSize;
    // Observers that already received this item, so nobody gets it twice
    Set<Observer<T>> notifiedObservers = new HashSet<Observer<T>>();
    do {
        observersSize = observers.size();
        for (Observer<T> observer : observers.values()) {
            if (!notifiedObservers.contains(observer)) {
                observer.onNext(args);
                notifiedObservers.add(observer);
            }
        }
    // Loop again if observers were added meanwhile, so they always get the item
    } while (observers.size() > observersSize);
}
@benjchristensen
Member

Short Version: It looks like a bug with groupBy not publish.

Long Version:

PublishSubject will send events through to whatever subscribers it currently has and not replay past events as new subscribers arrive. This is why the publish operator returns a ConnectableObservable that allows a developer to wait for all Observers to subscribe and then connect the source that is publishing.

The ConcurrentHashMap implementation of Map used by PublishSubject is the correct thread-safe implementation to be using. It allows concurrent modifications and each time values() is invoked (on each onNext call) it gives the current state of the data (all subscribers) in the map in a thread-safe manner. Thus it is thread-safe for the onNext call to continue being invoked while subscribers come and go, but it will naturally be a race condition as to whether the subscription occurs before-or-after the onNext if they are done concurrently.

As to the issue you're seeing I think it's not PublishSubject with the issue but the implementation of groupBy which seems to re-subscribe to the underlying Observable on each group that gets emitted. That would definitely not work well with publish since it would not receive the item previously emitted, only subsequent ones.

I have replicated the issue and see this in my output:

*** Subscribing to EventStream ***
============> PublishSubject onNext: Event => source: 0 message: Event-0  observers: 1
GroupedObservable Key: 0
============> PublishSubject onNext: Event => source: 1 message: Event-1  observers: 2
GroupedObservable Key: 1
Source: 1  Message: Event-1
============> PublishSubject onNext: Event => source: 0 message: Event-2  observers: 3

Note how the observers count increases as each group is triggered. The first observer is the groupBy and the following two are the observer for each of the 2 groups.

Removing the publish operator this is confirmed here:

*** Subscribing to EventStream ***
GroupedObservable Key: 0
*** Subscribing to EventStream ***
Source: 0  Message: Event-0
GroupedObservable Key: 1
*** Subscribing to EventStream ***

I don't have time tonight to figure out what needs to change with groupBy but from what I can tell this is definitely a bug.


Regarding the given sample code: the publish operator is not going to publish anything unless connect is invoked on it, so I assume the sample code is just representative, as it will not actually emit any data.

The code would need to look more like this:

        ConnectableObservable<Event> es = connectToEventStream().publish();

        es.groupBy(event -> event.source).subscribe(eventStream -> {
            eventStream.subscribe(event -> System.out.println(event.message));
        });

        es.connect();

I also generally recommend not doing nested subscribes as the control of the subscriptions is lost.

        ConnectableObservable<Event> es = connectToEventStream().publish();

        es.groupBy(event -> event.source).mapMany(eventGroupedObservable -> {
            return eventGroupedObservable.map(event -> {
                return "Source: " + event.source + "  Message: " + event.message;
            });
        }).subscribe(outputMessage -> {
            System.out.println(outputMessage);
        });

        es.connect();

Are you using publish just to handle groupBy subscribing to it multiple times, or do you have multiple legit subscribers to the same eventstream?


Following is the full test case I built (in normal Java so it fits into the Java 6 project and not Java 8 so it is far more verbose):

package rx.subjects;

import static org.junit.Assert.*;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.observables.ConnectableObservable;
import rx.observables.GroupedObservable;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;

public class TestPublishSubject {

    @Test
    public void testPublishCount() throws Exception {

        final AtomicInteger counter = new AtomicInteger();
        final CountDownLatch latch = new CountDownLatch(1);
        int count = 100;

        ConnectableObservable<Event> es = connectToEventStream(count).publish();
//        Observable<Event> es = connectToEventStream(count);

        es.groupBy(new Func1<Event, Integer>() {

            @Override
            public Integer call(Event e) {
                return e.source;
            }
        }).mapMany(new Func1<GroupedObservable<Integer, Event>, Observable<String>>() {

            @Override
            public Observable<String> call(GroupedObservable<Integer, Event> eventGroupedObservable) {
                System.out.println("GroupedObservable Key: " + eventGroupedObservable.getKey());

                return eventGroupedObservable.map(new Func1<Event, String>() {

                    @Override
                    public String call(Event event) {
                        return "Source: " + event.source + "  Message: " + event.message;
                    }
                });

            };
        }).subscribe(new Observer<String>() {

            @Override
            public void onCompleted() {
                latch.countDown();
            }

            @Override
            public void onError(Exception e) {
                e.printStackTrace();
                latch.countDown();
            }

            @Override
            public void onNext(String outputMessage) {
                System.out.println(outputMessage);
                counter.incrementAndGet();
            }
        });

        es.connect();

        latch.await(5000, TimeUnit.MILLISECONDS);
        assertEquals(count, counter.get());

    }

    public static Observable<Event> connectToEventStream(final int count) {
        return Observable.create(new Func1<Observer<Event>, Subscription>() {

            @Override
            public Subscription call(final Observer<Event> observer) {
                System.out.println("*** Subscribing to EventStream ***");
                new Thread(new Runnable() {

                    @Override
                    public void run() {
                        for (int i = 0; i < count; i++) {
                            Event e = new Event();
                            e.source = i % 2;
                            e.message = "Event-" + i;
                            observer.onNext(e);
                            try {
                                Thread.sleep(50);
                            } catch (Exception ex) {
                                // ignore
                            }
                        }
                    }

                }).start();
                return Subscriptions.empty();
            }

        });
    };

    public static class Event {
        int source;
        String message;

        @Override
        public String toString() {
            return "Event => source: " + source + " message: " + message;
        }
    }

}

@Treora
Contributor Author

Treora commented May 31, 2013

Thanks for the elaborate reply. From your answer I draw the conclusion that both PublishSubject and groupBy contain an error.

PublishSubject
You say that on a call to onNext, PublishSubject should take its current list of subscribed observers and iterate over those. However your following assumption looks incorrect:

each time values() is invoked (on each onNext call) it gives the current state of the data (all subscribers) in the map in a thread-safe manner

ConcurrentHashMap.values however does not return a snapshot of the map's values:

The collection is backed by the map, so changes to the map are reflected in the collection, and vice-versa

And as quoted before, the iterator does not make guarantees about including or excluding items that are added to the map after creation of the iterator (from the same doc):

Similarly, Iterators and Enumerations return elements reflecting the state of the hash table at some point at or since the creation of the iterator/enumeration.

Those two properties combined allow PublishSubject.onNext to sometimes include observers that are added while iterating. And, at least in the implementation I am using (OpenJDK8), it does this unpredictably, presumably based on hash values.
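The difference between the live view returned by `values()` and a snapshot can be shown in a few lines. This is a minimal sketch with hypothetical names (`ValuesViewDemo`, `sizesAfterLateAdd`) and strings standing in for observers; it is not taken from PublishSubject.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; not part of RxJava.
class ValuesViewDemo {

    // Returns {size of the live view, size of the snapshot} after an entry
    // has been added to the map following both calls.
    static int[] sizesAfterLateAdd() {
        ConcurrentHashMap<Integer, String> observers =
                new ConcurrentHashMap<Integer, String>();
        observers.put(1, "A");

        Collection<String> liveView = observers.values();        // backed by the map
        List<String> snapshot = new ArrayList<String>(liveView); // independent copy

        observers.put(2, "B"); // a "late" subscriber

        return new int[] { liveView.size(), snapshot.size() };
    }

    public static void main(String[] args) {
        int[] sizes = sizesAfterLateAdd();
        System.out.println("live view: " + sizes[0] + ", snapshot: " + sizes[1]);
        // prints "live view: 2, snapshot: 1"
    }
}
```

Only the copied list is unaffected by the later `put`; the collection returned by `values()` reflects it.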

I guess this bug should be fixed either by always including the newly added observers, or by always excluding them. You say they should be excluded. I think this does sound like the most intuitive behaviour, but maybe it would be more practical to opt for including them, as for example it would directly fix the problem with groupBy.
I do not know which behaviour would be desired in other cases, that means any case where one (possibly indirectly) subscribes to a PublishSubject as a reaction to an (indirect) onNext from that same PublishSubject (via nested subscribes or mapMany or whatever).
A reason to include them could be that you can always use skip(1) to exclude them. Actually the other way around can also be realised, by calling onNext on the new observer when subscribing it to the PublishSubject, though this looks a bit more hacky as you're calling onNext from a different place (and maybe even a different thread?).

groupBy
If you choose to exclude newly added observers, then there indeed also is a bug in groupBy. I do not know how to solve this if you would want groupBy to keep re-subscribing to the underlying source for each new group. A possibility would be that groupBy dispatches the incoming items to the appropriate self-produced observables, which would then behave like (or be) PublishSubjects.
As an example, what should this (rather contrived) code do?

randomIntSource().groupBy(x -> x%2).mapMany(subsource -> {
    Counter counter = new Counter();
    return subsource.map(x -> counter);
}).subscribe(counter -> counter.increment());

It would currently create a new random source for the even numbers and another for the odd ones, though my first expectation would be that it lets you count the number of evens and odds in a single random source.
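The dispatching idea can be sketched without RxJava as follows. `GroupRouter` and `countEvensAndOdds` are hypothetical names, and a plain `Consumer` stands in for the per-group observable; this is an illustration of the approach under those assumptions, not the groupBy implementation. The source is consumed exactly once, each new key announces its group first, and the item that created the group is then delivered to it.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch; GroupRouter is not an RxJava class.
class GroupRouter<T, K> {
    private final Function<T, K> keySelector;
    // Called once per new key; returns the handler for that group's items.
    private final Function<K, Consumer<T>> onNewGroup;
    private final Map<K, Consumer<T>> groups = new LinkedHashMap<K, Consumer<T>>();

    GroupRouter(Function<T, K> keySelector, Function<K, Consumer<T>> onNewGroup) {
        this.keySelector = keySelector;
        this.onNewGroup = onNewGroup;
    }

    // Single entry point: the one subscription to the source calls this.
    void onNext(T item) {
        K key = keySelector.apply(item);
        Consumer<T> group = groups.get(key);
        if (group == null) {
            group = onNewGroup.apply(key); // announce the group first
            groups.put(key, group);
        }
        group.accept(item); // then deliver the item, including the first one
    }

    // Counts evens and odds while reading the source only once.
    static Map<Integer, Integer> countEvensAndOdds(Iterable<Integer> source) {
        Map<Integer, Integer> counts = new LinkedHashMap<Integer, Integer>();
        GroupRouter<Integer, Integer> router = new GroupRouter<Integer, Integer>(
                x -> x % 2,
                key -> item -> counts.merge(key, 1, Integer::sum));
        for (Integer x : source) {
            router.onNext(x);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countEvensAndOdds(Arrays.asList(3, 4, 7, 10, 11)));
        // prints {1=3, 0=2}
    }
}
```

With this structure the contrived counter example counts the evens and odds of one source, because no group ever triggers a second subscription.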


About my example code in the first post, I indeed forgot the connect() call.

Are you using publish just to handle groupBy subscribing to it multiple times, or do you have multiple legit subscribers to the same eventstream?

That was just to keep the example short. In real code I usually have one observable that reads incoming events and then I publish() it to allow several observers to read the events, some of which may want to use groupBy.

Thanks for the tip to use mapMany, I had not used it before. When more group-dependent variables are used in the observer I guess mapMany becomes less convenient though, and nesting subscribes becomes the more legible way.

@benjchristensen
Member

Why do you consider the following to be a bug, and how does this have anything to do with the groupBy issue?

Those two properties combined allow PublishSubject.onNext to sometimes include observers that are added while iterating.

This is done in a thread-safe manner as that is the point of ConcurrentHashMap and seems perfectly reasonable and correct for concurrent pub/sub behavior.

The nature of PublishSubject is that it will receive subscriptions concurrently with onNext invocations. Thus, as expected with concurrent pub/sub it is always a race as to whether a given observer will be part of a given onNext event. If the pub/sub always occurs between onNext boundaries then there will be no race.

If the pub/sub is not being concurrently modified then none of this applies.

Here is a flow showing concurrent subscriptions:

[Image: publish-subject]

@benjchristensen
Member

nesting subscribes becomes the more legible way

I would suggest you re-consider as you lose the subscription and it can result in memory leaks, observable sequences never closing and other such behavior.

You can read others saying the same thing here: http://www.introtorx.com/content/v1.0.10621.0/07_Aggregation.html

The code above would work, but it is not good practice to have these nested subscribe calls. We have lost control of the nested subscription, and it is hard to read. When you find yourself creating nested subscriptions, you should consider how to apply a better pattern.

@Treora
Contributor Author

Treora commented May 31, 2013

Why do you consider the following to be a bug, and how does this have anything to do with the groupBy issue?

When one subscribes to the pubsub from a different thread it is indeed totally reasonable that one may or may not receive the item that is currently being handled. What I am talking about is the case where one subscribes to the pubsub from the same thread on which the pubsub onNext is being handled. This happens when, in the image, B.onNext(8) subscribes a new observer D to the pubsub. The question is then whether this D will or will not receive the value 8. As this is all happening in a single thread, there is no need for this action to be unpredictable. The subscription of D is undoubtedly happening after pubsub.onNext was called, because it was caused by it. That makes it weird that D only might receive 8, because there is no race condition. It either should or should not include D in the set of observers that receive 8, depending on which behaviour you want pubsub to have (as discussed in my last post).
To extend the image:
[Image: publishsubjectissue]

The relation to groupBy:
What groupBy does is emit observables that are descendants of its own source. When using groupBy on a pubsub, and one subscribes to these emitted observables (using nested subscribes or mapMany or whatever), the situation described above occurs (perhaps with some operators between B.onNext and pubsub.subscribe(D)). This results in not knowing whether or not the first item of the group will be received.

I hope my explanation is clear enough, just let me know if it is not or if you find some flaw in my thoughts.

@benjchristensen
Member

That makes it weird that D only might receive 8 because there is no race condition.

Can you give me a test case when an onNext action re-subscribes to the same PublishSubject on the same thread and it behaves non-deterministically?

the situation described above occurs (perhaps with some operators between B.onNext and pubsub.subscribe(D)).

That is the bug with groupBy, it should not re-subscribe.

@benjchristensen
Member

Please review these changes: #283

This unit test in particular was derived from this discussion: https://github.com/Netflix/RxJava/pull/283/files#L0R365

@Treora
Contributor Author

Treora commented Jun 1, 2013

That commit looks like a good step on first impression, I will scrutinize it tomorrow or Monday.
I just read the Rx.Net implementation of groupBy which works in a similar way as your newly committed one, but I also still have to figure out if there are some bugs in there or if I just don't fully understand it yet.

I will also write an example for PublishSubject.

@benjchristensen
Member

I replaced pull #283 with #284

@Treora
Contributor Author

Treora commented Jun 3, 2013

Hi,
I did not have time to scrutinize your code today, I will do so tomorrow.

Here is an example that shows the unpredictable results of PublishSubject. Implemented once using nested subjects and once using mapMany. It does use lambda expressions, I hope that's not a problem for you.

@benjchristensen
Member

Thanks for those code examples. I'm able to replicate the issue and working on a fix.

@benjchristensen
Member

I updated PublishSubject to take snapshots of the subscribed observers before iterating so that it behaves deterministically and only emits value to observers already subscribed once onNext starts.

This makes the nested subscription behavior deterministic. What it was doing before was allowing new subscriptions from the nested subscribes to get into the iteration ... and that is based on the internal data structure and likely as you suggest the hashing of the object id (which is the only variant that changes between each run).
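A simplified sketch of the snapshot approach follows. `SnapshotSubject` and its `Listener` interface are hypothetical stand-ins, not the PublishSubject code from the pull request, and error and completion handling are left out. The observer collection is copied before iterating, so an observer subscribed from inside onNext only sees later values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for PublishSubject; not the RxJava code.
class SnapshotSubject<T> {

    interface Listener<T> {
        void onNext(T value);
    }

    private final ConcurrentHashMap<Integer, Listener<T>> observers =
            new ConcurrentHashMap<Integer, Listener<T>>();
    private final AtomicInteger nextId = new AtomicInteger();

    void subscribe(Listener<T> listener) {
        observers.put(nextId.getAndIncrement(), listener);
    }

    void onNext(T value) {
        // Copy first: observers added during the loop are not in this list.
        List<Listener<T>> snapshot = new ArrayList<Listener<T>>(observers.values());
        for (Listener<T> listener : snapshot) {
            listener.onNext(value);
        }
    }

    // Subscribes a late observer from inside onNext; returns what it received.
    static List<Integer> demo() {
        final SnapshotSubject<Integer> subject = new SnapshotSubject<Integer>();
        final List<Integer> lateReceived = new ArrayList<Integer>();
        subject.subscribe(new Listener<Integer>() {
            private boolean subscribedLate = false;

            @Override
            public void onNext(Integer value) {
                if (!subscribedLate) {
                    subscribedLate = true;
                    // Nested subscription on the same thread, during onNext(8)
                    subject.subscribe(new Listener<Integer>() {
                        @Override
                        public void onNext(Integer v) {
                            lateReceived.add(v);
                        }
                    });
                }
            }
        });
        subject.onNext(8);
        subject.onNext(9);
        return lateReceived;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [9]
    }
}
```

In terms of the earlier diagram, the late observer plays the role of D: it never receives 8 and always receives 9.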

The unit test for this is at https://github.com/Netflix/RxJava/pull/288/files#L0R479

The specific fix for onNext is this line: https://github.com/Netflix/RxJava/pull/288/files#L0R162

@Treora
Contributor Author

Treora commented Jun 5, 2013

Please review these changes: #283

I first looked into how the Rx.Net implementation of groupBy works (which took longer than I wanted because of some bugs in there, more on this below), and then read your committed code (#284) and tested it. I have four comments on your code:

  1. There was a small error in using the numGroupSubscriptions counter causing it to drop the parent subscription too eagerly. I added a unit test for it, and then fixed it in commit 22cd318. Maybe it still needs some locking though to prevent race conditions.
  2. Is there a reason to allow only one observer to listen to a group? I guess the alternative would be to use a PublishSubject for each GroupedObservable. Rx.Net also works this way, it wraps a Subject (the equivalent of PublishSubject) into a GroupedObservable object. No need to worry about the problem of PublishSubject omitting the current item, as it will not occur here because observer.onNext(gs) is finished before gs.onNext(value.value) is called. I did not make this change.
  3. The same question can be asked for GroupBy itself: it also does not allow subscribing more than once. I assume this is an error, and fixed this (commit fc89a79) by putting some fields and methods into the spawned Observer, instead of in the GroupBy class. I named this formerly anonymous class GroupByObserver. The GroupedObservables now regard this GroupByObserver instead of the GroupBy object as their parent. Perhaps it would be nice to create a RefCountSubscription class like Rx.Net has, to group the reference counting logic in there instead of scattering it around GroupBy and GroupByObserver.
  4. Should it maybe be disallowed to subscribe to a group after the primary observer (the one receiving the GroupedObservables) has been unsubscribed? See the thoughts on this further below.

Rx.Net
Actually it looks like Rx.Net suffers some problems with the subscriptions itself, so I spent a while trying to figure out how it should work. Could someone with a .NET development environment and more C# skills than I have perhaps test whether I really found some bugs here or if I am talking nonsense?
@headinthebox: Erik, what do you think about the groupBy issues described below?

  1. GroupBy creates a Sink (an Observer) on subscribe, which spawns GroupedObservables which keep references to the _refCountDisposable field in GroupBy, while instead this field should be in the Sink that spawned them. If I understand correctly, when subscribing two observers to one groupBy operator, this will lead to being unable to unsubscribe the first one. This problem is similar to my third comment above.
  2. Apart from that, it looks like a subscription to a GroupBy Observable will keep emitting new GroupedObservables even when it has been unsubscribed from. The used RefCountDisposable is made to unsubscribe from the source only when the primary observable and all emitted observables have been unsubscribed from, which is fine but then the created Sink should check if it has been disposed of before it emits a new GroupedObservable.
    In essence, it needs a line like this:
    if (!_refCountDisposable.isPrimaryDisposed())
    before doing this:
    _observer.OnNext(group); (GroupBy.cs)
  3. A third quirk in Rx.Net is that whether one can still subscribe to a GroupedObservable can depend on if any other GroupedObservables are still having any subscribers. The RefCountDisposable disposes of its core disposable when both the primary observer and all emitted observables have unsubscribed. If one subscribes to a GroupedObservable hereafter, no items will be received anymore. However if some other subscription to some GroupedObservable of another group is still keeping the RefCountDisposable alive, the new subscription will receive items. This could lead to unexpected behaviour.

Behaviour of groups after unsubscribing the primary observer
A question to be answered here for both Rx.Net and RxJava is how the groupBy should behave after unsubscribing the primary observer. Should we still be able to subscribe to the GroupedObservables that were emitted earlier? If not, the mentioned third quirk in Rx.Net could be fixed by allowing to subscribe to a GroupedObservable only when the RefCountDisposable's _isPrimaryDisposed is false (again we need an isPrimaryDisposed() method). A similar thing can be done in RxJava by checking unsubscribeRequested when subscribing to a GroupedObservable.
If you do want to allow storing the GroupedObservables for later use (after the GroupByObservable has been disconnected from), the approach of groupBy forms a problem because then we can never know if we can unsubscribe, so it may be better to opt for the first type of behaviour. The most normal usage of groupBy is to subscribe directly anyway so the issue is probably not noticed often, but still it would be best if it does not produce weird results when it is used differently.
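The first type of behaviour could be sketched roughly as below. `RefCountSubscription` and all of its method names are hypothetical, loosely following the description of RefCountDisposable given above; this is neither Rx.Net nor RxJava code. The source is released only when the primary observer and all group subscriptions are gone, and new group subscriptions are refused once the primary observer has unsubscribed.

```java
// Hypothetical sketch; neither an Rx.Net nor an RxJava class.
class RefCountSubscription {
    private final Runnable unsubscribeFromSource;
    private boolean primaryDisposed = false;
    private boolean sourceDisposed = false;
    private int groupSubscriptions = 0;

    RefCountSubscription(Runnable unsubscribeFromSource) {
        this.unsubscribeFromSource = unsubscribeFromSource;
    }

    // Returns false, and registers nothing, once the primary observer is gone.
    synchronized boolean tryAddGroupSubscription() {
        if (primaryDisposed) {
            return false;
        }
        groupSubscriptions++;
        return true;
    }

    synchronized void releaseGroupSubscription() {
        if (groupSubscriptions > 0) {
            groupSubscriptions--;
        }
        disposeSourceIfUnused();
    }

    synchronized void disposePrimary() {
        primaryDisposed = true;
        disposeSourceIfUnused();
    }

    synchronized boolean isPrimaryDisposed() {
        return primaryDisposed;
    }

    synchronized boolean isSourceDisposed() {
        return sourceDisposed;
    }

    // Releases the source once nobody is listening any more (at most once).
    private void disposeSourceIfUnused() {
        if (primaryDisposed && groupSubscriptions == 0 && !sourceDisposed) {
            sourceDisposed = true;
            unsubscribeFromSource.run();
        }
    }

    public static void main(String[] args) {
        RefCountSubscription s = new RefCountSubscription(
                () -> System.out.println("unsubscribed from source"));
        s.tryAddGroupSubscription();                     // one group is being observed
        s.disposePrimary();                              // source stays alive for that group
        System.out.println(s.tryAddGroupSubscription()); // prints false
        s.releaseGroupSubscription();                    // prints "unsubscribed from source"
    }
}
```

Whether a group subscribed before the primary was disposed can still be re-subscribed afterwards no longer depends on other groups here: every attempt made after the primary is disposed is refused.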

@benjchristensen
Member

@zsxwing This is another one, if you have the interest, that could use someone's attention and is rather important.

@zsxwing
Member

zsxwing commented Oct 9, 2013

@benjchristensen I'll try to handle it.

@benjchristensen
Member

Thank you @zsxwing

@benjchristensen
Member

Closing this and it will be addressed via #570

rickbw pushed a commit to rickbw/RxJava that referenced this issue Jan 9, 2014
- changed to take snapshot of observers.values() before iterating in onNext/onError/onCompleted so that nested subscriptions that add to observers can't change the values() iteration
- single-threaded nested subscriptions are now deterministic
- multi-threaded subscriptions will no longer be allowed to race to get into an iterating onNext/onError/onCompleted loop, they will always wait until the next
- also improved terminal state behavior when subscribing to a PublishSubject that has already received onError/onCompleted

ReactiveX#282
rickbw pushed a commit to rickbw/RxJava that referenced this issue Jan 9, 2014
ReactiveX#282

Refactored to maintain a single subscription that propagates events to the correct child GroupedObservables.
jarroddrury pushed a commit to jarroddrury/RxJava that referenced this issue Apr 26, 2024
ReactiveX/RxJava#282

Refactored to maintain a single subscription that propagates events to the correct child GroupedObservables.