Skip to content

What's different in 3.0

David Karnok edited this page Feb 13, 2020 · 85 revisions

Table of contents

Introduction

Maven coordinates

RxJava 3 lives in the group io.reactivex.rxjava3 with artifact ID rxjava. Official language/platform adaptors will also be located under the group [io.reactivex.rxjava3].

The following examples demonstrate the typical import statements. Please consider the latest version and replace 3.0.0 with the numbers from the badge: Maven Central

Gradle import

dependencies {
  implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
}

Maven import

<dependency>
  <groupId>io.reactivex.rxjava3</groupId>
  <artifactId>rxjava</artifactId>
  <version>3.0.0</version>
</dependency>

ℹ️ Further references: PR #6421

JavaDocs

The 3.x documentation of the various components can be found at

Sub-version specific documentation is available under a version tag, for example

(replace 3.0.0-RC9 with the numbers from the badge: Maven Central).

The documentation of the current snapshot is under

Java 8

For a long time, RxJava was limited to Java 6 API due to how Android was lagging behind in its runtime support. This changed with the upcoming Android Studio 4 previews where a process called desugaring is able to turn many Java 7 and 8 features into Java 6 compatible ones transparently.

This allowed us to increase the baseline of RxJava to Java 8 and add official support for many Java 8 constructs:

  • Stream: use java.util.stream.Stream as a source or expose sequences as blocking Streams.
  • Stream Collectors: aggregate items into collections specified by standard transformations.
  • Optional: helps with the non-nullness requirement of RxJava
  • CompletableFuture: consume CompletableFutures non-blockingly or expose single results as CompletableFutures.
  • Use site non-null annotation: helps with some functional types be able to return null in specific circumstances.

However, some features won't be supported:

  • java.time.Duration: would add a lot of overloads; can always be decomposed into the time + unit manually.
  • java.util.function: these can't throw Throwables, overloads would create bloat and/or ambiguity

⚠️ Note on the internal Java 8 support

Due to the state of the Android Desugar tooling, as of writing this page, internals of pre-existing, non Java 8-related RxJava operators do not use Java 8 constructs or types. This allows using these "older" operators with Android API levels where the desugaring tool doesn't provide automatic Java 8 backports of various constructs.

ℹ️ Further references: Issue #6695, PR #6765, other PRs

Package structure

