Skip to content

Commit

Permalink
Adds createWithDefault API
Browse files Browse the repository at this point in the history
  • Loading branch information
oldergod authored and JakeWharton committed Sep 13, 2019
1 parent 45c3faa commit 80174ec
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,16 @@ import io.reactivex.Observable
* value from the upstream observable *only* when one or more downstream observers are connected.
* This allows expensive upstream observables to be shut down when no one is observing while also
* replaying the last value seen by *any* observer to new ones.
*
* @param defaultValue the initial value to be cached
*/
fun <T> Observable<T>.replayingShare(): Observable<T> = compose(ReplayingShare.instance<T>())
@JvmOverloads
fun <T> Observable<T>.replayingShare(defaultValue: T? = null): Observable<T> {
return compose(
if (defaultValue != null) ReplayingShare.createWithDefault(defaultValue)
else ReplayingShare.instance<T>()
)
}

/**
* A transformer which combines `replay(1)`, `publish()`, and `refCount()` operators.
Expand All @@ -35,5 +43,13 @@ fun <T> Observable<T>.replayingShare(): Observable<T> = compose(ReplayingShare.i
* value from the upstream flowable *only* when one or more downstream subscribers are connected.
* This allows expensive upstream flowables to be shut down when no one is subscribed while also
* replaying the last value seen by *any* subscriber to new ones.
*
* @param defaultValue the initial value to be cached
*/
fun <T> Flowable<T>.replayingShare(): Flowable<T> = compose(ReplayingShare.instance<T>())
@JvmOverloads
fun <T> Flowable<T>.replayingShare(defaultValue: T? = null): Flowable<T> {
return compose(
if (defaultValue != null) ReplayingShare.createWithDefault(defaultValue)
else ReplayingShare.instance<T>()
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,14 @@ class ReplayingShareTest {
val o = Flowable.never<String>().replayingShare()
assertNotNull(o)
}

@Test fun observableExtensionMethodWorksWithDefaultValue() {
val strings = Observable.never<String>().replayingShare("default").test()
strings.assertValues("default")
}

@Test fun flowableExtensionMethodWorksWithDefaultValue() {
val strings = Flowable.never<String>().replayingShare("default").test()
strings.assertValues("default")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,40 +35,64 @@
*/
public final class ReplayingShare<T>
implements ObservableTransformer<T, T>, FlowableTransformer<T, T> {
private static final ReplayingShare<Object> INSTANCE = new ReplayingShare<>();
private static final ReplayingShare<Object> INSTANCE = new ReplayingShare<>(null);

/** The singleton instance of this transformer. */
@SuppressWarnings("unchecked") // Safe because of erasure.
public static <T> ReplayingShare<T> instance() {
return (ReplayingShare<T>) INSTANCE;
}

private ReplayingShare() {
/**
* Creates a `ReplayingShare` transformer with a default value which will be emitted downstream
* on subscription if there is not any cached value yet.
* @param defaultValue the initial value, cannot be null
* @throws NullPointerException if {@code defaultValue} is null
*/
public static <T> ReplayingShare<T> createWithDefault(T defaultValue) {
if (defaultValue == null) throw new NullPointerException("defaultValue == null");
return new ReplayingShare<>(defaultValue);
}

private final T defaultValue;

/**
* Constructs a ReplayingShare with the given initial value.
* @param defaultValue the initial value
*/
private ReplayingShare(T defaultValue) {
this.defaultValue = defaultValue;
}

@Override public Observable<T> apply(Observable<T> upstream) {
LastSeen<T> lastSeen = new LastSeen<>();
LastSeen<T> lastSeen = new LastSeen<>(defaultValue);
return new LastSeenObservable<>(upstream.doOnEach(lastSeen).share(), lastSeen);
}

@Override public Flowable<T> apply(Flowable<T> upstream) {
LastSeen<T> lastSeen = new LastSeen<>();
LastSeen<T> lastSeen = new LastSeen<>(defaultValue);
return new LastSeenFlowable<>(upstream.doOnEach(lastSeen).share(), lastSeen);
}

static final class LastSeen<T> implements Observer<T>, Subscriber<T> {
private final T defaultValue;
volatile T value;

LastSeen(T defaultValue) {
this.defaultValue = defaultValue;
value = defaultValue;
}

@Override public void onNext(T value) {
this.value = value;
}

@Override public void onError(Throwable e) {
value = null;
value = defaultValue;
}

@Override public void onComplete() {
value = null;
value = defaultValue;
}

@Override public void onSubscribe(Subscription ignored) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,16 +198,16 @@ public final class ReplayingShareFlowableTest {
PublishProcessor<String> upstream = PublishProcessor.create();
Flowable<String> replayed = upstream.startWith(start).compose(ReplayingShare.<String>instance());

TestSubscriber<String> observer1 = new TestSubscriber<>();
replayed.subscribe(observer1);
observer1.assertValues("initA");
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
replayed.subscribe(subscriber1);
subscriber1.assertValues("initA");

TestSubscriber<String> observer2 = new TestSubscriber<>();
replayed.subscribe(observer2);
observer1.assertValues("initA");
subscriber1.assertValues("initA");

upstream.onComplete();
observer1.assertComplete();
subscriber1.assertComplete();
observer2.assertComplete();

start.set(0, "initB");
Expand All @@ -224,17 +224,17 @@ public final class ReplayingShareFlowableTest {
PublishProcessor<String> upstream = PublishProcessor.create();
Flowable<String> replayed = upstream.startWith(start).compose(ReplayingShare.<String>instance());

TestSubscriber<String> observer1 = new TestSubscriber<>();
replayed.subscribe(observer1);
observer1.assertValues("initA");
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
replayed.subscribe(subscriber1);
subscriber1.assertValues("initA");

TestSubscriber<String> observer2 = new TestSubscriber<>();
replayed.subscribe(observer2);
observer1.assertValues("initA");
subscriber1.assertValues("initA");

RuntimeException r = new RuntimeException();
upstream.onError(r);
observer1.assertError(r);
subscriber1.assertError(r);
observer2.assertError(r);

start.set(0, "initB");
Expand All @@ -255,4 +255,87 @@ public final class ReplayingShareFlowableTest {
replayed.subscribe(testSubscriber);
testSubscriber.assertNoValues();
}

@Test public void defaultValueOnSubscribe() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.createWithDefault("default"));

TestSubscriber<String> subscriber1 = new TestSubscriber<>();
flowable.subscribe(subscriber1);
subscriber1.assertValues("default");

subject.onNext("Foo");
subscriber1.assertValues("default", "Foo");
}

@Test public void defaultValueIsOverriddenByLatestEmissionForNewSubscriber() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.createWithDefault("default"));

TestSubscriber<String> subscriber1 = new TestSubscriber<>();
flowable.subscribe(subscriber1);
subscriber1.assertValues("default");

subject.onNext("Foo");
subscriber1.assertValues("default", "Foo");

TestSubscriber<String> observer2 = new TestSubscriber<>();
flowable.subscribe(observer2);
observer2.assertValues("Foo");
}

@Test public void completeClearsCacheAndResubscribesStartingWithDefault() {
List<String> start = new ArrayList<>();
start.add("initA");

PublishProcessor<String> upstream = PublishProcessor.create();
Flowable<String> replayed =
upstream.startWith(start).compose(ReplayingShare.createWithDefault("default"));

TestSubscriber<String> subscriber1 = new TestSubscriber<>();
replayed.subscribe(subscriber1);
subscriber1.assertValues("default", "initA");

TestSubscriber<String> observer2 = new TestSubscriber<>();
replayed.subscribe(observer2);
subscriber1.assertValues("default", "initA");

upstream.onComplete();
subscriber1.assertComplete();
observer2.assertComplete();

start.set(0, "initB");

TestSubscriber<String> observer3 = new TestSubscriber<>();
replayed.subscribe(observer3);
observer3.assertValues("default", "initB");
}

@Test public void errorClearsCacheAndResubscribesStartingWithDefault() {
List<String> start = new ArrayList<>();
start.add("initA");

PublishProcessor<String> upstream = PublishProcessor.create();
Flowable<String> replayed =
upstream.startWith(start).compose(ReplayingShare.createWithDefault("default"));

TestSubscriber<String> subscriber1 = new TestSubscriber<>();
replayed.subscribe(subscriber1);
subscriber1.assertValues("default", "initA");

TestSubscriber<String> observer2 = new TestSubscriber<>();
replayed.subscribe(observer2);
subscriber1.assertValues("default", "initA");

RuntimeException r = new RuntimeException();
upstream.onError(r);
subscriber1.assertError(r);
observer2.assertError(r);

start.set(0, "initB");

TestSubscriber<String> observer3 = new TestSubscriber<>();
replayed.subscribe(observer3);
observer3.assertValues("default", "initB");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.jakewharton.rx;

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
Expand Down Expand Up @@ -258,4 +257,87 @@ public final class ReplayingShareObservableTest {
replayed.subscribe(testObserver);
testObserver.assertNoValues();
}

@Test public void defaultValueOnSubscribe() {
PublishSubject<String> subject = PublishSubject.create();
Observable<String> observable = subject.compose(ReplayingShare.createWithDefault("default"));

TestObserver<String> observer1 = new TestObserver<>();
observable.subscribe(observer1);
observer1.assertValues("default");

subject.onNext("Foo");
observer1.assertValues("default", "Foo");
}

@Test public void defaultValueIsOverriddenByLatestEmissionForNewSubscriber() {
PublishSubject<String> subject = PublishSubject.create();
Observable<String> observable = subject.compose(ReplayingShare.createWithDefault("default"));

TestObserver<String> observer1 = new TestObserver<>();
observable.subscribe(observer1);
observer1.assertValues("default");

subject.onNext("Foo");
observer1.assertValues("default", "Foo");

TestObserver<String> observer2 = new TestObserver<>();
observable.subscribe(observer2);
observer2.assertValues("Foo");
}

@Test public void completeClearsCacheAndResubscribesStartingWithDefault() {
List<String> start = new ArrayList<>();
start.add("initA");

PublishSubject<String> upstream = PublishSubject.create();
Observable<String> replayed =
upstream.startWith(start).compose(ReplayingShare.createWithDefault("default"));

TestObserver<String> observer1 = new TestObserver<>();
replayed.subscribe(observer1);
observer1.assertValues("default", "initA");

TestObserver<String> observer2 = new TestObserver<>();
replayed.subscribe(observer2);
observer1.assertValues("default", "initA");

upstream.onComplete();
observer1.assertComplete();
observer2.assertComplete();

start.set(0, "initB");

TestObserver<String> observer3 = new TestObserver<>();
replayed.subscribe(observer3);
observer3.assertValues("default", "initB");
}

@Test public void errorClearsCacheAndResubscribesStartingWithDefault() {
List<String> start = new ArrayList<>();
start.add("initA");

PublishSubject<String> upstream = PublishSubject.create();
Observable<String> replayed =
upstream.startWith(start).compose(ReplayingShare.createWithDefault("default"));

TestObserver<String> observer1 = new TestObserver<>();
replayed.subscribe(observer1);
observer1.assertValues("default", "initA");

TestObserver<String> observer2 = new TestObserver<>();
replayed.subscribe(observer2);
observer1.assertValues("default", "initA");

RuntimeException r = new RuntimeException();
upstream.onError(r);
observer1.assertError(r);
observer2.assertError(r);

start.set(0, "initB");

TestObserver<String> observer3 = new TestObserver<>();
replayed.subscribe(observer3);
observer3.assertValues("default", "initB");
}
}

0 comments on commit 80174ec

Please sign in to comment.