diff --git a/google-cloud-pubsublite/pom.xml b/google-cloud-pubsublite/pom.xml
index 79f839a8a..1d0985d8b 100644
--- a/google-cloud-pubsublite/pom.xml
+++ b/google-cloud-pubsublite/pom.xml
@@ -196,6 +196,21 @@
org.codehaus.mojo
flatten-maven-plugin
+
+
+ org.codehaus.mojo
+ clirr-maven-plugin
+
+ false
+
+
+
+
+ check
+
+
+
+
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java
index a446059a7..f5a4df8f0 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java
@@ -24,16 +24,20 @@
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.SubscriptionPaths;
import com.google.cloud.pubsublite.cloudpubsub.internal.AckSetTrackerImpl;
+import com.google.cloud.pubsublite.cloudpubsub.internal.AssigningSubscriber;
import com.google.cloud.pubsublite.cloudpubsub.internal.MultiPartitionSubscriber;
+import com.google.cloud.pubsublite.cloudpubsub.internal.PartitionSubscriberFactory;
import com.google.cloud.pubsublite.cloudpubsub.internal.SinglePartitionSubscriber;
import com.google.cloud.pubsublite.internal.Preconditions;
+import com.google.cloud.pubsublite.internal.wire.AssignerBuilder;
+import com.google.cloud.pubsublite.internal.wire.AssignerFactory;
import com.google.cloud.pubsublite.internal.wire.CommitterBuilder;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
import com.google.cloud.pubsublite.proto.CursorServiceGrpc;
+import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc.PartitionAssignmentServiceStub;
import com.google.cloud.pubsublite.proto.SubscriberServiceGrpc;
-import com.google.common.collect.ImmutableList;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.ArrayList;
@@ -51,17 +55,21 @@ public abstract class SubscriberSettings {
abstract SubscriptionPath subscriptionPath();
- abstract ImmutableList partitions();
-
abstract FlowControlSettings perPartitionFlowControlSettings();
// Optional parameters.
+
+ // If set, disables auto-assignment.
+ abstract Optional> partitions();
+
abstract Optional> transformer();
abstract Optional subscriberServiceStub();
abstract Optional cursorServiceStub();
+ abstract Optional assignmentServiceStub();
+
abstract Optional nackHandler();
public static Builder newBuilder() {
@@ -76,11 +84,12 @@ public abstract static class Builder {
public abstract Builder setSubscriptionPath(SubscriptionPath path);
- public abstract Builder setPartitions(List partition);
-
public abstract Builder setPerPartitionFlowControlSettings(FlowControlSettings settings);
// Optional parameters.
+ /** If set, disables auto-assignment. */
+ public abstract Builder setPartitions(List partition);
+
public abstract Builder setTransformer(
MessageTransformer transformer);
@@ -89,6 +98,8 @@ public abstract Builder setSubscriberServiceStub(
public abstract Builder setCursorServiceStub(CursorServiceGrpc.CursorServiceStub stub);
+ public abstract Builder setAssignmentServiceStub(PartitionAssignmentServiceStub stub);
+
public abstract Builder setNackHandler(NackHandler nackHandler);
abstract SubscriberSettings autoBuild();
@@ -96,7 +107,8 @@ public abstract Builder setSubscriberServiceStub(
public SubscriberSettings build() throws StatusException {
SubscriberSettings settings = autoBuild();
Preconditions.checkArgument(
- !settings.partitions().isEmpty(), "Must provide at least one partition.");
+ !settings.partitions().isPresent() || !settings.partitions().get().isEmpty(),
+ "Must provide at least one partition if setting partitions explicitly.");
SubscriptionPaths.check(settings.subscriptionPath());
return settings;
}
@@ -113,18 +125,36 @@ Subscriber instantiate() throws StatusException {
wireCommitterBuilder.setSubscriptionPath(subscriptionPath());
cursorServiceStub().ifPresent(wireCommitterBuilder::setCursorStub);
- List perPartitionSubscribers = new ArrayList<>();
- for (Partition partition : partitions()) {
- wireSubscriberBuilder.setPartition(partition);
- wireCommitterBuilder.setPartition(partition);
- perPartitionSubscribers.add(
- new SinglePartitionSubscriber(
+ PartitionSubscriberFactory partitionSubscriberFactory =
+ partition -> {
+ wireSubscriberBuilder.setPartition(partition);
+ wireCommitterBuilder.setPartition(partition);
+ return new SinglePartitionSubscriber(
receiver(),
transformer().orElse(MessageTransforms.toCpsSubscribeTransformer()),
new AckSetTrackerImpl(wireCommitterBuilder.build()),
nackHandler().orElse(new NackHandler() {}),
messageConsumer -> wireSubscriberBuilder.setMessageConsumer(messageConsumer).build(),
- perPartitionFlowControlSettings()));
+ perPartitionFlowControlSettings());
+ };
+
+ if (!partitions().isPresent()) {
+ AssignerBuilder.Builder assignerBuilder = AssignerBuilder.newBuilder();
+ assignerBuilder.setSubscriptionPath(subscriptionPath());
+ assignmentServiceStub().ifPresent(assignerBuilder::setAssignmentStub);
+ AssignerFactory assignerFactory =
+ receiver -> {
+ assignerBuilder.setReceiver(receiver);
+ return assignerBuilder.build();
+ };
+ return new AssigningSubscriber(partitionSubscriberFactory, assignerFactory);
+ }
+
+ List perPartitionSubscribers = new ArrayList<>();
+ for (Partition partition : partitions().get()) {
+ wireSubscriberBuilder.setPartition(partition);
+ wireCommitterBuilder.setPartition(partition);
+ perPartitionSubscribers.add(partitionSubscriberFactory.New(partition));
}
return MultiPartitionSubscriber.of(perPartitionSubscribers);
}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilder.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilder.java
new file mode 100755
index 000000000..4c7da665d
--- /dev/null
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilder.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.internal.wire;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.pubsublite.Endpoints;
+import com.google.cloud.pubsublite.Stubs;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.SubscriptionPaths;
+import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest;
+import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc;
+import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc.PartitionAssignmentServiceStub;
+import com.google.common.flogger.GoogleLogger;
+import com.google.protobuf.ByteString;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.UUID;
+
+@AutoValue
+public abstract class AssignerBuilder {
+ private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
+ // Required parameters.
+ abstract SubscriptionPath subscriptionPath();
+
+ abstract PartitionAssignmentReceiver receiver();
+
+ // Optional parameters.
+ abstract Optional assignmentStub();
+
+ public static Builder newBuilder() {
+ return new AutoValue_AssignerBuilder.Builder();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ // Required parameters.
+ public abstract Builder setSubscriptionPath(SubscriptionPath path);
+
+ public abstract Builder setReceiver(PartitionAssignmentReceiver receiver);
+
+ // Optional parameters.
+ public abstract Builder setAssignmentStub(PartitionAssignmentServiceStub stub);
+
+ abstract AssignerBuilder autoBuild();
+
+ @SuppressWarnings("CheckReturnValue")
+ public Assigner build() throws StatusException {
+ AssignerBuilder builder = autoBuild();
+ SubscriptionPaths.check(builder.subscriptionPath());
+
+ PartitionAssignmentServiceStub stub;
+ if (builder.assignmentStub().isPresent()) {
+ stub = builder.assignmentStub().get();
+ } else {
+ try {
+ stub =
+ Stubs.defaultStub(
+ Endpoints.regionalEndpoint(
+ SubscriptionPaths.getZone(builder.subscriptionPath()).region()),
+ PartitionAssignmentServiceGrpc::newStub);
+ } catch (IOException e) {
+ throw Status.INTERNAL
+ .withCause(e)
+ .withDescription("Creating assigner stub failed.")
+ .asException();
+ }
+ }
+
+ UUID uuid = UUID.randomUUID();
+ ByteBuffer uuidBuffer = ByteBuffer.allocate(16);
+ uuidBuffer.putLong(uuid.getMostSignificantBits());
+ uuidBuffer.putLong(uuid.getLeastSignificantBits());
+ logger.atInfo().log(
+ "Subscription %s using UUID %s for assignment.",
+ builder.subscriptionPath().value(), uuid);
+
+ InitialPartitionAssignmentRequest initial =
+ InitialPartitionAssignmentRequest.newBuilder()
+ .setSubscription(builder.subscriptionPath().value())
+ .setClientId(ByteString.copyFrom(uuidBuffer.array()))
+ .build();
+ return new AssignerImpl(
+ stub, new ConnectedAssignerImpl.Factory(), initial, builder.receiver());
+ }
+ }
+}