diff --git a/.readme-partials.yaml b/.readme-partials.yaml index 531742fea..aec52a7b2 100644 --- a/.readme-partials.yaml +++ b/.readme-partials.yaml @@ -40,11 +40,11 @@ custom_content: | RetentionConfig.newBuilder() // How long messages are retained. .setPeriod(Durations.fromDays(1)) - // Set storage per partition to 30 GiB. This must be 30 GiB-10 TiB. + // Set storage per partition to 100 GiB. This must be 30 GiB-10 TiB. // If the number of bytes stored in any of the topic's partitions grows // beyond this value, older messages will be dropped to make room for // newer ones, regardless of the value of `period`. - .setPerPartitionBytes(30 * 1024 * 1024 * 1024L)) + .setPerPartitionBytes(100 * 1024 * 1024 * 1024L)) .setName(topicPath.value()) .build(); @@ -210,6 +210,7 @@ custom_content: | String topicId = "your-topic-id"; // Choose an existing subscription. String subscriptionId = "your-subscription-id"; + List partitionNumbers = ImmutableList.of(0); SubscriptionPath subscriptionPath = SubscriptionPaths.newBuilder() @@ -228,6 +229,11 @@ custom_content: | .setMessagesOutstanding(1000L) .build(); + List partitions = new ArrayList<>(); + for (Integer num : partitionNumbers) { + partitions.add(Partition.of(num)); + } + MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> { System.out.println("Id : " + message.getMessageId()); @@ -238,6 +244,7 @@ custom_content: | SubscriberSettings subscriberSettings = SubscriberSettings.newBuilder() .setSubscriptionPath(subscriptionPath) + .setPartitions(partitions) .setReceiver(receiver) // Flow control settings are set at the partition level. .setPerPartitionFlowControlSettings(flowControlSettings) @@ -251,11 +258,10 @@ custom_content: | System.out.println("Listening to messages on " + subscriptionPath.value() + "..."); try { - System.out.println(subscriber.state()); - // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters + // Wait 30 seconds for the subscriber to reach TERMINATED state. If it encounters // unrecoverable errors before then, its state will change to FAILED and an // IllegalStateException will be thrown. - subscriber.awaitTerminated(90, TimeUnit.SECONDS); + subscriber.awaitTerminated(30, TimeUnit.SECONDS); } catch (TimeoutException t) { // Shut down the subscriber. This will change the state of the subscriber to TERMINATED. subscriber.stopAsync().awaitTerminated(); diff --git a/samples/snippets/src/main/java/pubsublite/CreateTopicExample.java b/samples/snippets/src/main/java/pubsublite/CreateTopicExample.java index 5ee7bb754..dd1c44684 100644 --- a/samples/snippets/src/main/java/pubsublite/CreateTopicExample.java +++ b/samples/snippets/src/main/java/pubsublite/CreateTopicExample.java @@ -38,7 +38,7 @@ public static void main(String... args) throws Exception { char zoneId = 'b'; String topicId = "your-topic-id"; long projectNumber = Long.parseLong("123456789"); - int partitions = 1; + Integer partitions = 1; createTopicExample(cloudRegion, zoneId, projectNumber, topicId, partitions); } @@ -67,11 +67,11 @@ public static void createTopicExample( RetentionConfig.newBuilder() // How long messages are retained. .setPeriod(Durations.fromDays(1)) - // Set storage per partition to 30 GiB. This must be 30 GiB-10 TiB. + // Set storage per partition to 100 GiB. This must be 30 GiB-10 TiB. // If the number of bytes stored in any of the topic's partitions grows // beyond this value, older messages will be dropped to make room for // newer ones, regardless of the value of `period`. - .setPerPartitionBytes(30 * 1024 * 1024 * 1024L)) + .setPerPartitionBytes(100 * 1024 * 1024 * 1024L)) .setName(topicPath.value()) .build(); diff --git a/samples/snippets/src/main/java/pubsublite/SubscriberExample.java b/samples/snippets/src/main/java/pubsublite/SubscriberExample.java index 49e313749..a826b8c35 100644 --- a/samples/snippets/src/main/java/pubsublite/SubscriberExample.java +++ b/samples/snippets/src/main/java/pubsublite/SubscriberExample.java @@ -46,15 +46,19 @@ public static void main(String... args) throws Exception { // Choose an existing subscription for the subscribe example to work. String subscriptionId = "your-subscription-id"; long projectNumber = Long.parseLong("123456789"); + // List of partitions to subscribe to. It can be all the partitions in a topic or + // a subset of them. A topic of N partitions has partition numbers [0~N-1]. + List partitionNumbers = ImmutableList.of(0); - subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId); + subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId, partitionNumbers); } public static void subscriberExample( String cloudRegion, char zoneId, long projectNumber, - String subscriptionId) + String subscriptionId, + List partitionNumbers) throws StatusException { SubscriptionPath subscriptionPath = @@ -74,6 +78,11 @@ public static void subscriberExample( .setMessagesOutstanding(1000L) .build(); + List partitions = new ArrayList<>(); + for (Integer num : partitionNumbers) { + partitions.add(Partition.of(num)); + } + MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> { System.out.println("Id : " + message.getMessageId()); @@ -84,6 +93,7 @@ public static void subscriberExample( SubscriberSettings subscriberSettings = SubscriberSettings.newBuilder() .setSubscriptionPath(subscriptionPath) + .setPartitions(partitions) .setReceiver(receiver) // Flow control settings are set at the partition level. .setPerPartitionFlowControlSettings(flowControlSettings) @@ -97,11 +107,10 @@ public static void subscriberExample( System.out.println("Listening to messages on " + subscriptionPath.value() + "..."); try { - System.out.println(subscriber.state()); - // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters + // Wait 30 seconds for the subscriber to reach TERMINATED state. If it encounters // unrecoverable errors before then, its state will change to FAILED and an // IllegalStateException will be thrown. - subscriber.awaitTerminated(90, TimeUnit.SECONDS); + subscriber.awaitTerminated(30, TimeUnit.SECONDS); } catch (TimeoutException t) { // Shut down the subscriber. This will change the state of the subscriber to TERMINATED. subscriber.stopAsync().awaitTerminated(); diff --git a/samples/snippets/src/test/java/pubsublite/QuickStartIT.java b/samples/snippets/src/test/java/pubsublite/QuickStartIT.java index 50d5b36e4..32829aec7 100644 --- a/samples/snippets/src/test/java/pubsublite/QuickStartIT.java +++ b/samples/snippets/src/test/java/pubsublite/QuickStartIT.java @@ -22,9 +22,7 @@ import com.google.common.collect.ImmutableList; import java.io.ByteArrayOutputStream; import java.io.PrintStream; -import java.util.Arrays; import java.util.List; -import java.util.Random; import java.util.UUID; import org.junit.After; import org.junit.Before; @@ -37,21 +35,18 @@ public class QuickStartIT { private ByteArrayOutputStream bout; private PrintStream out; - private Random rand = new Random(); - private List cloudRegions = - Arrays.asList( - "us-central1", "europe-north1", "asia-east1", "australia-southeast1", "asia-northeast2"); private static final String GOOGLE_CLOUD_PROJECT_NUMBER = System.getenv("GOOGLE_CLOUD_PROJECT_NUMBER"); - private String CLOUD_REGION = cloudRegions.get(rand.nextInt(cloudRegions.size())); + private static final String CLOUD_REGION = "us-central1"; private static final char ZONE_ID = 'b'; private static final Long PROJECT_NUMBER = Long.parseLong(GOOGLE_CLOUD_PROJECT_NUMBER); private static final String SUFFIX = UUID.randomUUID().toString(); private static final String TOPIC_NAME = "lite-topic-" + SUFFIX; private static final String SUBSCRIPTION_NAME = "lite-subscription-" + SUFFIX; - private static final int PARTITIONS = 2; - private static final int MESSAGE_COUNT = 10; + private static final int PARTITIONS = 1; + private static final int MESSAGE_COUNT = 1; + private static final List PARTITION_NOS = ImmutableList.of(0); private static void requireEnvVar(String varName) { assertNotNull( @@ -89,7 +84,7 @@ public void testQuickstart() throws Exception { // Get a topic. GetTopicExample.getTopicExample(CLOUD_REGION, ZONE_ID, PROJECT_NUMBER, TOPIC_NAME); assertThat(bout.toString()).contains(TOPIC_NAME); - assertThat(bout.toString()).contains(String.format("%s partition(s).", PARTITIONS)); + assertThat(bout.toString()).contains("1 partition(s)."); bout.reset(); // List topics. @@ -162,11 +157,9 @@ public void testQuickstart() throws Exception { bout.reset(); // Subscribe. SubscriberExample.subscriberExample( - CLOUD_REGION, ZONE_ID, PROJECT_NUMBER, SUBSCRIPTION_NAME); + CLOUD_REGION, ZONE_ID, PROJECT_NUMBER, SUBSCRIPTION_NAME, PARTITION_NOS); assertThat(bout.toString()).contains("Listening"); - for (int i = 0; i < MESSAGE_COUNT; ++i) { - assertThat(bout.toString()).contains(String.format("Data : message-%s", i)); - } + assertThat(bout.toString()).contains("Data : message-0"); assertThat(bout.toString()).contains("Subscriber is shut down: TERMINATED"); bout.reset();