diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java index 0aa316b99..47466c6e0 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java @@ -20,6 +20,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.core.ApiService; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.BatchingSettings; import com.google.cloud.pubsublite.Constants; @@ -209,12 +210,11 @@ public ApiFuture publish(Message message) { return ApiFutures.immediateFailedFuture(error.asException()); } try (CloseableMonitor.Hold h = monitor.enter()) { - if (shutdown) { - return ApiFutures.immediateFailedFuture( - Status.FAILED_PRECONDITION - .withDescription("Published after the stream shut down.") - .asException()); - } + ApiService.State currentState = state(); + checkState( + currentState == ApiService.State.RUNNING, + String.format("Cannot publish when Publisher state is %s.", currentState.name())); + checkState(!shutdown, "Published after the stream shut down."); ApiFuture messageFuture = batcher.add(proto); if (batcher.shouldFlush()) { processBatch(batcher.flush()); diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java index fe14542dd..f7a1f58e5 100755 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java @@ -116,20 +116,25 @@ public void setUp() throws StatusException { INITIAL_PUBLISH_REQUEST.getInitialRequest(), BATCHING_SETTINGS_THAT_NEVER_FIRE); publisher.addListener(permanentErrorHandler, MoreExecutors.directExecutor()); + } + + private void startPublisher() { publisher.startAsync().awaitRunning(); + assertThat(leakedOffsetStream).isNotNull(); + verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST)); } @Test public void construct_CallsFactoryNew() { - verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST)); + startPublisher(); verifyNoMoreInteractions(mockPublisherFactory); verifyZeroInteractions(mockBatchPublisher); } @Test public void construct_FlushSendsBatched() throws Exception { - verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST)); + startPublisher(); Message message = Message.builder().build(); Future future = publisher.publish(message); @@ -151,7 +156,7 @@ public void construct_FlushSendsBatched() throws Exception { @Test public void construct_CloseSendsBatched() throws Exception { - verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST)); + startPublisher(); Message message = Message.builder().build(); Future future = publisher.publish(message); @@ -172,9 +177,18 @@ public void construct_CloseSendsBatched() throws Exception { verifyNoMoreInteractions(mockBatchPublisher); } + @Test + public void publishBeforeStart_IsPermanentError() throws Exception { + Message message = Message.builder().build(); + assertThrows(IllegalStateException.class, () -> publisher.publish(message)); + assertThrows(IllegalStateException.class, () -> publisher.startAsync().awaitRunning()); + verifyZeroInteractions(mockPublisherFactory); + verifyZeroInteractions(mockBatchPublisher); + } + @Test public void publishAfterError_IsError() throws Exception { - verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST)); + startPublisher(); leakedOffsetStream.onError(Status.PERMISSION_DENIED.asRuntimeException()); assertThrows(IllegalStateException.class, publisher::awaitTerminated); errorOccurredLatch.await(); @@ -191,7 +205,7 @@ public void publishAfterError_IsError() throws Exception { @Test public void multipleBatches_Ok() throws Exception { - verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST)); + startPublisher(); Message message1 = Message.builder().build(); Message message2 = Message.builder().setData(ByteString.copyFromUtf8("data")).build(); Message message3 = Message.builder().setData(ByteString.copyFromUtf8("other_data")).build(); @@ -226,7 +240,7 @@ public void multipleBatches_Ok() throws Exception { @Test public void retryableError_RecreatesAndRetriesAll() throws Exception { - verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST)); + startPublisher(); Message message1 = Message.builder().setData(ByteString.copyFromUtf8("message1")).build(); Message message2 = Message.builder().setData(ByteString.copyFromUtf8("message2")).build(); Future future1 = publisher.publish(message1); @@ -276,7 +290,7 @@ public void retryableError_RecreatesAndRetriesAll() throws Exception { @Test public void invalidOffsetSequence_SetsPermanentException() throws Exception { - verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST)); + startPublisher(); Message message1 = Message.builder().build(); Message message2 = Message.builder().setData(ByteString.copyFromUtf8("data")).build(); Message message3 = Message.builder().setData(ByteString.copyFromUtf8("other_data")).build();