@benjchristensen benjchristensen released this Nov 18, 2014 · 1836 commits to 2.x since this release

Assets 2

After 2+ years of internal and open source development, 3600+ commits, 100+ releases, and with the help of 97 contributors RxJava has hit version 1.0.0.

Thank you @headinthebox @zsxwing @samuelgruetter @akarnokd @quidryan @DavidMGross @abersnaze @jmhofer @mairbek @mttkay @daveray @mattrjacobs @michaeldejong @MarioAriasC @johngmyers @pron @jbripley @davidmoten @gliptak @johnhmarks @jloisel @billyy @prabirshrestha @ragalie @abliss @dpsm @daschl @thegeez and the many other contributors and those who have reported bugs, tweeted, blogged or presented about RxJava.

The quality of this release could not have been achieved without all of your help. Thank you for your involvement and for building a community around this project.

JVM Language Adaptors & Subprojects

As of 1.0 the JVM language adapters and other subprojects no longer live under RxJava but have been separated out into their own projects with their own release cycles:

Versioning

Version 1.x is now a stable API and will be supported for several years.

Minor 1.x increments (such as 1.1, 1.2, etc) will occur when non-trivial new functionality is added or significant enhancements or bug fixes occur that may have behavioral changes that may affect some edge cases (such as dependence on behavior resulting from a bug). An example of an enhancement that would classify as this is adding reactive pull backpressure support to an operator that previously did not support it. This should be backwards compatible but does behave differently.

Patch 1.x.y increments (such as 1.0.0 -> 1.0.1, 1.3.1 -> 1.3.2, etc) will occur for bug fixes and trivial functionality (like adding a method overload).

Roadmap and Known Issues

  • 1.0.x milestone with known issues
  • 1.1 milestone with additional support for reactive pull backpressure
  • 1.x is a catch all for other items that may be pursued in 1.2, 1.3 and later versions.

Change Log

  • all deprecated methods and types from v0.20 and earlier are deleted
  • now published to groupId io.reactivex instead of com.netflix.rxjava
  • artifactId is now rxjava instead of rxjava-core
io.reactivex:rxjava:1.0.0

Following are specific changes from 0.20 to 1.0 to be aware of:

groupBy/groupByUntil

The groupByUntil operator was removed by collapsing its behavior into groupBy. Previously on groupBy when a child GroupedObservable was unsubscribed it would internally retain the state and ignore all future onNext for that key.

This matched behavior in Rx.Net but was found to be non-obvious and almost everyone using groupBy on long-lived streams actually wanted the behavior of groupByUntil where an unsubscribed GroupedObservable would clean up the resources and then if onNext for that key arrived again a new GroupedObservable would be emitted.

Adding backpressure (reactive pull) to groupByUntil was found to not work easily with its signatures so before 1.0 Final it was decided to collapse groupBy and groupByUntil. Further details on this can be found in Pull Request 1727.

Here is an example of how groupBy now behaves when a child GroupedObservable is unsubscribed (using take here):

// odd/even into lists of 10
Observable.range(1, 100)
        .groupBy(n -> n % 2 == 0)
        .flatMap(g -> {
            return g.take(10).toList();
        }).forEach(System.out::println);
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
[21, 23, 25, 27, 29, 31, 33, 35, 37, 39]
[22, 24, 26, 28, 30, 32, 34, 36, 38, 40]
[41, 43, 45, 47, 49, 51, 53, 55, 57, 59]
[42, 44, 46, 48, 50, 52, 54, 56, 58, 60]
[61, 63, 65, 67, 69, 71, 73, 75, 77, 79]
[62, 64, 66, 68, 70, 72, 74, 76, 78, 80]
[81, 83, 85, 87, 89, 91, 93, 95, 97, 99]
[82, 84, 86, 88, 90, 92, 94, 96, 98, 100]

Previously this would have only emitted 2 groups and ignored all subsequent values:

