Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better visibility on transport errors #278

Merged
merged 6 commits into from
Feb 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2019 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,11 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api.internal;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jayv - one thing I forgot to ask was that does SingleProcessor need to be part of the api? We have a few internal packages now where I imagine this class could live.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reasoning: CompletableProcessor was already public so we decided to make this one public too. I think it's a useful API to have, what is your concern?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we had the same internal package structure when CompletableProcessor was first introduced/moved into the current package. I agree this class is useful, and we use it throughout ServiceTalk for our internal implementations. We should be restricting what goes in the api package as what is necessary to use the api, and I don't think SingleProcessor or CompletableProcessor are necessary for our users to interact with concurrent.api.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use CompletableProcessor in concurrent-api hence we can't move it to concurrent-api-internal. However, SingleProcessor isn't used in concurrent-api so can be moved to concurrent-api-internal if we are OK with one residing in concurrent-api and other doesn't.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.internal.QueueFullAndRejectedSubscribeException;
import io.servicetalk.concurrent.internal.TerminalNotification;

import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand Down Expand Up @@ -94,7 +94,7 @@ public void onSuccess(@Nullable final T result) {

@Override
public void onError(final Throwable t) {
terminate(t);
terminate(TerminalNotification.error(t));
}

private void terminate(@Nullable Object terminalSignal) {
Expand All @@ -104,8 +104,9 @@ private void terminate(@Nullable Object terminalSignal) {
}

private void notifyListeners(@Nullable Object terminalSignal) {
if (terminalSignal instanceof Throwable) {
final Throwable error = (Throwable) terminalSignal;
if (terminalSignal instanceof TerminalNotification) {
final Throwable error = ((TerminalNotification) terminalSignal).getCause();
assert error != null : "Cause can't be null from TerminalNotification.error(..)";
drainSingleConsumerQueueDelayThrow(subscribers, subscriber -> subscriber.onError(error),
drainingTheQueueUpdater, this);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright © 2018-2019 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api;

import org.junit.Rule;
import org.junit.Test;

import javax.annotation.Nullable;

import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;

public class SingleProcessorTest {
@Rule
public final MockedSingleListenerRule<?> rule = new MockedSingleListenerRule<>();
@Rule
public final MockedSingleListenerRule<?> rule2 = new MockedSingleListenerRule<>();

@SuppressWarnings("unchecked")
private <T> MockedSingleListenerRule<T> rule() {
return (MockedSingleListenerRule<T>) rule;
}

@SuppressWarnings("unchecked")
private <T> MockedSingleListenerRule<T> rule2() {
return (MockedSingleListenerRule<T>) rule2;
}

@Test
public void testSuccessBeforeListen() {
testSuccessBeforeListen("foo");
}

@Test
public void testSuccessThrowableBeforeListen() {
testSuccessBeforeListen(DELIBERATE_EXCEPTION);
}

@Test
public void testSuccessNullBeforeListen() {
testSuccessBeforeListen(null);
}

private <T> void testSuccessBeforeListen(@Nullable T expected) {
SingleProcessor<T> processor = new SingleProcessor<>();
processor.onSuccess(expected);
rule().listen(processor).verifySuccess(expected);
}

@Test
public void testErrorBeforeListen() {
SingleProcessor<String> processor = new SingleProcessor<>();
processor.onError(DELIBERATE_EXCEPTION);
rule().listen(processor).verifyFailure(DELIBERATE_EXCEPTION);
}

@Test
public void testSuccessAfterListen() {
testSuccessAfterListen("foo");
}

@Test
public void testSuccessNullAfterListen() {
testSuccessAfterListen(null);
}

@Test
public void testSuccessThrowableAfterListen() {
testSuccessAfterListen(DELIBERATE_EXCEPTION);
}

private <T> void testSuccessAfterListen(@Nullable T expected) {
SingleProcessor<T> processor = new SingleProcessor<>();
rule().listen(processor).verifyNoEmissions();
processor.onSuccess(expected);
rule().verifySuccess(expected);
}

@Test
public void testErrorAfterListen() {
SingleProcessor<String> processor = new SingleProcessor<>();
rule().listen(processor).verifyNoEmissions();
processor.onError(DELIBERATE_EXCEPTION);
rule().verifyFailure(DELIBERATE_EXCEPTION);
}

@Test
public void testSuccessThenError() {
testSuccessThenError("foo");
}

@Test
public void testSuccessThrowableThenError() {
testSuccessThenError(DELIBERATE_EXCEPTION);
}

@Test
public void testSuccessNullThenError() {
testSuccessThenError(null);
}

private <T> void testSuccessThenError(@Nullable T expected) {
SingleProcessor<T> processor = new SingleProcessor<>();
processor.onSuccess(expected);
processor.onError(DELIBERATE_EXCEPTION);
rule().listen(processor).verifySuccess(expected);
}

@Test
public void testErrorThenSuccess() {
testErrorThenSuccess("foo");
}

@Test
public void testErrorThenSuccessThrowable() {
testErrorThenSuccess(DELIBERATE_EXCEPTION);
}

@Test
public void testErrorThenSuccessNull() {
testErrorThenSuccess(null);
}

private <T> void testErrorThenSuccess(@Nullable T expected) {
SingleProcessor<T> processor = new SingleProcessor<>();
processor.onError(DELIBERATE_EXCEPTION);
processor.onSuccess(expected);
rule().listen(processor).verifyFailure(DELIBERATE_EXCEPTION);
}

@Test
public void cancelRemovesListenerAndStillAllowsOtherListenersToBeNotified() {
cancelRemovesListenerAndStillAllowsOtherListenersToBeNotified("foo");
}

@Test
public void cancelRemovesListenerAndStillAllowsOtherListenersToBeNotifiedWithNull() {
cancelRemovesListenerAndStillAllowsOtherListenersToBeNotified(null);
}

@Test
public void cancelRemovesListenerAndStillAllowsOtherListenersToBeNotifiedWithThrowable() {
cancelRemovesListenerAndStillAllowsOtherListenersToBeNotified(DELIBERATE_EXCEPTION);
}

private <T> void cancelRemovesListenerAndStillAllowsOtherListenersToBeNotified(@Nullable T expected) {
SingleProcessor<T> processor = new SingleProcessor<>();
rule().listen(processor).verifyNoEmissions();
rule2().listen(processor).verifyNoEmissions();
rule().cancel();
processor.onSuccess(expected);
rule().verifyNoEmissions();
rule2().verifySuccess(expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ public void onSuccess(@Nullable final T result) {

@Override
public void onError(final Throwable t) {
setTerminal(t);
setTerminal(TerminalNotification.error(t));
}

@Override
Expand All @@ -581,9 +581,11 @@ void terminateOnEnqueueFailure(final Throwable cause) {

@Override
void deliverTerminalToSubscriber(final Object terminal) {
if (terminal instanceof Throwable) {
if (terminal instanceof TerminalNotification) {
try {
target.onError((Throwable) terminal);
final Throwable error = ((TerminalNotification) terminal).getCause();
assert error != null : "Cause can't be null from TerminalNotification.error(..)";
target.onError(error);
} catch (Throwable t) {
LOGGER.error("Ignored unexpected exception from onError. Subscriber: {}", target, t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.servicetalk.concurrent.CloseableIterable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.SingleProcessor;
import io.servicetalk.concurrent.api.SingleProcessor;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpBufferFilterIterable;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpBuffersAndTrailersIterable;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpObjectsAndTrailersIterable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.servicetalk.concurrent.CloseableIterable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.SingleProcessor;
import io.servicetalk.concurrent.api.SingleProcessor;
import io.servicetalk.concurrent.internal.BlockingIterables;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpBufferFilterIterable;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpBuffersAndTrailersIterable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.servicetalk.buffer.api.BufferAllocator;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.SingleProcessor;
import io.servicetalk.concurrent.api.SingleProcessor;
import io.servicetalk.http.api.HttpDataSourceTranformations.BridgeFlowControlAndDiscardOperator;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpBufferFilterOperator;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpPayloadAndTrailersFromSingleOperator;
Expand Down