2.x: Unexpected Flowable.flatMap(...) behavior while handling an upstream error #6825

Closed
ghost opened this issue Jan 8, 2020 · 5 comments

@ghost

ghost commented Jan 8, 2020

Hi,

I've discovered unexpected behavior in the Flowable.flatMap() operator that can lead to a subscription leak. I've narrowed the problem down to the following scenario:

  1. RxJava version 2.2.16.
  2. Given a simple Rx chain of flowable1.flatMap(value -> flowable2).
  3. flowable1 emits a value (so that flatMap subscribes to flowable2).
  4. flowable1 emits an error.
    Expected result: flatMap disposes flowable2 upon handling the upstream error.
    Actual result: flowable2 remains subscribed after the entire Rx chain terminates because of the error.

Replacing the flatMap operator with concatMap fixes the problem (such a replacement is valid in my case). However, I decided to raise this issue because this behavior of flatMap looks quite odd to me. Could somebody from the RxJava team confirm whether my expectation is valid and the observed behavior is a defect?

I've reproduced this issue in a code snippet: https://gist.github.com/eugene-zolotko/b24cbc436bc0eab2ed5de539b9e4e312
I expected this code to print "flowable2 cancelled", but it printed "flowable2 error" instead. In addition, error2 is thrown as an UndeliverableException because it occurs after the entire chain has terminated.

@akarnokd
Member

akarnokd commented Jan 8, 2020

Sources emitting an error are considered cancelled so it is completely legal to not call cancel on them. Use doFinally to cleanup upon all sorts of termination or cancellation.
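
As a rough illustration of the doFinally suggestion, here is a minimal sketch, assuming RxJava 2.x on the classpath. The class name DoFinallyCleanup, its two helper methods, and the log list are hypothetical and exist only for this example. It compares doOnCancel with doFinally for a source that fails on its own and for one that is cancelled by the consumer.

```java
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.List;

public class DoFinallyCleanup {

    // The source terminates by itself with an error; nobody calls cancel on it.
    static List<String> runWithError() {
        List<String> log = new ArrayList<>();
        Flowable.<Integer>error(new Exception("boom"))
            .doOnCancel(() -> log.add("cancelled")) // only runs on an explicit cancel
            .doFinally(() -> log.add("finally"))    // runs on error, completion, or cancel
            .test()
            .assertError(Exception.class);
        return log;
    }

    // The source never emits; the consumer cancels the subscription explicitly.
    static List<String> runWithCancel() {
        List<String> log = new ArrayList<>();
        Flowable.<Integer>never()
            .doOnCancel(() -> log.add("cancelled"))
            .doFinally(() -> log.add("finally"))
            .test()
            .cancel();
        return log;
    }

    public static void main(String[] args) {
        System.out.println("error path:  " + runWithError());
        System.out.println("cancel path: " + runWithCancel());
    }
}
```

In this sketch the cleanup registered with doFinally runs on both paths, while the doOnCancel callback only runs on the cancel path, which is why cleanup tied to doOnCancel alone can be skipped when a source errors.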

@ghost
Author

ghost commented Jan 8, 2020

Thank you for looking into this. I'm afraid I wasn't specific enough in my question above. In the following code snippet, I expected error2 never to be emitted, because error1 terminates the entire flow sooner than flowable2 emits error2. However, error2 was actually fired (it triggered an UndeliverableException).

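    import io.reactivex.Flowable;
    import java.util.concurrent.TimeUnit;

    // Outer source: emits 1 after 1 second, then fails with error1 about 1 second later.
    final Exception error1 = new Exception("1");
    final Flowable<Integer> flowable1 = Flowable.just(1).delay(1, TimeUnit.SECONDS)
        .concatWith(Flowable.<Integer>error(error1).delaySubscription(1, TimeUnit.SECONDS));

    // Inner source: fails with error2 two seconds after flatMap subscribes to it,
    // which is after the outer source has already failed with error1.
    final Exception error2 = new Exception("2");
    final Flowable<Object> flowable2 =
        Flowable.error(error2).delaySubscription(2, TimeUnit.SECONDS)
            .doOnCancel(() -> System.out.println("flowable2 cancelled"))
            .doOnError(error -> System.out.println("flowable2 error"));

    // The chain should terminate with error1 and cancel flowable2.
    flowable1.flatMap(i -> flowable2).test()
        .awaitDone(4, TimeUnit.SECONDS)
        .assertError(error1);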

So my questions are:

  1. Is this expected behavior?
  2. Is there a way to avoid error2 being thrown as an UndeliverableException (other than intercepting it with RxJavaPlugins.setErrorHandler)?
  3. What is the reason for the difference in behavior between flatMap and concatMap in such conditions?

@akarnokd
Member

akarnokd commented Jan 8, 2020

  1. No, the inner sequence should get cancelled. I'll investigate this further.
  2. Suppress the error via onErrorX, or do not let RxJava know about the error in the first place.
  3. Should work the same.
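
One possible reading of "suppressing the error via onErrorX" in answer 2 is to apply an error-handling operator to the inner source before handing it to flatMap. Below is a minimal sketch, assuming RxJava 2.x and picking onErrorResumeNext(Flowable.empty()) as one member of that operator family. The class name SuppressInnerError and the run() helper are hypothetical. Note that this changes semantics: the inner failure is silently turned into a normal completion, so it is only appropriate when that error can be safely ignored.

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;

public class SuppressInnerError {

    static TestSubscriber<Object> run() {
        Flowable<Object> inner = Flowable.<Object>error(new Exception("2"))
            // Swap the failure for an empty sequence, so downstream only sees onComplete.
            .onErrorResumeNext(Flowable.<Object>empty());

        // flatMap never receives an error from the inner source.
        return Flowable.just(1)
            .flatMap(i -> inner)
            .test();
    }

    public static void main(String[] args) {
        // Passes only if the chain completed normally with no values and no error.
        run().assertResult();
        System.out.println("completed without error");
    }
}
```

Because the error is replaced inside the inner sequence itself, it never travels downstream, so there is nothing left to be reported as undeliverable once the outer chain has terminated.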

@akarnokd
Member

akarnokd commented Jan 9, 2020

Closing via #6826 & #6827.

@akarnokd akarnokd closed this as completed Jan 9, 2020
@ghost
Author

ghost commented Jan 9, 2020

Thank you very much for your support.
