
[Bug]: Storage Write API fails on Batch Pipelines on 2.49 #27670

Closed
2 of 15 tasks
pranavbhandari24 opened this issue Jul 25, 2023 · 11 comments · Fixed by #27699
Comments

@pranavbhandari24
Contributor

pranavbhandari24 commented Jul 25, 2023

What happened?

When writing to a BigQuery sink in a batch pipeline using the Storage Write API, the pipeline fails with the following error:

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Schema field not found: eventid

at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:187)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
at org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:56)
at org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:39)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:117)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:319)
at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:291)
at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:221)
at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:147)
at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:127)
at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:114)
at java.util.concurrent.FutureTask.run(FutureTask.java:264)
at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.sdk.util.UserCodeException

at org.apache.beam.sdk.util.UserCodeException.wrap ( UserCodeException.java:39 )
at org.apache.beam.sdk.io.gcp.bigquery.StorageApiConvertMessages$ConvertMessagesDoFn$DoFnInvoker.invokeProcessElement
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement ( SimpleDoFnRunner.java:211 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement ( SimpleDoFnRunner.java:188 )
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement ( SimpleParDoFn.java:340 )
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process ( ParDoOperation.java:44 )
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process ( OutputReceiver.java:54 )
at org.apache.beam.runners.dataflow.worker.AssignWindowsParDoFnFactory$AssignWindowsParDoFn.processElement ( AssignWindowsParDoFnFactory.java:115 )
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process ( ParDoOperation.java:44 )
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process ( OutputReceiver.java:54 )
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output ( SimpleParDoFn.java:285 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue ( SimpleDoFnRunner.java:275 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900 ( SimpleDoFnRunner.java:85 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output ( SimpleDoFnRunner.java:423 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output ( SimpleDoFnRunner.java:411 )
at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement ( PrepareWrite.java:84 )
at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1$DoFnInvoker.invokeProcessElement
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement ( SimpleDoFnRunner.java:211 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement ( SimpleDoFnRunner.java:185 )
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement ( SimpleParDoFn.java:340 )
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process ( ParDoOperation.java:44 )
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process ( OutputReceiver.java:54 )
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output ( SimpleParDoFn.java:285 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue ( SimpleDoFnRunner.java:275 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900 ( SimpleDoFnRunner.java:85 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output ( SimpleDoFnRunner.java:423 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output ( SimpleDoFnRunner.java:411 )
at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement ( JdbcIO.java:1528 )
at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn$DoFnInvoker.invokeProcessElement
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement ( SimpleDoFnRunner.java:211 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement ( SimpleDoFnRunner.java:188 )
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement ( SimpleParDoFn.java:340 )
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process ( ParDoOperation.java:44 )
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process ( OutputReceiver.java:54 )
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output ( SimpleParDoFn.java:285 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue ( SimpleDoFnRunner.java:275 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900 ( SimpleDoFnRunner.java:85 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output ( SimpleDoFnRunner.java:423 )
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output ( DoFnOutputReceivers.java:76 )
at org.apache.beam.sdk.transforms.MapElements$2.processElement ( MapElements.java:151 )
at org.apache.beam.sdk.transforms.MapElements$2$DoFnInvoker.invokeProcessElement
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement ( SimpleDoFnRunner.java:211 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement ( SimpleDoFnRunner.java:188 )
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement ( SimpleParDoFn.java:340 )
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process ( ParDoOperation.java:44 )
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process ( OutputReceiver.java:54 )
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output ( SimpleParDoFn.java:285 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue ( SimpleDoFnRunner.java:275 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900 ( SimpleDoFnRunner.java:85 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output ( SimpleDoFnRunner.java:423 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output ( SimpleDoFnRunner.java:411 )
at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement ( ReshuffleOverrideFactory.java:86 )
at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement ( SimpleDoFnRunner.java:211 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement ( SimpleDoFnRunner.java:188 )
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement ( SimpleParDoFn.java:340 )
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process ( ParDoOperation.java:44 )
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process ( OutputReceiver.java:54 )
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output ( GroupAlsoByWindowsParDoFn.java:185 )
Caused by: java.lang.RuntimeException

at org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto$SchemaInformation.getSchemaForField ( TableRowToStorageApiProto.java:371 )
at org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.messageFromMap ( TableRowToStorageApiProto.java:472 )
at org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.messageFromTableRow ( TableRowToStorageApiProto.java:625 )
at org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinationsTableRow$TableRowConverter.toMessage ( StorageApiDynamicDestinationsTableRow.java:170 )
at org.apache.beam.sdk.io.gcp.bigquery.StorageApiConvertMessages$ConvertMessagesDoFn.processElement ( StorageApiConvertMessages.java:159 )

The field eventId exists in the table schema, but for some reason the case of the field name changes.

This does not occur when using the File Loads method.
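The failure mode described above can be illustrated with a small sketch (hypothetical code, not Beam's actual implementation): the schema declares eventId, the incoming row carries the key eventid, and a case-sensitive map lookup cannot find the field.

```java
import java.util.Map;

// Hypothetical sketch of the reported failure: the schema declares "eventId",
// the incoming TableRow key arrives as "eventid", and a case-sensitive lookup
// fails, mirroring the "Schema field not found: eventid" error.
public class CaseMismatchDemo {
    public static void main(String[] args) {
        Map<String, String> schemaFields = Map.of("eventId", "INT64");
        String incomingKey = "eventid"; // casing differs from the schema
        if (!schemaFields.containsKey(incomingKey)) {
            throw new RuntimeException("Schema field not found: " + incomingKey);
        }
    }
}
```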

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@johnjcasey
Contributor

johnjcasey commented Jul 25, 2023

This looks to be caused by this change: #26975

Pranav and I debugged offline, and it looks like this is specific to batch mode.

@reuvenlax any idea why we would get this issue when using storage write in batch, but not in streaming?

@johnjcasey
Contributor

@ahmedabu98 as well

@pranavbhandari24 pranavbhandari24 changed the title [Bug]: Storage Write API Write fails on Batch Pipelines on 2.49 [Bug]: Storage Write API fails on Batch Pipelines on 2.49 Jul 26, 2023
@ahmedabu98
Contributor

@pranavbhandari24 can you provide some more info?

Is there a simple pipeline we can use to reproduce this? Does this fail consistently or under some conditions?

@pranavbhandari24
Contributor Author

The pipeline fails consistently. I was able to reproduce this error on the Jdbc To BigQuery template.

Let me try to reproduce this using a simple pipeline.

@ahmedabu98
Contributor

ahmedabu98 commented Jul 26, 2023

Couldn't reproduce it with the following pipeline (tried to make it as close as possible to the write configuration in the template):

public static void main(String[] args) {
    List<TableRow> rows = Arrays.asList(
        new TableRow().set("int", 1).set("str", "a"),
        new TableRow().set("str", "a").set("int", 1),
        new TableRow().set("str", "a").set("int", 1));

    Pipeline p = Pipeline.create();

    p.apply(Create.of(rows))
        .apply(BigQueryIO.writeTableRows()
            .to("google.com:clouddfe:ahmedabualsaud_test.copy")
            .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
//                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run().waitUntilFinish();
}

@ahmedabu98
Contributor

Ahh, I was able to reproduce it with these rows:

List<TableRow> rows = Arrays.asList(
    new TableRow().set("int", 1).set("str", "a"),
    new TableRow().set("STR", "b").set("int", 2),
    new TableRow().set("str", "c").set("INT", 3));

Looks like field lookups in TableRowToStorageApiProto may be case-sensitive, when they probably don't need to be.
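A case-insensitive lookup could be sketched like this (hypothetical helper, not Beam's actual code): key the schema fields by their lower-cased names and fold incoming keys the same way, since BigQuery column names are case-insensitive.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: resolve an incoming TableRow key against a schema
// case-insensitively by indexing fields under their lower-cased names.
public class CaseInsensitiveLookup {
    // Maps lower-cased field name -> canonical schema field name.
    private final Map<String, String> fieldsByLowerName = new HashMap<>();

    public void addField(String name) {
        fieldsByLowerName.put(name.toLowerCase(Locale.ROOT), name);
    }

    // Returns the canonical schema name for the key under any casing,
    // or null if the schema has no such field.
    public String resolve(String incomingKey) {
        return fieldsByLowerName.get(incomingKey.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        CaseInsensitiveLookup schema = new CaseInsensitiveLookup();
        schema.addField("eventId");
        // "eventid", "EVENTID", and "eventId" all resolve to the same field.
        System.out.println(schema.resolve("eventid")); // prints "eventId"
    }
}
```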

@ahmedabu98
Contributor

Hmm, this is a pretty old bug... I ran it with different versions and traced it back to 2.39.0.

@ahmedabu98
Contributor

Introduced in #17404

@ahmedabu98
Contributor

ahmedabu98 commented Jul 26, 2023

Have a PR addressing this in #27699. BTW, this is in code shared by all Storage API modes (batch, streaming, and at-least-once), so it probably affects all Storage API writes.

@github-actions github-actions bot added this to the 2.50.0 Release milestone Jul 26, 2023
@reuvenlax
Contributor

BigQuery column names are not case-sensitive, so we should fail if the schema has two column names that differ only in case.

@ahmedabu98
Contributor

> we should fail if the schema has duplicate names

Should we do this in the main BigQueryIO module, so it can apply to all write methods?
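An up-front validation along those lines might be sketched like this (hypothetical helper, not the actual PR): reject a schema when two column names collide after case folding.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical sketch: validate a schema eagerly, failing when two column
// names are duplicates under BigQuery's case-insensitive naming rules.
public class SchemaValidator {
    public static void checkNoDuplicateNames(List<String> columnNames) {
        Set<String> seen = new HashSet<>();
        for (String name : columnNames) {
            // add() returns false if the folded name was already present.
            if (!seen.add(name.toLowerCase(Locale.ROOT))) {
                throw new IllegalArgumentException(
                    "Duplicate column name (case-insensitive): " + name);
            }
        }
    }
}
```

Failing at pipeline construction time would surface the conflict immediately, rather than as a worker-side RuntimeException mid-write.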
