diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedSource.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedSource.java index 497c55b30..eb84a6e0f 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedSource.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedSource.java @@ -98,7 +98,7 @@ public UnboundedReader createReader( return new PubsubLiteUnboundedReader( this, statesBuilder.build(), - subscriberOptions.topicBacklogReader(), + TopicBacklogReader.create(subscriberOptions.topicBacklogReaderSettings()), Ticker.systemTicker()); } catch (StatusException e) { throw new IOException(e); diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java index ebc4cc9d0..8d47943b9 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java @@ -17,16 +17,10 @@ package com.google.cloud.pubsublite.beam; import com.google.auto.value.AutoValue; -import com.google.cloud.pubsublite.AdminClient; -import com.google.cloud.pubsublite.AdminClientSettings; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.PartitionLookupUtils; import com.google.cloud.pubsublite.SubscriptionPath; -import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; -import com.google.cloud.pubsublite.internal.ExtractStatus; -import com.google.cloud.pubsublite.internal.TopicStatsClient; -import com.google.cloud.pubsublite.internal.TopicStatsClientSettings; import com.google.cloud.pubsublite.internal.wire.Committer; import com.google.cloud.pubsublite.internal.wire.CommitterBuilder; import com.google.cloud.pubsublite.internal.wire.PubsubContext; @@ -40,7 +34,6 @@ import com.google.common.collect.ImmutableSet; import io.grpc.StatusException; import java.io.Serializable; -import java.util.concurrent.ExecutionException; @AutoValue public abstract class SubscriberOptions implements Serializable { @@ -57,8 +50,8 @@ public abstract class SubscriberOptions implements Serializable { /** A set of partitions. If empty, retrieve the set of partitions using an admin client. */ public abstract ImmutableSet partitions(); - /** The class used to read backlog for the subscription described by subscriptionPath() */ - public abstract TopicBacklogReader topicBacklogReader(); + /** The class used to read backlog for the subscription described by subscriptionPath(). */ + public abstract TopicBacklogReaderSettings topicBacklogReaderSettings(); /** A supplier for the subscriber stub to be used. */ public abstract Optional> subscriberStubSupplier(); @@ -143,7 +136,8 @@ public abstract Builder setSubscriberStubSupplier( public abstract Builder setCommitterStubSupplier( SerializableSupplier stubSupplier); - public abstract Builder setTopicBacklogReader(TopicBacklogReader topicBacklogReader); + public abstract Builder setTopicBacklogReaderSettings( + TopicBacklogReaderSettings topicBacklogReaderSettings); // Used in unit tests abstract Builder setSubscriberFactory(SubscriberFactory subscriberFactory); @@ -155,41 +149,29 @@ public abstract Builder setCommitterStubSupplier( abstract ImmutableSet partitions(); - abstract Optional topicBacklogReader(); + abstract Optional topicBacklogReaderSettings(); abstract SubscriberOptions autoBuild(); public SubscriberOptions build() throws StatusException { - if (!partitions().isEmpty() && topicBacklogReader().isPresent()) { + if (!partitions().isEmpty() && topicBacklogReaderSettings().isPresent()) { return autoBuild(); } - TopicPath path; - try (AdminClient adminClient = - AdminClient.create( - AdminClientSettings.newBuilder() - .setRegion(subscriptionPath().location().region()) - .build())) { - path = TopicPath.parse(adminClient.getSubscription(subscriptionPath()).get().getTopic()); - } catch (ExecutionException e) { - throw ExtractStatus.toCanonical(e.getCause()); - } catch (Throwable t) { - throw ExtractStatus.toCanonical(t); - } if (partitions().isEmpty()) { - int partition_count = PartitionLookupUtils.numPartitions(path); + int partitionCount = PartitionLookupUtils.numPartitions(subscriptionPath()); ImmutableSet.Builder partitions = ImmutableSet.builder(); - for (int i = 0; i < partition_count; i++) { + for (int i = 0; i < partitionCount; i++) { partitions.add(Partition.of(i)); } setPartitions(partitions.build()); } - if (!topicBacklogReader().isPresent()) { - setTopicBacklogReader( - new TopicBacklogReaderImpl( - TopicStatsClient.create(TopicStatsClientSettings.newBuilder().build()), path)); + if (!topicBacklogReaderSettings().isPresent()) { + setTopicBacklogReaderSettings( + TopicBacklogReaderSettings.newBuilder() + .setTopicPathFromSubscriptionPath(subscriptionPath()) + .build()); } - return autoBuild(); } } diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReader.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReader.java index 190b1bd10..c8316eebb 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReader.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReader.java @@ -19,6 +19,7 @@ import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse; +import io.grpc.StatusException; import java.util.Map; /** @@ -27,6 +28,10 @@ * partitions within a subscription. */ public interface TopicBacklogReader { + /** Create a TopicBacklogReader from settings. */ + static TopicBacklogReader create(TopicBacklogReaderSettings settings) throws StatusException { + return settings.instantiate(); + } /** * Compute and aggregate message statistics for message between the provided start offset and HEAD * for each partition. diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderSettings.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderSettings.java new file mode 100644 index 000000000..8afadc18a --- /dev/null +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderSettings.java @@ -0,0 +1,83 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.beam; + +import com.google.auto.value.AutoValue; +import com.google.cloud.pubsublite.AdminClient; +import com.google.cloud.pubsublite.AdminClientSettings; +import com.google.cloud.pubsublite.SubscriptionPath; +import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.internal.ExtractStatus; +import com.google.cloud.pubsublite.internal.TopicStatsClient; +import com.google.cloud.pubsublite.internal.TopicStatsClientSettings; +import com.google.cloud.pubsublite.proto.TopicStatsServiceGrpc.TopicStatsServiceBlockingStub; +import com.google.common.base.Optional; +import io.grpc.StatusException; +import java.io.Serializable; +import java.util.concurrent.ExecutionException; + +@AutoValue +public abstract class TopicBacklogReaderSettings implements Serializable { + /** + * The topic path for this backlog reader. Either topicPath or subscriptionPath must be set. If + * both are set, subscriptionPath will be ignored. + */ + abstract TopicPath topicPath(); + + // Optional parameters + abstract Optional> stub(); + + public static Builder newBuilder() { + return new AutoValue_TopicBacklogReaderSettings.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + // Required parameters. + public abstract Builder setTopicPath(TopicPath topicPath); + + public Builder setTopicPathFromSubscriptionPath(SubscriptionPath subscriptionPath) + throws StatusException { + try (AdminClient adminClient = + AdminClient.create( + AdminClientSettings.newBuilder() + .setRegion(subscriptionPath.location().region()) + .build())) { + setTopicPath( + TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic())); + return this; + } catch (ExecutionException e) { + throw ExtractStatus.toCanonical(e.getCause()); + } catch (Throwable t) { + throw ExtractStatus.toCanonical(t); + } + } + + public abstract Builder setStub(SerializableSupplier stub); + + public abstract TopicBacklogReaderSettings build(); + } + + TopicBacklogReader instantiate() throws StatusException { + TopicStatsClientSettings.Builder builder = TopicStatsClientSettings.newBuilder(); + if (stub().isPresent()) { + builder.setStub(stub().get().get()); + } + builder.setRegion(topicPath().location().region()); + return new TopicBacklogReaderImpl(TopicStatsClient.create(builder.build()), topicPath()); + } +}