Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect Operator #701

Merged
merged 1 commit into from
Dec 27, 2013
Merged

Conversation

benjchristensen
Copy link
Member

Added with @headinthebox while reviewing Java 8 Streams #678

benjchristensen added a commit that referenced this pull request Dec 27, 2013
@benjchristensen benjchristensen merged commit 4d57677 into ReactiveX:master Dec 27, 2013
@benjchristensen benjchristensen deleted the collect branch December 27, 2013 21:58
@cloudbees-pull-request-builder

RxJava-pull-requests #618 SUCCESS
This pull request looks good

@akarnokd
Copy link
Member

Not exactly how Stream works:

    <R> R collect(Supplier<R> supplier,
                  BiConsumer<R, ? super T> accumulator,
                  BiConsumer<R, R> combiner);

and

<R, A> R collect(Collector<? super T, A, R> collector);

where

public interface Collector<T, A, R> {
    Supplier<A> supplier();
    BiConsumer<A, T> accumulator();
    BinaryOperator<A> combiner();
    Function<A, R> finisher();
}

The main difference is that, the Steam and my version "exits" the stream/observable.

The second is that it doesn't let the initial state be per subscriber. So if I want to collect into an ArrayList, multiple subscribers will collect into the same ArrayList.

@benjchristensen
Copy link
Member Author

The second is that it doesn't let the initial state be per subscriber. So if I want to collect into an ArrayList, multiple subscribers will collect into the same ArrayList.

The reduce and scan operator would have the same problem then, correct?

@benjchristensen
Copy link
Member Author

Not exactly how Stream works

Understood. It's not meant to. Stream is seeking to achieve something that works in both sequential and parallel execution, Observable only ever emits items sequentially.

The parallel operator is the only thing in RxJava that currently "enables" concurrent execution.

The valid discussion would be whether we want Observable.toParallel() that returns a ParallelObservable with different overloads of certain operators. Thus far as @headinthebox and I have discussed it there has not been a valid reason for this complication.

@benjchristensen
Copy link
Member Author

Wouldn't this cause dependency problems?

What dependency problems?

@akarnokd
Copy link
Member

The reduce and scan operator would have the same problem then, correct?

That's true, but these are used to transfer values which are "stateless" such as Integer and Double. Of course, one could just create a new ArrayList in reduce on each function call and be that the new state, but this collect doesn't allow to do that. So basically its a trivial wrapping around a Func2 and reduce and as such, might not worth adding IMO.

Understood. It's not meant to. Stream is seeking to achieve something that works in both sequential and parallel execution, Observable only ever emits items sequentially.

I wasn't referring to the parallel capability of the stream but to two things:

  • You don't end up with a Stream after the collection but a direct value of R.
  • You can specify a per-consumer state object via function callback. Now Stream is single consumer only, but Observable is not.

What dependency problems?

Dependency problem: a reference to jUnit in Observable?

import static org.junit.Assert.*

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants