Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to know when an Observable supports backpressure #2557

Closed
davidmoten opened this issue Jan 29, 2015 · 17 comments
Closed

How to know when an Observable supports backpressure #2557

davidmoten opened this issue Jan 29, 2015 · 17 comments

Comments

@davidmoten
Copy link
Collaborator

Observable.just(1,2) respects backpressure, yet Observable.just(1) does not. The second case uses ScalarSynchronousObservable which does not respect backpressure (nor unsubscription prior to onCompleted). This bit me in a unit test and the workaround is to use Observable.just(1).onBackpressureBuffer() but it struck me as a possible source of confusion indeed errors for myself and others.

This leads into a broader topic which is how can I know programmatically that an Observable respects backpressure? I've written an Operator that requires its (multiple) inputs to be backpressure enabled and I would love to be able to perform a check programmatically that the inputs are valid.

We could add a boolean Observable.respectsBackpressure() method to the API and the various transformations (say o.count()) would pretty easily pass or modify that value as appropriate.

Something like this would be especially useful for doing a strict merge that only allowed backpressure aware sources to be used. At the moment I only find out at runtime via a MissingBackpressureException that I have a problem and the circumstance that brought it about may be rare and not covered by my tests.

What do people think?

@benjchristensen
Copy link
Member

How is this knowledge different than not knowing how many elements a sequence is going to emit? If it will terminate? If it is hot or cold?

@JakeWharton
Copy link
Member

Java 8 type annotations might be enough here. Of course, it requires an infrastructure around to perform the validation (a la checker framework).

@davidmoten
Copy link
Collaborator Author

How is this knowledge different than not knowing how many elements a sequence is going to emit? If it will terminate? If it is hot or cold?

Now that you mention it, not very different!

I suppose the biggest problem is that an observable constructed in a flatMap is something that won't be constructed till some arbitrary time in the future. I'm forgetting that and it busts the idea with Observable.respectsBackpressure. @JakeWharton 's suggestion may have legs inasmuch as I could get the type system to ensure that inside a flat map construction I have definitely constructed an observable that respects backpressure. I guess it's something that could be revisited once we are using java 8. Not sure if the type system can offer something prior to Java 8 that just involves non-breaking API additions.

@davidmoten
Copy link
Collaborator Author

What about including some backpressure documentation in javadoc for each operator? I guess it needs careful wording because some Operators given backpressure-respecting sources will provide a backpressure-respecting observable but without backpressure-respecting sources perhaps will not. From that point of view the term backpressure aware is probably better than supports backpressure.

At the moment my check for backpressure awareness is to inspect the source and look for patterns that I'm familiar with involving a Producer etc. Conclusive if you know what to look for but not ideal.

@akarnokd
Copy link
Member

akarnokd commented Feb 5, 2015

I could imagine an annotation:

@Backpressure(input = IGNORED, output = RESPECTED)
public final Observable<T> onBackpressureBlock() { }

where input could be one of the following:

  • IGNORED: doesn't matter if upstream does or doesn't support backpressure. Example: onBackpressureBlock.
  • REQUIRED: upstream has to respect backpressure and the operator will never ask for Long.MAX_VALUE. Example: observeOn
  • AMBIVALENT: for intermediate operators that don't themselves do backpressure "airlocking", the downstream's input behavior is reflected upwards. Example: map, filter

and output could be one of the following:

  • NONE: ignores any downstream backpressure requests. Example: just(T)
  • RESPECTED: if downstream uses backpressure or not, the operator works accordingly. Example: range(x, n) | n > 1
  • ALWAYS: The operator will always work in backpressured mode (i.e., no fast paths). Example: zip (always buffers sources).

Connection matrix:

Upstream output Downstream input Behavior
NONE IGNORED Firehose
NONE REQUIRED Likely MissingBackpressureException
NONE AMBIVALENT Depends on deeper downstream's input
RESPECTED IGNORED Firehose
RESPECTED REQUIRED Batch-like execution
RESPECTED AMBIVALENT Depends on deeper downstream's input
ALWAYS IGNORED Batch-like execution
ALWAYS REQUIRED Batch-like execution
ALWAYS AMBIVALENT Depends on deeper downstream's input

