Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsublite.Constants;
Expand Down Expand Up @@ -209,12 +210,11 @@ public ApiFuture<Offset> publish(Message message) {
return ApiFutures.immediateFailedFuture(error.asException());
}
try (CloseableMonitor.Hold h = monitor.enter()) {
if (shutdown) {
return ApiFutures.immediateFailedFuture(
Status.FAILED_PRECONDITION
.withDescription("Published after the stream shut down.")
.asException());
}
ApiService.State currentState = state();
checkState(
currentState == ApiService.State.RUNNING,
String.format("Cannot publish when Publisher state is %s.", currentState.name()));
checkState(!shutdown, "Published after the stream shut down.");
ApiFuture<Offset> messageFuture = batcher.add(proto);
if (batcher.shouldFlush()) {
processBatch(batcher.flush());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,20 +116,25 @@ public void setUp() throws StatusException {
INITIAL_PUBLISH_REQUEST.getInitialRequest(),
BATCHING_SETTINGS_THAT_NEVER_FIRE);
publisher.addListener(permanentErrorHandler, MoreExecutors.directExecutor());
}

private void startPublisher() {
publisher.startAsync().awaitRunning();

assertThat(leakedOffsetStream).isNotNull();
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
}

@Test
public void construct_CallsFactoryNew() {
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
startPublisher();
verifyNoMoreInteractions(mockPublisherFactory);
verifyZeroInteractions(mockBatchPublisher);
}

@Test
public void construct_FlushSendsBatched() throws Exception {
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
startPublisher();
Message message = Message.builder().build();
Future<Offset> future = publisher.publish(message);

Expand All @@ -151,7 +156,7 @@ public void construct_FlushSendsBatched() throws Exception {

@Test
public void construct_CloseSendsBatched() throws Exception {
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
startPublisher();
Message message = Message.builder().build();
Future<Offset> future = publisher.publish(message);

Expand All @@ -172,9 +177,18 @@ public void construct_CloseSendsBatched() throws Exception {
verifyNoMoreInteractions(mockBatchPublisher);
}

@Test
public void publishBeforeStart_IsPermanentError() throws Exception {
Message message = Message.builder().build();
assertThrows(IllegalStateException.class, () -> publisher.publish(message));
assertThrows(IllegalStateException.class, () -> publisher.startAsync().awaitRunning());
verifyZeroInteractions(mockPublisherFactory);
verifyZeroInteractions(mockBatchPublisher);
}

@Test
public void publishAfterError_IsError() throws Exception {
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
startPublisher();
leakedOffsetStream.onError(Status.PERMISSION_DENIED.asRuntimeException());
assertThrows(IllegalStateException.class, publisher::awaitTerminated);
errorOccurredLatch.await();
Expand All @@ -191,7 +205,7 @@ public void publishAfterError_IsError() throws Exception {

@Test
public void multipleBatches_Ok() throws Exception {
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
startPublisher();
Message message1 = Message.builder().build();
Message message2 = Message.builder().setData(ByteString.copyFromUtf8("data")).build();
Message message3 = Message.builder().setData(ByteString.copyFromUtf8("other_data")).build();
Expand Down Expand Up @@ -226,7 +240,7 @@ public void multipleBatches_Ok() throws Exception {

@Test
public void retryableError_RecreatesAndRetriesAll() throws Exception {
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
startPublisher();
Message message1 = Message.builder().setData(ByteString.copyFromUtf8("message1")).build();
Message message2 = Message.builder().setData(ByteString.copyFromUtf8("message2")).build();
Future<Offset> future1 = publisher.publish(message1);
Expand Down Expand Up @@ -276,7 +290,7 @@ public void retryableError_RecreatesAndRetriesAll() throws Exception {

@Test
public void invalidOffsetSequence_SetsPermanentException() throws Exception {
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
startPublisher();
Message message1 = Message.builder().build();
Message message2 = Message.builder().setData(ByteString.copyFromUtf8("data")).build();
Message message3 = Message.builder().setData(ByteString.copyFromUtf8("other_data")).build();
Expand Down