Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
357 lines (288 sloc) 13.9 KB

Backpressure

Rx leads events from one end of a pipeline to the other. The actions which take place on each end can be very dissimilar. What happens when the producer and the consumer require different amounts of time to process a value? In a synchronous model, this question isn't an issue. Consider the following example:

// Produce
Observable<Integer> producer = Observable.create(o -> {
	o.onNext(1);
	o.onNext(2);
	o.onCompleted();
});
// Consume
producer.subscribe(i -> {
	try {
		Thread.sleep(1000);
		System.out.println(i);
	} catch (Exception e) {	}
});

Here, the producer has its values ready and can emit them with no delay. The consumer is very slow by comparison, but this isn't going to cause problems, because the synchronous nature of the code above automatically regulates the rates of the producer and consumer. When o.onNext(1); is called, execution for the producer is blocked until the entire Rx chain completes. Only when that expression returns can the execution proceed to o.onNext(2);.

This works like that only for synchronous execution. It is very common for the producer and the consumer to be asynchronous. So, what happens when a producer and a consumer operate asynchronously at different speeds?

Let's first consider the traditional pull-based model, such as an iterator. In a pull-based model, the consumer requests the values. If the producer is slower, the consumer will block on request and resume when the next value arrives. If the producer is faster, then the producer will have idle time waiting for the consumer to request the next value.

Rx push-based, not pull-based. In Rx, it is the producer that pushes values to the consumer when the values are ready. If the producer is slower, then the consumer will have idle time waiting for the next value to arrive. If the producer is faster, without any provisions, it will keep force-feeding data to consumer without ever knowing about the consumer's difficulties.

Observable.interval(1, TimeUnit.MILLISECONDS)
	.observeOn(Schedulers.newThread())
	.subscribe(
		i -> {
			System.out.println(i);
			try {
				Thread.sleep(100);
			} catch (Exception e) {	}
		},
		System.out::println);

Output

0
1
rx.exceptions.MissingBackpressureException

Here, the MissingBackpressureException is letting us know that the producer is too fast and the operators that we chained together can't deal with it.

Remedies for the consumer

Some of the operators we've seen in previous chapters can help the consumer lessen the stress caused by too much input.

Thin out the data