Further examples:

@Backpressure(input = REQUIRED, output = ALWAYS)
public final <U, R> Observable<R> zipWith(
    @Backpressure(output = REQUIRED)
    Observable<U> other, 
    Func2<T, U, R> func) { }

@Backpressure(input = IGNORED, output = NONE)
public final class PublishSubject { }

@Backpressure(output = NONE)
public static <T> Observable<T> just(T value) { }

@Backpressure(input = AMBIVALENT, output = RESPECTED)
public final <R> Observable<R> map(Func1<T, R> func) { }

@davidmoten
Copy link
Collaborator Author

Nice idea, I like it. I think Neutral might be a better word than Ambivalent because ambivalent literally means 'mixed emotions'. There are some grey areas that might be nice to represent, one of them being merge which tolerates a lack of backpressure awareness in its sources to a certain extent (can tolerate lots if we bump up system property rx.ring-buffer-size) as opposed to a custom operator that I just made that throws an exception immediately when more arrives than is expected.

So in terms of inputs we might have:

  • IGNORED
  • REQUIRED - will fail if more arrive than requested
  • PREFERRED - may fail if more arrive than requested
  • NEUTRAL - don't care

@benjchristensen
Copy link
Member

How would the use the annotations, just for docs?

We do have notes in the Javadoc about backpressure but they aren't very comprehensive. We call out the temporal operators to say they don't have backpressure support.

@davidmoten
Copy link
Collaborator Author

I don't have a use case apart from documentation at the moment. The annotation will suit me fine or some standardized addition to the javadocs (or both).

@benjchristensen
Copy link
Member

The annotations as documentation seem okay to me. The annotations on the parameters are intriguing, though they feel very intrusive on first impression. We'd only need them though on public APIs where an Observable is a parameter, correct?

@davidmoten
Copy link
Collaborator Author

I'm returning to this issue because it keeps striking me about the codebase as I browse it. Yep I think putting the annotations on the Public API and you suggest putting them on say the static version of merge rather than the instance methods?

@benjchristensen
Copy link
Member

Moving to 2.0 for further discussions.

@akarnokd
Copy link
Member

akarnokd commented Sep 3, 2015

In 2.x, all operators support backpressure but if you aren't requesting fast enough, you'll get a MissingBackpressureException from the operator itself (and not from observeOn/flatMap somewhere down the line...)

@akarnokd
Copy link
Member

akarnokd commented Nov 9, 2015

I've added annotations in 2.x; I can port it back to 1.x and also update the javadoc where the backpressure text is missing.

@davidmoten
Copy link
Collaborator Author

Thanks for pursuing this @akarnokd.

I just wanted to review the data model underlying this.

Every operator/source can be classified with one only of these:

  • BackpressureSupport.FULL_WITH_MBE
  • BackpressureSupport.FULL_NO_MBE
  • BackpressureSupport.PASS_THROUGH
  • BackpressureSupport.NONE

MBE is MissingBackpressureException

And further we could offer information about the requesting behaviour of an operator:

  • RequestBehaviour.REQUESTS_MAX_VALUE

This last is how I'd replace UNBOUNDED_IN.

@davidmoten
Copy link
Collaborator Author

We could also specify when backpressure is not required of a source used by an operator, perhaps by annotating a parameter or annotating a method:

public Observable<T> mergeWith(@Backpressure.SOURCE_REQUIRES Observable<T> source) {
...
}

@Backpressure.SOURCE_REQUIRES
public Observable<T>  onBackpressureBuffer() {
...
}

@akarnokd akarnokd modified the milestones: 2.0 RC 1, 2.0 Jun 19, 2016
@akarnokd
Copy link
Member

The current 2.x contains annotations on the operator methods themselves (some may be missing) and up for discussion/PRs. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.

@davidmoten
Copy link
Collaborator Author

Thanks, that sounds good.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants