Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental Proposal of rx.Task #2641

Closed
wants to merge 1 commit into from

Conversation

benjchristensen
Copy link
Member

Adds rx.Task as a "scalar Observable" for representing work with a single return value.

See #1594 rx.Future/Task

This provides a type similar to Future in that it represents a scalar unit of work, but it is lazy like an Observable and many Tasks can be combined into an Observable stream. Note how Task.zip returns Task<R> whereas Task.merge returns Observable<R>.

NOTE: This is for experimentation and feedback at this time.

Items requiring review and work that I'm particularly aware of:

  • naming of OnExecute
  • naming of TaskObserver (this one in particular I don't like)
  • design and implementation of Task.Promise
  • should the public lift use the Observable.Operator or should that only be for internal reuse?
  • should we have a public lift that uses a Task.Operator?
  • the Task.toObservable implementation right now is efficient but will likely break something so it likely needs to change to use subscribe
  • implementation of this merge variant: Task<T> merge(final Task<? extends Task<? extends T>> source)
  • several operators currently just wrap as Observable to reuse existing operators ... is that okay performance wise?
  • Javadocs

Examples of using this class:

import rx.Observable;
import rx.Task;
import rx.Task.Promise;

public class TaskExamples {

    public static void main(String... args) {
        // scalar synchronous value
        Task<String> t1 = Task.create(t -> {
            t.onSuccess("Hello World!");
        });

        // scalar synchronous value using helper method
        Task<Integer> t2 = Task.just(1);

        // synchronous error
        Task<String> error = Task.create(t -> {
            t.onError(new RuntimeException("failed!"));
        });

        // executing
        t1.subscribe(System.out::println);
        t2.subscribe(System.out::println);
        error.subscribe(System.out::println, e -> System.out.println(e.getMessage()));

        // scalar Tasks for request/response like a Future
        getData(1).subscribe(System.out::println);
        getDataUsingPromise(2).subscribe(System.out::println);

        // combining Tasks into another Task
        Task<String> zipped = Task.zip(t1, t2, (a, b) -> a + " -- " + b);

        // combining Tasks into an Observable stream
        Observable<String> merged = Task.merge(t1, t2.map(String::valueOf), getData(3));
        Observable<String> mergeWith = t1.mergeWith(t2.map(String::valueOf));

        zipped.subscribe(v -> System.out.println("zipped => " + v));
        merged.subscribe(v -> System.out.println("merged => " + v));
        mergeWith.subscribe(v -> System.out.println("mergeWith => " + v));
    }

    /**
     * Example of an async scalar execution using Task.create
     * <p>
     * This shows the lazy, idiomatic approach for Rx exactly like an Observable except scalar.
     *
     * @param arg
     * @return
     */
    public static Task<String> getData(int arg) {
        return Task.create(s -> {
            new Thread(() -> {
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // deliver value
                    s.onSuccess("Data=" + arg);
                }).start();
        });
    }

    /**
     * Example of an async scalar execution using a Task.Promise
     * <p>
     * This shows how an eager (hot) process would work like using a Future.
     *
     * @param arg
     * @return
     */
    public static Task<String> getDataUsingPromise(int arg) {
        Task.Promise<String> p = Promise.create();

        new Thread(() -> {
            try {
                Thread.sleep(500);
            } catch (Exception e) {
                e.printStackTrace();
            }
            // deliver value
                p.onSuccess("Data=" + arg);
            }).start();

        return p.getTask();
    }
}

Adds `rx.Task` as a "scalar Observable" for representing work with a single return value.

See ReactiveX#1594 rx.Future/Task

This provides a type similar to `Future` in that it represents a scalar unit of work, but it is lazy like an `Observable` and many `Task`s can be combined into an `Observable` stream. Note how `Task.zip` returns `Task<R>` whereas `Task.merge` returns `Observable<R>`.

NOTE: This is for experimentation and feedback at this time.

Items requiring review and work that I'm particularly aware of:

- naming of `OnExecute`
- naming of `TaskObserver` (this one in particular I don't like)
- design and implementation of `Task.Promise`
- should the public `lift` use the `Observable.Operator` or should that only be for internal reuse?
- should we have a public `lift` that uses a `Task.Operator`?
- the `Task.toObservable` implementation right now is efficient but will likely break something so it likely needs to change to use `subscribe`
- implementation of this merge variant: `Task<T> merge(final Task<? extends Task<? extends T>> source)`
- several operators currently just wrap as `Observable` to reuse existing operators ... is that okay performance wise?
- Javadocs

Examples of using this class:

```java
import rx.Observable;
import rx.Task;
import rx.Task.Promise;

public class TaskExamples {

    public static void main(String... args) {
        // scalar synchronous value
        Task<String> t1 = Task.create(t -> {
            t.onSuccess("Hello World!");
        });

        // scalar synchronous value using helper method
        Task<Integer> t2 = Task.just(1);

        // synchronous error
        Task<String> error = Task.create(t -> {
            t.onError(new RuntimeException("failed!"));
        });

        // executing
        t1.subscribe(System.out::println);
        t2.subscribe(System.out::println);
        error.subscribe(System.out::println, e -> System.out.println(e.getMessage()));

        // scalar Tasks for request/response like a Future
        getData(1).subscribe(System.out::println);
        getDataUsingPromise(2).subscribe(System.out::println);

        // combining Tasks into another Task
        Task<String> zipped = Task.zip(t1, t2, (a, b) -> a + " -- " + b);

        // combining Tasks into an Observable stream
        Observable<String> merged = Task.merge(t1, t2.map(String::valueOf), getData(3));
        Observable<String> mergeWith = t1.mergeWith(t2.map(String::valueOf));

        zipped.subscribe(v -> System.out.println("zipped => " + v));
        merged.subscribe(v -> System.out.println("merged => " + v));
        mergeWith.subscribe(v -> System.out.println("mergeWith => " + v));
    }

    /**
     * Example of an async scalar execution using Task.create
     * <p>
     * This shows the lazy, idiomatic approach for Rx exactly like an Observable except scalar.
     *
     * @param arg
     * @return
     */
    public static Task<String> getData(int arg) {
        return Task.create(s -> {
            new Thread(() -> {
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // deliver value
                    s.onSuccess("Data=" + arg);
                }).start();
        });
    }

    /**
     * Example of an async scalar execution using a Task.Promise
     * <p>
     * This shows how an eager (hot) process would work like using a Future.
     *
     * @param arg
     * @return
     */
    public static Task<String> getDataUsingPromise(int arg) {
        Task.Promise<String> p = Promise.create();

        new Thread(() -> {
            try {
                Thread.sleep(500);
            } catch (Exception e) {
                e.printStackTrace();
            }
            // deliver value
                p.onSuccess("Data=" + arg);
            }).start();

        return p.getTask();
    }
}
```
@anaisbetts
Copy link

/cc @jspahrsummers who prototyped this as well in ReactiveCocoa and Found It Lacking(tm)

@benjchristensen
Copy link
Member Author

@paulcbetts Thanks for getting involved. What is the concern of pursuing this and what didn't work about the prototype in ReactiveCocoa?

This is attempting to address one of the most common requests and complaints about using Reactive Extensions – that everything is modeled as a vector when some things naturally fit a scalar representation, such as request/response IO.

This approach does not try and force behavior of a Future into Rx but retains the stream behavior when composing, laziness, virtual time, Schedulers, etc.

@benjchristensen
Copy link
Member Author

@benjchristensen
Copy link
Member Author

By the way, this is being pursued because unlike Rx.Net which had Task and Observable capable of working with each other, the JVM does not have an equivalent.

It is understood that without async/await on the JVM that the full power of the Rx.Net Task is not available. This is trying to work with what is available on the JVM to allow better expression of scalars versus vectors while allowing scalars to compose into vectors.

@anaisbetts
Copy link

@benjchristensen I don't have any concerns personally, I just remember JSS initially being all for this but then after trying to build stuff with it, was frustrated. Wanted to potentially save you a similar journey if you end up agreeing

@anaisbetts
Copy link

Wish I remember the details of why, lemme see if I can find that PR

@jspahrsummers
Copy link

My exploration of similar things was spread over several different RAC PRs, so it'd be hard to find one succinct thing to link to.

To summarize, the main problem I ran into is that it's really hard to compose single-item streams with variable-item streams. You end up doing a lot of type juggling, in which the professed safety (of knowing it's a single item) can easily be lost.

Adding operators to automatically lift Tasks into Observables when combined is an interesting idea that I didn't explore, so it may help with the composition problems.

Fundamentally, though, I'd ask (rhetorically): does this actually simplify anything, or does it merely make it easier for users who are familiar with futures/promises and want to reuse that knowledge? YMMV, but I strongly prefer to design for simplicity and minimizing the number of different concepts in play, because familiarity can always be learned, but complexity is harder to avoid.

@benjchristensen
Copy link
Member Author

Thanks @jspahrsummers for the feedback. Your reasoning and experience fits our thinking over the past 2 years on this topic and is why for so long this idea has been deferred.

The evolution of thinking after using RxJava for a couple years is that libraries that expose public APIs can benefit from being able to communicate if something is scalar or vector – despite the "functional" argument that a scalar is just a "list of one". We are able to choose to use T or List<T> when creating a synchronous API, so why not for async as well? In other words, why should the use of RxJava exclude communicating when we do know something is a scalar, which is a very common use case, and results in a simpler mental model?

When composition occurs that results in multiple values then the type changes to an Observable. Not all composition and transformation requires this though – zip, amb, map, flatMap all can return Task. Others like merge will return an Observable.

Another consideration is that APIs that expose data access are generally separate from the consuming code that composes them, and it is these data access APIs where the scalar Task is most effective at providing communicating via a static type such as:

Task<T> getData()

versus

Observable<T> getData()

does this actually simplify anything

There is one thing I think it simplifies that is more than just making it easier due to familiarity. It communicates when a user does NOT need to think about multiple values, and that does indeed simplify code.

A Task will always behave in one of 3 ways:

  1. respond with an error
  2. never respond
  3. respond with a success

An Observable on the other hand may:

  1. response with an error
  2. never respond
  3. respond successfully with no data and terminate
  4. respond successfully with a single value and terminate
  5. respond successfully with multiple values and terminate
  6. respond successfully with one or more values and never terminate (waiting for more data)

The mental model for these extra possibilities do definitely increase the complexity and are a big part of the pushback for using RxJava that I have seen for request/response models.

These more complex models do still end up happening as soon as composition occurs, but now that composition is declared by the consumer, and the consumer knows the contract of the producer to be simpler if it is a Task.

For example, in "service layer" type code we could have getters like this:

Task<T> getDataA(arg);
Task<R> getDataB(arg);
Observable<R> getDataC(arg);

This is now simpler to understand because the types narrow the scope for A and B so we know we don't need to consider multi-value or infinite cases (which generally need to be explained in the javadoc). The C type in contrast clearly indicates that it wants to support multiple values otherwise it would have just been a Task.

hard to compose single-item streams with variable-item streams

Would support of composition like the following appease that issue?

        Task<Integer> h = Task.just(1);
        Observable<Integer> o = Observable.just(2, 3, 4);
        h.mergeWith(o).subscribe(System.out::println);

familiarity can always be learned, but complexity is harder to avoid

My thinking has evolved to think that trying to force everything to be a vector results in increased complexity of a different kind that can be addressed by a slight increase in complexity of the RxJava APIs (2 types instead of 1). Static types communicate common information that simplifies the mental model and constrains available states and operations that can be applied. This seems worth the increase in API surface area.

@jspahrsummers
Copy link

I violently agree with capturing as much information as possible in the type system (thus our ongoing attempts to capture the hot/cold distinction in types for RAC), so I don't have any objections there.

Would support of composition like the following appease that issue?

Yeah, methods like these definitely help a lot.

But how would a method like Catch work in this system? For example, I may have a single-item stream that should change length if an error occurs.

It's not that these peculiar cases are unsolvable, of course. My concern is simply be that there are so many weird composition cases like this that the end result will just be confusion.

I think in some, subliminal sense, this could also be a disincentive toward using Observables—like a desire to keep everything in one “world”—which might lead users to miss out on the expressive power of stream-based programming.

Ultimately, though, this is just speculation on my part, and you know your target audience much better than I do. I don't want to discourage something that works and that you believe to be the right design decision. 😄

@mttkay
Copy link
Contributor

mttkay commented Feb 10, 2015

The mental model for these extra possibilities do definitely increase the complexity and are a big part of the pushback for using RxJava that I have seen for request/response models.

That is actually a pretty strong argument that I missed. We did have these cases whenever we would emit records from a database through Rx. Here the problem was that for the receiver it was unintuitive to deal with the "successful query, but nothing found" case, as this resulted (as per the duality with iterators, which our storage library is built upon) in a sequence that would simply complete and exit.

What we did to work around this was to convert the sequence with toList, since then we could test for "empty list" vs "non-empty list" to distinguish the two success cases.

So, I agree that Task in this case works in favor of the developer. I'm just worried that now developers constantly have to make a decision about whether an asynchronous sequence is exposed as an Observable or a Task. How difficult will it be to generalize my call from Task to Observable without totally rewriting it? (It might be simple, but not having worked with Task it's something I'm concerned about, since it looks like we're now wielding two different concepts that may or may not be interchangeable.)

