This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Transform does not have a stable unique name #51

Closed
lbergelson opened this issue Jul 29, 2015 · 5 comments

@lbergelson

After updating to dataflow 0.4.20150727 we've started getting this exception at runtime. It seems to happen nearly every time we apply any transform without explicitly giving it a name. Is this the intent?

This entirely reasonable code now results in an unintuitive runtime crash:

    @Test
    public void exampleFailure(){
        Pipeline p = TestPipeline.create();
        final PCollection<Integer> pInts1 = p.apply(Create.of(Arrays.asList(1, 2, 3)));
        final PCollection<Integer> pInts2 = p.apply(Create.of(Arrays.asList(1, 2, 3)));
        p.run();
    }

java.lang.IllegalStateException: Transform Create.Values2 does not have a stable unique name. This will prevent reloading of pipelines.
    at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:330)
    at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:253)
    at com.google.cloud.dataflow.sdk.values.PBegin.apply(PBegin.java:48)
    at com.google.cloud.dataflow.sdk.Pipeline.apply(Pipeline.java:137)
    at org.broadinstitute.hellbender.engine.dataflow.transforms.GetOverlappingReadsAndVariantsUnitTest.exampleFailure(GetOverlappingReadsAndVariantsUnitTest.java:63)

Adding some boilerplate fixes it:

    @Test
    public void exampleFailure(){
        Pipeline p = TestPipeline.create();
        final PCollection<Integer> pInts1 = p.apply("Create.Values1",Create.of(Arrays.asList(1, 2, 3)));
        final PCollection<Integer> pInts2 = p.apply("Create.Values2",Create.of(Arrays.asList(1, 2, 3)));
        p.run();
    }

but this should be unnecessary since I gave it the same name it had already inferred for that transform.

Three questions:

  1. Is this a bug? If it's working as intended then the single parameter version of apply should really be removed since it's highly unsafe.
  2. Is it safe to use a randomly generated name?
    @Test
    public void exampleFailure(){
        Pipeline p = TestPipeline.create();
        final PCollection<Integer> pInts1 = p.apply(UUID.randomUUID().toString(),Create.of(Arrays.asList(1, 2, 3)));
        final PCollection<Integer> pInts2 = p.apply(UUID.randomUUID().toString(),Create.of(Arrays.asList(1, 2, 3)));
        p.run();
    }

I assume it's not, because the message says "stable". What's the danger, though? If a randomly generated name isn't OK, what should we do for programmatically generated transforms? Is an incremented counter OK?

  3. Could you better define "stable", "unique", and "reload pipeline"? Is that globally unique? Unique per pipeline run? What does it mean to reload a pipeline? Is that something that happens by magic behind the scenes, or is that something we'd have to initiate manually?

@bjchambers
Contributor

Thanks for letting us know about this -- we're going to work on improving the documentation on this and such, but in the meantime here is some more information about the error.

By default, a missing stable unique name is a warning, not an error, but in your example the use of TestPipeline enables the --stableUniqueNames=ERROR option, which treats these warnings as errors. We do this so that we're testing in the most restricted environment possible and don't introduce any warnings that will lead to trouble for users.

  • Unique: The fully scoped name of the transform application needs to be unique within the pipeline. We generate the fully scoped name from the name of the PTransform#apply method we're currently in plus the name of the PTransform being applied. In your case, you have two instances of Create being applied at the top level. The system generates a suffix to deduplicate these applications.

The single parameter version of apply is safe if you don't have the same PTransform being applied multiple times in the same context.

  • Stable: The name doesn't change across invocations of the pipeline. If there is a conflict, the system will generate a unique name by adding a suffix. This isn't stable, since re-ordering the applications would lead to different names. Similarly, randomly generated names won't be stable, but the fact that you provide a string will trick the system into thinking the name is stable (since it can't tell the difference between "stableName" and UUID.randomUUID().toString()).
  • Reload / Update is an upcoming feature that will compare Job graphs between invocations, and these stable/unique names are necessary for this comparison. We put this warning in early to avoid users building pipelines that will have future problems when these features are available.
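The interplay between deduplication and the warning/error level described above can be modelled in a few lines. This is a toy sketch only, not the SDK's implementation: the class UniqueNameSketch, its Mode enum, and the exact suffix scheme (keep the first name, append 2, 3, ... on collision) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the uniqueness check; not the Dataflow SDK's actual code. */
final class UniqueNameSketch {
    /** Hypothetical stand-in for the WARNING and ERROR levels of --stableUniqueNames. */
    enum Mode { WARNING, ERROR }

    private final Mode mode;
    private final Set<String> used = new HashSet<>();
    final List<String> warnings = new ArrayList<>();

    UniqueNameSketch(Mode mode) {
        this.mode = mode;
    }

    /** Registers one transform application and returns the name it ends up with. */
    String register(String name) {
        if (used.add(name)) {
            return name; // first use in this scope: nothing to deduplicate
        }
        // Collision: invent a suffix. The suffix depends on application order,
        // which is why the generated name is unique but not stable.
        int suffix = 2;
        while (!used.add(name + suffix)) {
            suffix++;
        }
        String generated = name + suffix;
        String message = "Transform " + generated + " does not have a stable unique name.";
        if (mode == Mode.ERROR) {
            throw new IllegalStateException(message);
        }
        warnings.add(message);
        return generated;
    }

    public static void main(String[] args) {
        UniqueNameSketch lenient = new UniqueNameSketch(Mode.WARNING);
        System.out.println(lenient.register("Create.Values"));
        System.out.println(lenient.register("Create.Values"));
        System.out.println(lenient.warnings);
    }
}
```

In this model a caller-supplied string is never questioned as long as it is unused, which mirrors the point that the system cannot distinguish a deliberate name from a random one.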

@lbergelson
Author

Ah, thank you for the information. I didn't realize it was possible to downgrade it to a warning again.

I disagree with the description of apply as safe. It's safe only as long as I have analyzed the entire call stack to ensure that no one else ever calls the same built-in transform, which is not a reasonable thing to have to do. It means that your code's validity depends on the private implementation details of everyone else's code.

    @Test
    public void exampleFailure(){
        Pipeline p = TestPipeline.create();
        final PCollection<String> pStrings = getSomeStrings(p, Arrays.asList("john", "jane", "jahne"));
        final PCollection<Integer> pInts2 = getSomeInts(p, Arrays.asList(4,5,6));
        p.run();
    }

    private PCollection<Integer> getSomeInts(Pipeline p, List<Integer> ints){
        return p.apply(Create.of(ints));
    }

    private PCollection<String> getSomeStrings(Pipeline p, List<String> strings){
        return p.apply(Create.of(strings));
    }

Each of those functions works fine in isolation, but together they break. The only way I see to fix it is to add a mandatory name parameter to them and then thread it through all of your function calls.

Since the uniqueness requirement is easily handled by the system, would it be possible to make the pipeline creation ordering deterministic in order to satisfy the stability requirement as well? It seems like making this the user's responsibility is going to create a large amount of boilerplate and tricky context-dependent errors that are very difficult to unit test for.

@bjchambers
Contributor

Regarding safety of apply: The names are all scoped to the containing PTransform. Rather than writing functions such as getSomeInts(), re-usable components should be packaged into a PTransform.

private static class GetSomeInts extends PTransform<PInput, PCollection<Integer>> {
  private List<Integer> ints;
  public GetSomeInts(List<Integer> ints) {
    this.ints = ints;
  }  

  @Override
  public PCollection<Integer> apply(PInput input) {
    return input.apply(Create.of(ints));
  }
}

If you apply(new GetSomeInts(Arrays.asList(5, 6, 7))) the full name of the create in there will be GetSomeInts/Create.Values. Each PTransform creates a new namespace. This allows you to not worry about the implementation details of PTransforms that you apply, as long as they have unique names within the context that you applied them.
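The namespace behaviour can be pictured with a small stand-alone model. This is a sketch under assumptions, not SDK code: ScopedNamesSketch and its enter/exit/applyLeaf methods are hypothetical, and GetSomeStrings is an imagined sibling of the GetSomeInts transform shown above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of scoped transform names; not the Dataflow SDK's actual code. */
final class ScopedNamesSketch {
    // Enclosing composite transforms, outermost first.
    private final Deque<String> scopes = new ArrayDeque<>();
    final List<String> fullNames = new ArrayList<>();

    /** Called when a composite transform starts applying its children. */
    void enter(String compositeName) {
        scopes.addLast(compositeName);
    }

    /** Called when the composite transform is done. */
    void exit() {
        scopes.removeLast();
    }

    /** Applies a primitive transform and returns its fully scoped name. */
    String applyLeaf(String transformName) {
        StringBuilder full = new StringBuilder();
        for (String scope : scopes) {
            full.append(scope).append('/');
        }
        full.append(transformName);
        fullNames.add(full.toString());
        return full.toString();
    }

    public static void main(String[] args) {
        ScopedNamesSketch pipeline = new ScopedNamesSketch();
        pipeline.enter("GetSomeInts");
        pipeline.applyLeaf("Create.Values");
        pipeline.exit();
        pipeline.enter("GetSomeStrings"); // hypothetical second composite
        pipeline.applyLeaf("Create.Values");
        pipeline.exit();
        System.out.println(pipeline.fullNames);
    }
}
```

Both leaves are called Create.Values, yet their full names differ because each sits in its own scope, so neither composite needs to know what the other applies internally.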


The unique names are currently assigned deterministically. The reason this isn't enough is that determinism alone doesn't make the names stable. Consider the following code:

...
    .apply(Create.of(Arrays.asList(5, 6, 7)))          // Create.Values1
...
    .apply(Create.of(Arrays.asList("hello", "world"))) // Create.Values2

If you change that to:

...
    .apply(Create.of(Arrays.asList("hello", "world"))) // Create.Values1
...
    .apply(Create.of(Arrays.asList(5, 6, 7)))          // Create.Values2

The names have changed, which would prevent updating, even though the pipeline itself hasn't changed; you just reordered some code.
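The same point as a runnable comparison: a sketch that treats a pipeline as a map from transform name to what the transform creates. The class StabilitySketch, both helper methods, and the explicit names CreateInts and CreateStrings are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy comparison of order-derived names and caller-chosen names. */
final class StabilitySketch {
    /** Names each application by its position in the code, like an order-dependent suffix. */
    static Map<String, String> positionalNames(List<String> payloads) {
        Map<String, String> named = new LinkedHashMap<>();
        int position = 1;
        for (String payload : payloads) {
            named.put("Create.Values" + position, payload);
            position++;
        }
        return named;
    }

    /** Uses caller-chosen names; each entry is a {name, payload} pair. */
    static Map<String, String> explicitNames(List<String[]> applications) {
        Map<String, String> named = new LinkedHashMap<>();
        for (String[] application : applications) {
            named.put(application[0], application[1]);
        }
        return named;
    }

    public static void main(String[] args) {
        Map<String, String> before = positionalNames(Arrays.asList("5, 6, 7", "hello, world"));
        Map<String, String> after = positionalNames(Arrays.asList("hello, world", "5, 6, 7"));
        // Same transforms, different code order: the name-to-payload mapping differs.
        System.out.println(before.equals(after));

        String[] ints = {"CreateInts", "5, 6, 7"};
        String[] strings = {"CreateStrings", "hello, world"};
        Map<String, String> namedBefore = explicitNames(Arrays.<String[]>asList(ints, strings));
        Map<String, String> namedAfter = explicitNames(Arrays.<String[]>asList(strings, ints));
        // With explicit names the mapping survives the reordering.
        System.out.println(namedBefore.equals(namedAfter));
    }
}
```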

@lbergelson
Author

I see what you're saying about scoping now. It's not intuitive that it's necessary to wrap everything into its own PTransform instead of the less bulky function call.

Apply still seems rather dangerous to me, since very simple and reasonable things are now runtime errors.

    @Test void anotherFailure() {
        SerializableFunction<Integer,Boolean> pred = a -> a > 1;
        Pipeline p = TestPipeline.create();
        final PCollection<Integer> pInts = p.apply(Create.of(Arrays.asList(1,2,3)));
        final PCollection<Integer> pFiltered = pInts.apply(Filter.by(pred)).apply(Filter.by(a -> a < 3));
        p.run();
    }

In the case of reordering the code and wanting to somehow restart it without change, it seems like you could perform an analysis of the graph and produce a unique identifier based on the transform's place in the topology rather than the name of the function.

I'm not sure you want the name of a transform to be tied to its functionality either. The name is the human-readable identifier, which should be entirely distinct from any sort of functional identifier. That said, I'm not sure I entirely understand how reloading a pipeline is intended to work, so I may be confused about what the name is for.

@dhalperi
Contributor

Hi Louis,

The documentation around updating pipelines has been much improved since we last gave you an update. I hope that is able to provide some insight into that functionality.

I can see why the need to provide names can be frustrating, which is why we let you downgrade to the warning. We've found that human-readable names are useful in several places (including in the Dataflow Monitoring UI) and are less error-prone than other automatic techniques.

In your sample pipeline, you might decide to reorder the filters (B then A instead of A then B), because you've determined after running the job for a while that the second filter drops many more elements than the first. It is crucially important that we catch this change during a job update, or elements that were checkpointed between A and B in the first run would be reloaded between B and A in the second run -- thus going through A twice and never through B.
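The failure mode in that scenario can be simulated without the SDK. The sketch below is a simplified model and makes several assumptions: a pipeline is a flat list of filter stages, a checkpoint records only the index of the stage an element last passed, and the names FilterA, FilterB, Filter1, and Filter2 are invented for illustration. A is the `a > 1` filter and B is the `a < 3` filter from the sample pipeline.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.IntPredicate;

/** Toy model of resuming checkpointed elements after a job update. */
final class CheckpointSketch {
    /** Runs the stages from index `from` onward and reports whether the element survives. */
    static boolean resume(int element, List<IntPredicate> stages, int from) {
        for (int i = from; i < stages.size(); i++) {
            if (!stages.get(i).test(element)) {
                return false;
            }
        }
        return true;
    }

    /** In this model an update is accepted only when the stage names line up. */
    static boolean looksCompatible(List<String> oldNames, List<String> newNames) {
        return oldNames.equals(newNames);
    }

    public static void main(String[] args) {
        IntPredicate a = x -> x > 1; // filter A
        IntPredicate b = x -> x < 3; // filter B
        int element = 3;             // already passed A, checkpointed before stage index 1

        // Original order A, B: the element still has to pass B, which drops it.
        System.out.println(resume(element, Arrays.asList(a, b), 1));
        // Reordered to B, A: resuming at index 1 runs A again and skips B entirely.
        System.out.println(resume(element, Arrays.asList(b, a), 1));

        // Order-derived names hide the change; explicit names expose it.
        System.out.println(looksCompatible(
            Arrays.asList("Filter1", "Filter2"), Arrays.asList("Filter1", "Filter2")));
        System.out.println(looksCompatible(
            Arrays.asList("FilterA", "FilterB"), Arrays.asList("FilterB", "FilterA")));
    }
}
```

With order-derived names the two job graphs look identical, so nothing stops the element 3 from being replayed through A a second time; with explicit names the mismatch is visible before any state is reloaded.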

I don't think that this is blocking you any more, so I'll close this issue. Please reopen with questions or if there is more work to be done here.

Thanks!