The [sample](/Part 3 - Taming the sequence/5. Time-shifted sequences.md#sample) operator naturally allows you to specify a maximum rate of input, leaving out any excess data.

Observable.interval(1, TimeUnit.MILLISECONDS)
	.observeOn(Schedulers.newThread())
	.sample(100, TimeUnit.MILLISECONDS)
	.subscribe(
		i -> {
			System.out.println(i);
			try {
				Thread.sleep(100);
			} catch (Exception e) {	}
		},
		System.out::println);

Output

82
182
283
...

The are similar operators that can serve the same purpose.

  • The [throttle](/Part 3 - Taming the sequence/5. Time-shifted sequences.md#throttling) family of operators also filters on rate, but allows you to specify in a different way which element to let through when stressed.
  • [Debounce](/Part 3 - Taming the sequence/5. Time-shifted sequences.md#debouncing) does not cut the rate to a fixed maximum. Instead, it will completely remove every burst of information and replace it with a single value.

Collect

Instead of sampling the data, you can use buffer and window to collect overflowing data while the consumer is busy. This is useful if processing items in batches is faster. Alternatively, you can inspect the buffer to manually decide how many and which of the buffered items to process.

In the example that we saw previously, the consumer processes single items and bulks at practically the same speed. Here we slowed down the producer to make the batches fit a line, but the principle remains the same.

Observable.interval(10, TimeUnit.MILLISECONDS)
	.observeOn(Schedulers.newThread())
	.buffer(100, TimeUnit.MILLISECONDS)
	.subscribe(
		i -> {
			System.out.println(i);
			try {
				Thread.sleep(100);
			} catch (Exception e) {	}
		},
		System.out::println);

Output

[0, 1, 2, 3, 4, 5, 6, 7]
[8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
[18, 19, 20, 21, 22, 23, 24, 25, 26, 27]
...

Reactive pull

The above remedies are legitimate solutions to the problem. However, they aren't always the best way to deal with an overproducing observable. Sometimes the problem can be better handled on the side of the producer. Backpressure is a way for the pipeline to resist the emission of values.

Back pressure refers to pressure opposed to the desired flow of a fluid in a confined place such as a pipe. It is often caused by obstructions or tight bends in the confinement vessel along which it is moving, such as piping or air vents. - Wikipedia

RxJava has implemented a way for a subscriber to regulate the rate of an observable. The Subscriber has a request(n) method, with which it notifies the observable that it is ready to accept n more values. By calling request on the onStart method of your Subscriber, you establish reactive pull backpressure. This isn't a pull in the sense of a pull-based model: it doesn't return any values and will not block if values are not ready. Instead, it merely notifies the observable of how many values the Subscriber is ready to accept and to hold the rest. Subsequent calls to request will allow more values through.

This is a Subscriber that takes values one at a time:

class MySubscriber extends Subscriber<T> {
    @Override
    public void onStart() {
      request(1);
    }

    @Override
    public void onCompleted() {
        ...
    }

    @Override
    public void onError(Throwable e) {
        ...
    }

    @Override
    public void onNext(T n) {
        ...
        request(1);
    }
}

The request(1) in onStart establishes backpressure and informs the observable that it should only emit the first value. After processing the value in onNext, we request the next item to be sent, if and when it is available. Calling request(Long.MAX_VALUE) disables backpressure.

doOnRequested

Back when we were discussing the doOn_ operators for side effects, we left out doOnRequested.

public final Observable<T> doOnRequest(Action1<java.lang.Long> onRequest)

The doOnRequested meta-event happens when a subscriber requests for more items. The value supplied to the action is the number of items requested.

At this moment, doOnRequest is in beta. In this book, we have been avoiding beta operators. We're making an exception here, because it enables us to peek into stable backpressure functionality that is otherwise hidden. Let's see what happens in a simple observable:

Observable.range(0, 3)
	.doOnRequest(i -> System.out.println("Requested " + i))
	.subscribe(System.out::println);

Output

Requested 9223372036854775807
0
1
2

We see that subscribe requests the maximum number of items from the beginning. This means that subscribe doesn't resist values at all. Subscribe will only use backpressure if we provide a subscriber that implements backpressure. Here is a complete example for a subscriber, which will allow us to control backpressure from the outside.

public class ControlledPullSubscriber<T> extends Subscriber<T> {
	
	private final Action1<T> onNextAction;
	private final Action1<Throwable> onErrorAction;
	private final Action0 onCompletedAction;
	
	public ControlledPullSubscriber(
			Action1<T> onNextAction,
			Action1<Throwable> onErrorAction,
			Action0 onCompletedAction) {
		this.onNextAction = onNextAction;
		this.onErrorAction = onErrorAction;
		this.onCompletedAction = onCompletedAction;
	}
	
	public ControlledPullSubscriber(
			Action1<T> onNextAction,
			Action1<Throwable> onErrorAction) {
		this(onNextAction, onErrorAction, () -> {});
	}
	
	public ControlledPullSubscriber(Action1<T> onNextAction) {
		this(onNextAction, e -> {}, () -> {});
	}
	
    @Override
    public void onStart() {
      request(0);
    }

    @Override
    public void onCompleted() {
    	onCompletedAction.call();
    }

    @Override
    public void onError(Throwable e) {
    	onErrorAction.call(e);
    }

    @Override
    public void onNext(T t) {
    	onNextAction.call(t);
    }
    
    public void requestMore(int n) {
    	request(n);
    }
}

This simple implementation will not request values unless we manually make it do so with requestMore.

ControlledPullSubscriber<Integer> puller = 
		new ControlledPullSubscriber<Integer>(System.out::println);

Observable.range(0, 3)
	.doOnRequest(i -> System.out.println("Requested " + i))
	.subscribe(puller);

puller.requestMore(2);
puller.requestMore(1);

Output

Requested 0
Requested 2
0
1
Requested 1
2

First we requested no emissions (our ControlledPullSubscriber does this in onStart). Then we requested 2 and we got 2 values, then we requested 1 and we got 1.

Rx operators that use queues and buffers internally should use backpressure to avoid storing an infinite amount of values. Large-scale buffering should be left to operators that explicitly serve this purpose, such as cache, buffer etc. An example of an operator that needs to buffer items is zip: the first observable might emit two or more values before the second observable emits its next value. Such small asymmetries are expected even when the two sequences are supposed to have the same frequency. Needing to buffer a couple of items shouldn't cause the operator to fail. For that reason, zip has a small buffer of 128 items.

Observable.range(0, 300)
	.doOnRequest(i -> System.out.println("Requested " + i))
	.zipWith(
			Observable.range(10, 300),
			(i1, i2) -> i1 + " - " + i2)
	.take(300)
	.subscribe();

Output

Requested 128
Requested 90
Requested 90
Requested 90

The zip operator starts by requesting enough items to fill its buffer, and requests more when it consumes them. The details of how many items zip requests isn't interesting. What the reader should take away is the realization that some buffering and backpressure exist in Rx whether the developer requests for it or not. This gives an Rx pipeline some flexibility, where you might expect none. It might trick you into thinking that your code is solid, by silently saving small tests from failing, but you're not safe until you have explicitly declared behaviour with regard to backpressure.

Backpressure policies

Many Rx operators use backpressure internally to avoid overfilling their internal queues. This way, the problem of a slow consumer is propagated backwards in the chain of operators: if an operator stops accepting values, then the previous operator will fill its buffers until it stops accepting values too, and so on. Backpressure doesn't make the problem go away. It merely moves it where it may be handled better. We still need to decide what to do with the values of an overproducing observable.

There are Rx operators that declare how you want to deal with situations where a subscriber cannot accept the values that are being emitted.

onBackpressureBuffer

The onBackpressureBuffer operator with cause every value that can't be consumed to be stored until the observer can consume it.

You can have a buffer of infinite size or a buffer with a maximum capacity. If the buffer overflows, the observable will fail.

Observable.interval(1, TimeUnit.MILLISECONDS)
	.onBackpressureBuffer(1000)
	.observeOn(Schedulers.newThread())
	.subscribe(
		i -> {
			System.out.println(i);
			try {
				Thread.sleep(100);
			} catch (Exception e) {	}
		},
		System.out::println
	);

Output

0
1
2
3
4
5
6
7
8
9
10
11
rx.exceptions.MissingBackpressureException: Overflowed buffer of 1000

What happens here is that the producer is 100 times faster than the consumer. We try to deal with that by buffering up to 1000 items. It is easy to calculate that, by the time that the consumer consumes the 11th item, the producer has produced 1100 items, well over our buffer's capacity. The sequence then fails, as it can't deal with the backpressure.

onBackpressureDrop

The onBackpressureDrop operator discards items if they can't be received.

Observable.interval(1, TimeUnit.MILLISECONDS)
	.onBackpressureDrop()
	.observeOn(Schedulers.newThread())
	.subscribe(
		i -> {
			System.out.println(i);
			try {
				Thread.sleep(100);
			} catch (Exception e) {	}
		},
		System.out::println);

Output

0
1
2
...
126
127
12861
12862
...

What we see here is that the first 128 items where consumed normally, but then we jumped forward. The items in-between were dropped by onBackPressureDrop. Even though we did not request it, the first 128 items were still buffered, since observeOn uses a small buffer between switching threads.

Previous Next
Sequences of coincidence