From b1551cd7b1f943644d3d357e00679aa4613fd331 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Tue, 26 May 2020 05:58:28 -0400 Subject: [PATCH 1/7] Throw an exception on publish if the publisher state is not RUNNING --- .../cloudpubsub/internal/WrappingPublisher.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java index be536c765..51d590e69 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java @@ -16,8 +16,11 @@ package com.google.cloud.pubsublite.cloudpubsub.internal; +import static com.google.cloud.pubsublite.internal.Preconditions.checkState; + import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.core.ApiService; import com.google.cloud.pubsublite.Message; import com.google.cloud.pubsublite.MessageTransformer; import com.google.cloud.pubsublite.PublishMetadata; @@ -62,6 +65,15 @@ public ApiFuture publish(PubsubMessage message) { onPermanentError(e); return ApiFutures.immediateFailedFuture(e); } + try { + ApiService.State currentState = state(); + checkState( + currentState == ApiService.State.RUNNING, + String.format("Cannot publish when Publisher state is %s.", currentState.name())); + } catch (StatusException e) { + // Non-permanent error. + return ApiFutures.immediateFailedFuture(e); + } return ApiFutures.transform( wirePublisher.publish(wireMessage), PublishMetadata::encode, From bd8e1ef9ba6487c257be563fcc233d54b807b885 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Tue, 26 May 2020 07:01:21 -0400 Subject: [PATCH 2/7] Immediately return failed future --- .../internal/WrappingPublisher.java | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java index 51d590e69..c688727d1 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java @@ -16,8 +16,6 @@ package com.google.cloud.pubsublite.cloudpubsub.internal; -import static com.google.cloud.pubsublite.internal.Preconditions.checkState; - import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.core.ApiService; @@ -28,6 +26,7 @@ import com.google.cloud.pubsublite.internal.ProxyService; import com.google.common.util.concurrent.MoreExecutors; import com.google.pubsub.v1.PubsubMessage; +import io.grpc.Status; import io.grpc.StatusException; // A WrappingPublisher wraps the wire protocol client with a Cloud Pub/Sub api compliant @@ -58,6 +57,15 @@ protected void handlePermanentError(StatusException error) {} // Publisher implementation. @Override public ApiFuture publish(PubsubMessage message) { + ApiService.State currentState = state(); + if (currentState != ApiService.State.RUNNING) { + return ApiFutures.immediateFailedFuture( + Status.FAILED_PRECONDITION + .withDescription( + String.format("Cannot publish when Publisher state is %s.", currentState.name())) + .asException()); + } + Message wireMessage; try { wireMessage = transformer.transform(message); @@ -65,15 +73,6 @@ public ApiFuture publish(PubsubMessage message) { onPermanentError(e); return ApiFutures.immediateFailedFuture(e); } - try { - ApiService.State currentState = state(); - checkState( - currentState == ApiService.State.RUNNING, - String.format("Cannot publish when Publisher state is %s.", currentState.name())); - } catch (StatusException e) { - // Non-permanent error. - return ApiFutures.immediateFailedFuture(e); - } return ApiFutures.transform( wirePublisher.publish(wireMessage), PublishMetadata::encode, From 069edb33b366d55d481c533749fd9db45dbe7b3d Mon Sep 17 00:00:00 2001 From: tmdiep Date: Wed, 27 May 2020 17:05:38 -0400 Subject: [PATCH 3/7] Moved check to PublisherImpl --- .../cloudpubsub/internal/WrappingPublisher.java | 11 ----------- .../cloud/pubsublite/internal/wire/PublisherImpl.java | 10 ++++++++++ 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java index c688727d1..be536c765 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java @@ -18,7 +18,6 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; -import com.google.api.core.ApiService; import com.google.cloud.pubsublite.Message; import com.google.cloud.pubsublite.MessageTransformer; import com.google.cloud.pubsublite.PublishMetadata; @@ -26,7 +25,6 @@ import com.google.cloud.pubsublite.internal.ProxyService; import com.google.common.util.concurrent.MoreExecutors; import com.google.pubsub.v1.PubsubMessage; -import io.grpc.Status; import io.grpc.StatusException; // A WrappingPublisher wraps the wire protocol client with a Cloud Pub/Sub api compliant @@ -57,15 +55,6 @@ protected void handlePermanentError(StatusException error) {} // Publisher implementation. @Override public ApiFuture publish(PubsubMessage message) { - ApiService.State currentState = state(); - if (currentState != ApiService.State.RUNNING) { - return ApiFutures.immediateFailedFuture( - Status.FAILED_PRECONDITION - .withDescription( - String.format("Cannot publish when Publisher state is %s.", currentState.name())) - .asException()); - } - Message wireMessage; try { wireMessage = transformer.transform(message); diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java index 0aa316b99..6f8131747 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java @@ -20,6 +20,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.core.ApiService; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.BatchingSettings; import com.google.cloud.pubsublite.Constants; @@ -193,6 +194,15 @@ private void processBatch(Collection batch) throws StatusExcep @Override public ApiFuture publish(Message message) { + ApiService.State currentState = state(); + if (currentState != ApiService.State.RUNNING) { + return ApiFutures.immediateFailedFuture( + Status.FAILED_PRECONDITION + .withDescription( + String.format("Cannot publish when Publisher state is %s.", currentState.name())) + .asException()); + } + PubSubMessage proto = message.toProto(); if (proto.getSerializedSize() > Constants.MAX_PUBLISH_MESSAGE_BYTES) { Status error = From 8d7999a5a4100687b0a84ee8697925479f37ab7e Mon Sep 17 00:00:00 2001 From: tmdiep Date: Thu, 28 May 2020 02:35:11 -0400 Subject: [PATCH 4/7] Tests for PublisherImplTest --- .../internal/wire/PublisherImpl.java | 1 - .../internal/wire/PublisherImplTest.java | 32 +++++++++++++++---- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java index 6f8131747..e7dad1c03 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java @@ -202,7 +202,6 @@ public ApiFuture publish(Message message) { String.format("Cannot publish when Publisher state is %s.", currentState.name())) .asException()); } - PubSubMessage proto = message.toProto(); if (proto.getSerializedSize() > Constants.MAX_PUBLISH_MESSAGE_BYTES) { Status error = diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java index fe14542dd..f7ba1c457 100755 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java @@ -116,20 +116,25 @@ public void setUp() throws StatusException { INITIAL_PUBLISH_REQUEST.getInitialRequest(), BATCHING_SETTINGS_THAT_NEVER_FIRE); publisher.addListener(permanentErrorHandler, MoreExecutors.directExecutor()); + } + + private void startPublisher() { publisher.startAsync().awaitRunning(); + assertThat(leakedOffsetStream).isNotNull(); + verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST)); } @Test public void construct_CallsFactoryNew() { - verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST)); + startPublisher(); verifyNoMoreInteractions(mockPublisherFactory); verifyZeroInteractions(mockBatchPublisher); } @Test public void construct_FlushSendsBatched() throws Exception { - verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST)); + startPublisher(); Message message = Message.builder().build(); Future future = publisher.publish(message); @@ -151,7 +156,7 @@ public void construct_FlushSendsBatched() throws Exception { @Test public void construct_CloseSendsBatched() throws Exception { - verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST)); + startPublisher(); Message message = Message.builder().build(); Future future = publisher.publish(message); @@ -172,9 +177,22 @@ public void construct_CloseSendsBatched() throws Exception { verifyNoMoreInteractions(mockBatchPublisher); } + @Test + public void publishBeforeStart_IsError() throws Exception { + Message message = Message.builder().build(); + Future future = publisher.publish(message); + ExecutionException e = assertThrows(ExecutionException.class, future::get); + Optional statusOr = ExtractStatus.extract(e.getCause()); + assertThat(statusOr.isPresent()).isTrue(); + assertThat(statusOr.get().getCode()).isEqualTo(Code.FAILED_PRECONDITION); + + verifyZeroInteractions(mockPublisherFactory); + verifyZeroInteractions(mockBatchPublisher); + } + @Test public void publishAfterError_IsError() throws Exception { - verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST)); + startPublisher(); leakedOffsetStream.onError(Status.PERMISSION_DENIED.asRuntimeException()); assertThrows(IllegalStateException.class, publisher::awaitTerminated); errorOccurredLatch.await(); @@ -191,7 +209,7 @@ public void publishAfterError_IsError() throws Exception { @Test public void multipleBatches_Ok() throws Exception { - verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST)); + startPublisher(); Message message1 = Message.builder().build(); Message message2 = Message.builder().setData(ByteString.copyFromUtf8("data")).build(); Message message3 = Message.builder().setData(ByteString.copyFromUtf8("other_data")).build(); @@ -226,7 +244,7 @@ public void multipleBatches_Ok() throws Exception { @Test public void retryableError_RecreatesAndRetriesAll() throws Exception { - verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST)); + startPublisher(); Message message1 = Message.builder().setData(ByteString.copyFromUtf8("message1")).build(); Message message2 = Message.builder().setData(ByteString.copyFromUtf8("message2")).build(); Future future1 = publisher.publish(message1); @@ -276,7 +294,7 @@ public void retryableError_RecreatesAndRetriesAll() throws Exception { @Test public void invalidOffsetSequence_SetsPermanentException() throws Exception { - verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST)); + startPublisher(); Message message1 = Message.builder().build(); Message message2 = Message.builder().setData(ByteString.copyFromUtf8("data")).build(); Message message3 = Message.builder().setData(ByteString.copyFromUtf8("other_data")).build(); From 20c782924a6f683fb84914576cf16b3f45e2ce02 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Tue, 2 Jun 2020 02:04:54 -0400 Subject: [PATCH 5/7] Throw a permanent error when publishing before starting the service --- .../internal/wire/PublisherImpl.java | 27 +++++++++---------- .../internal/wire/PublisherImplTest.java | 9 +++---- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java index e7dad1c03..32c793f60 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java @@ -194,14 +194,6 @@ private void processBatch(Collection batch) throws StatusExcep @Override public ApiFuture publish(Message message) { - ApiService.State currentState = state(); - if (currentState != ApiService.State.RUNNING) { - return ApiFutures.immediateFailedFuture( - Status.FAILED_PRECONDITION - .withDescription( - String.format("Cannot publish when Publisher state is %s.", currentState.name())) - .asException()); - } PubSubMessage proto = message.toProto(); if (proto.getSerializedSize() > Constants.MAX_PUBLISH_MESSAGE_BYTES) { Status error = @@ -218,12 +210,12 @@ public ApiFuture publish(Message message) { return ApiFutures.immediateFailedFuture(error.asException()); } try (CloseableMonitor.Hold h = monitor.enter()) { - if (shutdown) { - return ApiFutures.immediateFailedFuture( - Status.FAILED_PRECONDITION - .withDescription("Published after the stream shut down.") - .asException()); - } + ApiService.State currentState = state(); + checkState( + currentState == ApiService.State.RUNNING, + String.format("Cannot publish when Publisher state is %s.", currentState.name())); + // This is an assertion. If the service is running, it should not be shutdown. + checkState(!shutdown, "Published after the stream shut down."); ApiFuture messageFuture = batcher.add(proto); if (batcher.shouldFlush()) { processBatch(batcher.flush()); @@ -245,6 +237,13 @@ void flushToStream() { } } + @VisibleForTesting + boolean isShutdown() { + try (CloseableMonitor.Hold h = monitor.enter()) { + return shutdown; + } + } + // Flushable implementation @Override public void flush() { diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java index f7ba1c457..2f9f04791 100755 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java @@ -178,14 +178,11 @@ public void construct_CloseSendsBatched() throws Exception { } @Test - public void publishBeforeStart_IsError() throws Exception { + public void publishBeforeStart_IsPermanentError() throws Exception { Message message = Message.builder().build(); - Future future = publisher.publish(message); - ExecutionException e = assertThrows(ExecutionException.class, future::get); - Optional statusOr = ExtractStatus.extract(e.getCause()); - assertThat(statusOr.isPresent()).isTrue(); - assertThat(statusOr.get().getCode()).isEqualTo(Code.FAILED_PRECONDITION); + assertThrows(IllegalStateException.class, () -> publisher.publish(message)); + assertThat(publisher.isShutdown()).isTrue(); verifyZeroInteractions(mockPublisherFactory); verifyZeroInteractions(mockBatchPublisher); } From 7702cbab0ce24b2f7227896b3a528eab52e011de Mon Sep 17 00:00:00 2001 From: tmdiep Date: Tue, 2 Jun 2020 17:42:28 -0400 Subject: [PATCH 6/7] Remove comment --- .../com/google/cloud/pubsublite/internal/wire/PublisherImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java index 32c793f60..0d9a8fc1c 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java @@ -214,7 +214,6 @@ public ApiFuture publish(Message message) { checkState( currentState == ApiService.State.RUNNING, String.format("Cannot publish when Publisher state is %s.", currentState.name())); - // This is an assertion. If the service is running, it should not be shutdown. checkState(!shutdown, "Published after the stream shut down."); ApiFuture messageFuture = batcher.add(proto); if (batcher.shouldFlush()) { From 15444866703479a1bd89bb274ec189a757977e49 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Fri, 5 Jun 2020 00:45:00 -0400 Subject: [PATCH 7/7] Check for permanent failure by trying to start publisher --- .../cloud/pubsublite/internal/wire/PublisherImpl.java | 7 ------- .../cloud/pubsublite/internal/wire/PublisherImplTest.java | 3 +-- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java index 0d9a8fc1c..47466c6e0 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java @@ -236,13 +236,6 @@ void flushToStream() { } } - @VisibleForTesting - boolean isShutdown() { - try (CloseableMonitor.Hold h = monitor.enter()) { - return shutdown; - } - } - // Flushable implementation @Override public void flush() { diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java index 2f9f04791..f7a1f58e5 100755 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java @@ -181,8 +181,7 @@ public void construct_CloseSendsBatched() throws Exception { public void publishBeforeStart_IsPermanentError() throws Exception { Message message = Message.builder().build(); assertThrows(IllegalStateException.class, () -> publisher.publish(message)); - - assertThat(publisher.isShutdown()).isTrue(); + assertThrows(IllegalStateException.class, () -> publisher.startAsync().awaitRunning()); verifyZeroInteractions(mockPublisherFactory); verifyZeroInteractions(mockBatchPublisher); }