Skip to content

Commit

Permalink
Better visibility on transport errors (#278)
Browse files Browse the repository at this point in the history
__Motivation__

We want to surface the close reason from `NettyConnection` in the exception returned from the request. By exposing that, we also enable users to observe transport errors and drive connection state metrics using a `HttpConnectionFilterFactory`. We should also consider elaborating the message of the `ClosedChannelException` produced by `CloseHandler.CloseEvent`

__Modifications__

- expose transport level exception on `ConnectionContext`
- move `SingleProcessor` from `internal` to `api`

__Result__

Users have access to transport level `Exception` information to help diagnose failures and drive metrics
  • Loading branch information
jayv committed Feb 8, 2019
1 parent 4163e2e commit 8fa25f9
Show file tree
Hide file tree
Showing 26 changed files with 321 additions and 222 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2019 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,11 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api.internal;
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.internal.QueueFullAndRejectedSubscribeException;
import io.servicetalk.concurrent.internal.TerminalNotification;

import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand Down Expand Up @@ -94,7 +94,7 @@ public void onSuccess(@Nullable final T result) {

@Override
public void onError(final Throwable t) {
terminate(t);
terminate(TerminalNotification.error(t));
}

private void terminate(@Nullable Object terminalSignal) {
Expand All @@ -104,8 +104,9 @@ private void terminate(@Nullable Object terminalSignal) {
}

private void notifyListeners(@Nullable Object terminalSignal) {
if (terminalSignal instanceof Throwable) {
final Throwable error = (Throwable) terminalSignal;
if (terminalSignal instanceof TerminalNotification) {
final Throwable error = ((TerminalNotification) terminalSignal).getCause();
assert error != null : "Cause can't be null from TerminalNotification.error(..)";
drainSingleConsumerQueueDelayThrow(subscribers, subscriber -> subscriber.onError(error),
drainingTheQueueUpdater, this);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright © 2018-2019 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api;

import org.junit.Rule;
import org.junit.Test;

import javax.annotation.Nullable;

import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;

public class SingleProcessorTest {
@Rule
public final MockedSingleListenerRule<?> rule = new MockedSingleListenerRule<>();
@Rule
public final MockedSingleListenerRule<?> rule2 = new MockedSingleListenerRule<>();

@SuppressWarnings("unchecked")
private <T> MockedSingleListenerRule<T> rule() {
return (MockedSingleListenerRule<T>) rule;
}

@SuppressWarnings("unchecked")
private <T> MockedSingleListenerRule<T> rule2() {
return (MockedSingleListenerRule<T>) rule2;
}

@Test
public void testSuccessBeforeListen() {
testSuccessBeforeListen("foo");
}

@Test
public void testSuccessThrowableBeforeListen() {
testSuccessBeforeListen(DELIBERATE_EXCEPTION);
}

@Test
public void testSuccessNullBeforeListen() {
testSuccessBeforeListen(null);
}

private <T> void testSuccessBeforeListen(@Nullable T expected) {
SingleProcessor<T> processor = new SingleProcessor<>();
processor.onSuccess(expected);
rule().listen(processor).verifySuccess(expected);
}

@Test
public void testErrorBeforeListen() {
SingleProcessor<String> processor = new SingleProcessor<>();
processor.onError(DELIBERATE_EXCEPTION);
rule().listen(processor).verifyFailure(DELIBERATE_EXCEPTION);
}

@Test
public void testSuccessAfterListen() {
testSuccessAfterListen("foo");
}

@Test
public void testSuccessNullAfterListen() {
testSuccessAfterListen(null);
}

@Test
public void testSuccessThrowableAfterListen() {
testSuccessAfterListen(DELIBERATE_EXCEPTION);
}

private <T> void testSuccessAfterListen(@Nullable T expected) {
SingleProcessor<T> processor = new SingleProcessor<>();
rule().listen(processor).verifyNoEmissions();
processor.onSuccess(expected);
rule().verifySuccess(expected);
}

@Test
public void testErrorAfterListen() {
SingleProcessor<String> processor = new SingleProcessor<>();
rule().listen(processor).verifyNoEmissions();
processor.onError(DELIBERATE_EXCEPTION);
rule().verifyFailure(DELIBERATE_EXCEPTION);
}

@Test
public void testSuccessThenError() {
testSuccessThenError("foo");
}

@Test
public void testSuccessThrowableThenError() {
testSuccessThenError(DELIBERATE_EXCEPTION);
}

@Test
public void testSuccessNullThenError() {
testSuccessThenError(null);
}

private <T> void testSuccessThenError(@Nullable T expected) {
SingleProcessor<T> processor = new SingleProcessor<>();
processor.onSuccess(expected);
processor.onError(DELIBERATE_EXCEPTION);
rule().listen(processor).verifySuccess(expected);
}

@Test
public void testErrorThenSuccess() {
testErrorThenSuccess("foo");
}

@Test
public void testErrorThenSuccessThrowable() {
testErrorThenSuccess(DELIBERATE_EXCEPTION);
}

@Test
public void testErrorThenSuccessNull() {
testErrorThenSuccess(null);
}

private <T> void testErrorThenSuccess(@Nullable T expected) {
SingleProcessor<T> processor = new SingleProcessor<>();
processor.onError(DELIBERATE_EXCEPTION);
processor.onSuccess(expected);
rule().listen(processor).verifyFailure(DELIBERATE_EXCEPTION);
}

@Test
public void cancelRemovesListenerAndStillAllowsOtherListenersToBeNotified() {
cancelRemovesListenerAndStillAllowsOtherListenersToBeNotified("foo");
}

@Test
public void cancelRemovesListenerAndStillAllowsOtherListenersToBeNotifiedWithNull() {
cancelRemovesListenerAndStillAllowsOtherListenersToBeNotified(null);
}

@Test
public void cancelRemovesListenerAndStillAllowsOtherListenersToBeNotifiedWithThrowable() {
cancelRemovesListenerAndStillAllowsOtherListenersToBeNotified(DELIBERATE_EXCEPTION);
}

private <T> void cancelRemovesListenerAndStillAllowsOtherListenersToBeNotified(@Nullable T expected) {
SingleProcessor<T> processor = new SingleProcessor<>();
rule().listen(processor).verifyNoEmissions();
rule2().listen(processor).verifyNoEmissions();
rule().cancel();
processor.onSuccess(expected);
rule().verifyNoEmissions();
rule2().verifySuccess(expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ public void onSuccess(@Nullable final T result) {

@Override
public void onError(final Throwable t) {
setTerminal(t);
setTerminal(TerminalNotification.error(t));
}

@Override
Expand All @@ -581,9 +581,11 @@ void terminateOnEnqueueFailure(final Throwable cause) {

@Override
void deliverTerminalToSubscriber(final Object terminal) {
if (terminal instanceof Throwable) {
if (terminal instanceof TerminalNotification) {
try {
target.onError((Throwable) terminal);
final Throwable error = ((TerminalNotification) terminal).getCause();
assert error != null : "Cause can't be null from TerminalNotification.error(..)";
target.onError(error);
} catch (Throwable t) {
LOGGER.error("Ignored unexpected exception from onError. Subscriber: {}", target, t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.servicetalk.concurrent.CloseableIterable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.SingleProcessor;
import io.servicetalk.concurrent.api.SingleProcessor;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpBufferFilterIterable;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpBuffersAndTrailersIterable;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpObjectsAndTrailersIterable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.servicetalk.concurrent.CloseableIterable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.SingleProcessor;
import io.servicetalk.concurrent.api.SingleProcessor;
import io.servicetalk.concurrent.internal.BlockingIterables;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpBufferFilterIterable;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpBuffersAndTrailersIterable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.servicetalk.buffer.api.BufferAllocator;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.SingleProcessor;
import io.servicetalk.concurrent.api.SingleProcessor;
import io.servicetalk.http.api.HttpDataSourceTranformations.BridgeFlowControlAndDiscardOperator;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpBufferFilterOperator;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpPayloadAndTrailersFromSingleOperator;
Expand Down

0 comments on commit 8fa25f9

Please sign in to comment.