Skip to content

Commit

Permalink
Add doOnTerminate to Single/Maybe for consistency (#6379)
Browse files Browse the repository at this point in the history
  • Loading branch information
skryvets committed Jan 27, 2019
1 parent 1484106 commit c714e85
Show file tree
Hide file tree
Showing 6 changed files with 426 additions and 0 deletions.
25 changes: 25 additions & 0 deletions src/main/java/io/reactivex/Maybe.java
Expand Up @@ -2892,6 +2892,31 @@ public final Maybe<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe) {
));
}

/**
* Returns a Maybe instance that calls the given onTerminate callback
* just before this Maybe completes normally or with an exception.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnTerminate.png" alt="">
* <p>
* This differs from {@code doAfterTerminate} in that this happens <em>before</em> the {@code onComplete} or
* {@code onError} notification.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onTerminate the action to invoke when the consumer calls {@code onComplete} or {@code onError}
* @return the new Maybe instance
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
* @see #doOnTerminate(Action)
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Maybe<T> doOnTerminate(final Action onTerminate) {
ObjectHelper.requireNonNull(onTerminate, "onTerminate is null");
return RxJavaPlugins.onAssembly(new MaybeDoOnTerminate<T>(this, onTerminate));
}

/**
* Calls the shared consumer with the success value sent via onSuccess for each
* MaybeObserver that subscribes to the current Maybe.
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/io/reactivex/Single.java
Expand Up @@ -2495,6 +2495,32 @@ public final Single<T> doOnSubscribe(final Consumer<? super Disposable> onSubscr
return RxJavaPlugins.onAssembly(new SingleDoOnSubscribe<T>(this, onSubscribe));
}

/**
* Returns a Single instance that calls the given onTerminate callback
* just before this Single completes normally or with an exception.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnTerminate.png" alt="">
* <p>
* This differs from {@code doAfterTerminate} in that this happens <em>before</em> the {@code onComplete} or
* {@code onError} notification.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onTerminate the action to invoke when the consumer calls {@code onComplete} or {@code onError}
* @return the new Single instance
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
* @see #doOnTerminate(Action)
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> doOnTerminate(final Action onTerminate) {
ObjectHelper.requireNonNull(onTerminate, "onTerminate is null");
return RxJavaPlugins.onAssembly(new SingleDoOnTerminate<T>(this, onTerminate));
}

/**
* Calls the shared consumer with the success value sent via onSuccess for each
* SingleObserver that subscribes to the current Single.
Expand Down
@@ -0,0 +1,90 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import io.reactivex.Maybe;
import io.reactivex.MaybeObserver;
import io.reactivex.MaybeSource;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.CompositeException;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Action;

public final class MaybeDoOnTerminate<T> extends Maybe<T> {

final MaybeSource<T> source;

final Action onTerminate;

public MaybeDoOnTerminate(MaybeSource<T> source, Action onTerminate) {
this.source = source;
this.onTerminate = onTerminate;
}

@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
source.subscribe(new DoOnTerminate(observer));
}

final class DoOnTerminate implements MaybeObserver<T> {
final MaybeObserver<? super T> downstream;

DoOnTerminate(MaybeObserver<? super T> observer) {
this.downstream = observer;
}

@Override
public void onSubscribe(Disposable d) {
downstream.onSubscribe(d);
}

@Override
public void onSuccess(T value) {
try {
onTerminate.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}

downstream.onSuccess(value);
}

@Override
public void onError(Throwable e) {
try {
onTerminate.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
e = new CompositeException(e, ex);
}

downstream.onError(e);
}

@Override
public void onComplete() {
try {
onTerminate.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}

downstream.onComplete();
}
}
}
@@ -0,0 +1,78 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.single;

import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.SingleSource;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.CompositeException;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Action;

public final class SingleDoOnTerminate<T> extends Single<T> {

final SingleSource<T> source;

final Action onTerminate;

public SingleDoOnTerminate(SingleSource<T> source, Action onTerminate) {
this.source = source;
this.onTerminate = onTerminate;
}

@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
source.subscribe(new DoOnTerminate(observer));
}

final class DoOnTerminate implements SingleObserver<T> {

final SingleObserver<? super T> downstream;

DoOnTerminate(SingleObserver<? super T> observer) {
this.downstream = observer;
}

@Override
public void onSubscribe(Disposable d) {
downstream.onSubscribe(d);
}

@Override
public void onSuccess(T value) {
try {
onTerminate.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}

downstream.onSuccess(value);
}

@Override
public void onError(Throwable e) {
try {
onTerminate.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
e = new CompositeException(e, ex);
}

downstream.onError(e);
}
}
}
@@ -0,0 +1,116 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import io.reactivex.Maybe;
import io.reactivex.TestHelper;
import io.reactivex.exceptions.CompositeException;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Action;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.subjects.PublishSubject;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.assertTrue;

public class MaybeDoOnTerminateTest {

@Test(expected = NullPointerException.class)
public void doOnTerminate() {
Maybe.just(1).doOnTerminate(null);
}

@Test
public void doOnTerminateSuccess() {
final AtomicBoolean atomicBoolean = new AtomicBoolean();
Maybe.just(1).doOnTerminate(new Action() {
@Override
public void run() {
atomicBoolean.set(true);
}
}).subscribe();

assertTrue(atomicBoolean.get());
}

@Test
public void doOnTerminateError() {
final AtomicBoolean atomicBoolean = new AtomicBoolean();
Maybe.error(new TestException()).doOnTerminate(new Action() {
@Override
public void run() {
atomicBoolean.set(true);
}
})
.test()
.assertFailure(TestException.class);

assertTrue(atomicBoolean.get());
}

@Test
public void doOnTerminateComplete() {
final AtomicBoolean atomicBoolean = new AtomicBoolean();
Maybe.empty().doOnTerminate(new Action() {
@Override
public void run() {
atomicBoolean.set(true);
}
})
.subscribe();

assertTrue(atomicBoolean.get());
}

@Test
public void doOnTerminateSuccessCrash() {
Maybe.just(1).doOnTerminate(new Action() {
@Override
public void run() {
throw new TestException();
}
})
.test()
.assertFailure(TestException.class);
}

@Test
public void doOnTerminateErrorCrash() {
Maybe.error(new TestException())
.doOnTerminate(new Action() {
@Override
public void run() {
throw new TestException();
}
})
.test()
.assertFailure(CompositeException.class);
}

@Test
public void doOnTerminateCompleteCrash() {
Maybe.empty()
.doOnTerminate(new Action() {
@Override
public void run() {
throw new TestException();
}
})
.test()
.assertFailure(TestException.class);
}
}

0 comments on commit c714e85

Please sign in to comment.