PART 3 - Taming the sequence

So far we've learned how to create observables and how to extract relevant data from observables. In this chapter we will go beyond what is necessary for simple examples and discuss more advanced functionality, as well as some good practices for using Rx in bigger applications.

Side effects

Functions without side-effects interact with the rest of the program exclusively through their arguments and return values. When the operations within a function can affect the outcome of another function (or a subsequent call to the same function), we say that the function has side effects. Common side effects are writes to storage, logging, debugging or prints to a user interface. A more language-dependent form of side effect is the ability to modify the state of an object that is visible to other functions, which is something that Java considers legal. A function passed as an argument to an Rx operator can modify values in a wider scope, perform IO operations or update a display.
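As a small illustration outside of Rx, the sketch below contrasts a function without side effects with one that has them. The class and method names (`SideEffectSketch`, `doubled`, `addToTotal`) are made up for this example.

```java
class SideEffectSketch {
    // Hypothetical shared state, visible to every call of addToTotal.
    private static int total = 0;

    // No side effects: the result depends only on the argument.
    static int doubled(int x) {
        return 2 * x;
    }

    // Side effect: each call changes total, so the result depends on call history.
    static int addToTotal(int x) {
        total += x;
        return total;
    }

    public static void main(String[] args) {
        System.out.println(doubled(3));    // same argument, same result
        System.out.println(doubled(3));
        System.out.println(addToTotal(3)); // same argument...
        System.out.println(addToTotal(3)); // ...but a result that is 3 larger
    }
}
```

Calling `doubled` twice with the same argument always gives the same answer, while the second call to `addToTotal` returns a different value than the first, even though the argument did not change.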

Side effects can be very useful and are unavoidable in many cases. But they also have pitfalls. Rx developers are encouraged to avoid unnecessary side effects, and to have a clear intention when they do use them. While some cases are justified, abuse introduces unnecessary hazards.

Issues with side effects

Functional programming in general tries to avoid creating any side effects. Functions with side effects, especially those that modify state, require the programmer to understand more than just the inputs and outputs of the function. The surface area they need to understand now extends to the history and context of the state being modified. This can greatly increase the complexity of a function, and thus make it harder to understand and maintain correctly.

Side effects are not always accidental, nor are they always intentional. An easy way to reduce accidental side effects is to reduce the surface area for change. The simple actions you can take are to reduce the visibility or scope of state and to make whatever you can immutable. You can reduce the visibility of a variable by scoping it to a code block such as a method. You can reduce the visibility of class members by making them private or protected. By definition, immutable data can't be modified, so it can't be the target of side effects. These are sensible encapsulation rules that will dramatically improve the maintainability of your Rx code.

We start with an example of an implementation with a side effect. Java doesn't allow lambdas (or anonymous classes in general) to reference local variables that are not final or effectively final. However, the final keyword in Java protects only the reference and not the state of the referenced object. Nothing stops you from modifying the state of objects from your lambda. Consider this simple counter, which is implemented as an object rather than a primitive int.

class Inc {
	private int count = 0;
	public void inc() { 
		count++;
	}
	public int getCount() {
		return count;
	}
}
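To make the point about final concrete before bringing Rx into it, here is a minimal sketch in plain Java. It repeats the counter so that it compiles on its own; the wrapper class `FinalReferenceSketch` and the method `runTwice` are names invented for this illustration.

```java
class FinalReferenceSketch {
    // Same counter as above, repeated so the sketch is self-contained.
    static class Inc {
        private int count = 0;
        void inc() { count++; }
        int getCount() { return count; }
    }

    static int runTwice() {
        final Inc index = new Inc();
        // Allowed: the lambda mutates state behind the final reference.
        Runnable step = () -> index.inc();
        // index = new Inc();  // would not compile: a final variable cannot be reassigned
        step.run();
        step.run();
        return index.getCount();
    }

    public static void main(String[] args) {
        System.out.println(runTwice()); // 2
    }
}
```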

An instance of Inc can have its state modified even if it is declared as final. We are going to use this to index the items of an observable. Note that, while Java didn't force us to explicitly declare it as final, it would produce an error if we tried to change the reference while also using the reference in our lambda.

Observable<String> values = Observable.just("No", "side", "effects", "please");
		
Inc index = new Inc();
Observable<String> indexed = 
		values.map(w -> {
			index.inc();
			return w;
		});
indexed.subscribe(w -> System.out.println(index.getCount() + ": " + w));

Output

1: No
2: side
3: effects
4: please

So far it appears ok. Let's see what happens when we try to subscribe to that observable a second time.

Observable<String> values = Observable.just("No", "side", "effects", "please");
		
Inc index = new Inc();
Observable<String> indexed = 
		values.map(w -> {
			index.inc();
			return w;
		});
indexed.subscribe(w -> System.out.println("1st observer: " + index.getCount() + ": " + w));
indexed.subscribe(w -> System.out.println("2nd observer: " + index.getCount() + ": " + w));

Output

1st observer: 1: No
1st observer: 2: side
1st observer: 3: effects
1st observer: 4: please
2nd observer: 5: No
2nd observer: 6: side
2nd observer: 7: effects
2nd observer: 8: please

The second subscriber sees the indexing start at 5, which is nonsense. While the bug here is straightforward to discover, side effects can lead to bugs that are a lot more subtle.
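The reason is that the function passed to map runs again for every item of every subscription, while both subscriptions share the same counter. The sketch below models this with a hand-rolled stand-in for a cold observable in plain Java. The `Source` interface and its `map` and `just` are hypothetical simplifications written for this illustration, not RxJava types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class ResubscribeSketch {
    // Hypothetical stand-in for a cold observable: every subscribe call replays all items.
    interface Source<T> {
        void subscribe(Consumer<T> observer);

        // Like map: f runs again for every item of every subscription.
        default <R> Source<R> map(Function<T, R> f) {
            return observer -> subscribe(item -> observer.accept(f.apply(item)));
        }
    }

    static Source<String> just(String... items) {
        return observer -> Arrays.asList(items).forEach(observer);
    }

    // Returns the counter values seen by two consecutive subscriptions.
    static List<Integer> observedIndices() {
        int[] count = {0}; // shared mutable state, playing the role of Inc
        Source<String> indexed = just("No", "side", "effects", "please")
                .map(w -> { count[0]++; return w; });
        List<Integer> seen = new ArrayList<>();
        indexed.subscribe(w -> seen.add(count[0])); // first subscription
        indexed.subscribe(w -> seen.add(count[0])); // second one keeps counting
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observedIndices()); // [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```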

Composing data in a pipeline

The safest way to use state in Rx is to include it in the data emitted. We can pair items with their indices using scan.

class Indexed <T> {
	public final int index;
	public final T item;
	public Indexed(int index, T item) {
		this.index = index;
		this.item = item;
	}
}
Observable<String> values = Observable.just("No", "side", "effects", "please");

Observable<Indexed<String>> indexed = 
	values.scan(
			new Indexed<String>(0, null), 
			(prev,v) -> new Indexed<String>(prev.index+1, v))
		.skip(1);
indexed.subscribe(w -> System.out.println("1st observer: " + w.index + ": " + w.item));
indexed.subscribe(w -> System.out.println("2nd observer: " + w.index + ": " + w.item));

Output

1st observer: 1: No
1st observer: 2: side
1st observer: 3: effects
1st observer: 4: please
2nd observer: 1: No
2nd observer: 2: side
2nd observer: 3: effects
2nd observer: 4: please

The result now is valid. We removed the shared state between the two subscriptions and now they can't affect each other.

do

There are cases where we do want a side effect, for example when logging. The subscribe method always has a side effect, otherwise it is not useful. We could put our logging in the body of a subscriber but then we would have two disadvantages:

  1. We are mixing the less interesting code of logging with the critical code of our subscription
  2. If we wanted to log an intermediate state in our pipeline, e.g. before and after mapping, we would have to introduce an additional subscription just for that, which won't necessarily see exactly what the consumer saw and at the time when they saw it.

The next family of methods helps us declare side effects in a tidier manner.

public final Observable<T> doOnCompleted(Action0 onCompleted)
public final Observable<T> doOnEach(Action1<Notification<? super T>> onNotification)
public final Observable<T> doOnEach(Observer<? super T> observer)
public final Observable<T> doOnError(Action1<java.lang.Throwable> onError)
public final Observable<T> doOnNext(Action1<? super T> onNext)
public final Observable<T> doOnTerminate(Action0 onTerminate)

As we can see, they take actions to perform when notifications are emitted. They also return an Observable<T>, which means that we can use them between operators in our pipeline. In some cases, you could achieve the same result using map or filter. Using doOn* is better because it documents your intention to have a side effect. Here's an example:

Observable<String> values = Observable.just("side", "effects");
		
values
	.doOnEach(new PrintSubscriber("Log"))
	.map(s -> s.toUpperCase())
	.subscribe(new PrintSubscriber("Process"));

Output

Log: side
Process: SIDE
Log: effects
Process: EFFECTS
Log: Completed
Process: Completed

We reused our convenient PrintSubscriber from previous chapters. The "do" methods are not affected by the transformations later in the pipeline. We can log what our service produces regardless of what the consumer actually consumes. Consider the following service:

static Observable<String> service() {
	return	Observable.just("First", "Second", "Third")
			.doOnEach(new PrintSubscriber("Log"));
}

Then we use it:

service()
	.map(s -> s.toUpperCase())
	.filter(s -> s.length() > 5)
	.subscribe(new PrintSubscriber("Process"));

Output

Log: First
Log: Second
Process: SECOND
Log: Third
Log: Completed
Process: Completed

We logged everything that our service produced, even though the consumer modified and filtered the results.

The differences between the different variants for "do" should be apparent by this point. In summary:

  • doOnEach runs when any notification is emitted
  • doOnNext runs when a value is emitted
  • doOnError runs when the observable terminates with an error
  • doOnCompleted runs when the observable terminates with no error
  • doOnTerminate runs when the observable terminates

One special note concerns doOnTerminate, which runs right before the observable terminates with either onCompleted or onError. There is also the method finallyDo, which runs immediately after the observable terminates.

doOnSubscribe, doOnUnsubscribe

public final Observable<T> doOnSubscribe(Action0 subscribe)
public final Observable<T> doOnUnsubscribe(Action0 unsubscribe)

Subscription and unsubscription are not events that are emitted by an observable. They can still be seen as events in a general sense and you may want to perform some actions when they occur. Most likely, you'll be using them for logging purposes.

ReplaySubject<Integer> subject = ReplaySubject.create();
Observable<Integer> values = subject
	.doOnSubscribe(() -> System.out.println("New subscription"))
	.doOnUnsubscribe(() -> System.out.println("Subscription over"));

Subscription s1 = values.subscribe(new PrintSubscriber("1st"));
subject.onNext(0);
Subscription s2 = values.subscribe(new PrintSubscriber("2nd"));
subject.onNext(1);
s1.unsubscribe();
subject.onNext(2);
subject.onNext(3);
subject.onCompleted();

Output

New subscription
1st: 0
New subscription
2nd: 0
1st: 1
2nd: 1
Subscription over
2nd: 2
2nd: 3
2nd: Completed
Subscription over

Encapsulating with asObservable

Rx is designed in the style of functional programming, but it exists within an object-oriented environment. We also have to protect against object-oriented dangers. Consider this naive implementation for a service that returns an observable.

public class BrakeableService {
	public BehaviorSubject<String> items = BehaviorSubject.create("Greet");
	public void play() {
		items.onNext("Hello");
		items.onNext("and");
		items.onNext("goodbye");
	}
}

The code above does not prevent a naughty consumer from replacing your items with one of their own. After that happens, subscriptions made before the change will no longer receive items, because you are no longer calling onNext on the right Subject. We obviously need to hide access to our Subject:

public class BrakeableService {
	private final BehaviorSubject<String> items = BehaviorSubject.create("Greet");
	
	public BehaviorSubject<String> getValues() {
		return items;
	}
	
	public void play() {
		items.onNext("Hello");
		items.onNext("and");
		items.onNext("goodbye");
	}
}

Now our reference is safe, but we are still exposing a reference to a Subject. Anyone can call onNext on our Subject and inject values into our sequence. We should only return Observable<T>, which is an immutable object. Subjects extend Observable, so we can return our subject as an Observable:

public Observable<String> getValuesUnsafe() {
	return items;
}

Our API now looks safe, but it isn't. Nothing is stopping a user from discovering that our Observable is actually a Subject (e.g. using instanceof), casting it to a Subject and using it like previously.

asObservable

The idea behind the asObservable method is to wrap extensions of Observable into an actual Observable that can be safely shared, since Observable is immutable.

public Observable<String> getValues() {
	return items.asObservable();
}

Now we have properly protected our Subject. This protection is not only against malicious attacks but also against mistakes. We have mentioned before that subjects should be avoided when alternatives exist, and now we've seen examples of why. Subjects introduce state to our observables. Calls to onNext, onCompleted and onError alter the sequence that consumers will see. Observables that are constructed with any of the factory methods or operators exposed on Observable are immutable, provided that we don't introduce side-effects ourselves, as we saw in Issues with side effects.
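The same leak and the same kind of fix can be seen outside Rx. The sketch below is only an analogy using plain Java collections, not RxJava code: returning the internal list under a narrower declared type corresponds to getValuesUnsafe, and wrapping it in a read-only view corresponds to asObservable. The class `EncapsulationSketch` and its members are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class EncapsulationSketch {
    private final List<String> items = new ArrayList<>();

    EncapsulationSketch() {
        items.add("Greet");
    }

    // Leaky: the declared type hides add(), but the object is still our ArrayList.
    Iterable<String> getValuesUnsafe() {
        return items;
    }

    // Wrapped: callers get a read-only view that rejects modification.
    List<String> getValues() {
        return Collections.unmodifiableList(items);
    }

    int size() {
        return items.size();
    }

    public static void main(String[] args) {
        EncapsulationSketch service = new EncapsulationSketch();

        // A consumer discovers the real type and injects a value.
        Iterable<String> leaked = service.getValuesUnsafe();
        if (leaked instanceof List) {
            ((List<String>) leaked).add("injected");
        }
        System.out.println(service.size()); // 2

        // The wrapped view refuses the same attempt.
        try {
            service.getValues().add("injected again");
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected");
        }
        System.out.println(service.size()); // still 2
    }
}
```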

You can find the full code of the examples discussed here

Mutable elements cannot be protected

As one might expect, an Rx pipeline forwards references to objects and doesn't create copies (unless we do so ourselves in the functions we supply). Modifications to the objects will be visible to every position in the pipeline that uses them. Consider the following mutable class:

class Data {
	public int id;
	public String name;
	public Data(int id, String name) {
		this.id = id;
		this.name = name;
	}
}

Now we show an observable of that type and two subscribers.

Observable<Data> data = Observable.just(
	new Data(1, "Microsoft"),
	new Data(2, "Netflix")
);

data.subscribe(d -> d.name = "Garbage");
data.subscribe(d -> System.out.println(d.id + ": " + d.name));

Output

1: Garbage
2: Garbage

The first subscriber is the first to be called for each item. Its action is to modify the data. Once the first subscriber is done, the same reference is also passed to the second subscriber, only now the data is changed in a way that was not declared in the producer. A developer needs to have a deep understanding of Rx, Java and their environment in order to reason about the sequence of modifications, and then argue that such code would run according to a plan. It is simpler to avoid mutable state altogether. Observables should be seen as a sequence of notifications about resolved events.
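One way to follow that advice, sketched here under the assumption that we are free to change the Data class, is to make its fields final and to produce modified copies instead of mutating in place. The `withName` helper and the wrapper class `ImmutableDataSketch` are hypothetical names for this example.

```java
class ImmutableDataSketch {
    // Immutable variant of Data: the fields are final, so no subscriber can alter an instance.
    static final class Data {
        final int id;
        final String name;

        Data(int id, String name) {
            this.id = id;
            this.name = name;
        }

        // Hypothetical helper: returns a modified copy and leaves this instance untouched.
        Data withName(String newName) {
            return new Data(id, newName);
        }
    }

    public static void main(String[] args) {
        Data original = new Data(1, "Microsoft");
        Data changed = original.withName("Garbage");
        System.out.println(original.id + ": " + original.name); // 1: Microsoft
        System.out.println(changed.id + ": " + changed.name);   // 1: Garbage
    }
}
```

With this version, a statement like `d.name = "Garbage"` inside a subscriber no longer compiles, so one subscriber cannot change what another one sees.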

Continue reading

Previous: Transformation of sequences
Next: Leaving the monad