Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
import com.google.cloud.dataflow.sdk.options.StreamingOptions;
import com.google.cloud.dataflow.sdk.options.ValueProvider.NestedValueProvider;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.JobSpecification;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
Expand Down Expand Up @@ -2379,14 +2380,27 @@ private <T> void translateTyped(
"StreamingPubsubIORead is only for streaming pipelines.");
context.addStep(transform, "ParallelRead");
context.addInput(PropertyNames.FORMAT, "pubsub");
if (transform.getTopic() != null) {
context.addInput(PropertyNames.PUBSUB_TOPIC,
transform.getTopic().asV1Beta1Path());
if (transform.getTopicProvider() != null) {
if (transform.getTopicProvider().isAccessible()) {
context.addInput(
PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path());
} else {
context.addInput(
PropertyNames.PUBSUB_TOPIC_OVERRIDE,
((NestedValueProvider) transform.getTopicProvider()).propertyName());
}
}
if (transform.getSubscription() != null) {
context.addInput(
PropertyNames.PUBSUB_SUBSCRIPTION,
transform.getSubscription().asV1Beta1Path());
if (transform.getSubscriptionProvider() != null) {
if (transform.getSubscriptionProvider().isAccessible()) {
context.addInput(
PropertyNames.PUBSUB_SUBSCRIPTION,
transform.getSubscription().asV1Beta1Path());
} else {
context.addInput(
PropertyNames.PUBSUB_SUBSCRIPTION_OVERRIDE,
((NestedValueProvider) transform.getSubscriptionProvider())
.propertyName());
}
}
if (transform.getTimestampLabel() != null) {
context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL,
Expand Down Expand Up @@ -2457,7 +2471,14 @@ private <T> void translateTyped(
PubsubIO.Write.Bound<T> overriddenTransform = transform.getOverriddenTransform();
context.addStep(transform, "ParallelWrite");
context.addInput(PropertyNames.FORMAT, "pubsub");
context.addInput(PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().asV1Beta1Path());
if (overriddenTransform.getTopicProvider().isAccessible()) {
context.addInput(
PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().asV1Beta1Path());
} else {
context.addInput(
PropertyNames.PUBSUB_TOPIC_OVERRIDE,
((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName());
}
if (overriddenTransform.getTimestampLabel() != null) {
context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL,
overriddenTransform.getTimestampLabel());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ public class PropertyNames {
public static final String PHASE = "phase";
public static final String PUBSUB_ID_LABEL = "pubsub_id_label";
public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = "pubsub_subscription_runtime_override";
public static final String PUBSUB_TIMESTAMP_LABEL = "pubsub_timestamp_label";
public static final String PUBSUB_TOPIC = "pubsub_topic";
public static final String PUBSUB_TOPIC_OVERRIDE = "pubsub_topic_runtime_override";
public static final String SCALAR_FIELD_NAME = "value";
public static final String SERIALIZED_FN = "serialized_fn";
public static final String SHARD_NAME_TEMPLATE = "shard_template";
Expand Down