Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding super/extends so that Observable is covariant #331

Merged
merged 20 commits into from
Sep 4, 2013

Conversation

jmhofer
Copy link
Contributor

@jmhofer jmhofer commented Aug 31, 2013

Ok, so this pull request changes a lot of lines. It's mostly generalizing all the FuncXs to be used like FuncX[-T1, -T2, ..., -TX, +R] (contravariant parameters, covariant return type) and all the Observers to be used "in a contravariant way". A few of the Observable uses are covariant, now, too (mostly zip).

This is the pull request for #326.

This doesn't look very good in the code (thanks Java). Also, it doesn't seem to make Scala interop easier at all (at least not yet).

Please take a look. I'm not exactly happy with the result. - Maybe I'm doing something wrong here? - I've still got hope that there's an easier way...

The pull request compiles and tests ok for me (except for the Clojure module, but that's another story and not due to my changes).

@cloudbees-pull-request-builder

RxJava-pull-requests #213 FAILURE
Looks like there's a problem with this pull request

@benjchristensen
Copy link
Member

I'll have to spend some time later playing with this. Anyone else able to try this out and comment on the changes?

@jmhofer Can you provide examples or use cases of using the updated code that demonstrates covariant/contravariant usage that couldn't be achieved before?

@jmhofer
Copy link
Contributor Author

jmhofer commented Aug 31, 2013

Ok, here's an example of what you can do now with the improved zip:

import rx.Observable;
import rx.util.functions.Func2;

class Media {}
class Movie extends Media {}
class HorrorMovie extends Movie {}

class Rating {}
class CoolRating extends Rating {}

class Result {}
class ExtendedResult extends Result {}

public class Covariance {

  public static void main(String[] args) {
    Observable<HorrorMovie> horrors = Observable.from(new HorrorMovie());
    Observable<CoolRating> ratings = Observable.from(new CoolRating());

    Func2<Media, Rating, ExtendedResult> combine = new Func2<Media, Rating, ExtendedResult>() {
      @Override public ExtendedResult call(Media m, Rating r) {
        return null;
      }
    };

    Observable.zip(horrors, ratings, combine);
    // 0.11: The method zip(Observable<T0>, Observable<T1>, Func2<T0,T1,R>) 
    // in the type Observable is not applicable for the arguments 
    // (Observable<HorrorMovie>, Observable<CoolRating>, Func2<Media,Rating,ExtendedResult>)
    //
    // but works in super-extends branch
  }
}

@jmhofer
Copy link
Contributor Author

jmhofer commented Aug 31, 2013

Lots more Observable usages to go; however, the Observers are probably mostly done...

benjchristensen and others added 4 commits August 31, 2013 16:15
…cess with them.

- only testing zip operator at this time
I can't get through release process to maven central on 2.10.2 for some reason so pinning until that is solved.
Changed order of generics on zip (and combineLatest) to match the rest of the project.
Added arties 5, 6, 7, 8, 9 to zip operator.

ReactiveX#333 Order of Generics on Zip Operator
ReactiveX#103 Add more arities to zip operator
Zip and CombineLatest Operators: Generic Order and More Arities
@cloudbees-pull-request-builder

RxJava-pull-requests #216 FAILURE
Looks like there's a problem with this pull request

@benjchristensen
Copy link
Member

Let me know when you're ready for this to be merged and released. I plan on releasing this as 0.12.0 as it does have some breaking changes.

@benjchristensen
Copy link
Member

That CloudBees build failure is legit, I also can't build rxjava-scala on my machine with this branch. The core library is fine.

* What went wrong:
Execution failed for task ':language-adaptors:rxjava-scala:compileScala'.
> Compilation failed because of an internal compiler error; see the error output for details.

@daveray
Copy link
Contributor

daveray commented Sep 1, 2013

In this line:

Observable<String> observable = Observable.create(new Func1<Observer<? super String>, Subscription>()

is it the case that Java programmers creating an Observable this way would always have to include the ? super bit? Or is this just for completeness? Is this what you mean by "I'm not exactly happy with the result"? I haven't thought through it, but this doesn't seem like it should be necessary if it's done right.

@jmhofer
Copy link
Contributor Author

jmhofer commented Sep 1, 2013

@benjchristensen I just noticed that, too, and adapted the RxImplicits.

@daveray Looks like it, yes. I couldn't make the compiler happy without it, but maybe I'm missing something. If you find something that avoids this, then please let me know.

@cloudbees-pull-request-builder

RxJava-pull-requests #217 SUCCESS
This pull request looks good

@jmhofer
Copy link
Contributor Author

jmhofer commented Sep 1, 2013

Still to do: Future is covariant, and Timestamped and Notification are probably, too.
Also: Lots of operators can be used standalone and therefore should be generalized, too.

@cloudbees-pull-request-builder

RxJava-pull-requests #218 SUCCESS
This pull request looks good

@cloudbees-pull-request-builder

RxJava-pull-requests #219 FAILURE
Looks like there's a problem with this pull request

@jmhofer
Copy link
Contributor Author

jmhofer commented Sep 1, 2013

Huh? - Very interesting (compiles for me)...

@benjchristensen
Copy link
Member

Here is a trivial example using Java 8 demonstrating how ? super needs to exist even here if retaining type safety:

        Observable<String> oMovie = Observable.create((Observer<? super Movie> o) -> {
            o.onNext(new Movie());
            o.onNext(new Movie());
            return Subscriptions.empty();
        }).map((movie) -> {
            return "movie transformed: " + movie;
        });


        Observable<String> oMedia = Observable.create((Observer<? super Media> o) -> {
            o.onNext(new Media());
            o.onNext(new HorrorMovie());
            return Subscriptions.empty();
        }).map((movie) -> {
            return "media transformed: " + movie;
        });

        Observable.zip(oMovie, oMedia, (a, b) -> {
           return a + " ----- " + b;
        }).subscribe((movie) -> {
            System.out.println("Media/Movie: " + movie);
        });

Or type safety can be thrown away:

        Observable<String> oMovie = Observable.create((Observer o) -> {
            o.onNext(new Movie());
            o.onNext(new Movie());
            return Subscriptions.empty();
        }).map((movie) -> {
            return "movie transformed: " + movie;
        });

But now that allows this to compile:

        Observable<String> oMovie = Observable.create((Observer o) -> {
            o.onNext(new Movie());
            o.onNext("hello"); // this is not a Movie object
            return Subscriptions.empty();
        }).map((movie) -> {
            return "movie transformed: " + movie;
        });

@benjchristensen
Copy link
Member

@jmhofer The create method is how Observables are created so this affects all producers. This should not affect consumers other than making the prompts/docs verbose:

screen shot 2013-09-03 at 4 41 21 pm

@mttkay and @mustafasezgin Do you have any input on this discussion since you are using RxJava from plain Java?

@benjchristensen
Copy link
Member

Groovy is similar ... but the generics don't actually do much for it as they are more-or-less ignored at compile time as best I can tell:

        Observable<String> oMovie = Observable.create({ Observer o ->
            o.onNext(new Movie());
            o.onNext(new Movie());
            return Subscriptions.empty();
        }).map({ movie ->
            return "movie transformed: " + movie;
        });


        Observable<String> oMedia = Observable.create({ Observer<? super Media> o ->
            o.onNext(new Media());
            o.onNext(new HorrorMovie());
            return Subscriptions.empty();
        }).map({ movie ->
            return "media transformed: " + movie;
        });

        Observable.zip(oMovie, oMedia, { a, b ->
            return a + " ----- " + b;
        }).subscribe({ movie ->
            System.out.println("Media/Movie: " + movie);
        });

@benjchristensen
Copy link
Member

This Groovy code works against current master as well as the new code with ? super:

        Observable<Movie> oMovie = Observable.create({ Observer<? super Movie> o ->
            o.onNext(new Movie());
            o.onNext(new Movie());
            o.onCompleted();
            return Subscriptions.empty();
        });


        Observable<Media> oMedia = Observable.create({ Observer<? super Media> o ->
            o.onNext(new Media());
            o.onNext(new HorrorMovie());
            o.onCompleted();
            return Subscriptions.empty();
        });

        Observable.zip(oMovie, oMedia, { Movie a, Media b ->
            return String.valueOf(a) + " ----- " + String.valueOf(b);
        }).subscribe({ media ->
            System.out.println("Media/Movie: " + media);
        });

And here it is again in plain Java:

        Observable<Movie> oMovie = Observable.create(new Func1<Observer<? super Movie>, Subscription>() {

            @Override
            public Subscription call(Observer<? super Movie> o) {
                o.onNext(new Movie());
                o.onNext(new Movie());
                o.onCompleted();
                return Subscriptions.empty();
            }

        });

        Observable<Media> oMedia = Observable.create(new Func1<Observer<? super Media>, Subscription>() {

            @Override
            public Subscription call(Observer<? super Media> o) {
                o.onNext(new Media());
                o.onNext(new HorrorMovie());
                o.onCompleted();
                return Subscriptions.empty();
            }

        });

        Observable.zip(oMovie, oMedia, new Func2<Movie, Media, String>() {

            @Override
            public String call(Movie a, Media b) {
                return String.valueOf(a) + " ----- " + String.valueOf(b);
            }

        }).subscribe(new Action1<String>() {

            @Override
            public void call(String media) {
                System.out.println("Media/Movie: " + media);
            }

        });

It seems that ? super is fine on the outer generics and doesn't need to be typed everywhere, it's just when it's a generic inside a generic, the type of the Observer that it becomes annoying.

@benjchristensen
Copy link
Member

It seems the only option (while supporting covariance) for reducing code verbosity is to create a new type that hides the Func1<? super Observer<? super T>, ? extends Subscription> complexity.

So my question now is: should we change the API to make all uses of create use the new type, or should it remain as Func1 and the other type is just a nice utility that can be used to fulfill the signature?

In other words, do we leave it as this:

public static <T> Observable<T> create(Func1<? super Observer<? super T>, ? extends Subscription> func)

or change it to

public static <T> Observable<T> create(ObservableFunction<T> func)

And what do we call the new type if we go that route? Options I've considered are:

  • ObservableFunction
  • ObservableFunc
  • OnSubscribeFunction
  • OnSubscribeFunc
  • FuncOnSubscribe

And should that live in rx.util.functions or rx alongside rx.Observable? It feels that this is not generic and should therefore live inside rx.

Unfortunately we can not overload this method and support both as type erasure makes them the same (and it will confuse dynamic languages, implicits etc) if we had two methods with similar single function signatures.

Normally I'd rather leave the lower level Func1 method signature and just have a helper class, but code completion and javadocs will be far less obvious and not very discoverable. Thus, for discoverability I think it makes more sense to use create(ObservableFunction<T> func). I can't yet think of any forward-compatibility reason why this would be a bad thing. This pull request is breaking either direction we pursue.

@benjchristensen
Copy link
Member

One last spam to everyone ... please weigh in if you have an opinion, as I intend on making a decision and releasing this week.

This will be a breaking change and affect usage for everyone.

@benjchristensen
Copy link
Member

Here is what the ObservableFunction interface looks like:

/**
 * Function interface for work to be performed when an {@link Observable} is subscribed to via {@link Observable#subscribe(Observer)}
 * 
 * @param <T>
 */
public interface ObservableFunction<T> extends Func1<Observer<? super T>, Subscription> {

    public Subscription call(Observer<? super T> t1);

}

And the updated create method:

public static <T> Observable<T> create(ObservableFunction<T> func) 

And sample code using this:

        Observable<Movie> oMovie = Observable.create(new ObservableFunction<Movie>() {

            @Override
            public Subscription call(Observer<? super Movie> o) {
                o.onNext(new Movie());
                o.onNext(new Movie());
                o.onCompleted();
                return Subscriptions.empty();
            }

        });

@benjchristensen
Copy link
Member

Here is a fork including these changes so we can review and discuss: benjchristensen@de0358f

I'm still not thrilled by any of the directions we can take here. I can't argue against making RxJava support covariant types. Josh Bloch certainly supports it in Effective Java when he says "If you write a library that will be widely used, the proper use of wildcard types should be considered mandatory."

On the flip-side, it forces the use of '? super/? extends' everywhere even when covariant requirements are rare.

I think the only decision at this point to make is whether we should use something like ObservableFunction<T> to minimize the verbosity or just leave Func1<? super Observer<? super T>, ? extends Subscription>. And if we do use ObservableFunction do we provide similar solutions elsewhere? That feels wrong to have special function interfaces for so many things, even though it likely will improve readability and usability.

Other operators that are awkward include:

  • defer(Func0<? extends Observable<? extends T>> observableFactory)
  • aggregate(Func2<? super T, ? super T, ? extends T> accumulator)
  • buffer(Func0<? extends Observable<? extends BufferClosing>> bufferClosingSelector)
  • flatMap(Func1<? super T, ? extends Observable<? extends R>> func)

FlatMap is very common and ends up like this:

        oMedia.flatMap(new Func1<Media, Observable<? extends String>>() {
            @Override
            public Observable<? extends String> call(Media s) {
                ...
            }
        });

Compare this with current:

        oMedia.flatMap(new Func1<Media, Observable<String>>() {
            @Override
            public Observable<String> call(Media s) {
                ...
            }
        });

However, lambdas do make a big different on instance methods (not as much on statics) as they can infer the types. For example, in Java 8 the above can become this:

        oMedia.flatMap(value -> {
            return null;
        });

So without ObservableFunction<T> in Java 8 we have:

        Observable.create((Observer<? super Media> o) -> {
            o.onNext(new Media());
            o.onNext(new HorrorMovie());
            return Subscriptions.empty();
        }).flatMap(movie -> {
            return Observable.from("media transformed: " + movie.getName());
        });

With ObservableFunction<T> in Java 8 we get this:

        Observable.create((Observer<Media> o) -> {
            o.onNext(new Media());
            o.onNext(new HorrorMovie());
            return Subscriptions.empty();
        }).flatMap(movie -> {
            return Observable.from("media transformed: " + movie.getName());
        });

It seems that type inference will handle the instance methods. For example, aggregate/reduce becomes this:

        Observable.create((Observer<Integer> o) -> {
            o.onNext(1);
            o.onNext(2);
            return Subscriptions.empty();
        }).aggregate((previous, current) -> {
            return previous + current;
        });

So it's only the static methods of concern, create being the most significant.

Java 6/7 (and Android) are going to be ugly no matter what - but we already knew that and they already are. This makes it worse.

@mttkay
Copy link
Contributor

mttkay commented Sep 4, 2013

Experiencing some of the pain with generic method signatures and nesting
first hand I prefer the type alias in form of an interface. I would argue
that the subscription function is so widely used that the interface
cloaking it will be understood quickly enough by developers so the loss in
direct clarity should be negligible.

@cloudbees-pull-request-builder

RxJava-pull-requests #229 FAILURE
Looks like there's a problem with this pull request

@abersnaze
Copy link
Contributor

+1 ObservableFunction interface but call it something else maybe with subscription/subscribe in the name.

@jmhofer
Copy link
Contributor Author

jmhofer commented Sep 4, 2013

@benjchristensen We could overload create etc. - For this, we'd have to make ObservableFunction delegate to Func1 then, instead of inheriting from it. I'm not sure if that's a good idea, though. It would probably be even more confusing.

I'm not sure about the naming. Although it is kind of the internal Observable function, I think I'd prefer SubscriptionFunction or SubscriberFunction or something like that.

@cloudbees-pull-request-builder

RxJava-pull-requests #230 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member

How about these names?

  • OnSubscribeFunction
  • OnSubscribeFunc
  • FuncOnSubscribe

@benjchristensen
Copy link
Member

Based on my highly scientific poll of people around me at my office ... and the few who have commented above, I'm going with OnSubscribeFunc.

@daveray
Copy link
Contributor

daveray commented Sep 4, 2013

@benjchristensen works for me. Will it extend Func1, or be an independent type as @jmhofer suggests above. I think I'd prefer the latter.

@benjchristensen
Copy link
Member

I haven't played with it not extending Func1. It will need to at least extend Function so it fits with all other functions in the codebase. In what way will it improve things if it doesn't extend?

@benjchristensen
Copy link
Member

Since this function is only intended for us by the Observable.create does anyone have issues with it being namespaced as rx.Observable.OnSubscribeFunc as a static inner class of Observable rather than a first-class citizen rx.OnSubscribeFunc?

This works great in Java, are there any issues from Clojure or Scala doing it this way?

Create looks like this:

public static <T> Observable<T> create(OnSubscribeFunc<T> func)

Use of it looks like this:

        import rx.Observable;
        import rx.Observable.OnSubscribeFunc;

        Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() {

            @Override
            public Subscription call(Observer<? super String> Observer) {
                Observer.onNext("one");
                Observer.onNext("two");
                Observer.onNext("three");
                Observer.onCompleted();
                return Subscriptions.empty();
            }

        });

The function definition looks like:

    /**
     * Function interface for work to be performed when an {@link Observable} is subscribed to via {@link Observable#subscribe(Observer)}
     * 
     * @param <T>
     */
    public static interface OnSubscribeFunc<T> extends Function<T> {

        public Subscription call(Observer<? super T> t1);

    }

@jmhofer
Copy link
Contributor Author

jmhofer commented Sep 4, 2013

Good idea, and shouldn't be a problem for Scala.

@benjchristensen
Copy link
Member

Great, I'll proceed with this change then.

Thanks for the feedback.

@benjchristensen
Copy link
Member

Final interface looks like this:

    /**
     * Function interface for work to be performed when an {@link Observable} is subscribed to via {@link Observable#subscribe(Observer)}
     * 
     * @param <T>
     */
    public static interface OnSubscribeFunc<T> extends Function {

        public Subscription onSubscribe(Observer<? super T> t1);

    }

This is being merged in #343

@benjchristensen benjchristensen merged commit 51dd848 into ReactiveX:master Sep 4, 2013
rickbw pushed a commit to rickbw/RxJava that referenced this pull request Jan 9, 2014
This is related to ongoing work related to covariant support at ReactiveX#331
billyy pushed a commit to billyy/RxJava that referenced this pull request Jan 13, 2014
This is related to ongoing work related to covariant support at ReactiveX#331
benjchristensen added a commit to ReactiveX/RxSwing that referenced this pull request Aug 19, 2014
This is related to ongoing work related to covariant support at ReactiveX/RxJava#331
benjchristensen added a commit to ReactiveX/RxGroovy that referenced this pull request Aug 19, 2014
This is related to ongoing work related to covariant support at ReactiveX/RxJava#331
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
…ntimeExce… (ReactiveX#389)

* Issue ReactiveX#331: Fixed Retry.decorateCallable. It handled only RuntimeExceptions, but should handle Exceptions instead.

* Issue ReactiveX#331: CircuitBreaker not handle java.lang.Error, but only java.lang.Exception.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants