In class BigQueryWriteFn(DoFn), method _flush_batch(self, destination):
Return an additional pvalue.TaggedOutput with the detailed error for each failed insertion into BigQuery.
Currently, the failed-rows output contains only the destination and the row (payload) that failed, with no indication of why it failed:
# Current return statement
return [
    pvalue.TaggedOutput(
        BigQueryWriteFn.FAILED_ROWS,
        GlobalWindows.windowed_value((destination, row)))
    for row in failed_rows
]
For error analysis, it is essential to know what caused each row to fail.
The errors are already available in this same function, from the insert_rows call, so they only need to be passed through to the output:
# insert_rows already returns the per-row errors reported by BigQuery
passed, errors = self.bigquery_wrapper.insert_rows(
    project_id=table_reference.projectId,
    dataset_id=table_reference.datasetId,
    table_id=table_reference.tableId,
    rows=rows,
    insert_ids=insert_ids,
    skip_invalid_rows=True)
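Each failed row has to be matched to its own error before it can be emitted. In the BigQuery tabledata.insertAll API, each item in the insertErrors list carries the zero-based index of the rejected row and a list of error objects (reason, location, message). The sketch below assumes errors has that shape (a list of dicts with 'index' and 'errors' keys); the function name pair_failed_rows and the sample data are hypothetical, for illustration only.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]
ErrorEntry = Dict[str, Any]


def pair_failed_rows(
    rows: List[Row], errors: List[ErrorEntry]
) -> List[Tuple[Row, List[Dict[str, Any]]]]:
    """Pair each rejected row with the error details reported for it.

    Assumes each entry in `errors` looks like an insertAll insertErrors
    item: {'index': <position in rows>, 'errors': [<error dicts>]}.
    """
    return [(rows[entry["index"]], entry["errors"]) for entry in errors]


# Hypothetical batch and error response (not real BigQuery output).
rows = [
    {"id": 1, "name": "ok"},
    {"id": "not-an-int", "name": "bad"},
]
errors = [
    {
        "index": 1,
        "errors": [
            {"reason": "invalid", "location": "id",
             "message": "Cannot convert value to integer."}
        ],
    }
]

for row, row_errors in pair_failed_rows(rows, errors):
    # Only the second row was rejected, so one pair is printed.
    print(row["name"], [e["reason"] for e in row_errors])
```

Building the pairs once by index keeps each row next to its own errors, so the later return statement can unpack (row, error) directly.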
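The proposed return statement would include the error with each row:
# Proposed return statement. Assumes failed_rows holds (row, error) pairs,
# e.g. failed_rows = [(rows[e['index']], e['errors']) for e in errors]
return [
    pvalue.TaggedOutput(
        BigQueryWriteFn.FAILED_ROWS,
        GlobalWindows.windowed_value((destination, row, error)))
    for row, error in failed_rows
]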
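Once the error travels with the row, a downstream step can turn each failed element into a record for an error (dead-letter) table. This is a minimal sketch. It assumes the proposed (destination, row, errors) tuple shape, a destination given as a table spec string, and errors as a list of dicts with 'reason' and 'message' keys. The function to_dead_letter_record and its output fields are hypothetical. In a pipeline it could be applied with beam.Map to the failed-rows output.

```python
import json
from typing import Any, Dict, List, Tuple

# Proposed failed-row element shape: (destination, row, errors).
FailedElement = Tuple[str, Dict[str, Any], List[Dict[str, Any]]]


def to_dead_letter_record(element: FailedElement) -> Dict[str, str]:
    """Flatten one failed insert into a record for a hypothetical error table."""
    destination, row, errors = element
    return {
        "destination": destination,
        # Keep the original payload so the row can be repaired and replayed.
        "row_json": json.dumps(row, sort_keys=True),
        # Join every reason/message pair reported for this row.
        "error_summary": "; ".join(
            f"{e.get('reason', '')}: {e.get('message', '')}" for e in errors
        ),
    }


# Hypothetical failed element (not real BigQuery output).
element = (
    "my-project:my_dataset.my_table",
    {"id": "not-an-int"},
    [{"reason": "invalid", "message": "Cannot convert value to integer."}],
)
print(to_dead_letter_record(element))
```

Storing the payload as JSON alongside a readable error summary is one possible design; it keeps the error table schema fixed regardless of the destination table's schema.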
Thank you!
Imported from Jira BEAM-10640. Original Jira may contain additional context.
Reported by: leiterenato.