Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better visibility on transport errors #278

Merged
merged 6 commits into from
Feb 8, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018 Apple Inc. and the ServiceTalk project authors
* Copyright © 2019 Apple Inc. and the ServiceTalk project authors
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2018-2019 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah Intellij is trying to be not smart enough here.

*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,10 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api.internal;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jayv - one thing I forgot to ask was that does SingleProcessor need to be part of the api? We have a few internal packages now where I imagine this class could live.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reasoning: CompletableProcessor was already public so we decided to make this one public too. I think it's a useful API to have, what is your concern?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we had the same internal package structure when CompletableProcessor was first introduced/moved into the current package. I agree this class is useful, and we use it throughout ServiceTalk for our internal implementations. We should be restricting what goes in the api package as what is necessary to use the api, and I don't think SingleProcessor or CompletableProcessor are necessary for our users to interact with concurrent.api.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use CompletableProcessor in concurrent-api hence we can't move it to concurrent-api-internal. However, SingleProcessor isn't used in concurrent-api so can be moved to concurrent-api-internal if we are OK with one residing in concurrent-api and other doesn't.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.internal.QueueFullAndRejectedSubscribeException;

import java.util.Queue;
Expand Down Expand Up @@ -89,7 +88,7 @@ public void onSubscribe(final Cancellable cancellable) {

@Override
public void onSuccess(@Nullable final T result) {
terminate(result);
terminate(ThrowableWrapper.wrapIfThrowable(result));
NiteshKant marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there benefit to wrapping success (presumably the more common case) instead of wrapping errors?
I think in notifyListeners, wrapping errors (and replacing terminalSignal instanceof Throwable with terminalSignal instanceof ThrowableWrapper) would reduce the number of instanceofs in the success case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's indeed a great suggestion!

}

@Override
Expand All @@ -109,10 +108,36 @@ private void notifyListeners(@Nullable Object terminalSignal) {
drainSingleConsumerQueueDelayThrow(subscribers, subscriber -> subscriber.onError(error),
drainingTheQueueUpdater, this);
} else {
@SuppressWarnings("unchecked")
final T value = (T) terminalSignal;
final T value = ThrowableWrapper.unwrapIfNeeded(terminalSignal);
drainSingleConsumerQueueDelayThrow(subscribers, subscriber -> subscriber.onSuccess(value),
drainingTheQueueUpdater, this);
}
}

/**
* Wrapper to mark {@link Throwable} data so that we don't deliver it as {@link Subscriber#onError(Throwable)}.
*/
private static final class ThrowableWrapper {
NiteshKant marked this conversation as resolved.
Show resolved Hide resolved
private final Throwable t;
private ThrowableWrapper(final Throwable t) {
this.t = t;
}

@SuppressWarnings("unchecked")
@Nullable
static <T> T unwrapIfNeeded(@Nullable Object inData) {
if (inData instanceof SingleProcessor.ThrowableWrapper) {
return (T) ((ThrowableWrapper) inData).t;
}
return (T) inData;
}

@Nullable
static Object wrapIfThrowable(@Nullable Object inData) {
if (inData instanceof Throwable) {
return new ThrowableWrapper((Throwable) inData);
}
return inData;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright © 2019 Apple Inc. and the ServiceTalk project authors
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2018-2019

*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api;

import org.junit.Rule;
import org.junit.Test;

import javax.annotation.Nullable;

import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;

public class SingleProcessorTest {
@Rule
public final MockedSingleListenerRule<?> rule = new MockedSingleListenerRule<>();
@Rule
public final MockedSingleListenerRule<?> rule2 = new MockedSingleListenerRule<>();

@SuppressWarnings("unchecked")
private <T> MockedSingleListenerRule<T> rule() {
return (MockedSingleListenerRule<T>) rule;
}

@SuppressWarnings("unchecked")
private <T> MockedSingleListenerRule<T> rule2() {
return (MockedSingleListenerRule<T>) rule2;
}

@Test
public void testSuccessBeforeListen() {
testSuccessBeforeListen("foo");
}

@Test
public void testSuccessThrowableBeforeListen() {
testSuccessBeforeListen(DELIBERATE_EXCEPTION);
}

@Test
public void testSuccessNullBeforeListen() {
testSuccessBeforeListen(null);
}

private <T> void testSuccessBeforeListen(@Nullable T expected) {
SingleProcessor<T> processor = new SingleProcessor<>();
processor.onSuccess(expected);
rule().listen(processor).verifySuccess(expected);
}

@Test
public void testErrorBeforeListen() {
SingleProcessor<String> processor = new SingleProcessor<>();
processor.onError(DELIBERATE_EXCEPTION);
rule().listen(processor).verifyFailure(DELIBERATE_EXCEPTION);
}

@Test
public void testSuccessAfterListen() {
testSuccessAfterListen("foo");
}

@Test
public void testSuccessNullAfterListen() {
testSuccessAfterListen(null);
}

@Test
public void testSuccessThrowableAfterListen() {
testSuccessAfterListen(DELIBERATE_EXCEPTION);
}

private <T> void testSuccessAfterListen(@Nullable T expected) {
SingleProcessor<T> processor = new SingleProcessor<>();
rule().listen(processor).verifyNoEmissions();
processor.onSuccess(expected);
rule().verifySuccess(expected);
}

@Test
public void testErrorAfterListen() {
SingleProcessor<String> processor = new SingleProcessor<>();
rule().listen(processor).verifyNoEmissions();
processor.onError(DELIBERATE_EXCEPTION);
rule().verifyFailure(DELIBERATE_EXCEPTION);
}

@Test
public void testSuccessThenError() {
testSuccessThenError("foo");
}

@Test
public void testSuccessThrowableThenError() {
testSuccessThenError(DELIBERATE_EXCEPTION);
}

@Test
public void testSuccessNullThenError() {
testSuccessThenError(null);
}

private <T> void testSuccessThenError(@Nullable T expected) {
SingleProcessor<T> processor = new SingleProcessor<>();
processor.onSuccess(expected);
processor.onError(DELIBERATE_EXCEPTION);
rule().listen(processor).verifySuccess(expected);
}

@Test
public void testErrorThenSuccess() {
testErrorThenSuccess("foo");
}

@Test
public void testErrorThenSuccessThrowable() {
testErrorThenSuccess(DELIBERATE_EXCEPTION);
}

@Test
public void testErrorThenSuccessNull() {
testErrorThenSuccess(null);
}

private <T> void testErrorThenSuccess(@Nullable T expected) {
SingleProcessor<T> processor = new SingleProcessor<>();
processor.onError(DELIBERATE_EXCEPTION);
processor.onSuccess(expected);
rule().listen(processor).verifyFailure(DELIBERATE_EXCEPTION);
}

@Test
public void cancelRemovesListenerAndStillAllowsOtherListenersToBeNotified() {
cancelRemovesListenerAndStillAllowsOtherListenersToBeNotified("foo");
}

@Test
public void cancelRemovesListenerAndStillAllowsOtherListenersToBeNotifiedWithNull() {
cancelRemovesListenerAndStillAllowsOtherListenersToBeNotified(null);
}

@Test
public void cancelRemovesListenerAndStillAllowsOtherListenersToBeNotifiedWithThrowable() {
cancelRemovesListenerAndStillAllowsOtherListenersToBeNotified(DELIBERATE_EXCEPTION);
}

private <T> void cancelRemovesListenerAndStillAllowsOtherListenersToBeNotified(@Nullable T expected) {
SingleProcessor<T> processor = new SingleProcessor<>();
rule().listen(processor).verifyNoEmissions();
rule2().listen(processor).verifyNoEmissions();
rule().cancel();
processor.onSuccess(expected);
rule().verifyNoEmissions();
rule2().verifySuccess(expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.servicetalk.concurrent.CloseableIterable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.SingleProcessor;
import io.servicetalk.concurrent.api.SingleProcessor;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpBufferFilterIterable;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpBuffersAndTrailersIterable;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpObjectsAndTrailersIterable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.servicetalk.concurrent.CloseableIterable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.SingleProcessor;
import io.servicetalk.concurrent.api.SingleProcessor;
import io.servicetalk.concurrent.internal.BlockingIterables;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpBufferFilterIterable;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpBuffersAndTrailersIterable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.servicetalk.buffer.api.BufferAllocator;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.SingleProcessor;
import io.servicetalk.concurrent.api.SingleProcessor;
import io.servicetalk.http.api.HttpDataSourceTranformations.BridgeFlowControlAndDiscardOperator;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpBufferFilterOperator;
import io.servicetalk.http.api.HttpDataSourceTranformations.HttpPayloadAndTrailersFromSingleOperator;
Expand Down