RxJava 3 components are located under the io.reactivex.rxjava3 package (RxJava 1 has rx and RxJava 2 is just io.reactivex. This allows version 3 to live side by side with the earlier versions. In addition, the core types of RxJava (Flowable, Observer, etc.) have been moved to io.reactivex.rxjava3.core.

Component RxJava 2 RxJava 3
Core io.reactivex io.reactivex.rxjava3.core
Annotations io.reactivex.annotations io.reactivex.rxjava3.annotations
Disposables io.reactivex.disposables io.reactivex.rxjava3.disposables
Exceptions io.reactivex.exceptions io.reactivex.rxjava3.exceptions
Functions io.reactivex.functions io.reactivex.rxjava3.functions
Flowables io.reactivex.flowables io.reactivex.rxjava3.flowables
Observables io.reactivex.observables io.reactivex.rxjava3.observables
Subjects io.reactivex.subjects io.reactivex.rxjava3.subjects
Processors io.reactivex.processors io.reactivex.rxjava3.processors
Observers io.reactivex.observers io.reactivex.rxjava3.observers
Subscribers io.reactivex.subscribers io.reactivex.rxjava3.subscribers
Parallel io.reactivex.parallel io.reactivex.rxjava3.parallel
Internal io.reactivex.internal io.reactivex.rxjava3.internal

⚠️ Note on running "Organize Imports" from the IDE

Due to naming matches, IDE's tend to import java.util.Observable instead of picking RxJava's io.reactivex.rxjava3.core.Observable. One can usually have the IDE ignore java.util.Observable and java.util.Observer, or otherwise, specify an explicit import io.reactivex.rxjava3.core.Observable; in the affected files.

Also since RxJava 3 now requires a Java 8 runtime, the standard library functional interfaces, such as java.util.function.Function, may be picked instead of io.reactivex.rxjava3.functions.Function. IDEs tend to give a non-descriptive errors such as "Function can't be converted to Function", omitting the fact about the package differences.

ℹ️ Further references: PR #6621

Behavior changes

Sometimes, the design of components and operators turn out to be inadequate, too limited or wrong in some circumstances. Major releases such as this allows us to make the necessary changes that would have caused all sorts of problems in a patch release.

Connectable source reset

The purpose of the connectable types (ConnectableFlowable and ConnectableObservable) is to allow one or more consumers to be prepared before the actual upstream gets streamed to them upon calling connect(). This worked correctly for the first time, but had some trouble if the upstream terminated instead of getting disconnected. In this terminating case, depending on whether the connectable was created with replay() or publish(), fresh consumers would either be unable to receive items from a new connection or would miss items altogether.

With 3.x, connectables have to be reset explicitly when they terminate. This extra steps allows consumers to receive cached items or be prepared for a fresh connection.

publish-reset example

With publish, if the connectable terminates, consumers subscribing later will only receive the terminal event. One has to call reset() so that a late consumer will receive items from a fresh connection.

ConnectableFlowable<Integer> connectable = Flowable.range(1, 10).publish();

// prepare consumers, nothing is signaled yet
connectable.subscribe(/* ... */);
connectable.subscribe(/* ... */);

// connect, current consumers will receive items
connectable.connect();

// let it terminate
Thread.sleep(2000);

// late consumers now will receive a terminal event
connectable.subscribe(item -> { }, error -> { }, () -> System.out.println("Done!"));

// reset the connectable to appear fresh again
connectable.reset();

// fresh consumers, they will also be ready to receive
connectable.subscribe(System.out::println, error -> { }, () -> System.out.println("Done!"));

// connect, the fresh consumer now gets the new items
connectable.connect();

replay-reset example

With replay, if the connectable terminates, consumers subscribing later will receive the cached items. One has to call reset to discard this cache so that late consumers can then receive fresh items.

ConnectableFlowable<Integer> connectable = Flowable.range(1, 10).replay();

// prepare consumers, nothing is signaled yet
connectable.subscribe(System.out::println);
connectable.subscribe(System.out::println);

// connect, current consumers will receive items
connectable.connect();

// let it terminate
Thread.sleep(2000);

// late consumers will still receive the cached items
connectable.subscribe(System.out::println, error -> { }, () -> System.out.println("Done!"));

// reset the connectable to appear fresh again
connectable.reset();

// fresh consumers, they will also be ready to receive
connectable.subscribe(System.out::println, error -> { }, () -> System.out.println("Done!"));

// connect, the fresh consumer now gets the new items
connectable.connect();

ℹ️ Further references: Issue #5628, PR #6519

Flowable.publish pause

The implementation of Flowable.publish hosts an internal queue to support backpressure from its downstream. In 2.x, this queue, and consequently the upstream source, was slowly draining on its own if all of the resulting ConnectableFlowable's consumers have cancelled. This caused unexpected item loss when the lack of consumers was only temporary.

With 3.x, the implementation pauses and items already in the internal queue will be immediately available to consumers subscribing a bit later.

publish-pause example

ConnectableFlowable<Integer> connectable = Flowable.range(1, 200).publish();

connectable.connect();

// the first consumer takes only 50 items and cancels
connectable.take(50).test().assertValueCount(50);

// with 3.x, the remaining items will be still available
connectable.test().assertValueCount(150);

ℹ️ Further references: Issue #5899, PR #6519

Processor.offer null-check

Calling PublishProcessor.offer(), BehaviorProcessor.offer() or MulticastProcessor.offer with a null argument now throws a NullPointerException instead of signaling it via onError and thus terminating the processor. This now matches the behavior of the onNext method required by the Reactive Streams specification.

offer-example

PublishProcessor<Integer> pp = PublishProcessor.create();

TestSubscriber<Integer> ts = pp.test();

try {
   pp.offer(null);
} catch (NullPointerException expected) {
}

// no error received
ts.assertEmpty();

pp.offer(1);

// consumers are still there to receive proper items
ts.asssertValuesOnly(1);

ℹ️ Further references: PR #6799

API changes

New Types

Moved components

API additions

API expansions

API removals

Miscellaneous

Clone this wiki locally