[BEAM-8618] Tear down unused DoFns periodically in Python SDK harness. #10655
Conversation
Force-pushed 99d025b to 94d67c7.
@@ -280,6 +283,7 @@ def get(self, instruction_id, bundle_descriptor_id):
    try:
      # pop() is threadsafe
      processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
      self.last_access_time[bundle_descriptor_id] = time.time()
    except IndexError:
This won't update the access time when we first create the processor in the except block.
This is expected. `last_access_time` represents the last time the `cached_bundle_processors` list for some bundle descriptor was accessed. If it exceeds the time limit, the remaining bundle processors cached in `cached_bundle_processors` will be shut down. What are your thoughts?
In my eyes, the first time it is accessed is when it is created.
The strategy here is that bundle processors which are unused after a certain amount of time will be shut down. When a bundle processor is created in the exception block, there are no cached (unused) bundle processors. Bundle processors become unused only when they are added to the cached bundle processors list. What about renaming `last_access_time` to `cached_bundle_processors_last_access_time` to make it clearer?
I still do not understand. The method is named `get`, so we access the bundle processor independently of whether we create it or not. It is cached regardless of whether it is created and added to the cache, or retrieved from the cache.

Logically, you might want to update the time when putting the processor into the cache. That would be in `release`.

What is the advantage of updating the time here? It should be sufficient to update it in `release`, directly before putting it back.
Hmm, if a bundle processor is retrieved from the cache, there is a high possibility that the remaining cached bundle processors will be needed in the future, so the last access time is updated. If the bundle processor is newly created, it means that the cached bundle processor list is empty. This is the main reason that the last access time is only updated when a bundle processor is retrieved from the cache. However, I think it does no harm to update the last access time in both cases if it makes the code more readable. What do you think?
> If the bundle processor is newly created, it means that the cached bundle processor list is empty. This is the main reason that the last access time is only updated when a bundle processor is retrieved from the cache.

Consider the case where we just have a single bundle processor. When we call `get` for the first time, we won't update the last-used time. However, every time we retrieve it afterwards, we will update the time, but the list of cached bundle processors will remain empty.

I think we should either (1) always update the last-used timestamp in `get`, regardless of creation, or (2) update it only on `release`.

I'm leaning towards (2) because while a bundle processor is in use, it can't be removed anyway. We update the timestamp when we put it back in `release`.
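Solution (2) can be sketched as follows. This is a hypothetical, simplified version of a `release` method that returns a processor to the cache; the class shape is an assumption, not the PR's actual code.

```python
import time


class BundleProcessorCache:
    """Sketch of solution (2): the timestamp is updated on release only."""

    def __init__(self):
        self.cached_bundle_processors = {}
        self.last_access_time = {}

    def release(self, bundle_descriptor_id, processor):
        # While a processor is in use it cannot be torn down anyway, so
        # the idle clock starts only when it goes back into the cache.
        self.last_access_time[bundle_descriptor_id] = time.time()
        self.cached_bundle_processors.setdefault(
            bundle_descriptor_id, []).append(processor)
```

With this design, `get` never needs to touch the timestamp: every cached processor was stamped at the moment it became idle.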
Regarding the single bundle processor case, it does no harm to update the time, as the cached bundle processors list is empty. However, in cases where there are multiple bundle processors, it will update the time for the remaining cached bundle processors and so improve the cache hit rate. I think this is the main difference between solutions (1) and (2). However, I'm fine with both solutions, as I think both of them work. I will update the PR according to solution (2) if you favor it based on your experience.

Thanks for the review and valuable comments. I have addressed the comments and would appreciate it if you could take another look :) @mxm
LGTM. Thank you for addressing the comments.
      for descriptor_id, last_access_time in \
          self.cached_bundle_processors_last_access_time.items():
        if time.time() - last_access_time > \
            DEFAULT_BUNDLE_PROCESSOR_CACHE_THRESHOLD_S:
nit: you could surround the condition with parentheses, which would make it easier to read, e.g.
if (time.time() - last_access_time >
DEFAULT_BUNDLE_PROCESSOR_CACHE_THRESHOLD_S):
@@ -69,6 +71,8 @@
 # 5 minutes * 60 seconds * 1000 millis * 1000 micros * 1000 nanoseconds
 DEFAULT_LOG_LULL_TIMEOUT_NS = 5 * 60 * 1000 * 1000 * 1000

 DEFAULT_BUNDLE_PROCESSOR_CACHE_THRESHOLD_S = 60
Suggested change:

```diff
-DEFAULT_BUNDLE_PROCESSOR_CACHE_THRESHOLD_S = 60
+DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S = 60
```
  def _schedule_periodic_shutdown(self):
    def shutdown_inactive_bundle_processors():
      for descriptor_id, last_access_time in \
          self.cached_bundle_processors_last_access_time.items():
Same here, use parentheses.
It seems that parentheses are not supported in the for loop?
It should; there are many examples of for loops using parentheses in the code base already.
Ah, I have added a local variable, so this problem should be gone now. Does that make sense to you?
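The local-variable approach mentioned here can look roughly like this. It is an illustrative sketch rather than the exact PR code: the function signature, the `shutdown_fn` callback, and passing the timestamp dict as a parameter are all assumptions made to keep the example self-contained.

```python
import time

DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S = 60


def shutdown_inactive_bundle_processors(last_access_times, shutdown_fn):
    # Reading the clock once into a short local variable keeps the
    # condition compact; together with binding long attribute names to
    # locals, this avoids backslash continuations in the for statement.
    current_time = time.time()
    for descriptor_id, last_access_time in last_access_times.items():
        if (current_time - last_access_time >
                DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S):
            shutdown_fn(descriptor_id)
```

The `if` condition also shows the parenthesized style suggested earlier in the thread, which is the PEP 8-preferred alternative to backslash line continuation.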
Just to let you know that we've just introduced a Python autoformatter. Your merge conflict might be a result of this.
Force-pushed d3e1b4d to 8d97572.
Squashed the commits and rebased the code.
retest this please
Linting needs to be fixed. See comment from @kamilwu.
Run PythonLint PreCommit
https://builds.apache.org/job/beam_PreCommit_PythonLint_Phrase/70/
Thanks for the info @mxm, I have fixed the style issue and updated the PR :)
Tear down unused DoFns periodically in Python SDK harness.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.