@benjchristensen
Copy link
Member Author

Thanks both @mttkay and @jspahrsummers for your input on this. Very helpful.

concern is simply be that there are so many weird composition cases like this that the end result will just be confusion.

I agree with this concern, which is why I value feedback and consider this purely an experiment at this time and want to try and make the code work and see if it's good or not.

would a method like Catch work in this system

I don't see why it wouldn't since Task will not inherit from Observable and will specifically add only the operators that make sense for it. So it can have things like onErrorReturn(T), onErrorResume(Task<T>) or onErrorResume(Observable<T>) on the Task as options for handling errors.

The ability to design the Task API exactly for scalar values and interop with Observable is a key benefit of creating a Task type as opposed to trying to interop with Future or CompletableFuture.

this could also be a disincentive toward using Observables

Yes it likely will, but I don't necessarily see that as a bad thing in the same way that we don't consider a method signature of T getData() to be bad versus List<T> getData(), as long as the use of Task allows interop and composition into an Observable.

What we did to work around this was to convert the sequence with toList

This is exactly the type of stuff we shouldn't have to do and is far more awkward. An Observable<List<T>> kind of defeats the point of an Observable stream.

unintuitive to deal with the "successful query, but nothing found" case

How would you want this to work? In a scalar model, would you treat this as an error, or would you have a Response<T> object that can be empty, like an Option<T>/Maybe<T> type? For example, would it make more sense to be something like Task<Maybe<T>> or is it truly Task<T> with an error if not found?

worried that now developers constantly have to make a decision about whether an asynchronous sequence is exposed as an Observable or a Task

It seems to me that this should be an easy thing to distinguish. It's either always single-valued, or it's multi-valued.

How difficult will it be to generalize my call from Task to Observable without totally rewriting it?

Can you provide more information on what you mean with this question?

@mttkay
Copy link
Contributor

mttkay commented Feb 12, 2015

How would you want this to work? In a scalar model, would you treat this as an error

I don't think treating this as an error works well for us, since error is a terminal state. From our app point of view, it is not really an error anyway. It simply means there was nothing to satisfy your query, which is a perfectly reasonable "result". Modeling this as Option/Either would be more suitable. I struggle a bit with where those types would live though. I did not want to make our storage library pull in all of Guava just to have an Optional type; yet this seems to be the commonly used implementation in Java land? Adding a custom Option type to our library might then clash with apps that already use Guava's Option. Did you have similar situations in RxJava where the question around lending types from other libraries versus shipping your own came up?
(This is ultimately re why we chose toList()/isEmpty over using Optional, for better or worse.)

Can you provide more information on what you mean with this question?

I guess what I'm asking is: will there be specialized classes such as Task specific Observers that only work with tasks? What is the degree of interchangeability between Task and Observable?

If I would have to go back to my client code and touch a lot of code in the presentation layer because a sequence changed from N to 1, then that would be a smell to me. Right now, I can go to my storage objects and completely alter the number of emissions, order of emission, or even rate of emission without touching anything in the observing layers. So I guess what I'm asking is, if I switch an object emitting something from Observable to Task, what amount of change will trickle up through the system because of that?

@benjchristensen
Copy link
Member Author

I did not want to make our storage library pull in all of Guava just to have an Optional type

I share this view.

Modeling this as Option/Either would be more suitable

Since the apps I work on don't generally have a good standard for this we very often end up modeling optional states like these individually. For example, things that involve user authentication or search results will have a container type that is returned so that we don't use exceptions as part of the control flow.

For example:

Observable<SearchResult<T>> search(String query);

SearchResult {
   boolean hasResults();
   Observable<T> results();
}

will there be specialized classes such as Task specific Observers that only work with tasks?

Don't know, hence this discussion. In this proposal I do have a TaskObserver in the event someone wants just the onSuccess/onError semantics, but the default behavior of subscribe takes the normal multi-valued Subscriber based on the assumption that composition and interop is common use.

See this for execute(TaskObserver to): https://github.com/benjchristensen/RxJava/blob/cdf7ff9ce6ae5347dfa261a101283c7d43bd8cc6/src/main/java/rx/Task.java#L1500

What is the degree of interchangeability between Task and Observable?

My intent is for Task to naturally be composed into Observable, so the Task class would have Observable in the return types and method signatures. An Observable does not have many places where it would become a Task, only things like single or last. Those are already committed to so won't change, but it wouldn't be hard to have a toTask() method for that one use case, similar to toBlocking().toFuture().

If I would have to go back to my client code and touch a lot of code in the presentation layer because a sequence changed from N to 1, then that would be a smell to me

This would never be required. If a codebase or team is okay treating everything as sequences then Observable usage as right now is totally okay. All of the Netflix API is done that way and has been for years. This proposal to add Task is not because it's not possible to treat everything as a sequence, it's about providing static typing to communicate scalar semantics when an API or team is better served by that, without losing laziness, virtual time and parameterized scheduling and most importantly the composition capabilities of Observable sequences.

