Skip to content

Commit

Permalink
Revert "samples: update samples for automatic subscriber assignment (#…
Browse files Browse the repository at this point in the history
…189)" (#198)

This reverts commit 945f775.
  • Loading branch information
anguillanneuf committed Aug 11, 2020
1 parent 945f775 commit eefe717
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 27 deletions.
16 changes: 11 additions & 5 deletions .readme-partials.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ custom_content: |
RetentionConfig.newBuilder()
// How long messages are retained.
.setPeriod(Durations.fromDays(1))
// Set storage per partition to 30 GiB. This must be 30 GiB-10 TiB.
// Set storage per partition to 100 GiB. This must be 30 GiB-10 TiB.
// If the number of bytes stored in any of the topic's partitions grows
// beyond this value, older messages will be dropped to make room for
// newer ones, regardless of the value of `period`.
.setPerPartitionBytes(30 * 1024 * 1024 * 1024L))
.setPerPartitionBytes(100 * 1024 * 1024 * 1024L))
.setName(topicPath.value())
.build();
Expand Down Expand Up @@ -210,6 +210,7 @@ custom_content: |
String topicId = "your-topic-id";
// Choose an existing subscription.
String subscriptionId = "your-subscription-id";
List<Integer> partitionNumbers = ImmutableList.of(0);
SubscriptionPath subscriptionPath =
SubscriptionPaths.newBuilder()
Expand All @@ -228,6 +229,11 @@ custom_content: |
.setMessagesOutstanding(1000L)
.build();
List<Partition> partitions = new ArrayList<>();
for (Integer num : partitionNumbers) {
partitions.add(Partition.of(num));
}
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
System.out.println("Id : " + message.getMessageId());
Expand All @@ -238,6 +244,7 @@ custom_content: |
SubscriberSettings subscriberSettings =
SubscriberSettings.newBuilder()
.setSubscriptionPath(subscriptionPath)
.setPartitions(partitions)
.setReceiver(receiver)
// Flow control settings are set at the partition level.
.setPerPartitionFlowControlSettings(flowControlSettings)
Expand All @@ -251,11 +258,10 @@ custom_content: |
System.out.println("Listening to messages on " + subscriptionPath.value() + "...");
try {
System.out.println(subscriber.state());
// Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
// Wait 30 seconds for the subscriber to reach TERMINATED state. If it encounters
// unrecoverable errors before then, its state will change to FAILED and an
// IllegalStateException will be thrown.
subscriber.awaitTerminated(90, TimeUnit.SECONDS);
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException t) {
// Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
subscriber.stopAsync().awaitTerminated();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static void main(String... args) throws Exception {
char zoneId = 'b';
String topicId = "your-topic-id";
long projectNumber = Long.parseLong("123456789");
int partitions = 1;
Integer partitions = 1;

createTopicExample(cloudRegion, zoneId, projectNumber, topicId, partitions);
}
Expand Down Expand Up @@ -67,11 +67,11 @@ public static void createTopicExample(
RetentionConfig.newBuilder()
// How long messages are retained.
.setPeriod(Durations.fromDays(1))
// Set storage per partition to 30 GiB. This must be 30 GiB-10 TiB.
// Set storage per partition to 100 GiB. This must be 30 GiB-10 TiB.
// If the number of bytes stored in any of the topic's partitions grows
// beyond this value, older messages will be dropped to make room for
// newer ones, regardless of the value of `period`.
.setPerPartitionBytes(30 * 1024 * 1024 * 1024L))
.setPerPartitionBytes(100 * 1024 * 1024 * 1024L))
.setName(topicPath.value())
.build();

Expand Down
19 changes: 14 additions & 5 deletions samples/snippets/src/main/java/pubsublite/SubscriberExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,19 @@ public static void main(String... args) throws Exception {
// Choose an existing subscription for the subscribe example to work.
String subscriptionId = "your-subscription-id";
long projectNumber = Long.parseLong("123456789");
// List of partitions to subscribe to. It can be all the partitions in a topic or
// a subset of them. A topic of N partitions has partition numbers [0~N-1].
List<Integer> partitionNumbers = ImmutableList.of(0);

subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId);
subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId, partitionNumbers);
}

