From c2e81425dee9e1fe9e37a5809634dfa1b8888c91 Mon Sep 17 00:00:00 2001 From: dpcollins-google Date: Thu, 11 Jun 2020 15:41:23 -0400 Subject: [PATCH 1/4] [feat] Add the ability to use automatic subscriber assignment to the subscriber settings. Also disable clirr as it fires on this PR. We are not providing interface stability guarantees. --- google-cloud-pubsublite/pom.xml | 15 +++ .../cloudpubsub/SubscriberSettings.java | 56 ++++++++--- .../internal/wire/AssignerBuilder.java | 98 +++++++++++++++++++ samples/snippets/pom.xml | 2 +- .../java/pubsublite/CreateTopicExample.java | 6 +- .../java/pubsublite/SubscriberExample.java | 23 ++--- .../test/java/pubsublite/QuickStartIT.java | 13 +-- 7 files changed, 174 insertions(+), 39 deletions(-) create mode 100755 google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilder.java diff --git a/google-cloud-pubsublite/pom.xml b/google-cloud-pubsublite/pom.xml index 79f839a8a..1d0985d8b 100644 --- a/google-cloud-pubsublite/pom.xml +++ b/google-cloud-pubsublite/pom.xml @@ -196,6 +196,21 @@ org.codehaus.mojo flatten-maven-plugin + + + org.codehaus.mojo + clirr-maven-plugin + + false + + + + + check + + + + diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java index a446059a7..f5a4df8f0 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java @@ -24,16 +24,20 @@ import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.SubscriptionPaths; import com.google.cloud.pubsublite.cloudpubsub.internal.AckSetTrackerImpl; +import com.google.cloud.pubsublite.cloudpubsub.internal.AssigningSubscriber; import com.google.cloud.pubsublite.cloudpubsub.internal.MultiPartitionSubscriber; +import com.google.cloud.pubsublite.cloudpubsub.internal.PartitionSubscriberFactory; import com.google.cloud.pubsublite.cloudpubsub.internal.SinglePartitionSubscriber; import com.google.cloud.pubsublite.internal.Preconditions; +import com.google.cloud.pubsublite.internal.wire.AssignerBuilder; +import com.google.cloud.pubsublite.internal.wire.AssignerFactory; import com.google.cloud.pubsublite.internal.wire.CommitterBuilder; import com.google.cloud.pubsublite.internal.wire.PubsubContext; import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework; import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder; import com.google.cloud.pubsublite.proto.CursorServiceGrpc; +import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc.PartitionAssignmentServiceStub; import com.google.cloud.pubsublite.proto.SubscriberServiceGrpc; -import com.google.common.collect.ImmutableList; import com.google.pubsub.v1.PubsubMessage; import io.grpc.StatusException; import java.util.ArrayList; @@ -51,17 +55,21 @@ public abstract class SubscriberSettings { abstract SubscriptionPath subscriptionPath(); - abstract ImmutableList partitions(); - abstract FlowControlSettings perPartitionFlowControlSettings(); // Optional parameters. + + // If set, disables auto-assignment. + abstract Optional> partitions(); + abstract Optional> transformer(); abstract Optional subscriberServiceStub(); abstract Optional cursorServiceStub(); + abstract Optional assignmentServiceStub(); + abstract Optional nackHandler(); public static Builder newBuilder() { @@ -76,11 +84,12 @@ public abstract static class Builder { public abstract Builder setSubscriptionPath(SubscriptionPath path); - public abstract Builder setPartitions(List partition); - public abstract Builder setPerPartitionFlowControlSettings(FlowControlSettings settings); // Optional parameters. + /** If set, disables auto-assignment. */ + public abstract Builder setPartitions(List partition); + public abstract Builder setTransformer( MessageTransformer transformer); @@ -89,6 +98,8 @@ public abstract Builder setSubscriberServiceStub( public abstract Builder setCursorServiceStub(CursorServiceGrpc.CursorServiceStub stub); + public abstract Builder setAssignmentServiceStub(PartitionAssignmentServiceStub stub); + public abstract Builder setNackHandler(NackHandler nackHandler); abstract SubscriberSettings autoBuild(); @@ -96,7 +107,8 @@ public abstract Builder setSubscriberServiceStub( public SubscriberSettings build() throws StatusException { SubscriberSettings settings = autoBuild(); Preconditions.checkArgument( - !settings.partitions().isEmpty(), "Must provide at least one partition."); + !settings.partitions().isPresent() || !settings.partitions().get().isEmpty(), + "Must provide at least one partition if setting partitions explicitly."); SubscriptionPaths.check(settings.subscriptionPath()); return settings; } @@ -113,18 +125,36 @@ Subscriber instantiate() throws StatusException { wireCommitterBuilder.setSubscriptionPath(subscriptionPath()); cursorServiceStub().ifPresent(wireCommitterBuilder::setCursorStub); - List perPartitionSubscribers = new ArrayList<>(); - for (Partition partition : partitions()) { - wireSubscriberBuilder.setPartition(partition); - wireCommitterBuilder.setPartition(partition); - perPartitionSubscribers.add( - new SinglePartitionSubscriber( + PartitionSubscriberFactory partitionSubscriberFactory = + partition -> { + wireSubscriberBuilder.setPartition(partition); + wireCommitterBuilder.setPartition(partition); + return new SinglePartitionSubscriber( receiver(), transformer().orElse(MessageTransforms.toCpsSubscribeTransformer()), new AckSetTrackerImpl(wireCommitterBuilder.build()), nackHandler().orElse(new NackHandler() {}), messageConsumer -> wireSubscriberBuilder.setMessageConsumer(messageConsumer).build(), - perPartitionFlowControlSettings())); + perPartitionFlowControlSettings()); + }; + + if (!partitions().isPresent()) { + AssignerBuilder.Builder assignerBuilder = AssignerBuilder.newBuilder(); + assignerBuilder.setSubscriptionPath(subscriptionPath()); + assignmentServiceStub().ifPresent(assignerBuilder::setAssignmentStub); + AssignerFactory assignerFactory = + receiver -> { + assignerBuilder.setReceiver(receiver); + return assignerBuilder.build(); + }; + return new AssigningSubscriber(partitionSubscriberFactory, assignerFactory); + } + + List perPartitionSubscribers = new ArrayList<>(); + for (Partition partition : partitions().get()) { + wireSubscriberBuilder.setPartition(partition); + wireCommitterBuilder.setPartition(partition); + perPartitionSubscribers.add(partitionSubscriberFactory.New(partition)); } return MultiPartitionSubscriber.of(perPartitionSubscribers); } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilder.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilder.java new file mode 100755 index 000000000..3da57453a --- /dev/null +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilder.java @@ -0,0 +1,98 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.internal.wire; + +import com.google.auto.value.AutoValue; +import com.google.cloud.pubsublite.Endpoints; +import com.google.cloud.pubsublite.Stubs; +import com.google.cloud.pubsublite.SubscriptionPath; +import com.google.cloud.pubsublite.SubscriptionPaths; +import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest; +import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc; +import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc.PartitionAssignmentServiceStub; +import com.google.protobuf.ByteString; +import io.grpc.Status; +import io.grpc.StatusException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.UUID; + +@AutoValue +public abstract class AssignerBuilder { + // Required parameters. + abstract SubscriptionPath subscriptionPath(); + + abstract PartitionAssignmentReceiver receiver(); + + // Optional parameters. + abstract Optional assignmentStub(); + + public static Builder newBuilder() { + return new AutoValue_AssignerBuilder.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + // Required parameters. + public abstract Builder setSubscriptionPath(SubscriptionPath path); + + public abstract Builder setReceiver(PartitionAssignmentReceiver receiver); + + // Optional parameters. + public abstract Builder setAssignmentStub(PartitionAssignmentServiceStub stub); + + abstract AssignerBuilder autoBuild(); + + @SuppressWarnings("CheckReturnValue") + public Assigner build() throws StatusException { + AssignerBuilder builder = autoBuild(); + SubscriptionPaths.check(builder.subscriptionPath()); + + PartitionAssignmentServiceStub stub; + if (builder.assignmentStub().isPresent()) { + stub = builder.assignmentStub().get(); + } else { + try { + stub = + Stubs.defaultStub( + Endpoints.regionalEndpoint( + SubscriptionPaths.getZone(builder.subscriptionPath()).region()), + PartitionAssignmentServiceGrpc::newStub); + } catch (IOException e) { + throw Status.INTERNAL + .withCause(e) + .withDescription("Creating assigner stub failed.") + .asException(); + } + } + + UUID uuid = UUID.randomUUID(); + ByteBuffer uuidBuffer = ByteBuffer.allocate(16); + uuidBuffer.putLong(uuid.getMostSignificantBits()); + uuidBuffer.putLong(uuid.getLeastSignificantBits()); + + InitialPartitionAssignmentRequest initial = + InitialPartitionAssignmentRequest.newBuilder() + .setSubscription(builder.subscriptionPath().value()) + .setClientId(ByteString.copyFrom(uuidBuffer.array())) + .build(); + return new AssignerImpl( + stub, new ConnectedAssignerImpl.Factory(), initial, builder.receiver()); + } + } +} diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml index 4a157f386..be98347c2 100644 --- a/samples/snippets/pom.xml +++ b/samples/snippets/pom.xml @@ -46,7 +46,7 @@ com.google.cloud google-cloud-pubsublite - 0.1.7 + 0.1.9-SNAPSHOT com.google.cloud diff --git a/samples/snippets/src/main/java/pubsublite/CreateTopicExample.java b/samples/snippets/src/main/java/pubsublite/CreateTopicExample.java index dd1c44684..5ee7bb754 100644 --- a/samples/snippets/src/main/java/pubsublite/CreateTopicExample.java +++ b/samples/snippets/src/main/java/pubsublite/CreateTopicExample.java @@ -38,7 +38,7 @@ public static void main(String... args) throws Exception { char zoneId = 'b'; String topicId = "your-topic-id"; long projectNumber = Long.parseLong("123456789"); - Integer partitions = 1; + int partitions = 1; createTopicExample(cloudRegion, zoneId, projectNumber, topicId, partitions); } @@ -67,11 +67,11 @@ public static void createTopicExample( RetentionConfig.newBuilder() // How long messages are retained. .setPeriod(Durations.fromDays(1)) - // Set storage per partition to 100 GiB. This must be 30 GiB-10 TiB. + // Set storage per partition to 30 GiB. This must be 30 GiB-10 TiB. // If the number of bytes stored in any of the topic's partitions grows // beyond this value, older messages will be dropped to make room for // newer ones, regardless of the value of `period`. - .setPerPartitionBytes(100 * 1024 * 1024 * 1024L)) + .setPerPartitionBytes(30 * 1024 * 1024 * 1024L)) .setName(topicPath.value()) .build(); diff --git a/samples/snippets/src/main/java/pubsublite/SubscriberExample.java b/samples/snippets/src/main/java/pubsublite/SubscriberExample.java index a826b8c35..072c2b2e9 100644 --- a/samples/snippets/src/main/java/pubsublite/SubscriberExample.java +++ b/samples/snippets/src/main/java/pubsublite/SubscriberExample.java @@ -46,19 +46,15 @@ public static void main(String... args) throws Exception { // Choose an existing subscription for the subscribe example to work. String subscriptionId = "your-subscription-id"; long projectNumber = Long.parseLong("123456789"); - // List of partitions to subscribe to. It can be all the partitions in a topic or - // a subset of them. A topic of N partitions has partition numbers [0~N-1]. - List partitionNumbers = ImmutableList.of(0); - subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId, partitionNumbers); + subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId); } public static void subscriberExample( String cloudRegion, char zoneId, long projectNumber, - String subscriptionId, - List partitionNumbers) + String subscriptionId) throws StatusException { SubscriptionPath subscriptionPath = @@ -78,11 +74,6 @@ public static void subscriberExample( .setMessagesOutstanding(1000L) .build(); - List partitions = new ArrayList<>(); - for (Integer num : partitionNumbers) { - partitions.add(Partition.of(num)); - } - MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> { System.out.println("Id : " + message.getMessageId()); @@ -93,7 +84,6 @@ public static void subscriberExample( SubscriberSettings subscriberSettings = SubscriberSettings.newBuilder() .setSubscriptionPath(subscriptionPath) - .setPartitions(partitions) .setReceiver(receiver) // Flow control settings are set at the partition level. .setPerPartitionFlowControlSettings(flowControlSettings) @@ -107,10 +97,11 @@ public static void subscriberExample( System.out.println("Listening to messages on " + subscriptionPath.value() + "..."); try { - // Wait 30 seconds for the subscriber to reach TERMINATED state. If it encounters - // unrecoverable errors before then, its state will change to FAILED and an - // IllegalStateException will be thrown. - subscriber.awaitTerminated(30, TimeUnit.SECONDS); + System.out.println(subscriber.state()); + // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters + // unrecoverable errors before then, its state will change to FAILED and + // an IllegalStateException will be thrown. + subscriber.awaitTerminated(90, TimeUnit.SECONDS); } catch (TimeoutException t) { // Shut down the subscriber. This will change the state of the subscriber to TERMINATED. subscriber.stopAsync().awaitTerminated(); diff --git a/samples/snippets/src/test/java/pubsublite/QuickStartIT.java b/samples/snippets/src/test/java/pubsublite/QuickStartIT.java index 32829aec7..ec895dc55 100644 --- a/samples/snippets/src/test/java/pubsublite/QuickStartIT.java +++ b/samples/snippets/src/test/java/pubsublite/QuickStartIT.java @@ -44,9 +44,8 @@ public class QuickStartIT { private static final String SUFFIX = UUID.randomUUID().toString(); private static final String TOPIC_NAME = "lite-topic-" + SUFFIX; private static final String SUBSCRIPTION_NAME = "lite-subscription-" + SUFFIX; - private static final int PARTITIONS = 1; - private static final int MESSAGE_COUNT = 1; - private static final List PARTITION_NOS = ImmutableList.of(0); + private static final int PARTITIONS = 2; + private static final int MESSAGE_COUNT = 10; private static void requireEnvVar(String varName) { assertNotNull( @@ -84,7 +83,7 @@ public void testQuickstart() throws Exception { // Get a topic. GetTopicExample.getTopicExample(CLOUD_REGION, ZONE_ID, PROJECT_NUMBER, TOPIC_NAME); assertThat(bout.toString()).contains(TOPIC_NAME); - assertThat(bout.toString()).contains("1 partition(s)."); + assertThat(bout.toString()).contains(String.format("%s partition(s).", PARTITIONS)); bout.reset(); // List topics. @@ -157,9 +156,11 @@ public void testQuickstart() throws Exception { bout.reset(); // Subscribe. SubscriberExample.subscriberExample( - CLOUD_REGION, ZONE_ID, PROJECT_NUMBER, SUBSCRIPTION_NAME, PARTITION_NOS); + CLOUD_REGION, ZONE_ID, PROJECT_NUMBER, SUBSCRIPTION_NAME); assertThat(bout.toString()).contains("Listening"); - assertThat(bout.toString()).contains("Data : message-0"); + for (int i = 0; i < MESSAGE_COUNT; ++i) { + assertThat(bout.toString()).contains(String.format("Data : message-%s", i)); + } assertThat(bout.toString()).contains("Subscriber is shut down: TERMINATED"); bout.reset(); From 0c43e230cba1306b3f3e30df80f0ebb36c48c19c Mon Sep 17 00:00:00 2001 From: dpcollins-google Date: Wed, 15 Jul 2020 11:05:18 -0400 Subject: [PATCH 2/4] No-op commit to update github. --- .readme-partials.yaml | 15 ++++----------- samples/snippets/pom.xml | 2 +- .../src/test/java/pubsublite/QuickStartIT.java | 2 -- 3 files changed, 5 insertions(+), 14 deletions(-) diff --git a/.readme-partials.yaml b/.readme-partials.yaml index aec52a7b2..97e45aaec 100644 --- a/.readme-partials.yaml +++ b/.readme-partials.yaml @@ -40,11 +40,11 @@ custom_content: | RetentionConfig.newBuilder() // How long messages are retained. .setPeriod(Durations.fromDays(1)) - // Set storage per partition to 100 GiB. This must be 30 GiB-10 TiB. + // Set storage per partition to 30 GiB. This must be 30 GiB-10 TiB. // If the number of bytes stored in any of the topic's partitions grows // beyond this value, older messages will be dropped to make room for // newer ones, regardless of the value of `period`. - .setPerPartitionBytes(100 * 1024 * 1024 * 1024L)) + .setPerPartitionBytes(30 * 1024 * 1024 * 1024L)) .setName(topicPath.value()) .build(); @@ -210,7 +210,6 @@ custom_content: | String topicId = "your-topic-id"; // Choose an existing subscription. String subscriptionId = "your-subscription-id"; - List partitionNumbers = ImmutableList.of(0); SubscriptionPath subscriptionPath = SubscriptionPaths.newBuilder() @@ -229,11 +228,6 @@ custom_content: | .setMessagesOutstanding(1000L) .build(); - List partitions = new ArrayList<>(); - for (Integer num : partitionNumbers) { - partitions.add(Partition.of(num)); - } - MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> { System.out.println("Id : " + message.getMessageId()); @@ -244,7 +238,6 @@ custom_content: | SubscriberSettings subscriberSettings = SubscriberSettings.newBuilder() .setSubscriptionPath(subscriptionPath) - .setPartitions(partitions) .setReceiver(receiver) // Flow control settings are set at the partition level. .setPerPartitionFlowControlSettings(flowControlSettings) @@ -258,10 +251,10 @@ custom_content: | System.out.println("Listening to messages on " + subscriptionPath.value() + "..."); try { - // Wait 30 seconds for the subscriber to reach TERMINATED state. If it encounters + // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters // unrecoverable errors before then, its state will change to FAILED and an // IllegalStateException will be thrown. - subscriber.awaitTerminated(30, TimeUnit.SECONDS); + subscriber.awaitTerminated(90, TimeUnit.SECONDS); } catch (TimeoutException t) { // Shut down the subscriber. This will change the state of the subscriber to TERMINATED. subscriber.stopAsync().awaitTerminated(); diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml index be98347c2..4a157f386 100644 --- a/samples/snippets/pom.xml +++ b/samples/snippets/pom.xml @@ -46,7 +46,7 @@ com.google.cloud google-cloud-pubsublite - 0.1.9-SNAPSHOT + 0.1.7 com.google.cloud diff --git a/samples/snippets/src/test/java/pubsublite/QuickStartIT.java b/samples/snippets/src/test/java/pubsublite/QuickStartIT.java index ec895dc55..768ccd83f 100644 --- a/samples/snippets/src/test/java/pubsublite/QuickStartIT.java +++ b/samples/snippets/src/test/java/pubsublite/QuickStartIT.java @@ -19,10 +19,8 @@ import static com.google.common.truth.Truth.assertThat; import static junit.framework.TestCase.assertNotNull; -import com.google.common.collect.ImmutableList; import java.io.ByteArrayOutputStream; import java.io.PrintStream; -import java.util.List; import java.util.UUID; import org.junit.After; import org.junit.Before; From efed3dd715bdb3284903204f57ef3329ed60dea7 Mon Sep 17 00:00:00 2001 From: dpcollins-google Date: Wed, 15 Jul 2020 17:56:42 -0400 Subject: [PATCH 3/4] Undo sample changes. --- .readme-partials.yaml | 15 ++++++++---- samples/snippets/pom.xml | 2 +- .../java/pubsublite/CreateTopicExample.java | 6 ++--- .../java/pubsublite/SubscriberExample.java | 23 +++++++++++++------ .../test/java/pubsublite/QuickStartIT.java | 15 ++++++------ 5 files changed, 39 insertions(+), 22 deletions(-) diff --git a/.readme-partials.yaml b/.readme-partials.yaml index 97e45aaec..aec52a7b2 100644 --- a/.readme-partials.yaml +++ b/.readme-partials.yaml @@ -40,11 +40,11 @@ custom_content: | RetentionConfig.newBuilder() // How long messages are retained. .setPeriod(Durations.fromDays(1)) - // Set storage per partition to 30 GiB. This must be 30 GiB-10 TiB. + // Set storage per partition to 100 GiB. This must be 30 GiB-10 TiB. // If the number of bytes stored in any of the topic's partitions grows // beyond this value, older messages will be dropped to make room for // newer ones, regardless of the value of `period`. - .setPerPartitionBytes(30 * 1024 * 1024 * 1024L)) + .setPerPartitionBytes(100 * 1024 * 1024 * 1024L)) .setName(topicPath.value()) .build(); @@ -210,6 +210,7 @@ custom_content: | String topicId = "your-topic-id"; // Choose an existing subscription. String subscriptionId = "your-subscription-id"; + List partitionNumbers = ImmutableList.of(0); SubscriptionPath subscriptionPath = SubscriptionPaths.newBuilder() @@ -228,6 +229,11 @@ custom_content: | .setMessagesOutstanding(1000L) .build(); + List partitions = new ArrayList<>(); + for (Integer num : partitionNumbers) { + partitions.add(Partition.of(num)); + } + MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> { System.out.println("Id : " + message.getMessageId()); @@ -238,6 +244,7 @@ custom_content: | SubscriberSettings subscriberSettings = SubscriberSettings.newBuilder() .setSubscriptionPath(subscriptionPath) + .setPartitions(partitions) .setReceiver(receiver) // Flow control settings are set at the partition level. .setPerPartitionFlowControlSettings(flowControlSettings) @@ -251,10 +258,10 @@ custom_content: | System.out.println("Listening to messages on " + subscriptionPath.value() + "..."); try { - // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters + // Wait 30 seconds for the subscriber to reach TERMINATED state. If it encounters // unrecoverable errors before then, its state will change to FAILED and an // IllegalStateException will be thrown. - subscriber.awaitTerminated(90, TimeUnit.SECONDS); + subscriber.awaitTerminated(30, TimeUnit.SECONDS); } catch (TimeoutException t) { // Shut down the subscriber. This will change the state of the subscriber to TERMINATED. subscriber.stopAsync().awaitTerminated(); diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml index 4a157f386..be98347c2 100644 --- a/samples/snippets/pom.xml +++ b/samples/snippets/pom.xml @@ -46,7 +46,7 @@ com.google.cloud google-cloud-pubsublite - 0.1.7 + 0.1.9-SNAPSHOT com.google.cloud diff --git a/samples/snippets/src/main/java/pubsublite/CreateTopicExample.java b/samples/snippets/src/main/java/pubsublite/CreateTopicExample.java index 5ee7bb754..dd1c44684 100644 --- a/samples/snippets/src/main/java/pubsublite/CreateTopicExample.java +++ b/samples/snippets/src/main/java/pubsublite/CreateTopicExample.java @@ -38,7 +38,7 @@ public static void main(String... args) throws Exception { char zoneId = 'b'; String topicId = "your-topic-id"; long projectNumber = Long.parseLong("123456789"); - int partitions = 1; + Integer partitions = 1; createTopicExample(cloudRegion, zoneId, projectNumber, topicId, partitions); } @@ -67,11 +67,11 @@ public static void createTopicExample( RetentionConfig.newBuilder() // How long messages are retained. .setPeriod(Durations.fromDays(1)) - // Set storage per partition to 30 GiB. This must be 30 GiB-10 TiB. + // Set storage per partition to 100 GiB. This must be 30 GiB-10 TiB. // If the number of bytes stored in any of the topic's partitions grows // beyond this value, older messages will be dropped to make room for // newer ones, regardless of the value of `period`. - .setPerPartitionBytes(30 * 1024 * 1024 * 1024L)) + .setPerPartitionBytes(100 * 1024 * 1024 * 1024L)) .setName(topicPath.value()) .build(); diff --git a/samples/snippets/src/main/java/pubsublite/SubscriberExample.java b/samples/snippets/src/main/java/pubsublite/SubscriberExample.java index 072c2b2e9..a826b8c35 100644 --- a/samples/snippets/src/main/java/pubsublite/SubscriberExample.java +++ b/samples/snippets/src/main/java/pubsublite/SubscriberExample.java @@ -46,15 +46,19 @@ public static void main(String... args) throws Exception { // Choose an existing subscription for the subscribe example to work. String subscriptionId = "your-subscription-id"; long projectNumber = Long.parseLong("123456789"); + // List of partitions to subscribe to. It can be all the partitions in a topic or + // a subset of them. A topic of N partitions has partition numbers [0~N-1]. + List partitionNumbers = ImmutableList.of(0); - subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId); + subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId, partitionNumbers); } public static void subscriberExample( String cloudRegion, char zoneId, long projectNumber, - String subscriptionId) + String subscriptionId, + List partitionNumbers) throws StatusException { SubscriptionPath subscriptionPath = @@ -74,6 +78,11 @@ public static void subscriberExample( .setMessagesOutstanding(1000L) .build(); + List partitions = new ArrayList<>(); + for (Integer num : partitionNumbers) { + partitions.add(Partition.of(num)); + } + MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> { System.out.println("Id : " + message.getMessageId()); @@ -84,6 +93,7 @@ public static void subscriberExample( SubscriberSettings subscriberSettings = SubscriberSettings.newBuilder() .setSubscriptionPath(subscriptionPath) + .setPartitions(partitions) .setReceiver(receiver) // Flow control settings are set at the partition level. .setPerPartitionFlowControlSettings(flowControlSettings) @@ -97,11 +107,10 @@ public static void subscriberExample( System.out.println("Listening to messages on " + subscriptionPath.value() + "..."); try { - System.out.println(subscriber.state()); - // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters - // unrecoverable errors before then, its state will change to FAILED and - // an IllegalStateException will be thrown. - subscriber.awaitTerminated(90, TimeUnit.SECONDS); + // Wait 30 seconds for the subscriber to reach TERMINATED state. If it encounters + // unrecoverable errors before then, its state will change to FAILED and an + // IllegalStateException will be thrown. + subscriber.awaitTerminated(30, TimeUnit.SECONDS); } catch (TimeoutException t) { // Shut down the subscriber. This will change the state of the subscriber to TERMINATED. subscriber.stopAsync().awaitTerminated(); diff --git a/samples/snippets/src/test/java/pubsublite/QuickStartIT.java b/samples/snippets/src/test/java/pubsublite/QuickStartIT.java index 768ccd83f..32829aec7 100644 --- a/samples/snippets/src/test/java/pubsublite/QuickStartIT.java +++ b/samples/snippets/src/test/java/pubsublite/QuickStartIT.java @@ -19,8 +19,10 @@ import static com.google.common.truth.Truth.assertThat; import static junit.framework.TestCase.assertNotNull; +import com.google.common.collect.ImmutableList; import java.io.ByteArrayOutputStream; import java.io.PrintStream; +import java.util.List; import java.util.UUID; import org.junit.After; import org.junit.Before; @@ -42,8 +44,9 @@ public class QuickStartIT { private static final String SUFFIX = UUID.randomUUID().toString(); private static final String TOPIC_NAME = "lite-topic-" + SUFFIX; private static final String SUBSCRIPTION_NAME = "lite-subscription-" + SUFFIX; - private static final int PARTITIONS = 2; - private static final int MESSAGE_COUNT = 10; + private static final int PARTITIONS = 1; + private static final int MESSAGE_COUNT = 1; + private static final List PARTITION_NOS = ImmutableList.of(0); private static void requireEnvVar(String varName) { assertNotNull( @@ -81,7 +84,7 @@ public void testQuickstart() throws Exception { // Get a topic. GetTopicExample.getTopicExample(CLOUD_REGION, ZONE_ID, PROJECT_NUMBER, TOPIC_NAME); assertThat(bout.toString()).contains(TOPIC_NAME); - assertThat(bout.toString()).contains(String.format("%s partition(s).", PARTITIONS)); + assertThat(bout.toString()).contains("1 partition(s)."); bout.reset(); // List topics. @@ -154,11 +157,9 @@ public void testQuickstart() throws Exception { bout.reset(); // Subscribe. SubscriberExample.subscriberExample( - CLOUD_REGION, ZONE_ID, PROJECT_NUMBER, SUBSCRIPTION_NAME); + CLOUD_REGION, ZONE_ID, PROJECT_NUMBER, SUBSCRIPTION_NAME, PARTITION_NOS); assertThat(bout.toString()).contains("Listening"); - for (int i = 0; i < MESSAGE_COUNT; ++i) { - assertThat(bout.toString()).contains(String.format("Data : message-%s", i)); - } + assertThat(bout.toString()).contains("Data : message-0"); assertThat(bout.toString()).contains("Subscriber is shut down: TERMINATED"); bout.reset(); From 214af544883f4d833e2f369c2f12462437297465 Mon Sep 17 00:00:00 2001 From: dpcollins-google Date: Wed, 15 Jul 2020 18:03:02 -0400 Subject: [PATCH 4/4] Revert lastt samples change. --- .../cloud/pubsublite/internal/wire/AssignerBuilder.java | 5 +++++ samples/snippets/pom.xml | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilder.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilder.java index 3da57453a..4c7da665d 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilder.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilder.java @@ -24,6 +24,7 @@ import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest; import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc; import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc.PartitionAssignmentServiceStub; +import com.google.common.flogger.GoogleLogger; import com.google.protobuf.ByteString; import io.grpc.Status; import io.grpc.StatusException; @@ -34,6 +35,7 @@ @AutoValue public abstract class AssignerBuilder { + private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); // Required parameters. abstract SubscriptionPath subscriptionPath(); @@ -85,6 +87,9 @@ public Assigner build() throws StatusException { ByteBuffer uuidBuffer = ByteBuffer.allocate(16); uuidBuffer.putLong(uuid.getMostSignificantBits()); uuidBuffer.putLong(uuid.getLeastSignificantBits()); + logger.atInfo().log( + "Subscription %s using UUID %s for assignment.", + builder.subscriptionPath().value(), uuid); InitialPartitionAssignmentRequest initial = InitialPartitionAssignmentRequest.newBuilder() diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml index be98347c2..4a157f386 100644 --- a/samples/snippets/pom.xml +++ b/samples/snippets/pom.xml @@ -46,7 +46,7 @@ com.google.cloud google-cloud-pubsublite - 0.1.9-SNAPSHOT + 0.1.7 com.google.cloud