Method for converting an Observable to a different type, on Observable #2793

Closed
ldaley opened this issue Mar 3, 2015 · 13 comments

Comments

@ldaley
Contributor

ldaley commented Mar 3, 2015

Ratpack is a web framework that integrates with RxJava. It has its own Promise type used throughout its API. Part of the Rx integration is the ability to move back and forth between Ratpack and Rx types. A method like…

<O> O to(Func1<? super Observable<T>, ? extends O> converter)

on Observable would help here. Ratpack has something similar on its Promise type. Having this method on Observable would allow code like Observable.just(1).to(RxRatpack::asPromise). The same could be used for the RxReactiveStreams integration: Observable.just(1).to(RxReactiveStreams::toPublisher).

A bit niche I know, but I thought it's worth asking for as it would make using Rx from Ratpack nicer.

@akarnokd
Member

akarnokd commented Mar 3, 2015

Or Ratpack could have a method which takes an RxJava Observable and returns the appropriate type:

import static RxRatpack.*;
asPromise(source)...;

We use this pattern for lots of things: iterables, futures, blocking observables. And besides, your proposed method loses the type T and you'd get Promise by default.

@ldaley
Contributor Author

ldaley commented Mar 3, 2015

It does have that method. The problem is that it breaks the flow if it can't be used fluently.

Promise<T> promise = someObservableOfT.to(RxRatpack::asPromise).onError(logger::error);

That's what I'm after.

The type of the observable isn't lost if the return of the converter function is defined in terms of it.
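As a rough illustration of the proposed signature and of the point about the item type surviving the conversion, here is a minimal sketch. `Source` and `Box` are hypothetical stand-ins built only on the JDK; they are not the real RxJava `Observable` or Ratpack `Promise`, and `asBox` is an assumed converter in the spirit of `RxRatpack::asPromise`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins for illustration only; NOT the RxJava or Ratpack types.
final class ToSketch {

    /** Minimal stand-in for an observable source: it just holds its items. */
    static final class Source<T> {
        final List<T> items;

        Source(List<T> items) {
            this.items = items;
        }

        /** The proposed method: hand this source to a converter and return its result. */
        <O> O to(Function<? super Source<T>, ? extends O> converter) {
            return converter.apply(this);
        }
    }

    /** Minimal stand-in for a promise-like type that carries the item type T. */
    static final class Box<T> {
        final T value;

        Box(T value) {
            this.value = value;
        }

        <R> Box<R> map(Function<? super T, ? extends R> f) {
            return new Box<>(f.apply(value));
        }
    }

    /** Converter whose return type is defined in terms of T, so T is not lost. */
    static <T> Box<T> asBox(Source<T> source) {
        return new Box<>(source.items.get(0));
    }

    public static void main(String[] args) {
        // O is inferred as Box<Integer>, so the chain continues fluently.
        Box<String> result = new Source<Integer>(Arrays.asList(1))
                .to(ToSketch::asBox)
                .map(i -> "item " + i);
        System.out.println(result.value);
    }
}
```

Because the converter is generic in `T`, the compiler infers `O` as `Box<Integer>` rather than a raw `Box`, which is what lets the call chain keep going after `to`.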

@akarnokd
Member

akarnokd commented Mar 3, 2015

You are right about the return type, I just checked. Still, I'm not convinced we need such an operator.

@ldaley
Contributor Author

ldaley commented Mar 4, 2015

I'm not sure what more I can add to this, unless you feel that you don't see how I would use it.

@benjchristensen
Member

Hi @alkemist,

Would it be correct to say that this is about "escaping the monad" in a non-blocking way? Similar to how toBlocking().single() and others return T, this is about extracting values from an Observable to a new type asynchronously, correct?

Effectively the only difference between to and subscribe is that to returns the O value, so it allows fluent APIs as opposed to defining the subscriber, subscribing, then using the subscriber instance.

This seems reasonable and like a simple overload of subscribe, of which we already have several.

@ldaley
Contributor Author

ldaley commented Mar 5, 2015

> Would it be correct to say that this is about "escaping the monad" in a non-blocking way? Similar to how toBlocking().single() and others return T, this is about extracting values from an Observable to a new type asynchronously, correct?

Not quite… if I understand you.

It's actually the observable that gets transformed, not its items.

I have this…

static <T> Promise<T> promise(Observable<T> observable) {
  // adapt observable type to promise type
}

It's common in Ratpack to do processing with Rx, but need to give a promise to Ratpack to render the value or do… something… with it. Ratpack's core doesn't depend on Rx, so at times it's convenient to adapt.

So the static method thing works and is fine, it's just not lambda friendly and breaks the chaining flow.

Promise<String> string() {
  RxRatpack.promise(Observable.just("foo"))
}

vs.

Promise<String> string() {
  Observable.just("foo").to(RxRatpack::promise)
}

It's exactly the same thing when adapting Observable to Publisher, or whatever else.

@akarnokd
Member

I've posted a PR for this: #2971.

@stealthcode

Would it be acceptable for the to method to take a Func1<OnSubscribe<T>, O> instead of a Func1<Observable<T>, O>? The implementation could then simply pass this.onSubscribe to the func, which avoids subscribing when it may not be necessary.
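To make the difference between the two shapes concrete, here is a minimal sketch under stated assumptions. `Emitter` is a hypothetical stand-in for `OnSubscribe<T>`, `Source` is a stand-in for `Observable<T>`, and the method name `toFromEmitter` is invented for this illustration. A counter stands in for whatever work the real subscribe path does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-ins for illustration only; NOT the RxJava types.
final class OnSubscribeSketch {

    /** Stand-in for OnSubscribe<T>: pushes items into the consumer it is given. */
    interface Emitter<T> {
        void call(Consumer<? super T> sink);
    }

    /** Stand-in for Observable<T>. */
    static final class Source<T> {
        private final Emitter<T> onSubscribe;
        int subscribeCalls = 0; // counts trips through the normal subscribe path

        Source(Emitter<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        /** Normal path; in a real library, hooks and safeguards would live here. */
        void subscribe(Consumer<? super T> sink) {
            subscribeCalls++;
            onSubscribe.call(sink);
        }

        /** Shape from the original request: the converter sees the whole source. */
        <O> O to(Function<? super Source<T>, ? extends O> converter) {
            return converter.apply(this);
        }

        /** Shape suggested above: the converter sees the raw emitter instead. */
        <O> O toFromEmitter(Function<? super Emitter<T>, ? extends O> converter) {
            return converter.apply(onSubscribe);
        }
    }

    /** Example converter that drains an emitter into a list without subscribing. */
    static <T> List<T> collect(Emitter<T> emitter) {
        List<T> out = new ArrayList<>();
        emitter.call(out::add);
        return out;
    }

    public static void main(String[] args) {
        Source<String> source = new Source<String>(sink -> {
            sink.accept("a");
            sink.accept("b");
        });
        List<String> items = source.toFromEmitter(OnSubscribeSketch::collect);
        System.out.println(items + ", subscribe calls: " + source.subscribeCalls);
    }
}
```

In this sketch the emitter-based converter never goes through `subscribe`, so anything that only happens on that path is skipped.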

@akarnokd
Member

If "I" gave it out, there would be no safeguards, no plugin hook calls and no start calls.

@stealthcode

@akarnokd You bring up a good point about the plugin hooks not getting called but this seems like something that could be solved by a variety of engineering solutions (such as an abstract class or possibly a default method on an interface).

With respect to the onStart call, this isn't required for simple cases. The default behavior of a subscriber is to make requests when setProducer is called. More complex use cases could create a Subscriber that implements an onStart method. I am not proposing to do without a Subscriber; I am proposing not calling the Observable.subscribe method for the purpose of converting an Observable.

With respect to safeguards, I would appreciate your feedback. Let me pose a use case. Let's say that a developer wants to convert their observable to a HashMap using the to method in a blocking manner. Let's say that the conversion func has some way to produce a key for each value onNexted. If that key generation function threw an exception, then what should happen? There is no Observable emitted, so the error has to be propagated by normal Java means. I would expect the call to the to method to throw an exception from the actual line that caused the exception in the first place. This would give users the chance to handle exceptions from their conversion code. Another failure scenario would be an exception in the Observable's operator chain which propagates down to the to call site via onError. I would also expect a chance to handle this exception in a similar way. Neither of these conditions requires the use of a SafeSubscriber. But I am totally open to other ideas. Can you think of another way to handle exceptions? How should users code their conversion funcs with the current proposed implementation?

@akarnokd
Member

> With respect to the onStart call, this isn't required for simple cases. The default behavior of a subscriber is to make requests when setProducer is called. More complex use cases could create a Subscriber that implements an onStart method.

onStart is part of the protocol between Observable and Subscriber, it must be called at the right time.

> Let's say that a developer wants to convert their observable to a HashMap using the to method in a blocking manner. Let's say that the conversion func has some way to produce a key for each value onNexted. If that key generation function threw an exception, then what should happen?

When you're designing the function that leaves the Observable realm, it is up to you to design the return of an error case.

public static <T> Func1<Observable<T>, HashMap<String, T>> toHashMap() {
    return o -> {
        HashMap<String, T> result = new HashMap<>();
        try {
            o.toBlocking().forEach(e -> result.put(e.toString(), e));
        } catch (RuntimeException ex) {
            // ignore
        }
        return result;
    };
}

Maybe you swallow the exception, maybe you return some default value or have the return type as Notification<T>.
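One of the options mentioned here, reporting the failure through the return type rather than swallowing it, could look roughly like the sketch below. `Result` is a hypothetical holder invented for this illustration (loosely in the spirit of a notification type), and a plain `Iterable<T>` stands in for the blocking view of the source so the example runs on the JDK alone.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical types for illustration only; not part of RxJava.
final class ConversionResultSketch {

    /** Holds either the converted value or the error that stopped the conversion. */
    static final class Result<V> {
        final V value;
        final Throwable error;

        private Result(V value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        static <V> Result<V> ok(V value) {
            return new Result<>(value, null);
        }

        static <V> Result<V> failed(Throwable error) {
            return new Result<>(null, error);
        }

        boolean isOk() {
            return error == null;
        }
    }

    /** Converter that surfaces a key-generation failure in its return value. */
    static <T> Function<Iterable<T>, Result<Map<String, T>>> toMapResult(
            Function<? super T, String> keyOf) {
        return items -> {
            Map<String, T> map = new HashMap<>();
            try {
                for (T item : items) {
                    map.put(keyOf.apply(item), item);
                }
            } catch (RuntimeException ex) {
                // Report the failure to the caller instead of ignoring it.
                return Result.failed(ex);
            }
            return Result.ok(map);
        };
    }
}
```

The caller then checks `isOk()` at the call site, which gives the handling opportunity asked about above without throwing out of the conversion.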

> Neither of these conditions requires the use of a SafeSubscriber.

It is one of the safeguards; its purpose is to call unsubscribe on terminal events and clean up the chain. The others protect against RuntimeExceptions thrown by OnSubscribe.call, Operator.call or even Observer.onNext, and route them through onError.
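The behavior described here can be sketched with a small wrapper. This is not the actual SafeSubscriber implementation: `Sink` and `SafeSink` are hypothetical JDK-only stand-ins that show only the two ideas named above, routing an exception from onNext into onError and running cleanup once on a terminal event.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; NOT the RxJava SafeSubscriber.
final class SafeSubscriberSketch {

    /** Stand-in for a subscriber with the three familiar callbacks. */
    interface Sink<T> {
        void onNext(T item);
        void onError(Throwable e);
        void onCompleted();
    }

    /** Wraps a sink, routes onNext failures to onError, and cleans up once. */
    static final class SafeSink<T> implements Sink<T> {
        private final Sink<T> actual;
        private final Runnable unsubscribe;
        private boolean done;

        SafeSink(Sink<T> actual, Runnable unsubscribe) {
            this.actual = actual;
            this.unsubscribe = unsubscribe;
        }

        @Override
        public void onNext(T item) {
            if (done) {
                return; // ignore events after a terminal event
            }
            try {
                actual.onNext(item);
            } catch (RuntimeException ex) {
                onError(ex);
            }
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                return;
            }
            done = true;
            try {
                actual.onError(e);
            } finally {
                unsubscribe.run();
            }
        }

        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            done = true;
            try {
                actual.onCompleted();
            } finally {
                unsubscribe.run();
            }
        }
    }

    /** Drives a sink that fails on the item 2 and returns the recorded events. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Sink<Integer> failing = new Sink<Integer>() {
            @Override
            public void onNext(Integer item) {
                if (item == 2) {
                    throw new IllegalStateException("boom");
                }
                events.add("next:" + item);
            }

            @Override
            public void onError(Throwable e) {
                events.add("error:" + e.getMessage());
            }

            @Override
            public void onCompleted() {
                events.add("completed");
            }
        };
        SafeSink<Integer> safe = new SafeSink<>(failing, () -> events.add("unsubscribed"));
        safe.onNext(1);
        safe.onNext(2);
        safe.onNext(3);
        safe.onCompleted();
        return events;
    }
}
```

In `demo()`, the failure on the second item ends the sequence, so the later `onNext(3)` and `onCompleted()` calls are dropped and cleanup runs exactly once.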

@akarnokd
Member

We ended up with a new operator x() for this purpose in 1.0.14.

@ldaley
Contributor Author

ldaley commented Aug 13, 2015

Interesting choice of method name.

I've integrated this for my original purpose: ratpack/ratpack@d65ec33

Thanks.