I can go to my storage objects and completely alter the number of emissions, order of emission, or even rate of emission without touching anything in the observing layers

That may be, but it doesn't mean the consumers are ready for it. This is an implicit data contract that virtually every codebase I've ever seen struggles with. If they used to get one item and now receive two, will their code work? I've seen production outages caused by UIs assuming 3 elements from a list and receiving 2 or 4. The Observable or Task classes don't obviate this challenge, but Task does make the common scalar use case part of the published static API for communicating when one-and-only-one response is expected.

if I switch an object emitting something from Observable to Task, what amount of change will trickle up through the system because of that?

Putting aside the static typing change which would require a recompile, the Task type will have less functions that can be applied to it. For example, scan, reduce and temporal flow control operators such as window and buffer don't make sense on Task. Operators that make sense for scalar responses though should be the same.

@akarnokd
Copy link
Member

Other than reusing internal operators of RxJava, does this rx.Task have to be part of core RxJava instead of being a separate project?

@benjchristensen
Copy link
Member Author

If it's going to exist I think it should exist here in core RxJava since it is a foundational type.

@tomlokhorst
Copy link

Since @benjchristensen asked me to weigh in on this, I'll post about my experiences here.

Some backstory: At our company Q42, we're a team of 4 developers simultaneously working on an Android and iOS app for a client. None of us had worked with RxJava before, and we had only a bit of experience with Rx and Tasks for .NET. We first developed the iOS app in Swift, using a promises library (which I wrote). The app only does simple network calls, and for the few places where we needed something reactive we used CoreData and NSNotifications.

When we started developing the Android app, we used RxJava, because that was easily integrated with Retrofit. While we did customise the UI for Android, we tried to keep the architecture of the code as similar as possible to the iOS codebase.

However we kept running in to issues. Network calls not happening because we didn't subscribe. Or happing twice (because we subscribed twice). We forgot to attach error handlers, or didn't cache the result of a flatMap. All these issues are of course solvable, and we did solve them, but it felt like we kept doing the same things over and over again.

Specifically, since we were just porting Swift code, all we wanted (and needed) was the simpeler promise semantics.

For that reason, we finally decided to just write a Promise wrapper around an Observable. This enforces promise semantics that are similar to the promises we use on iOS. It does things like immediately subscribe, to ensure the network call gets kicked off and cache the result. This is by no means code I would suggest others to use, it is very specific to our use case. But we are quite happy with it.

In summary, for our use case we didn't need the full power of Observables, and we build something around it to make our client code both simpler and easier to understand. Unfortunately, I don't have enough experience with RxJava to judge if adding a Task to RxJava itself would be a good or a bad idea for the library and its user base.

@benjchristensen
Copy link
Member Author

Thanks @tomlokhorst for sharing your usage experience.

@benjchristensen
Copy link
Member Author

ping @headinthebox for your input on this

@cy6erGn0m
Copy link

Task.create looks like simply Observable.create. Also Task is an additional concept that requires much new additional code (and documentation) and it's not obvious what is a reason to have Task?

@JakeWharton
Copy link
Member

It's a formalization of scalar observables (further enforced by the type
system) rather than having such a common use-case being completely implicit.

On Mon, Mar 16, 2015 at 9:41 PM Sergey Mashkov notifications@github.com
wrote:

Task.create looks like simply Observable.create. Also Task is an
additional concept that requires much new additional code (and
documentation) and it's not obvious what is a reason to have Task?


Reply to this email directly or view it on GitHub
#2641 (comment).

@artem-zinnatullin
Copy link
Contributor

What about different approach?

SingleItemObserver<T>:

public static abstract class SingleItemObserver<T> implements Observer<T> {
  @Override public final void onNext(T t) {
    // now Observer should somehow unsubscribe from Observable or ignore next calls
    onSuccess(t);
  }

  public abstract void onSuccess(T t);

  @Override public final void onCompleted() {
    // no impl required
  }
}

Observable is key concept of Reactive, with RxJava we thinking about streams of data because it allows us to make things like this:

1st version of Observable can return data from API
2nd version of Observable can return data from Cache and then from API
3rd version of Observable can return data from Cache, then from API and then receive further updates automatically

It's evolution of the application, it should not be hard.

With RxJava you don't have to change client (Observer) code, that's the power of RxJava (and operators, of course).

But, yes, sometimes (actually, pretty often) you want to receive only one result and don't care about onCompleted(), so why not to do it in lowest level as possible -> in Observer. It will allow you to change only client part (Observer) in future if you will need Stream of results instead of one result.