[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

On a finite stream, similar behavior of the previous groupBy implementation that would filter can be achieved like this:

//odd/even into lists of 10
Observable.range(1, 100)
        .groupBy(n -> n % 2 == 0)
        .flatMap(g -> {
            return g.filter(i -> i <= 20).toList();
        }).forEach(System.out::println);
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

That however does allow the stream to complete (which may not be wanted).

To unsubscribe here are some choices that get the same output but efficiently unsubscribe up so the source only emits 40 values:

Observable.timer(0, 1, TimeUnit.MILLISECONDS)
        .groupBy(n -> n % 2 == 0)
        .flatMap(g -> {
            return g.take(10).toList();
        }).take(2).toBlocking().forEach(System.out::println);

or

Observable.timer(0, 1, TimeUnit.MILLISECONDS)
        .take(20)
        .groupBy(n -> n % 2 == 0)
        .flatMap(g -> {
            return g.toList();
        }).toBlocking().forEach(System.out::println);

These show that now groupBy composes like any other operator without the nuanced and hidden behavior of ignoring values after a child GroupedObservable is unsubscribed.

Uses of groupByUntil can now all be done by just using operators like take, takeWhile and takeUntil on the GroupedObservable directly, such as this:

Observable.from(Arrays.asList("a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c"))
        .groupBy(n -> n)
        .flatMap(g -> {
            return g.take(3).reduce((s, s2) -> s + s2);
        }).forEach(System.out::println);
aaa
bbb
ccc
aaa
bbb
ccc

retryWhen/repeatWhen

The retryWhen and repeatWhen method signatures both emitted a Observable<Notification> type which could be queried to represent either onError in the retryWhen case or onCompleted in the repeatWhen case. This was found to be confusing and unnecessary. The signatures were changed to emit Observable<Throwable> for retryWhen and Observable<Void> for repeatWhen to better signal the type of notification they are emitting without the need to then query the Notification.

The following contrived examples shows how the Observable<Throwable> is used to get the error that occurred when deciding to retry:

    AtomicInteger count = new AtomicInteger();
    Observable.create((Subscriber<? super String> s) -> {
        if (count.getAndIncrement() == 0) {
            s.onError(new RuntimeException("always fails"));
        } else {
            s.onError(new IllegalArgumentException("user error"));
        }
    }).retryWhen(attempts -> {
        return attempts.flatMap(throwable -> {
            if (throwable instanceof IllegalArgumentException) {
                System.out.println("don't retry on IllegalArgumentException... allow failure");
                return Observable.error(throwable);
            } else {
                System.out.println(throwable + " => retry after 1 second");
                return Observable.timer(1, TimeUnit.SECONDS);
            }
        });
    })
    .toBlocking().forEach(System.out::println);

collect

The collect operator was changed to require a factory method for the initial value. This allows the Observable to be executed multiple times and get a new value (typically a mutable data structure) each time. Prior to this the Observable could only be subscribed to once since it would retain the original mutable data structure and keep mutating it.

Observable.range(0, 10).collect(() -> new ArrayList<Integer>(), (list, i) -> {
    list.add(i);
}).forEach(System.out::println);

Removed Scheduler.parallelism

The Scheduler.parallelism method was no longer being used by anything so was removed.

Removed Observable.parallel

The parallel operator was a failed experiment and almost all uses of it were wrong and led to confusion and often bad performance. Due to this it was removed.

Here is example code to show approaches to adding concurrency:

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ParallelExecution {

    public static void main(String[] args) {
        System.out.println("------------ mergingAsync");
        mergingAsync();
        System.out.println("------------ mergingSync");
        mergingSync();
        System.out.println("------------ mergingSyncMadeAsync");
        mergingSyncMadeAsync();
        System.out.println("------------ flatMapExampleSync");
        flatMapExampleSync();
        System.out.println("------------ flatMapExampleAsync");
        flatMapExampleAsync();
        System.out.println("------------ flatMapBufferedExampleAsync");
        flatMapBufferedExampleAsync();
        System.out.println("------------ flatMapWindowedExampleAsync");
        flatMapWindowedExampleAsync();
        System.out.println("------------");
    }

    private static void mergingAsync() {
        Observable.merge(getDataAsync(1), getDataAsync(2))
                .toBlocking().forEach(System.out::println);
    }

    /**
     * Merging async Observables subscribes to all of them concurrently.
     */
    private static void mergingSync() {
        // here you'll see the delay as each is executed synchronously
        Observable.merge(getDataSync(1), getDataSync(2))
                .toBlocking().forEach(System.out::println);
    }

    /**
     * If the Observables are synchronous they can be made async with `subscribeOn`
     */
    private static void mergingSyncMadeAsync() {
        // if you have something synchronous and want to make it async, you can schedule it like this
        // so here we see both executed concurrently
        Observable.merge(
                getDataSync(1).subscribeOn(Schedulers.io()),
                getDataSync(2).subscribeOn(Schedulers.io())
                )
                .toBlocking().forEach(System.out::println);
    }

    /**
     * flatMap uses `merge` so any async Observables it returns will execute concurrently.
     */
    private static void flatMapExampleAsync() {
        Observable.range(0, 5).flatMap(i -> {
            return getDataAsync(i);
        }).toBlocking().forEach(System.out::println);
    }

    /**
     * If synchronous Observables are merged (via flatMap here) then it will behave like `concat`
     * and execute each Observable (getDataSync here) synchronously one after the other.
     */
    private static void flatMapExampleSync() {
        Observable.range(0, 5).flatMap(i -> {
            return getDataSync(i);
        }).toBlocking().forEach(System.out::println);
    }

    /**
     * If a single stream needs to be split across multiple CPUs it is generally more efficient to do it in batches.
     * 
     * The `buffer` operator can be used to batch into chunks that are then each processed on a separate thread.
     */
    private static void flatMapBufferedExampleAsync() {
        Observable.range(0, 5000).buffer(500).flatMap(i -> {
            return Observable.from(i).subscribeOn(Schedulers.computation()).map(item -> {
                // simulate computational work
                    try {
                        Thread.sleep(1);
                    } catch (Exception e) {
                    }
                    return item + " processed " + Thread.currentThread();
                });
        }).toBlocking().forEach(System.out::println);
    }

    /**
     * Or the `window` operator can be used instead of buffer to process them as a stream instead of buffered list.
     */
    private static void flatMapWindowedExampleAsync() {
        Observable.range(0, 5000).window(500).flatMap(work -> {
            return work.observeOn(Schedulers.computation()).map(item -> {
                // simulate computational work
                    try {
                        Thread.sleep(1);
                    } catch (Exception e) {
                    }
                    return item + " processed " + Thread.currentThread();
                });
        }).toBlocking().forEach(System.out::println);
    }

    // artificial representations of IO work
    static Observable<Integer> getDataAsync(int i) {
        return getDataSync(i).subscribeOn(Schedulers.io());
    }

    static Observable<Integer> getDataSync(int i) {
        return Observable.create((Subscriber<? super Integer> s) -> {
            // simulate latency
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                s.onNext(i);
                s.onCompleted();
            });
    }
}

Removed Observable.parallelMerge

Similar to the parallel operator parallelMerge was confusing and was a failed experiment so was removed.

Removed publishLast/initialValue

Removed publishLast since it can be done with takeLast(1).publish().
Removed any method overload that took an initial value since the startWith operator already allows that generically.

Removed Utility Methods/Classes

A handful of utility classes and methods that are not core to the purpose of RxJava were removed. This was preferred over having to constantly make arbitrary decisions about what utility methods should be included versus not while balancing the other goal of keeping the library small and focused.

The changes can be seen here and here

Observable.compose

The Transformer signature used by compose was changed from:

public static interface Transformer<T, R> extends Func1<Observable<? extends T>, Observable<? extends R>>

to

public static interface Transformer<T, R> extends Func1<Observable<T>, Observable<? extends R>>

This was done after finding issues with the generics that prevented it from easily being used.

Removed Observable.multicast

The multicast operator was removed in favor of directly using publish(), replay(), share() or cache().
This was done because multicast could not be made to work with reactive pull backpressure due to its use of Subject. The publish and replay operators on the other hand can have their implementations changed to not use a Subject and then support backpressure. The publish() operator has since been updated to support backpressure.

Removed Observable.*withIndex

The takeWhileWithIndex and skipWhileWithIndex operators were removed since zipWith(Observable.range(Integer.MAX_VALUE)) can be used on any Observable to get an index value. Any operator may want "withIndex" so it didn't make sense to have API clutter with some and certainly not all would have this variant added.

Observable.longCount -> countLong

The longCount operator was renamed to countLong so it shows up alphabetically next to count.

Full List of Changes

  • Pull 1886 Buffer with time and merge fix
  • Pull 1882 Remove Unused Scheduler.parallelism
  • Pull 1884 Fix Scan/Reduce/Collect Factory Ambiguity
  • Pull 1866 Fix memory leak in bounded ReplaySubject due to retaining the node index
  • Pull 1834 Subject.toSerialized
  • Pull 1832 Fix Take Early Unsubscription Causing Interrupts
  • Pull 1835 Scan/Reduce with Seed Factory
  • Pull 1836 Reduce Ring Buffer Default Sizes (and lower for Android)
  • Pull 1833 Fix Thread Safety for Unsubscribe of Window
  • Pull 1827 CacheThreadScheduler Evictor should Check Removal
  • Pull 1830 Fix mergeDelayError Handling of Error in Parent Observable
  • Pull 1829 Fix Window by Count Unsubscribe Behavior
  • Pull 1788 Remove PublishLast/InitialValue
  • Pull 1796 Improve TestSubject Javadoc
  • Pull 1803 Print full classname (inner class support) and fix enum output
  • Pull 1802 add hasObservers method to Subjects
  • Pull 1806 Remove Unnecessary Utilities
  • Pull 1809 Remove Utility Functions from Public API
  • Pull 1813 Fix issue #1812 that zip may swallow requests
  • Pull 1817 Fix Synchronous OnSubscribe Exception Skips Operators
  • Pull 1819 Fix Concat Breaks with Double onCompleted
  • Pull 1771 On error return backpressure
  • Pull 1776 Observable.compose Generics
  • Pull 1778 Change Transformer to Func1<Observable, Observable>
  • Pull 1775 BlockingOperatorNextTest.testSingleSourceManyIterators fix
  • Pull 1784 Publish with Backpressure
  • Pull 1786 Remove Multicast
  • Pull 1787 Remove *withIndex Operators
  • Pull 1789 GroupedObservable.from/create
  • Pull 1793 Take/Redo Unsubscribe
  • Pull 1767 ExecutorScheduler delivers uncaught exceptions
  • Pull 1765 backpressure support in onErrorResumeNext* operators
  • Pull 1766 Unhandled errors go to UncaughtExceptionHandler
  • Pull 1755 OnSubscribeRefCount with Synchronous Support
  • Pull 1750 Fix NPE when iterable is null
  • Pull 1745 SerializedSubject
  • Pull 1746 Fatal System.err Logs on Unhandled Exceptions
  • Pull 1743 Subject Error Handling
  • Pull 1742 EmptyObserver and TestObserver
  • Pull 1740 longCount -> countLong
  • Pull 1736 Fix TrampolineScheduler NullPointerException
  • Pull 1738 Delay Operator with Reactive Pull Backpressure
  • Pull 1739 Fix Slow Non-deterministic Test
  • Pull 1731 Remove Unused Code
  • Pull 1733 Move To Proper Location
  • Pull 1747 Cleanup: final and utility classes
  • Pull 1729 CombineLatest: Request Up When Dropping Values
  • Pull 1728 ObserveOn Error Propagation
  • Pull 1727 Proposed groupBy/groupByUntil Changes
  • Pull 1726 Fix Merge: backpressure + scalarValueQueue don't play nicely
  • Pull 1720 Change repeatWhen and retryWhen signatures.
  • Pull 1719 Fix Bug in the onBackpressure operators
  • Pull 1687 Don't allocate an empty ArrayList for each Observable.empty call
  • Pull 1705 Fix null-emitting combineLatest
  • Pull 1683 ObserveOn Error Handling
  • Pull 1686 Fix Rx serialization bug in takeUntil again and the concurrent issue in BufferUntilSubscriber
  • Pull 1701 Fix the compose generics
  • Pull 1712 Fixing regression in mergeDelayError
  • Pull 1716 Remove Observable.Parallel
  • Pull 1667 Fix the bug that Switch doesn't propagate 'unsubscribe'
  • Pull 1659 OperatorScan should check for MAX_VALUE on request
  • Pull 1657 Ignore furthur messages after entering terminate state
  • Pull 1669 Error Handling Unsubscribe and Terminal State
  • Pull 1656 Make TakeUntil obey Rx serialization contract
  • Pull 1652 Operator Scan Backpressure Fix
  • Pull 1645 Remove ParallelMerge

Artifacts: Maven Central