Skip to content

Commit

Permalink
fix: Fail early when publishing to already failed publishers (#934)
Browse files Browse the repository at this point in the history
Also log a warning by default when a publisher client fails.

Also make dependencies.sh include googleapis/synthtool#1266
  • Loading branch information
dpcollins-google committed Nov 3, 2021
1 parent 5801212 commit 6ab2754
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 6 deletions.
10 changes: 4 additions & 6 deletions .kokoro/dependencies.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,13 @@ function determineMavenOpts() {
| sed -E 's/^(1\.[0-9]\.0).*$/\1/g'
)

case $javaVersion in
"17")
if [[ $javaVersion == 17* ]]
then
# MaxPermSize is no longer supported as of jdk 17
echo -n "-Xmx1024m"
;;
*)
else
echo -n "-Xmx1024m -XX:MaxPermSize=128m"
;;
esac
fi
}

export MAVEN_OPTS=$(determineMavenOpts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.common.flogger.GoogleLogger;
import com.google.pubsub.v1.PubsubMessage;

// A WrappingPublisher wraps the wire protocol client with a Cloud Pub/Sub api compliant
// publisher. It encodes a MessageMetadata object in the response string.
public class WrappingPublisher extends ProxyService implements Publisher {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

private final com.google.cloud.pubsublite.internal.Publisher<MessageMetadata> wirePublisher;
private final MessageTransformer<PubsubMessage, Message> transformer;

Expand All @@ -43,11 +46,23 @@ public WrappingPublisher(
super(wirePublisher);
this.wirePublisher = wirePublisher;
this.transformer = transformer;
addListener(
new Listener() {
@Override
public void failed(State from, Throwable failure) {
logger.atWarning().withCause(failure).log(
"Publisher client failed with permanent error.");
}
},
SystemExecutors.getFuturesExecutor());
}

// Publisher implementation.
@Override
public ApiFuture<String> publish(PubsubMessage message) {
if (state().equals(State.FAILED)) {
return ApiFutures.immediateFailedFuture(toCanonical(failureCause()).underlying);
}
Message wireMessage;
try {
wireMessage = transformer.transform(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package com.google.cloud.pubsublite.cloudpubsub.internal;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -38,6 +40,7 @@
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -101,4 +104,15 @@ public void badTimestampCannotBeTransformed() {
ApiExceptionMatcher.assertFutureThrowsCode(published, Code.INVALID_ARGUMENT);
assertThat(publisher.isRunning()).isFalse();
}

@Test
public void publishAfterFailureFailedImmediately() throws Exception {
underlying.fail(new CheckedApiException(Code.FAILED_PRECONDITION));
assertThrows(Throwable.class, publisher::awaitTerminated);

PubsubMessage message = PubsubMessage.newBuilder().setOrderingKey("abc").build();
ApiFuture<String> published = publisher.publish(message);
ExecutionException e = assertThrows(ExecutionException.class, published::get);
assertThat(toCanonical(e).code()).isEqualTo(Code.FAILED_PRECONDITION);
}
}

0 comments on commit 6ab2754

Please sign in to comment.