2.x: Inconsistent behavior on throwing UndeliverableException in Observable.merge #5779

@tony-root

Library version: 2.1.7

As stated in RxJava2 Error Handling, no Throwable should be silently swallowed in 2.x. Throwables that cannot be delivered to a disposed downstream get redirected to RxJavaPlugins' errorHandler.

However, the Observable.merge operator does not behave consistently in this case:

  • If two different PublishSubjects are merged and each emits onError, one after the other, the second exception is not delivered to RxJavaPlugins' errorHandler.
  • If one PublishSubject is merged with itself (after being filtered, for example, which I expect to be a common scenario) and it emits a single onError, that exception is delivered to both onError and RxJavaPlugins' errorHandler.

The snippet below

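import io.reactivex.Observable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.subjects.PublishSubject;

public class MergeErrorsExample {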
    public static void main(String[] args) {
        RxJavaPlugins.setErrorHandler((error) ->
                printError("RxJavaPlugins.errorHandler", error)
        );

        mergeTwoDifferentObservablesDoesNotThrowUndeliverable();
        mergeSameObservableThrowsUndeliverable();
        mergeSameObservableWithPublishThrowsUndeliverable();
    }

    static void mergeTwoDifferentObservablesDoesNotThrowUndeliverable() {
        System.out.println("Merge two different PublishSubjects DOES NOT throw UndeliverableException");
        PublishSubject<Object> ps1 = PublishSubject.create();
        PublishSubject<Object> ps2 = PublishSubject.create();

        Observable.merge(ps1, ps2).subscribe(
                (next) -> System.out.println("onNext " + next),
                (error) -> printError("onError", error),
                () -> System.out.println("onComplete")
        );

        ps1.onError(new RuntimeException("ps1 exception"));
        ps2.onError(new RuntimeException("ps2 exception"));
        System.out.println();
    }

    static void mergeSameObservableThrowsUndeliverable() {
        System.out.println("Merge same PublishSubject throws UndeliverableException");
        PublishSubject<Boolean> ps1 = PublishSubject.create();

        Observable.merge(
                ps1.filter((condition) -> condition),
                ps1.filter((condition) -> !condition)
        ).subscribe(
                (next) -> System.out.println("onNext " + next),
                (error) -> printError("onError", error),
                () -> System.out.println("onComplete")
        );

        ps1.onError(new RuntimeException("ps1 exception"));
        System.out.println();
    }

    static void mergeSameObservableWithPublishThrowsUndeliverable() {
        System.out.println("Merge same Observable with publish() throws UndeliverableException");
        Observable<Boolean> o1 = Observable.error(new RuntimeException("o1 exception"));

        o1.publish((observable) ->
                Observable.merge(
                        observable.filter((condition) -> condition),
                        observable.filter((condition) -> !condition)
                )
        ).subscribe(
                (next) -> System.out.println("onNext " + next),
                (error) -> printError("onError", error),
                () -> System.out.println("onComplete")
        );
        System.out.println();
    }

    private static void printError(String message, Throwable t) {
        System.out.println(message + ": [" + t.getClass().getSimpleName() + "] " + t.getMessage());
    }
}

produces

Merge two different PublishSubjects DOES NOT throw UndeliverableException
onError: [RuntimeException] ps1 exception

Merge same PublishSubject throws UndeliverableException
onError: [RuntimeException] ps1 exception
RxJavaPlugins.errorHandler: [UndeliverableException] java.lang.RuntimeException: ps1 exception

Merge same Observable with publish() throws UndeliverableException
onError: [RuntimeException] o1 exception
RxJavaPlugins.errorHandler: [UndeliverableException] java.lang.RuntimeException: o1 exception
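
One possible explanation for the difference, sketched without RxJava: the model below assumes that a subject takes a snapshot of its current subscriptions before signalling onError, and that merge disposes all inner subscriptions as soon as the first error arrives. Both points are assumptions about the 2.x internals, not something confirmed here, and every name in the sketch (ToySubject, Subscription, merge, run) is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SplitMergeModel {
    // Stand-in for the downstream onError callback.
    static final List<String> delivered = new ArrayList<>();
    // Stand-in for RxJavaPlugins' errorHandler.
    static final List<String> undeliverable = new ArrayList<>();

    /** One subscription to a ToySubject; the toy merge can dispose it. */
    static final class Subscription {
        final ToySubject parent;
        final Consumer<Throwable> onError;
        boolean disposed;

        Subscription(ToySubject parent, Consumer<Throwable> onError) {
            this.parent = parent;
            this.onError = onError;
        }

        void dispose() {
            disposed = true;
            parent.subscriptions.remove(this);
        }

        void error(Throwable t) {
            if (disposed) {
                // Assumption: an error reaching a disposed subscription is rerouted.
                undeliverable.add(t.getMessage());
            } else {
                onError.accept(t);
            }
        }
    }

    /** Toy subject: snapshots its subscriptions before signalling the error. */
    static final class ToySubject {
        final List<Subscription> subscriptions = new ArrayList<>();

        Subscription subscribe(Consumer<Throwable> onError) {
            Subscription s = new Subscription(this, onError);
            subscriptions.add(s);
            return s;
        }

        void onError(Throwable t) {
            List<Subscription> snapshot = new ArrayList<>(subscriptions);
            subscriptions.clear();
            for (Subscription s : snapshot) {
                s.error(t);
            }
        }
    }

    /** Toy merge: forwards the first error downstream, then disposes every inner subscription. */
    static void merge(ToySubject a, ToySubject b) {
        List<Subscription> inner = new ArrayList<>();
        Consumer<Throwable> onError = t -> {
            delivered.add(t.getMessage());
            for (Subscription s : inner) {
                s.dispose();
            }
        };
        inner.add(a.subscribe(onError));
        inner.add(b.subscribe(onError));
    }

    /** Runs either the "two sources" or the "same source twice" scenario. */
    static String run(boolean sameSource) {
        delivered.clear();
        undeliverable.clear();
        ToySubject first = new ToySubject();
        ToySubject second = sameSource ? first : new ToySubject();
        merge(first, second);
        first.onError(new RuntimeException("first"));
        if (!sameSource) {
            second.onError(new RuntimeException("second"));
        }
        return "delivered=" + delivered + " undeliverable=" + undeliverable;
    }

    public static void main(String[] args) {
        System.out.println("two sources: " + run(false));
        System.out.println("same source: " + run(true));
    }
}
```

In this model the second source has no subscriptions left by the time it errors, so its exception goes nowhere, while the shared source is still iterating its snapshot and hands the same exception to a subscription that merge has just disposed. If the real operators work this way, the two outcomes would follow from disposal timing rather than from special handling inside merge.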

I'm still not sure which of the two behaviors is correct. "Splitting" an Observable and then merging it back seems to be a common pattern, and with the current behavior it always sends an UndeliverableException to RxJavaPlugins' errorHandler when the source emits onError.
Observable.mergeDelayError is an option for the "split-merge" pattern, but whoever uses it has to make sure that no path swallows the exception, or the exception will not be delivered to the downstream at all.

What do you think would be the correct behavior here?
Do you think it is possible to make Observable.merge not redirect an error to RxJavaPlugins' errorHandler for the "split-merge" pattern?
