Skip to content

Commit

Permalink
This closes #1788
Browse files Browse the repository at this point in the history
  • Loading branch information
tgroh committed Jan 24, 2017
2 parents 338012d + 0e1893a commit 6ecbfb9
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,22 +290,27 @@ private void setOutput(POutput output) {
for (TaggedPValue outputValue : output.expand()) {
outputProducers.add(getProducer(outputValue.getValue()));
}
if (outputProducers.contains(this) && outputProducers.size() != 1) {
Set<String> otherProducerNames = new HashSet<>();
for (Node outputProducer : outputProducers) {
if (outputProducer != this) {
otherProducerNames.add(outputProducer.getFullName());
if (outputProducers.contains(this)) {
if (!parts.isEmpty() || outputProducers.size() > 1) {
Set<String> otherProducerNames = new HashSet<>();
for (Node outputProducer : outputProducers) {
if (outputProducer != this) {
otherProducerNames.add(outputProducer.getFullName());
}
}
throw new IllegalArgumentException(
String.format(
"Output of composite transform [%s] contains a primitive %s produced by it. "
+ "Only primitive transforms are permitted to produce primitive outputs."
+ "%n Outputs: %s"
+ "%n Other Producers: %s"
+ "%n Components: %s",
getFullName(),
POutput.class.getSimpleName(),
output.expand(),
otherProducerNames,
parts));
}
throw new IllegalArgumentException(
String.format(
"Output of transform [%s] contains a %s produced by it as well as other "
+ "Transforms. A primitive transform must produce all of its outputs, and "
+ "outputs of a composite transform must be produced by a component transform "
+ "or be part of the input."
+ "%n Other Outputs: %s"
+ "%n Other Producers: %s",
getFullName(), POutput.class.getSimpleName(), output.expand(), otherProducerNames));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,39 @@ public PCollectionList<Long> expand(PCollectionList<Long> input) {
}
});
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("produced by it as well as other Transforms");
thrown.expectMessage("primitive transform must produce all of its outputs");
thrown.expectMessage("composite transform must be produced by a component transform");
thrown.expectMessage("contains a primitive POutput produced by it");
thrown.expectMessage("AddPc");
thrown.expectMessage("Create");
thrown.expectMessage(appended.expand().toString());
hierarchy.setOutput(appended);
}

@Test
public void producingOwnOutputWithCompositeFails() {
final PCollection<Long> comp =
PCollection.createPrimitiveOutputInternal(
pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
PTransform<PBegin, PCollection<Long>> root =
new PTransform<PBegin, PCollection<Long>>() {
@Override
public PCollection<Long> expand(PBegin input) {
return comp;
}
};
hierarchy.pushNode("Composite", PBegin.in(pipeline), root);

Create.Values<Integer> create = Create.of(1);
hierarchy.pushNode("Create", PBegin.in(pipeline), create);
hierarchy.setOutput(pipeline.apply(create));
hierarchy.popNode();

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("contains a primitive POutput produced by it");
thrown.expectMessage("primitive transforms are permitted to produce");
thrown.expectMessage("Composite");
hierarchy.setOutput(comp);
}

@Test
public void visitVisitsAllPushed() {
TransformHierarchy.Node root = hierarchy.getCurrent();
Expand Down

0 comments on commit 6ecbfb9

Please sign in to comment.