
[Bug]: Datastore's Read fails Pipeline validation and throws on DataflowRunner #28034

Open · 2 of 15 tasks
rafaelsms opened this issue Aug 16, 2023 · 9 comments

@rafaelsms
What happened?

Hello!

Sorry if this is a duplicate. To be honest, I don't know much about Apache Beam and Dataflow; I am still learning and might be doing something wrong, so let me know :)

I attempted to create a somewhat minimal example below. The exception is thrown by a pipeline that simply reads an entry from Datastore when running on DataflowRunner. The same pipeline runs and succeeds on DirectRunner.

The validation code was added in PR #26675, throwing at this line.

Example code:

    // Note: makeFilter, makeValue and makePropertyReference below are presumably
    // static imports from com.google.datastore.v1.client.DatastoreHelper.
    private static Read buildQueryTransform(Options options, String kind, String column3Value) {
        Query.Builder queryBuilder = Query.newBuilder();
        queryBuilder.addKindBuilder().setName(kind);
        queryBuilder.addProjection(makeProjection(COLUMN_NAME_1));
        queryBuilder.addProjection(makeProjection(COLUMN_NAME_2));
        queryBuilder.setFilter(makeFilter(COLUMN_NAME_3, Operator.EQUAL, makeValue(column3Value)));
        Query query = queryBuilder.build();
        return DatastoreIO.v1().read().withProjectId(options.getDataset()).withQuery(query);
    }

    private static Projection.Builder makeProjection(String propertyName) {
        Projection.Builder prjBuilder = Projection.newBuilder();
        prjBuilder.setProperty(makePropertyReference(propertyName));
        return prjBuilder;
    }

    public static void main(String[] args) {
        // ...

        Pipeline pipeline = Pipeline.create(options);

        // ...

        Read reader = buildQueryTransform(options, sourceKind, value);
        pipeline.apply("Read " + value + " from " + sourceKind, reader);

        // ...

        PipelineResult result = pipeline.run(); // throws IllegalArgumentException

        // ...
    }

Exception thrown:

SEVERE: Unexpected error in Dataflow job
java.lang.IllegalArgumentException: Transform Read-[column 3 value]-from-[datastore kind name]-Create-Values-Read-CreateSource--Impulse is not a composite transform but does not have a specified URN. outputs {
  key: "org.apache.beam.sdk.values.PCollection.<init>:397#bb20b45fd4d95138"
  value: "Read [column 3 value] from [datastore kind name]/Create.Values/Read(CreateSource)/Impulse.out"
}
unique_name: "Read [column 3 value] from [datastore kind name]/Create.Values/Read(CreateSource)/Impulse"

        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
        at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:219)
        at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateComponents(PipelineValidator.java:121)
        at org.apache.beam.runners.core.construction.graph.PipelineValidator.validate(PipelineValidator.java:101)
        at org.apache.beam.runners.core.construction.PipelineTranslation.toProto(PipelineTranslation.java:106)
        at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:1122)
        at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:198)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:321)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:307)
        at [our main class].main(MainDataflowJob.java:120)

Other log related to this unique name:

<record>
  <date>2023-08-16T13:56:55</date>
  <millis>1692205015473</millis>
  <sequence>278</sequence>
  <logger>org.apache.beam.sdk.runners.TransformHierarchy</logger>
  <level>FINE</level>
  <class>org.apache.beam.sdk.runners.TransformHierarchy$Node</class>
  <method>visit</method>
  <thread>1</thread>
  <message>Visiting primitive node Node{fullName=Read [column 3 value] from [datastore kind name]/Create.Values/Read(CreateSource)/Impulse, transform=Impulse}</message>
</record>
<record>
  <date>2023-08-16T13:56:55</date>
  <millis>1692205015473</millis>
  <sequence>279</sequence>
  <logger>org.apache.beam.sdk.runners.TransformHierarchy</logger>
  <level>FINE</level>
  <class>org.apache.beam.sdk.runners.TransformHierarchy$Node</class>
  <method>visit</method>
  <thread>1</thread>
  <message>Visiting output value Read [column 3 value] from [datastore kind name]/Create.Values/Read(CreateSource)/Impulse.out [PCollection@1917161212]</message>
</record>

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

haffar commented Aug 25, 2023

@rafaelsms Ignore what I said before. I too was seeing the same error as you. After spending a few hours banging my head against the wall, I discovered it has to do with the way I was building and deploying my Beam application on GCP Dataflow.
I was building a fat executable jar and executing it via java -jar, and that method gives me the error. If you execute the main class directly instead, it works! So the solution is: don't build a fat jar; execute the main class via mvn or gradle directly.
Take a look at the GCP documentation here:
https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline
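To make the difference concrete, here is a sketch of the two invocation styles described above (the jar name, project, and pipeline options are illustrative, not taken from this thread):

```shell
# Fails with the URN validation error: running the fat jar directly
java -jar build/libs/my-pipeline-all.jar \
    --runner=DataflowRunner --project=my-project --region=us-central1

# Works: letting the build tool run the main class from its own classpath
./gradlew run --args="--runner=DataflowRunner --project=my-project --region=us-central1"
```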

@rafaelsms (Author)

@haffar that is a strange solution. I too was building a fat jar, so I will try running it through Gradle on Monday! Thank you for taking the time to tell me :)

@rafaelsms (Author)

> I will try running it through Gradle on Monday!

I can't do that in our setup; the jar is built and moved around for later use. Looking forward to any other alternative, if there is one.

@christopherfrieler

Just ran into this issue as well. @haffar, can you share more of the insights you gained during your analysis? Does it have to do with all the duplicate files when packaging the fat jar?

According to the examples I found, you have to set the duplicatesStrategy to "EXCLUDE", which seems a little dirty anyway. So maybe we can exclude certain files or dependencies to work around this issue and get the right version of the problematic files into the fat jar. I just don't understand enough of the Apache Beam runners yet to identify which file might be the cause.
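For anyone else landing here, the duplicatesStrategy setting mentioned above usually looks like this in a Gradle build (a sketch; the task configuration is illustrative). Note that EXCLUDE keeps only the first copy of each duplicated path, which would also drop all but one copy of any duplicated META-INF/services file:

```groovy
// build.gradle: resolve duplicate entries when assembling the fat jar.
// DuplicatesStrategy.EXCLUDE keeps the first copy of each duplicated file
// and silently drops the rest, including duplicate META-INF/services files.
tasks.named('jar', Jar) {
    duplicatesStrategy = DuplicatesStrategy.EXCLUDE
}
```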


haffar commented Sep 1, 2023

> Just ran into this issue as well. @haffar, can you share more of the insights you gained during your analysis? Does it have to do with all the duplicate files when packaging the fat jar?
>
> According to the examples I found, you have to set the duplicatesStrategy to "EXCLUDE", which seems a little dirty anyway. So maybe we can exclude certain files or dependencies to work around this issue and get the right version of the problematic files into the fat jar. I just don't understand enough of the Apache Beam runners yet to identify any file that might be the cause.

@christopherfrieler unfortunately I am in the same boat. This was my first attempt at a Beam project, and I figured a fat jar would be the way to go, but apparently not. I did not know about setting the duplicatesStrategy to "EXCLUDE", but I just tried it and it did not make a difference.

@henrihs (Contributor)

henrihs commented Sep 1, 2023

> This was my first attempt at a beam project, and I figured a fat jar would be the way to go, but apparently not.

@haffar: I can assure you that building fat jars and deploying to Dataflow worked fine at least up until v2.45.0. I hit this issue when I upgraded to 2.49.0, and it seems to be this URN validation that gets in the way somehow.

I had to make do with deploying via Gradle for now, as I didn't want to stick with the older versions.


gzfhz commented Sep 13, 2023

@henrihs We had the same problem with Gradle's shadowJar task, which is used to create the fat jar. It was resolved by adding mergeServiceFiles():

    shadowJar {
        // ...
        mergeServiceFiles()
    }
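For completeness, a fuller sketch of a Shadow-plugin setup with the fix applied (the plugin version and main class here are illustrative assumptions, not taken from the thread):

```groovy
plugins {
    id 'application'
    id 'com.github.johnrengelman.shadow' version '8.1.1' // version illustrative
}

application {
    mainClass = 'com.example.MainDataflowJob' // illustrative main class
}

shadowJar {
    // Concatenate all META-INF/services files of the same name instead of
    // keeping only one copy, so every service registration that Beam looks
    // up via ServiceLoader survives in the fat jar.
    mergeServiceFiles()
}
```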

@christopherfrieler

Using Gradle Shadow Plugin with mergeServiceFiles() works. Thanks @gzfhz .

However, this requires an additional plugin. If the service files didn't collide, the regular Jar task could do the job.

And the example at https://github.com/apache/beam-starter-kotlin/blob/main/app/build.gradle.kts is also outdated.

@donallmc

Update: This still seems to be an issue. I am unable to deploy a flex template using Beam 2.56.0, but it works if I downgrade all the way to 2.48.0. I have not confirmed that the switch to Gradle works (though I have no reason to believe it doesn't), since switching build systems is not really something we want to get into right now!

The fact that the Gradle approach works specifically with the Shadow plugin makes me wonder if there's some service file or other resource that Maven Shade handles differently and is getting squashed. I'm not familiar enough with the Dataflow codebase to easily find which one, though. If anybody has any thoughts I'd be happy to do further investigation.
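If the Maven side of this turns out to be the same service-file problem, the Maven Shade plugin has an analogue of Gradle's mergeServiceFiles(): the ServicesResourceTransformer, which concatenates META-INF/services files instead of letting one overwrite the others. A sketch (the plugin version is illustrative):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.5.1</version> <!-- illustrative -->
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- Merge META-INF/services entries across all dependencies -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```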

6 participants