[cache-memory-leak] Fix Memory leak in cache server #416
Conversation
Force-pushed from 4cbba88 to 7b489ee
    log_size = get_log_size(task, logtype)
    if log_size > self._max_log_size:
        return [(
            None, log_size_exceeded_blurb(task, logtype, self._max_log_size)
        ), ]
If logs are larger than a configured size, the server responds with a standard blurb explaining how the user can access the logs from their local machine. This option is very useful when users generate more than 100 MB of logs.
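A hedged sketch of that guard: everything beyond the snippet itself, including the blurb wording and the 100 MB default, is an assumption for illustration, not the project's actual API.

```python
MAX_LOG_SIZE = 100 * 1024 * 1024  # assumed 100 MB cap


def log_size_exceeded_blurb(task, logtype, max_size):
    # Hypothetical wording; the real blurb tells the user how to
    # access the log from their local machine instead.
    return ("Log '%s' for task '%s' exceeds %d bytes; "
            "please fetch it locally instead." % (logtype, task, max_size))


def get_cached_log(task, logtype, log_size, max_log_size=MAX_LOG_SIZE):
    # Refuse to load oversized logs into memory; return the blurb instead.
    if log_size > max_log_size:
        return [(None, log_size_exceeded_blurb(task, logtype, max_log_size))]
    return [(0, "...log content...")]  # placeholder for the real load path
```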
    if time.time() - _counter > 30:
        self.verify_stale_workers()
        _counter = time.time()

    self.cleanup_if_necessary()
Core change that eliminates the memory leak.
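A minimal sketch of the maintenance loop those two calls live in, assuming the 30-second stale-worker interval from the diff; the class and counter attributes here are illustrative stand-ins, not the project's API.

```python
import time


class CacheServerLoop:
    """Sketch of the periodic-maintenance logic (assumed structure)."""

    def __init__(self, check_interval=30):
        self.check_interval = check_interval
        self._counter = time.time()
        self.stale_checks = 0
        self.cleanups = 0

    def verify_stale_workers(self):
        self.stale_checks += 1  # stand-in for the real stale-worker scan

    def cleanup_if_necessary(self):
        self.cleanups += 1  # stand-in for the pool-refresh check

    def tick(self, now=None):
        now = time.time() if now is None else now
        # Stale-worker verification is rate-limited to once per interval...
        if now - self._counter > self.check_interval:
            self.verify_stale_workers()
            self._counter = now
        # ...while cleanup_if_necessary runs on every pass; it is cheap
        # because it bails out unless the pool-refresh deadline has passed.
        self.cleanup_if_necessary()
```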
@@ -56,7 +56,7 @@ async def read_message(self, line: str):

        if self.logger.isEnabledFor(logging.INFO):
            self.logger.info(
-               "Pending stream keys: {}".format(list(self.pending_requests))
+               "Pending stream keys: {}".format(len(list(self.pending_requests)))
Drive-by change to reduce the size of the logs.
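The effect of the change, sketched with the stdlib logger (for a plain dict, `len(pending_requests)` alone would also suffice; `len(list(...))` works on any iterable):

```python
import logging

logger = logging.getLogger("cache_server")
pending_requests = {"stream-%d" % i: None for i in range(1000)}

# The isEnabledFor guard skips message formatting entirely when INFO is off;
# logging the count instead of the full key list keeps each log line tiny
# even with thousands of pending streams.
if logger.isEnabledFor(logging.INFO):
    logger.info("Pending stream keys: {}".format(len(list(pending_requests))))
```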
Minor nits but LGTM.
    if time_to_pool_refresh > 0:
        return
    # if workers are still running 30 seconds after the pool refresh timeout, then clean up
    no_workers_are_running = len(self.workers) == 0 and len(self.pending_requests) == 1
Does this mean an idle pool has one pending request?
Yes, that is what I observed.
    # if workers are still running 30 seconds after the pool refresh timeout, then clean up
    no_workers_are_running = len(self.workers) == 0 and len(self.pending_requests) == 1
    pool_needs_refresh = time_to_pool_refresh <= 0
    pool_force_refresh = time_to_pool_refresh < -60
Do we want to make that configurable?
Do you mean the force-refresh setting? Yes, I can make it configurable.
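One way the force-refresh window could be made configurable; the environment-variable name and helper below are assumptions for illustration, not the project's actual setting.

```python
import os

# Hypothetical knob: seconds past the refresh deadline before a forced refresh.
FORCE_REFRESH_GRACE = int(os.environ.get("CACHE_POOL_FORCE_REFRESH_GRACE", "60"))


def refresh_state(time_to_pool_refresh, workers, pending_requests,
                  grace=FORCE_REFRESH_GRACE):
    # An idle pool holds exactly one internal pending request (as observed above).
    no_workers_are_running = len(workers) == 0 and len(pending_requests) == 1
    pool_needs_refresh = time_to_pool_refresh <= 0
    pool_force_refresh = time_to_pool_refresh < -grace
    return no_workers_are_running, pool_needs_refresh, pool_force_refresh
```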
        echo("Refreshing the pool as no workers are running and no pending requests are there.")
        self.cleanup_workers()
    elif no_workers_are_running and pool_needs_refresh:
        echo("Refreshing the pool as no workers are running and no pending requests are there.")
Is there no race here? Can a worker not "get busy" in the meantime?
Given that this entire execution runs synchronous code, and this block comes after the code path that creates new workers, the only current case is that workers are already created. No new worker will be created during the execution of this block; if workers exist, they are already busy.

Generally, if workers are busy at pool-refresh time, I didn't want to kill them immediately. If the server is under heavy load, an API request may be waiting on an object from a worker that would get killed, which would turn the API response into an ugly HTTP 5xx.

The PR description doesn't fully explain how the memory leak came to be, so let me shed more light:
- When API requests arrive, the UI server asks the cache server (running in the background) to extract the cached value. The request is assigned a stream-id, and the API server waits on the response for that stream-id from the cache server.
- The cache server spawns a new worker for the stream-id; the worker does some computing and returns the object on the stream.
- When many requests hit the server (especially when a run has a lot of logs), the cache server spawns multiple individual workers, and the API server waits on all of these streams. If the objects (e.g. logs) are large, a worker ends up loading the whole object into memory to produce the response. After the response is delivered, a callback handles removing the worker and committing information to the cache store. But even after a worker has been killed, the pool still holds a reference to it, since we never close/join the pool.
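The fix, then, is to periodically close/join and recreate the pool so those lingering worker references are released. A lightweight sketch of that idea, using a thread pool for easy illustration (the real server uses a multiprocessing pool, and all names here are illustrative):

```python
from multiprocessing.pool import ThreadPool


class WorkerPoolManager:
    """Sketch of pool recreation: releasing the old pool drops its
    references to finished workers, which is what plugs the leak."""

    def __init__(self, processes=2):
        self.processes = processes
        self.pool = ThreadPool(processes=processes)

    def cleanup_workers(self):
        self.pool.close()   # stop accepting new work
        self.pool.join()    # wait for in-flight workers to finish
        # Recreate the pool; the old one (and its finished workers)
        # can now be garbage-collected.
        self.pool = ThreadPool(processes=self.processes)
```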
        self.verify_stale_workers()
        _counter = time.time()

    self.cleanup_if_necessary()
We could probably put this inside the if statement. Not a huge difference, though.
        self.verify_stale_workers()
        _counter = time.time()

    self.cleanup_if_necessary()
    time.sleep(0.1)
In theory, this sleep could be smaller.
Force-pushed from 7b489ee to c9d4bc7
    if log_size > self._max_log_size:
        return [(
            None, log_size_exceeded_blurb(task, logtype, self._max_log_size)
        ), ]
    # Note this is inefficient - we will load a 1 GB log even if we only want the last 100 bytes.
    # Doing this efficiently is a step change in complexity and effort - we can do it when justified in the future.
I guess we've arrived at that future point where it's justified :)
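For reference, an efficient tail would seek from the end of the file instead of loading it whole. A minimal byte-oriented sketch (it ignores splitting on line boundaries, which is part of the extra complexity the original comment alludes to):

```python
def tail_bytes(path, n_bytes):
    # Seek to (size - n_bytes) and read only the tail, so a multi-GB log
    # never has to be materialized in memory.
    with open(path, "rb") as f:
        f.seek(0, 2)                       # 2 = os.SEEK_END
        size = f.tell()
        f.seek(max(0, size - n_bytes))
        return f.read()
```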
Force-pushed from 22d7e5d to 342c437
- Recreate the multiprocess pool at a regular cadence to avoid memory leaks; since the pool was never removed, it held on to uncleared memory.
- Add a log-size constraint to the cache server to avoid memory leaks.
- Fix the affected test.
Force-pushed from 342c437 to 5d11f05
Key changes: a tail-based log cache setting.