
Enable handling of NotFound and AccessDenied errors in the streaming insert of BigQueryIO #31310

Open

wants to merge 5 commits into base: master

Conversation

@yu-iskw (Contributor) commented May 16, 2024

Addresses #31226

Issue Summary

The BigQueryServicesImpl of the Apache Beam SDK does not handle "Not Found" and "Permission Denied" errors when inserting data into BigQuery fails. As a result, a Dataflow job retries the insert indefinitely.

Detailed Description

Problem Statement

  • Error Handling: The BigQueryServicesImpl does not manage "Not Found" and "Permission Denied" errors.
  • Infinite Retries: If data insertion into BigQuery fails, the Dataflow job retries indefinitely.

Current Workarounds

  • Fixed Destination Datasets/Tables: Errors can be resolved by creating the dataset or table, or by granting the required permissions to the service account of the Dataflow job.
  • Dynamic Destination Tables: When destination tables are determined dynamically by the input data:
    • A destination table might not exist due to incorrect input data.
    • A destination table might exist, but the Dataflow job should not insert data into it due to incorrect input data.
    • In these cases, creating incorrect destination tables or granting permissions to insert into them is not advisable.

Potential Solutions

  • Custom BigQueryServices: Modify the behavior of BigQueryServicesImpl by creating a custom BigQueryServices within the Apache Beam SDK namespace using the withTestServices method. However, this method is not recommended for production use due to its complexity.
  • Dead-letter Topic: Routing failed records to a dead-letter topic in Pub/Sub is not recommended.
  • Retry Policy: Handling "Not Found" and "Permission Denied" errors in the pipeline with a retry policy would be ideal. Currently, BigQueryServicesImpl only handles errors returned in the response of the BigQuery API, not these exceptions raised by the client.
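To illustrate the reason-based classification such a retry policy would need, here is a minimal plain-Java sketch. The Beam wiring (e.g. BigQueryIO.Write#withFailedInsertRetryPolicy) is omitted, and the reason strings are assumptions based on the BigQuery v2 error reason vocabulary:

```java
import java.util.List;
import java.util.Set;

// Sketch of the classification a custom InsertRetryPolicy could apply to the
// reasons carried by TableDataInsertAllResponse.InsertErrors (illustrative only).
public class InsertErrorClassifier {

  // Reasons that a retry cannot fix: the table does not exist, or the
  // service account lacks permission (assumed reason strings).
  private static final Set<String> PERMANENT_REASONS = Set.of("notFound", "accessDenied");

  /** Returns true only if every reported reason is transient and worth retrying. */
  public static boolean shouldRetry(List<String> reasons) {
    return reasons.stream().noneMatch(PERMANENT_REASONS::contains);
  }
}
```

With a predicate like this, a NotFound or AccessDenied insert could be surfaced to the pipeline as a failed row instead of being retried forever.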

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Signed-off-by: Yu Ishikawa <yu-iskw@users.noreply.github.com>
@yu-iskw (Contributor, Author) commented May 16, 2024

@m-trieu @ahmedabu98 I am not sure whether you are the appropriate people to discuss this pull request with. I want to discuss solutions to #20211 and #31226, though I am still in the middle of fixing the unit tests.

I described what I want to achieve in the pull request description. In short, I want to make it possible to handle errors such as NotFound and AccessDenied with the retry policy in the streaming insert mode of BigQueryIO. The implementation can be improved, but this should be a good starting point for the discussion. What are your thoughts?

Comment on lines +1006 to +1021
// Return errors so that we can handle even NotFound and AccessDenied with the retry
// policy
if (ApiErrorExtractor.INSTANCE.itemNotFound(e)
    || ApiErrorExtractor.INSTANCE.accessDenied(e)) {
  LOG.warn("Ignore the error: " + e.getMessage());
  List<TableDataInsertAllResponse.InsertErrors> errors = new ArrayList<>();
  TableDataInsertAllResponse.InsertErrors errorDetail =
      new TableDataInsertAllResponse.InsertErrors();
  ErrorProto errorProto = new ErrorProto();
  errorProto.setReason(errorReason);
  errorProto.setMessage("Item not found or access denied: " + e.getMessage());
  errorDetail.setIndex(0L);
  errorDetail.setErrors(Collections.singletonList(errorProto));
  errors.add(errorDetail);
  return errors;
}
@yu-iskw (Contributor, Author):
When the insert encounters errors such as NotFound and AccessDenied, the API does not return a response in the format from which insertErrors can be retrieved. Instead, the BigQuery SDK in Java raises an IOException for such errors. This implementation is a workaround to capture the exception. I'm not sure it is going in the right direction, but it is an easy way to change the behavior so that we can retrieve these errors with the retry policy.

https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll

@yu-iskw yu-iskw marked this pull request as ready for review May 16, 2024 06:34
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@yu-iskw (Contributor, Author) commented May 20, 2024

@damccorm What are your thoughts on this? The issue you opened is critical, especially for users who take advantage of dynamic destinations.

@damccorm (Contributor)

(note I just ran the migration tool for the issue, I didn't actually file it myself)

I actually think the current behavior (before this PR) is correct/reasonable. I'll chime in on #20211

@yu-iskw (Contributor, Author) commented May 21, 2024

@damccorm Thank you for the comment.

Are there any good ideas for handling infinite retries? I would like to know the best practices for handling errors such as NotFound and AccessDenied when we run into infinite retries while inserting data into BigQuery.

We take advantage of BigQuery dynamic destinations to determine the destination table based on the contents of a Pub/Sub message. If a Pub/Sub message contains invalid values for determining the destination table, inserting the record into BigQuery fails with a NotFound error. However, creating a BigQuery table for invalid destinations is not a good idea.
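For illustration, one mitigation is to validate the destination derived from a message before it ever reaches BigQueryIO, so malformed input is diverted instead of producing a NotFound insert. A minimal sketch (the project/dataset names and the validity pattern are assumptions, not part of this PR):

```java
import java.util.Optional;
import java.util.regex.Pattern;

// Sketch: derive a destination table spec from a message attribute, rejecting
// invalid values up front so they never reach the BigQuery insert path.
public class DestinationResolver {

  // Assumed pattern for valid BigQuery table names (letters, digits, underscores).
  private static final Pattern VALID = Pattern.compile("[A-Za-z0-9_]{1,1024}");

  /** Returns the full table spec, or empty if the attribute cannot name a table. */
  public static Optional<String> resolve(String tableAttribute) {
    if (tableAttribute == null || !VALID.matcher(tableAttribute).matches()) {
      return Optional.empty(); // divert to an error output instead of inserting
    }
    return Optional.of("my-project:events." + tableAttribute);
  }
}
```

Messages for which resolve(...) returns empty can then be routed to an error output rather than retried against a non-existent table.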

One possible solution is to forward invalid messages to a dead-letter topic, but Dataflow doesn't recommend using a dead-letter topic.

https://cloud.google.com/dataflow/docs/concepts/streaming-with-cloud-pubsub#dead-letter-topics

@damccorm (Contributor)

But, Dataflow doesn't recommend using a dead-letter topic.

That is strictly for pubsub reads. There is nothing wrong with using a dead letter queue for failed records (this is actually an encouraged pattern, see "When performing writes from Dataflow to a connector, consider using an ErrorHandler to handle any failed writes..." https://cloud.google.com/dataflow/docs/guides/io-connector-best-practices)
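A minimal sketch of that dead-letter pattern (hypothetical names; in Beam the concrete mechanism is Pipeline#registerBadRecordErrorHandler combined with BigQueryIO.Write#withErrorHandler, omitted here):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Illustration of the dead-letter pattern the Dataflow best-practices guide
// describes: records whose write permanently fails are diverted to a side
// collection instead of being retried forever.
public class DeadLetterSketch {

  /** Writes each record; failures go to deadLetter, successes are returned. */
  public static <T> List<T> writeAll(
      List<T> records, Predicate<T> writeSucceeds, List<T> deadLetter) {
    List<T> written = new ArrayList<>();
    for (T r : records) {
      if (writeSucceeds.test(r)) {
        written.add(r);     // normal path: record reached the sink
      } else {
        deadLetter.add(r);  // permanent failure: divert, don't retry
      }
    }
    return written;
  }
}
```

The dead-letter collection can then be written to any secondary sink (a Pub/Sub topic, a GCS file, a separate BigQuery table) for later inspection.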

@yu-iskw (Contributor, Author) commented May 26, 2024

@damccorm Thank you so much for the information. I didn't know about ErrorHandler. Is there any other documentation about how ErrorHandler behaves, specifically in BigQueryIO? I tried to find documentation about ErrorHandler in the Apache Beam documentation, but I couldn't find anything, and the Dataflow documentation only links to the reference for ErrorHandler, which is a generic class. I did find unit tests that use ErrorHandler in the Beam repository. Any other documentation about ErrorHandler would be very helpful.

https://github.com/apache/beam/blob/v2.56.0/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java#L740-L756

@johnjcasey (Contributor)

When an error occurs in BQIO and an error handler is configured, instead of letting that error propagate up and trigger retries, it is sent to the provided error handler. That error handler can be any PTransform capable of handling the BadRecord type.
