Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions .readme-partials.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ custom_content: |
RetentionConfig.newBuilder()
// How long messages are retained.
.setPeriod(Durations.fromDays(1))
// Set storage per partition to 30 GiB. This must be 30 GiB-10 TiB.
// Set storage per partition to 100 GiB. This must be 30 GiB-10 TiB.
// If the number of bytes stored in any of the topic's partitions grows
// beyond this value, older messages will be dropped to make room for
// newer ones, regardless of the value of `period`.
.setPerPartitionBytes(30 * 1024 * 1024 * 1024L))
.setPerPartitionBytes(100 * 1024 * 1024 * 1024L))
.setName(topicPath.value())
.build();

Expand Down Expand Up @@ -210,6 +210,7 @@ custom_content: |
String topicId = "your-topic-id";
// Choose an existing subscription.
String subscriptionId = "your-subscription-id";
List<Integer> partitionNumbers = ImmutableList.of(0);

SubscriptionPath subscriptionPath =
SubscriptionPaths.newBuilder()
Expand All @@ -228,6 +229,11 @@ custom_content: |
.setMessagesOutstanding(1000L)
.build();

List<Partition> partitions = new ArrayList<>();
for (Integer num : partitionNumbers) {
partitions.add(Partition.of(num));
}

MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
System.out.println("Id : " + message.getMessageId());
Expand All @@ -238,6 +244,7 @@ custom_content: |
SubscriberSettings subscriberSettings =
SubscriberSettings.newBuilder()
.setSubscriptionPath(subscriptionPath)
.setPartitions(partitions)
.setReceiver(receiver)
// Flow control settings are set at the partition level.
.setPerPartitionFlowControlSettings(flowControlSettings)
Expand All @@ -251,11 +258,10 @@ custom_content: |
System.out.println("Listening to messages on " + subscriptionPath.value() + "...");

try {
System.out.println(subscriber.state());
// Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
// Wait 30 seconds for the subscriber to reach TERMINATED state. If it encounters
// unrecoverable errors before then, its state will change to FAILED and an
// IllegalStateException will be thrown.
subscriber.awaitTerminated(90, TimeUnit.SECONDS);
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException t) {
// Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
subscriber.stopAsync().awaitTerminated();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static void main(String... args) throws Exception {
char zoneId = 'b';
String topicId = "your-topic-id";
long projectNumber = Long.parseLong("123456789");
int partitions = 1;
Integer partitions = 1;

createTopicExample(cloudRegion, zoneId, projectNumber, topicId, partitions);
}
Expand Down Expand Up @@ -67,11 +67,11 @@ public static void createTopicExample(
RetentionConfig.newBuilder()
// How long messages are retained.
.setPeriod(Durations.fromDays(1))
// Set storage per partition to 30 GiB. This must be 30 GiB-10 TiB.
// Set storage per partition to 100 GiB. This must be 30 GiB-10 TiB.
// If the number of bytes stored in any of the topic's partitions grows
// beyond this value, older messages will be dropped to make room for
// newer ones, regardless of the value of `period`.
.setPerPartitionBytes(30 * 1024 * 1024 * 1024L))
.setPerPartitionBytes(100 * 1024 * 1024 * 1024L))
.setName(topicPath.value())
.build();

Expand Down
19 changes: 14 additions & 5 deletions samples/snippets/src/main/java/pubsublite/SubscriberExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,19 @@ public static void main(String... args) throws Exception {
// Choose an existing subscription for the subscribe example to work.
String subscriptionId = "your-subscription-id";
long projectNumber = Long.parseLong("123456789");
// List of partitions to subscribe to. It can be all the partitions in a topic or
// a subset of them. A topic of N partitions has partition numbers [0~N-1].
List<Integer> partitionNumbers = ImmutableList.of(0);

subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId);
subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId, partitionNumbers);
}

