Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
4 contributors

Users who have contributed to this file

@Froussios @Tiltman @stkent @aldoKelvianto
247 lines (190 sloc) 11 KB

Key types

Rx is based around two fundamental types, while several others expand the functionality around the core types. Those two core types are the Observable and the Observer, which will be introduced in this chapter. We will also introduce Subjects, which ease the learning curve.

Rx builds upon the Observer pattern. It is not unique in doing so. Event handling already exists in Java (e.g. JavaFX's EventHandler). Those are simpler approaches, which suffer in comparison to Rx:

  • Events through event handlers are hard to compose.
  • They cannot be queried over time
  • They can lead to memory leaks
  • These is no standard way of signaling completion.
  • Require manual handling of concurrency and multithreading.

Observable

Observable is the first core element that we will see. This class contains a lot of the implementation of Rx, including all of the core operators. We will be examining it step by step throughout this book. For now, we must understand the Subscribe method. Here is one key overload of the method:

public final Subscription subscribe(Subscriber<? super T> subscriber)

This is the method that you use to receive the values emitted by the observable. As the values come to be pushed (through policies that we will discuss throughout this book), they are pushed to the subscriber, which is then responsible for the behaviour intended by the consumer. The Subscriber here is an implementation of the Observer interface.

An observable pushes 3 kinds of events

  • Values
  • Completion, which indicates that no more values will be pushed.
  • Errors, if something caused the sequence to fail. These events also imply termination.

Observer

We already saw one abstract implementation of the Observer, Subscriber. Subscriber implements some extra functionality and should be used as the basis for our implementations of Observer. For now, it is simpler to first understand the interface.

interface Observer<T> {
    void onCompleted();
    void onError(java.lang.Throwable e);
    void onNext(T t);
}

Those three methods are the behaviour that is executed every time the observable pushes a value. The observer will have its onNext called zero or more times, optionally followed by an onCompleted or an onError. No calls happen after a call to onError or onCompleted.

When developing Rx code, you'll see a lot of Observable, but not so much of Observer. While it is important to understand the Observer, there are shorthands that remove the need to instantiate it yourself.

Implementing Observable and Observer

You could manually implement Observer or extend Observable. In reality that will usually be unnecessary, since Rx already provides all the building blocks you need. It is also dangerous, as interaction between parts of Rx includes conventions and internal plumbing that are not obvious to a beginner. It is both simpler and safer to use the many tools that Rx gives you for generating the functionality that you need.

To subscribe to an observable, it is not necessary to provide instances of Observer at all. There are overloads to subscribe that simply take the functions to be executed for onNext, onError and onSubscribe, hiding away the instantiation of the corresponding Observer. It is not even necessary to provide each of those functions. You can provide a subset of them, i.e. just onNext or just onNext and onError.

The introduction of lambda functions in Java 1.8 makes these overloads very convenient for the short examples that exist in this book.

Subject

Subjects are an extension of the Observable that also implements the Observer interface. The idea may sound odd at first, but they make things a lot simpler in some cases. They can have events pushed to them (like observers), which they then push further to their own subscribers (like observables). This makes them ideal entry points into Rx code: when you have values coming in from outside of Rx, you can push them into a Subject, turning them into an observable. You can think of them as entry points to an Rx pipeline.

Subject has two parameter types: the input type and the output type. This was designed so for the sake of abstraction and not because the common uses for subjects involve transforming values. There are transformation operators to do that, which we will see later.

There are a few different implementations of Subject. We will now examine the most important ones and their differences.

PublishSubject

PublishSubject is the most straight-forward kind of subject. When a value is pushed into a PublishSubject, the subject pushes it to every subscriber that is subscribed to it at that moment.

public static void main(String[] args) {
	PublishSubject<Integer> subject = PublishSubject.create();
	subject.onNext(1);
	subject.subscribe(System.out::println);
	subject.onNext(2);
	subject.onNext(3);
	subject.onNext(4);
}

Output

2
3
4

As we can see in the example, 1 isn't printed because we weren't subscribed when it was pushed. After we subscribed, we began receiving the values that were pushed to the subject.

This is the first time we see subscribe being used, so it is worth paying attention to how it was used. In this case, we used the overload which expects one Function for the case of onNext. That function takes an argument of type Integer and returns nothing. Functions without a return type are also called actions. We can provide that function in different ways:

  • we can supply an instance of Action1<Integer>,
  • implicitly create one using a lambda expression or
  • pass a reference to an existing method that fits the signature. In this case, System.out::println has an overload that accepts Object, so we passed a reference to it. subscribe will call println with the arriving values as the argument.

ReplaySubject

ReplaySubject has the special feature of caching all the values pushed to it. When a new subscription is made, the event sequence is replayed from the start for the new subscriber. After catching up, every subscriber receives new events as they come.

ReplaySubject<Integer> s = ReplaySubject.create();	
s.subscribe(v -> System.out.println("Early:" + v));
s.onNext(0);
s.onNext(1);
s.subscribe(v -> System.out.println("Late: " + v));	
s.onNext(2);

Output

Early:0
Early:1
Late: 0
Late: 1
Early:2
Late: 2

All the values are received by the subscribers, even though one was late. Also notice that the late subscriber had everything replayed to it before proceeding to the next value.

Caching everything isn't always a good idea, as an observable sequence can run for a long time. There are ways to limit the size of the internal buffer. ReplaySubject.createWithSize limits the size of the buffer, while ReplaySubject.createWithTime limits how long an object can stay cached.

ReplaySubject<Integer> s = ReplaySubject.createWithSize(2);	
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v));	
s.onNext(3);

Output

Late: 1
Late: 2
Late: 3

Our late subscriber now missed the first value, which fell off the buffer of size 2. Similarily, old values fall off the buffer as time passes, when the subject is created with createWithTime

ReplaySubject<Integer> s = ReplaySubject.createWithTime(150, TimeUnit.MILLISECONDS,
                                                        Schedulers.immediate());
s.onNext(0);
Thread.sleep(100);
s.onNext(1);
Thread.sleep(100);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v));	
s.onNext(3);

Output

Late: 1
Late: 2
Late: 3

Creating a ReplaySubject with time requires a Scheduler, which is Rx's way of keeping time. Feel free to ignore this for now, as we will properly introduce schedulers in the chapter about concurrency.

ReplaySubject.createWithTimeAndSize limits both, which ever comes first.

BehaviorSubject

BehaviorSubject only remembers the last value. It is similar to a ReplaySubject with a buffer of size 1. An initial value can be provided on creation, therefore guaranteeing that a value always will be available immediately on subscription.

BehaviorSubject<Integer> s = BehaviorSubject.create();
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v));	
s.onNext(3);

Output

Late: 2
Late: 3

The following example just completes, since that is the last event.

BehaviorSubject<Integer> s = BehaviorSubject.create();
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.onCompleted();
s.subscribe(
	v -> System.out.println("Late: " + v),
	e -> System.out.println("Error"),
	() -> System.out.println("Completed")
);

An initial value is provided to be available if anyone subscribes before the first value is pushed.

BehaviorSubject<Integer> s = BehaviorSubject.create(0);
s.subscribe(v -> System.out.println(v));
s.onNext(1);

Output

0
1

Since the defining role of a BehaviorSubject is to always have a value readily available, it is unusual to create one without an initial value. It is also unusual to terminate one.

AsyncSubject

AsyncSubject also caches the last value. The difference now is that it doesn't emit anything until the sequence completes. Its use is to emit a single value and immediately complete.

AsyncSubject<Integer> s = AsyncSubject.create();
s.subscribe(v -> System.out.println(v));
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.onCompleted();

Output

2

Note that, if we didn't do s.onCompleted();, this example would have printed nothing.

Implicit contracts

As we already mentioned, there are contracts in Rx that are not obvious in the code. An important one is that no events are emitted after a termination event (onError or onCompleted). The implemented subjects respect that, and the subscribe method also prevents some violations of the contract.

Subject<Integer, Integer> s = ReplaySubject.create();
s.subscribe(v -> System.out.println(v));
s.onNext(0);
s.onCompleted();
s.onNext(1);
s.onNext(2);

Output

0

Safety nets like these are not guaranteed in the entirety of the implementation of Rx. It is best that you are mindful not to violate the contract, as this may lead to undefined behaviour.

Continue reading

Previous Next
Why Rx Lifetime management
You can’t perform that action at this time.