Skip to content
This repository has been archived by the owner on Mar 16, 2021. It is now read-only.

Add deliverLatestToView to rx2/RxTiPresenterUtils #137

Merged
merged 3 commits into from
Dec 4, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,34 @@
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Function;
import net.grandcentrix.thirtyinch.Removable;
import net.grandcentrix.thirtyinch.TiLifecycleObserver;
import net.grandcentrix.thirtyinch.TiPresenter;
import net.grandcentrix.thirtyinch.TiView;

public class RxTiPresenterUtils {

/**
* Wrapper for an emitted value, along with a record of whether or not the view was attached when the value was
* emitted.
*/
private static class ViewReadyValue<T> {

T value;

boolean viewReady;

ViewReadyValue(final T t, final Boolean viewReady) {
this.value = t;
this.viewReady = viewReady;
}
}

/**
* Observable of the view state. The View is ready to receive calls after calling {@link
* TiPresenter#attachView(TiView)} and before calling {@link TiPresenter#detachView()}.
Expand Down Expand Up @@ -66,4 +86,54 @@ public boolean isDisposed() {
}).distinctUntilChanged();
}

/**
* Returns a transformer that will delay onNext, onError and onComplete emissions unless a view
* become available. getView() is guaranteed to be != null during all emissions. This
* transformer can only be used on application's main thread.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I must admit, when I wrote the rx1 version I copied most of the docs.
Actually the main thread restriction is not true it's only recommended to make sure the view is always non null. Attach/Detach happens on the main thread. When code executes on any other thread it's not guaranteed that those actions happen before the view attach/detach state changes. So it may happen that the emission of T happens after the view got detached.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy to rewrite the comment. The behaviour should be clear from the tests anyway, I hope.

* <p/>
* If this transformer receives a next value while the previous value has not been delivered,
* the previous value will be dropped.
* <p/>
* Use this operator when you need to show updatable data.
*
* @param <T> a type of onNext value.
* @param presenter the presenter waiting for the view
* @return the delaying operator.
*/
public static <T> ObservableTransformer<T, T> deliverLatestToView(
final TiPresenter presenter) {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(final Observable<T> observable) {

// make sure we never complete
final Observable<T> source = observable.concatWith(Observable.<T>never());

// The order of the sources is important here! We want the viewReady emission to be captured first so that any synchronous
// source emissions are not skipped.
// See https://github.com/ReactiveX/RxJava/issues/5325
return Observable
.combineLatest(isViewReady(presenter), source,
new BiFunction<Boolean, T, ViewReadyValue<T>>() {
@Override
public ViewReadyValue<T> apply(final Boolean viewReady, final T t)
throws Exception {
return new ViewReadyValue<>(t, viewReady);
}
})
.flatMap(new Function<ViewReadyValue<T>, ObservableSource<T>>() {
@Override
public ObservableSource<T> apply(final ViewReadyValue<T> viewReadyValue)
throws Exception {
if (viewReadyValue.viewReady) {
return Observable.just(viewReadyValue.value);
} else {
return Observable.empty();
}
}
});
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import static org.mockito.Mockito.*;

import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.subjects.PublishSubject;
import net.grandcentrix.thirtyinch.TiPresenter;
import net.grandcentrix.thirtyinch.TiView;
import org.junit.*;
Expand Down Expand Up @@ -57,4 +59,116 @@ public void testIsViewReady_DisposeBeforeAttachView_ShouldRemoveCallback() throw
mPresenter.attachView(mView);
test.assertValue(false);
}
}

@Test
public void testDeliverLatestToView_ViewNotReady() throws Exception {
mPresenter.create();

TestObserver<Integer> testObserver = new TestObserver<>();
Observable.just(1, 2, 3)
.compose(RxTiPresenterUtils.<Integer>deliverLatestToView(mPresenter))
.subscribe(testObserver);

mPresenter.attachView(mView);

testObserver.assertNotComplete();
testObserver.assertNoErrors();
testObserver.assertValuesOnly(3);
}

@Test
public void testDeliverLatestToView_ViewReady() throws Exception {
mPresenter.create();
mPresenter.attachView(mView);

TestObserver<Integer> testObserver = new TestObserver<>();
Observable.just(1, 2, 3)
.compose(RxTiPresenterUtils.<Integer>deliverLatestToView(mPresenter))
.subscribe(testObserver);

testObserver.assertNotComplete();
testObserver.assertNoErrors();
testObserver.assertValuesOnly(1, 2, 3);
}

@Test
public void testDeliverLatestToView_ViewNeverReady() throws Exception {
mPresenter.create();

TestObserver<Integer> testObserver = new TestObserver<>();
Observable.just(1, 2, 3)
.compose(RxTiPresenterUtils.<Integer>deliverLatestToView(mPresenter))
.subscribe(testObserver);

testObserver.assertNotComplete();
testObserver.assertNoErrors();
testObserver.assertEmpty();
}

@Test
public void testDeliverLatestToView_ViewComesAndGoes() throws Exception {
mPresenter.create();

PublishSubject<Integer> source = PublishSubject.create();
TestObserver<Integer> testObserver = new TestObserver<>();

source
.compose(RxTiPresenterUtils.<Integer>deliverLatestToView(mPresenter))
.subscribe(testObserver);

source.onNext(1);
source.onNext(2);
mPresenter.attachView(mView);
source.onNext(3);
mPresenter.detachView();
source.onNext(4);
source.onNext(5);
mPresenter.attachView(mView);
source.onNext(6);

testObserver.assertNotComplete();
testObserver.assertNoErrors();
testObserver.assertValuesOnly(2, 3, 5, 6);
}

@Test
public void testDeliverLatestToView_SingleItemViewComesAndGoes() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add this test to rx1 as well. I'm actually not sure this is working.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For that I've created a new issue. See #138

mPresenter.create();

PublishSubject<Integer> source = PublishSubject.create();
TestObserver<Integer> testObserver = new TestObserver<>();

source
.compose(RxTiPresenterUtils.<Integer>deliverLatestToView(mPresenter))
.subscribe(testObserver);

source.onNext(1);
source.onNext(2);
mPresenter.attachView(mView);
mPresenter.detachView();
mPresenter.attachView(mView);
mPresenter.detachView();
mPresenter.attachView(mView);

testObserver.assertNotComplete();
testObserver.assertNoErrors();
testObserver.assertValuesOnly(2, 2, 2);
}

@Test
public void testDeliverLatestToView_Empty() throws Exception {
mPresenter.create();

TestObserver<Integer> testObserver = new TestObserver<>();
Observable.<Integer>empty()
.compose(RxTiPresenterUtils.<Integer>deliverLatestToView(mPresenter))
.subscribe(testObserver);

mPresenter.attachView(mView);

testObserver.assertNotComplete();
testObserver.assertNoErrors();
testObserver.assertEmpty();
}

}