public static void subscriberExample(
String cloudRegion,
char zoneId,
long projectNumber,
String subscriptionId)
String subscriptionId,
List<Integer> partitionNumbers)
throws StatusException {

SubscriptionPath subscriptionPath =
Expand All @@ -74,6 +78,11 @@ public static void subscriberExample(
.setMessagesOutstanding(1000L)
.build();

List<Partition> partitions = new ArrayList<>();
for (Integer num : partitionNumbers) {
partitions.add(Partition.of(num));
}

MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
System.out.println("Id : " + message.getMessageId());
Expand All @@ -84,6 +93,7 @@ public static void subscriberExample(
SubscriberSettings subscriberSettings =
SubscriberSettings.newBuilder()
.setSubscriptionPath(subscriptionPath)
.setPartitions(partitions)
.setReceiver(receiver)
// Flow control settings are set at the partition level.
.setPerPartitionFlowControlSettings(flowControlSettings)
Expand All @@ -97,11 +107,10 @@ public static void subscriberExample(
System.out.println("Listening to messages on " + subscriptionPath.value() + "...");

try {
System.out.println(subscriber.state());
// Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
// Wait 30 seconds for the subscriber to reach TERMINATED state. If it encounters
// unrecoverable errors before then, its state will change to FAILED and an
// IllegalStateException will be thrown.
subscriber.awaitTerminated(90, TimeUnit.SECONDS);
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException t) {
// Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
subscriber.stopAsync().awaitTerminated();
Expand Down
21 changes: 7 additions & 14 deletions samples/snippets/src/test/java/pubsublite/QuickStartIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import com.google.common.collect.ImmutableList;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.junit.After;
import org.junit.Before;
Expand All @@ -37,21 +35,18 @@ public class QuickStartIT {

private ByteArrayOutputStream bout;
private PrintStream out;
private Random rand = new Random();
private List<String> cloudRegions =
Arrays.asList(
"us-central1", "europe-north1", "asia-east1", "australia-southeast1", "asia-northeast2");

private static final String GOOGLE_CLOUD_PROJECT_NUMBER =
System.getenv("GOOGLE_CLOUD_PROJECT_NUMBER");
private String CLOUD_REGION = cloudRegions.get(rand.nextInt(cloudRegions.size()));
private static final String CLOUD_REGION = "us-central1";
private static final char ZONE_ID = 'b';
private static final Long PROJECT_NUMBER = Long.parseLong(GOOGLE_CLOUD_PROJECT_NUMBER);
private static final String SUFFIX = UUID.randomUUID().toString();
private static final String TOPIC_NAME = "lite-topic-" + SUFFIX;
private static final String SUBSCRIPTION_NAME = "lite-subscription-" + SUFFIX;
private static final int PARTITIONS = 2;
private static final int MESSAGE_COUNT = 10;
private static final int PARTITIONS = 1;
private static final int MESSAGE_COUNT = 1;
private static final List<Integer> PARTITION_NOS = ImmutableList.of(0);

private static void requireEnvVar(String varName) {
assertNotNull(
Expand Down Expand Up @@ -89,7 +84,7 @@ public void testQuickstart() throws Exception {
// Get a topic.
GetTopicExample.getTopicExample(CLOUD_REGION, ZONE_ID, PROJECT_NUMBER, TOPIC_NAME);
assertThat(bout.toString()).contains(TOPIC_NAME);
assertThat(bout.toString()).contains(String.format("%s partition(s).", PARTITIONS));
assertThat(bout.toString()).contains("1 partition(s).");

bout.reset();
// List topics.
Expand Down Expand Up @@ -162,11 +157,9 @@ public void testQuickstart() throws Exception {
bout.reset();
// Subscribe.
SubscriberExample.subscriberExample(
CLOUD_REGION, ZONE_ID, PROJECT_NUMBER, SUBSCRIPTION_NAME);
CLOUD_REGION, ZONE_ID, PROJECT_NUMBER, SUBSCRIPTION_NAME, PARTITION_NOS);
assertThat(bout.toString()).contains("Listening");
for (int i = 0; i < MESSAGE_COUNT; ++i) {
assertThat(bout.toString()).contains(String.format("Data : message-%s", i));
}
assertThat(bout.toString()).contains("Data : message-0");
assertThat(bout.toString()).contains("Subscriber is shut down: TERMINATED");

bout.reset();
Expand Down