I will definitely won't use Task just because I don't want to have problems with switching to Observable in future if I'll need caching and other things.

Just some thoughts.

@benjchristensen
Copy link
Member Author

@artem-zinnatullin what about the proposed addition gives problems switching? It "is" an Observable and easily switches between them. That's the point of it.

@benjchristensen
Copy link
Member Author

I spoke with @headinthebox and we are both okay with this addition except for the name. Both 'Task' and 'Future' are used elsewhere to represent eager async scalar computation. We need a name that differentiates.

I'm not a fan of 'Single' or 'SingleObservable'. 'ScalarObservable' is correct but feels like a mouthful.

Thoughts on naming?

@artem-zinnatullin
Copy link
Contributor

@artem-zinnatullin what about the proposed addition gives problems switching? It "is" an Observable and easily switches between them. That's the point of it.

@benjchristensen as I see from files in the PR, it's not an Observable, it's a copy of Observable with similar methods and the problem is that if API declares some method with return type as Task it will be hard to change it to Observable in future, because it will affect client code.

I want to suggest to move decision "to work with Stream of data or work with one emission of data" to the end of the chain — into Observer.

About naming: ScalarObservable is okay

@tomlokhorst
Copy link

@benjchristensen Could you elaborate on the need and use cases for this "Task" to be lazy by default?

Since it can only ever produce one value, it seems especially weird that, that value is "mutable".
I like Tasks in other languages because they represent a future immutable value, where as Observables are a stream of values. ScalarObservable seems a weird in between I would not expect.

Out of these three functions, the last one seems least useful.

Observable<DateTime> serverTimeTicker = getServerTimesViaOpenConnection()
Task<DateTime> serverTime = getCurrentTimeFromServer()
ScalarObservable<DateTime> updatingServerTime = getUpdatingServerTime()

Or is this merely lazy-by-default to keep it semantically closer to Observables?

@headinthebox
Copy link
Contributor

The current proposal is similar to F#'s and Darts async in that you build a computation and then execute it. See http://tomasp.net/blog/async-csharp-differences.aspx for an explanation and the benefits of the "cold" model.

@kirkshoop
Copy link
Member

RE: naming - This seems like the Command pattern so perhaps ObservableCommand would fit.
Also the design might benefit from looking at ReactiveCommand. It is intended to integrate with XAML UI, but remove those pieces and I think it provides a good abstraction of a cold observable async task.

@benjchristensen
Copy link
Member Author

some method with return type as Task it will be hard to change it to Observable in future, because it will affect client code.

@artem-zinnatullin Yes, but that's kind of the point. My experience with consuming code is that changing an Observable behavior to start emitting more than 1 item will often break user code that assumes 1 item.

If you don't want to use this new type, that's fine, it's not for you then :-)

I want to suggest to move decision "to work with Stream of data or work with one emission of data" to the end of the chain — into Observer.

That doesn't solve the desired use case which is communicating via the public API that something is scalar. The Observer is not the issue.

@benjchristensen
Copy link
Member Author

Or is this merely lazy-by-default to keep it semantically closer to Observables?

@tomlokhorst That's one reason. The more important reason though is so that compositional graphs can be lazily defined and then subscribed to multiple times, just like with Observables.

The reason for being "cold" is exactly the same as Observable supporting being cold. Being scalar or vector is orthogonal to being hot vs cold.

If a ScalarObservable is hot then it must always be exposed with a function F<Task<T>> to be reused or lazily invoked.