public static void subscriberExample(
String cloudRegion,
char zoneId,
long projectNumber,
String subscriptionId)
String subscriptionId,
List<Integer> partitionNumbers)
throws StatusException {

SubscriptionPath subscriptionPath =
Expand All @@ -74,6 +78,11 @@ public static void subscriberExample(
.setMessagesOutstanding(1000L)
.build();

List<Partition> partitions = new ArrayList<>();
for (Integer num : partitionNumbers) {
partitions.add(Partition.of(num));
}

MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
System.out.println("Id : " + message.getMessageId());
Expand All @@ -84,6 +93,7 @@ public static void subscriberExample(
SubscriberSettings subscriberSettings =
SubscriberSettings.newBuilder()
.setSubscriptionPath(subscriptionPath)
.setPartitions(partitions)
.setReceiver(receiver)
// Flow control settings are set at the partition level.
.setPerPartitionFlowControlSettings(flowControlSettings)
Expand All @@ -97,11 +107,10 @@ public static void subscriberExample(
System.out.println("Listening to messages on " + subscriptionPath.value() + "...");

try {
System.out.println(subscriber.state());
// Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
// Wait 30 seconds for the subscriber to reach TERMINATED state. If it encounters
// unrecoverable errors before then, its state will change to FAILED and an
// IllegalStateException will be thrown.
subscriber.awaitTerminated(90, TimeUnit.SECONDS);
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException t) {
// Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
subscriber.stopAsync().awaitTerminated();
Expand Down
21 changes: 7 additions & 14 deletions samples/snippets/src/test/java/pubsublite/QuickStartIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import com.google.common.collect.ImmutableList;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.junit.After;
import org.junit.Before;
Expand All @@ -37,21 +35,18 @@ public class QuickStartIT {

private ByteArrayOutputStream bout;
private PrintStream out;
private Random rand = new Random();
private List<String> cloudRegions =
Arrays.asList(
"us-central1", "europe-north1", "asia-east1", "australia-southeast1", "asia-northeast2");

private static final String GOOGLE_CLOUD_PROJECT_NUMBER =
System.getenv("GOOGLE_CLOUD_PROJECT_NUMBER");
private String CLOUD_REGION = cloudRegions.get(rand.nextInt(cloudRegions.size()));
private static final String CLOUD_REGION = "us-central1";
private static final char ZONE_ID = 'b';
private static final Long PROJECT_NUMBER = Long.parseLong(GOOGLE_CLOUD_PROJECT_NUMBER);
private static final String SUFFIX = UUID.randomUUID().toString();
private static final String TOPIC_NAME = "lite-topic-" + SUFFIX;
private static final String SUBSCRIPTION_NAME = "lite-subscription-" + SUFFIX;
private static final int PARTITIONS = 2;
private static final int MESSAGE_COUNT = 10;
private static final int PARTITIONS = 1;
private static final int MESSAGE_COUNT = 1;
private static final List<Integer> PARTITION_NOS = ImmutableList.of(0);

private static void requireEnvVar(String varName) {
assertNotNull(
Expand Down Expand Up @@ -89,7 +84,7 @@ public void testQuickstart() throws Exception {
// Get a topic.
GetTopicExample.getTopicExample(CLOUD_REGION, ZONE_ID, PROJECT_NUMBER, TOPIC_NAME);
assertThat(bout.toString()).contains(TOPIC_NAME);
assertThat(bout.toString()).contains(String.format("%s partition(s).", PARTITIONS));
assertThat(bout.toString()).contains("1 partition(s).");

bout.reset();
// List topics.
Expand Down Expand Up @@ -162,11 +157,9 @@ public void testQuickstart() throws Exception {
bout.reset();
// Subscribe.
SubscriberExample.subscriberExample(
CLOUD_REGION, ZONE_ID, PROJECT_NUMBER, SUBSCRIPTION_NAME);
CLOUD_REGION, ZONE_ID, PROJECT_NUMBER, SUBSCRIPTION_NAME, PARTITION_NOS);
assertThat(bout.toString()).contains("Listening");
for (int i = 0; i < MESSAGE_COUNT; ++i) {
assertThat(bout.toString()).contains(String.format("Data : message-%s", i));
}
assertThat(bout.toString()).contains("Data : message-0");
assertThat(bout.toString()).contains("Subscriber is shut down: TERMINATED");

bout.reset();
Expand Down

0 comments on commit eefe717

Please sign in to comment.