GroupBy behaviour onCompleted #2320

Closed
Widar91 opened this issue Jan 3, 2015 · 9 comments

Widar91 commented Jan 3, 2015

Hi everyone, I was trying out the following code:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .groupBy(_ % 2 == 0)
      .subscribe(
        (v) => println("onNext " + v),
        (e) => println("onError " + e),
        () => println("onCompleted")
      )

As expected this outputs 2 tuples (k, obs), but the stream is never completed.

onNext (false,rx.lang.scala.JavaConversions$$anon$3@64bfbc86)
onNext (true,rx.lang.scala.JavaConversions$$anon$3@7e0b0338)

Is this behavior intended, in particular in the case in which the GroupedObservables created by the operator are ignored (i.e. not subscribed to)? If so, what is the reason?

Thank you!

@davidmoten
Collaborator

Looks like desirable behaviour to me. Could you give more details of a use case
that ignores the grouped observables?

@benjchristensen
Member

The GroupedObservables can not be ignored. See http://reactivex.io/RxJava/javadoc/rx/Observable.html#groupBy(rx.functions.Func1)

Note: A GroupedObservable will cache the items it is to emit until such time as it is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those GroupedObservables that do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator like Observable.take(int)(0) to them.

If you want to ignore a GroupedObservable, you need to unsubscribe from it, for example with take(0).
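A minimal sketch of the take(0) approach, assuming the RxScala bindings (`rx.lang.scala`) from the original snippet running on RxJava 1.x. The object name `GroupByTakeZero` is hypothetical and used only for illustration. Every group is subscribed to through flatMap, and the unwanted (odd) group is cut off with take(0):

```scala
import rx.lang.scala.Observable

// Hypothetical object name, used only for this illustration.
object GroupByTakeZero extends App {
  Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    .groupBy(_ % 2 == 0)
    .flatMap { case (isEven, group) =>
      // flatMap subscribes to every group it is handed. The group we do not
      // care about is terminated with take(0) so it can discard its buffer.
      if (isEven) group else group.take(0)
    }
    .subscribe(
      v => println("onNext " + v),
      e => println("onError " + e),
      () => println("onCompleted")
    )
}
```

Under these assumptions, the expectation is that only the even values reach onNext and that onCompleted is delivered, because no group is left unsubscribed.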

Or just filter out all elements for that group before it goes through the groupBy operator.
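A sketch of the filtering alternative, again assuming RxScala on RxJava 1.x (the object name `FilterBeforeGroupBy` is hypothetical). The unwanted values are dropped upstream, so groupBy never creates a group for them; the remaining group still has to be subscribed to, which flatMap does here:

```scala
import rx.lang.scala.Observable

// Hypothetical object name, used only for this illustration.
object FilterBeforeGroupBy extends App {
  Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    .filter(_ % 2 == 0)    // odd values never reach groupBy
    .groupBy(_ % 2 == 0)   // so only the group keyed `true` is created
    .flatMap { case (_, group) => group } // subscribe to the one remaining group
    .subscribe(
      v => println("onNext " + v),
      e => println("onError " + e),
      () => println("onCompleted")
    )
}
```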

If this weren't the case, a stream could emit onCompleted before all of the child groups had completed asynchronously on other threads. This is why each GroupedObservable must be dealt with: onCompleted cannot be propagated downstream until every child GroupedObservable has either terminated (with onCompleted or onError) or been unsubscribed.

@benjchristensen
Member

You can also read more about the "time gap" issue that the RxJava groupBy handles: #844

Widar91 commented Jan 5, 2015

You can also read more about the "time gap" issue that the RxJava groupBy handles: #844

This is exactly what I wanted to understand, thank you for your help!

Widar91 closed this as completed Jan 5, 2015

@benjchristensen
Member

Great, glad that helped. The groupBy implementation definitely has tradeoffs and was not trivial. It took us a few tries!

mikea commented Feb 28, 2015

While I understand the tradeoff, I disagree with it. I prefer groupBy() emitting observables that can be ignored and/or processed independently. Here's a simple function that I wrote: https://gist.github.com/mikea/6493bdccad356297d471

I'm sure it is full of holes from the Rx devs' perspective, but it works for me.

@djensen47

observables that can be ignored and/or processed independently

I like this idea. Any chance of either a new operator or a version of groupBy that can handle this?

@mikea Is your groupBy operator available in a library anywhere?

@davidmoten
Collaborator

@mikea glad to hear it works for you, but there are holes in what you've got there. Without looking at your logic in any detail: for starters, never hold a lock when emitting (synchronized methods/blocks). Backpressure support usually requires some complexity as well and doesn't appear to be present. Use .unsafeSubscribe() within a Transformer/Operator, and unsubscription doesn't look like it will work properly. I'd suggest thinking a bit outside the box to get what you want with existing operators. Without any detailed consideration of this use case (very limited time till my bubba wakes up), remember you can do things like materialize() to play with terminal events if needed.
