Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-5145] Make PTransform names stable in Join/CoGroupByKey #6212

Merged
merged 2 commits into from Aug 15, 2018

Conversation

toelen
Copy link
Contributor

@toelen toelen commented Aug 13, 2018

In Join and CoGroupByKey there were some instances where apply was called without a name. This fixes this, so pipelines that uses these PTransforms have stable names.


Follow this checklist to help us incorporate your contribution quickly and easily:

  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

It will help us expedite review of your Pull Request if you tag someone (e.g. @username) to look at it.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- --- --- --- ---
Java Build Status Build Status Build Status Build Status Build Status Build Status Build Status
Python Build Status --- Build Status
Build Status
--- --- --- ---

@toelen toelen force-pushed the fix/stable-names-join-cogroupbykey branch from 78ac217 to a2118e1 Compare August 13, 2018 19:07
@toelen
Copy link
Contributor Author

toelen commented Aug 13, 2018

The failed test UnboundedEventSourceTest.resumeFromCheckpoint doesn't really seem related

@lukecwik
Copy link
Member

Run Java PreCommit

@lukecwik
Copy link
Member

lukecwik commented Aug 13, 2018

Normally names aren't required if there is exactly one type of transform being used inside of PTransform#expand as the default is to generate an approximation using NameUtils#approximatePTransformName. The approximation is always stable and will only produce a collision if more then one application of the same transform type is used.

Are you getting warnings that this isn't stable in some way?

@toelen
Copy link
Contributor Author

toelen commented Aug 13, 2018

We are inner joining a PCollection on itself in a recursive manner (a configurable level deep), and then TestPipeline gives an error that the transforms do not have stable unique names. We use pipeline.getOptions().setStableUniqueNames(PipelineOptions.CheckEnabled.OFF), but it would be nice if we can remove that line.

@lukecwik
Copy link
Member

So names are based upon the names of the parents.

apply("Foo", MyPTransform);
apply("Bar", MyPTransform);

should generate "Foo/MyPTransform/..." and "Bar/MyPTransform/..." names depending on the PTransforms used in expand(). Does the warning/error message give you the transform names that are conflicting?

Also, this change looks fine to me I'm just trying to better understand whether there is a class of bug through recursion that we are hitting which is a larger problem in general.

@lukecwik
Copy link
Member

Run Java PreCommit

@lukecwik
Copy link
Member

R: @lukecwik

@lukecwik lukecwik self-requested a review August 13, 2018 21:12
@lukecwik lukecwik changed the title Make PTransform names stable in Join/CoGroupByKey [BEAM-5145] Make PTransform names stable in Join/CoGroupByKey Aug 13, 2018
@toelen
Copy link
Contributor Author

toelen commented Aug 14, 2018

This is an example of the case we are seeing (in our case with Join and CoGroupByKey). If the setStableUniqueNames call is removed, the test breaks. When adding names in all PTransform.apply calls, the test runs fine again.

public class Mytest {
    @Rule
    public TestPipeline p = TestPipeline.create();

    @Before
    public void setUp() throws Exception {
        p.getOptions().setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
    }

    private static PTransform<PCollection<Integer>, PCollection<Integer>> mytransform() {
        return new PTransform<PCollection<Integer>, PCollection<Integer>>() {
            @Override
            public PCollection<Integer> expand(PCollection<Integer> input) {
                return input.apply(mytransform2())
                            .apply(mytransform2());
            }
        };
    }

    private static PTransform<PCollection<Integer>, PCollection<Integer>> mytransform2() {
        return new PTransform<PCollection<Integer>, PCollection<Integer>>() {
            @Override
            public PCollection<Integer> expand(PCollection<Integer> input) {
                return input;
            }
        };
    }

    @Test
    public void testRecursive() {
        PCollection<Integer> input1 = p.apply(Create.of(1, 2, 3));
        for (int i = 0; i < 3; i++) {
            input1 = input1.apply("my-transform-" + i, mytransform());
        }
        p.run();
    }
}

@lukecwik
Copy link
Member

Run Java PreCommit

@lukecwik
Copy link
Member

Thanks for the example.

Should the following apply lines also have a name?

                return input.apply(mytransform2())
                            .apply(mytransform2());

@toelen
Copy link
Contributor Author

toelen commented Aug 14, 2018

If I change

return input.apply(mytransform2()) .apply(mytransform2());

to

return input.apply("tr-1", mytransform2()) .apply("tr-2", mytransform2());

then the pipeline runs fine, and has stable/unique names. This is what I wanted to do inside Join/CoGroupByKey

@lukecwik
Copy link
Member

Thanks for the details, I'm just trying to get our tests to pass as several are flaky. I'll merge once I can get a green run.

@lukecwik lukecwik merged commit e2583f5 into apache:master Aug 15, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants