From 2b50adb51a92857a605eff049ad580eed31544f4 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 24 Mar 2020 17:16:49 -0400 Subject: [PATCH 01/18] feat: Add flow control support to publisher --- .../com/google/cloud/pubsub/v1/Publisher.java | 175 +++++++++++++++- .../pubsub/v1/SequentialExecutorService.java | 4 + .../cloud/pubsub/v1/PublisherImplTest.java | 190 ++++++++++++++++++ 3 files changed, 366 insertions(+), 3 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index e28427eea..45455fa2b 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -25,6 +25,8 @@ import com.google.api.core.BetaApi; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.BackgroundResource; import com.google.api.gax.core.BackgroundResourceAggregation; import com.google.api.gax.core.CredentialsProvider; @@ -55,6 +57,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -108,6 +111,8 @@ public class Publisher { private ScheduledFuture currentAlarmFuture; private final ApiFunction messageTransform; + private MessageFlowController flowController = null; + /** The maximum number of messages in one request. Defined by the API. */ public static long getApiMaxRequestElementCount() { return 1000L; @@ -122,6 +127,16 @@ private Publisher(Builder builder) throws IOException { topicName = builder.topicName; this.batchingSettings = builder.batchingSettings; + FlowControlSettings flowControl = this.batchingSettings.getFlowControlSettings(); + if (flowControl != null + && flowControl.getLimitExceededBehavior() != FlowController.LimitExceededBehavior.Ignore) { + this.flowController = + new MessageFlowController( + flowControl.getMaxOutstandingElementCount(), + flowControl.getMaxOutstandingRequestBytes(), + flowControl.getLimitExceededBehavior()); + } + this.enableMessageOrdering = builder.enableMessageOrdering; this.messageTransform = builder.messageTransform; @@ -221,6 +236,19 @@ public ApiFuture publish(PubsubMessage message) { final OutstandingPublish outstandingPublish = new OutstandingPublish(messageTransform.apply(message)); + + if (flowController != null) { + try { + flowController.acquire(outstandingPublish.messageSize); + } catch (Exception e) { + if (!orderingKey.isEmpty()) { + sequentialExecutor.stopPublish(orderingKey); + } + outstandingPublish.publishResult.setException(e); + return outstandingPublish.publishResult; + } + } + List batchesToSend; messagesBatchLock.lock(); try { @@ -454,7 +482,7 @@ public ApiFuture call() { ApiFutures.addCallback(future, futureCallback, directExecutor()); } - private static final class OutstandingBatch { + private final class OutstandingBatch { final List outstandingPublishes; final long creationTime; int attempt; @@ -484,6 +512,9 @@ private List getMessages() { private void onFailure(Throwable t) { for (OutstandingPublish outstandingPublish : outstandingPublishes) { + if (flowController != null) { + flowController.release(outstandingPublish.messageSize); + } outstandingPublish.publishResult.setException(t); } } @@ -491,7 +522,11 @@ private void onFailure(Throwable t) { private void onSuccess(Iterable results) { Iterator messagesResultsIt = outstandingPublishes.iterator(); for (String messageId : results) { - messagesResultsIt.next().publishResult.set(messageId); + OutstandingPublish nextPublish = messagesResultsIt.next(); + if (flowController != null) { + flowController.release(nextPublish.messageSize); + } + nextPublish.publishResult.set(messageId); } } } @@ -602,6 +637,10 @@ public static final class Builder { .setDelayThreshold(DEFAULT_DELAY_THRESHOLD) .setRequestByteThreshold(DEFAULT_REQUEST_BYTES_THRESHOLD) .setElementCountThreshold(DEFAULT_ELEMENT_COUNT_THRESHOLD) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore) + .build()) .build(); static final RetrySettings DEFAULT_RETRY_SETTINGS = RetrySettings.newBuilder() @@ -759,7 +798,137 @@ public Publisher build() throws IOException { } } - private static class MessagesBatch { + private static class MessageFlowController { + private final Lock lock; + private final Long messageLimit; + private final Long byteLimit; + private final FlowController.LimitExceededBehavior limitBehavior; + + private Long outstandingMessages; + private Long outstandingBytes; + private LinkedList awaitingMessageAcquires; + private LinkedList awaitingBytesAcquires; + + MessageFlowController( + Long messageLimit, Long byteLimit, FlowController.LimitExceededBehavior limitBehavior) { + this.messageLimit = messageLimit; + this.byteLimit = byteLimit; + this.limitBehavior = limitBehavior; + this.lock = new ReentrantLock(); + + this.outstandingMessages = 0L; + this.outstandingBytes = 0L; + + this.awaitingMessageAcquires = new LinkedList(); + this.awaitingBytesAcquires = new LinkedList(); + } + + void acquire(long messageSize) throws FlowController.FlowControlException { + lock.lock(); + try { + if (outstandingMessages >= messageLimit + && limitBehavior == FlowController.LimitExceededBehavior.ThrowException) { + throw new FlowController.MaxOutstandingElementCountReachedException(messageLimit); + } + if (outstandingBytes + messageSize >= byteLimit + && limitBehavior == FlowController.LimitExceededBehavior.ThrowException) { + throw new FlowController.MaxOutstandingRequestBytesReachedException(byteLimit); + } + + // We can acquire or we should wait until we can acquire. + // Start by acquiring a slot for a message. + CountDownLatch messageWaiter = null; + while (outstandingMessages >= messageLimit) { + if (messageWaiter == null) { + // This message gets added to the back of the line. + messageWaiter = new CountDownLatch(1); + awaitingMessageAcquires.addLast(messageWaiter); + } else { + // This message already in line stays at the head of the line. + messageWaiter = new CountDownLatch(1); + awaitingMessageAcquires.removeFirst(); + awaitingMessageAcquires.addFirst(messageWaiter); + } + lock.unlock(); + try { + messageWaiter.await(); + } catch (InterruptedException e) { + logger.log(Level.WARNING, "Interrupted while waiting to acquire flow control tokens"); + } + lock.lock(); + } + ++outstandingMessages; + if (messageWaiter != null) { + awaitingMessageAcquires.removeFirst(); + } + + // There may be some surplus messages left; let the next message waiting for a token have + // one. + if (!awaitingMessageAcquires.isEmpty() && outstandingMessages < messageLimit) { + awaitingMessageAcquires.getFirst().countDown(); + } + + // Now acquire space for bytes. + CountDownLatch bytesWaiter = null; + Long bytesRemaining = messageSize; + while (outstandingBytes + bytesRemaining >= byteLimit) { + // Take what is available. + Long available = byteLimit - outstandingBytes; + bytesRemaining -= available; + outstandingBytes = byteLimit; + if (bytesWaiter == null) { + // This message gets added to the back of the line. + bytesWaiter = new CountDownLatch(1); + awaitingBytesAcquires.addLast(bytesWaiter); + } else { + // This message already in line stays at the head of the line. + bytesWaiter = new CountDownLatch(1); + awaitingBytesAcquires.removeFirst(); + awaitingBytesAcquires.addFirst(bytesWaiter); + } + lock.unlock(); + try { + bytesWaiter.await(); + } catch (InterruptedException e) { + logger.log(Level.WARNING, "Interrupted while waiting to acquire flow control tokens"); + } + lock.lock(); + } + + outstandingBytes += bytesRemaining; + if (bytesWaiter != null) { + awaitingBytesAcquires.removeFirst(); + } + // There may be some surplus bytes left; let the next message waiting for bytes have some. + if (!awaitingBytesAcquires.isEmpty() && outstandingBytes < byteLimit) { + awaitingBytesAcquires.getFirst().countDown(); + } + } finally { + lock.unlock(); + } + } + + private void notifyNextAcquires() { + if (!awaitingMessageAcquires.isEmpty()) { + CountDownLatch awaitingAcquire = awaitingMessageAcquires.getFirst(); + awaitingAcquire.countDown(); + } + if (!awaitingBytesAcquires.isEmpty()) { + CountDownLatch awaitingAcquire = awaitingBytesAcquires.getFirst(); + awaitingAcquire.countDown(); + } + } + + void release(long messageSize) { + lock.lock(); + --outstandingMessages; + outstandingBytes -= messageSize; + notifyNextAcquires(); + lock.unlock(); + } + } + + private class MessagesBatch { private List messages; private int batchedBytes; private String orderingKey; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index 712f51eb5..292921850 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -247,6 +247,10 @@ void resumePublish(String key) { keysWithErrors.remove(key); } + void stopPublish(String key) { + keysWithErrors.add(key); + } + /** Cancels every task in the queue associated with {@code key}. */ private void cancelQueuedTasks(final String key, Throwable e) { keysWithErrors.add(key); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index b1109c8fc..ea9e9850f 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -25,6 +25,8 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.FixedExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; @@ -43,7 +45,10 @@ import io.grpc.StatusException; import io.grpc.inprocess.InProcessServerBuilder; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.easymock.EasyMock; import org.junit.After; @@ -923,6 +928,191 @@ public void testShutDown() throws Exception { assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } + @Test + public void testPublishFlowControl_throwException() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setDelayThreshold(Duration.ofSeconds(5)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior( + FlowController.LimitExceededBehavior.ThrowException) + .setMaxOutstandingElementCount(1L) + .setMaxOutstandingRequestBytes(10L) + .build()) + .build()) + .build(); + + // Sending a message that is too large results in an exception. + ApiFuture publishFuture1 = sendTestMessage(publisher, "AAAAAAAAAAA"); + try { + publishFuture1.get(); + fail("Should have thrown an FlowController.MaxOutstandingRequestBytesReachedException"); + } catch (ExecutionException e) { + assertThat(e.getCause()) + .isInstanceOf(FlowController.MaxOutstandingRequestBytesReachedException.class); + } + + // Sending a second message succeeds. + ApiFuture publishFuture2 = sendTestMessage(publisher, "AAAA"); + + // Sending a third message fails because of the outstanding message. + ApiFuture publishFuture3 = sendTestMessage(publisher, "AA"); + try { + publishFuture3.get(); + fail("Should have thrown an FlowController.MaxOutstandingElementCountReachedException"); + } catch (ExecutionException e) { + assertThat(e.getCause()) + .isInstanceOf(FlowController.MaxOutstandingElementCountReachedException.class); + } + + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1")); + assertEquals("1", publishFuture2.get()); + + // Sending another message succeeds. + ApiFuture publishFuture4 = sendTestMessage(publisher, "AAAA"); + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("2")); + assertEquals("2", publishFuture4.get()); + } + + @Test + public void testPublishFlowControl_throwExceptionWithOrderingKey() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setDelayThreshold(Duration.ofSeconds(5)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior( + FlowController.LimitExceededBehavior.ThrowException) + .setMaxOutstandingElementCount(1L) + .setMaxOutstandingRequestBytes(10L) + .build()) + .build()) + .setEnableMessageOrdering(true) + .build(); + + // Sending a message that is too large results in an exception. + ApiFuture publishFuture1 = + sendTestMessageWithOrderingKey(publisher, "AAAAAAAAAAA", "a"); + try { + publishFuture1.get(); + fail("Should have thrown an FlowController.MaxOutstandingRequestBytesReachedException"); + } catch (ExecutionException e) { + assertThat(e.getCause()) + .isInstanceOf(FlowController.MaxOutstandingRequestBytesReachedException.class); + } + + // Sending a second message for the same ordering key fails because the first one failed. + ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "AAAA", "a"); + try { + publishFuture2.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + } + + @Test + public void testPublishFlowControl_block() throws Exception { + final Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setDelayThreshold(Duration.ofSeconds(5)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) + .setMaxOutstandingElementCount(2L) + .setMaxOutstandingRequestBytes(10L) + .build()) + .build()) + .build(); + Executor responseExecutor = Executors.newScheduledThreadPool(10); + final CountDownLatch sendResponse1 = new CountDownLatch(1); + final CountDownLatch sentResponse1 = new CountDownLatch(1); + final CountDownLatch sendResponse2 = new CountDownLatch(1); + responseExecutor.execute( + new Runnable() { + @Override + public void run() { + try { + sendResponse1.await(); + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("1")); + sentResponse1.countDown(); + sendResponse2.await(); + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("2")); + } catch (Exception e) { + } + } + }); + + // Sending two messages succeeds. + ApiFuture publishFuture1 = sendTestMessage(publisher, "AA"); + ApiFuture publishFuture2 = sendTestMessage(publisher, "AA"); + + // Sending a third message blocks because messages are outstanding. + final CountDownLatch publish3Completed = new CountDownLatch(1); + final CountDownLatch sentResponse3 = new CountDownLatch(1); + responseExecutor.execute( + new Runnable() { + @Override + public void run() { + ApiFuture publishFuture3 = sendTestMessage(publisher, "AAAAAA"); + publish3Completed.countDown(); + } + }); + + responseExecutor.execute( + new Runnable() { + @Override + public void run() { + try { + sendResponse1.countDown(); + sentResponse1.await(); + sendResponse2.countDown(); + } catch (Exception e) { + } + } + }); + + // Sending a fourth message blocks because although only one message has been sent, + // the third message claimed the tokens for outstanding bytes. + final CountDownLatch publish4Completed = new CountDownLatch(1); + responseExecutor.execute( + new Runnable() { + @Override + public void run() { + try { + publish3Completed.await(); + ApiFuture publishFuture4 = sendTestMessage(publisher, "A"); + publish4Completed.countDown(); + } catch (Exception e) { + } + } + }); + + publish3Completed.await(); + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("3")); + sentResponse3.countDown(); + + publish4Completed.await(); + } + private Builder getTestPublisherBuilder() { return Publisher.newBuilder(TEST_TOPIC) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) From 6bb47bc432bb3a0bbce9483e98d4aa27fd46fe8b Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Wed, 25 Mar 2020 11:41:39 -0400 Subject: [PATCH 02/18] make suggested fixes --- .../java/com/google/cloud/pubsub/v1/Publisher.java | 8 +++----- .../com/google/cloud/pubsub/v1/PublisherImplTest.java | 10 +++++----- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 45455fa2b..659d84bb6 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -240,7 +240,7 @@ public ApiFuture publish(PubsubMessage message) { if (flowController != null) { try { flowController.acquire(outstandingPublish.messageSize); - } catch (Exception e) { + } catch (FlowController.FlowControlException e) { if (!orderingKey.isEmpty()) { sequentialExecutor.stopPublish(orderingKey); } @@ -846,8 +846,7 @@ void acquire(long messageSize) throws FlowController.FlowControlException { } else { // This message already in line stays at the head of the line. messageWaiter = new CountDownLatch(1); - awaitingMessageAcquires.removeFirst(); - awaitingMessageAcquires.addFirst(messageWaiter); + awaitingMessageAcquires.set(0, messageWaiter); } lock.unlock(); try { @@ -883,8 +882,7 @@ void acquire(long messageSize) throws FlowController.FlowControlException { } else { // This message already in line stays at the head of the line. bytesWaiter = new CountDownLatch(1); - awaitingBytesAcquires.removeFirst(); - awaitingBytesAcquires.addFirst(bytesWaiter); + awaitingBytesAcquires.set(0, bytesWaiter); } lock.unlock(); try { diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index ea9e9850f..5ee5fdcfc 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -1042,7 +1042,7 @@ public void testPublishFlowControl_block() throws Exception { .build(); Executor responseExecutor = Executors.newScheduledThreadPool(10); final CountDownLatch sendResponse1 = new CountDownLatch(1); - final CountDownLatch sentResponse1 = new CountDownLatch(1); + final CountDownLatch response1Sent = new CountDownLatch(1); final CountDownLatch sendResponse2 = new CountDownLatch(1); responseExecutor.execute( new Runnable() { @@ -1052,7 +1052,7 @@ public void run() { sendResponse1.await(); testPublisherServiceImpl.addPublishResponse( PublishResponse.newBuilder().addMessageIds("1")); - sentResponse1.countDown(); + response1Sent.countDown(); sendResponse2.await(); testPublisherServiceImpl.addPublishResponse( PublishResponse.newBuilder().addMessageIds("2")); @@ -1067,7 +1067,7 @@ public void run() { // Sending a third message blocks because messages are outstanding. final CountDownLatch publish3Completed = new CountDownLatch(1); - final CountDownLatch sentResponse3 = new CountDownLatch(1); + final CountDownLatch response3Sent = new CountDownLatch(1); responseExecutor.execute( new Runnable() { @Override @@ -1083,7 +1083,7 @@ public void run() { public void run() { try { sendResponse1.countDown(); - sentResponse1.await(); + response1Sent.await(); sendResponse2.countDown(); } catch (Exception e) { } @@ -1108,7 +1108,7 @@ public void run() { publish3Completed.await(); testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("3")); - sentResponse3.countDown(); + response3Sent.countDown(); publish4Completed.await(); } From 9c113c3e32c28cf0d1de8aad3409b5c509fb1ada Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Sat, 2 May 2020 13:25:32 -0400 Subject: [PATCH 03/18] chore: Remove note that ordering keys requires enablements. --- .../src/main/java/com/google/cloud/pubsub/v1/Publisher.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 282df369f..2c3fa86a1 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -305,7 +305,7 @@ public void run() { * * @param key The key for which to resume publishing. */ - @BetaApi("Ordering is not yet fully supported and requires special project enablements.") + @BetaApi public void resumePublish(String key) { Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher."); sequentialExecutor.resumePublish(key); @@ -765,7 +765,7 @@ public Builder setRetrySettings(RetrySettings retrySettings) { } /** Sets the message ordering option. */ - @BetaApi("Ordering is not yet fully supported and requires special project enablements.") + @BetaApi public Builder setEnableMessageOrdering(boolean enableMessageOrdering) { this.enableMessageOrdering = enableMessageOrdering; return this; From 6947740747aa76784967fc1cb93a129a9408e1fe Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Sat, 20 Jun 2020 20:58:18 -0400 Subject: [PATCH 04/18] feat: Add support for server-side flow control --- .../cloud/pubsub/v1/StreamingSubscriberConnection.java | 7 +++++++ .../main/java/com/google/cloud/pubsub/v1/Subscriber.java | 1 + 2 files changed, 8 insertions(+) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 047e1ba75..0fa671190 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -25,6 +25,7 @@ import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.FlowController; +import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.core.Distribution; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcStatusCode; @@ -71,6 +72,8 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final ScheduledExecutorService systemExecutor; private final MessageDispatcher messageDispatcher; + private final FlowControlSettings flowControlSettings; + private final AtomicLong channelReconnectBackoffMillis = new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis()); private final Waiter ackOperationsWaiter = new Waiter(); @@ -93,6 +96,7 @@ public StreamingSubscriberConnection( Distribution ackLatencyDistribution, SubscriberStub stub, int channelAffinity, + FlowControlSettings flowControlSettings, FlowController flowController, ScheduledExecutorService executor, ScheduledExecutorService systemExecutor, @@ -112,6 +116,7 @@ public StreamingSubscriberConnection( executor, systemExecutor, clock); + this.flowControlSettings = flowControlSettings; } @Override @@ -209,6 +214,8 @@ private void initialize() { .setSubscription(subscription) .setStreamAckDeadlineSeconds(60) .setClientId(clientId) + .setMaxOutstandingMessages(flowControlSettings.getMaxOutstandingElementCount()) + .setMaxOutstandingBytes(flowControlSettings.getMaxOutstandingRequestBytes()) .build()); /** diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 0054408ee..bd30bb112 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -332,6 +332,7 @@ private void startStreamingConnections() { ackLatencyDistribution, subStub, i, + flowControlSettings, flowController, executor, alarmsExecutor, From fb08aa3732f12b7ba69b399a3ac450ac78ba27ae Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Sat, 20 Jun 2020 21:00:05 -0400 Subject: [PATCH 05/18] Revert "chore: Remove note that ordering keys requires enablements." This reverts commit 9c113c3e32c28cf0d1de8aad3409b5c509fb1ada. --- .../src/main/java/com/google/cloud/pubsub/v1/Publisher.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 2c3fa86a1..282df369f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -305,7 +305,7 @@ public void run() { * * @param key The key for which to resume publishing. */ - @BetaApi + @BetaApi("Ordering is not yet fully supported and requires special project enablements.") public void resumePublish(String key) { Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher."); sequentialExecutor.resumePublish(key); @@ -765,7 +765,7 @@ public Builder setRetrySettings(RetrySettings retrySettings) { } /** Sets the message ordering option. */ - @BetaApi + @BetaApi("Ordering is not yet fully supported and requires special project enablements.") public Builder setEnableMessageOrdering(boolean enableMessageOrdering) { this.enableMessageOrdering = enableMessageOrdering; return this; From c9c2028f5d208ad957898e35d43578cd50439d33 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Wed, 24 Jun 2020 00:41:01 -0400 Subject: [PATCH 06/18] fix: Fix import order --- .../google/cloud/pubsub/v1/StreamingSubscriberConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 0fa671190..885554f74 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -24,8 +24,8 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; -import com.google.api.gax.batching.FlowController; import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.Distribution; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcStatusCode; From 78ab82d5a457be7e41f00bb63643c3678dc68ec7 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Thu, 23 Jul 2020 17:16:27 -0400 Subject: [PATCH 07/18] fix: Make error message more clear about where ordering must be enabled when publishing. --- .../src/main/java/com/google/cloud/pubsub/v1/Publisher.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 282df369f..b781a2e3d 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -233,7 +233,9 @@ public ApiFuture publish(PubsubMessage message) { final String orderingKey = message.getOrderingKey(); Preconditions.checkState( orderingKey.isEmpty() || enableMessageOrdering, - "Cannot publish a message with an ordering key when message ordering is not enabled."); + "Cannot publish a message with an ordering key when message ordering is not enabled in the " + + "Publisher client. Please create a Publisher client with " + + "setEnableMessageOrdering(true) in the builder."); final OutstandingPublish outstandingPublish = new OutstandingPublish(messageTransform.apply(message)); From 8b734677bd48d1db50b849c3f649492467255940 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Sun, 27 Sep 2020 12:38:35 -0400 Subject: [PATCH 08/18] fix: Ensure that messages that are in pending batches for an ordering key are canceled when a previous publish for the ordering keys fails. --- .../com/google/cloud/pubsub/v1/Publisher.java | 20 ++++++++ .../pubsub/v1/SequentialExecutorService.java | 4 ++ .../cloud/pubsub/v1/PublisherImplTest.java | 51 +++++++++++++++++++ 3 files changed, 75 insertions(+) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 6a9f68659..5779b1fe7 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -256,6 +256,11 @@ public ApiFuture publish(PubsubMessage message) { List batchesToSend; messagesBatchLock.lock(); try { + if (sequentialExecutor.keyHasError(orderingKey)) { + outstandingPublish.publishResult.setException( + SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION); + return outstandingPublish.publishResult; + } MessagesBatch messagesBatch = messagesBatches.get(orderingKey); if (messagesBatch == null) { messagesBatch = new MessagesBatch(batchingSettings, orderingKey); @@ -462,6 +467,21 @@ public void onSuccess(PublishResponse result) { @Override public void onFailure(Throwable t) { try { + if (outstandingBatch.orderingKey != null && !outstandingBatch.orderingKey.isEmpty()) { + messagesBatchLock.lock(); + try { + MessagesBatch messagesBatch = messagesBatches.get(outstandingBatch.orderingKey); + if (messagesBatch != null) { + for (OutstandingPublish outstanding : messagesBatch.messages) { + outstanding.publishResult.setException( + SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION); + } + messagesBatches.remove(outstandingBatch.orderingKey); + } + } finally { + messagesBatchLock.unlock(); + } + } outstandingBatch.onFailure(t); } finally { messagesWaiter.incrementPendingCount(-outstandingBatch.size()); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index 292921850..4866e6be4 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -243,6 +243,10 @@ public void cancel(Throwable e) { return future; } + boolean keyHasError(String key) { + return keysWithErrors.contains(key); + } + void resumePublish(String key) { keysWithErrors.remove(key); } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index d7687ae07..e5a785aed 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -566,6 +566,57 @@ public void testResumePublish() throws Exception { shutdownTestPublisher(publisher); } + @Test + public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .setDelayThreshold(Duration.ofSeconds(500)) + .build()) + .setEnableMessageOrdering(true) + .build(); + + // Send two messages that will fulfill the first batch, which will return a failure. + testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "A", "a"); + ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "B", "a"); + + // A third message will fail because the first attempt to publish failed. + ApiFuture publishFuture3 = sendTestMessageWithOrderingKey(publisher, "C", "a"); + + try { + publishFuture1.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + } + + try { + publishFuture2.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + } + + try { + publishFuture3.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + // A subsequent attempt fails immediately. + ApiFuture publishFuture4 = sendTestMessageWithOrderingKey(publisher, "D", "a"); + try { + publishFuture4.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + } + private ApiFuture sendTestMessageWithOrderingKey( Publisher publisher, String data, String orderingKey) { return publisher.publish( From 8be37d79643ba2b420fd0d88426eb421fb33fc48 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 29 Sep 2020 15:24:27 -0400 Subject: [PATCH 09/18] fix: Only check keyHasError if ordering keys is non-empty --- .../src/main/java/com/google/cloud/pubsub/v1/Publisher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 5779b1fe7..07a550496 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -256,7 +256,7 @@ public ApiFuture publish(PubsubMessage message) { List batchesToSend; messagesBatchLock.lock(); try { - if (sequentialExecutor.keyHasError(orderingKey)) { + if (!orderingKey.isEmpty() && sequentialExecutor.keyHasError(orderingKey)) { outstandingPublish.publishResult.setException( SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION); return outstandingPublish.publishResult; From 83342f5e3677e71375cad663600af150ebd77e82 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 31 Aug 2021 07:25:41 -0400 Subject: [PATCH 10/18] fix: Set publish timeouts to be consistent with desired values --- .../java/com/google/cloud/pubsub/v1/Publisher.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index d29a619fd..af7472e96 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -657,8 +657,11 @@ public static final class Builder { static final long DEFAULT_REQUEST_BYTES_THRESHOLD = 1000L; // 1 kB static final Duration DEFAULT_DELAY_THRESHOLD = Duration.ofMillis(1); private static final Duration DEFAULT_INITIAL_RPC_TIMEOUT = Duration.ofSeconds(5); - private static final Duration DEFAULT_MAX_RPC_TIMEOUT = Duration.ofSeconds(600); + private static final Duration DEFAULT_MAX_RPC_TIMEOUT = Duration.ofSeconds(60); private static final Duration DEFAULT_TOTAL_TIMEOUT = Duration.ofSeconds(600); + private static final Duration DEFAULT_INITIAL_RETRY_DELAY = Duration.ofMillis(100); + private static final Duration DEFAULT_MAX_RETRY_DELAY = Duration.ofSeconds(60); + private static final double DEFAULT_MULTIPLIER = 1.3; static final BatchingSettings DEFAULT_BATCHING_SETTINGS = BatchingSettings.newBuilder() .setDelayThreshold(DEFAULT_DELAY_THRESHOLD) @@ -672,11 +675,11 @@ public static final class Builder { static final RetrySettings DEFAULT_RETRY_SETTINGS = RetrySettings.newBuilder() .setTotalTimeout(DEFAULT_TOTAL_TIMEOUT) - .setInitialRetryDelay(Duration.ofMillis(100)) - .setRetryDelayMultiplier(1.3) - .setMaxRetryDelay(Duration.ofSeconds(60)) + .setInitialRetryDelay(DEFAULT_INITIAL_RETRY_DELAY) + .setRetryDelayMultiplier(DEFAULT_MULTIPLIER) + .setMaxRetryDelay(DEFAULT_MAX_RETRY_DELAY) .setInitialRpcTimeout(DEFAULT_INITIAL_RPC_TIMEOUT) - .setRpcTimeoutMultiplier(1) + .setRpcTimeoutMultiplier(DEFAULT_MULTIPLIER) .setMaxRpcTimeout(DEFAULT_MAX_RPC_TIMEOUT) .build(); static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false; From b40094d15adde5b32abee3770bf78c982b2dd9e9 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Fri, 17 Jun 2022 13:25:46 -0400 Subject: [PATCH 11/18] samples: create BigQuery subscription --- samples/install-without-bom/pom.xml | 5 ++ samples/snapshot/pom.xml | 5 ++ samples/snippets/pom.xml | 8 ++- .../CreateBigQuerySubscriptionExample.java | 62 +++++++++++++++++++ .../src/test/java/pubsub/AdminIT.java | 55 ++++++++++++++++ 5 files changed, 133 insertions(+), 2 deletions(-) create mode 100644 samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml index 6a4fe37f7..8a328c049 100644 --- a/samples/install-without-bom/pom.xml +++ b/samples/install-without-bom/pom.xml @@ -83,6 +83,11 @@ 2.7.1 tests + + com.google.cloud + google-cloud-bigquery + 2.13.3 + diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml index ac9337294..edb8eb276 100644 --- a/samples/snapshot/pom.xml +++ b/samples/snapshot/pom.xml @@ -82,6 +82,11 @@ 2.7.1 tests + + com.google.cloud + google-cloud-bigquery + 2.13.3 + diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml index 5f968168a..3d6ad4427 100644 --- a/samples/snippets/pom.xml +++ b/samples/snippets/pom.xml @@ -45,7 +45,7 @@ com.google.cloud libraries-bom - 25.3.0 + 25.4.0 pom import @@ -57,7 +57,11 @@ com.google.cloud google-cloud-pubsub - + + com.google.cloud + google-cloud-bigquery + + org.apache.avro diff --git a/samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java b/samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java new file mode 100644 index 000000000..56dff055c --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java @@ -0,0 +1,62 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_create_bigquery_subscription] +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.pubsub.v1.BigQueryConfig; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.Subscription; +import java.io.IOException; + +public class CreateBigQuerySubscriptionExample { + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + String topicId = "your-topic-id"; + String subscriptionId = "your-subscription-id"; + String bigqueryTableId = "your-bigquery-table-id"; + + createBigQuerySubscription(projectId, topicId, subscriptionId, bigqueryTableId); + } + + public static void createBigQuerySubscription( + String projectId, String topicId, String subscriptionId, String bigqueryTableId) + throws IOException { + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + + ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId); + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + BigQueryConfig bigqueryConfig = + BigQueryConfig.newBuilder().setTable(bigqueryTableId).setWriteMetadata(true).build(); + + Subscription subscription = + subscriptionAdminClient.createSubscription( + Subscription.newBuilder() + .setName(subscriptionName.toString()) + .setTopic(topicName.toString()) + .setBigqueryConfig(bigqueryConfig) + .build()); + + System.out.println("Created a BigQuery subscription: " + subscription.getAllFields()); + } + } +} +// [END pubsub_create_bigquery_subscription] diff --git a/samples/snippets/src/test/java/pubsub/AdminIT.java b/samples/snippets/src/test/java/pubsub/AdminIT.java index b84751e0a..de53ae013 100644 --- a/samples/snippets/src/test/java/pubsub/AdminIT.java +++ b/samples/snippets/src/test/java/pubsub/AdminIT.java @@ -20,6 +20,17 @@ import static junit.framework.TestCase.assertNotNull; import com.google.api.gax.rpc.NotFoundException; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.DatasetId; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; import com.google.cloud.pubsub.v1.SubscriptionAdminClient; import com.google.cloud.pubsub.v1.TopicAdminClient; import com.google.pubsub.v1.SubscriptionName; @@ -48,6 +59,10 @@ public class AdminIT { private static final String exactlyOnceSubscriptionId = "iam-exactly-once-subscription-" + _suffix; private static final String pushEndpoint = "https://my-test-project.appspot.com/push"; + private static final String bigqueryDatasetId = + "java_samples_data_set" + _suffix.replace("-", "_"); + private static final String bigquerySubscriptionId = "iam-bigquery-subscription-" + _suffix; + private static final String bigqueryTableId = "java_samples_table_" + _suffix; private static final TopicName topicName = TopicName.of(projectId, topicId); private static final SubscriptionName pullSubscriptionName = @@ -105,6 +120,32 @@ public void tearDown() throws Exception { System.setOut(null); } + private void createBigQueryTable() throws Exception { + BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + DatasetInfo datasetInfo = DatasetInfo.newBuilder(projectId, bigqueryDatasetId).build(); + bigquery.create(datasetInfo); + + Schema schema = + Schema.of( + Field.of("data", StandardSQLTypeName.STRING), + Field.of("message_id", StandardSQLTypeName.STRING), + Field.of("attributes", StandardSQLTypeName.STRING), + Field.of("subscription_name", StandardSQLTypeName.STRING), + Field.of("publish_time", StandardSQLTypeName.TIMESTAMP)); + + TableId tableId = TableId.of(projectId, bigqueryDatasetId, bigqueryTableId); + TableDefinition tableDefinition = StandardTableDefinition.of(schema); + TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build(); + + bigquery.create(tableInfo); + } + + private void deleteBigQueryTable() throws Exception { + BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + DatasetId datasetId = DatasetId.of(projectId, bigqueryDatasetId); + bigquery.delete(datasetId, BigQuery.DatasetDeleteOption.deleteContents()); + } + @Test public void testAdmin() throws Exception { // Test create topic. @@ -208,17 +249,31 @@ public void testAdmin() throws Exception { .contains("Created a subscription with exactly once delivery enabled:"); assertThat(bout.toString()).contains("enable_exactly_once_delivery=true"); + bout.reset(); + // Test create a BigQuery subscription + createBigQueryTable(); + String bigqueryTablePath = String.join(".", projectId, bigqueryDatasetId, bigqueryTableId); + System.err.println("SAMPLE: " + bigqueryTablePath); + CreateBigQuerySubscriptionExample.createBigQuerySubscription( + projectId, topicId, bigquerySubscriptionId, bigqueryTablePath); + assertThat(bout.toString()).contains("Created a BigQuery subscription:"); + assertThat(bout.toString()).contains(bigqueryTablePath); + bout.reset(); // Test delete subscription. DeleteSubscriptionExample.deleteSubscriptionExample(projectId, pullSubscriptionId); DeleteSubscriptionExample.deleteSubscriptionExample(projectId, pushSubscriptionId); DeleteSubscriptionExample.deleteSubscriptionExample(projectId, orderedSubscriptionId); DeleteSubscriptionExample.deleteSubscriptionExample(projectId, exactlyOnceSubscriptionId); + DeleteSubscriptionExample.deleteSubscriptionExample(projectId, bigquerySubscriptionId); assertThat(bout.toString()).contains("Deleted subscription."); bout.reset(); // Test delete topic. DeleteTopicExample.deleteTopicExample(projectId, topicId); assertThat(bout.toString()).contains("Deleted topic."); + + // Delete BigQuery table. + deleteBigQueryTable(); } } From 3ee105d4b88d114efd8a18c5b4f8b0326c747d7a Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Fri, 17 Jun 2022 13:26:53 -0400 Subject: [PATCH 12/18] samples: create BigQuery subscription --- .../src/main/java/pubsub/CreateBigQuerySubscriptionExample.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java b/samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java index 56dff055c..55618615a 100644 --- a/samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java +++ b/samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Google LLC + * Copyright 2022 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 37e842f10975e5108782781a3028fabd5a0d960c Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Fri, 17 Jun 2022 13:27:41 -0400 Subject: [PATCH 13/18] samples: create BigQuery subscription --- samples/snippets/src/test/java/pubsub/AdminIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/snippets/src/test/java/pubsub/AdminIT.java b/samples/snippets/src/test/java/pubsub/AdminIT.java index de53ae013..756c11ddd 100644 --- a/samples/snippets/src/test/java/pubsub/AdminIT.java +++ b/samples/snippets/src/test/java/pubsub/AdminIT.java @@ -253,7 +253,6 @@ public void testAdmin() throws Exception { // Test create a BigQuery subscription createBigQueryTable(); String bigqueryTablePath = String.join(".", projectId, bigqueryDatasetId, bigqueryTableId); - System.err.println("SAMPLE: " + bigqueryTablePath); CreateBigQuerySubscriptionExample.createBigQuerySubscription( projectId, topicId, bigquerySubscriptionId, bigqueryTablePath); assertThat(bout.toString()).contains("Created a BigQuery subscription:"); From 0d513a196f4e4d36f67e363df9ad1cf5bb5e1e8a Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Fri, 17 Jun 2022 13:37:04 -0400 Subject: [PATCH 14/18] Make table ID more clear --- .../src/main/java/pubsub/CreateBigQuerySubscriptionExample.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java b/samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java index 55618615a..a84fec123 100644 --- a/samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java +++ b/samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java @@ -30,7 +30,7 @@ public static void main(String... args) throws Exception { String projectId = "your-project-id"; String topicId = "your-topic-id"; String subscriptionId = "your-subscription-id"; - String bigqueryTableId = "your-bigquery-table-id"; + String bigqueryTableId = "your-project.your-dataset.your-table; createBigQuerySubscription(projectId, topicId, subscriptionId, bigqueryTableId); } From df8a01da0ac2f5120a745cc5013a9603cf0ccdb1 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Fri, 17 Jun 2022 13:42:00 -0400 Subject: [PATCH 15/18] Fix string --- .../src/main/java/pubsub/CreateBigQuerySubscriptionExample.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java b/samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java index a84fec123..002b6ef1f 100644 --- a/samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java +++ b/samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java @@ -30,7 +30,7 @@ public static void main(String... args) throws Exception { String projectId = "your-project-id"; String topicId = "your-topic-id"; String subscriptionId = "your-subscription-id"; - String bigqueryTableId = "your-project.your-dataset.your-table; + String bigqueryTableId = "your-project.your-dataset.your-table"; createBigQuerySubscription(projectId, topicId, subscriptionId, bigqueryTableId); } From 5b72e583b0faa2cb0a2d23eb110057288d663449 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 21 Jun 2022 21:51:46 +0000 Subject: [PATCH 16/18] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 909bd6e80..34cf23820 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ If you are using Maven with [BOM][libraries-bom], add this to your pom.xml file com.google.cloud libraries-bom - 25.3.0 + 25.4.0 pom import @@ -31,6 +31,10 @@ If you are using Maven with [BOM][libraries-bom], add this to your pom.xml file com.google.cloud google-cloud-pubsub + + com.google.cloud + google-cloud-bigquery + @@ -243,6 +247,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-pubsub/tree/m | Native Image Pub Sub Sample | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/native-image-sample/src/main/java/pubsub/NativeImagePubSubSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/native-image-sample/src/main/java/pubsub/NativeImagePubSubSample.java) | | Publish Operations | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/native-image-sample/src/main/java/utilities/PublishOperations.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/native-image-sample/src/main/java/utilities/PublishOperations.java) | | Create Avro Schema Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreateAvroSchemaExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreateAvroSchemaExample.java) | +| Create Big Query Subscription Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java) | | Create Proto Schema Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreateProtoSchemaExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreateProtoSchemaExample.java) | | Create Pull Subscription Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreatePullSubscriptionExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreatePullSubscriptionExample.java) | | Create Push Subscription Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreatePushSubscriptionExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreatePushSubscriptionExample.java) | From ff338fffdc1c99077bdef186e7dcc048e73f1d8e Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 21 Jun 2022 19:44:15 -0400 Subject: [PATCH 17/18] Fix pom and test creation/teardown --- samples/snippets/pom.xml | 4 ++-- samples/snippets/src/test/java/pubsub/AdminIT.java | 11 +++++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml index 3d6ad4427..be4bb4f5c 100644 --- a/samples/snippets/pom.xml +++ b/samples/snippets/pom.xml @@ -57,12 +57,12 @@ com.google.cloud google-cloud-pubsub + + com.google.cloud google-cloud-bigquery - - org.apache.avro avro diff --git a/samples/snippets/src/test/java/pubsub/AdminIT.java b/samples/snippets/src/test/java/pubsub/AdminIT.java index 756c11ddd..e42c81290 100644 --- a/samples/snippets/src/test/java/pubsub/AdminIT.java +++ b/samples/snippets/src/test/java/pubsub/AdminIT.java @@ -94,6 +94,9 @@ public void setUp() throws Exception { bout = new ByteArrayOutputStream(); out = new PrintStream(bout); System.setOut(out); + + // Create table for BigQuery subscription. + createBigQueryTable(); } @After @@ -117,6 +120,10 @@ public void tearDown() throws Exception { } catch (NotFoundException ignored) { // ignore this as resources may not have been created } + + // Delete BigQuery table. + deleteBigQueryTable(); + System.setOut(null); } @@ -251,7 +258,6 @@ public void testAdmin() throws Exception { bout.reset(); // Test create a BigQuery subscription - createBigQueryTable(); String bigqueryTablePath = String.join(".", projectId, bigqueryDatasetId, bigqueryTableId); CreateBigQuerySubscriptionExample.createBigQuerySubscription( projectId, topicId, bigquerySubscriptionId, bigqueryTablePath); @@ -271,8 +277,5 @@ public void testAdmin() throws Exception { // Test delete topic. DeleteTopicExample.deleteTopicExample(projectId, topicId); assertThat(bout.toString()).contains("Deleted topic."); - - // Delete BigQuery table. - deleteBigQueryTable(); } } From 0e509abdce01638b44a8b3011bcb24f40b39b8f6 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Wed, 22 Jun 2022 00:26:39 +0000 Subject: [PATCH 18/18] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 4 ---- 1 file changed, 4 deletions(-) diff --git a/README.md b/README.md index 34cf23820..8cddd9ef4 100644 --- a/README.md +++ b/README.md @@ -31,10 +31,6 @@ If you are using Maven with [BOM][libraries-bom], add this to your pom.xml file com.google.cloud google-cloud-pubsub - - com.google.cloud - google-cloud-bigquery -