Remove Parallel Operator #1673

Closed
benjchristensen opened this Issue Sep 6, 2014 · 10 comments

Member

benjchristensen commented Sep 6, 2014

Almost every time I see the 'parallel' operator used, it is being used incorrectly and misunderstood. This leads me to believe we should remove it and instead educate users on how to use merge and flatMap correctly, which is generally what uses of parallel are trying to achieve.

Anyone have good reason not to eliminate it?

/cc @headinthebox

@benjchristensen benjchristensen added this to the 1.0 milestone Sep 6, 2014

Contributor

jbripley commented Sep 6, 2014

I'm in favor of removing it. As you say, it's mostly used incorrectly, and someone who wants to control parallelism can always use the ExecutorScheduler.

Contributor

davidmoten commented Sep 7, 2014

Thanks for raising this one, it's been on my list to mention.

A while back I used the parallel method for an HPC task and found it confusing. I expected to find an equivalent of the Scala collections par method, but there was none. For clarity's sake I later switched the code to use flatMap+subscribeOn to get the desired behaviour. I think the existing method tries to do too much (why have it apply the sharding function when that could be done beforehand?).

Could we simplify Observable.parallel so it just does:

Observable<T> parallel() {
    return flatMap(o -> o.subscribeOn(Schedulers.computation()));
}

and an overload to specify the scheduler:

Observable<T> parallel(Scheduler scheduler) {
    return flatMap(o -> o.subscribeOn(scheduler));
}

Member

benjchristensen commented Sep 8, 2014

@davidmoten Those signatures don't take a function describing the work to do in parallel, so they aren't correct.

I suggest we remove them and let a new RxJavaParallel project explore alternatives, merging it into RxJava itself only once it is proven and mature. This would include something like a ParallelObservable that allows map, flatMap, take, filter and other basic operators to execute in parallel without the normal serialized emission restriction.

Contributor

davidmoten commented Sep 8, 2014

Oh yeah, whoops, the merge means the parallelism is lost. So I wonder what the RxJava analogue of the Scala collections par method is?

Contributor

davidmoten commented Sep 8, 2014

I suppose it's likely to be

Observable<Observable<T>> parallel() {
    return map( o -> just(o).subscribeOn(Schedulers.computation()));
}

I wonder if that's of use?

Contributor

davidmoten commented Sep 8, 2014

I just had a look at Scala's par and it looks like it returns a special Parallel instance of the collection in question. So the analogy would be

ParallelObservable<T> parallel(Scheduler scheduler) {
    return new ParallelObservable<T>(this, scheduler);
}

where ParallelObservable<T> is really Observable<Observable<T>> under the covers but you can interact with it as though it is Observable<T>. I imagine that ParallelObservable.subscribe would perform the flatten before normal subscription.

Blimey I've got reading problems, Ben suggested exactly this. Ta Ben.

@benjchristensen benjchristensen changed the title from Parallel operator? to Remove Parallel Operator Sep 23, 2014

Member

benjchristensen commented Sep 23, 2014

Work on ParallelObservable will be done in https://github.com/ReactiveX/RxJavaParallel

benjchristensen added a commit to benjchristensen/RxJava that referenced this issue Oct 2, 2014

Member

benjchristensen commented Oct 2, 2014

This has been completed and will be released in 0.20.5 and 1.0 RC4.

If you are looking for a replacement for the simple "run things in parallel" use case, you likely want this:

streamOfItems.flatMap(item -> {
    // a block lambda must return the inner Observable for flatMap to merge it
    return doStuffWithItem(item).subscribeOn(Schedulers.io());
});

doStuffWithItem needs to return an Observable, obviously, but the point is to kick off the work for each item inside flatMap, either using subscribeOn to make it async or calling a function that already makes the calls async.
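
For readers who want to see the "one asynchronous unit of work per item, then merge" shape without an RxJava dependency, here is a minimal sketch using only the JDK. It is an analogue, not the RxJava API: `CompletableFuture` stands in for the inner Observable, a fixed thread pool stands in for the Scheduler, and the class name `ParallelItems`, the squaring body of `doStuffWithItem`, and the pool size of 4 are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK analogue (NOT RxJava): one async task per item, results gathered afterwards.
final class ParallelItems {

    // Hypothetical stand-in for doStuffWithItem: returns an async result
    // (a CompletableFuture here, where the RxJava version returns an Observable).
    static CompletableFuture<Integer> doStuffWithItem(int item, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> item * item, pool);
    }

    static List<Integer> processAll(List<Integer> items) {
        // The pool plays the role of the Scheduler; its size is an arbitrary choice.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Start the work for every item before waiting on any of it,
            // so the tasks are free to run concurrently.
            List<CompletableFuture<Integer>> pending = new ArrayList<>();
            for (int item : items) {
                pending.add(doStuffWithItem(item, pool));
            }
            // Gather the results; join() blocks until that task has finished.
            List<Integer> results = new ArrayList<>();
            for (CompletableFuture<Integer> future : pending) {
                results.add(future.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(processAll(Arrays.asList(1, 2, 3, 4)));
    }
}
```

One deliberate difference: this sketch collects results in input order because it joins the futures in the order they were submitted, whereas flatMap merges inner results as they arrive and does not guarantee ordering.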

rishi-anand commented Sep 15, 2017

Hi @benjchristensen, @davidmoten, @jbripley, I agree with using flatMap(). I have one question. flatMap will do the operation and merge the results back on the main thread, where onNext() will be called. What if one Observable ends with Observable.error()? Will the error be delivered on the main thread? If yes, the main thread will shut down, so what happens to the other Observables running in parallel? If no, will the results from the other Observables still be delivered to onNext?

Correct me if I am wrong, as I have just started with RxJava.

Member

akarnokd commented Sep 15, 2017

@rishi-anand please post your question on StackOverflow with details on what you have already implemented.
