From 90b9d2d7f98d6b5828e3c70a8365c10af1f392b9 Mon Sep 17 00:00:00 2001 From: sammcveety Date: Tue, 13 Dec 2016 06:52:03 -0800 Subject: [PATCH 01/11] Update PubsubIO.java --- .../cloud/dataflow/sdk/io/PubsubIO.java | 176 +++++++++++++++--- 1 file changed, 149 insertions(+), 27 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java index 48114e61c1..2bec7012dd 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java @@ -22,6 +22,9 @@ import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.coders.VoidCoder; import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.ValueProvider; +import com.google.cloud.dataflow.sdk.options.ValueProvider.NestedValueProvider; +import com.google.cloud.dataflow.sdk.options.ValueProvider.StaticValueProvider; import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -136,7 +139,7 @@ private static void validatePubsubName(String name) { * Populate common {@link DisplayData} between Pubsub source and sink. */ private static void populateCommonDisplayData(DisplayData.Builder builder, - String timestampLabel, String idLabel, PubsubTopic topic) { + String timestampLabel, String idLabel, String topic) { builder .addIfNotNull(DisplayData.item("timestampLabel", timestampLabel) .withLabel("Timestamp Label Attribute")) @@ -144,7 +147,7 @@ private static void populateCommonDisplayData(DisplayData.Builder builder, .withLabel("ID Label Attribute")); if (topic != null) { - builder.add(DisplayData.item("topic", topic.asPath()) + builder.add(DisplayData.item("topic", topic) .withLabel("Pubsub Topic")); } } @@ -254,6 +257,61 @@ public String asPath() { } } + /** + * Used to build a {@link ValueProvider} for {@link PubsubSubscription}. + */ + private static class SubscriptionTranslator + implements SerializableFunction { + @Override + public PubsubSubscription apply(String from) { + return PubsubSubscription.fromPath(from); + } + } + + /** + * Used to build a {@link ValueProvider} for {@link SubscriptionPath}. + */ + private static class SubscriptionPathTranslator + implements SerializableFunction { + @Override + public SubscriptionPath apply(PubsubSubscription from) { + return PubsubClient.subscriptionPathFromName(from.project, from.subscription); + } + } + + /** + * Used to build a {@link ValueProvider} for {@link PubsubTopic}. + */ + private static class TopicTranslator + implements SerializableFunction { + @Override + public PubsubTopic apply(String from) { + return PubsubTopic.fromPath(from); + } + } + + /** + * Used to build a {@link ValueProvider} for {@link TopicPath}. + */ + private static class TopicPathTranslator + implements SerializableFunction { + @Override + public TopicPath apply(PubsubTopic from) { + return PubsubClient.topicPathFromName(from.project, from.topic); + } + } + + /** + * Used to build a {@link ValueProvider} for {@link ProjectPath}. + */ + private static class ProjectPathTranslator + implements SerializableFunction { + @Override + public ProjectPath apply(PubsubTopic from) { + return PubsubClient.projectPathFromId(from.project); + } + } + /** * Class representing a Cloud Pub/Sub Topic. */ @@ -374,6 +432,13 @@ public static class Read { * name. */ public static Bound named(String name) { + return new Bound<>(DEFAULT_PUBSUB_CODER).topic(StaticValueProvider.of(topic)); + } + + /** + * Like {@code topic()} but with a {@link ValueProvider}. + */ + public static Bound topic(ValueProvider topic) { return new Bound<>(DEFAULT_PUBSUB_CODER).named(name); } @@ -400,6 +465,13 @@ public static Bound topic(String topic) { * of the {@code subscription} string. */ public static Bound subscription(String subscription) { + return new Bound<>(DEFAULT_PUBSUB_CODER).subscription(StaticValueProvider.of(subscription)); + } + + /** + * Like {@code topic()} but with a {@link ValueProvider}. + */ + public static Bound subscription(ValueProvider subscription) { return new Bound<>(DEFAULT_PUBSUB_CODER).subscription(subscription); } @@ -493,10 +565,10 @@ public static Bound maxReadTime(Duration maxReadTime) { */ public static class Bound extends PTransform> { /** The Cloud Pub/Sub topic to read from. */ - @Nullable private final PubsubTopic topic; + @Nullable private final ValueProvider topic; /** The Cloud Pub/Sub subscription to read from. */ - @Nullable private final PubsubSubscription subscription; + @Nullable private final ValueProvider subscription; /** The name of the message attribute to read timestamps from. */ @Nullable private final String timestampLabel; @@ -517,9 +589,9 @@ private Bound(Coder coder) { this(null, null, null, null, coder, null, 0, null); } - private Bound(String name, PubsubSubscription subscription, PubsubTopic topic, - String timestampLabel, Coder coder, String idLabel, int maxNumRecords, - Duration maxReadTime) { + private Bound(String name, ValueProvider subscription, + ValueProvider topic, String timestampLabel, Coder coder, + String idLabel, int maxNumRecords, Duration maxReadTime) { super(name); this.subscription = subscription; this.topic = topic; @@ -554,8 +626,16 @@ public Bound named(String name) { *

Does not modify this object. */ public Bound subscription(String subscription) { - return new Bound<>(name, PubsubSubscription.fromPath(subscription), topic, timestampLabel, - coder, idLabel, maxNumRecords, maxReadTime); + return subscription(StaticValueProvider.of(subscription)); + } + + /** + * Like {@code subscription()} but with a {@link ValueProvider}. + */ + public Bound subscription(ValueProvider subscription) { + return new Bound<>(name, + NestedValueProvider.of(subscription, new SubscriptionTranslator()), + topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime); } /** @@ -567,8 +647,16 @@ public Bound subscription(String subscription) { *

Does not modify this object. */ public Bound topic(String topic) { - return new Bound<>(name, subscription, PubsubTopic.fromPath(topic), timestampLabel, coder, - idLabel, maxNumRecords, maxReadTime); + return topic(StaticValueProvider.of(topic)); + } + + /** + * Like {@code topic()} but with a {@link ValueProvider}. + */ + public Bound topic(ValueProvider topic) { + return new Bound<>(name, subscription, + NestedValueProvider.of(topic, new TopicTranslator()), + timestampLabel, coder, idLabel, maxNumRecords, maxReadTime); } /** @@ -648,15 +736,14 @@ public PCollection apply(PInput input) { .apply(ParDo.of(new PubsubReader())) .setCoder(coder); } else { - @Nullable ProjectPath projectPath = - topic == null ? null : PubsubClient.projectPathFromId(topic.project); - @Nullable TopicPath topicPath = - topic == null ? null : PubsubClient.topicPathFromName(topic.project, topic.topic); - @Nullable SubscriptionPath subscriptionPath = + @Nullable ValueProvider projectPath = + topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator()); + @Nullable ValueProvider topicPath = + topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator()); + @Nullable ValueProvider subscriptionPath = subscription == null ? null - : PubsubClient.subscriptionPathFromName( - subscription.project, subscription.subscription); + : NestedValueProvider.of(subscription, new SubscriptionPathTranslator()); return input.getPipeline().begin() .apply(new PubsubUnboundedSource( FACTORY, projectPath, topicPath, subscriptionPath, @@ -666,8 +753,11 @@ public PCollection apply(PInput input) { @Override public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - populateCommonDisplayData(builder, timestampLabel, idLabel, topic); + String topicString = + topic == null ? null + : topic.isAccessible() ? topic.get().asPath() + : topic.toString(); + populateCommonDisplayData(builder, timestampLabel, idLabel, topicString); builder .addIfNotNull(DisplayData.item("maxReadTime", maxReadTime) @@ -676,8 +766,10 @@ public void populateDisplayData(DisplayData.Builder builder) { .withLabel("Maximum Read Records"), 0); if (subscription != null) { - builder.add(DisplayData.item("subscription", subscription.asPath()) - .withLabel("Pubsub Subscription")); + String subscriptionString = subscription.isAccessible() + ? subscription.get().asPath() : subscription.toString(); + builder.add(DisplayData.item("subscription", subscriptionString) + .withLabel("Pubsub Subscription")); } } @@ -687,10 +779,18 @@ protected Coder getDefaultOutputCoder() { } public PubsubTopic getTopic() { + return topic == null ? null : topic.get(); + } + + public ValueProvider getTopicProvider() { return topic; } public PubsubSubscription getSubscription() { + return subscription == null ? null : subscription.get(); + } + + public ValueProvider getSubscriptionProvider() { return subscription; } @@ -847,6 +947,13 @@ public static Bound named(String name) { * {@code topic} string. */ public static Bound topic(String topic) { + return new Bound<>(DEFAULT_PUBSUB_CODER).topic(StaticValueProvider.of(topic)); + } + + /** + * Like {@code topic()} but with a {@link ValueProvider}. + */ + public static Bound topic(ValueProvider topic) { return new Bound<>(DEFAULT_PUBSUB_CODER).topic(topic); } @@ -896,7 +1003,7 @@ public static Bound withCoder(Coder coder) { */ public static class Bound extends PTransform, PDone> { /** The Cloud Pub/Sub topic to publish to. */ - @Nullable private final PubsubTopic topic; + @Nullable private final ValueProvider topic; /** The name of the message attribute to publish message timestamps in. */ @Nullable private final String timestampLabel; /** The name of the message attribute to publish unique message IDs in. */ @@ -908,7 +1015,8 @@ private Bound(Coder coder) { } private Bound( - String name, PubsubTopic topic, String timestampLabel, String idLabel, Coder coder) { + String name, ValueProvider topic, String timestampLabel, + String idLabel, Coder coder) { super(name); this.topic = topic; this.timestampLabel = timestampLabel; @@ -936,7 +1044,15 @@ public Bound named(String name) { *

Does not modify this object. */ public Bound topic(String topic) { - return new Bound<>(name, PubsubTopic.fromPath(topic), timestampLabel, idLabel, coder); + return topic(StaticValueProvider.of(topic)); + } + + /** + * Like {@code topic()} but with a {@link ValueProvider}. + */ + public Bound topic(ValueProvider topic) { + return new Bound<>(name, NestedValueProvider.of(topic, new TopicTranslator()), + timestampLabel, idLabel, coder); } /** @@ -987,7 +1103,7 @@ public PDone apply(PCollection input) { case UNBOUNDED: return input.apply(new PubsubUnboundedSink( FACTORY, - PubsubClient.topicPathFromName(topic.project, topic.topic), + NestedValueProvider.of(topic, new TopicPathTranslator()), coder, timestampLabel, idLabel, @@ -999,7 +1115,9 @@ public PDone apply(PCollection input) { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - populateCommonDisplayData(builder, timestampLabel, idLabel, topic); + String topicString = topic.isAccessible() + ? topic.get().asPath() : topic.toString(); + populateCommonDisplayData(builder, timestampLabel, idLabel, topicString); } @Override @@ -1008,6 +1126,10 @@ protected Coder getDefaultOutputCoder() { } public PubsubTopic getTopic() { + return topic.get(); + } + + public ValueProvider getTopicProvider() { return topic; } From 78b3b8208275fb46aaf4461de523a2edd66b51a2 Mon Sep 17 00:00:00 2001 From: sammcveety Date: Tue, 13 Dec 2016 06:55:16 -0800 Subject: [PATCH 02/11] Update PubsubUnboundedSink.java --- .../dataflow/sdk/io/PubsubUnboundedSink.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink.java index 359af380d2..b86899804d 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink.java @@ -28,6 +28,7 @@ import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.coders.VarIntCoder; import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.ValueProvider; import com.google.cloud.dataflow.sdk.transforms.Aggregator; import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.GroupByKey; @@ -207,7 +208,7 @@ public void populateDisplayData(Builder builder) { private static class WriterFn extends DoFn>, Void> { private final PubsubClientFactory pubsubFactory; - private final TopicPath topic; + private final ValueProvider topic; private final String timestampLabel; private final String idLabel; private final int publishBatchSize; @@ -227,8 +228,8 @@ private static class WriterFn createAggregator("bytes", new Sum.SumLongFn()); WriterFn( - PubsubClientFactory pubsubFactory, TopicPath topic, String timestampLabel, - String idLabel, int publishBatchSize, int publishBatchBytes) { + PubsubClientFactory pubsubFactory, ValueProvider topic, + String timestampLabel, String idLabel, int publishBatchSize, int publishBatchBytes) { this.pubsubFactory = pubsubFactory; this.topic = topic; this.timestampLabel = timestampLabel; @@ -243,7 +244,7 @@ private static class WriterFn */ private void publishBatch(List messages, int bytes) throws IOException { - int n = pubsubClient.publish(topic, messages); + int n = pubsubClient.publish(topic.get(), messages); checkState(n == messages.size(), "Attempted to publish %s messages but %s were successful", messages.size(), n); batchCounter.addValue(1L); @@ -293,7 +294,11 @@ public void finishBundle(Context c) throws Exception { @Override public void populateDisplayData(Builder builder) { super.populateDisplayData(builder); - builder.add(DisplayData.item("topic", topic.getPath())); + String topicString = + topic == null ? null + : topic.isAccessible() ? topic.get().getPath() + : topic.toString(); + builder.add(DisplayData.item("topic", topicString)); builder.add(DisplayData.item("transport", pubsubFactory.getKind())); builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)); builder.addIfNotNull(DisplayData.item("idLabel", idLabel)); @@ -312,7 +317,7 @@ public void populateDisplayData(Builder builder) { /** * Pubsub topic to publish to. */ - private final TopicPath topic; + private final ValueProvider topic; /** * Coder for elements. It is the responsibility of the underlying Pubsub transport to @@ -366,7 +371,7 @@ public void populateDisplayData(Builder builder) { @VisibleForTesting PubsubUnboundedSink( PubsubClientFactory pubsubFactory, - TopicPath topic, + ValueProvider topic, Coder elementCoder, String timestampLabel, String idLabel, @@ -400,7 +405,7 @@ public PubsubUnboundedSink( } public TopicPath getTopic() { - return topic; + return topic.get(); } @Nullable From 65cf79130b7e8e5ad4970389cca9f4dc0c921c35 Mon Sep 17 00:00:00 2001 From: sammcveety Date: Tue, 13 Dec 2016 07:01:55 -0800 Subject: [PATCH 03/11] Update PubsubUnboundedSource.java --- .../sdk/io/PubsubUnboundedSource.java | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java index 6910824287..7c28beac01 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java @@ -29,6 +29,7 @@ import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.ValueProvider; import com.google.cloud.dataflow.sdk.transforms.Aggregator; import com.google.cloud.dataflow.sdk.transforms.Combine; import com.google.cloud.dataflow.sdk.transforms.DoFn; @@ -1111,7 +1112,7 @@ private static class StatsFn extends DoFn { createAggregator("elements", new Sum.SumLongFn()); private final PubsubClientFactory pubsubFactory; - private final SubscriptionPath subscription; + private final ValueProvider subscription; @Nullable private final String timestampLabel; @Nullable @@ -1119,7 +1120,7 @@ private static class StatsFn extends DoFn { public StatsFn( PubsubClientFactory pubsubFactory, - SubscriptionPath subscription, + ValueProvider subscription, @Nullable String timestampLabel, @Nullable @@ -1139,7 +1140,11 @@ public void processElement(ProcessContext c) throws Exception { @Override public void populateDisplayData(Builder builder) { super.populateDisplayData(builder); - builder.add(DisplayData.item("subscription", subscription.getPath())); + String subscriptionString = + subscription == null ? null + : subscription.isAccessible() ? subscription.get().getPath() + : subscription.toString(); + builder.add(DisplayData.item("subscription", subscriptionString)); builder.add(DisplayData.item("transport", pubsubFactory.getKind())); builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)); builder.addIfNotNull(DisplayData.item("idLabel", idLabel)); @@ -1165,14 +1170,14 @@ public void populateDisplayData(Builder builder) { * Project under which to create a subscription if only the {@link #topic} was given. */ @Nullable - private final ProjectPath project; + private final ValueProvider project; /** * Topic to read from. If {@literal null}, then {@link #subscription} must be given. * Otherwise {@link #subscription} must be null. */ @Nullable - private final TopicPath topic; + private final ValueProvider topic; /** * Subscription to read from. If {@literal null} then {@link #topic} must be given. @@ -1183,7 +1188,7 @@ public void populateDisplayData(Builder builder) { * subscription is never deleted. */ @Nullable - private SubscriptionPath subscription; + private ValueProvider subscription; /** * Coder for elements. Elements are effectively double-encoded: first to a byte array @@ -1210,9 +1215,9 @@ public void populateDisplayData(Builder builder) { PubsubUnboundedSource( Clock clock, PubsubClientFactory pubsubFactory, - @Nullable ProjectPath project, - @Nullable TopicPath topic, - @Nullable SubscriptionPath subscription, + @Nullable ValueProvider project, + @Nullable ValueProvider topic, + @Nullable ValueProvider subscription, Coder elementCoder, @Nullable String timestampLabel, @Nullable String idLabel) { @@ -1235,9 +1240,9 @@ public void populateDisplayData(Builder builder) { */ public PubsubUnboundedSource( PubsubClientFactory pubsubFactory, - @Nullable ProjectPath project, - @Nullable TopicPath topic, - @Nullable SubscriptionPath subscription, + @Nullable ValueProvider project, + @Nullable ValueProvider topic, + @Nullable ValueProvider subscription, Coder elementCoder, @Nullable String timestampLabel, @Nullable String idLabel) { @@ -1250,17 +1255,17 @@ public Coder getElementCoder() { @Nullable public ProjectPath getProject() { - return project; + return project == null ? null : project.get(); } @Nullable public TopicPath getTopic() { - return topic; + return topic == null ? null : topic.get(); } @Nullable public SubscriptionPath getSubscription() { - return subscription; + return subscription == null ? null : subscription.get(); } @Nullable @@ -1282,8 +1287,11 @@ public PCollection apply(PBegin input) { input.getPipeline() .getOptions() .as(DataflowPipelineOptions.class))) { - subscription = - pubsubClient.createRandomSubscription(project, topic, DEAULT_ACK_TIMEOUT_SEC); + checkState(project.isAccessible(), "createRandomSubscription must be called at runtime."); + checkState(topic.isAccessible(), "createRandomSubscription must be called at runtime."); + SubscriptionPath subscriptionPath = + pubsubClient.createRandomSubscription( + project.get(), topic.get(), DEAULT_ACK_TIMEOUT_SEC); LOG.warn("Created subscription {} to topic {}." + " Note this subscription WILL NOT be deleted when the pipeline terminates", subscription, topic); From 5d7255d05ef656a022c2d1d4b1ea447197b3d12f Mon Sep 17 00:00:00 2001 From: sammcveety Date: Tue, 13 Dec 2016 07:01:59 -0800 Subject: [PATCH 04/11] Update PubsubIOTest.java --- .../cloud/dataflow/sdk/io/PubsubIOTest.java | 43 +++++++++++-------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java index bbba56187d..9248d7ba7f 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java @@ -20,8 +20,11 @@ import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import com.google.cloud.dataflow.sdk.options.ValueProvider.StaticValueProvider; import com.google.cloud.dataflow.sdk.transforms.display.DataflowDisplayDataEvaluator; import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.transforms.display.DisplayDataEvaluator; @@ -69,30 +72,14 @@ public void testTopicValidationSuccess() throws Exception { .toString()); } - @Test - public void testTopicValidationBadCharacter() throws Exception { - thrown.expect(IllegalArgumentException.class); - PubsubIO.Read.topic("projects/my-project/topics/abc-*-abc"); - } - - @Test - public void testTopicValidationTooLong() throws Exception { - thrown.expect(IllegalArgumentException.class); - PubsubIO.Read.topic(new StringBuilder().append("projects/my-project/topics/A-really-long-one-") - .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111") - .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111") - .append("1111111111111111111111111111111111111111111111111111111111111111111111111111") - .toString()); - } - @Test public void testReadDisplayData() { String topic = "projects/project/topics/topic"; String subscription = "projects/project/subscriptions/subscription"; Duration maxReadTime = Duration.standardMinutes(5); PubsubIO.Read.Bound read = PubsubIO.Read - .topic(topic) - .subscription(subscription) + .topic(StaticValueProvider.of(topic)) + .subscription(StaticValueProvider.of(subscription)) .timestampLabel("myTimestamp") .idLabel("myId") .maxNumRecords(1234) @@ -134,6 +121,26 @@ public void testPrimitiveWriteDisplayData() { displayData, hasItem(hasDisplayItem("topic"))); } + @Test + public void testNullTopic() { + String subscription = "projects/project/subscriptions/subscription"; + PubsubIO.Read.Bound read = PubsubIO.Read + .subscription(StaticValueProvider.of(subscription)); + assertNull(read.getTopic()); + assertNotNull(read.getSubscription()); + assertNotNull(DisplayData.from(read)); + } + + @Test + public void testNullSubscription() { + String topic = "projects/project/topics/topic"; + PubsubIO.Read.Bound read = PubsubIO.Read + .topic(StaticValueProvider.of(topic)); + assertNotNull(read.getTopic()); + assertNull(read.getSubscription()); + assertNotNull(DisplayData.from(read)); + } + @Test public void testPrimitiveReadDisplayData() { DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); From 83aa374e3e79f42dc2d53effb1823054979edf2d Mon Sep 17 00:00:00 2001 From: sammcveety Date: Tue, 13 Dec 2016 07:06:06 -0800 Subject: [PATCH 05/11] Update PubsubUnboundedSinkTest.java --- .../sdk/io/PubsubUnboundedSinkTest.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSinkTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSinkTest.java index ef95a643f9..287f07e52c 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSinkTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSinkTest.java @@ -18,6 +18,7 @@ import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.io.PubsubUnboundedSink.RecordIdMethod; +import com.google.cloud.dataflow.sdk.options.ValueProvider.StaticValueProvider; import com.google.cloud.dataflow.sdk.testing.CoderProperties; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -82,9 +83,9 @@ public void sendOneMessage() throws IOException { PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, ImmutableList.of())) { PubsubUnboundedSink sink = - new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, - NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2), - RecordIdMethod.DETERMINISTIC); + new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(), + TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes, + Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC); TestPipeline p = TestPipeline.create(); p.apply(Create.of(ImmutableList.of(DATA))) .apply(ParDo.of(new Stamp())) @@ -110,9 +111,9 @@ public void sendMoreThanOneBatchByNumMessages() throws IOException { PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, ImmutableList.of())) { PubsubUnboundedSink sink = - new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, - NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2), - RecordIdMethod.DETERMINISTIC); + new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(), + TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes, + Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC); TestPipeline p = TestPipeline.create(); p.apply(Create.of(data)) .apply(ParDo.of(new Stamp())) @@ -144,9 +145,10 @@ public void sendMoreThanOneBatchByByteSize() throws IOException { PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, ImmutableList.of())) { PubsubUnboundedSink sink = - new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, - NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2), - RecordIdMethod.DETERMINISTIC); + new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), + StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, + NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2), + RecordIdMethod.DETERMINISTIC); TestPipeline p = TestPipeline.create(); p.apply(Create.of(data)) .apply(ParDo.of(new Stamp())) From 6487612b95789110d6a655e7dc9fe1a0fda7341c Mon Sep 17 00:00:00 2001 From: sammcveety Date: Tue, 13 Dec 2016 07:08:00 -0800 Subject: [PATCH 06/11] Update PubsubUnboundedSourceTest.java --- .../cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java index 0788347726..f7e4f863de 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java @@ -30,6 +30,7 @@ import com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource.PubsubCheckpoint; import com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource.PubsubReader; import com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource.PubsubSource; +import com.google.cloud.dataflow.sdk.options.ValueProvider.StaticValueProvider; import com.google.cloud.dataflow.sdk.testing.CoderProperties; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.util.CoderUtils; @@ -85,8 +86,9 @@ public long currentTimeMillis() { }; factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, incoming); PubsubUnboundedSource source = - new PubsubUnboundedSource<>(clock, factory, null, null, SUBSCRIPTION, StringUtf8Coder.of(), - TIMESTAMP_LABEL, ID_LABEL); + new PubsubUnboundedSource<>( + clock, factory, null, null, StaticValueProvider.of(SUBSCRIPTION), + StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL); primSource = new PubsubSource<>(source); } From e55517c3d393ff6f967dea4406b64e83a44fa2ac Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Tue, 13 Dec 2016 07:37:33 -0800 Subject: [PATCH 07/11] Fixups --- .../com/google/cloud/dataflow/sdk/io/PubsubIO.java | 7 ++++--- .../cloud/dataflow/sdk/io/PubsubUnboundedSink.java | 2 +- .../cloud/dataflow/sdk/io/PubsubUnboundedSource.java | 10 +++++----- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java index 2bec7012dd..ce0e392073 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java @@ -31,6 +31,7 @@ import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark; import com.google.cloud.dataflow.sdk.util.CoderUtils; @@ -432,14 +433,14 @@ public static class Read { * name. */ public static Bound named(String name) { - return new Bound<>(DEFAULT_PUBSUB_CODER).topic(StaticValueProvider.of(topic)); + return new Bound<>(DEFAULT_PUBSUB_CODER).named(name); } /** * Like {@code topic()} but with a {@link ValueProvider}. */ public static Bound topic(ValueProvider topic) { - return new Bound<>(DEFAULT_PUBSUB_CODER).named(name); + return new Bound<>(DEFAULT_PUBSUB_CODER).topic(topic); } /** @@ -454,7 +455,7 @@ public static Bound topic(ValueProvider topic) { * Dataflow. */ public static Bound topic(String topic) { - return new Bound<>(DEFAULT_PUBSUB_CODER).topic(topic); + return new Bound<>(DEFAULT_PUBSUB_CODER).topic(StaticValueProvider.of(topic)); } /** diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink.java index b86899804d..0f7e24fa46 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink.java @@ -394,7 +394,7 @@ public void populateDisplayData(Builder builder) { public PubsubUnboundedSink( PubsubClientFactory pubsubFactory, - TopicPath topic, + ValueProvider topic, Coder elementCoder, String timestampLabel, String idLabel, diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java index 7c28beac01..4f3aea9e76 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java @@ -609,7 +609,7 @@ PubsubClient getPubsubClient() { * CAUTION: Retains {@code ackIds}. */ void ackBatch(List ackIds) throws IOException { - pubsubClient.acknowledge(outer.outer.subscription, ackIds); + pubsubClient.acknowledge(outer.outer.subscription.get(), ackIds); ackedIds.add(ackIds); } @@ -619,7 +619,7 @@ void ackBatch(List ackIds) throws IOException { * with the given {@code ockIds}. Does not retain {@code ackIds}. */ public void nackBatch(long nowMsSinceEpoch, List ackIds) throws IOException { - pubsubClient.modifyAckDeadline(outer.outer.subscription, ackIds, 0); + pubsubClient.modifyAckDeadline(outer.outer.subscription.get(), ackIds, 0); numNacked.add(nowMsSinceEpoch, ackIds.size()); } @@ -630,7 +630,7 @@ public void nackBatch(long nowMsSinceEpoch, List ackIds) throws IOExcept */ private void extendBatch(long nowMsSinceEpoch, List ackIds) throws IOException { int extensionSec = (ackTimeoutMs * ACK_EXTENSION_PCT) / (100 * 1000); - pubsubClient.modifyAckDeadline(outer.outer.subscription, ackIds, extensionSec); + pubsubClient.modifyAckDeadline(outer.outer.subscription.get(), ackIds, extensionSec); numExtendedDeadlines.add(nowMsSinceEpoch, ackIds.size()); } @@ -766,7 +766,7 @@ private void pull() throws IOException { // BLOCKs until received. Collection receivedMessages = pubsubClient.pull(requestTimeMsSinceEpoch, - outer.outer.subscription, + outer.outer.subscription.get(), PULL_BATCH_SIZE, true); if (receivedMessages.isEmpty()) { // Nothing available yet. Try again later. @@ -872,7 +872,7 @@ private void stats() { @Override public boolean start() throws IOException { // Determine the ack timeout. - ackTimeoutMs = pubsubClient.ackDeadlineSeconds(outer.outer.subscription) * 1000; + ackTimeoutMs = pubsubClient.ackDeadlineSeconds(outer.outer.subscription.get()) * 1000; return advance(); } From 11a718749b6932f19be91c33ca33309f3d5cf2e3 Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Tue, 13 Dec 2016 07:40:48 -0800 Subject: [PATCH 08/11] Fixups --- .../cloud/dataflow/sdk/io/PubsubUnboundedSink.java | 4 ++++ .../cloud/dataflow/sdk/io/PubsubUnboundedSource.java | 10 ++++++++++ 2 files changed, 14 insertions(+) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink.java index 0f7e24fa46..26ffb9e11b 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSink.java @@ -408,6 +408,10 @@ public TopicPath getTopic() { return topic.get(); } + public ValueProvider getTopicProvider() { + return topic; + } + @Nullable public String getTimestampLabel() { return timestampLabel; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java index 4f3aea9e76..575fe39771 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java @@ -1263,11 +1263,21 @@ public TopicPath getTopic() { return topic == null ? null : topic.get(); } + @Nullable + public ValueProvider getTopicProvider() { + return topic; + } + @Nullable public SubscriptionPath getSubscription() { return subscription == null ? null : subscription.get(); } + @Nullable + public ValueProvider getSubscriptionProvider() { + return subscription; + } + @Nullable public String getTimestampLabel() { return timestampLabel; From f29510a443a781c147c90badb9b2c3a9f8e866f4 Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Tue, 13 Dec 2016 08:36:41 -0800 Subject: [PATCH 09/11] Update native IO --- .../sdk/runners/DataflowPipelineRunner.java | 37 +++++++++++++++---- .../dataflow/sdk/util/PropertyNames.java | 2 + 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java index 318d0abd20..8955e9ffa3 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java @@ -73,6 +73,7 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator; import com.google.cloud.dataflow.sdk.options.StreamingOptions; +import com.google.cloud.dataflow.sdk.options.ValueProvider.NestedValueProvider; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.JobSpecification; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext; @@ -2376,14 +2377,27 @@ private void translateTyped( "StreamingPubsubIORead is only for streaming pipelines."); context.addStep(transform, "ParallelRead"); context.addInput(PropertyNames.FORMAT, "pubsub"); - if (transform.getTopic() != null) { - context.addInput(PropertyNames.PUBSUB_TOPIC, - transform.getTopic().asV1Beta1Path()); + if (transform.getTopicProvider() != null) { + if (transform.getTopicProvider().isAccessible()) { + context.addInput( + PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path()); + } else { + context.addInput( + PropertyNames.PUBSUB_TOPIC_OVERRIDE, + ((NestedValueProvider) transform.getTopicProvider()).propertyName()); + } } - if (transform.getSubscription() != null) { - context.addInput( - PropertyNames.PUBSUB_SUBSCRIPTION, - transform.getSubscription().asV1Beta1Path()); + if (transform.getSubscriptionProvider() != null) { + if (transform.getSubscriptionProvider().isAccessible()) { + context.addInput( + PropertyNames.PUBSUB_SUBSCRIPTION, + transform.getSubscription().asV1Beta1Path()); + } else { + context.addInput( + PropertyNames.PUBSUB_SUBSCRIPTION_OVERRIDE, + ((NestedValueProvider) transform.getSubscriptionProvider()) + .propertyName()); + } } if (transform.getTimestampLabel() != null) { context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, @@ -2454,7 +2468,14 @@ private void translateTyped( PubsubIO.Write.Bound overriddenTransform = transform.getOverriddenTransform(); context.addStep(transform, "ParallelWrite"); context.addInput(PropertyNames.FORMAT, "pubsub"); - context.addInput(PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().asV1Beta1Path()); + if (overriddenTransform.getTopicProvider().isAccessible()) { + context.addInput( + PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().asV1Beta1Path()); + } else { + context.addInput( + PropertyNames.PUBSUB_TOPIC_OVERRIDE, + ((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName()); + } if (overriddenTransform.getTimestampLabel() != null) { context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, overriddenTransform.getTimestampLabel()); diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java index 75c21e6e66..5771fa44e4 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java @@ -83,8 +83,10 @@ public class PropertyNames { public static final String PHASE = "phase"; public static final String PUBSUB_ID_LABEL = "pubsub_id_label"; public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription"; + public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = "pubsub_subscription_runtime_override"; public static final String PUBSUB_TIMESTAMP_LABEL = "pubsub_timestamp_label"; public static final String PUBSUB_TOPIC = "pubsub_topic"; + public static final String PUBSUB_TOPIC_OVERRIDE = "pubsub_topic_runtime_override"; public static final String SCALAR_FIELD_NAME = "value"; public static final String SERIALIZED_FN = "serialized_fn"; public static final String SHARD_NAME_TEMPLATE = "shard_template"; From 3d1b852276e86ade02b1ef72235a302edb63eaff Mon Sep 17 00:00:00 2001 From: Daniel Halperin Date: Wed, 14 Dec 2016 13:28:12 -0800 Subject: [PATCH 10/11] Undo another merge edit --- .../main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java index 038604ee9d..55dd24f6b7 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java @@ -1123,9 +1123,7 @@ public PDone apply(PCollection input) { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - String topicString = topic.isAccessible() - ? topic.get().asPath() : topic.toString(); - populateCommonDisplayData(builder, timestampLabel, idLabel, topicString); + populateCommonDisplayData(builder, timestampLabel, idLabel, topic); } @Override From 569f64b87d294290aba6a7bce47dbf566dcf415d Mon Sep 17 00:00:00 2001 From: Daniel Halperin Date: Wed, 14 Dec 2016 13:29:19 -0800 Subject: [PATCH 11/11] And undo another merge conflict --- .../cloud/dataflow/sdk/io/PubsubIOTest.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java index 9248d7ba7f..fbb563e4d7 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java @@ -72,6 +72,22 @@ public void testTopicValidationSuccess() throws Exception { .toString()); } + @Test + public void testTopicValidationBadCharacter() throws Exception { + thrown.expect(IllegalArgumentException.class); + PubsubIO.Read.topic("projects/my-project/topics/abc-*-abc"); + } + + @Test + public void testTopicValidationTooLong() throws Exception { + thrown.expect(IllegalArgumentException.class); + PubsubIO.Read.topic(new StringBuilder().append("projects/my-project/topics/A-really-long-one-") + .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111") + .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111") + .append("1111111111111111111111111111111111111111111111111111111111111111111111111111") + .toString()); + } + @Test public void testReadDisplayData() { String topic = "projects/project/topics/topic";