New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-8626] Implement status fn api handler in python sdk #10598
Conversation
93665a9
to
7cc2797
Compare
R: @angoenka |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @y1chi !
@@ -110,6 +112,15 @@ def __init__(self, | |||
data_channel_factory=self._data_channel_factory, | |||
fns=self._fns) | |||
|
|||
if status_address: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's move it in sdk_worker_main
where we keep other reporting related code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to do the actual initialization inside sdk_worker since I want to pass the active bundle cache in sdk worker in order to report the dangling operation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good
from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor | ||
|
||
|
||
def thread_dump(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have thread dump code in sdk_worker_main.py
under get_thread_dump
.
Shall we reuse it or move it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made few changes to the thread dump format. I'll reuse the function, I think eventually we probably won't need the status http server.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, we can get rid of status http server.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned above, Lets add a jira to clean StatusServer in sdk_worker_main
beam_fn_api_pb2.WorkerStatusResponse(id=request.id, | ||
status_info=response)) | ||
|
||
def generate_status_response(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can also expose it over a http server on dynamic port.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's possible but since eventually we'll be able to query the runner like localhost:port/sdk_status?id=<sdk_id>, it has same effect as exposing it individually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds reasonable.
retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Few minor comments.
@@ -110,6 +112,15 @@ def __init__(self, | |||
data_channel_factory=self._data_channel_factory, | |||
fns=self._fns) | |||
|
|||
if status_address: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good
status_address, self._bundle_processor_cache) | ||
except Exception: | ||
traceback_string = traceback.format_exc() | ||
_LOGGER.info('Error creating worker status request handler, skipping ' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_LOGGER.info('Error creating worker status request handler, skipping ' | |
_LOGGER.warn('Error creating worker status request handler, skipping ' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
@@ -78,8 +67,7 @@ def do_GET(self): # pylint: disable=invalid-name | |||
self.send_header('Content-Type', 'text/plain') | |||
self.end_headers() | |||
|
|||
for line in StatusServer.get_thread_dump(): | |||
self.wfile.write(line.encode('utf-8')) | |||
self.wfile.write(thread_dump()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets add a jira to clean StatusServer
from here completely once we have rolled out Debug capture.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor | ||
|
||
|
||
def thread_dump(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned above, Lets add a jira to clean StatusServer in sdk_worker_main
stack_traces[stack_trace].append(thread_ident_name) | ||
|
||
all_traces = ['=' * 10 + 'THREAD DUMP' + '=' * 10] | ||
for stack, identity in stack_traces.items(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add a jira to group threads which have same thread stack for easier analysis and reducing text size.
This can be a starter task for new beam contributors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't it already included in this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. It's already in this PR.
We can print names of all the threads along with count so that we don't miss any information.
return '\n'.join(x.encode('utf-8') for x in all_traces) | ||
|
||
|
||
def active_processing_bundles_state(bundle_process_cache): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be private method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
stack_traces[stack_trace].append(thread_ident_name) | ||
|
||
all_traces = ['=' * 10 + 'THREAD DUMP' + '=' * 10] | ||
for stack, identity in stack_traces.items(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. It's already in this PR.
We can print names of all the threads along with count so that we don't miss any information.
all_traces = ['=' * 10 + 'THREAD DUMP' + '=' * 10] | ||
for stack, identity in stack_traces.items(): | ||
ident, name = identity[0] | ||
trace = '--- Thread #%s name: %s %s---\n' % ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
trace = '--- Thread #%s name: %s %s---\n' % ( | |
trace = '--- Threads (%d) %s --- \n' % (len(identity), [ident+':'+name for (ident, name) in identity]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is already printed in a separated line below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
LGTM
Retest this please |
1 similar comment
Retest this please |
Run Python PreCommit |
Retest this please |
1 similar comment
Retest this please |
retest this please |
Retest this please |
Run Dataflow ValidatesRunner |
I am not sure why the tests are not running on this PR. |
retest this please |
1 similar comment
retest this please |
retest this please |
retest this please |
1 similar comment
retest this please |
Run PythonLint PreCommit |
2 similar comments
Run PythonLint PreCommit |
Run PythonLint PreCommit |
Please add a meaningful description for your change here
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.