There are already hot types like Promise and Future (and in C# Task) so the goal here is not to create another Future. It is to create a type with similar semantics as Observable (supporting push, composition, laziness, reactive pull backpressure etc) but via its type communicate that it is scalar.

This fits with what @headinthebox stated above: "build a computation and then execute it"

@benjchristensen
Copy link
Member Author

Naming options include:

  • ScalarObservable<T>
  • ObservableCommand<T>
  • Single<T>
  • Async<T>
  • Task<T>
  • Scalar<T>

Despite not liking how long the name is, ScalarObservable is the least surprising and best at communicating what it is.

@tomlokhorst
Copy link

I don't like ScalarObservable<T> because, to me at least, the "Observable" suffix implies it inherits from Observable<T>. I'm not very familiar Java naming conventions, so I don't know if others would make this assumption as well.

And thanks @benjchristensen and @headinthebox for expanding on the hot vs cold here. While I don't yet see a practical use case, I can definitely see the theoretical use case. That means a practical use case will probably pop up in a little while.

@mttkay
Copy link
Contributor

mttkay commented Mar 18, 2015 via email

@headinthebox
Copy link
Contributor

+1 for Single

@sdeleuze
Copy link

Hi @benjchristensen,

I fully agree with the need to have a scalar observable type, especially when exposing such type in application public API.

The main thing that confuses me in your implementation is this embedded Task.Promise type. I understand this is done differently in .NET since Task already exists, but on other platforms I would just expect a single type that I would expose in my API instead of Observable for scalar result.

About the naming:

  • I agree that Task is confusing
  • ScalarObservable is the most meaningful name, but I agree with @tomlokhorst, people will expect it to extend Observable and that's not the case
  • What about Deferred?

Thanks in advance for your feedback.

@mttkay
Copy link
Contributor

mttkay commented Mar 19, 2015

Deferred might get confused with defer/OnSubscribeDefer though?

On Thu, Mar 19, 2015 at 11:12 AM, Sébastien Deleuze <
notifications@github.com> wrote:

Hi @benjchristensen https://github.com/benjchristensen,

I fully agree with the need to have a scalar observable type, especially
when exposing such type in application public API.

The main thing that confuses me in your implementation is this embedded
Task.Promise type. I understand this is done differently in .NET since
Task already exists, but other platforms I would just expect a single
type that I would expose in my API instead of Observable for scalar
result.

About the naming:

  • I agree that Task is confusing
  • ScalarObservable is the most meaningful name, but I agree with
    @tomlokhorst https://github.com/tomlokhorst, people will expect it
    to extends Observable and that not the case
  • What about Deferred?

Thanks in advance for your feedback.


Reply to this email directly or view it on GitHub
#2641 (comment).

Matthias Käppler

Engineer

Twitter: https://twitter.com/mttkay

Skype: matthias-sc

SoundCloud Ltd. | Rheinsberger Str. 76/77, 10115 Berlin, Germany | +49
(0)172 2345679

Managing Director: Alexander Ljung | Incorporated in England & Wales
with Company
No. 6343600 | Local Branch Office | AG Charlottenburg | HRB 110657B

Capture and share your music & audio on SoundCloud
http://soundcloud.com/creators

@sdeleuze
Copy link

@mttkay Indeed that may perhaps be an issue.

I am not sure it makes sense, and there may be some backward compatibility impact, but since ScalarObservable is a nice name, what about making ScalarObservable and Observable extending the same XxxObservable class (to be created) ? Or have Observable extending ScalarObservable ?

@davidmoten
Copy link
Collaborator

I'm not a fan of the use of the word scalar for this purpose. I think it's use in programming is unfortunate. I was very confused by the the term when I encountered it in the codebase ages ago because a scalar in mathematics is just a number (something that has scale).

+1 Single

@iNoles
Copy link

iNoles commented Mar 19, 2015

if it is going to be Async, I would expect it to be full asynchronous from beginning to end. How it is going to handle over 100 items, would it be same as Observable?

@Matthias247
Copy link

I don't like Scalar - it sounds far more theoretical than for example Task. But that Task and Future are commonly used to describe the result of an already started async operation is true. So I would also favor another name. LazyTask could be an understandable compromise. Something like TaskBuilder, TaskRecipe would be similar. F#'s Async<T> really is in the same category than what is described here. Maybe it could also be AsyncOp<T> or AsyncOperation<t>? I guess LazyAyncOp is too much :)

@simonbasle
Copy link
Contributor

Interesting discussion!
I can see a rationale for not extending Observable, but maybe as @sdeleuze suggested a common base can be extracted?
Regarding naming I feel ScalarObservable is OK, Single also gets my +1.

@benjchristensen have you considered a onEmpty additional semantic if it's not going to extends Observable? (taking care of the Optional-like case where no return value is a correct case)

@codefromthecrypt
Copy link

just here for the bikeshed part: I like Async<T> and Async.Callback<T>.

@passsy
Copy link
Contributor

passsy commented May 8, 2015

+1 for this proposal

I'm a fan of Async<T> and Single <T>

@benjchristensen
Copy link
Member Author

The issue I have with Async is that there is actually nothing about the type that requires it to execute asynchronously, just like Observable. Thus between Async and Single I'd prefer Single.

@benjchristensen
Copy link
Member Author

@simonbasle

common base can be extracted

What do you intend? The point of the different types if that they are different. Even the subscribe signature is different.

have you considered a onEmpty additional semantic

Yes I think we should have that. I don't have it in this PR as I was going for a bare minimum to get started, but that's definitely a valid operator to have here.

@artem-zinnatullin
Copy link
Contributor

@benjchristensen I understand purposes of such Observable, but
are you sure you want this?

For simplicity, let's call it Single.

Pros:

  • Obvious API declaration when only one item should be emitted.

