-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-10603] Gracefully shutdown the channel reader in the test_stream_impl #12701
Conversation
Change-Id: I5fac037a7dcc141ff9d0ff8058fa3de993626c15
Codecov Report
@@ Coverage Diff @@
## master #12701 +/- ##
==========================================
- Coverage 40.30% 40.09% -0.21%
==========================================
Files 451 455 +4
Lines 53168 54026 +858
==========================================
+ Hits 21429 21663 +234
- Misses 31739 32363 +624
Continue to review full report at Codecov.
|
R: @pabloem |
except grpc.RpcError as e: | ||
# This happens when the Python interpreter shuts down or whn in a | ||
# notebook environment when the kernel is interrupted. | ||
if e.code() == grpc.StatusCode.UNAVAILABLE: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how do we deal with other statuscodes? Shouldn't we still write the endofstream to the file for other errors?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So far I've only seen UNAVAILABLE as a non-error code. But looking at https://developers.google.com/maps-booking/reference/grpc-api/status_codes I think we can also add CANCELLED to not forward the exception.
At L290, we do have a finally clause which will run regardless, so the endofstream will always be written to the channel.
Change-Id: Ic149d64301572201b29813f94bff704e9e7ca778
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added CANCELLED to the if statement
except grpc.RpcError as e: | ||
# This happens when the Python interpreter shuts down or whn in a | ||
# notebook environment when the kernel is interrupted. | ||
if e.code() == grpc.StatusCode.UNAVAILABLE: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So far I've only seen UNAVAILABLE as a non-error code. But looking at https://developers.google.com/maps-booking/reference/grpc-api/status_codes I think we can also add CANCELLED to not forward the exception.
At L290, we do have a finally clause which will run regardless, so the endofstream will always be written to the channel.
Run Portable_Python PreCommit |
Run Python2_PVR_Flink PreCommit |
LGTM. I'll merge once we get clean runs for tests. |
Run Python PreCommit |
2 similar comments
Run Python PreCommit |
Run Python PreCommit |
thanks @rohdesamuel |
Change-Id: I5fac037a7dcc141ff9d0ff8058fa3de993626c15
In the case that the kernel is interrupted we want to gracefully shutdown the grpc streaming request from the TestStream. Otherwsie, this throws an esoteric exception.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.