Provide local tags in PInput, POutput expansions
Output an ordered collection in PInput and POutput expansions.

This provides information that is necessary to reconstruct a PInput
or POutput from its expansion.
tgroh committed Dec 12, 2016
1 parent 4373937 commit 60410f9
Showing 19 changed files with 304 additions and 96 deletions.
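
For context, a minimal consumer-side sketch (not part of this commit) of what the ordered, tagged expansion enables: rebuilding the tag-to-value mapping of a POutput from expand() alone. It assumes TaggedPValue exposes a getTag() accessor alongside the getValue() accessor used throughout the diff; the helper name is purely illustrative.

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TupleTag;

public class ExpansionExample {
  // Rebuilds the local tag -> value mapping from an expansion. Before this
  // change, expand() returned a bare Collection<? extends PValue>, so the
  // association between a value and its tag inside the POutput was lost.
  static Map<TupleTag<?>, PValue> asMap(POutput output) {
    Map<TupleTag<?>, PValue> reconstructed = new LinkedHashMap<>();
    for (TaggedPValue tagged : output.expand()) {
      reconstructed.put(tagged.getTag(), tagged.getValue()); // getTag() is an assumed accessor
    }
    return reconstructed;
  }
}

In the hunks below, consumers make the matching switch from iterating bare PValues to unwrapping TaggedPValue.getValue().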
@@ -35,6 +35,7 @@
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;

/**
* Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the
@@ -79,14 +80,16 @@ public void leaveCompositeTransform(TransformHierarchy.Node node) {

@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
toFinalize.removeAll(node.getInputs());
for (TaggedPValue consumed : node.getInputs()) {
toFinalize.remove(consumed.getValue());
}
AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);
stepNames.put(appliedTransform, genStepName());
if (node.getInputs().isEmpty()) {
rootTransforms.add(appliedTransform);
} else {
for (PValue value : node.getInputs()) {
primitiveConsumers.put(value, appliedTransform);
for (TaggedPValue value : node.getInputs()) {
primitiveConsumers.put(value.getValue(), appliedTransform);
}
}
}
@@ -96,17 +99,12 @@ public void visitValue(PValue value, TransformHierarchy.Node producer) {
toFinalize.add(value);

AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(producer);
if (value instanceof PCollectionView) {
views.add((PCollectionView<?>) value);
}
if (!producers.containsKey(value)) {
producers.put(value, appliedTransform);
}
for (PValue expandedValue : value.expand()) {
if (expandedValue instanceof PCollectionView) {
views.add((PCollectionView<?>) expandedValue);
}
if (!producers.containsKey(expandedValue)) {
producers.put(value, appliedTransform);
}
}
}

private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformHierarchy.Node node) {
@@ -52,6 +52,7 @@
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
import org.joda.time.Instant;

/**
@@ -399,9 +400,9 @@ public boolean isDone(AppliedPTransform<?, ?, ?> transform) {
}
// If the PTransform has any unbounded outputs, and unbounded producers should not be shut down,
// the PTransform may produce additional output. It is not done.
for (PValue output : transform.getOutput().expand()) {
if (output instanceof PCollection) {
IsBounded bounded = ((PCollection<?>) output).isBounded();
for (TaggedPValue output : transform.getOutput().expand()) {
if (output.getValue() instanceof PCollection) {
IsBounded bounded = ((PCollection<?>) output.getValue()).isBounded();
if (bounded.equals(IsBounded.UNBOUNDED)
&& !options.isShutdownUnboundedProducersWithMaxWatermark()) {
return false;
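
A hedged sketch (not from this commit) of the check the comment above describes, written against the new TaggedPValue-based expansion; the helper name is invented for illustration.

import java.util.List;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.TaggedPValue;

class UnboundedOutputCheck {
  // True if any expanded output is an UNBOUNDED PCollection; values that are
  // not PCollections are ignored, mirroring the instanceof guard in the diff.
  static boolean hasUnboundedOutput(List<TaggedPValue> outputs) {
    for (TaggedPValue output : outputs) {
      if (output.getValue() instanceof PCollection
          && ((PCollection<?>) output.getValue()).isBounded() == IsBounded.UNBOUNDED) {
        return true;
      }
    }
    return false;
  }
}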
@@ -26,6 +26,7 @@
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;

/**
* A pipeline visitor that tracks all keyed {@link PValue PValues}. A {@link PValue} is keyed if it
@@ -74,7 +75,9 @@ public void leaveCompositeTransform(TransformHierarchy.Node node) {
if (node.isRootNode()) {
finalized = true;
} else if (producesKeyedOutputs.contains(node.getTransform().getClass())) {
keyedValues.addAll(node.getOutputs());
for (TaggedPValue output : node.getOutputs()) {
keyedValues.add(output.getValue());
}
}
}

@@ -84,7 +87,7 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) {}
@Override
public void visitValue(PValue value, TransformHierarchy.Node producer) {
if (producesKeyedOutputs.contains(producer.getTransform().getClass())) {
keyedValues.addAll(value.expand());
keyedValues.add(value);
}
}

@@ -57,7 +57,7 @@
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
import org.joda.time.Instant;

/**
@@ -755,27 +755,28 @@ private TransformWatermarks getTransformWatermark(AppliedPTransform<?, ?, ?> tra

private Collection<Watermark> getInputProcessingWatermarks(AppliedPTransform<?, ?, ?> transform) {
ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder();
Collection<? extends PValue> inputs = transform.getInput().expand();
List<TaggedPValue> inputs = transform.getInput().expand();
if (inputs.isEmpty()) {
inputWmsBuilder.add(THE_END_OF_TIME);
}
for (PValue pvalue : inputs) {
for (TaggedPValue pvalue : inputs) {
Watermark producerOutputWatermark =
getTransformWatermark(graph.getProducer(pvalue)).synchronizedProcessingOutputWatermark;
getTransformWatermark(graph.getProducer(pvalue.getValue()))
.synchronizedProcessingOutputWatermark;
inputWmsBuilder.add(producerOutputWatermark);
}
return inputWmsBuilder.build();
}

private List<Watermark> getInputWatermarks(AppliedPTransform<?, ?, ?> transform) {
ImmutableList.Builder<Watermark> inputWatermarksBuilder = ImmutableList.builder();
Collection<? extends PValue> inputs = transform.getInput().expand();
List<TaggedPValue> inputs = transform.getInput().expand();
if (inputs.isEmpty()) {
inputWatermarksBuilder.add(THE_END_OF_TIME);
}
for (PValue pvalue : inputs) {
for (TaggedPValue pvalue : inputs) {
Watermark producerOutputWatermark =
getTransformWatermark(graph.getProducer(pvalue)).outputWatermark;
getTransformWatermark(graph.getProducer(pvalue.getValue())).outputWatermark;
inputWatermarksBuilder.add(producerOutputWatermark);
}
List<Watermark> inputCollectionWatermarks = inputWatermarksBuilder.build();
@@ -959,8 +960,8 @@ synchronized void refreshAll() {
WatermarkUpdate updateResult = myWatermarks.refresh();
if (updateResult.isAdvanced()) {
Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>();
for (PValue outputPValue : toRefresh.getOutput().expand()) {
additionalRefreshes.addAll(graph.getPrimitiveConsumers(outputPValue));
for (TaggedPValue outputPValue : toRefresh.getOutput().expand()) {
additionalRefreshes.addAll(graph.getPrimitiveConsumers(outputPValue.getValue()));
}
return additionalRefreshes;
}
@@ -48,6 +48,7 @@
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkEnv$;
import org.apache.spark.api.java.JavaSparkContext;
@@ -282,7 +283,7 @@ private boolean shouldDefer(TransformHierarchy.Node node) {
if (node.getInputs().size() != 1) {
return false;
}
PValue input = Iterables.getOnlyElement(node.getInputs());
PValue input = Iterables.getOnlyElement(node.getInputs()).getValue();
if (!(input instanceof PCollection)
|| ((PCollection) input).getWindowingStrategy().getWindowFn().isNonMerging()) {
return false;
@@ -338,7 +339,7 @@ TransformEvaluator<TransformT> translate(
//--- determine if node is bounded/unbounded.
// usually, the input determines if the PCollection to apply the next transformation to
// is BOUNDED or UNBOUNDED, meaning RDD/DStream.
Collection<? extends PValue> pValues;
Collection<TaggedPValue> pValues;
if (node.getInputs().isEmpty()) {
// in case of a PBegin, it's the output.
pValues = node.getOutputs();
@@ -353,15 +354,15 @@
: translator.translateUnbounded(transformClass);
}

private PCollection.IsBounded isBoundedCollection(Collection<? extends PValue> pValues) {
private PCollection.IsBounded isBoundedCollection(Collection<TaggedPValue> pValues) {
// anything that is not a PCollection, is BOUNDED.
// For PCollections:
// BOUNDED behaves as the Identity Element, BOUNDED + BOUNDED = BOUNDED
// while BOUNDED + UNBOUNDED = UNBOUNDED.
PCollection.IsBounded isBounded = PCollection.IsBounded.BOUNDED;
for (PValue pValue: pValues) {
if (pValue instanceof PCollection) {
isBounded = isBounded.and(((PCollection) pValue).isBounded());
for (TaggedPValue pValue: pValues) {
if (pValue.getValue() instanceof PCollection) {
isBounded = isBounded.and(((PCollection) pValue.getValue()).isBounded());
} else {
isBounded = isBounded.and(PCollection.IsBounded.BOUNDED);
}
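
A tiny illustration (not part of this commit) of the identity-element behaviour described in the comment above: folding with IsBounded.and() stays BOUNDED until any UNBOUNDED value appears.

import org.apache.beam.sdk.values.PCollection.IsBounded;

public class IsBoundedIdentityExample {
  public static void main(String[] args) {
    // BOUNDED acts as the identity element of and():
    System.out.println(IsBounded.BOUNDED.and(IsBounded.BOUNDED));   // BOUNDED
    System.out.println(IsBounded.BOUNDED.and(IsBounded.UNBOUNDED)); // UNBOUNDED
    System.out.println(IsBounded.UNBOUNDED.and(IsBounded.BOUNDED)); // UNBOUNDED
  }
}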
@@ -37,6 +37,7 @@
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;

/**
* Captures information about a collection of transformations and their
@@ -84,10 +85,12 @@ public Node pushNode(String name, PInput input, PTransform<?, ?> transform) {
*/
public void finishSpecifyingInput() {
// Inputs must be completely specified before they are consumed by a transform.
for (PValue inputValue : current.getInputs()) {
inputValue.finishSpecifying();
checkState(producers.get(inputValue) != null, "Producer unknown for input %s", inputValue);
inputValue.finishSpecifying();
for (TaggedPValue inputValue : current.getInputs()) {
inputValue.getValue().finishSpecifying();
checkState(
producers.get(inputValue.getValue()) != null,
"Producer unknown for input %s",
inputValue.getValue());
}
}

@@ -103,9 +106,9 @@ public void finishSpecifyingInput() {
*/
public void setOutput(POutput output) {
output.finishSpecifyingOutput();
for (PValue value : output.expand()) {
if (!producers.containsKey(value)) {
producers.put(value, current);
for (TaggedPValue value : output.expand()) {
if (!producers.containsKey(value.getValue())) {
producers.put(value.getValue(), current);
}
}
current.setOutput(output);
@@ -133,8 +136,8 @@ Node getProducer(PValue produced) {
*/
List<Node> getProducingTransforms(POutput output) {
List<Node> producingTransforms = new ArrayList<>();
for (PValue value : output.expand()) {
Node producer = getProducer(value);
for (TaggedPValue value : output.expand()) {
Node producer = getProducer(value.getValue());
if (producer != null) {
producingTransforms.add(producer);
}
@@ -238,8 +241,8 @@ public boolean isCompositeNode() {
private boolean returnsOthersOutput() {
PTransform<?, ?> transform = getTransform();
if (output != null) {
for (PValue outputValue : output.expand()) {
if (!getProducer(outputValue).getTransform().equals(transform)) {
for (TaggedPValue outputValue : output.expand()) {
if (!getProducer(outputValue.getValue()).getTransform().equals(transform)) {
return true;
}
}
@@ -256,8 +259,8 @@ public String getFullName() {
}

/** Returns the transform input, in unexpanded form. */
public Collection<? extends PValue> getInputs() {
return input == null ? Collections.<PValue>emptyList() : input.expand();
public List<TaggedPValue> getInputs() {
return input == null ? Collections.<TaggedPValue>emptyList() : input.expand();
}

/**
@@ -273,8 +276,8 @@ private void setOutput(POutput output) {
// Validate that a primitive transform produces only primitive output, and a composite
// transform does not produce primitive output.
Set<Node> outputProducers = new HashSet<>();
for (PValue outputValue : output.expand()) {
outputProducers.add(getProducer(outputValue));
for (TaggedPValue outputValue : output.expand()) {
outputProducers.add(getProducer(outputValue.getValue()));
}
if (outputProducers.contains(this) && outputProducers.size() != 1) {
Set<String> otherProducerNames = new HashSet<>();
@@ -296,8 +299,8 @@ private void setOutput(POutput output) {
}

/** Returns the transform output, in unexpanded form. */
public Collection<? extends PValue> getOutputs() {
return output == null ? Collections.<PValue>emptyList() : output.expand();
public List<TaggedPValue> getOutputs() {
return output == null ? Collections.<TaggedPValue>emptyList() : output.expand();
}

/**
@@ -320,9 +323,9 @@ private void visit(PipelineVisitor visitor, Set<PValue> visitedValues) {

if (!isRootNode()) {
// Visit inputs.
for (PValue inputValue : input.expand()) {
if (visitedValues.add(inputValue)) {
visitor.visitValue(inputValue, getProducer(inputValue));
for (TaggedPValue inputValue : input.expand()) {
if (visitedValues.add(inputValue.getValue())) {
visitor.visitValue(inputValue.getValue(), getProducer(inputValue.getValue()));
}
}
}
@@ -342,9 +345,9 @@ private void visit(PipelineVisitor visitor, Set<PValue> visitedValues) {

if (!isRootNode()) {
// Visit outputs.
for (PValue pValue : output.expand()) {
if (visitedValues.add(pValue)) {
visitor.visitValue(pValue, this);
for (TaggedPValue pValue : output.expand()) {
if (visitedValues.add(pValue.getValue())) {
visitor.visitValue(pValue.getValue(), this);
}
}
}
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.transforms.join;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
@@ -28,7 +27,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

@@ -121,10 +120,10 @@ public <OutputT extends POutput> OutputT apply(
* any tag-specific information.
*/
@Override
public Collection<? extends PValue> expand() {
List<PCollection<?>> retval = new ArrayList<>();
public List<TaggedPValue> expand() {
List<TaggedPValue> retval = new ArrayList<>();
for (TaggedKeyedPCollection<K, ?> taggedPCollection : keyedCollections) {
retval.add(taggedPCollection.pCollection);
retval.add(TaggedPValue.of(taggedPCollection.tupleTag, taggedPCollection.pCollection));
}
return retval;
}
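
For comparison, a hypothetical producer-side sketch (not from this commit) following the same pattern as the KeyedPCollectionTuple change above: each component value is paired with its local tag via TaggedPValue.of, so the ordered expansion carries enough information to reconstruct the original structure. The class and field names are invented, and the rest of the POutput contract is elided.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TupleTag;

class TwoOutputBundle /* remaining POutput methods elided */ {
  private final TupleTag<String> wordsTag = new TupleTag<>();
  private final TupleTag<Long> countsTag = new TupleTag<>();
  private final PCollection<String> words;
  private final PCollection<Long> counts;

  TwoOutputBundle(PCollection<String> words, PCollection<Long> counts) {
    this.words = words;
    this.counts = counts;
  }

  // Each value keeps its local tag, mirroring
  // TaggedPValue.of(taggedPCollection.tupleTag, taggedPCollection.pCollection) above.
  public List<TaggedPValue> expand() {
    List<TaggedPValue> values = new ArrayList<>();
    values.add(TaggedPValue.of(wordsTag, words));
    values.add(TaggedPValue.of(countsTag, counts));
    return values;
  }
}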
@@ -17,8 +17,8 @@
*/
package org.apache.beam.sdk.values;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO.Read;
import org.apache.beam.sdk.transforms.Create;
@@ -64,7 +64,7 @@ public Pipeline getPipeline() {
}

@Override
public Collection<? extends PValue> expand() {
public List<TaggedPValue> expand() {
// A PBegin contains no PValues.
return Collections.emptyList();
}