Cons and questions:

  • Great documentation is huge part of RxJava, especially, marble diagrams, it'll require a lot of work to add documentation for each Operator which is applicable both to Single and Observable, also, Operators' documentation for Single and Observable on the reactivex.io probably should be divided to make it easier to search/read.
  • You'd have to support both Observable and Single Operators and violate DRY is some cases 😞.
  • Same pack of tests would be required for Single, a lot of tests.
  • Will Single support as much Observable Operators as possible?
  • There are some questionable Operators for Single: Filter, All, Contains, SkipUntil, SkipWhile, TakeUntil, TakeWhile, they are applicable to Single, but semantically they intended to work with streams of data rather than with one item of the stream.
  • I'm afraid Single will make RxJava even more difficult for newbies: when to use Single and when to use Observable, why I need to convert Single into Observable for some things and so on.
  • Combining Operators of Observable will have to accept Single as parameter or we will see toObservable() hell in code, same problem with accepting Observable in combining operators of Single.
  • It'll be weird that it's possible to create Observable.empty() and Single.empty().
  • It'll be weird that you can use both Single.just() and Observable.just(), but Single.just() will return Single, while Observable.just() will return Observable, it's okay for me, but I don't think it'll be so obvious for those who just starting their Rx life.
  • Also, Scala supports Algebraic Data Types concept which looks like a valid case for Observable and Single, but I am not sure that it'll be possible to implement it correctly when core is written in Java, may be @zsxwing will add some thoughts.
  • No errors detection at compile time in compare to Observable, I don't see how Single type can help in that case.
  • It'll require to write overloads of methods to support both Observable and Single to avoid converting one into another in many applications.

Extra addition:

loadSomething() // returns Observable<T>
  .subscribe(value -> doSomething(value));

loadSomething() // returns Single<T>
  .subscribe(value -> doSomething(value));

As you can see, with lambdas it does not matter what is it: Single or Observable, you don't see anonymous classes with onNext() which look strange for Observable that emits one item.

I agree, that Single can help with creating more obvious API, especially if we don't want to write documentation for it, but in case of reading such code, I don't see much profit from Single, also, as I mentioned before, it'll be hard to make API more "reactive" in case of switching from Single to Observable without breaking compatibility 😞.

Just saying...

@Nilzor
Copy link

Nilzor commented May 12, 2015

No I'm not sure I want this, but for the sake of the debate I'd like to counter a couple of your cons:

You'd have to support both Observable and Single Operators and violate DRY is some cases

We could consider Singles to simply be constrained Observables under the hood, hence not violating DRY.

There are some questionable Operators for Single: Filter, All, Contains, SkipUntil, SkipWhile, TakeUntil, TakeWhile, they are applicable to Single, but semantically they intended to work with streams of data rather than with one item of the stream.

Exactly. These are not questionable - they are a definite no-go for Single. My point for supporting Single is to make the API simpler for newbies - most use cases I've encountered in my work reply with one event, so the wealth of functions for Observables confuse me. Removing operators such as filter, all, contains etc will simplify the API.

I'm afraid Single will make RxJava even more difficult for newbies: when to use Single and when to use Observable, why I need to convert Single into Observable for some things and so on.

I argue that most newbies will encounter Single before Observable, hence facing a simpler, API more easy to learn.

It'll be weird that it's possible to create Observable.empty() and Single.empty().

So don't implement it.

It'll be weird that you can use both Single.just() and Observable.just(), but Single.just() will return Single, while Observable.just() will return Observable, it's okay for me, but I don't think it'll be so obvious for those who just starting their Rx life.

I'd say only implement Single.from(), not Single.just(). It doesn't make sense for Singles.

@codefromthecrypt
Copy link

codefromthecrypt commented May 12, 2015 via email

@artem-zinnatullin
Copy link
Contributor

Just tried RxNetty, now I understand need in Scalar Observable type. Sorry for my previous comments, in client-side development it's usually okay to use Observable for almost everything, subscribe to them and refresh the UI on each emission.

But for example with RxNetty you need Scalar Observable for HTTP request processing because it's only possible to return one response to HTTP request, at the moment you have to use Observable for this and it's not so convenient and looks strange.

Probably, for backend systems which use HTTP instead of some socket-based solutions, Observable is too general type and Scalar Observable will be better type for such systems.

Naming: Single looks short, good and understandable, the only thing is that rx.Observable already has method single() which can cause some misunderstanding, but anyway, now I want to see Scalar Observable type in RxJava!

@zsxwing
Copy link
Member

zsxwing commented May 12, 2015

Single +1

@benjchristensen
Copy link
Member Author

Replacing with #3012

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet