-
Notifications
You must be signed in to change notification settings - Fork 7.6k
What's different in 3.0
RxJava 3 lives in the group io.reactivex.rxjava3 with artifact ID rxjava. Official language/platform adaptors will also be located under the group [io.reactivex.rxjava3].
The following examples demonstrate the typical import statements. Please consider the latest version and replace 3.0.0 with the numbers from the badge:
dependencies {
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
}<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.0.0</version>
</dependency>ℹ️ Further references: PR #6421
The 3.x documentation of the various components can be found at
Sub-version specific documentation is available under a version tag, for example
(replace 3.0.0-RC9 with the numbers from the badge: ).
The documentation of the current snapshot is under
For a long time, RxJava was limited to Java 6 API due to how Android was lagging behind in its runtime support. This changed with the upcoming Android Studio 4 previews where a process called desugaring is able to turn many Java 7 and 8 features into Java 6 compatible ones transparently.
This allowed us to increase the baseline of RxJava to Java 8 and add official support for many Java 8 constructs:
-
Stream: usejava.util.stream.Streamas a source or expose sequences as blockingStreams. - Stream
Collectors: aggregate items into collections specified by standard transformations. -
Optional: helps with the non-nullness requirement of RxJava -
CompletableFuture: consumeCompletableFutures non-blockingly or expose single results asCompletableFutures. - Use site non-null annotation: helps with some functional types be able to return null in specific circumstances.
However, some features won't be supported:
-
java.time.Duration: would add a lot of overloads; can always be decomposed into thetime+unitmanually. -
java.util.function: these can't throwThrowables, overloads would create bloat and/or ambiguity
Due to the state of the Android Desugar tooling, as of writing this page, internals of pre-existing, non Java 8-related RxJava operators do not use Java 8 constructs or types. This allows using these "older" operators with Android API levels where the desugaring tool doesn't provide automatic Java 8 backports of various constructs.
ℹ️ Further references: Issue #6695, PR #6765, other PRs
RxJava 3 components are located under the io.reactivex.rxjava3 package (RxJava 1 has rx and RxJava 2 is just io.reactivex. This allows version 3 to live side by side with the earlier versions. In addition, the core types of RxJava (Flowable, Observer, etc.) have been moved to io.reactivex.rxjava3.core.
| Component | RxJava 2 | RxJava 3 |
|---|---|---|
| Core | io.reactivex |
io.reactivex.rxjava3.core |
| Annotations | io.reactivex.annotations |
io.reactivex.rxjava3.annotations |
| Disposables | io.reactivex.disposables |
io.reactivex.rxjava3.disposables |
| Exceptions | io.reactivex.exceptions |
io.reactivex.rxjava3.exceptions |
| Functions | io.reactivex.functions |
io.reactivex.rxjava3.functions |
| Flowables | io.reactivex.flowables |
io.reactivex.rxjava3.flowables |
| Observables | io.reactivex.observables |
io.reactivex.rxjava3.observables |
| Subjects | io.reactivex.subjects |
io.reactivex.rxjava3.subjects |
| Processors | io.reactivex.processors |
io.reactivex.rxjava3.processors |
| Observers | io.reactivex.observers |
io.reactivex.rxjava3.observers |
| Subscribers | io.reactivex.subscribers |
io.reactivex.rxjava3.subscribers |
| Parallel | io.reactivex.parallel |
io.reactivex.rxjava3.parallel |
| Internal | io.reactivex.internal |
io.reactivex.rxjava3.internal |
Due to naming matches, IDE's tend to import java.util.Observable instead of picking RxJava's io.reactivex.rxjava3.core.Observable. One can usually have the IDE ignore java.util.Observable and java.util.Observer, or otherwise, specify an explicit import io.reactivex.rxjava3.core.Observable; in the affected files.
Also since RxJava 3 now requires a Java 8 runtime, the standard library functional interfaces, such as java.util.function.Function, may be picked instead of io.reactivex.rxjava3.functions.Function. IDEs tend to give a non-descriptive errors such as "Function can't be converted to Function", omitting the fact about the package differences.
ℹ️ Further references: PR #6621
Sometimes, the design of components and operators turn out to be inadequate, too limited or wrong in some circumstances. Major releases such as this allows us to make the necessary changes that would have caused all sorts of problems in a patch release.
The purpose of the connectable types (ConnectableFlowable and ConnectableObservable) is to allow one or more consumers to be prepared before the actual upstream gets streamed to them upon calling connect(). This worked correctly for the first time, but had some trouble if the upstream terminated instead of getting disconnected. In this terminating case, depending on whether the connectable was created with replay() or publish(), fresh consumers would either be unable to receive items from a new connection or would miss items altogether.
With 3.x, connectables have to be reset explicitly when they terminate. This extra steps allows consumers to receive cached items or be prepared for a fresh connection.
With publish, if the connectable terminates, consumers subscribing later will only receive the terminal event. One has to call reset() so that a late consumer will receive items from a fresh connection.
ConnectableFlowable<Integer> connectable = Flowable.range(1, 10).publish();
// prepare consumers, nothing is signaled yet
connectable.subscribe(/* ... */);
connectable.subscribe(/* ... */);
// connect, current consumers will receive items
connectable.connect();
// let it terminate
Thread.sleep(2000);
// late consumers now will receive a terminal event
connectable.subscribe(item -> { }, error -> { }, () -> System.out.println("Done!"));
// reset the connectable to appear fresh again
connectable.reset();
// fresh consumers, they will also be ready to receive
connectable.subscribe(System.out::println, error -> { }, () -> System.out.println("Done!"));
// connect, the fresh consumer now gets the new items
connectable.connect();With replay, if the connectable terminates, consumers subscribing later will receive the cached items. One has to call reset to discard this cache so that late consumers can then receive fresh items.
ConnectableFlowable<Integer> connectable = Flowable.range(1, 10).replay();
// prepare consumers, nothing is signaled yet
connectable.subscribe(System.out::println);
connectable.subscribe(System.out::println);
// connect, current consumers will receive items
connectable.connect();
// let it terminate
Thread.sleep(2000);
// late consumers will still receive the cached items
connectable.subscribe(System.out::println, error -> { }, () -> System.out.println("Done!"));
// reset the connectable to appear fresh again
connectable.reset();
// fresh consumers, they will also be ready to receive
connectable.subscribe(System.out::println, error -> { }, () -> System.out.println("Done!"));
// connect, the fresh consumer now gets the new items
connectable.connect();ℹ️ Further references: Issue #5628, PR #6519
The implementation of Flowable.publish hosts an internal queue to support backpressure from its downstream. In 2.x, this queue, and consequently the upstream source, was slowly draining on its own if all of the resulting ConnectableFlowable's consumers have cancelled. This caused unexpected item loss when the lack of consumers was only temporary.
With 3.x, the implementation pauses and items already in the internal queue will be immediately available to consumers subscribing a bit later.
ConnectableFlowable<Integer> connectable = Flowable.range(1, 200).publish();
connectable.connect();
// the first consumer takes only 50 items and cancels
connectable.take(50).test().assertValueCount(50);
// with 3.x, the remaining items will be still available
connectable.test().assertValueCount(150);Copyright (c) 2016-present, RxJava Contributors.
Twitter @RxJava | Gitter @RxJava