2.0 Design: Naming #2787

Closed
akarnokd opened this Issue Feb 28, 2015 · 25 comments

@akarnokd
Member

akarnokd commented Feb 28, 2015

Since we are aiming at Java 9 and j.u.c.Flow as the base API, we have the opportunity to use Java-native functional interfaces instead of our Func and Action interfaces, plus rename other components.

I propose the following changes:

  • Observable -> Flowable
    • Plus: no more name conflict with j.u.Observable or RxJava 1.0's Observable
  • XXXSubject -> XXXProcessor
  • XXXSubscription -> XXXDisposable
  • Func0 -> j.u.f.Supplier
  • Func1 -> j.u.f.Function
  • Func1<T, Boolean> -> j.u.f.Predicate
  • Func2 -> j.u.f.BiFunction
  • Func3..FuncN -> rx2.functions.TriFunction or rx2.functions.Function3 etc.
  • Action0 -> Runnable
  • Action1 -> j.u.f.Consumer
  • Action2 -> j.u.f.BiConsumer
  • Action3..ActionN -> rx2.functions.TriConsumer or rx2.functions.Consumer3 etc.
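
As a rough illustration of the mapping listed above, the following sketch uses only the JDK types from java.util.function plus Runnable. The class name JdkFunctionalShapes and the lambdas are made up for this example and are not part of any RxJava API.

```java
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical demo class: each local variable shows the JDK shape that
// would stand in for the RxJava 1.x type named in the trailing comment.
class JdkFunctionalShapes {
    static String demo() {
        StringBuilder log = new StringBuilder();

        Supplier<Integer> func0 = () -> 1;                          // Func0<Integer>
        Function<Integer, Integer> func1 = x -> x + 1;              // Func1<Integer, Integer>
        Predicate<Integer> isEven = x -> x % 2 == 0;                // Func1<Integer, Boolean>
        BiFunction<Integer, Integer, Integer> func2 = Integer::sum; // Func2
        Runnable action0 = () -> log.append("run;");                // Action0
        Consumer<Integer> action1 = x -> log.append(x).append(';'); // Action1
        BiConsumer<Integer, Integer> action2 =
                (a, b) -> log.append(a + b).append(';');            // Action2

        action0.run();
        action1.accept(func1.apply(func0.get()));                   // appends 2
        action2.accept(func2.apply(1, 2), isEven.test(4) ? 1 : 0);  // appends 3 + 1
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that each JDK type names its single method differently (get, apply, test, run, accept), whereas Func* and Action* all use call.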

@akarnokd akarnokd added the Discussion label Feb 28, 2015

@akarnokd akarnokd added this to the 2.0 milestone Mar 2, 2015

@benjchristensen

Member

benjchristensen commented Mar 5, 2015

I personally am not a fan of Flow or Flowable. The Observable type is part of the Reactive Extensions brand and domain knowledge, so I don't think it's good to move away from that name.

Similar with Subject. I think that can implement Processor but doesn't need to stop being a Subject.

RxJava/1 Subscription does indeed need to become Disposable.

The various Java 8 functions are a tricky one. I don't like how confusing they are. We named ours in a way that they sort correctly in docs, they are more descriptive, and we also support arities beyond 3.

I think we should first explore whether the Func* and Action* interfaces can just be refactored to implement those interfaces (with apply instead of call) for example. I'm not convinced of the value of using the java.util.function interfaces, particularly when we're targeting Java 8 where lambdas mean it should be rare to concretely implement one of them, it is just the shape of the lambda. We could move Action to Consumer but we retain Consumer0, Consumer1, Consumer2 etc. Same with Func.

  • How would we deal with Func4-Func9 and Action4-Action9?
  • What is the value of adopting the Java 8 types directly instead of just extending them?
  • API design and discoverability is important. I find the Java 8 names to be highly non-discoverable.
@akarnokd

Member

akarnokd commented Mar 5, 2015

Similar with Subject. I think that can implement Processor but doesn't need to stop being a Subject
I had to introduce ProcessorEx just to add our extra methods, so I guess Subject stays.

Java 8 functional interfaces indicate the purpose, not just the shape, but that is a matter of taste. If lambdas are used, it doesn't really matter, except when the user already has some Runnable r = () -> { ... } instance and still has to convert it to Action0. The JIT will probably inline the call properly, but if the call site ends up being megamorphic, we'll still get an extra wrapper object. One option is to define Action0 on top of Runnable this way:

interface Action0 extends Runnable { default void run() { call(); } void call(); }

This helps if one wants to send an Action0 to an executor, but not the other way around.
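
A minimal sketch of that asymmetry, reusing the Action0 definition from above. The class name Action0Interop and the counter are assumptions made for this example only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class built around the Action0 definition quoted above.
class Action0Interop {
    // Action0 is a Runnable whose run() delegates to call().
    interface Action0 extends Runnable {
        @Override
        default void run() { call(); }
        void call();
    }

    static int demo() {
        AtomicInteger calls = new AtomicInteger();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Action0 action = calls::incrementAndGet;
            executor.execute(action);          // Action0 -> Runnable: passed as-is

            Runnable plain = calls::incrementAndGet;
            Action0 adapted = plain::run;      // Runnable -> Action0: needs an adapter object
            executor.execute(adapted);

            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```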

Still, there could be name conflicts between Func0/Action0 and the higher arity functions when compared to JDK types: Supplier.get vs Function.apply and Runnable.run vs Consumer.accept. By itself, Callable already has a method name conflict with Supplier.

For discoverability, we may define Func0 on top of Supplier and so on, but the method naming will be inconsistent across arities.
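
To make the inconsistency concrete, here is a sketch in which Func0 and Func1 are layered on Supplier and Function. These two interface definitions are assumptions for illustration, not the actual RxJava types.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical demo class; Func0 and Func1 below are illustrative definitions.
class ArityNaming {
    interface Func0<R> extends Supplier<R> {
        R call();
        @Override
        default R get() { return call(); }
    }

    interface Func1<T, R> extends Function<T, R> {
        R call(T t);
        @Override
        default R apply(T t) { return call(t); }
    }

    static String demo() {
        Func0<String> f0 = () -> "a";
        Func1<String, String> f1 = s -> s + "b";

        // Through the Rx-style names, every arity uses call(...).
        String viaRx = f1.call(f0.call());

        // Through the JDK names, arity 0 uses get() while arity 1 uses apply(...).
        Supplier<String> s0 = f0;
        Function<String, String> s1 = f1;
        String viaJdk = s1.apply(s0.get());

        return viaRx + "," + viaJdk;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```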

One thing I'd like to emphasize is the use of Predicate instead of Func1<T, Boolean>. I've read somewhere that the latter boxing/unboxing is not always optimized away by HotSpot and we get a reference equality check everywhere instead of just a CPU flag test.
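
The difference in shape can be sketched as follows. This only shows where the boxed Boolean appears in the source. Whether HotSpot removes the boxing at run time is not something this example demonstrates. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical demo class comparing the two filter shapes.
class PredicateVsBoxed {
    // Func1<T, Boolean> shape: apply() returns a Boolean object that is unboxed for the branch.
    static <T> List<T> filterBoxed(List<T> in, Function<? super T, Boolean> f) {
        List<T> out = new ArrayList<>();
        for (T t : in) {
            if (f.apply(t)) {          // implicit Boolean.booleanValue()
                out.add(t);
            }
        }
        return out;
    }

    // Predicate shape: test() returns a primitive boolean, no box involved.
    static <T> List<T> filterPrimitive(List<T> in, Predicate<? super T> p) {
        List<T> out = new ArrayList<>();
        for (T t : in) {
            if (p.test(t)) {
                out.add(t);
            }
        }
        return out;
    }

    static String demo() {
        List<Integer> input = Arrays.asList(1, 2, 3, 4);
        List<Integer> boxed = filterBoxed(input, x -> x % 2 == 0);
        List<Integer> primitive = filterPrimitive(input, x -> x % 2 == 0);
        return boxed + " " + primitive;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```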

It is hard to decide: an independent set of FuncX, PredicateX and ActionX gives us more control, but RxJava 2.0 can be accused of having its own functional types instead of standard Java ones. Having such interfaces name their single method call would certainly help with stateful implementations (as it does in RxJava 1.0).

@benjchristensen

Member

benjchristensen commented Aug 21, 2015

I'm thinking about this right now ... wondering if Flowable is the right thing as opposed to Observable.

I still think this:

The Observable type is part of the Reactive Extensions brand and domain knowledge, so I don't think it's good to move away from that name.

However, what I'm wondering is whether we should have both Observable and Flowable. If we had both, Flowable would implement the Reactive Streams Publisher and Observable would not.

-> Observable -> no backpressure -> hot data sources
-> Flowable -> backpressure -> cold data sources, or hot ones converted to Flowable with flow control strategy

Is this something we should consider and pursue?

@akarnokd

Member

akarnokd commented Aug 21, 2015

The name conflict is a smaller issue than having to re-learn the entry class name and scrub all the historical knowledge. I don't believe many will use both versions in the same project, and those who switch have to revisit every use site anyway due to the package/API changes.

Let's stick to Observable

@benjchristensen

Member

benjchristensen commented Aug 21, 2015

What about different types to represent hot and cold?

@headinthebox

Contributor

headinthebox commented Aug 21, 2015

-> Observable -> no backpressure -> hot data sources
-> Flowable -> backpressure -> cold data sources, or hot ones converted to Flowable with flow control strategy

That is what I have been proposing for a while, and what I am pursuing with RxMobile, since it only needs to deal with semi realtime hot streams that have no intrinsic support for back-pressure.

Semantically, conceptually, and scenario-wise Observable and Flowable are very different. In the current RxJava the lines have gotten blurred. This is a perfect place where having two types makes the distinction crystal clear. Either you are dealing with one kind of stream and need to take care of certain constraints, or you are dealing with a whole other kind of stream and have different concerns.

It is not either Flowable or Observable, but Observable and also Flowable.

Like any collection, both types support the same kind of operators like map, flatMap, filter, ... That is where you re-use your knowledge.

KISS FTW

@benjchristensen

Member

benjchristensen commented Aug 27, 2015

Been thinking more about this. I conceptually like the idea of them being separate, and Observable not supporting backpressure, but I don't know how to solve the flatMap issue without an unbounded buffer, and I really don't think we should have an unbounded buffer unless a developer asks for it.

In Rx.Net merge/flatMap doesn't have an unbounded buffer – because it blocks emissions when contention occurs by synchronizing (at least as of a couple years ago).

In RxJava, where we're targeting environments with event loops like Netty, merge can't be implemented using synchronization. To be non-blocking merge must allow asynchronous enqueuing of items when contention occurs. This means either (a) unbounded buffers, or (b) bounded buffers and backpressure.
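
The two options can be sketched with plain JDK queues. This is not how RxJava's merge is implemented. It only shows that a non-blocking offer either always succeeds (unbounded) or must report rejection (bounded), and that rejection is what a backpressure signal has to prevent. The class name and parameters are made up for this example.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class: non-blocking enqueue into an unbounded vs a bounded queue.
class MergeBuffers {
    // Returns how many of `count` items each queue accepted without blocking:
    // index 0 is the unbounded queue, index 1 is the bounded one.
    static int[] demo(int count, int capacity) {
        Queue<Integer> unbounded = new ConcurrentLinkedQueue<>();
        Queue<Integer> bounded = new ArrayBlockingQueue<>(capacity);
        int acceptedUnbounded = 0;
        int acceptedBounded = 0;
        for (int i = 0; i < count; i++) {
            if (unbounded.offer(i)) {   // (a) always accepted, memory grows with the backlog
                acceptedUnbounded++;
            }
            if (bounded.offer(i)) {     // (b) returns false once the queue is full
                acceptedBounded++;
            }
        }
        return new int[] { acceptedUnbounded, acceptedBounded };
    }

    public static void main(String[] args) {
        int[] accepted = demo(1000, 128);
        System.out.println(accepted[0] + " " + accepted[1]);
    }
}
```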

@headinthebox I know that you argue that unbounded buffers are not always bad, and I agree that they aren't always bad. I think they're just fine when a developer opts in to it. I do however think they are bad when hidden, and don't feel comfortable reverting the decisions of 1.x to allow hidden unbounded buffers again.

So, "KISS FTW" sounds nice, but it doesn't address the real issue of not-simple-to-debug silent latency and memory bloat of unbounded buffers. I have experienced this with RxJava 0.x pre-backpressure in production systems.

So, if we do have an Observable without backpressure, how do we address merge/flatMap such that we don't have an unbounded buffer?

@headinthebox

Contributor

headinthebox commented Aug 27, 2015

Let me state upfront that I do not understand why Netty excludes using synchronization, but that is my total ignorance on how Netty works.

However, I think that you can (roughly) achieve the same effect as non-blocking merge by doing an observeOn with the trampoline scheduler afterwards (if you squint at the queue draining code you will see it looks very much like a trampoline).

If that is true, then the use of a buffer becomes explicit due to the observeOn, and it is obvious in your code that you have to be careful, and no worse than when you use onBackpressureBuffer.
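
For readers unfamiliar with the pattern, a queue-drain loop can be sketched roughly as below. This is a simplified illustration, not RxJava's actual operator code: QueueDrain and its fields are hypothetical names, and the queue is deliberately unbounded. An emission that arrives while a drain is already running (from another thread or re-entrantly) is only enqueued, and the active drainer delivers it later, which is the trampoline-like behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical sketch of a queue-drain (trampoline-like) emitter.
class QueueDrain<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>(); // unbounded buffer
    private final AtomicInteger wip = new AtomicInteger();       // items not yet delivered
    private final Consumer<? super T> downstream;

    QueueDrain(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    void onNext(T item) {
        queue.offer(item);
        if (wip.getAndIncrement() != 0) {
            return;                       // someone else is draining; they will deliver it
        }
        do {
            downstream.accept(queue.poll());
        } while (wip.decrementAndGet() != 0);
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        AtomicReference<QueueDrain<Integer>> self = new AtomicReference<>();
        QueueDrain<Integer> drain = new QueueDrain<Integer>(v -> {
            log.add("start " + v);
            if (v == 1) {
                self.get().onNext(2);     // re-entrant emission is queued, not nested
            }
            log.add("end " + v);
        });
        self.set(drain);
        drain.onNext(1);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```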

(BTW with async code there is no such thing as "simple-to-debug" ;-)

@benjchristensen

Member

benjchristensen commented Aug 27, 2015

I do not understand why Netty excludes using synchronization

It's not just Netty. Anything using event loops, including Schedulers.computation(), Aeron, CouchBase, most if not all memcached clients I'm aware of, Akka, and virtually anything else using NIO.

Synchronization of 2 event loops together would result in 1 thread being blocked. That is an absolute no for RxJava to cause.

observeOn

I focused on merge/flatMap, but observeOn and zip are the other 2 major operators that have unbounded buffers and are problems. merge could indeed be implemented like observeOn, but that again is an unbounded buffer without backpressure.

(BTW with async code there is no such thing as "simple-to-debug" ;-)

There are gradients of "simplicity", it's not black-or-white. I'd rather encapsulate as much complexity in the implementation as possible so operating it is simpler. I'd also rather require a little more thought at time of creating an Observable than when consuming it or during system operation.

I think the biggest mistake in RxJava v1 after adding backpressure was continuing to let the raw Observable.create be the primary way of creating an Observable. It gave all the power tools without guiding towards the common "hot" and "cold" usage scenarios.

Unless we don't offer flatMap/merge/zip/observeOn operators (which is unlikely), I am not comfortable having an Observable type with unbounded buffers hidden within operators. Thus, it leads me back to having a single type (either Observable or Flowable), and a createHot method (or something with a similar name) that requires a choice of what to do if the consumer is too slow. By default it is to fail, but other strategies can be drop, sample, throttle, buffer (bounded or unbounded), etc.
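
One way to picture the "choice of what to do if the consumer is too slow" is a small buffer that applies a strategy on overflow. Everything in this sketch is hypothetical (the Strategy enum, HotBuffer, and the capacity), it is single-threaded for brevity, and it is not the proposed createHot API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: overflow handling for a hot source feeding a slow consumer.
class OverflowStrategies {
    enum Strategy { FAIL, DROP, BUFFER_UNBOUNDED }

    static final class HotBuffer<T> {
        private final Deque<T> queue = new ArrayDeque<>();
        private final int capacity;
        private final Strategy strategy;

        HotBuffer(int capacity, Strategy strategy) {
            this.capacity = capacity;
            this.strategy = strategy;
        }

        // Called by the hot source; the consumer is assumed to drain at its own pace.
        void onNext(T item) {
            if (queue.size() < capacity || strategy == Strategy.BUFFER_UNBOUNDED) {
                queue.add(item);
            } else if (strategy == Strategy.FAIL) {
                throw new IllegalStateException("consumer too slow");
            }
            // Strategy.DROP: the item is silently discarded.
        }

        int size() {
            return queue.size();
        }
    }

    // Pushes 10 items into a buffer of capacity 4 that nobody drains.
    static int fill(Strategy strategy) {
        HotBuffer<Integer> buffer = new HotBuffer<>(4, strategy);
        for (int i = 0; i < 10; i++) {
            buffer.onNext(i);
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        System.out.println(fill(Strategy.DROP));
        System.out.println(fill(Strategy.BUFFER_UNBOUNDED));
    }
}
```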

cc @stevegury and @tmontgomery who I consulted with on this topic in case either of them want to weigh in.

@benjchristensen

Member

benjchristensen commented Aug 27, 2015

I've opened a new issue to move this conversation about bounded/unbounded: #3213

@headinthebox

Contributor

headinthebox commented Aug 27, 2015

If this is so important for you, then we should only have Flowable and always do back pressure, and not call it Observable anymore.

[I remain of the opinion that it is absolutely OK that it is the developer's responsibility to make sure their app runs under the required memory & time constraints, and that given we are in a Turing complete world, unbounded buffers are no different than any other unbounded use of memory, or time. But obviously I am alone in this viewpoint, so will happily surrender to the majority vote.]

@benjchristensen

Member

benjchristensen commented Aug 28, 2015

@headinthebox Is this perspective driven by theoretical purity about Observable being "purely reactive", or is there a functional or usability concern with the RxJava v1 Observable type that drives this?

@headinthebox

Contributor

headinthebox commented Aug 28, 2015

  • I feel that v1 has become somewhat of a liger, where I want either a lion, or a tiger. But not at the same time.
  • You argue that lions are too dangerous, so then we should just have tigers, which seems to be what most people want anyway.
@benjchristensen

Member

benjchristensen commented Aug 28, 2015

That seems to be a purely theoretical and subjective argument then, not one based on functionality. Is there functionality or behavior that causes problems?

@tilal6991

Contributor

tilal6991 commented Aug 28, 2015

From a design standpoint, I feel that separating out the backpressure/hot stream concept is a huge point. One of the most confusing things to get your head round in Rx is the concept of backpressure since the distinction between them is very blurred in both the docs and the actual implementation.

From a technical perspective, I can see it being a burden to maintain possibly two concurrent sets of operators but I'm approaching this purely from a design standpoint.

@benjchristensen

Member

benjchristensen commented Aug 28, 2015

It was mentioned to me that it may help to restate what I'm seeking here, as it could be interpreted as a theoretical and polarized argument. Good point, so I'll try. It is somewhat long.


I am open to having 2 types in RxJava to represent bounded/unbounded, hot/cold, Observable/Flowable.

However, I do want solid reasons for the decisions, as we spent over a year debating many proposals in the 0.x phase before landing on an Observable with reactive-pull backpressure and choosing to adopt it as the single type. The community, and @headinthebox, were involved in that design and decision.

I and others then spent many months of debate formalizing the pattern in Reactive Streams, demonstrating across many companies and projects that an async stream type with reactive-pull backpressure is valid and wanted. Those types are now proposed for inclusion in JDK 9 (j.u.c.Flow), demonstrating fairly broad agreement.

In the time since, while using the reactive-pull Observable type, I have not found a streaming use case it does not work well for, including "hot" streams. In fact, I find the backpressure signal to be great on "hot" streams, as it forces me to consider how to behave when production is faster than consumption, and I have the tools to apply a strategy.

That said, I also recognize that in many environments, an Observable with unbounded buffers works just fine when only working with small finite streams of data, particularly request/response environments (which can use the Single type).

I prefer not making users have to choose between two virtually identical types (Observable and Flowable) unless there are important functional, usability or performance benefits to doing so.

If we do have two types, here is how I would envision them co-existing:

Observable (no backpressure, unbounded operators)

Creation is as expected, emit however one wishes:

create(s -> {
  s.onNext(t);
})

Cold generation though should be discouraged, and should return a Flowable. For example:

Observable.from(iterable) -> Flowable

That case should just return a Flowable as that is what it is. Or we simply don't offer the from or just overloads at all, which would make more sense, since Observable is intended to be "hot"; otherwise the type doesn't help communicate the distinction.

The merge/flatMap and observeOn cases would be unbounded with their associated dangers.

I suggest that zip(Observable, Observable) not exist, but allow observable.zipWith(Iterable) or Observable.zipWith(Flowable)

An Observable could become a Flowable with a backpressure strategy:

observable.toFlowable(Strategy.DROP)

Flowable (backpressure, bounded operators)

The Flowable type could be used for both "hot" or "cold", but "hot" Flowable instances would always have a strategy for dealing with a backpressure signal: drop, buffer, whatever.

Flowable.createHot(s -> {
   s.onNext(...)
}, Strategy.FAIL)

// cold sync generator
Flowable.createSync(...)

// cold async generator
Flowable.createAsync(...)

API Design

How would these two types be used when producing public APIs in libraries? I think it will be more confusing than today, where there is just one type, unless it is very clear that Observable represents "hot" data. If we are clear in that distinction then it could work well.

/**
* Observable signals a "hot" stream where you must account for flow control yourself or risk unbounded latency and memory growth.
*/
Observable<T> getStuff();

/**
* Flowable signals a "cold" or "hot" stream that will adapt its flow control, or emit an error if it overwhelms your consumption. 
*/
Flowable<T> getStuff2();

The Question

What functional, performance or usability items warrant making people choose between 2 very similar types, when Flowable can do all of the behavior?

Is the "confusion" of backpressure in v1 Observable just because we did a poor job of API design on Observable.create? Or is it more fundamental?

I believe it is just a usability issue of Observable.create that makes people have to think about backpressure too much. I think that issue should effectively be hidden from almost everyone.

I think it's possible to have a single type that is easy to use for all of these use cases ("hot", "cold", backpressured, not-backpressured), and it's purely an implementation detail of the operators.

I think it would be far more confusing to choose when to expose an Observable vs a Flowable than to just have a single type with proper "creation methods" that enable simply creating the correct "hot" or "cold" behavior.

Then again, perhaps it is worth communicating via a type that something is "hot": #2785

Community ... please provide your insight.

@akarnokd

Member

akarnokd commented Aug 29, 2015

Do we have the capacity to maintain 4 versions at the same time (1.x, 2.x backpressure, 2.x unbounded, Single)?

I wouldn't use the name Flowable unless it extends j.u.c.Flow and I'd stick to Observable for the 2.x backpressure version.

@benjchristensen

Member

benjchristensen commented Aug 29, 2015

We can likely support Java 9 with 2.x without giving up Java 8 compatibility with multi-release jar files: http://openjdk.java.net/jeps/238 I intend on implementing Flow and Publisher (if the versioned jar feature works).

@benjchristensen

Member

benjchristensen commented Aug 29, 2015

The capacity to maintain multiple variants is a very good question, and valid for decision making. Particularly when all use cases are addressed by 1 implementation.

@benjchristensen

Member

benjchristensen commented Aug 29, 2015

Though, most operators would behave the same on both. If it makes sense to have two types, we could probably make most of this just a public API division, with most of the implementation shared.

@benjchristensen

Member

benjchristensen commented Sep 4, 2015

Here is an attempt at separating out behavior between Observable and Flowable:

Observable : async push without backpressure
Flowable : async pull, and push/pull with backpressure via batch requesting
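
"Backpressure via batch requesting" can be sketched with the JDK 9 java.util.concurrent.Flow interfaces. The range source below is a simplified, synchronous, single-threaded stand-in written for this example. It is not a fully spec-compliant Publisher and is not RxJava code. The subscriber asks for items two at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo: a cold range source that only emits what was requested.
class BatchRequestDemo {
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = start;
            final int end = start + count;
            long requested;      // outstanding demand
            boolean emitting;    // guards against re-entrant request() calls
            boolean done;

            @Override
            public void request(long n) {
                if (n <= 0 || done) {
                    return;
                }
                requested += n;
                if (emitting) {
                    return;      // the running loop below will see the new demand
                }
                emitting = true;
                while (requested > 0 && next < end && !done) {
                    requested--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (next == end && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        range(1, 5).subscribe(new Flow.Subscriber<Integer>() {
            Flow.Subscription subscription;
            int receivedInBatch;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request 2");
                s.request(2);
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext " + item);
                if (++receivedInBatch == 2) {   // batch consumed, ask for the next one
                    receivedInBatch = 0;
                    log.add("request 2");
                    subscription.request(2);
                }
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

A push-only Observable in the sense described above would have no request method at all: the source would call onNext whenever it has data.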

Below I separate out the operators (such as temporal flow control) and creators/generators as applicable to each type.

I'd appreciate feedback on this direction.

Observable

This is for creating a true "push" Observable without backpressure

  • Observable.create(Observer o) :: Observable

The subscribe behavior should take an Observer without backpressure

  • subscribe(Observer<? super T>)

A key question is whether Observable should implement the Publisher interface for seamless interop between Flowable and Observable. To be true to the types, Observable should not implement Publisher, and we would need to call obs.toFlowable(strategy) when interacting with Flowables.

  • subscribe(Subscriber<? super T>) // if we implement this, then we need a default OnBackpressureStrategy that would throw MissingBackpressureException

This would exist to convert to a Flowable

  • toFlowable(OnBackpressureStrategy strategy) :: Flowable

The following should all either not exist on Observable, or return Flowable instead

  • create(Publisher)
  • fromArray(T...)
  • fromIterable(Iterable<? extends T>)
  • just(T)
  • range(int, int)

These would have unbounded buffering-

  • merge(int, int, Iterable<? extends Publisher<? extends T>>)
  • merge(int, int, Publisher<? extends T>...)
  • merge(int, Iterable<? extends Publisher<? extends T>>)
  • merge(int, Publisher<? extends T>...)
  • merge(Iterable<? extends Publisher<? extends T>>)
  • merge(Publisher<? extends Publisher<? extends T>>)
  • merge(Publisher<? extends Publisher<? extends T>>, int)
  • merge(Publisher<? extends T>...)
  • mergeDelayError(boolean, Iterable<? extends Publisher<? extends T>>)
  • mergeDelayError(int, int, Iterable<? extends Publisher<? extends T>>)
  • mergeDelayError(int, int, Publisher<? extends T>...)
  • mergeDelayError(int, Iterable<? extends Publisher<? extends T>>)
  • mergeDelayError(int, Publisher<? extends T>...)
  • mergeDelayError(Publisher<? extends Publisher<? extends T>>)
  • mergeDelayError(Publisher<? extends Publisher<? extends T>>, int)
  • mergeDelayError(Publisher<? extends T>...)
  • flatMap(Function<? super T, ? extends Publisher<? extends R>>)
  • flatMap(Function<? super T, ? extends Publisher<? extends R>>, boolean)
  • flatMap(Function<? super T, ? extends Publisher<? extends R>>, boolean, int)
  • flatMap(Function<? super T, ? extends Publisher<? extends R>>, boolean, int, int)
  • flatMap(Function<? super T, ? extends Publisher<? extends R>>, int)
  • flatMap(Function<? super T, ? extends Publisher<? extends U>>, BiFunction<? super T, ? super U, ? extends R>, boolean)
  • flatMap(Function<? super T, ? extends Publisher<? extends U>>, BiFunction<? super T, ? super U, ? extends R>, boolean, int)
  • flatMap(Function<? super T, ? extends Publisher<? extends U>>, BiFunction<? super T, ? super U, ? extends R>, boolean, int, int)
  • flatMap(Function<? super T, ? extends Publisher<? extends U>>, BiFunction<? super T, ? super U, ? extends R>, int)
  • observeOn(Scheduler)
  • observeOn(Scheduler, boolean)
  • observeOn(Scheduler, boolean, int)

I don't think zip should exist on Observable as it is dangerous, and honestly should be a Flowable

  • zip(Publisher<? extends Publisher<? extends T>>, Function<Object[], R>)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, BiFunction<? super T1, ? super T2, ? extends R>)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, BiFunction<? super T1, ? super T2, ? extends R>, boolean)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, BiFunction<? super T1, ? super T2, ? extends R>, boolean, int)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, Publisher<? extends T3>, Function3<? super T1, ? super T2, ? super T3, ? extends R>)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, Publisher<? extends T3>, Publisher<? extends T4>, Function4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R>)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, Publisher<? extends T3>, Publisher<? extends T4>, Publisher<? extends T5>, Function5<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R>)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, Publisher<? extends T3>, Publisher<? extends T4>, Publisher<? extends T5>, Publisher<? extends T6>, Function6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R>)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, Publisher<? extends T3>, Publisher<? extends T4>, Publisher<? extends T5>, Publisher<? extends T6>, Publisher<? extends T7>, Function7<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R>)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, Publisher<? extends T3>, Publisher<? extends T4>, Publisher<? extends T5>, Publisher<? extends T6>, Publisher<? extends T7>, Publisher<? extends T8>, Function8<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R>)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, Publisher<? extends T3>, Publisher<? extends T4>, Publisher<? extends T5>, Publisher<? extends T6>, Publisher<? extends T7>, Publisher<? extends T8>, Publisher<? extends T9>, Function9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R>)
  • zipArray(Function<? super Object[], ? extends R>, boolean, int, Publisher<? extends T>...)
  • zipIterable(Function<? super Object[], ? extends R>, boolean, int, Iterable<? extends Publisher<? extends T>>)
  • zip(Iterable<? extends Publisher<? extends T>>, Function<? super Object[], ? extends R>)

we could however have something like this

  • zipWith(Flowable f)
  • zipWith(Iterable i)

these would go away

  • onBackpressureBuffer()
  • onBackpressureBuffer(boolean)
  • onBackpressureBuffer(int)
  • onBackpressureBuffer(int, boolean)
  • onBackpressureBuffer(int, boolean, boolean)
  • onBackpressureBuffer(int, boolean, boolean, Runnable)
  • onBackpressureBuffer(int, Runnable)
  • onBackpressureDrop()
  • onBackpressureDrop(Consumer<? super T>)
  • onBackpressureLatest()

these should return a Single instead of Observable

  • toList() :: Single<List>
  • toList(int) :: Single<List>
  • toList(Supplier) :: Single<List>
  • toMap(Function<? super T, ? extends K>) :: Single<Map<K, T>>
  • toMap(Function<? super T, ? extends K>, Function<? super T, ? extends V>) :: Single<List<T
  • toMap(Function<? super T, ? extends K>, Function<? super T, ? extends V>, Supplier<? extends Map<K, V>>) :: Single<Map<K, T>>
  • toMultimap(Function<? super T, ? extends K>) :: Single<Map<K, T>>
  • toMultimap(Function<? super T, ? extends K>, Function<? super T, ? extends V>) :: Single<Map<K, T>>
  • toMultimap(Function<? super T, ? extends K>, Function<? super T, ? extends V>, Supplier<? extends Collection<? super V>>) :: Single<Map<K, T>>
  • toSortedList() :: Single<List>
  • toSortedList(Comparator<? super T>) :: Single<List>
  • toSortedList(Comparator<? super T>, int) :: Single<List>
  • toSortedList(int) :: Single<List>

Most of the other operators make sense to leave as they are, but I've stopped itemizing them for brevity. Perhaps we can continue using Publisher as the type so Observable and Flowable can compose together (but we need to answer what that means as I presented above)

Flowable

This is for creating a "push-pull" Flowable with backpressure

  • Flowable.create(Publisher o)

We should also have generators for the common (and hard to implement) cases (these are being designed in 1.x right now)

  • Flowable.createSync(... functions here ...)
  • Flowable.createAsync(... functions here ...)

The other expected factory methods for creation include:

  • from(Publisher o)
  • fromArray(T...)
  • fromIterable(Iterable<? extends T>)
  • just(T)

The subscribe behavior should be the Reactive Streams and j.u.c.Flow Subscriber with backpressure

  • subscribe(Subscriber<? super T>)

but we can also have the Observer overload for the default and common behavior of request(Long.MAX_VALUE)

  • subscribe(Observer<? super T>)
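
A sketch of what such an overload could do internally: wrap the Observer in a Subscriber that requests Long.MAX_VALUE up front, which j.u.c.Flow treats as effectively unbounded demand. The Observer interface and the unbounded helper below are assumptions for illustration, not the actual RxJava 2 types.

```java
import java.util.concurrent.Flow;

// Hypothetical sketch: adapting a backpressure-free Observer to a Flow.Subscriber.
class UnboundedObserverAdapter {
    // Illustrative Observer shape without any request/cancel handling.
    interface Observer<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onComplete();
    }

    // Wraps the Observer so that it asks for everything as soon as it is subscribed.
    static <T> Flow.Subscriber<T> unbounded(Observer<? super T> observer) {
        return new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(T item) {
                observer.onNext(item);
            }

            @Override
            public void onError(Throwable t) {
                observer.onError(t);
            }

            @Override
            public void onComplete() {
                observer.onComplete();
            }
        };
    }

    // Subscribes the adapter to a recording Subscription and returns the demand it signalled.
    static long demo() {
        long[] requested = new long[1];
        Flow.Subscriber<String> subscriber = unbounded(new Observer<String>() {
            @Override public void onNext(String item) { }
            @Override public void onError(Throwable error) { }
            @Override public void onComplete() { }
        });
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { requested[0] = n; }
            @Override public void cancel() { }
        });
        return requested[0];
    }

    public static void main(String[] args) {
        System.out.println(demo() == Long.MAX_VALUE);
    }
}
```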

The following should all either not exist on Observable, or return Flowable instead

  • create(Publisher)
  • fromArray(T...)
  • fromIterable(Iterable<? extends T>)
  • just(T)
  • range(int, int)

These would have bounded buffering-

  • merge(int, int, Iterable<? extends Publisher<? extends T>>)
  • merge(int, int, Publisher<? extends T>...)
  • merge(int, Iterable<? extends Publisher<? extends T>>)
  • merge(int, Publisher<? extends T>...)
  • merge(Iterable<? extends Publisher<? extends T>>)
  • merge(Publisher<? extends Publisher<? extends T>>)
  • merge(Publisher<? extends Publisher<? extends T>>, int)
  • merge(Publisher<? extends T>...)
  • mergeDelayError(boolean, Iterable<? extends Publisher<? extends T>>)
  • mergeDelayError(int, int, Iterable<? extends Publisher<? extends T>>)
  • mergeDelayError(int, int, Publisher<? extends T>...)
  • mergeDelayError(int, Iterable<? extends Publisher<? extends T>>)
  • mergeDelayError(int, Publisher<? extends T>...)
  • mergeDelayError(Publisher<? extends Publisher<? extends T>>)
  • mergeDelayError(Publisher<? extends Publisher<? extends T>>, int)
  • mergeDelayError(Publisher<? extends T>...)
  • flatMap(Function<? super T, ? extends Publisher<? extends R>>)
  • flatMap(Function<? super T, ? extends Publisher<? extends R>>, boolean)
  • flatMap(Function<? super T, ? extends Publisher<? extends R>>, boolean, int)
  • flatMap(Function<? super T, ? extends Publisher<? extends R>>, boolean, int, int)
  • flatMap(Function<? super T, ? extends Publisher<? extends R>>, int)
  • flatMap(Function<? super T, ? extends Publisher<? extends U>>, BiFunction<? super T, ? super U, ? extends R>, boolean)
  • flatMap(Function<? super T, ? extends Publisher<? extends U>>, BiFunction<? super T, ? super U, ? extends R>, boolean, int)
  • flatMap(Function<? super T, ? extends Publisher<? extends U>>, BiFunction<? super T, ? super U, ? extends R>, boolean, int, int)
  • flatMap(Function<? super T, ? extends Publisher<? extends U>>, BiFunction<? super T, ? super U, ? extends R>, int)
  • observeOn(Scheduler)
  • observeOn(Scheduler, boolean)
  • observeOn(Scheduler, boolean, int)
  • zip(Publisher<? extends Publisher<? extends T>>, Function<Object[], R>)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, BiFunction<? super T1, ? super T2, ? extends R>)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, BiFunction<? super T1, ? super T2, ? extends R>, boolean)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, BiFunction<? super T1, ? super T2, ? extends R>, boolean, int)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, Publisher<? extends T3>, Function3<? super T1, ? super T2, ? super T3, ? extends R>)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, Publisher<? extends T3>, Publisher<? extends T4>, Function4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R>)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, Publisher<? extends T3>, Publisher<? extends T4>, Publisher<? extends T5>, Function5<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R>)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, Publisher<? extends T3>, Publisher<? extends T4>, Publisher<? extends T5>, Publisher<? extends T6>, Function6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R>)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, Publisher<? extends T3>, Publisher<? extends T4>, Publisher<? extends T5>, Publisher<? extends T6>, Publisher<? extends T7>, Function7<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R>)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, Publisher<? extends T3>, Publisher<? extends T4>, Publisher<? extends T5>, Publisher<? extends T6>, Publisher<? extends T7>, Publisher<? extends T8>, Function8<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R>)
  • zip(Publisher<? extends T1>, Publisher<? extends T2>, Publisher<? extends T3>, Publisher<? extends T4>, Publisher<? extends T5>, Publisher<? extends T6>, Publisher<? extends T7>, Publisher<? extends T8>, Publisher<? extends T9>, Function9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R>)
  • zipArray(Function<? super Object[], ? extends R>, boolean, int, Publisher<? extends T>...)
  • zipIterable(Function<? super Object[], ? extends R>, boolean, int, Iterable<? extends Publisher<? extends T>>)
  • zip(Iterable<? extends Publisher<? extends T>>, Function<? super Object[], ? extends R>)

These should return a Single instead of a Flowable:

  • toList() :: Single<List<T>>
  • toList(int) :: Single<List<T>>
  • toList(Supplier) :: Single<List<T>>
  • toMap(Function<? super T, ? extends K>) :: Single<Map<K, T>>
  • toMap(Function<? super T, ? extends K>, Function<? super T, ? extends V>) :: Single<Map<K, V>>
  • toMap(Function<? super T, ? extends K>, Function<? super T, ? extends V>, Supplier<? extends Map<K, V>>) :: Single<Map<K, V>>
  • toMultimap(Function<? super T, ? extends K>) :: Single<Map<K, Collection<T>>>
  • toMultimap(Function<? super T, ? extends K>, Function<? super T, ? extends V>) :: Single<Map<K, Collection<V>>>
  • toMultimap(Function<? super T, ? extends K>, Function<? super T, ? extends V>, Supplier<? extends Collection<? super V>>) :: Single<Map<K, Collection<V>>>
  • toSortedList() :: Single<List<T>>
  • toSortedList(Comparator<? super T>) :: Single<List<T>>
  • toSortedList(Comparator<? super T>, int) :: Single<List<T>>
  • toSortedList(int) :: Single<List<T>>
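
To illustrate why a single-valued return type fits these operators, here is a minimal sketch using only JDK types. It assumes Java 9's j.u.c.Flow as the base API and uses CompletableFuture purely as a stand-in for Single, since Single is not part of the JDK. ToListSketch, toList and fromList are hypothetical names for this example, and fromList ignores the requested amount for brevity, so it is not a backpressure-compliant Publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

// Hypothetical sketch: collecting a whole sequence yields exactly one value
// (or one error), which is what a Single-like return type expresses.
final class ToListSketch {

    /** Collects every item of the source into one list, delivered as a single result. */
    static <T> CompletableFuture<List<T>> toList(Flow.Publisher<T> source) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<T>() {
            private final List<T> items = new ArrayList<>();

            @Override public void onSubscribe(Flow.Subscription s) {
                // The collector wants everything, so it requests an unbounded amount.
                s.request(Long.MAX_VALUE);
            }
            @Override public void onNext(T item) { items.add(item); }
            @Override public void onError(Throwable t) { result.completeExceptionally(t); }
            @Override public void onComplete() { result.complete(items); }
        });
        return result;
    }

    /** Simplified synchronous source (assumption: emits everything on the first request). */
    static <T> Flow.Publisher<T> fromList(List<T> values) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override public void request(long n) {
                if (done) return;
                done = true;
                for (T value : values) subscriber.onNext(value);
                subscriber.onComplete();
            }
            @Override public void cancel() { done = true; }
        });
    }
}
```

The caller receives one list or one error, never a stream of lists, so there is nothing left for downstream backpressure to negotiate.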

I question whether any of the following should exist on Flowable, since they are temporal in nature and therefore better suited to Observable. Generally, if time is involved, Flowable-style flow control does not work, except as a signal to choose a strategy, which would work when composing via the Observable.toFlowable(OnBackpressureStrategy) choice.

  • interval(long, long, TimeUnit)
  • interval(long, long, TimeUnit, Scheduler)
  • interval(long, TimeUnit)
  • interval(long, TimeUnit, Scheduler)
  • intervalRange(long, long, long, long, TimeUnit)
  • intervalRange(long, long, long, long, TimeUnit, Scheduler)
  • buffer(long, long, TimeUnit)
  • buffer(long, long, TimeUnit, Scheduler)
  • buffer(long, long, TimeUnit, Scheduler, Supplier)
  • buffer(long, TimeUnit)
  • buffer(long, TimeUnit, int)
  • buffer(long, TimeUnit, int, Scheduler)
  • buffer(long, TimeUnit, int, Scheduler, Supplier, boolean)
  • buffer(long, TimeUnit, Scheduler)
  • debounce(Function<? super T, ? extends Publisher>)
  • debounce(long, TimeUnit)
  • debounce(long, TimeUnit, Scheduler)
  • sample(long, TimeUnit)
  • sample(long, TimeUnit, Scheduler)
  • skip(long, TimeUnit, Scheduler)
  • skipLast(long, TimeUnit)
  • skipLast(long, TimeUnit, boolean)
  • skipLast(long, TimeUnit, Scheduler)
  • skipLast(long, TimeUnit, Scheduler, boolean)
  • skipLast(long, TimeUnit, Scheduler, boolean, int)
  • take(long, TimeUnit, Scheduler)
  • takeLast(long, long, TimeUnit)
  • takeLast(long, long, TimeUnit, Scheduler)
  • takeLast(long, long, TimeUnit, Scheduler, boolean, int)
  • takeLast(long, TimeUnit)
  • takeLast(long, TimeUnit, boolean)
  • takeLast(long, TimeUnit, Scheduler)
  • takeLast(long, TimeUnit, Scheduler, boolean)
  • takeLast(long, TimeUnit, Scheduler, boolean, int)
  • takeLastBuffer(int, long, TimeUnit)
  • takeLastBuffer(int, long, TimeUnit, Scheduler)
  • takeLastBuffer(long, TimeUnit)
  • takeLastBuffer(long, TimeUnit, Scheduler)
  • throttleFirst(long, TimeUnit)
  • throttleFirst(long, TimeUnit, Scheduler)
  • throttleLast(long, TimeUnit)
  • throttleLast(long, TimeUnit, Scheduler)
  • throttleWithTimeout(long, TimeUnit)
  • throttleWithTimeout(long, TimeUnit, Scheduler)
  • timeInterval()
  • timeInterval(Scheduler)
  • timeInterval(TimeUnit)
  • timeInterval(TimeUnit, Scheduler)
  • window(long, long, TimeUnit)
  • window(long, long, TimeUnit, Scheduler)
  • window(long, long, TimeUnit, Scheduler, int)
  • window(long, TimeUnit)
  • window(long, TimeUnit, long)
  • window(long, TimeUnit, long, boolean)
  • window(long, TimeUnit, Scheduler)
  • window(long, TimeUnit, Scheduler, long)
  • window(long, TimeUnit, Scheduler, long, boolean)
  • window(long, TimeUnit, Scheduler, long, boolean, int)
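
The point about time-based sources can be sketched as a small single-threaded simulation. A timer cannot be slowed down by downstream demand, so the only real decision is what to do with a tick that arrives when nothing was requested. Everything below is an assumption made for illustration: TimerBridgeSketch and the Strategy values DROP, BUFFER and ERROR are hypothetical and are not the actual members of OnBackpressureStrategy.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical bridge between a time-driven source and a demand-driven consumer.
final class TimerBridgeSketch {

    /** Assumed strategy names; the real enum may differ. */
    enum Strategy { DROP, BUFFER, ERROR }

    private final Strategy strategy;
    private final Deque<Long> buffer = new ArrayDeque<>();
    private final List<Long> delivered = new ArrayList<>();
    private long demand;

    TimerBridgeSketch(Strategy strategy) {
        this.strategy = strategy;
    }

    /** Downstream asks for n more items; buffered ticks are delivered first. */
    void request(long n) {
        demand += n;
        while (demand > 0 && !buffer.isEmpty()) {
            delivered.add(buffer.poll());
            demand--;
        }
    }

    /** The timer fires regardless of demand; the strategy handles the overflow. */
    void onTick(long tick) {
        if (demand > 0) {
            delivered.add(tick);
            demand--;
            return;
        }
        switch (strategy) {
            case DROP:
                break; // the tick is lost
            case BUFFER:
                buffer.add(tick); // unbounded queue, may grow without limit
                break;
            case ERROR:
                throw new IllegalStateException("tick " + tick + " arrived without demand");
        }
    }

    List<Long> delivered() { return delivered; }

    int buffered() { return buffer.size(); }
}
```

With DROP, requesting 2 and then receiving ticks 1 to 5 delivers only 1 and 2; with BUFFER the remaining ticks wait in the queue until the next request. In both cases the demand signal only selects an overflow policy and does not control the rate of the source.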

Most of the other operators make sense to leave as they are, but I've stopped itemizing them for brevity.

@benjchristensen

benjchristensen commented Sep 4, 2015

Additionally, I think Single<T> has been a good addition, so it should continue to exist.

@akarnokd

akarnokd commented Sep 4, 2015

I'd estimate that 75% of the operators require 3 implementations, as backpressure awareness is subtle in many operators and a guaranteed single value by itself allows different optimizations. I guess we still want high performance for Flowable, Observable and Single, right? The fact that there isn't one Subscriber type for all of them drags in individual XXXOperator<R, T> implementations (interface FlowableOperator, SingleOperator, ObservableOperator).

My question is: will these be prerequisites to a release, or can we publish the RS backpressure-supporting Flowable and do the rest in 2.1?

@benjchristensen

benjchristensen commented Sep 5, 2015

I'd estimate that 75% of the operators require 3 implementations, as backpressure awareness is subtle in many operators and a guaranteed single value by itself allows different optimizations. I guess we still want high performance for Flowable, Observable and Single, right?

Yes, we will want high performance for each. But the internal implementation of operators is definitely something that can evolve and change over time after we release 2.0. The Flowable operators are the ones that can be leveraged across all 3 by injecting unbounded buffering and using Long.MAX_VALUE. It won't achieve the possible performance gains for Observable and Single, but those can come as we have time. Additionally, the perf and implementation differences will be most significant in merge, observeOn, and a handful of others. The rest are mostly synchronous, without buffers, and will gain little, if anything, from the change beyond the MAX_VALUE fast-paths we already do (such as filter, map, take).
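
The MAX_VALUE fast-path idea can be sketched roughly as follows. This is an illustration on the JDK's j.u.c.Flow interfaces, not RxJava's actual operator code: RangeFastPathSketch, RangeSubscription and Recorder are hypothetical names, and the sketch assumes single-threaded use with no concurrent request calls. A backpressure-aware source checks for Long.MAX_VALUE and, if it sees it, emits in a tight loop without per-item demand accounting, which is how an unbounded (Observable-style) consumer can reuse a Flowable-style implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class RangeFastPathSketch {

    /** Emits count consecutive integers beginning at start, honoring downstream demand. */
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber ->
                subscriber.onSubscribe(new RangeSubscription(subscriber, start, start + count));
    }

    static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> downstream;
        private final int end;
        private int next;
        private boolean done;

        RangeSubscription(Flow.Subscriber<? super Integer> downstream, int start, int end) {
            this.downstream = downstream;
            this.next = start;
            this.end = end;
        }

        @Override
        public void request(long n) {
            if (done) return;
            if (n <= 0) {
                done = true;
                downstream.onError(new IllegalArgumentException("n > 0 required but it was " + n));
                return;
            }
            if (n == Long.MAX_VALUE) {
                // Fast path: unbounded demand, so no per-item bookkeeping is needed.
                while (next < end && !done) downstream.onNext(next++);
            } else {
                // Slow path: emit at most n items, then wait for the next request.
                long remaining = n;
                while (remaining-- > 0 && next < end && !done) downstream.onNext(next++);
            }
            if (next == end && !done) {
                done = true;
                downstream.onComplete();
            }
        }

        @Override
        public void cancel() { done = true; }
    }

    /** Helper for trying it out: requests a fixed amount once and records what arrives. */
    static final class Recorder implements Flow.Subscriber<Integer> {
        final List<Integer> items = new ArrayList<>();
        boolean completed;
        private final long initialRequest;

        Recorder(long initialRequest) { this.initialRequest = initialRequest; }

        @Override public void onSubscribe(Flow.Subscription s) { s.request(initialRequest); }
        @Override public void onNext(Integer item) { items.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { completed = true; }
    }
}
```

Subscribing a Recorder created with Long.MAX_VALUE drains the whole range and completes, while a Recorder created with 2 receives two items and then waits. The gain from a dedicated non-backpressured implementation of such a synchronous source would mainly be removing the branch and the remaining bookkeeping.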

To get the public APIs right with Observable, Flowable, and Single, I suggest we use the Flowable operator implementations for them all and focus on API, correct behavior, composition, and stability. Then as we go we will end up with custom operators as needed (merge will happen sooner for example).

My question is: will these be prerequisites to a release, or can we publish the RS backpressure-supporting Flowable and do the rest in 2.1?

Getting the relationship between the types right is very important and will affect the public API of each other. Thus I view this as an essential thing to get done before 2.0 is released, since we can't change APIs of the Flowable type in 2.1, and as my examples above show, it will definitely cause Flowable to change.

@akarnokd akarnokd added this to the 2.0 RC 1 milestone Jun 22, 2016

@akarnokd akarnokd removed this from the 2.0 milestone Jun 22, 2016

@akarnokd

akarnokd commented Aug 24, 2016

I think the naming has come to a reasonable standing.

@akarnokd akarnokd closed this Aug 24, 2016
