Skip to content

Commit

Permalink
Merge pull request #3417 from artem-zinnatullin/single-do-on-success
Browse files Browse the repository at this point in the history
Add Single.doOnSuccess()
  • Loading branch information
abersnaze committed Oct 9, 2015
2 parents 1db23ee + 0c0a18f commit c1d3187
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 0 deletions.
34 changes: 34 additions & 0 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1864,4 +1864,38 @@ public void onNext(T t) {

return lift(new OperatorDoOnEach<T>(observer));
}

/**
* Modifies the source {@link Single} so that it invokes an action when it calls {@code onSuccess}.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnNext.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnSuccess} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onSuccess
* the action to invoke when the source {@link Single} calls {@code onSuccess}
* @return the source {@link Single} with the side-effecting behavior applied
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
*/
@Experimental
public final Single<T> doOnSuccess(final Action1<? super T> onSuccess) {
Observer<T> observer = new Observer<T>() {
@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(T t) {
onSuccess.call(t);
}
};

return lift(new OperatorDoOnEach<T>(observer));
}
}
78 changes: 78 additions & 0 deletions src/test/java/rx/SingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -570,4 +571,81 @@ public void shouldPassErrorFromCallable() throws Exception {

verify(callable).call();
}

@Test
public void doOnSuccessShouldInvokeAction() {
Action1<String> action = mock(Action1.class);

TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

Single
.just("value")
.doOnSuccess(action)
.subscribe(testSubscriber);

testSubscriber.assertValue("value");
testSubscriber.assertNoErrors();

verify(action).call(eq("value"));
}

@Test
public void doOnSuccessShouldPassErrorFromActionToSubscriber() {
Action1<String> action = mock(Action1.class);

Throwable error = new IllegalStateException();
doThrow(error).when(action).call(eq("value"));

TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

Single
.just("value")
.doOnSuccess(action)
.subscribe(testSubscriber);

testSubscriber.assertNoValues();
testSubscriber.assertError(error);

verify(action).call(eq("value"));
}

@Test
public void doOnSuccessShouldNotCallActionIfSingleThrowsError() {
Action1<Object> action = mock(Action1.class);

Throwable error = new IllegalStateException();

TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();

Single
.error(error)
.doOnSuccess(action)
.subscribe(testSubscriber);

testSubscriber.assertNoValues();
testSubscriber.assertError(error);

verifyZeroInteractions(action);
}

@Test
public void doOnSuccessShouldNotSwallowExceptionThrownByAction() {
Action1<String> action = mock(Action1.class);

Throwable exceptionFromAction = new IllegalStateException();

doThrow(exceptionFromAction).when(action).call(eq("value"));

TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

Single
.just("value")
.doOnSuccess(action)
.subscribe(testSubscriber);

testSubscriber.assertNoValues();
testSubscriber.assertError(exceptionFromAction);

verify(action).call(eq("value"));
}
}

0 comments on commit c1d3187

Please sign in to comment.