Skip to content

Commit

Permalink
Promote Single.TerminalSignalConsumer to top level interface (#1008)
Browse files Browse the repository at this point in the history
Motivation:
TerminalSignalConsumer for Completable and Publisher is a top level
type, but Single has an internal type for the same purpose. This makes
it more challenging to expose utilities classes in a consistent manner
and also may lead to naming collisions.

Modifications:
- Move Single.TerminalSignalConsumer to a top level interface and rename
to SingleTerminalSignalConsumer to avoid naming collisions.
- Expose RunnableTerminalSignalConsumer and re-use it in
BeforeFinallyOnHttpResponseOperator.

Result:
More consistent type definition scheme for TerminalSignalConsumer and
SingleTerminalSignalConsumer and ability to expose conversion/utility
methods outside the scope of the Source.
  • Loading branch information
Scottmitch committed Apr 7, 2020
1 parent 3756013 commit e53d677
Show file tree
Hide file tree
Showing 11 changed files with 166 additions and 125 deletions.
Expand Up @@ -23,9 +23,9 @@

final class AfterFinallySingle<T> extends AbstractSynchronousSingleOperator<T, T> {

private final TerminalSignalConsumer<T> doFinally;
private final SingleTerminalSignalConsumer<T> doFinally;

AfterFinallySingle(Single<T> original, TerminalSignalConsumer<T> doFinally, Executor executor) {
AfterFinallySingle(Single<T> original, SingleTerminalSignalConsumer<T> doFinally, Executor executor) {
super(original, executor);
this.doFinally = requireNonNull(doFinally);
}
Expand All @@ -37,14 +37,14 @@ public Subscriber<? super T> apply(final Subscriber<? super T> subscriber) {

private static final class AfterFinallySingleSubscriber<T> implements Subscriber<T> {
private final Subscriber<? super T> original;
private final TerminalSignalConsumer<T> doFinally;
private final SingleTerminalSignalConsumer<T> doFinally;

private static final AtomicIntegerFieldUpdater<AfterFinallySingleSubscriber> doneUpdater =
AtomicIntegerFieldUpdater.newUpdater(AfterFinallySingleSubscriber.class, "done");
@SuppressWarnings("unused")
private volatile int done;

AfterFinallySingleSubscriber(Subscriber<? super T> original, TerminalSignalConsumer<T> doFinally) {
AfterFinallySingleSubscriber(Subscriber<? super T> original, SingleTerminalSignalConsumer<T> doFinally) {
this.original = original;
this.doFinally = doFinally;
}
Expand Down
Expand Up @@ -23,9 +23,9 @@

final class BeforeFinallySingle<T> extends AbstractSynchronousSingleOperator<T, T> {

private final TerminalSignalConsumer<T> doFinally;
private final SingleTerminalSignalConsumer<T> doFinally;

BeforeFinallySingle(Single<T> original, TerminalSignalConsumer<T> doFinally, Executor executor) {
BeforeFinallySingle(Single<T> original, SingleTerminalSignalConsumer<T> doFinally, Executor executor) {
super(original, executor);
this.doFinally = requireNonNull(doFinally);
}
Expand All @@ -37,14 +37,14 @@ public Subscriber<? super T> apply(final Subscriber<? super T> subscriber) {

private static final class BeforeFinallySingleSubscriber<T> implements Subscriber<T> {
private final Subscriber<? super T> original;
private final TerminalSignalConsumer<T> doFinally;
private final SingleTerminalSignalConsumer<T> doFinally;

private static final AtomicIntegerFieldUpdater<BeforeFinallySingleSubscriber> doneUpdater =
AtomicIntegerFieldUpdater.newUpdater(BeforeFinallySingleSubscriber.class, "done");
@SuppressWarnings("unused")
private volatile int done;

BeforeFinallySingleSubscriber(Subscriber<? super T> original, TerminalSignalConsumer<T> doFinally) {
BeforeFinallySingleSubscriber(Subscriber<? super T> original, SingleTerminalSignalConsumer<T> doFinally) {
this.original = original;
this.doFinally = doFinally;
}
Expand Down
@@ -0,0 +1,43 @@
/*
* Copyright © 2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api;

import javax.annotation.Nullable;

import static java.util.Objects.requireNonNull;

final class RunnableSingleTerminalSignalConsumer<T> implements SingleTerminalSignalConsumer<T> {
private final Runnable onFinally;

RunnableSingleTerminalSignalConsumer(final Runnable onFinally) {
this.onFinally = requireNonNull(onFinally);
}

@Override
public void onSuccess(@Nullable final T result) {
onFinally.run();
}

@Override
public void onError(final Throwable throwable) {
onFinally.run();
}

@Override
public void cancel() {
onFinally.run();
}
}
Expand Up @@ -18,7 +18,6 @@
import static java.util.Objects.requireNonNull;

final class RunnableTerminalSignalConsumer implements TerminalSignalConsumer {

private final Runnable onFinally;

RunnableTerminalSignalConsumer(final Runnable onFinally) {
Expand Down
Expand Up @@ -297,18 +297,20 @@ public final Single<T> whenFinally(Runnable doFinally) {
}

/**
* Invokes the corresponding method on {@code whenFinally} {@link TerminalSignalConsumer} argument when any of the
* following terminal methods are called:
* Invokes the corresponding method on {@code whenFinally} {@link SingleTerminalSignalConsumer} argument when any
* of the following terminal methods are called:
* <ul>
* <li>{@link Subscriber#onSuccess(Object)} - invokes {@link TerminalSignalConsumer#onSuccess(Object)}</li>
* <li>{@link Subscriber#onError(Throwable)} - invokes {@link TerminalSignalConsumer#onError(Throwable)}</li>
* <li>{@link Cancellable#cancel()} - invokes {@link TerminalSignalConsumer#cancel()}</li>
* <li>{@link Subscriber#onSuccess(Object)} - invokes
* {@link SingleTerminalSignalConsumer#onSuccess(Object)}</li>
* <li>{@link Subscriber#onError(Throwable)} - invokes
* {@link SingleTerminalSignalConsumer#onError(Throwable)}</li>
* <li>{@link Cancellable#cancel()} - invokes {@link SingleTerminalSignalConsumer#cancel()}</li>
* </ul>
* for Subscriptions/{@link Subscriber}s of the returned {@link Single}.
* <p>
* The order in which {@code whenFinally} will be invoked relative to the above methods is undefined. If you need
* strict ordering see {@link #beforeFinally(TerminalSignalConsumer)} and
* {@link #afterFinally(TerminalSignalConsumer)}.
* strict ordering see {@link #beforeFinally(SingleTerminalSignalConsumer)} and
* {@link #afterFinally(SingleTerminalSignalConsumer)}.
* <p>
* From a sequential programming point of view this method is roughly equivalent to the following:
* <pre>{@code
Expand All @@ -327,12 +329,12 @@ public final Single<T> whenFinally(Runnable doFinally) {
* }</pre>
*
* @param doFinally For each subscribe of the returned {@link Single}, at most one method of this
* {@link TerminalSignalConsumer} will be invoked.
* {@link SingleTerminalSignalConsumer} will be invoked.
* @return The new {@link Single}.
* @see #beforeFinally(TerminalSignalConsumer)
* @see #afterFinally(TerminalSignalConsumer)
* @see #beforeFinally(SingleTerminalSignalConsumer)
* @see #afterFinally(SingleTerminalSignalConsumer)
*/
public final Single<T> whenFinally(TerminalSignalConsumer<T> doFinally) {
public final Single<T> whenFinally(SingleTerminalSignalConsumer<T> doFinally) {
return beforeFinally(doFinally);
}

Expand Down Expand Up @@ -714,16 +716,18 @@ public final Single<T> beforeCancel(Runnable onCancel) {
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX do operator.</a>
*/
public final Single<T> beforeFinally(Runnable doFinally) {
return beforeFinally(new RunnableTerminalSignalConsumer<>(doFinally));
return beforeFinally(new RunnableSingleTerminalSignalConsumer<>(doFinally));
}

/**
* Invokes the corresponding method on {@code beforeFinally} {@link TerminalSignalConsumer} argument
* Invokes the corresponding method on {@code beforeFinally} {@link SingleTerminalSignalConsumer} argument
* <strong>before</strong> any of the following terminal methods are called:
* <ul>
* <li>{@link Subscriber#onSuccess(Object)} - invokes {@link TerminalSignalConsumer#onSuccess(Object)}</li>
* <li>{@link Subscriber#onError(Throwable)} - invokes {@link TerminalSignalConsumer#onError(Throwable)}</li>
* <li>{@link Cancellable#cancel()} - invokes {@link TerminalSignalConsumer#cancel()}</li>
* <li>{@link Subscriber#onSuccess(Object)} - invokes
* {@link SingleTerminalSignalConsumer#onSuccess(Object)}</li>
* <li>{@link Subscriber#onError(Throwable)} - invokes
* {@link SingleTerminalSignalConsumer#onError(Throwable)}</li>
* <li>{@link Cancellable#cancel()} - invokes {@link SingleTerminalSignalConsumer#cancel()}</li>
* </ul>
* for Subscriptions/{@link Subscriber}s of the returned {@link Single}.
* <p>
Expand All @@ -742,11 +746,11 @@ public final Single<T> beforeFinally(Runnable doFinally) {
* }</pre>
*
* @param doFinally For each subscribe of the returned {@link Single}, at most one method of this
* {@link TerminalSignalConsumer} will be invoked.
* {@link SingleTerminalSignalConsumer} will be invoked.
* @return The new {@link Single}.
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX do operator.</a>
*/
public final Single<T> beforeFinally(TerminalSignalConsumer<T> doFinally) {
public final Single<T> beforeFinally(SingleTerminalSignalConsumer<T> doFinally) {
return new BeforeFinallySingle<>(this, doFinally, executor);
}

Expand Down Expand Up @@ -878,16 +882,18 @@ public final Single<T> afterCancel(Runnable onCancel) {
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX do operator.</a>
*/
public final Single<T> afterFinally(Runnable doFinally) {
return afterFinally(new RunnableTerminalSignalConsumer<>(doFinally));
return afterFinally(new RunnableSingleTerminalSignalConsumer<>(doFinally));
}

/**
* Invokes the corresponding method on {@code afterFinally} {@link TerminalSignalConsumer} argument
* Invokes the corresponding method on {@code afterFinally} {@link SingleTerminalSignalConsumer} argument
* <strong>after</strong> any of the following terminal methods are called:
* <ul>
* <li>{@link Subscriber#onSuccess(Object)} - invokes {@link TerminalSignalConsumer#onSuccess(Object)}</li>
* <li>{@link Subscriber#onError(Throwable)} - invokes {@link TerminalSignalConsumer#onError(Throwable)}</li>
* <li>{@link Cancellable#cancel()} - invokes {@link TerminalSignalConsumer#cancel()}</li>
* <li>{@link Subscriber#onSuccess(Object)} - invokes
* {@link SingleTerminalSignalConsumer#onSuccess(Object)}</li>
* <li>{@link Subscriber#onError(Throwable)} - invokes
* {@link SingleTerminalSignalConsumer#onError(Throwable)}</li>
* <li>{@link Cancellable#cancel()} - invokes {@link SingleTerminalSignalConsumer#cancel()}</li>
* </ul>
* for Subscriptions/{@link Subscriber}s of the returned {@link Single}.
* <p>
Expand All @@ -906,11 +912,11 @@ public final Single<T> afterFinally(Runnable doFinally) {
* }</pre>
*
* @param doFinally For each subscribe of the returned {@link Single}, at most one method of this
* {@link TerminalSignalConsumer} will be invoked.
* {@link SingleTerminalSignalConsumer} will be invoked.
* @return The new {@link Single}.
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX do operator.</a>
*/
public final Single<T> afterFinally(TerminalSignalConsumer<T> doFinally) {
public final Single<T> afterFinally(SingleTerminalSignalConsumer<T> doFinally) {
return new AfterFinallySingle<>(this, doFinally, executor);
}

Expand Down Expand Up @@ -1864,55 +1870,4 @@ final Executor executor() {
//
// Internal Methods End
//

/**
* A contract that provides discrete callbacks for various ways in which a {@link SingleSource.Subscriber} can
* terminate.
*
* @param <T> Type of the result of the {@link Single}.
*/
public interface TerminalSignalConsumer<T> {

/**
* Callback to indicate termination via {@link Subscriber#onSuccess(Object)}.
*
* @param result the observed result of type {@link T}.
*/
void onSuccess(@Nullable T result);

/**
* Callback to indicate termination via {@link Subscriber#onError(Throwable)}.
*
* @param throwable the observed {@link Throwable}.
*/
void onError(Throwable throwable);

/**
* Callback to indicate termination via {@link Cancellable#cancel()}.
*/
void cancel();
}

private static final class RunnableTerminalSignalConsumer<T> implements TerminalSignalConsumer<T> {
private final Runnable onFinally;

RunnableTerminalSignalConsumer(final Runnable onFinally) {
this.onFinally = requireNonNull(onFinally);
}

@Override
public void onSuccess(@Nullable final T result) {
onFinally.run();
}

@Override
public void onError(final Throwable throwable) {
onFinally.run();
}

@Override
public void cancel() {
onFinally.run();
}
}
}
@@ -0,0 +1,59 @@
/*
* Copyright © 2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.SingleSource;

import javax.annotation.Nullable;

/**
* A contract that provides discrete callbacks for various ways in which a {@link SingleSource.Subscriber} can
* terminate.
*
* @param <T> Type of the result of the {@link Single}.
*/
public interface SingleTerminalSignalConsumer<T> {
/**
* Callback to indicate termination via {@link SingleSource.Subscriber#onSuccess(Object)}.
*
* @param result the observed result of type {@link T}.
*/
void onSuccess(@Nullable T result);

/**
* Callback to indicate termination via {@link SingleSource.Subscriber#onError(Throwable)}.
*
* @param throwable the observed {@link Throwable}.
*/
void onError(Throwable throwable);

/**
* Callback to indicate termination via {@link Cancellable#cancel()}.
*/
void cancel();

/**
* Create a {@link SingleTerminalSignalConsumer} where each method executes a {@link Runnable#run()}.
* @param runnable The {@link Runnable} which is invoked in each method of the returned
* {@link SingleTerminalSignalConsumer}.
* @param <X> The type of {@link SingleTerminalSignalConsumer}.
* @return a {@link SingleTerminalSignalConsumer} where each method executes a {@link Runnable#run()}.
*/
static <X> SingleTerminalSignalConsumer<X> from(Runnable runnable) {
return new RunnableSingleTerminalSignalConsumer<>(runnable);
}
}
Expand Up @@ -24,7 +24,6 @@
* {@link CompletableSource.Subscriber} can terminate.
*/
public interface TerminalSignalConsumer {

/**
* Callback to indicate termination via {@link PublisherSource.Subscriber#onComplete()} or
* {@link CompletableSource.Subscriber#onComplete()}.
Expand All @@ -43,4 +42,14 @@ public interface TerminalSignalConsumer {
* Callback to indicate termination via {@link Cancellable#cancel()}.
*/
void cancel();

/**
* Create a {@link TerminalSignalConsumer} where each method executes a {@link Runnable#run()}.
* @param runnable The {@link Runnable} which is invoked in each method of the returned
* {@link TerminalSignalConsumer}.
* @return a {@link TerminalSignalConsumer} where each method executes a {@link Runnable#run()}.
*/
static TerminalSignalConsumer from(Runnable runnable) {
return new RunnableTerminalSignalConsumer(runnable);
}
}

0 comments on commit e53d677

Please sign in to comment.