From 9eb2b992d671e43f71ce2ff893fcb2f1af4c5894 Mon Sep 17 00:00:00 2001 From: Evan Palmer Date: Thu, 24 Sep 2020 10:48:18 -0400 Subject: [PATCH 1/2] Add an independent TopicBacklogReaderSettings --- .../beam/PubsubLiteUnboundedSource.java | 2 +- .../pubsublite/beam/SubscriberOptions.java | 44 ++----- .../pubsublite/beam/TopicBacklogReader.java | 5 + .../beam/TopicBacklogReaderSettings.java | 83 +++++++++++++ .../pubsublite/beam/templates/Example.java | 114 ++++++++++++++++++ 5 files changed, 216 insertions(+), 32 deletions(-) create mode 100644 pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderSettings.java create mode 100644 pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/templates/Example.java diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedSource.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedSource.java index 497c55b30..eb84a6e0f 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedSource.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedSource.java @@ -98,7 +98,7 @@ public UnboundedReader createReader( return new PubsubLiteUnboundedReader( this, statesBuilder.build(), - subscriberOptions.topicBacklogReader(), + TopicBacklogReader.create(subscriberOptions.topicBacklogReaderSettings()), Ticker.systemTicker()); } catch (StatusException e) { throw new IOException(e); diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java index ebc4cc9d0..8d47943b9 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java @@ -17,16 +17,10 @@ package com.google.cloud.pubsublite.beam; import com.google.auto.value.AutoValue; -import com.google.cloud.pubsublite.AdminClient; -import com.google.cloud.pubsublite.AdminClientSettings; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.PartitionLookupUtils; import com.google.cloud.pubsublite.SubscriptionPath; -import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; -import com.google.cloud.pubsublite.internal.ExtractStatus; -import com.google.cloud.pubsublite.internal.TopicStatsClient; -import com.google.cloud.pubsublite.internal.TopicStatsClientSettings; import com.google.cloud.pubsublite.internal.wire.Committer; import com.google.cloud.pubsublite.internal.wire.CommitterBuilder; import com.google.cloud.pubsublite.internal.wire.PubsubContext; @@ -40,7 +34,6 @@ import com.google.common.collect.ImmutableSet; import io.grpc.StatusException; import java.io.Serializable; -import java.util.concurrent.ExecutionException; @AutoValue public abstract class SubscriberOptions implements Serializable { @@ -57,8 +50,8 @@ public abstract class SubscriberOptions implements Serializable { /** A set of partitions. If empty, retrieve the set of partitions using an admin client. */ public abstract ImmutableSet partitions(); - /** The class used to read backlog for the subscription described by subscriptionPath() */ - public abstract TopicBacklogReader topicBacklogReader(); + /** The class used to read backlog for the subscription described by subscriptionPath(). */ + public abstract TopicBacklogReaderSettings topicBacklogReaderSettings(); /** A supplier for the subscriber stub to be used. */ public abstract Optional> subscriberStubSupplier(); @@ -143,7 +136,8 @@ public abstract Builder setSubscriberStubSupplier( public abstract Builder setCommitterStubSupplier( SerializableSupplier stubSupplier); - public abstract Builder setTopicBacklogReader(TopicBacklogReader topicBacklogReader); + public abstract Builder setTopicBacklogReaderSettings( + TopicBacklogReaderSettings topicBacklogReaderSettings); // Used in unit tests abstract Builder setSubscriberFactory(SubscriberFactory subscriberFactory); @@ -155,41 +149,29 @@ public abstract Builder setCommitterStubSupplier( abstract ImmutableSet partitions(); - abstract Optional topicBacklogReader(); + abstract Optional topicBacklogReaderSettings(); abstract SubscriberOptions autoBuild(); public SubscriberOptions build() throws StatusException { - if (!partitions().isEmpty() && topicBacklogReader().isPresent()) { + if (!partitions().isEmpty() && topicBacklogReaderSettings().isPresent()) { return autoBuild(); } - TopicPath path; - try (AdminClient adminClient = - AdminClient.create( - AdminClientSettings.newBuilder() - .setRegion(subscriptionPath().location().region()) - .build())) { - path = TopicPath.parse(adminClient.getSubscription(subscriptionPath()).get().getTopic()); - } catch (ExecutionException e) { - throw ExtractStatus.toCanonical(e.getCause()); - } catch (Throwable t) { - throw ExtractStatus.toCanonical(t); - } if (partitions().isEmpty()) { - int partition_count = PartitionLookupUtils.numPartitions(path); + int partitionCount = PartitionLookupUtils.numPartitions(subscriptionPath()); ImmutableSet.Builder partitions = ImmutableSet.builder(); - for (int i = 0; i < partition_count; i++) { + for (int i = 0; i < partitionCount; i++) { partitions.add(Partition.of(i)); } setPartitions(partitions.build()); } - if (!topicBacklogReader().isPresent()) { - setTopicBacklogReader( - new TopicBacklogReaderImpl( - TopicStatsClient.create(TopicStatsClientSettings.newBuilder().build()), path)); + if (!topicBacklogReaderSettings().isPresent()) { + setTopicBacklogReaderSettings( + TopicBacklogReaderSettings.newBuilder() + .setTopicPathFromSubscriptionPath(subscriptionPath()) + .build()); } - return autoBuild(); } } diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReader.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReader.java index 190b1bd10..c8316eebb 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReader.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReader.java @@ -19,6 +19,7 @@ import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse; +import io.grpc.StatusException; import java.util.Map; /** @@ -27,6 +28,10 @@ * partitions within a subscription. */ public interface TopicBacklogReader { + /** Create a TopicBacklogReader from settings. */ + static TopicBacklogReader create(TopicBacklogReaderSettings settings) throws StatusException { + return settings.instantiate(); + } /** * Compute and aggregate message statistics for message between the provided start offset and HEAD * for each partition. diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderSettings.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderSettings.java new file mode 100644 index 000000000..8afadc18a --- /dev/null +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/TopicBacklogReaderSettings.java @@ -0,0 +1,83 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.beam; + +import com.google.auto.value.AutoValue; +import com.google.cloud.pubsublite.AdminClient; +import com.google.cloud.pubsublite.AdminClientSettings; +import com.google.cloud.pubsublite.SubscriptionPath; +import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.internal.ExtractStatus; +import com.google.cloud.pubsublite.internal.TopicStatsClient; +import com.google.cloud.pubsublite.internal.TopicStatsClientSettings; +import com.google.cloud.pubsublite.proto.TopicStatsServiceGrpc.TopicStatsServiceBlockingStub; +import com.google.common.base.Optional; +import io.grpc.StatusException; +import java.io.Serializable; +import java.util.concurrent.ExecutionException; + +@AutoValue +public abstract class TopicBacklogReaderSettings implements Serializable { + /** + * The topic path for this backlog reader. Either topicPath or subscriptionPath must be set. If + * both are set, subscriptionPath will be ignored. + */ + abstract TopicPath topicPath(); + + // Optional parameters + abstract Optional> stub(); + + public static Builder newBuilder() { + return new AutoValue_TopicBacklogReaderSettings.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + // Required parameters. + public abstract Builder setTopicPath(TopicPath topicPath); + + public Builder setTopicPathFromSubscriptionPath(SubscriptionPath subscriptionPath) + throws StatusException { + try (AdminClient adminClient = + AdminClient.create( + AdminClientSettings.newBuilder() + .setRegion(subscriptionPath.location().region()) + .build())) { + setTopicPath( + TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic())); + return this; + } catch (ExecutionException e) { + throw ExtractStatus.toCanonical(e.getCause()); + } catch (Throwable t) { + throw ExtractStatus.toCanonical(t); + } + } + + public abstract Builder setStub(SerializableSupplier stub); + + public abstract TopicBacklogReaderSettings build(); + } + + TopicBacklogReader instantiate() throws StatusException { + TopicStatsClientSettings.Builder builder = TopicStatsClientSettings.newBuilder(); + if (stub().isPresent()) { + builder.setStub(stub().get().get()); + } + builder.setRegion(topicPath().location().region()); + return new TopicBacklogReaderImpl(TopicStatsClient.create(builder.build()), topicPath()); + } +} diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/templates/Example.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/templates/Example.java new file mode 100644 index 000000000..c41f1e373 --- /dev/null +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/templates/Example.java @@ -0,0 +1,114 @@ +package com.google.cloud.pubsublite.beam.templates; + +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer; + +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.beam.PublisherOptions; +import com.google.cloud.pubsublite.beam.PubsubLiteIO; +import com.google.cloud.pubsublite.proto.PubSubMessage; +import com.google.protobuf.ByteString; +import io.grpc.StatusException; +import java.io.IOException; +import java.util.Arrays; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.FlatMapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.TypeDescriptors; + +public class PubsubLiteToPubsubLite { + + static PublisherOptions publisherOptions() throws StatusException { + return PublisherOptions.newBuilder().setTopicPath(TopicPath.parse("/yolo")).build(); + } + + static Pipeline publishPipeline() throws StatusException { + PipelineOptions options = PipelineOptionsFactory.create(); + + + Pipeline pipeline = Pipeline.create(options); + + pipeline + .apply("ReadLines", TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt")) + .apply("ExtractWords", FlatMapElements + .into(TypeDescriptors.strings()) + .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+")))) + .apply(ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element String word, OutputReceiver output) + throws StatusException { + output.output(Message.builder().setData(ByteString.copyFromUtf8(word)).build()); + } + })) + .apply(PubsubLiteIO.addUuids()) + .apply(PubsubLiteIO.write(publisherOptions())); + return pipeline; + + } + + static Pipeline subscribePipeline() throws StatusException { + PipelineOptions options = PipelineOptionsFactory.create(); + + + Pipeline pipeline = Pipeline.create(options); + + pipeline + .apply("ReadLines", TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt")) + .apply("ExtractWords", FlatMapElements + .into(TypeDescriptors.strings()) + .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+")))) + .apply(ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element String word, OutputReceiver output) + throws StatusException { + output.output(Message.builder().setData(ByteString.copyFromUtf8(word)).build()); + } + })) + .apply(PubsubLiteIO.addUuids()) + .apply(PubsubLiteIO.write(publisherOptions())); + return pipeline; + + } + + public static void main(String[] args) throws IOException, StatusException { + /** + * Sets up and starts streaming pipeline. + * + * @throws IOException if there is a problem setting up resources + */ + PipelineOptions options = PipelineOptionsFactory.create(); + + + Pipeline pipeline = Pipeline.create(options); + + pipeline + .apply("ReadLines", TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt")) + .apply("ExtractWords", FlatMapElements + .into(TypeDescriptors.strings()) + .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+")))) + .apply(ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element String word, OutputReceiver output) + throws StatusException { + output.output(Message.builder().setData(ByteString.copyFromUtf8(word)).build()); + } + })) + .apply(PubsubLiteIO.addUuids()) + .apply(PubsubLiteIO.write(publisherOptions())); + + + PipelineResult result = pipeline.run(); + result.waitUntilFinish(); + + } + } + +} From 153a3e0fe8f1c2830db370ce6dc0baeca47a6642 Mon Sep 17 00:00:00 2001 From: palmere-google <68394592+palmere-google@users.noreply.github.com> Date: Thu, 24 Sep 2020 10:58:07 -0400 Subject: [PATCH 2/2] Delete Example.java --- .../pubsublite/beam/templates/Example.java | 114 ------------------ 1 file changed, 114 deletions(-) delete mode 100644 pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/templates/Example.java diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/templates/Example.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/templates/Example.java deleted file mode 100644 index c41f1e373..000000000 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/templates/Example.java +++ /dev/null @@ -1,114 +0,0 @@ -package com.google.cloud.pubsublite.beam.templates; - -import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer; - -import com.google.cloud.pubsublite.Message; -import com.google.cloud.pubsublite.TopicPath; -import com.google.cloud.pubsublite.beam.PublisherOptions; -import com.google.cloud.pubsublite.beam.PubsubLiteIO; -import com.google.cloud.pubsublite.proto.PubSubMessage; -import com.google.protobuf.ByteString; -import io.grpc.StatusException; -import java.io.IOException; -import java.util.Arrays; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.FlatMapElements; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.TypeDescriptors; - -public class PubsubLiteToPubsubLite { - - static PublisherOptions publisherOptions() throws StatusException { - return PublisherOptions.newBuilder().setTopicPath(TopicPath.parse("/yolo")).build(); - } - - static Pipeline publishPipeline() throws StatusException { - PipelineOptions options = PipelineOptionsFactory.create(); - - - Pipeline pipeline = Pipeline.create(options); - - pipeline - .apply("ReadLines", TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt")) - .apply("ExtractWords", FlatMapElements - .into(TypeDescriptors.strings()) - .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+")))) - .apply(ParDo.of( - new DoFn() { - @ProcessElement - public void processElement(@Element String word, OutputReceiver output) - throws StatusException { - output.output(Message.builder().setData(ByteString.copyFromUtf8(word)).build()); - } - })) - .apply(PubsubLiteIO.addUuids()) - .apply(PubsubLiteIO.write(publisherOptions())); - return pipeline; - - } - - static Pipeline subscribePipeline() throws StatusException { - PipelineOptions options = PipelineOptionsFactory.create(); - - - Pipeline pipeline = Pipeline.create(options); - - pipeline - .apply("ReadLines", TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt")) - .apply("ExtractWords", FlatMapElements - .into(TypeDescriptors.strings()) - .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+")))) - .apply(ParDo.of( - new DoFn() { - @ProcessElement - public void processElement(@Element String word, OutputReceiver output) - throws StatusException { - output.output(Message.builder().setData(ByteString.copyFromUtf8(word)).build()); - } - })) - .apply(PubsubLiteIO.addUuids()) - .apply(PubsubLiteIO.write(publisherOptions())); - return pipeline; - - } - - public static void main(String[] args) throws IOException, StatusException { - /** - * Sets up and starts streaming pipeline. - * - * @throws IOException if there is a problem setting up resources - */ - PipelineOptions options = PipelineOptionsFactory.create(); - - - Pipeline pipeline = Pipeline.create(options); - - pipeline - .apply("ReadLines", TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt")) - .apply("ExtractWords", FlatMapElements - .into(TypeDescriptors.strings()) - .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+")))) - .apply(ParDo.of( - new DoFn() { - @ProcessElement - public void processElement(@Element String word, OutputReceiver output) - throws StatusException { - output.output(Message.builder().setData(ByteString.copyFromUtf8(word)).build()); - } - })) - .apply(PubsubLiteIO.addUuids()) - .apply(PubsubLiteIO.write(publisherOptions())); - - - PipelineResult result = pipeline.run(); - result.waitUntilFinish(); - - } - } - -}