Feature/dead letter queue core #29164

Merged: johnjcasey merged 82 commits into apache:master on Dec 5, 2023

johnjcasey
Contributor

Add the base utilities for creating error handlers and adding DLQs to Java IOs


johnjcasey and others added 30 commits July 5, 2023 16:05
Create gradle task and github actions config for GCS using this.
…tests

Feature/automate performance tests
…tests

add destination for triggered tests
…tests

move env variables to correct location
…tests

try uploading against separate dataset
…tests

update branch checkout, try to view the failure log
…tests

update to use correct BigQuery instance
@johnjcasey
Contributor Author

Run Java PreCommit

public abstract static class Record implements Serializable {

  /** The failing record, encoded as JSON. Will be null if serialization as JSON fails. */
  public abstract @Nullable String getJsonRecord();
Contributor

So, thinking about this, we don't expect all records to be (faithfully) represented in json, but that's not the point. The point is to give the user some idea what the record is without having to scrutinize the bytes.

Should we lengthen this to humanReadableJsonRecord to better capture its intent?

Contributor Author

I like that, I'll make that change
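
The intent described in this thread (a best-effort, human-readable rendering that may be absent) can be sketched in plain Java. This is only an illustration: `HumanReadableJsonSketch`, `renderOrNull`, and the lambda renderers are hypothetical names, and the actual PR uses Jackson rather than a caller-supplied function.

```java
import java.util.function.Function;

/** Hypothetical helper, not part of Beam: renders a record for humans on a best-effort basis. */
final class HumanReadableJsonSketch {

  /**
   * Returns the renderer's output, or null when rendering throws. The null mirrors the
   * nullable getter in the quoted code; the encoded bytes remain the source of truth.
   */
  static <T> String renderOrNull(T value, Function<T, String> renderer) {
    try {
      return renderer.apply(value);
    } catch (RuntimeException e) {
      return null;
    }
  }

  public static void main(String[] args) {
    // A renderer that succeeds.
    System.out.println(renderOrNull(42, v -> "{\"value\":" + v + "}"));
    // A renderer that fails; the result is null instead of an exception.
    String failed = renderOrNull(42, v -> { throw new IllegalStateException("cannot render"); });
    System.out.println(failed);
  }
}
```

The longer name proposed above, humanReadableJsonRecord, signals the same thing to callers: the string is a reading aid, not a faithful encoding.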

    throws Exception {
  if (exception != null) {
    throw exception;
  }
Contributor

Presumably we should be throwing a generic exception if one is not provided?

Contributor

Maybe something more directly like "Encountered bad record." Might be good to put (some of?) the record itself in the message as it won't be stored elsewhere.
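
A minimal sketch of the two suggestions in this thread, assuming the record is available as a string: rethrow the supplied exception when there is one, otherwise throw a generic exception whose message carries a truncated copy of the record. `ThrowingHandlerSketch`, `failOn`, `truncate`, and the 100-character limit are hypothetical and not taken from the PR.

```java
/** Hypothetical stand-in for the throwing handler under review; not Beam's class. */
final class ThrowingHandlerSketch {

  /** Arbitrary cap so a large record does not flood the exception message. */
  static final int MAX_RECORD_CHARS = 100;

  /** Rethrows the given exception if present, otherwise throws a generic one. */
  static void failOn(String recordDescription, Exception exception) throws Exception {
    if (exception != null) {
      throw exception;
    }
    throw new RuntimeException("Encountered bad record: " + truncate(recordDescription));
  }

  /** Shortens the record description, marking truncation with a trailing "...". */
  static String truncate(String description) {
    if (description == null) {
      return "<unavailable>";
    }
    return description.length() <= MAX_RECORD_CHARS
        ? description
        : description.substring(0, MAX_RECORD_CHARS) + "...";
  }

  public static void main(String[] args) {
    try {
      failOn("{\"id\":7}", null);
    } catch (Exception e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Including part of the record in the message matters here because, as noted above, a handler that throws does not store the record anywhere else.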

Preconditions.checkArgumentNotNull(record);
ObjectWriter objectWriter = new ObjectMapper().writer().withDefaultPrettyPrinter();

// Build up record information
Contributor

I wonder if this logic would be better placed as factory methods on BadRecord itself.

Contributor Author

sgtm

Contributor

I was thinking the whole block down to line 92.
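
One way to read the factory-method suggestion, sketched without Beam or Jackson: the value class owns the logic that turns an exception into stored fields, so callers never assemble a bad record by hand. `BadRecordSketch`, `fromFailure`, and the field names are hypothetical; the real class in the PR is structured differently (it nests record and failure information).

```java
import java.io.PrintWriter;
import java.io.StringWriter;

/** Hypothetical, simplified value class; not Beam's BadRecord. */
final class BadRecordSketch {
  private final String humanReadableJsonRecord; // may be null if rendering failed
  private final String exception;
  private final String exceptionStacktrace;
  private final String description;

  private BadRecordSketch(
      String humanReadableJsonRecord,
      String exception,
      String exceptionStacktrace,
      String description) {
    this.humanReadableJsonRecord = humanReadableJsonRecord;
    this.exception = exception;
    this.exceptionStacktrace = exceptionStacktrace;
    this.description = description;
  }

  /** Factory method: all the "build up record information" logic lives here. */
  static BadRecordSketch fromFailure(
      String humanReadableJsonRecord, Exception failure, String description) {
    StringWriter stack = new StringWriter();
    failure.printStackTrace(new PrintWriter(stack, true));
    return new BadRecordSketch(
        humanReadableJsonRecord, failure.toString(), stack.toString(), description);
  }

  public String getHumanReadableJsonRecord() { return humanReadableJsonRecord; }
  public String getException() { return exception; }
  public String getExceptionStacktrace() { return exceptionStacktrace; }
  public String getDescription() { return description; }

  public static void main(String[] args) {
    BadRecordSketch record =
        fromFailure("{\"n\":3}", new IllegalArgumentException("Integer was odd"), "Integer was odd");
    System.out.println(record.getException());
  }
}
```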


@Override
public void close() {
  closed = true;
Contributor

Should it be an error (or no-op?) to close this twice?

Contributor Author

absolutely yes
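
The question leaves two policies open for a second close(): treat it as an error, or treat it as a no-op. The sketch below shows both behind a flag so the difference is visible. `CloseOnceSketch` and its `strict` flag are hypothetical, and nothing here claims which policy the PR finally adopted.

```java
/** Hypothetical handler showing two policies for a repeated close(); not Beam's class. */
final class CloseOnceSketch implements AutoCloseable {
  private final boolean strict;
  private boolean closed = false;

  /** @param strict when true, a second close() throws; when false, it is ignored. */
  CloseOnceSketch(boolean strict) {
    this.strict = strict;
  }

  @Override
  public void close() {
    if (closed) {
      if (strict) {
        throw new IllegalStateException("Error handler is already closed");
      }
      return; // lenient policy: repeated close is a no-op
    }
    closed = true;
  }

  boolean isClosed() {
    return closed;
  }

  public static void main(String[] args) {
    CloseOnceSketch lenient = new CloseOnceSketch(false);
    lenient.close();
    lenient.close(); // ignored

    CloseOnceSketch strictHandler = new CloseOnceSketch(true);
    strictHandler.close();
    try {
      strictHandler.close();
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

The no-op policy matches the general AutoCloseable guidance that implementers are encouraged to make close() idempotent; the strict policy surfaces pipeline-construction mistakes earlier.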

closed = true;
PCollection<ErrorT> flattened;
if (errorCollections.isEmpty()) {
  LOG.warn("Empty list of error pcollections passed to ErrorHandler.");
Contributor

Warn is probably too strong, and likely not very actionable.

* configured an error handler or not.
*/
@Internal
class NoOpErrorHandler<ErrorT, OutputT extends POutput> implements ErrorHandler<ErrorT, OutputT> {
Contributor

NoOp seems to imply that we do nothing with the errors (like /dev/null). What we want here is something that we can call addErrorCollection on when we know that the passed error collection will be empty, but that should never be used as an actual error handler. In particular, if there happens to be something in those collections, that's a bug. (Should this be asserted? Actually, in this case it's fine to call close() and all. It's a NoneShallPassErrorHandler.)

I just can't shake the feeling that this'd all be so much easier if we finish pushing error handling into ParDo itself...

Contributor Author

I'll rename this. I agree that I'll probably want to rework this once we update the ParDo itself, but that will take quite a lot of design that we don't have in place yet.

BigEndianIntegerCoder.of(),
new RuntimeException(),
"Integer was odd",
"NoOpDoFn");
Contributor

I think we'd have to extract this from the DoFn Context object. (Or, more hacky, leverage the same globals that Counters and logging do.)

@johnjcasey
Contributor Author

This should be good to go. The Pulsar tests are known to be quite flaky.

Contributor

@robertwb left a comment

Thanks, this looks good. Just some minor comments.

public void testErrorHandlerWithBRHTransform() throws Exception {
  PCollection<Integer> record = pipeline.apply(Create.of(1, 2, 3, 4));
  DummySinkTransform<BadRecord> transform = new DummySinkTransform<>();
  ErrorHandler<BadRecord, PCollection<BadRecord>> eh =
Contributor

Perhaps use a try-with-resources (AutoCloseable) block?

Contributor Author

This lets us manually get access to the output collection, which is useful in the test context
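
The trade-off in this exchange can be sketched in plain Java: try-with-resources guarantees close(), and holding a reference declared outside the try still allows the output to be read afterwards. `CollectingHandlerSketch`, `addError`, and `getOutput` are hypothetical names that use lists in place of PCollections; they are not Beam's ErrorHandler API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical handler whose output only becomes available once it is closed. */
final class CollectingHandlerSketch implements AutoCloseable {
  private final List<String> pending = new ArrayList<>();
  private List<String> output; // null until close() runs

  void addError(String error) {
    if (output != null) {
      throw new IllegalStateException("Cannot add errors after close()");
    }
    pending.add(error);
  }

  @Override
  public void close() {
    if (output == null) {
      output = Collections.unmodifiableList(new ArrayList<>(pending));
    }
  }

  List<String> getOutput() {
    if (output == null) {
      throw new IllegalStateException("Output is only available after close()");
    }
    return output;
  }

  public static void main(String[] args) {
    // Declared outside the try so the output stays reachable after the block ends.
    CollectingHandlerSketch handler = new CollectingHandlerSketch();
    try (CollectingHandlerSketch scoped = handler) {
      scoped.addError("Integer was odd: 1");
      scoped.addError("Integer was odd: 3");
    }
    System.out.println(handler.getOutput());
  }
}
```

Closing manually, as the test under review does, is the simpler way to get the same access; the pattern above is an alternative when automatic closing is also wanted.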

@johnjcasey johnjcasey merged commit 22ae2e3 into apache:master Dec 5, 2023
33 checks passed
Naireen pushed a commit to Naireen/beam that referenced this pull request Jan 3, 2024
* Update 2.50 release notes to include new Kafka topicPattern feature

* Create groovy class for io performance tests
Create gradle task and github actions config for GCS using this.

* delete unnecessary class

* fix env call

* fix call to gradle

* run on hosted runner for testing

* add additional checkout

* add destination for triggered tests

* move env variables to correct location

* try uploading against separate dataset

* try without a user

* update branch checkout, try to view the failure log

* run on failure

* update to use correct BigQuery instance

* convert to matrix

* add result reporting

* add failure clause

* remove failure clause, update to run on self-hosted

* address comments, clean up build

* clarify branching

* Add error handling base implementation & test DLQ enabled class

* Add test cases

* apply spotless

* Fix Checkstyles

* Fix Checkstyles

* make DLH serializable

* rename dead letter to bad record

* make DLH serializable

* Change bad record router name, and use multioutputreceiver instead of process context

* Refactor BadRecord to be nested

* clean up checkstyle

* Update error handler test

* Add metrics for counting error records, and for measuring feature usage

* apply spotless

* fix checkstyle

* make metric reporting static

* spotless

* Rework annotations to be an explicit label on a PTransform, instead of using java annotations

* fix checkstyle

* Address comments

* Address comments

* Fix test cases, spotless

* remove flatting without error collections

* fix nullness

* spotless + encoding issues

* spotless

* throw error when error handler isn't used

* add concrete bad record error handler class

* spotless, fix test category

* fix checkstyle

* clean up comments

* fix test case

* remove "failing transform" field on bad record, add note to CHANGES.md

* fix failing test cases

* fix failing test cases

* apply spotless

* apply final comments

* apply final comments

* apply final comments