@scwhittle
Contributor

Previously, an instruction id could be observed on the control stream but, because of an exception, never have a handler registered for it. The data stream would then block forever trying to multiplex elements to its queue.

Fixes #32714
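
A minimal sketch of the failure mode described above and of how poisoning can release a blocked data stream. It assumes a much-simplified multiplexer: `TinyMultiplexer`, `register`, `poison`, and `deliver` are hypothetical names for illustration and are not the Beam classes touched by this PR.

```java
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical, simplified multiplexer used only to illustrate the hang and the fix.
final class TinyMultiplexer {
  private final ConcurrentHashMap<String, CompletableFuture<Consumer<String>>> receivers =
      new ConcurrentHashMap<>();
  private final Set<String> poisoned = ConcurrentHashMap.newKeySet();

  /** Control-stream side: called once a handler exists for the instruction id. */
  void register(String instructionId, Consumer<String> receiver) {
    receivers
        .computeIfAbsent(instructionId, unused -> new CompletableFuture<>())
        .complete(receiver);
  }

  /** Control-stream side: called when setup failed and no handler will ever register. */
  void poison(String instructionId) {
    poisoned.add(instructionId);
    CompletableFuture<Consumer<String>> pending = receivers.remove(instructionId);
    if (pending != null) {
      pending.cancel(false); // releases any data-stream thread blocked in join()
    }
  }

  /** Data-stream side: waits for a receiver; returns false if the id is poisoned. */
  boolean deliver(String instructionId, String element) {
    CompletableFuture<Consumer<String>> pending =
        receivers.computeIfAbsent(
            instructionId,
            unused ->
                poisoned.contains(instructionId)
                    ? null
                    : new CompletableFuture<Consumer<String>>());
    if (pending == null) {
      return false; // already poisoned: drop the element instead of waiting
    }
    try {
      pending.join().accept(element); // without poison(), this could wait forever
      return true;
    } catch (CancellationException e) {
      return false; // poisoned while waiting
    }
  }
}
```

In this sketch, a `deliver` call for an id whose setup failed returns `false` once `poison` is called, instead of blocking indefinitely.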



@github-actions github-actions bot added the java label Oct 18, 2024
@scwhittle scwhittle marked this pull request as ready for review October 18, 2024 10:08
@github-actions
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@scwhittle scwhittle force-pushed the timeout_and_poison branch 3 times, most recently from 974a9b4 to fa7cde6 Compare October 21, 2024 10:21
@scwhittle
Contributor Author

Test failures appear to be unrelated flakiness

@scwhittle
Contributor Author

assign set of reviewers

@scwhittle
Contributor Author

Run Java PreCommit

@github-actions
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @robertwb for label java.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@Override
public void poisonInstructionId(String instructionId) {
  LOG.debug("Poisoning instruction {}", instructionId);
  for (BeamFnDataGrpcMultiplexer client : multiplexerCache.values()) {
Contributor Author

One thing this doesn't yet handle is the case where the instruction being poisoned is for an endpoint that has not yet been observed. This is only possible at the beginning of processing, though.

I was working on refactoring the creation of the bundle processor to determine the API endpoints before a possible exception is thrown, but the change was getting large, so I would prefer to do it as a follow-up. As-is, this helps the cases where the API endpoint was created earlier.

Contributor

+1 to getting this in first.

An alternative is for all these multiplexers to share the same poisoned id cache.
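
A rough sketch of the shared-cache alternative, under stated assumptions: `SharedPoisonedIds` and `EndpointMultiplexer` are hypothetical names, the expiry is a hand-rolled JDK-only stand-in for the Guava cache used in the PR, and expired entries are only evicted lazily on lookup. The point it illustrates is that a multiplexer created after the poisoning would still see the poisoned id, because the record lives outside any single multiplexer.

```java
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical shared record of poisoned instruction ids with a time-to-live.
final class SharedPoisonedIds {
  private final ConcurrentHashMap<String, Long> poisonedAtNanos = new ConcurrentHashMap<>();
  private final long ttlNanos;
  private final LongSupplier nanoClock; // injected so the expiry can be tested

  SharedPoisonedIds(Duration ttl, LongSupplier nanoClock) {
    this.ttlNanos = ttl.toNanos();
    this.nanoClock = nanoClock;
  }

  void poison(String instructionId) {
    poisonedAtNanos.put(instructionId, nanoClock.getAsLong());
  }

  boolean isPoisoned(String instructionId) {
    Long poisonedAt = poisonedAtNanos.get(instructionId);
    if (poisonedAt == null) {
      return false;
    }
    if (nanoClock.getAsLong() - poisonedAt >= ttlNanos) {
      // Expired: forget the id so the map does not grow without bound.
      poisonedAtNanos.remove(instructionId, poisonedAt);
      return false;
    }
    return true;
  }
}

// Hypothetical per-endpoint multiplexer that consults the shared record.
final class EndpointMultiplexer {
  private final SharedPoisonedIds poisonedIds;

  EndpointMultiplexer(SharedPoisonedIds poisonedIds) {
    this.poisonedIds = poisonedIds;
  }

  /** Returns true if data for this instruction id should be dropped. */
  boolean shouldDrop(String instructionId) {
    return poisonedIds.isPoisoned(instructionId);
  }
}
```

In real use the clock would presumably be `System::nanoTime`; the injected supplier only exists to keep the sketch deterministic.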

Contributor

@robertwb robertwb left a comment

Thanks for hunting this down and fixing it.

this.receivers = new ConcurrentHashMap<>();
this.erroredInstructionIds = new ConcurrentHashMap<>();
this.poisonedInstructionIds =
    CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(20)).build();
Contributor

Should 20 minutes be pulled out to a top-level constant?

Contributor Author

Done

instructionId,
(unused) -> {
  if (poisonedInstructionIds.getIfPresent(instructionId) != null) {
    return null;
Contributor

Why do we have to special case this instead of simply always returning a completable future (even if it won't be used) as before? Or is it for better error messaging?

Contributor Author

Returning null from the computeIfAbsent function avoids a memory leak in long-running pipelines that process many instruction ids. If the id is poisoned, it seems possible we won't unregister, so if we added the id here it would stay in the map forever.

I removed this helper since I think it is clearer if the two cases do their own handling.
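
The map behavior this relies on can be shown in isolation: when the mapping function passed to `ConcurrentHashMap.computeIfAbsent` returns null, no entry is recorded. A small sketch, where `ComputeIfAbsentNullDemo` and `entriesAfterLookups` are hypothetical names and every id is treated as poisoned for simplicity:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo: compares map growth with and without the null return.
final class ComputeIfAbsentNullDemo {
  /**
   * Looks up {@code lookups} distinct ids, all treated as poisoned, and returns how
   * many entries remain in the map afterwards.
   */
  static int entriesAfterLookups(boolean returnNullWhenPoisoned, int lookups) {
    ConcurrentHashMap<String, CompletableFuture<String>> receivers = new ConcurrentHashMap<>();
    for (int i = 0; i < lookups; i++) {
      String instructionId = "instruction-" + i;
      receivers.computeIfAbsent(
          instructionId,
          unused ->
              returnNullWhenPoisoned
                  ? null // no mapping is recorded for a null result
                  : new CompletableFuture<String>()); // stays until someone removes it
    }
    return receivers.size();
  }
}
```

With the null return the map stays empty no matter how many poisoned ids are looked up; without it, each lookup leaves behind a future that is only cleaned up if something later unregisters the id.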

CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverFuture =
    receivers.remove(instructionId);
if (receiverFuture != null && !receiverFuture.isDone()) {
  throw new IllegalStateException("Unregistering consumer which was not registered.");
Contributor

This error message doesn't seem to match the clauses of the if statement.

Contributor Author

Added explanation comment.

@codecov

codecov bot commented Oct 25, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 58.95%. Comparing base (1e27978) to head (abc5492).
Report is 121 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #32857      +/-   ##
============================================
+ Coverage     57.41%   58.95%   +1.53%     
- Complexity     1475     3112    +1637     
============================================
  Files           968     1131     +163     
  Lines        154224   174787   +20563     
  Branches       1076     3357    +2281     
============================================
+ Hits          88546   103038   +14492     
- Misses        63477    68398    +4921     
- Partials       2201     3351    +1150     
Flag Coverage Δ
java 69.94% <ø> (+1.35%) ⬆️


Contributor

@robertwb robertwb left a comment

LGTM once tests are passing.

@scwhittle scwhittle closed this Oct 30, 2024
@scwhittle scwhittle reopened this Oct 30, 2024
@scwhittle
Contributor Author

Run Java PreCommit

@scwhittle
Contributor Author

Run Java_IOs_Direct PreCommit

@scwhittle
Contributor Author

Run Java_Pulsar_IO_Direct PreCommit

@scwhittle scwhittle closed this Nov 4, 2024
@scwhittle scwhittle reopened this Nov 4, 2024
@scwhittle scwhittle closed this Nov 5, 2024
@scwhittle scwhittle reopened this Nov 5, 2024
@scwhittle scwhittle closed this Nov 11, 2024
@scwhittle scwhittle reopened this Nov 11, 2024
@scwhittle
Contributor Author

Run Java_IOs_Direct PreCommit

@scwhittle
Contributor Author

Run Java_Hadoop_IO_Direct PreCommit

@scwhittle
Contributor Author

Run Java PreCommit

@scwhittle
Contributor Author

Run Java_Hadoop_IO_Direct PreCommit

…iterable:

- remove reference to completed encoded input page from decoder once we have read it.
- re-read from cache after loading the next page to give eviction a chance to remove blocks
@scwhittle
Contributor Author

scwhittle commented Nov 11, 2024

(removed, commented on wrong issue)

@scwhittle
Contributor Author

Run Java PreCommit

@scwhittle
Contributor Author

Run Java_IOs_Direct PreCommit

@scwhittle
Contributor Author

Run Java_Pulsar_IO_Direct PreCommit

@scwhittle
Contributor Author

Run Java PreCommit

@scwhittle
Contributor Author

Run Java_IOs_Direct PreCommit

@scwhittle
Contributor Author

The precommit failures were unrelated flakes in :runners:google-cloud-dataflow-java:worker:test.

@scwhittle scwhittle merged commit 785ec07 into apache:master Nov 12, 2024
@scwhittle scwhittle deleted the timeout_and_poison branch November 12, 2024 10:36

Successfully merging this pull request may close these issues.

[Bug]: Java SDK blocked due to FnApi data stream multiplexing and ProcessBundleHandler exception handling
